diff --git a/docs/src/models/lsi_dispatcher.rst b/docs/src/models/lsi_dispatcher.rst
index 59bc80e35c..49d217c174 100644
--- a/docs/src/models/lsi_dispatcher.rst
+++ b/docs/src/models/lsi_dispatcher.rst
@@ -5,4 +5,5 @@
     :synopsis: Dispatcher for distributed LSI
     :members:
     :inherited-members:
-
+    :undoc-members:
+    :show-inheritance:
diff --git a/docs/src/models/lsi_worker.rst b/docs/src/models/lsi_worker.rst
index baf999f105..5588fa114f 100644
--- a/docs/src/models/lsi_worker.rst
+++ b/docs/src/models/lsi_worker.rst
@@ -5,4 +5,5 @@
     :synopsis: Worker for distributed LSI
     :members:
     :inherited-members:
-
+    :undoc-members:
+    :show-inheritance:
diff --git a/gensim/models/basemodel.py b/gensim/models/basemodel.py
index eb6071e8dd..371b5b7010 100644
--- a/gensim/models/basemodel.py
+++ b/gensim/models/basemodel.py
@@ -1,24 +1,51 @@
 class BaseTopicModel(object):
     def print_topic(self, topicno, topn=10):
-        """
-        Return a single topic as a formatted string. See `show_topic()` for parameters.
+        """Get a single topic as a formatted string.
+
+        Parameters
+        ----------
+        topicno : int
+            Topic id.
+        topn : int
+            Number of words from the topic that will be used.
 
-        >>> lsimodel.print_topic(10, topn=5)
-        '-0.340 * "category" + 0.298 * "$M$" + 0.183 * "algebra" + -0.174 * "functor" + -0.168 * "operator"'
+        Returns
+        -------
+        str
+            String representation of the topic, like '-0.340 * "category" + 0.298 * "$M$" + 0.183 * "algebra" + ... '.
 
         """
         return ' + '.join(['%.3f*"%s"' % (v, k) for k, v in self.show_topic(topicno, topn)])
 
     def print_topics(self, num_topics=20, num_words=10):
-        """Alias for `show_topics()` that prints the `num_words` most
-        probable words for `topics` number of topics to log.
-        Set `topics=-1` to print all topics."""
+        """Get the most significant topics (alias for the `show_topics()` method).
+
+        Parameters
+        ----------
+        num_topics : int, optional
+            The number of topics to be selected; if -1, all topics will be present in the result
+            (ordered by significance).
+        num_words : int, optional
+            The number of words to be included per topic (ordered by significance).
+
+        Returns
+        -------
+        list of (int, list of (str, float))
+            Sequence with (topic_id, [(word, value), ... ]).
+
+        """
        return self.show_topics(num_topics=num_topics, num_words=num_words, log=True)
 
     def get_topics(self):
-        """
-        Returns:
-            np.ndarray: `num_topics` x `vocabulary_size` array of floats which represents
-            the term topic matrix learned during inference.
+        """Get the term-topic matrix.
+
+        Returns
+        -------
+        numpy.ndarray
+            The term-topic matrix learned during inference, with shape (`num_topics`, `vocabulary_size`).
+
+        Raises
+        ------
+        NotImplementedError
+            If the method is not overridden in a subclass.
+
         """
         raise NotImplementedError
diff --git a/gensim/models/lsi_dispatcher.py b/gensim/models/lsi_dispatcher.py
index 75a9367d5e..e4c06ef307 100755
--- a/gensim/models/lsi_dispatcher.py
+++ b/gensim/models/lsi_dispatcher.py
@@ -1,200 +1,294 @@
-#!/usr/bin/env python
-# -*- coding: utf-8 -*-
-#
-# Copyright (C) 2010 Radim Rehurek
-# Licensed under the GNU LGPL v2.1 - http://www.gnu.org/licenses/lgpl.html
-
-"""
-USAGE: %(program)s SIZE_OF_JOBS_QUEUE
-
-    Dispatcher process which orchestrates distributed LSI computations. Run this \
-script only once, on any node in your cluster.
-
-Example: python -m gensim.models.lsi_dispatcher
-"""
-
-
-from __future__ import with_statement
-import os
-import sys
-import logging
-import threading
-import time
-from six import iteritems, itervalues
-try:
-    from Queue import Queue
-except ImportError:
-    from queue import Queue
-import Pyro4
-from gensim import utils
-
-
-logger = logging.getLogger("gensim.models.lsi_dispatcher")
-
-
-# How many jobs (=chunks of N documents) to keep "pre-fetched" in a queue?
-# A small number is usually enough, unless iteration over the corpus is very very
-# slow (slower than the actual computation of LSI), in which case you can override
-# this value from command line. ie. run "python ./lsi_dispatcher.py 100"
-MAX_JOBS_QUEUE = 10
-
-# timeout for the Queue object put/get blocking methods.
-# it should really be infinity, but then keyboard interrupts don't work.
-# so this is really just a hack, see http://bugs.python.org/issue1360
-HUGE_TIMEOUT = 365 * 24 * 60 * 60  # one year
-
-
-class Dispatcher(object):
-    """
-    Dispatcher object that communicates and coordinates individual workers.
-
-    There should never be more than one dispatcher running at any one time.
-    """
-
-    def __init__(self, maxsize=0):
-        """
-        Note that the constructor does not fully initialize the dispatcher;
-        use the `initialize()` function to populate it with workers etc.
-        """
-        self.maxsize = maxsize
-        self.workers = {}
-        self.callback = None  # a pyro proxy to this object (unknown at init time, but will be set later)
-
-    @Pyro4.expose
-    def initialize(self, **model_params):
-        """
-        `model_params` are parameters used to initialize individual workers (gets
-        handed all the way down to worker.initialize()).
-        """
-        self.jobs = Queue(maxsize=self.maxsize)
-        self.lock_update = threading.Lock()
-        self._jobsdone = 0
-        self._jobsreceived = 0
-
-        # locate all available workers and store their proxies, for subsequent RMI calls
-        self.workers = {}
-        with utils.getNS() as ns:
-            self.callback = Pyro4.Proxy('PYRONAME:gensim.lsi_dispatcher')  # = self
-            for name, uri in iteritems(ns.list(prefix='gensim.lsi_worker')):
-                try:
-                    worker = Pyro4.Proxy(uri)
-                    workerid = len(self.workers)
-                    # make time consuming methods work asynchronously
-                    logger.info("registering worker #%i from %s", workerid, uri)
-                    worker.initialize(workerid, dispatcher=self.callback, **model_params)
-                    self.workers[workerid] = worker
-                except Pyro4.errors.PyroError:
-                    logger.exception("unresponsive worker at %s, deleting it from the name server" % uri)
-                    ns.remove(name)
-
-        if not self.workers:
-            raise RuntimeError('no workers found; run some lsi_worker scripts on your machines first!')
-
-    @Pyro4.expose
-    def getworkers(self):
-        """
-        Return pyro URIs of all registered workers.
-        """
-        return [worker._pyroUri for worker in itervalues(self.workers)]
-
-    @Pyro4.expose
-    def getjob(self, worker_id):
-        logger.info("worker #%i requesting a new job", worker_id)
-        job = self.jobs.get(block=True, timeout=1)
-        logger.info("worker #%i got a new job (%i left)", worker_id, self.jobs.qsize())
-        return job
-
-    @Pyro4.expose
-    def putjob(self, job):
-        self._jobsreceived += 1
-        self.jobs.put(job, block=True, timeout=HUGE_TIMEOUT)
-        logger.info("added a new job (len(queue)=%i items)", self.jobs.qsize())
-
-    @Pyro4.expose
-    def getstate(self):
-        """
-        Merge projections from across all workers and return the final projection.
- """ - logger.info("end of input, assigning all remaining jobs") - logger.debug("jobs done: %s, jobs received: %s", self._jobsdone, self._jobsreceived) - while self._jobsdone < self._jobsreceived: - time.sleep(0.5) # check every half a second - - # TODO: merge in parallel, so that we're done in `log_2(workers)` merges, - # and not `workers - 1` merges! - # but merging only takes place once, after all input data has been processed, - # so the overall effect would be small... compared to the amount of coding :-) - logger.info("merging states from %i workers", len(self.workers)) - workers = list(self.workers.items()) - result = workers[0][1].getstate() - for workerid, worker in workers[1:]: - logger.info("pulling state from worker %s", workerid) - result.merge(worker.getstate()) - logger.info("sending out merged projection") - return result - - @Pyro4.expose - def reset(self): - """ - Initialize all workers for a new decomposition. - """ - for workerid, worker in iteritems(self.workers): - logger.info("resetting worker %s", workerid) - worker.reset() - worker.requestjob() - self._jobsdone = 0 - self._jobsreceived = 0 - - @Pyro4.expose - @Pyro4.oneway - @utils.synchronous('lock_update') - def jobdone(self, workerid): - """ - A worker has finished its job. Log this event and then asynchronously - transfer control back to the worker. - - In this way, control flow basically oscillates between dispatcher.jobdone() - worker.requestjob(). - """ - self._jobsdone += 1 - logger.info("worker #%s finished job #%i", workerid, self._jobsdone) - worker = self.workers[workerid] - worker.requestjob() # tell the worker to ask for another job, asynchronously (one-way) - - def jobsdone(self): - """Wrap self._jobsdone, needed for remote access through proxies""" - return self._jobsdone - - @Pyro4.oneway - def exit(self): - """ - Terminate all registered workers and then the dispatcher. - """ - for workerid, worker in iteritems(self.workers): - logger.info("terminating worker %s", workerid) - worker.exit() - logger.info("terminating dispatcher") - os._exit(0) # exit the whole process (not just this thread ala sys.exit()) - - -def main(): - logging.basicConfig(format='%(asctime)s : %(levelname)s : %(message)s', level=logging.INFO) - logger.info("running %s", " ".join(sys.argv)) - - program = os.path.basename(sys.argv[0]) - # make sure we have enough cmd line parameters - if len(sys.argv) < 1: - print(globals()["__doc__"] % locals()) - sys.exit(1) - - if len(sys.argv) < 2: - maxsize = MAX_JOBS_QUEUE - else: - maxsize = int(sys.argv[1]) - utils.pyro_daemon('gensim.lsi_dispatcher', Dispatcher(maxsize=maxsize)) - - logger.info("finished running %s", program) - - -if __name__ == '__main__': - main() +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# +# Copyright (C) 2010 Radim Rehurek +# Licensed under the GNU LGPL v2.1 - http://www.gnu.org/licenses/lgpl.html + +""":class:`~gensim.models.lsi_dispatcher.Dispatcher` process which orchestrates +distributed :class:`~gensim.models.lsimodel.LsiModel` computations. +Run this script only once, on the master node in your cluster. + +Notes +----- +The dispatches expects to find worker scripts already running. Make sure you run as many workers as you like on +your machines **before** launching the dispatcher. + +Warnings +-------- +Requires installed `Pyro4 `_. +Distributed version works only in local network. + + +How to use distributed :class:`~gensim.models.lsimodel.LsiModel` +---------------------------------------------------------------- + + +#. 
+
+    pip install gensim[distributed]
+
+#. Setup serialization (on each machine) ::
+
+    export PYRO_SERIALIZERS_ACCEPTED=pickle
+    export PYRO_SERIALIZER=pickle
+
+#. Run nameserver ::
+
+    python -m Pyro4.naming -n 0.0.0.0 &
+
+#. Run workers (on each machine) ::
+
+    python -m gensim.models.lsi_worker &
+
+#. Run dispatcher ::
+
+    python -m gensim.models.lsi_dispatcher &
+
+#. Run :class:`~gensim.models.lsimodel.LsiModel` in distributed mode ::
+
+    >>> from gensim.test.utils import common_corpus, common_dictionary
+    >>> from gensim.models import LsiModel
+    >>>
+    >>> model = LsiModel(common_corpus, id2word=common_dictionary, distributed=True)
+
+
+Command line arguments
+----------------------
+
+.. program-output:: python -m gensim.models.lsi_dispatcher --help
+   :ellipsis: 0, -5
+
+"""
+
+
+from __future__ import with_statement
+import os
+import sys
+import logging
+import argparse
+import threading
+import time
+from six import iteritems, itervalues
+
+try:
+    from Queue import Queue
+except ImportError:
+    from queue import Queue
+import Pyro4
+from gensim import utils
+
+logger = logging.getLogger(__name__)
+
+# How many jobs (=chunks of N documents) to keep "pre-fetched" in a queue?
+# A small number is usually enough, unless iteration over the corpus is very very
+# slow (slower than the actual computation of LSI), in which case you can override
+# this value from command line. ie. run "python ./lsi_dispatcher.py 100"
+MAX_JOBS_QUEUE = 10
+
+# timeout for the Queue object put/get blocking methods.
+# it should really be infinity, but then keyboard interrupts don't work.
+# so this is really just a hack, see http://bugs.python.org/issue1360
+HUGE_TIMEOUT = 365 * 24 * 60 * 60  # one year
+
+
+class Dispatcher(object):
+    """Dispatcher object that communicates and coordinates individual workers.
+
+    Warnings
+    --------
+    There should never be more than one dispatcher running at any one time.
+
+    """
+    def __init__(self, maxsize=0):
+        """Partly initializes the dispatcher.
+
+        A full initialization (including initialization of the workers) requires a call to
+        :meth:`~gensim.models.lsi_dispatcher.Dispatcher.initialize`.
+
+        Parameters
+        ----------
+        maxsize : int, optional
+            Maximum number of jobs to be kept pre-fetched in the queue.
+
+        """
+        self.maxsize = maxsize
+        self.workers = {}
+        self.callback = None  # a pyro proxy to this object (unknown at init time, but will be set later)
+
+    @Pyro4.expose
+    def initialize(self, **model_params):
+        """Fully initialize the dispatcher and all its workers.
+
+        Parameters
+        ----------
+        **model_params
+            Keyword parameters used to initialize individual workers, see :class:`~gensim.models.lsimodel.LsiModel`.
+
+        Raises
+        ------
+        RuntimeError
+            When no workers are found (the `gensim.models.lsi_worker` script must be run beforehand).
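+
+        Notes
+        -----
+        A minimal sketch of driving the dispatcher by hand over Pyro (normally
+        :class:`~gensim.models.lsimodel.LsiModel` does this for you when `distributed=True`);
+        it assumes a running name server, a registered dispatcher, at least one worker,
+        and a hypothetical `my_id2word` mapping:
+
+        >>> import Pyro4
+        >>> dispatcher = Pyro4.Proxy('PYRONAME:gensim.lsi_dispatcher')  # look up the dispatcher proxy
+        >>> dispatcher.initialize(id2word=my_id2word, num_topics=200)  # params are handed down to each worker's LsiModel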
+ + """ + self.jobs = Queue(maxsize=self.maxsize) + self.lock_update = threading.Lock() + self._jobsdone = 0 + self._jobsreceived = 0 + + # locate all available workers and store their proxies, for subsequent RMI calls + self.workers = {} + with utils.getNS() as ns: + self.callback = Pyro4.Proxy('PYRONAME:gensim.lsi_dispatcher') # = self + for name, uri in iteritems(ns.list(prefix='gensim.lsi_worker')): + try: + worker = Pyro4.Proxy(uri) + workerid = len(self.workers) + # make time consuming methods work asynchronously + logger.info("registering worker #%i from %s", workerid, uri) + worker.initialize(workerid, dispatcher=self.callback, **model_params) + self.workers[workerid] = worker + except Pyro4.errors.PyroError: + logger.exception("unresponsive worker at %s, deleting it from the name server" % uri) + ns.remove(name) + + if not self.workers: + raise RuntimeError('no workers found; run some lsi_worker scripts on your machines first!') + + @Pyro4.expose + def getworkers(self): + """Get pyro URIs of all registered workers. + + Returns + ------- + list of URIs + The pyro URIs for each worker. + + """ + return [worker._pyroUri for worker in itervalues(self.workers)] + + @Pyro4.expose + def getjob(self, worker_id): + """Atomically pops a job from the queue. + + Parameters + ---------- + worker_id : int + The worker that requested the job. + + Returns + ------- + iterable of iterable of (int, float) + The corpus in BoW format. + + """ + logger.info("worker #%i requesting a new job", worker_id) + job = self.jobs.get(block=True, timeout=1) + logger.info("worker #%i got a new job (%i left)", worker_id, self.jobs.qsize()) + return job + + @Pyro4.expose + def putjob(self, job): + """Atomically add a job to the queue. + + Parameters + ---------- + job : iterable of iterable of (int, float) + The corpus in BoW format. + + """ + self._jobsreceived += 1 + self.jobs.put(job, block=True, timeout=HUGE_TIMEOUT) + logger.info("added a new job (len(queue)=%i items)", self.jobs.qsize()) + + @Pyro4.expose + def getstate(self): + """Merge projections from across all workers and get the final projection. + + Returns + ------- + :class:`~gensim.models.lsimodel.Projection` + The current projection of the total model. + + """ + logger.info("end of input, assigning all remaining jobs") + logger.debug("jobs done: %s, jobs received: %s", self._jobsdone, self._jobsreceived) + while self._jobsdone < self._jobsreceived: + time.sleep(0.5) # check every half a second + + # TODO: merge in parallel, so that we're done in `log_2(workers)` merges, + # and not `workers - 1` merges! + # but merging only takes place once, after all input data has been processed, + # so the overall effect would be small... compared to the amount of coding :-) + logger.info("merging states from %i workers", len(self.workers)) + workers = list(self.workers.items()) + result = workers[0][1].getstate() + for workerid, worker in workers[1:]: + logger.info("pulling state from worker %s", workerid) + result.merge(worker.getstate()) + logger.info("sending out merged projection") + return result + + @Pyro4.expose + def reset(self): + """Re-initialize all workers for a new decomposition.""" + for workerid, worker in iteritems(self.workers): + logger.info("resetting worker %s", workerid) + worker.reset() + worker.requestjob() + self._jobsdone = 0 + self._jobsreceived = 0 + + @Pyro4.expose + @Pyro4.oneway + @utils.synchronous('lock_update') + def jobdone(self, workerid): + """Callback used by workers to notify when their job is done. 
+
+        The job done event is logged and then control is asynchronously transferred back to the worker
+        (who can then request another job). In this way, control flow basically oscillates between
+        :meth:`gensim.models.lsi_dispatcher.Dispatcher.jobdone` and
+        :meth:`gensim.models.lsi_worker.Worker.requestjob`.
+
+        Parameters
+        ----------
+        workerid : int
+            The ID of the worker that finished the job (used for logging).
+
+        """
+        self._jobsdone += 1
+        logger.info("worker #%s finished job #%i", workerid, self._jobsdone)
+        worker = self.workers[workerid]
+        worker.requestjob()  # tell the worker to ask for another job, asynchronously (one-way)
+
+    def jobsdone(self):
+        """Wrap :attr:`~gensim.models.lsi_dispatcher.Dispatcher._jobsdone`, needed for remote access through proxies.
+
+        Returns
+        -------
+        int
+            Number of jobs already completed.
+
+        """
+        return self._jobsdone
+
+    @Pyro4.oneway
+    def exit(self):
+        """Terminate all registered workers and then the dispatcher."""
+        for workerid, worker in iteritems(self.workers):
+            logger.info("terminating worker %s", workerid)
+            worker.exit()
+        logger.info("terminating dispatcher")
+        os._exit(0)  # exit the whole process (not just this thread ala sys.exit())
+
+
+if __name__ == '__main__':
+    logging.basicConfig(format='%(asctime)s - %(levelname)s - %(message)s', level=logging.INFO)
+    parser = argparse.ArgumentParser(description=__doc__[:-135], formatter_class=argparse.RawTextHelpFormatter)
+    parser.add_argument(
+        # nargs='?' makes the positional argument optional, so the documented default actually applies
+        'maxsize', nargs='?', type=int, default=MAX_JOBS_QUEUE,
+        help='Maximum number of jobs to be kept pre-fetched in the queue.'
+    )
+    args = parser.parse_args()
+
+    logger.info("running %s", " ".join(sys.argv))
+    utils.pyro_daemon('gensim.lsi_dispatcher', Dispatcher(maxsize=args.maxsize))
+    logger.info("finished running %s", parser.prog)
diff --git a/gensim/models/lsi_worker.py b/gensim/models/lsi_worker.py
index ceca83d9e6..5f4ccc5c2f 100755
--- a/gensim/models/lsi_worker.py
+++ b/gensim/models/lsi_worker.py
@@ -1,123 +1,192 @@
-#!/usr/bin/env python
-# -*- coding: utf-8 -*-
-#
-# Copyright (C) 2010 Radim Rehurek
-# Licensed under the GNU LGPL v2.1 - http://www.gnu.org/licenses/lgpl.html
-
-"""
-USAGE: %(program)s
-
-    Worker ("slave") process used in computing distributed LSI. Run this script \
-on every node in your cluster. If you wish, you may even run it multiple times \
-on a single machine, to make better use of multiple cores (just beware that \
-memory footprint increases accordingly).
-
-Example: python -m gensim.models.lsi_worker
-"""
-
-
-from __future__ import with_statement
-import os
-import sys
-import logging
-import threading
-import tempfile
-try:
-    import Queue
-except ImportError:
-    import queue as Queue
-import Pyro4
-from gensim.models import lsimodel
-from gensim import utils
-
-logger = logging.getLogger('gensim.models.lsi_worker')
-
-
-SAVE_DEBUG = 0  # save intermediate models after every SAVE_DEBUG updates (0 for never)
-
-
-class Worker(object):
-    def __init__(self):
-        self.model = None
-
-    @Pyro4.expose
-    def initialize(self, myid, dispatcher, **model_params):
-        self.lock_update = threading.Lock()
-        self.jobsdone = 0  # how many jobs has this worker completed?
-        # id of this worker in the dispatcher; just a convenience var for easy access/logging TODO remove?
-        self.myid = myid
-        self.dispatcher = dispatcher
-        self.finished = False
-        logger.info("initializing worker #%s", myid)
-        self.model = lsimodel.LsiModel(**model_params)
-
-    @Pyro4.expose
-    @Pyro4.oneway
-    def requestjob(self):
-        """
-        Request jobs from the dispatcher, in a perpetual loop until `getstate()` is called.
-        """
-        if self.model is None:
-            raise RuntimeError("worker must be initialized before receiving jobs")
-
-        job = None
-        while job is None and not self.finished:
-            try:
-                job = self.dispatcher.getjob(self.myid)
-            except Queue.Empty:
-                # no new job: try again, unless we're finished with all work
-                continue
-        if job is not None:
-            logger.info("worker #%s received job #%i", self.myid, self.jobsdone)
-            self.processjob(job)
-            self.dispatcher.jobdone(self.myid)
-        else:
-            logger.info("worker #%i stopping asking for jobs", self.myid)
-
-    @utils.synchronous('lock_update')
-    def processjob(self, job):
-        self.model.add_documents(job)
-        self.jobsdone += 1
-        if SAVE_DEBUG and self.jobsdone % SAVE_DEBUG == 0:
-            fname = os.path.join(tempfile.gettempdir(), 'lsi_worker.pkl')
-            self.model.save(fname)
-
-    @Pyro4.expose
-    @utils.synchronous('lock_update')
-    def getstate(self):
-        logger.info("worker #%i returning its state after %s jobs", self.myid, self.jobsdone)
-        assert isinstance(self.model.projection, lsimodel.Projection)
-        self.finished = True
-        return self.model.projection
-
-    @Pyro4.expose
-    @utils.synchronous('lock_update')
-    def reset(self):
-        logger.info("resetting worker #%i", self.myid)
-        self.model.projection = self.model.projection.empty_like()
-        self.finished = False
-
-    @Pyro4.oneway
-    def exit(self):
-        logger.info("terminating worker #%i", self.myid)
-        os._exit(0)
-# endclass Worker
-
-
-def main():
-    logging.basicConfig(format='%(asctime)s : %(levelname)s : %(message)s', level=logging.INFO)
-    logger.info("running %s", " ".join(sys.argv))
-
-    program = os.path.basename(sys.argv[0])
-    # make sure we have enough cmd line parameters
-    if len(sys.argv) < 1:
-        print(globals()["__doc__"] % locals())
-        sys.exit(1)
-
-    utils.pyro_daemon('gensim.lsi_worker', Worker(), random_suffix=True)
-
-    logger.info("finished running %s", program)
-
-
-if __name__ == '__main__':
-    main()
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+#
+# Copyright (C) 2010 Radim Rehurek
+# Licensed under the GNU LGPL v2.1 - http://www.gnu.org/licenses/lgpl.html
+
+""":class:`~gensim.models.lsi_worker.Worker` ("slave") process used in computing
+distributed :class:`~gensim.models.lsimodel.LsiModel`.
+
+Run this script on every node in your cluster. If you wish, you may even run it multiple times on a single machine,
+to make better use of multiple cores (just beware that memory footprint increases accordingly).
+
+Warnings
+--------
+Requires an installed `Pyro4 `_.
+The distributed version works only in a local network.
+
+
+How to use distributed :class:`~gensim.models.lsimodel.LsiModel`
+----------------------------------------------------------------
+
+
+#. Install needed dependencies (Pyro4) ::
+
+    pip install gensim[distributed]
+
+#. Setup serialization (on each machine) ::
+
+    export PYRO_SERIALIZERS_ACCEPTED=pickle
+    export PYRO_SERIALIZER=pickle
+
+#. Run nameserver ::
+
+    python -m Pyro4.naming -n 0.0.0.0 &
+
+#. Run workers (on each machine) ::
+
+    python -m gensim.models.lsi_worker &
+
+#. Run dispatcher ::
+
+    python -m gensim.models.lsi_dispatcher &
+
+#. Run :class:`~gensim.models.lsimodel.LsiModel` in distributed mode ::
+
+    >>> from gensim.test.utils import common_corpus, common_dictionary
+    >>> from gensim.models import LsiModel
+    >>>
+    >>> model = LsiModel(common_corpus, id2word=common_dictionary, distributed=True)
+
+
+Command line arguments
+----------------------
+
+.. program-output:: python -m gensim.models.lsi_worker --help
+   :ellipsis: 0, -3
+
+"""
+from __future__ import with_statement
+import os
+import sys
+import logging
+import argparse
+import threading
+import tempfile
+try:
+    import Queue
+except ImportError:
+    import queue as Queue
+import Pyro4
+from gensim.models import lsimodel
+from gensim import utils
+
+logger = logging.getLogger(__name__)
+
+
+SAVE_DEBUG = 0  # save intermediate models after every SAVE_DEBUG updates (0 for never)
+
+
+class Worker(object):
+    def __init__(self):
+        """Partly initializes the worker.
+
+        A full initialization requires a call to :meth:`~gensim.models.lsi_worker.Worker.initialize`.
+
+        """
+        self.model = None
+
+    @Pyro4.expose
+    def initialize(self, myid, dispatcher, **model_params):
+        """Fully initialize the worker.
+
+        Parameters
+        ----------
+        myid : int
+            An ID number used to identify this worker in the dispatcher object.
+        dispatcher : :class:`~gensim.models.lsi_dispatcher.Dispatcher`
+            The dispatcher responsible for scheduling this worker.
+        **model_params
+            Keyword parameters to initialize the inner LSI model, see :class:`~gensim.models.lsimodel.LsiModel`.
+
+        """
+        self.lock_update = threading.Lock()
+        self.jobsdone = 0  # how many jobs has this worker completed?
+        # id of this worker in the dispatcher; just a convenience var for easy access/logging TODO remove?
+        self.myid = myid
+        self.dispatcher = dispatcher
+        self.finished = False
+        logger.info("initializing worker #%s", myid)
+        self.model = lsimodel.LsiModel(**model_params)
+
+    @Pyro4.expose
+    @Pyro4.oneway
+    def requestjob(self):
+        """Request jobs from the dispatcher, in a perpetual loop until
+        :meth:`~gensim.models.lsi_worker.Worker.getstate` is called.
+
+        """
+        if self.model is None:
+            raise RuntimeError("worker must be initialized before receiving jobs")
+
+        job = None
+        while job is None and not self.finished:
+            try:
+                job = self.dispatcher.getjob(self.myid)
+            except Queue.Empty:
+                # no new job: try again, unless we're finished with all work
+                continue
+        if job is not None:
+            logger.info("worker #%s received job #%i", self.myid, self.jobsdone)
+            self.processjob(job)
+            self.dispatcher.jobdone(self.myid)
+        else:
+            logger.info("worker #%i stopping asking for jobs", self.myid)
+
+    @utils.synchronous('lock_update')
+    def processjob(self, job):
+        """Incrementally process the job, periodically saving the intermediate model (if `SAVE_DEBUG` is set).
+
+        Parameters
+        ----------
+        job : iterable of list of (int, float)
+            Corpus chunk in BoW format.
+
+        """
+        self.model.add_documents(job)
+        self.jobsdone += 1
+        if SAVE_DEBUG and self.jobsdone % SAVE_DEBUG == 0:
+            fname = os.path.join(tempfile.gettempdir(), 'lsi_worker.pkl')
+            self.model.save(fname)
+
+    @Pyro4.expose
+    @utils.synchronous('lock_update')
+    def getstate(self):
+        """Log and get the LSI model's current projection.
+
+        Returns
+        -------
+        :class:`~gensim.models.lsimodel.Projection`
+            The current projection.
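+
+        Notes
+        -----
+        This is how the dispatcher collects worker results; a rough sketch of what
+        :meth:`~gensim.models.lsi_dispatcher.Dispatcher.getstate` does, assuming `worker_a` and
+        `worker_b` are Pyro proxies to two initialized workers:
+
+        >>> result = worker_a.getstate()  # also marks worker_a as finished
+        >>> result.merge(worker_b.getstate())  # merge the partial projections into one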
+ + """ + logger.info("worker #%i returning its state after %s jobs", self.myid, self.jobsdone) + assert isinstance(self.model.projection, lsimodel.Projection) + self.finished = True + return self.model.projection + + @Pyro4.expose + @utils.synchronous('lock_update') + def reset(self): + """Resets the worker by deleting its current projection.""" + logger.info("resetting worker #%i", self.myid) + self.model.projection = self.model.projection.empty_like() + self.finished = False + + @Pyro4.oneway + def exit(self): + """Terminates the worker.""" + logger.info("terminating worker #%i", self.myid) + os._exit(0) + + +if __name__ == '__main__': + """The main script. """ + logging.basicConfig(format='%(asctime)s - %(levelname)s - %(message)s', level=logging.INFO) + + parser = argparse.ArgumentParser(description=__doc__[:-135], formatter_class=argparse.RawTextHelpFormatter) + _ = parser.parse_args() + + logger.info("running %s", " ".join(sys.argv)) + utils.pyro_daemon('gensim.lsi_worker', Worker(), random_suffix=True) + logger.info("finished running %s", parser.prog) diff --git a/gensim/models/lsimodel.py b/gensim/models/lsimodel.py index 1ab3e68401..d17563af06 100644 --- a/gensim/models/lsimodel.py +++ b/gensim/models/lsimodel.py @@ -5,12 +5,11 @@ # Licensed under the GNU LGPL v2.1 - http://www.gnu.org/licenses/lgpl.html -""" -Module for Latent Semantic Analysis (aka Latent Semantic Indexing) in Python. +"""Module for `Latent Semantic Analysis (aka Latent Semantic Indexing) +`_. -Implements fast truncated SVD (Singular Value Decomposition). The SVD -decomposition can be updated with new observations at any time, for an online, -incremental, memory-efficient training. +Implements fast truncated SVD (Singular Value Decomposition). The SVD decomposition can be updated with new observations +at any time, for an online, incremental, memory-efficient training. This module actually contains several algorithms for decomposition of large corpora, a combination of which effectively and transparently allows building LSI models for: @@ -28,7 +27,6 @@ (2G corpus positions, 3.2M documents, 100K features, 0.5G non-zero entries in the final TF-IDF matrix), requesting the top 400 LSI factors: - ====================================================== ============ ================== algorithm serial distributed ====================================================== ============ ================== @@ -42,6 +40,16 @@ *distributed* = cluster of four logical nodes on three physical machines, each with dual core Xeon 2.0GHz, 4GB RAM, ATLAS + +Examples +-------- +>>> from gensim.test.utils import common_dictionary, common_corpus +>>> from gensim.models import LsiModel +>>> +>>> model = LsiModel(common_corpus, id2word=common_dictionary) +>>> vectorized_corpus = model[common_corpus] # vectorize input copus in BoW format + + .. [1] The stochastic algo could be distributed too, but most time is already spent reading/decompressing the input from disk in its 4 passes. The extra network traffic due to data distribution across cluster nodes would likely make it @@ -49,7 +57,6 @@ """ - import logging import sys @@ -65,32 +72,55 @@ logger = logging.getLogger(__name__) - # accuracy defaults for the multi-pass stochastic algo P2_EXTRA_DIMS = 100 # set to `None` for dynamic P2_EXTRA_DIMS=k P2_EXTRA_ITERS = 2 def clip_spectrum(s, k, discard=0.001): - """ - Given eigenvalues `s`, return how many factors should be kept to avoid - storing spurious (tiny, numerically instable) values. 
+ """Find how many factors should be kept to avoid storing spurious (tiny, numerically unstable) values. + + Parameters + ---------- + s : list of float + Eigenvalues of the original matrix. + k : int + Maximum desired rank (number of factors) + discard: float + Percentage of the spectrum's energy to be discarded. + + Returns + ------- + int + Rank (number of factors) of the reduced matrix. - This will ignore the tail of the spectrum with relative combined mass < min(`discard`, 1/k). - The returned value is clipped against `k` (= never return more than `k`). """ # compute relative contribution of eigenvalues towards the energy spectrum rel_spectrum = np.abs(1.0 - np.cumsum(s / np.sum(s))) # ignore the last `discard` mass (or 1/k, whichever is smaller) of the spectrum small = 1 + len(np.where(rel_spectrum > min(discard, 1.0 / k))[0]) k = min(k, small) # clip against k - logger.info("keeping %i factors (discarding %.3f%% of energy spectrum)", - k, 100 * rel_spectrum[k - 1]) + logger.info("keeping %i factors (discarding %.3f%% of energy spectrum)", k, 100 * rel_spectrum[k - 1]) return k def asfarray(a, name=''): + """Get an array laid out in Fortran order in memory. + + Parameters + ---------- + a : numpy.ndarray + Input array. + name : str, optional + Array name, used for logging purposes. + + Returns + ------- + np.ndarray + The input `a` in Fortran, or column-major order. + + """ if not a.flags.f_contiguous: logger.debug("converting %s array %s to FORTRAN order", a.shape, name) a = np.asfortranarray(a) @@ -98,6 +128,21 @@ def asfarray(a, name=''): def ascarray(a, name=''): + """Return a contiguous array in memory (C order). + + Parameters + ---------- + a : numpy.ndarray + Input array. + name : str, optional + Array name, used for logging purposes. + + Returns + ------- + np.ndarray + Contiguous array (row-major order) of same shape and content as `a`. + + """ if not a.flags.contiguous: logger.debug("converting %s array %s to C order", a.shape, name) a = np.ascontiguousarray(a) @@ -105,15 +150,39 @@ def ascarray(a, name=''): class Projection(utils.SaveLoad): + """Lower dimension projections of a Term-Passage matrix. + + This is the class taking care of the 'core math': interfacing with corpora, splitting large corpora into chunks + and merging them etc. This done through the higher-level :class:`~gensim.models.lsimodel.LsiModel` class. + + Notes + ----- + The projection can be later updated by merging it with another :class:`~gensim.models.lsimodel.Projection` + via :meth:`~gensim.models.lsimodel.Projection.merge`. + + """ def __init__(self, m, k, docs=None, use_svdlibc=False, power_iters=P2_EXTRA_ITERS, extra_dims=P2_EXTRA_DIMS, dtype=np.float64): - """ - Construct the (U, S) projection from a corpus `docs`. The projection can - be later updated by merging it with another Projection via `self.merge()`. + """Construct the (U, S) projection from a corpus. + + Parameters + ---------- + m : int + Number of features (terms) in the corpus. + k : int + Desired rank of the decomposed matrix. + docs : {iterable of list of (int, float), scipy.sparse.csc} + Corpus in BoW format or as sparse matrix. + use_svdlibc : bool, optional + If True - will use `sparsesvd library `_, + otherwise - our own version will be used. + power_iters: int, optional + Number of power iteration steps to be used. Tune to improve accuracy. + extra_dims : int, optional + Extra samples to be used besides the rank `k`. Tune to improve accuracy. 
+        dtype : numpy.dtype, optional
+            Enforces a type for elements of the decomposed matrix.
 
-        This is the class taking care of the 'core math'; interfacing with corpora,
-        splitting large corpora into chunks and merging them etc. is done through
-        the higher-level `LsiModel` class.
         """
         self.m, self.k = m, k
         self.power_iters = power_iters
@@ -138,21 +207,42 @@ def __init__(self, m, k, docs=None, use_svdlibc=False, power_iters=P2_EXTRA_ITER
                 ut, s, vt = sparsesvd.sparsesvd(docs, k + 30)
                 u = ut.T
                 del ut, vt
-                k = clip_spectrum(s**2, self.k)
+                k = clip_spectrum(s ** 2, self.k)
                 self.u = u[:, :k].copy()
                 self.s = s[:k].copy()
             else:
                 self.u, self.s = None, None
 
     def empty_like(self):
+        """Get an empty Projection with the same parameters as the current object.
+
+        Returns
+        -------
+        :class:`~gensim.models.lsimodel.Projection`
+            An empty copy (without corpus) of the current projection.
+
+        """
         return Projection(self.m, self.k, power_iters=self.power_iters, extra_dims=self.extra_dims)
 
     def merge(self, other, decay=1.0):
-        """
-        Merge this Projection with another.
+        """Merge the current :class:`~gensim.models.lsimodel.Projection` instance with another.
+
+        Warnings
+        --------
+        The content of `other` is destroyed in the process, so pass this function a copy of `other`
+        if you need it further. The `other` :class:`~gensim.models.lsimodel.Projection` is expected to contain
+        the same number of features.
+
+        Parameters
+        ----------
+        other : :class:`~gensim.models.lsimodel.Projection`
+            The Projection object to be merged into the current one. It will be destroyed after merging.
+        decay : float, optional
+            Weight of existing observations relative to new ones.
+            Setting `decay` < 1.0 causes re-orientation towards new data trends in the input document stream,
+            by giving less emphasis to old observations. This allows LSA to gradually "forget" old observations
+            (documents) and give more preference to new ones.
 
-        The content of `other` is destroyed in the process, so pass this function a
-        copy of `other` if you need it further.
         """
         if other.u is None:
             # the other projection is empty => do nothing
@@ -207,7 +297,7 @@ def merge(self, other, decay=1.0):
             u_k, s_k, _ = scipy.linalg.svd(np.dot(k, k.T), full_matrices=False)
             s_k = np.sqrt(s_k)  # go back from eigen values to singular values
 
-            k = clip_spectrum(s_k**2, self.k)
+            k = clip_spectrum(s_k ** 2, self.k)
             u1_k, u2_k, s_k = np.array(u_k[:n1, :k]), np.array(u_k[n1:, :k]), s_k[:k]
 
             # update & rotate current basis U = [U, U']*[U1_k, U2_k]
@@ -228,60 +318,68 @@
 class LsiModel(interfaces.TransformationABC, basemodel.BaseTopicModel):
-    """
-    Objects of this class allow building and maintaining a model for Latent
-    Semantic Indexing (also known as Latent Semantic Analysis).
-
-    The main methods are:
-
-    1. constructor, which initializes the projection into latent topics space,
-    2. the ``[]`` method, which returns representation of any input document in the
-       latent space,
-    3. `add_documents()` for incrementally updating the model with new documents.
-
-    The left singular vectors are stored in `lsi.projection.u`, singular values
-    in `lsi.projection.s`. Right singular vectors can be reconstructed from the output
-    of `lsi[training_corpus]`, if needed. See also FAQ [2]_.
-
-    Model persistency is achieved via its load/save methods.
-
-    .. [2] https://github.com/piskvorky/gensim/wiki/Recipes-&-FAQ#q4-how-do-you-output-the-u-s-vt-matrices-of-lsi
+    """Model for `Latent Semantic Indexing `_.
+
+    The decomposition algorithm is described in `"Fast and Faster: A Comparison of Two Streamed
+    Matrix Decomposition Algorithms" `_.
+
+    Notes
+    -----
+    * :attr:`gensim.models.lsimodel.LsiModel.projection.u` - left singular vectors,
+    * :attr:`gensim.models.lsimodel.LsiModel.projection.s` - singular values,
+    * ``model[training_corpus]`` - right singular vectors (can be reconstructed if needed).
+
+    See Also
+    --------
+    `FAQ about LSI matrices `_.
+
+    Examples
+    --------
+    >>> from gensim.test.utils import common_corpus, common_dictionary, get_tmpfile
+    >>> from gensim.models import LsiModel
+    >>>
+    >>> model = LsiModel(common_corpus[:3], id2word=common_dictionary)  # train model
+    >>> vector = model[common_corpus[4]]  # apply model to BoW document
+    >>> model.add_documents(common_corpus[4:])  # update model with new documents
+    >>> tmp_fname = get_tmpfile("lsi.model")
+    >>> model.save(tmp_fname)  # save model
+    >>> loaded_model = LsiModel.load(tmp_fname)  # load model
 
     """
 
     def __init__(self, corpus=None, num_topics=200, id2word=None, chunksize=20000,
                  decay=1.0, distributed=False, onepass=True,
                  power_iters=P2_EXTRA_ITERS, extra_samples=P2_EXTRA_DIMS, dtype=np.float64):
-        """
-        `num_topics` is the number of requested factors (latent dimensions).
-
-        After the model has been trained, you can estimate topics for an
-        arbitrary, unseen document, using the ``topics = self[document]`` dictionary
-        notation. You can also add new training documents, with ``self.add_documents``,
-        so that training can be stopped and resumed at any time, and the
-        LSI transformation is available at any point.
-
-        If you specify a `corpus`, it will be used to train the model. See the
-        method `add_documents` for a description of the `chunksize` and `decay` parameters.
-
-        Turn `onepass` off to force a multi-pass stochastic algorithm.
-
-        `power_iters` and `extra_samples` affect the accuracy of the stochastic
-        multi-pass algorithm, which is used either internally (`onepass=True`) or
-        as the front-end algorithm (`onepass=False`). Increasing the number of
-        power iterations improves accuracy, but lowers performance. See [3]_ for
-        some hard numbers.
-
-        Turn on `distributed` to enable distributed computing.
-
-        Example:
-
-        >>> lsi = LsiModel(corpus, num_topics=10)
-        >>> print(lsi[doc_tfidf]) # project some document into LSI space
-        >>> lsi.add_documents(corpus2) # update LSI on additional documents
-        >>> print(lsi[doc_tfidf])
-
-        .. [3] http://nlp.fi.muni.cz/~xrehurek/nips/rehurek_nips.pdf
+        """Construct an `LsiModel` object.
+
+        Either `corpus` or `id2word` must be supplied in order to train the model.
+
+        Parameters
+        ----------
+        corpus : {iterable of list of (int, float), scipy.sparse.csc}, optional
+            Stream of document vectors or a sparse matrix of shape (`num_terms`, `num_documents`).
+        num_topics : int, optional
+            Number of requested factors (latent dimensions).
+        id2word : dict of {int: str}, optional
+            Mapping from integer IDs to words; used to determine the vocabulary size and to print topics.
+        chunksize : int, optional
+            Number of documents to be used in each training chunk.
+        decay : float, optional
+            Weight of existing observations relative to new ones.
+        distributed : bool, optional
+            If True, distributed mode (parallel execution on several machines) will be used.
+        onepass : bool, optional
+            Whether the one-pass algorithm should be used for training.
+            Pass `False` to force a multi-pass stochastic algorithm.
+        power_iters : int, optional
+            Number of power iteration steps to be used.
+            Increasing the number of power iterations improves accuracy, but lowers performance.
+        extra_samples : int, optional
+            Extra samples to be used besides the rank `k`. Can improve accuracy.
+        dtype : type, optional
+            Enforces a type for elements of the decomposed matrix.
 
         """
         self.id2word = id2word
@@ -343,19 +441,23 @@ def __init__(self, corpus=None, num_topics=200, id2word=None, chunksize=20000,
             self.add_documents(corpus)
 
     def add_documents(self, corpus, chunksize=None, decay=None):
-        """
-        Update singular value decomposition to take into account a new
-        corpus of documents.
-
-        Training proceeds in chunks of `chunksize` documents at a time. The size of
-        `chunksize` is a tradeoff between increased speed (bigger `chunksize`)
-        vs. lower memory footprint (smaller `chunksize`). If the distributed mode
-        is on, each chunk is sent to a different worker/computer.
-
-        Setting `decay` < 1.0 causes re-orientation towards new data trends in the
-        input document stream, by giving less emphasis to old observations. This allows
-        LSA to gradually "forget" old observations (documents) and give more
-        preference to new ones.
+        """Update the model with new documents from `corpus`.
+
+        Parameters
+        ----------
+        corpus : {iterable of list of (int, float), scipy.sparse.csc}
+            Stream of document vectors or a sparse matrix of shape (`num_terms`, `num_documents`).
+        chunksize : int, optional
+            Number of documents to be used in each training chunk; uses `self.chunksize` if not specified.
+        decay : float, optional
+            Weight of existing observations relative to new ones; uses `self.decay` if not specified.
+
+        Notes
+        -----
+        Training proceeds in chunks of `chunksize` documents at a time. The size of `chunksize` is a tradeoff
+        between increased speed (bigger `chunksize`) vs. lower memory footprint (smaller `chunksize`).
+        If the distributed mode is on, each chunk is sent to a different worker/computer.
+
         """
         logger.info("updating model with new documents")
@@ -427,17 +529,36 @@ def add_documents(self, corpus, chunksize=None, decay=None):
             self.docs_processed += corpus.shape[1]
 
     def __str__(self):
+        """Get a human readable representation of the model.
+
+        Returns
+        -------
+        str
+            A human readable string of the current object's parameters.
+
+        """
         return "LsiModel(num_terms=%s, num_topics=%s, decay=%s, chunksize=%s)" % (
             self.num_terms, self.num_topics, self.decay, self.chunksize
         )
 
     def __getitem__(self, bow, scaled=False, chunksize=512):
-        """
-        Return latent representation, as a list of (topic_id, topic_value) 2-tuples.
-
-        This is done by folding input document into the latent topic space.
-
-        If `scaled` is set, scale topics by the inverse of singular values (default: no scaling).
+        """Get the latent representation for `bow`, by folding it into the latent topic space.
+
+        Parameters
+        ----------
+        bow : {list of (int, int), iterable of list of (int, int)}
+            Document or corpus in BoW representation.
+        scaled : bool, optional
+            If True, topics will be scaled by the inverse of singular values (default: no scaling).
+        chunksize : int, optional
+            Number of documents to be processed at a time, when `bow` is a corpus.
+
+        Returns
+        -------
+        list of (int, float)
+            Latent representation of the document, as (topic_id, topic_value) 2-tuples, **OR**
+        :class:`gensim.matutils.Dense2Corpus`
+            Latent representation of the whole corpus, if `bow` is a corpus.
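+
+        Examples
+        --------
+        A minimal sketch on the shared test corpus (the 2-topic model is illustrative):
+
+        >>> from gensim.test.utils import common_corpus, common_dictionary
+        >>> from gensim.models import LsiModel
+        >>>
+        >>> model = LsiModel(common_corpus, id2word=common_dictionary, num_topics=2)
+        >>> doc_lsi = model[common_corpus[0]]  # fold a single document into the LSI space
+        >>> corpus_lsi = model[common_corpus]  # or apply the model to the whole corpus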
""" assert self.projection.u is not None, "decomposition not initialized yet" @@ -490,15 +611,18 @@ def __getitem__(self, bow, scaled=False, chunksize=512): return result def get_topics(self): - """ - Returns: - np.ndarray: `num_topics` x `vocabulary_size` array of floats which represents - the term topic matrix learned during inference. - - Note: - The number of topics can actually be smaller than `self.num_topics`, - if there were not enough factors (real rank of input matrix smaller than - `self.num_topics`). + """Get the topic vectors. + + Notes + ----- + The number of topics can actually be smaller than `self.num_topics`, if there were not enough factors + (real rank of input matrix smaller than `self.num_topics`). + + Returns + ------- + np.ndarray + The term topic matrix with shape (`num_topics`, `vocabulary_size`) + """ projections = self.projection.u.T num_topics = len(projections) @@ -510,15 +634,22 @@ def get_topics(self): return np.array(topics) def show_topic(self, topicno, topn=10): - """ - Return a specified topic (=left singular vector), 0 <= `topicno` < `self.num_topics`, - as a string. + """Get the words that define a topic along with their contribution. - Return only the `topn` words which contribute the most to the direction - of the topic (both negative and positive). + This is actually the left singular vector of the specified topic. The most important words in defining the topic + (in both directions) are included in the string, along with their contribution to the topic. - >>> lsimodel.show_topic(10, topn=5) - [("category", -0.340), ("$M$", 0.298), ("algebra", 0.183), ("functor", -0.174), ("operator", -0.168)] + Parameters + ---------- + topicno : int + The topics id number. + topn : int + Number of words to be included to the result. + + Returns + ------- + list of (str, float) + Topic representation in BoW format. """ # size of the projection matrix can actually be smaller than `self.num_topics`, @@ -532,14 +663,25 @@ def show_topic(self, topicno, topn=10): return [(self.id2word[val], 1.0 * c[val] / norm) for val in most] def show_topics(self, num_topics=-1, num_words=10, log=False, formatted=True): - """ - Return `num_topics` most significant topics (return all by default). - For each topic, show `num_words` most significant words (10 words by default). - - The topics are returned as a list -- a list of strings if `formatted` is - True, or a list of `(word, probability)` 2-tuples if False. - - If `log` is True, also output this result to log. + """Get the most significant topics. + + Parameters + ---------- + num_topics : int, optional + The number of topics to be selected, if -1 - all topics will be in result (ordered by significance). + num_words : int, optional + The number of words to be included per topics (ordered by significance). + log : bool, optional + If True - log topics with logger. + formatted : bool, optional + If True - each topic represented as string, otherwise - in BoW format. + + Returns + ------- + list of (int, str) + If `formatted=True`, return sequence with (topic_id, string representation of topics) **OR** + list of (int, list of (str, float)) + Otherwise, return sequence with (topic_id, [(word, value), ... ]). """ shown = [] @@ -557,12 +699,21 @@ def show_topics(self, num_topics=-1, num_words=10, log=False, formatted=True): return shown def print_debug(self, num_topics=5, num_words=10): - """ - Print (to log) the most salient words of the first `num_topics` topics. 
+ """Print (to log) the most salient words of the first `num_topics` topics. + + Unlike :meth:`~gensim.models.lsimodel.LsiModel.print_topics`, this looks for words that are significant for + a particular topic *and* not for others. This *should* result in a + more human-interpretable description of topics. + + Alias for :func:`~gensim.models.lsimodel.print_debug`. + + Parameters + ---------- + num_topics : int, optional + The number of topics to be selected (ordered by significance). + num_words : int, optional + The number of words to be included per topics (ordered by significance). - Unlike `print_topics()`, this looks for words that are significant for a - particular topic *and* not for others. This *should* result in a more - human-interpretable description of topics. """ # only wrap the module-level fnc print_debug( @@ -572,26 +723,65 @@ def print_debug(self, num_topics=5, num_words=10): ) def save(self, fname, *args, **kwargs): - """ - Save the model to file. + """Save the model to a file. + Notes + ----- Large internal arrays may be stored into separate files, with `fname` as prefix. - Note: do not save as a compressed file if you intend to load the file back with `mmap`. + Warnings + -------- + Do not save as a compressed file if you intend to load the file back with `mmap`. + + Parameters + ---------- + fname : str + Path to output file. + *args + Variable length argument list, see :meth:`gensim.utils.SaveLoad.save`. + **kwargs + Arbitrary keyword arguments, see :meth:`gensim.utils.SaveLoad.save`. + + See Also + -------- + :meth:`~gensim.models.lsimodel.LsiModel.load` """ + if self.projection is not None: self.projection.save(utils.smart_extension(fname, '.projection'), *args, **kwargs) super(LsiModel, self).save(fname, *args, ignore=['projection', 'dispatcher'], **kwargs) @classmethod def load(cls, fname, *args, **kwargs): - """ - Load a previously saved object from file (also see `save`). + """Load a previously saved object using :meth:`~gensim.models.lsimodel.LsiModel.save` from file. + Notes + ----- Large arrays can be memmap'ed back as read-only (shared memory) by setting `mmap='r'`: - >>> LsiModel.load(fname, mmap='r') + Parameters + ---------- + fname : str + Path to file that contains LsiModel. + *args + Variable length argument list, see :meth:`gensim.utils.SaveLoad.load`. + **kwargs + Arbitrary keyword arguments, see :meth:`gensim.utils.SaveLoad.load`. + + See Also + -------- + :meth:`~gensim.models.lsimodel.LsiModel.save` + + Returns + ------- + :class:`~gensim.models.lsimodel.LsiModel` + Loaded instance. + + Raises + ------ + IOError + When methods are called on instance (should be called from class). """ kwargs['mmap'] = kwargs.get('mmap', None) @@ -605,6 +795,24 @@ def load(cls, fname, *args, **kwargs): def print_debug(id2token, u, s, topics, num_words=10, num_neg=None): + """Log the most salient words per topic. + + Parameters + ---------- + id2token : :class:`~gensim.corpora.dictionary.Dictionary` + Mapping from ID to word in the Dictionary. + u : np.ndarray + The 2D U decomposition matrix. + s : np.ndarray + The 1D reduced array of eigenvalues used for decomposition. + topics : list of int + Sequence of topic IDs to be printed + num_words : int, optional + Number of words to be included for each topic. + num_neg : int, optional + Number of words with a negative contribution to a topic that should be included. 
+ + """ if num_neg is None: # by default, print half as many salient negative words as positive num_neg = num_words / 2 @@ -646,25 +854,49 @@ def print_debug(id2token, u, s, topics, num_words=10, num_neg=None): def stochastic_svd(corpus, rank, num_terms, chunksize=20000, extra_dims=None, power_iters=0, dtype=np.float64, eps=1e-6): - """ - Run truncated Singular Value Decomposition (SVD) on a sparse input. - - Return (U, S): the left singular vectors and the singular values of the input - data stream `corpus` [4]_. The corpus may be larger than RAM (iterator of vectors). - - This may return less than the requested number of top `rank` factors, in case - the input itself is of lower rank. The `extra_dims` (oversampling) and especially - `power_iters` (power iterations) parameters affect accuracy of the decomposition. - - This algorithm uses `2+power_iters` passes over the input data. In case you can only - afford a single pass, set `onepass=True` in :class:`LsiModel` and avoid using - this function directly. - - The decomposition algorithm is based on - **Halko, Martinsson, Tropp. Finding structure with randomness, 2009.** + """Run truncated Singular Value Decomposition (SVD) on a sparse input. + + Parameters + ---------- + corpus : {iterable of list of (int, float), scipy.sparse} + Input corpus as a stream (does not have to fit in RAM) + or a sparse matrix of shape (`num_terms`, num_documents). + rank : int + Desired number of factors to be retained after decomposition. + num_terms : int + The number of features (terms) in `corpus`. + chunksize : int, optional + Number of documents to be used in each training chunk. + extra_dims : int, optional + Extra samples to be used besides the rank `k`. Can improve accuracy. + power_iters: int, optional + Number of power iteration steps to be used. Increasing the number of power iterations improves accuracy, + but lowers performance. + dtype : numpy.dtype, optional + Enforces a type for elements of the decomposed matrix. + eps: float, optional + Percentage of the spectrum's energy to be discarded. + + Notes + ----- + The corpus may be larger than RAM (iterator of vectors), if `corpus` is a `scipy.sparse.csc` instead, + it is assumed the whole corpus fits into core memory and a different (more efficient) code path is chosen. + This may return less than the requested number of top `rank` factors, in case the input itself is of lower rank. + The `extra_dims` (oversampling) and especially `power_iters` (power iterations) parameters affect accuracy of the + decomposition. + + This algorithm uses `2 + power_iters` passes over the input data. In case you can only afford a single pass, + set `onepass=True` in :class:`~gensim.models.lsimodel.LsiModel` and avoid using this function directly. + + The decomposition algorithm is based on `"Finding structure with randomness: + Probabilistic algorithms for constructing approximate matrix decompositions" `_. + + + Returns + ------- + (np.ndarray 2D, np.ndarray 1D) + The left singular vectors and the singular values of the `corpus`. - .. [4] If `corpus` is a scipy.sparse matrix instead, it is assumed the whole - corpus fits into core memory and a different (more efficient) code path is chosen. 
""" rank = int(rank) if extra_dims is None: @@ -773,7 +1005,7 @@ def stochastic_svd(corpus, rank, num_terms, chunksize=20000, extra_dims=None, del qt logger.info("computing the final decomposition") - keep = clip_spectrum(s**2, rank, discard=eps) + keep = clip_spectrum(s ** 2, rank, discard=eps) u = u[:, :keep].copy() s = s[:keep] u = np.dot(q, u)