LSI documentation #1892

Merged (20 commits, Feb 16, 2018)
119 changes: 91 additions & 28 deletions gensim/models/lsi_dispatcher.py
@@ -5,12 +5,18 @@
# Licensed under the GNU LGPL v2.1 - http://www.gnu.org/licenses/lgpl.html

"""
USAGE: %(program)s SIZE_OF_JOBS_QUEUE
Dispatcher process which orchestrates distributed LSI computations. Run this
script only once, on the master node in your cluster.

Contributor (review comment): This is an executable script, a "special case"; please look into __doc__ and .. program-output from 117d447 (same for lsi_worker).

Dispatcher process which orchestrates distributed LSI computations. Run this \
script only once, on any node in your cluster.
Notes
-----
The dispatcher expects to find worker scripts already running. Make sure you run
as many workers as you like on your machines **before** launching the dispatcher.

Examples
--------
python -m gensim.models.lsi_dispatcher SIZE_OF_JOBS_QUEUE

Example: python -m gensim.models.lsi_dispatcher
"""


@@ -21,17 +27,16 @@
import threading
import time
from six import iteritems, itervalues

try:
from Queue import Queue
except ImportError:
from queue import Queue
import Pyro4
from gensim import utils


logger = logging.getLogger("gensim.models.lsi_dispatcher")


# How many jobs (=chunks of N documents) to keep "pre-fetched" in a queue?
# A small number is usually enough, unless iteration over the corpus is very very
# slow (slower than the actual computation of LSI), in which case you can override
@@ -45,26 +50,44 @@


class Dispatcher(object):
"""
Dispatcher object that communicates and coordinates individual workers.
"""Dispatcher object that communicates and coordinates individual workers.

Notes
-----
There should never be more than one dispatcher running at any one time.

"""

def __init__(self, maxsize=0):
"""
Note that the constructor does not fully initialize the dispatcher;
use the `initialize()` function to populate it with workers etc.
"""Partly initializes the dispatcher.

A full initialization (including initialization of the workers) requires a call to
`self.initialize()`.

Parameters
----------
maxsize : int
Maximum number of jobs to be kept pre-fetched in the queue.

"""
self.maxsize = maxsize
self.workers = {}
self.callback = None # a pyro proxy to this object (unknown at init time, but will be set later)

@Pyro4.expose
def initialize(self, **model_params):
"""
`model_params` are parameters used to initialize individual workers (gets
handed all the way down to worker.initialize()).
"""Fully initializes the dispatcher and all its workers.

Parameters
----------
**model_params
Keyword parameters used to initialize individual workers.

Raises
------
RuntimeError
When no workers are found (the `lsi_worker` script must be run beforehand).

"""
self.jobs = Queue(maxsize=self.maxsize)
self.lock_update = threading.Lock()
@@ -92,28 +115,59 @@ def initialize(self, **model_params):

@Pyro4.expose
def getworkers(self):
"""
Return pyro URIs of all registered workers.
"""Return pyro URIs of all registered workers.

Returns
-------
list of URIs
The pyro URIs for each worker.

"""
return [worker._pyroUri for worker in itervalues(self.workers)]

@Pyro4.expose
def getjob(self, worker_id):
"""Atomically pops a job from the queue.

Parameters
----------
worker_id : int
The worker that requested the job.

Returns
-------
job : iterable of iterable of (int, float)
The corpus to be processed by the worker.

"""
logger.info("worker #%i requesting a new job", worker_id)
job = self.jobs.get(block=True, timeout=1)
logger.info("worker #%i got a new job (%i left)", worker_id, self.jobs.qsize())
return job

@Pyro4.expose
def putjob(self, job):
"""Atomically add a job to the queue.

Parameters
----------
job : iterable of iterable of (int, float)
The corpus to be added to the queue.

"""
self._jobsreceived += 1
self.jobs.put(job, block=True, timeout=HUGE_TIMEOUT)
logger.info("added a new job (len(queue)=%i items)", self.jobs.qsize())

@Pyro4.expose
def getstate(self):
"""
Merge projections from across all workers and return the final projection.
"""Merge projections from across all workers and return the final projection.

Returns
-------
:class:`~gensim.models.lsimodel.Projection`
The current projection of the total model.

"""
logger.info("end of input, assigning all remaining jobs")
logger.debug("jobs done: %s, jobs received: %s", self._jobsdone, self._jobsreceived)
@@ -135,9 +189,7 @@ def getstate(self):

@Pyro4.expose
def reset(self):
"""
Initialize all workers for a new decomposition.
"""
"""Re-initialize all workers for a new decomposition. """
for workerid, worker in iteritems(self.workers):
logger.info("resetting worker %s", workerid)
worker.reset()
@@ -149,27 +201,38 @@ def reset(self):
@Pyro4.oneway
@utils.synchronous('lock_update')
def jobdone(self, workerid):
"""
A worker has finished its job. Log this event and then asynchronously
transfer control back to the worker.
"""Callback used by workers to notify when their job is done.

The job done event is logged and then control is asynchronously
transferred back to the worker (who can then request another job).
In this way, control flow basically oscillates between `dispatcher.jobdone()`
and `worker.requestjob()`.

Parameters
----------
workerid : int
The ID of the worker that finished the job (used for logging).

"""
self._jobsdone += 1
logger.info("worker #%s finished job #%i", workerid, self._jobsdone)
worker = self.workers[workerid]
worker.requestjob() # tell the worker to ask for another job, asynchronously (one-way)
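
The oscillating control flow described in the docstring can be hard to picture, so here is a stripped-down, single-process sketch of the same ping-pong (plain method calls standing in for the one-way Pyro calls); the `Toy*` classes are invented for the illustration and are not part of gensim.

# Simplified illustration of the dispatcher <-> worker hand-off.
class ToyDispatcher:
    def __init__(self):
        self.workers = {}   # workerid -> worker
        self.jobsdone = 0

    def jobdone(self, workerid):
        self.jobsdone += 1
        # hand control straight back to the worker (a one-way call in the real code)
        self.workers[workerid].requestjob()


class ToyWorker:
    def __init__(self, myid, dispatcher, jobs):
        self.myid, self.dispatcher, self.jobs = myid, dispatcher, jobs

    def requestjob(self):
        if self.jobs:
            self.jobs.pop(0)                     # "process" one chunk
            self.dispatcher.jobdone(self.myid)   # notify; dispatcher calls back


dispatcher = ToyDispatcher()
dispatcher.workers[0] = ToyWorker(0, dispatcher, jobs=[['doc1'], ['doc2']])
dispatcher.workers[0].requestjob()   # jobdone() and requestjob() now alternate until jobs run out
print(dispatcher.jobsdone)           # -> 2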

def jobsdone(self):
"""Wrap self._jobsdone, needed for remote access through proxies"""
"""Wrap self._jobsdone, needed for remote access through proxies.

Returns
-------
int
Number of jobs already completed.

"""
return self._jobsdone

@Pyro4.oneway
def exit(self):
"""
Terminate all registered workers and then the dispatcher.
"""
"""Terminate all registered workers and then the dispatcher. """
for workerid, worker in iteritems(self.workers):
logger.info("terminating worker %s", workerid)
worker.exit()
53 changes: 44 additions & 9 deletions gensim/models/lsi_worker.py
Expand Up @@ -5,14 +5,16 @@
# Licensed under the GNU LGPL v2.1 - http://www.gnu.org/licenses/lgpl.html

"""
USAGE: %(program)s

Worker ("slave") process used in computing distributed LSI. Run this script \
on every node in your cluster. If you wish, you may even run it multiple times \
on a single machine, to make better use of multiple cores (just beware that \
Worker ("slave") process used in computing distributed LSI. Run this script
on every node in your cluster. If you wish, you may even run it multiple times
on a single machine, to make better use of multiple cores (just beware that
memory footprint increases accordingly).

Example: python -m gensim.models.lsi_worker
Examples
--------
python -m gensim.models.lsi_worker

"""


@@ -38,10 +40,27 @@

class Worker(object):
def __init__(self):
"""Partly initializes the model.

A full initialization requires a call to `self.initialize` as well.

"""
self.model = None

@Pyro4.expose
def initialize(self, myid, dispatcher, **model_params):
"""Fully initializes the worker.

Parameters
----------
myid : int
An ID number used to identify this worker in the dispatcher object.
dispatcher : :class:`~gensim.models.lsi_dispatcher.Dispatcher`
The dispatcher responsible for scheduling this worker.
**model_params
Keyword parameters to initialize the inner LSI model.

"""
self.lock_update = threading.Lock()
self.jobsdone = 0 # how many jobs has this worker completed?
# id of this worker in the dispatcher; just a convenience var for easy access/logging TODO remove?
@@ -54,9 +73,7 @@ def initialize(self, myid, dispatcher, **model_params):
@Pyro4.expose
@Pyro4.oneway
def requestjob(self):
"""
Request jobs from the dispatcher, in a perpetual loop until `getstate()` is called.
"""
"""Request jobs from the dispatcher, in a perpetual loop until `self.getstate()` is called."""
if self.model is None:
raise RuntimeError("worker must be initialized before receiving jobs")

@@ -76,6 +93,14 @@

@utils.synchronous('lock_update')
def processjob(self, job):
"""Incrementally proccesses the job and potentially logs progress.

Parameters
----------
job : iterable of iterable of (int, float)
The corpus to be used for further training the LSI model.

"""
self.model.add_documents(job)
self.jobsdone += 1
if SAVE_DEBUG and self.jobsdone % SAVE_DEBUG == 0:
@@ -85,6 +110,14 @@
@Pyro4.expose
@utils.synchronous('lock_update')
def getstate(self):
"""Logs and returns the LSI model's current projection.

Returns
-------
:class:`~gensim.models.lsimodel.Projection`
The current projection.

"""
logger.info("worker #%i returning its state after %s jobs", self.myid, self.jobsdone)
assert isinstance(self.model.projection, lsimodel.Projection)
self.finished = True
@@ -93,18 +126,20 @@ def getstate(self):
@Pyro4.expose
@utils.synchronous('lock_update')
def reset(self):
"""Resets the worker by deleting its current projection."""
logger.info("resetting worker #%i", self.myid)
self.model.projection = self.model.projection.empty_like()
self.finished = False

@Pyro4.oneway
def exit(self):
"""Terminates the worker. """
logger.info("terminating worker #%i", self.myid)
os._exit(0)
# endclass Worker


def main():
"""The main script. """
logging.basicConfig(format='%(asctime)s : %(levelname)s : %(message)s', level=logging.INFO)
logger.info("running %s", " ".join(sys.argv))
