diff --git a/queue_job/jobrunner/channels.py b/queue_job/jobrunner/channels.py index 3f81ec0b2..54188db11 100644 --- a/queue_job/jobrunner/channels.py +++ b/queue_job/jobrunner/channels.py @@ -2,6 +2,7 @@ # Copyright 2015-2016 Camptocamp SA # License LGPL-3.0 or later (http://www.gnu.org/licenses/lgpl.html) import logging +from collections import namedtuple from functools import total_ordering from heapq import heappop, heappush from weakref import WeakValueDictionary @@ -10,6 +11,7 @@ from ..job import CANCELLED, DONE, ENQUEUED, FAILED, PENDING, STARTED, WAIT_DEPENDENCIES NOT_DONE = (WAIT_DEPENDENCIES, PENDING, ENQUEUED, STARTED, FAILED) +JobSortingKey = namedtuple("SortingKey", "eta priority date_created seq") _logger = logging.getLogger(__name__) @@ -108,7 +110,7 @@ class ChannelJob: job that are necessary to prioritise them. Channel jobs are comparable according to the following rules: - * jobs with an eta come before all other jobs + * jobs with an eta cannot be compared with jobs without * then jobs with a smaller eta come first * then jobs with a smaller priority come first * then jobs with a smaller creation time come first @@ -135,14 +137,18 @@ class ChannelJob: >>> j3 < j1 True - j4 and j5 comes even before j3, because they have an eta + j4 and j5 have an eta, they cannot be compared with j3 >>> j4 = ChannelJob(None, None, 4, ... seq=0, date_created=4, priority=9, eta=9) >>> j5 = ChannelJob(None, None, 5, ... seq=0, date_created=5, priority=9, eta=9) - >>> j4 < j5 < j3 + >>> j4 < j5 True + >>> j4 < j3 + Traceback (most recent call last): + ... + TypeError: '<' not supported between instances of 'int' and 'NoneType' j6 has same date_created and priority as j5 but a smaller eta @@ -153,7 +159,7 @@ class ChannelJob: Here is the complete suite: - >>> j6 < j4 < j5 < j3 < j1 < j2 + >>> j6 < j4 < j5 and j3 < j1 < j2 True j0 has the same properties as j1 but they are not considered @@ -173,14 +179,13 @@ class ChannelJob: """ + __slots__ = ("db_name", "channel", "uuid", "_sorting_key", "__weakref__") + def __init__(self, db_name, channel, uuid, seq, date_created, priority, eta): self.db_name = db_name self.channel = channel self.uuid = uuid - self.seq = seq - self.date_created = date_created - self.priority = priority - self.eta = eta + self._sorting_key = JobSortingKey(eta, priority, date_created, seq) def __repr__(self): return "" % self.uuid @@ -191,18 +196,36 @@ def __eq__(self, other): def __hash__(self): return id(self) + def set_no_eta(self): + self._sorting_key = JobSortingKey(None, *self._sorting_key[1:]) + + @property + def seq(self): + return self._sorting_key.seq + + @property + def date_created(self): + return self._sorting_key.date_created + + @property + def priority(self): + return self._sorting_key.priority + + @property + def eta(self): + return self._sorting_key.eta + def sorting_key(self): - return self.eta, self.priority, self.date_created, self.seq + # DEPRECATED + return self._sorting_key def sorting_key_ignoring_eta(self): - return self.priority, self.date_created, self.seq + return self._sorting_key[1:] def __lt__(self, other): - if self.eta and not other.eta: - return True - elif not self.eta and other.eta: - return False - return self.sorting_key() < other.sorting_key() + # Do not compare job where ETA is set with job where it is not + # If one job 'eta' is set, and the other is None, it raises TypeError + return self._sorting_key < other._sorting_key class ChannelQueue: @@ -312,7 +335,7 @@ def remove(self, job): def pop(self, now): while self._eta_queue and self._eta_queue[0].eta <= now: eta_job = self._eta_queue.pop() - eta_job.eta = None + eta_job.set_no_eta() self._queue.add(eta_job) if self.sequential and self._eta_queue and self._queue: eta_job = self._eta_queue[0]