Skip to content

[14.0][IMP] queue_job: backport ChannelJob enhancements #781

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: 14.0
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 39 additions & 16 deletions queue_job/jobrunner/channels.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
# Copyright 2015-2016 Camptocamp SA
# License LGPL-3.0 or later (http://www.gnu.org/licenses/lgpl.html)
import logging
from collections import namedtuple
from functools import total_ordering
from heapq import heappop, heappush
from weakref import WeakValueDictionary
Expand All @@ -10,6 +11,7 @@
from ..job import CANCELLED, DONE, ENQUEUED, FAILED, PENDING, STARTED, WAIT_DEPENDENCIES

NOT_DONE = (WAIT_DEPENDENCIES, PENDING, ENQUEUED, STARTED, FAILED)
JobSortingKey = namedtuple("SortingKey", "eta priority date_created seq")

_logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -108,7 +110,7 @@
job that are necessary to prioritise them.

Channel jobs are comparable according to the following rules:
* jobs with an eta come before all other jobs
* jobs with an eta cannot be compared with jobs without
* then jobs with a smaller eta come first
* then jobs with a smaller priority come first
* then jobs with a smaller creation time come first
Expand All @@ -135,14 +137,18 @@
>>> j3 < j1
True

j4 and j5 comes even before j3, because they have an eta
j4 and j5 have an eta, they cannot be compared with j3

>>> j4 = ChannelJob(None, None, 4,
... seq=0, date_created=4, priority=9, eta=9)
>>> j5 = ChannelJob(None, None, 5,
... seq=0, date_created=5, priority=9, eta=9)
>>> j4 < j5 < j3
>>> j4 < j5
True
>>> j4 < j3
Traceback (most recent call last):
...
TypeError: '<' not supported between instances of 'int' and 'NoneType'

j6 has same date_created and priority as j5 but a smaller eta

Expand All @@ -153,7 +159,7 @@

Here is the complete suite:

>>> j6 < j4 < j5 < j3 < j1 < j2
>>> j6 < j4 < j5 and j3 < j1 < j2
True

j0 has the same properties as j1 but they are not considered
Expand All @@ -173,14 +179,13 @@

"""

__slots__ = ("db_name", "channel", "uuid", "_sorting_key", "__weakref__")

def __init__(self, db_name, channel, uuid, seq, date_created, priority, eta):
self.db_name = db_name
self.channel = channel
self.uuid = uuid
self.seq = seq
self.date_created = date_created
self.priority = priority
self.eta = eta
self._sorting_key = JobSortingKey(eta, priority, date_created, seq)

def __repr__(self):
return "<ChannelJob %s>" % self.uuid
Expand All @@ -191,18 +196,36 @@
def __hash__(self):
return id(self)

def set_no_eta(self):
self._sorting_key = JobSortingKey(None, *self._sorting_key[1:])

@property
def seq(self):
return self._sorting_key.seq

@property
def date_created(self):
return self._sorting_key.date_created

@property
def priority(self):
return self._sorting_key.priority

@property
def eta(self):
return self._sorting_key.eta

def sorting_key(self):
return self.eta, self.priority, self.date_created, self.seq
# DEPRECATED
return self._sorting_key

Check warning on line 220 in queue_job/jobrunner/channels.py

View check run for this annotation

Codecov / codecov/patch

queue_job/jobrunner/channels.py#L220

Added line #L220 was not covered by tests

def sorting_key_ignoring_eta(self):
return self.priority, self.date_created, self.seq
return self._sorting_key[1:]

def __lt__(self, other):
if self.eta and not other.eta:
return True
elif not self.eta and other.eta:
return False
return self.sorting_key() < other.sorting_key()
# Do not compare job where ETA is set with job where it is not
# If one job 'eta' is set, and the other is None, it raises TypeError
return self._sorting_key < other._sorting_key


class ChannelQueue:
Expand Down Expand Up @@ -312,7 +335,7 @@
def pop(self, now):
while self._eta_queue and self._eta_queue[0].eta <= now:
eta_job = self._eta_queue.pop()
eta_job.eta = None
eta_job.set_no_eta()
self._queue.add(eta_job)
if self.sequential and self._eta_queue and self._queue:
eta_job = self._eta_queue[0]
Expand Down