Skip to content

Commit

Permalink
[IMP] queue_job: Add split method
Browse files Browse the repository at this point in the history
  • Loading branch information
paradoxxxzero committed Jun 4, 2024
1 parent 76a42e2 commit feab369
Show file tree
Hide file tree
Showing 2 changed files with 130 additions and 0 deletions.
44 changes: 44 additions & 0 deletions queue_job/delay.py
Original file line number Diff line number Diff line change
Expand Up @@ -534,6 +534,50 @@ def delay(self):
"""Delay the whole graph"""
self._graph.delay()

def split(self, size):
"""Split the Delayable into a DelayableGroup containing batches
of size `size`
"""
if not self._job_method:
raise ValueError("No method set on the Delayable")

Check warning on line 542 in queue_job/delay.py

View check run for this annotation

Codecov / codecov/patch

queue_job/delay.py#L542

Added line #L542 was not covered by tests

total_records = len(self.recordset)

Check warning on line 544 in queue_job/delay.py

View check run for this annotation

Codecov / codecov/patch

queue_job/delay.py#L544

Added line #L544 was not covered by tests

delayables = []

Check warning on line 546 in queue_job/delay.py

View check run for this annotation

Codecov / codecov/patch

queue_job/delay.py#L546

Added line #L546 was not covered by tests
for index in range(0, total_records, size):
recordset = self.recordset[index : index + size]
delayable = Delayable(

Check warning on line 549 in queue_job/delay.py

View check run for this annotation

Codecov / codecov/patch

queue_job/delay.py#L548-L549

Added lines #L548 - L549 were not covered by tests
recordset,
priority=self.priority,
eta=self.eta,
max_retries=self.max_retries,
description=self.description,
channel=self.channel,
identity_key=self.identity_key,
)
# Update the __self__
delayable._job_method = getattr(recordset, self._job_method.__name__)
delayable._job_args = self._job_args
delayable._job_kwargs = self._job_kwargs

Check warning on line 561 in queue_job/delay.py

View check run for this annotation

Codecov / codecov/patch

queue_job/delay.py#L559-L561

Added lines #L559 - L561 were not covered by tests

delayables.append(delayable)

Check warning on line 563 in queue_job/delay.py

View check run for this annotation

Codecov / codecov/patch

queue_job/delay.py#L563

Added line #L563 was not covered by tests

description = self.description or (

Check warning on line 565 in queue_job/delay.py

View check run for this annotation

Codecov / codecov/patch

queue_job/delay.py#L565

Added line #L565 was not covered by tests
self._job_method.__doc__.splitlines()[0].strip()
if self._job_method.__doc__
else "{}.{}".format(self.recordset._name, self._job_method.__name__)
)
for index, delayable in enumerate(delayables):
delayable.set(

Check warning on line 571 in queue_job/delay.py

View check run for this annotation

Codecov / codecov/patch

queue_job/delay.py#L571

Added line #L571 was not covered by tests
description="%s (split %s/%s)"
% (description, index + 1, len(delayables))
)

# Prevent warning on deletion
self._generated_job = True

Check warning on line 577 in queue_job/delay.py

View check run for this annotation

Codecov / codecov/patch

queue_job/delay.py#L577

Added line #L577 was not covered by tests

return DelayableGroup(*delayables)

Check warning on line 579 in queue_job/delay.py

View check run for this annotation

Codecov / codecov/patch

queue_job/delay.py#L579

Added line #L579 was not covered by tests

def _build_job(self):
if self._generated_job:
return self._generated_job
Expand Down
86 changes: 86 additions & 0 deletions queue_job/tests/test_delayable.py
Original file line number Diff line number Diff line change
Expand Up @@ -271,3 +271,89 @@ def test_graph_topological_sort(self):
]

self.assertIn(list(graph.topological_sort()), valid_solutions)


class TestDelayableSplit(unittest.TestCase):
def setUp(self):
super().setUp()

Check warning on line 278 in queue_job/tests/test_delayable.py

View check run for this annotation

Codecov / codecov/patch

queue_job/tests/test_delayable.py#L278

Added line #L278 was not covered by tests

class FakeRecordSet(list):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._name = "recordset"

Check warning on line 283 in queue_job/tests/test_delayable.py

View check run for this annotation

Codecov / codecov/patch

queue_job/tests/test_delayable.py#L281-L283

Added lines #L281 - L283 were not covered by tests

def __getitem__(self, key):

Check warning on line 285 in queue_job/tests/test_delayable.py

View check run for this annotation

Codecov / codecov/patch

queue_job/tests/test_delayable.py#L285

Added line #L285 was not covered by tests
if isinstance(key, slice):
return FakeRecordSet(super().__getitem__(key))
return super().__getitem__(key)

Check warning on line 288 in queue_job/tests/test_delayable.py

View check run for this annotation

Codecov / codecov/patch

queue_job/tests/test_delayable.py#L287-L288

Added lines #L287 - L288 were not covered by tests

def method(self, arg, kwarg=None):

Check warning on line 290 in queue_job/tests/test_delayable.py

View check run for this annotation

Codecov / codecov/patch

queue_job/tests/test_delayable.py#L290

Added line #L290 was not covered by tests
"""Method to be called"""
return arg, kwarg

Check warning on line 292 in queue_job/tests/test_delayable.py

View check run for this annotation

Codecov / codecov/patch

queue_job/tests/test_delayable.py#L292

Added line #L292 was not covered by tests

self.FakeRecordSet = FakeRecordSet

Check warning on line 294 in queue_job/tests/test_delayable.py

View check run for this annotation

Codecov / codecov/patch

queue_job/tests/test_delayable.py#L294

Added line #L294 was not covered by tests

def test_delayable_split_no_method_call_beforehand(self):
dl = Delayable(self.FakeRecordSet(range(20)))
with self.assertRaises(ValueError):
dl.split(3)

Check warning on line 299 in queue_job/tests/test_delayable.py

View check run for this annotation

Codecov / codecov/patch

queue_job/tests/test_delayable.py#L297-L299

Added lines #L297 - L299 were not covered by tests

def test_delayable_split_10_3(self):
dl = Delayable(self.FakeRecordSet(range(10)))
dl.method("arg", kwarg="kwarg")
group = dl.split(3)
self.assertEqual(len(group._delayables), 4)

Check warning on line 305 in queue_job/tests/test_delayable.py

View check run for this annotation

Codecov / codecov/patch

queue_job/tests/test_delayable.py#L302-L305

Added lines #L302 - L305 were not covered by tests
delayables = sorted(list(group._delayables), key=lambda x: x.description)
self.assertEqual(delayables[0].recordset, self.FakeRecordSet([0, 1, 2]))
self.assertEqual(delayables[1].recordset, self.FakeRecordSet([3, 4, 5]))
self.assertEqual(delayables[2].recordset, self.FakeRecordSet([6, 7, 8]))
self.assertEqual(delayables[3].recordset, self.FakeRecordSet([9]))
self.assertEqual(delayables[0].description, "Method to be called (split 1/4)")
self.assertEqual(delayables[1].description, "Method to be called (split 2/4)")
self.assertEqual(delayables[2].description, "Method to be called (split 3/4)")
self.assertEqual(delayables[3].description, "Method to be called (split 4/4)")
self.assertNotEqual(delayables[0]._job_method, dl._job_method)
self.assertNotEqual(delayables[1]._job_method, dl._job_method)
self.assertNotEqual(delayables[2]._job_method, dl._job_method)
self.assertNotEqual(delayables[3]._job_method, dl._job_method)
self.assertEqual(delayables[0]._job_method.__name__, dl._job_method.__name__)
self.assertEqual(delayables[1]._job_method.__name__, dl._job_method.__name__)
self.assertEqual(delayables[2]._job_method.__name__, dl._job_method.__name__)
self.assertEqual(delayables[3]._job_method.__name__, dl._job_method.__name__)
self.assertEqual(delayables[0]._job_args, ("arg",))
self.assertEqual(delayables[1]._job_args, ("arg",))
self.assertEqual(delayables[2]._job_args, ("arg",))
self.assertEqual(delayables[3]._job_args, ("arg",))
self.assertEqual(delayables[0]._job_kwargs, {"kwarg": "kwarg"})
self.assertEqual(delayables[1]._job_kwargs, {"kwarg": "kwarg"})
self.assertEqual(delayables[2]._job_kwargs, {"kwarg": "kwarg"})
self.assertEqual(delayables[3]._job_kwargs, {"kwarg": "kwarg"})

Check warning on line 330 in queue_job/tests/test_delayable.py

View check run for this annotation

Codecov / codecov/patch

queue_job/tests/test_delayable.py#L307-L330

Added lines #L307 - L330 were not covered by tests

def test_delayable_split_10_5(self):
dl = Delayable(self.FakeRecordSet(range(10)))
dl.method("arg", kwarg="kwarg")
group = dl.split(5)
self.assertEqual(len(group._delayables), 2)

Check warning on line 336 in queue_job/tests/test_delayable.py

View check run for this annotation

Codecov / codecov/patch

queue_job/tests/test_delayable.py#L333-L336

Added lines #L333 - L336 were not covered by tests
delayables = sorted(list(group._delayables), key=lambda x: x.description)
self.assertEqual(delayables[0].recordset, self.FakeRecordSet([0, 1, 2, 3, 4]))
self.assertEqual(delayables[1].recordset, self.FakeRecordSet([5, 6, 7, 8, 9]))
self.assertEqual(delayables[0].description, "Method to be called (split 1/2)")
self.assertEqual(delayables[1].description, "Method to be called (split 2/2)")

Check warning on line 341 in queue_job/tests/test_delayable.py

View check run for this annotation

Codecov / codecov/patch

queue_job/tests/test_delayable.py#L338-L341

Added lines #L338 - L341 were not covered by tests

def test_delayable_split_10_10(self):
dl = Delayable(self.FakeRecordSet(range(10)))
dl.method("arg", kwarg="kwarg")
group = dl.split(10)
self.assertEqual(len(group._delayables), 1)

Check warning on line 347 in queue_job/tests/test_delayable.py

View check run for this annotation

Codecov / codecov/patch

queue_job/tests/test_delayable.py#L344-L347

Added lines #L344 - L347 were not covered by tests
delayables = sorted(list(group._delayables), key=lambda x: x.description)
self.assertEqual(delayables[0].recordset, self.FakeRecordSet(range(10)))
self.assertEqual(delayables[0].description, "Method to be called (split 1/1)")

Check warning on line 350 in queue_job/tests/test_delayable.py

View check run for this annotation

Codecov / codecov/patch

queue_job/tests/test_delayable.py#L349-L350

Added lines #L349 - L350 were not covered by tests

def test_delayable_split_10_20(self):
dl = Delayable(self.FakeRecordSet(range(10)))
dl.method("arg", kwarg="kwarg")
group = dl.split(20)
self.assertEqual(len(group._delayables), 1)

Check warning on line 356 in queue_job/tests/test_delayable.py

View check run for this annotation

Codecov / codecov/patch

queue_job/tests/test_delayable.py#L353-L356

Added lines #L353 - L356 were not covered by tests
delayables = sorted(list(group._delayables), key=lambda x: x.description)
self.assertEqual(delayables[0].recordset, self.FakeRecordSet(range(10)))
self.assertEqual(delayables[0].description, "Method to be called (split 1/1)")

Check warning on line 359 in queue_job/tests/test_delayable.py

View check run for this annotation

Codecov / codecov/patch

queue_job/tests/test_delayable.py#L358-L359

Added lines #L358 - L359 were not covered by tests

0 comments on commit feab369

Please sign in to comment.