-
-
Notifications
You must be signed in to change notification settings - Fork 4.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
File-based fast training for Any2Vec models #2127
Changes from 20 commits
39a2c11
20c22f7
dd0e9ca
6203c77
03bf799
660493f
1aedfe8
9ff0bb1
9e498b7
1d4a2a8
97bac7e
4de3a84
36d1412
625025b
8173da8
bd0a0e0
63663fa
2ee2405
8f8e817
ac28bbb
942a12f
2a44fbc
e4a8ba0
394a417
9ab6b1b
5d2e2cf
0489561
901cad4
c09035c
1e3c314
d6755be
cc4680c
9978f6b
35333dd
86b91ac
fca6f50
45ca084
16bb386
c83b96f
1a21b0b
dd83a3e
c72f0b6
74e51b3
58fc112
5e70184
9727782
d97ac0c
b6d7bb3
0c1fc5f
fd66e34
da9f3da
81329d6
f2ba633
51cec43
a72ddf1
aba7682
03d44b2
e4e8cb2
3e989de
d8c5cdc
2a42b85
002a60c
3850f49
c1e8a9b
7b7195b
3a8a915
6beb96a
a2eb5fc
57f7b66
83ce7c2
a3ede08
d38463e
ec4c677
a5311d2
f499d5b
645499c
dc1b98d
b9564e9
eefdd65
f669979
e80189f
cf6b032
87d8ea7
e2851b4
1fdaa43
c2fa0d8
5427416
7f7760b
926fd5e
df47983
0df7f6f
5fd1c99
d9257be
67c572c
8e82b9f
db2a77f
a96bc6d
3b4da64
53b967c
f57d1cb
b652afe
260cfb5
a433018
20ec49b
1ced17d
0731449
35f0ab4
49905f0
95c6ec9
aed2b6b
c1af621
33bf97a
e592b6a
d08e4c1
468a000
f71e1f8
811388b
ddd5901
a3490c7
a28ff0d
b2996f0
64bb617
816f63f
abad1b8
0b03839
6217c73
f70d159
9593d5f
7b714b2
b833f0f
bcc0fb9
384e0b1
527266f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -43,7 +43,6 @@ | |
from types import GeneratorType | ||
from gensim.utils import deprecated | ||
import warnings | ||
import itertools | ||
|
||
try: | ||
from queue import Queue | ||
|
@@ -123,6 +122,9 @@ def _clear_post_train(self): | |
"""Resets certain properties of the model post training. eg. `keyedvectors.vectors_norm`.""" | ||
raise NotImplementedError() | ||
|
||
def _do_train_epoch(self, input_stream, thread_private_mem, cur_epoch, total_examples=None, total_words=None): | ||
raise NotImplementedError() | ||
|
||
def _do_train_job(self, data_iterable, job_parameters, thread_private_mem): | ||
"""Train a single batch. Return 2-tuple `(effective word count, total word count)`.""" | ||
raise NotImplementedError() | ||
|
@@ -136,6 +138,16 @@ def _check_input_data_sanity(self, data_iterable=None, data_iterables=None): | |
if not ((data_iterable is not None) ^ (data_iterables is not None)): | ||
raise ValueError("You must provide only one of singlestream or multistream arguments.") | ||
|
||
def _worker_loop_multistream(self, input_stream, progress_queue, cur_epoch=0, | ||
total_examples=None, total_words=None): | ||
thread_private_mem = self._get_thread_working_mem() | ||
|
||
examples, tally, raw_tally = self._do_train_epoch(input_stream, thread_private_mem, cur_epoch, | ||
total_examples=total_examples, total_words=total_words) | ||
|
||
progress_queue.put((examples, tally, raw_tally)) | ||
progress_queue.put(None) | ||
|
||
def _worker_loop(self, job_queue, progress_queue): | ||
"""Train the model, lifting batches of data from the queue. | ||
|
||
|
@@ -258,8 +270,8 @@ def _log_epoch_end(self, cur_epoch, example_count, total_examples, raw_word_coun | |
def _log_train_end(self, raw_word_count, trained_word_count, total_elapsed, job_tally): | ||
raise NotImplementedError() | ||
|
||
def _log_epoch_progress(self, progress_queue, job_queue, cur_epoch=0, total_examples=None, total_words=None, | ||
report_delay=1.0): | ||
def _log_epoch_progress(self, progress_queue=None, job_queue=None, cur_epoch=0, total_examples=None, | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Why? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Because in multistream mode there is no |
||
total_words=None, report_delay=1.0): | ||
"""Get the progress report for a single training epoch. | ||
|
||
Parameters | ||
|
@@ -328,8 +340,32 @@ def _log_epoch_progress(self, progress_queue, job_queue, cur_epoch=0, total_exam | |
self.total_train_time += elapsed | ||
return trained_word_count, raw_word_count, job_tally | ||
|
||
def _train_epoch(self, data_iterable=None, data_iterables=None, cur_epoch=0, total_examples=None, | ||
total_words=None, queue_factor=2, report_delay=1.0): | ||
def _train_epoch_multistream(self, data_iterables, cur_epoch=0, total_examples=None, total_words=None): | ||
assert len(data_iterables) == self.workers, "You have to pass the same amount of input streams as workers, " \ | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
"because each worker gets its own independent input stream." | ||
|
||
progress_queue = Queue() | ||
|
||
workers = [ | ||
threading.Thread( | ||
target=self._worker_loop_multistream, | ||
args=(input_stream, progress_queue,), | ||
kwargs={'cur_epoch': cur_epoch, 'total_examples': total_examples, 'total_words': total_words} | ||
) for input_stream in data_iterables | ||
] | ||
|
||
for thread in workers: | ||
thread.daemon = True | ||
thread.start() | ||
|
||
trained_word_count, raw_word_count, job_tally = self._log_epoch_progress( | ||
progress_queue=progress_queue, job_queue=None, cur_epoch=cur_epoch, total_examples=total_examples, | ||
total_words=total_words) | ||
|
||
return trained_word_count, raw_word_count, job_tally | ||
|
||
def _train_epoch(self, data_iterable, cur_epoch=0, total_examples=None, total_words=None, | ||
queue_factor=2, report_delay=1.0): | ||
"""Train the model for a single epoch. | ||
|
||
Parameters | ||
|
@@ -361,7 +397,6 @@ def _train_epoch(self, data_iterable=None, data_iterables=None, cur_epoch=0, tot | |
* Total word count used in training. | ||
|
||
""" | ||
self._check_input_data_sanity(data_iterable, data_iterables) | ||
job_queue = Queue(maxsize=queue_factor * self.workers) | ||
progress_queue = Queue(maxsize=(queue_factor + 1) * self.workers) | ||
|
||
|
@@ -372,9 +407,6 @@ def _train_epoch(self, data_iterable=None, data_iterables=None, cur_epoch=0, tot | |
for _ in xrange(self.workers) | ||
] | ||
|
||
# Chain all input streams into one, because multistream training is not supported yet. | ||
if data_iterables is not None: | ||
data_iterable = itertools.chain(*data_iterables) | ||
workers.append(threading.Thread( | ||
target=self._job_producer, | ||
args=(data_iterable, job_queue), | ||
|
@@ -444,10 +476,14 @@ def train(self, data_iterable=None, data_iterables=None, epochs=None, total_exam | |
for callback in self.callbacks: | ||
callback.on_epoch_begin(self) | ||
|
||
trained_word_count_epoch, raw_word_count_epoch, job_tally_epoch = self._train_epoch( | ||
data_iterable=data_iterable, data_iterables=data_iterables, cur_epoch=cur_epoch, | ||
total_examples=total_examples, total_words=total_words, queue_factor=queue_factor, | ||
report_delay=report_delay) | ||
if data_iterable is not None: | ||
trained_word_count_epoch, raw_word_count_epoch, job_tally_epoch = self._train_epoch( | ||
data_iterable, cur_epoch=cur_epoch, total_examples=total_examples, | ||
total_words=total_words, queue_factor=queue_factor, report_delay=report_delay) | ||
else: | ||
trained_word_count_epoch, raw_word_count_epoch, job_tally_epoch = self._train_epoch_multistream( | ||
data_iterables, cur_epoch=cur_epoch, total_examples=total_examples, total_words=total_words) | ||
|
||
trained_word_count += trained_word_count_epoch | ||
raw_word_count += raw_word_count_epoch | ||
job_tally += job_tally_epoch | ||
|
@@ -550,6 +586,9 @@ def __init__(self, sentences=None, input_streams=None, workers=3, vector_size=10 | |
consider an iterable that streams the sentences directly from disk/network. | ||
See :class:`~gensim.models.word2vec.BrownCorpus`, :class:`~gensim.models.word2vec.Text8Corpus` | ||
or :class:`~gensim.models.word2vec.LineSentence` for such examples. | ||
input_streams : list or tuple of iterable of iterables | ||
The tuple or list of `sentences`-like arguments. Use it if you have multiple input streams. It is possible | ||
to process streams in parallel, using `workers` parameter. | ||
workers : int, optional | ||
Number of working threads, used for multiprocessing. | ||
vector_size : int, optional | ||
|
@@ -928,6 +967,9 @@ def train(self, sentences=None, input_streams=None, total_examples=None, total_w | |
consider an iterable that streams the sentences directly from disk/network. | ||
See :class:`~gensim.models.word2vec.BrownCorpus`, :class:`~gensim.models.word2vec.Text8Corpus` | ||
or :class:`~gensim.models.word2vec.LineSentence` module for such examples. | ||
input_streams : list or tuple of iterable of iterables | ||
The tuple or list of `sentences`-like arguments. Use it if you have multiple input streams. It is possible | ||
to process streams in parallel, using `workers` parameter. | ||
total_examples : int, optional | ||
Count of sentences. | ||
total_words : int, optional | ||
|
@@ -1181,14 +1223,14 @@ def _log_progress(self, job_queue, progress_queue, cur_epoch, example_count, tot | |
logger.info( | ||
"EPOCH %i - PROGRESS: at %.2f%% examples, %.0f words/s, in_qsize %i, out_qsize %i", | ||
cur_epoch + 1, 100.0 * example_count / total_examples, trained_word_count / elapsed, | ||
utils.qsize(job_queue), utils.qsize(progress_queue) | ||
None if job_queue is None else utils.qsize(job_queue), utils.qsize(progress_queue) | ||
) | ||
else: | ||
# words-based progress % | ||
logger.info( | ||
"EPOCH %i - PROGRESS: at %.2f%% words, %.0f words/s, in_qsize %i, out_qsize %i", | ||
cur_epoch + 1, 100.0 * raw_word_count / total_words, trained_word_count / elapsed, | ||
utils.qsize(job_queue), utils.qsize(progress_queue) | ||
None if job_queue is None else utils.qsize(job_queue), utils.qsize(progress_queue) | ||
) | ||
|
||
def _log_epoch_end(self, cur_epoch, example_count, total_examples, raw_word_count, total_words, | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,25 @@ | ||
#include <stdexcept> | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Why was this needed? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Removed. |
||
#include "fast_line_sentence.h" | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. What about the glue? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Done. |
||
|
||
|
||
// Default constructor: no file is attached yet (filename_ stays empty).
FastLineSentence::FastLineSentence() : is_eof_(false) {}

// Construct over `filename`, opening it immediately for reading; the file
// name is retained so the stream can be reopened later via Reset().
FastLineSentence::FastLineSentence(const std::string& filename)
    : filename_(filename),
      fs_(filename),
      is_eof_(false) {}
|
||
std::vector<std::string> FastLineSentence::ReadSentence() { | ||
if (is_eof_) { | ||
return {}; | ||
} | ||
std::string line, word; | ||
std::getline(fs_, line); | ||
std::vector<std::string> res; | ||
|
||
std::istringstream iss(line); | ||
while (iss >> word) { | ||
res.push_back(word); | ||
} | ||
|
||
if (fs_.eof()) { | ||
is_eof_ = true; | ||
} | ||
return res; | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,21 @@ | ||
#pragma once | ||
|
||
#include <fstream>
#include <sstream>
#include <string>
#include <vector>
|
||
|
||
// Lazily reads whitespace-delimited sentences (one per text line) from a
// file, for the file-based fast training path of Any2Vec models.
class FastLineSentence {
public:
    // No file attached. NOTE(review): filename_ is empty here, so Reset()
    // on a default-constructed instance reopens "" — confirm callers never
    // do that. (`explicit` dropped: it is meaningless on a zero-argument
    // constructor except to block copy-list-initialization.)
    FastLineSentence();
    // Open `filename` immediately; lines are consumed via ReadSentence().
    explicit FastLineSentence(const std::string& filename);

    // Next line of the file split into words; empty vector at end of input.
    std::vector<std::string> ReadSentence();
    // True once the underlying stream has been exhausted.
    inline bool IsEof() const { return is_eof_; }
    // Reopen the file from the beginning and clear the exhaustion flag.
    inline void Reset() { fs_ = std::ifstream(filename_); is_eof_ = false; }

private:
    std::string filename_;  // kept so Reset() can reopen the same file
    std::ifstream fs_;
    bool is_eof_;           // set by ReadSentence() when input is exhausted
};
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hanging indent please (here and everywhere else).