Skip to content

Commit

Permalink
Merge pull request #1858 from ResearchHub/pull-updated-openalex-works
Browse files Browse the repository at this point in the history
Pull updated openalex works
  • Loading branch information
koutst committed Sep 19, 2024
2 parents 14bccd1 + 3b4ef84 commit 2a71433
Show file tree
Hide file tree
Showing 3 changed files with 220 additions and 13 deletions.
47 changes: 36 additions & 11 deletions src/paper/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -522,11 +522,28 @@ def log_daily_uploads():
def pull_new_openalex_works(self, retry=0, paper_fetch_log_id=None):
from paper.models import PaperFetchLog

return _pull_openalex_works(
self, PaperFetchLog.FETCH_NEW, retry, paper_fetch_log_id
)


@app.task(bind=True, max_retries=3)
def pull_updated_openalex_works(self, retry=0, paper_fetch_log_id=None):
from paper.models import PaperFetchLog

return _pull_openalex_works(
self, PaperFetchLog.FETCH_UPDATE, retry, paper_fetch_log_id
)


def _pull_openalex_works(self, fetch_type, retry=0, paper_fetch_log_id=None):
from paper.models import PaperFetchLog

"""
Pull new works (papers) from OpenAlex.
Pull works (papers) from OpenAlex.
This looks complicated because we're trying to handle retries and logging.
But simply:
1. Get new works from OpenAlex in batches
1. Get new or updated works from OpenAlex in batches
2. Kick-off a task to create/update papers for each work
3. If we hit an error, retry the job from where we left off
4. Log the results
Expand Down Expand Up @@ -555,7 +572,7 @@ def pull_new_openalex_works(self, retry=0, paper_fetch_log_id=None):
last_successful_run_log = (
PaperFetchLog.objects.filter(
source=PaperFetchLog.OPENALEX,
fetch_type=PaperFetchLog.FETCH_NEW,
fetch_type=fetch_type,
status__in=[PaperFetchLog.SUCCESS, PaperFetchLog.FAILED],
)
.order_by("-started_date")
Expand All @@ -573,7 +590,7 @@ def pull_new_openalex_works(self, retry=0, paper_fetch_log_id=None):
date_to_fetch_from = last_successful_run_log.fetch_since_date
next_cursor = last_successful_run_log.next_cursor or "*"
except Exception as e:
sentry.log_error(e, message="Failed to get last successful log")
sentry.log_error(e, message="Failed to get last successful or failed log")

# check if there's a pending log within the last 24 hours
# if there is, skip this run.
Expand All @@ -582,7 +599,7 @@ def pull_new_openalex_works(self, retry=0, paper_fetch_log_id=None):
try:
pending_log = PaperFetchLog.objects.filter(
source=PaperFetchLog.OPENALEX,
fetch_type=PaperFetchLog.FETCH_NEW,
fetch_type=fetch_type,
status=PaperFetchLog.PENDING,
started_date__gte=date_to_fetch_from,
).exists()
Expand All @@ -595,7 +612,7 @@ def pull_new_openalex_works(self, retry=0, paper_fetch_log_id=None):

lg = PaperFetchLog.objects.create(
source=PaperFetchLog.OPENALEX,
fetch_type=PaperFetchLog.FETCH_NEW,
fetch_type=fetch_type,
status=PaperFetchLog.PENDING,
started_date=start_date,
fetch_since_date=date_to_fetch_from,
Expand Down Expand Up @@ -627,11 +644,19 @@ def pull_new_openalex_works(self, retry=0, paper_fetch_log_id=None):
open_alex = OpenAlex()

while True:
works, next_cursor = open_alex.get_works(
types=["article"],
since_date=date_to_fetch_from,
next_cursor=next_cursor,
)
if fetch_type == PaperFetchLog.FETCH_NEW:
works, next_cursor = open_alex.get_works(
types=["article"],
since_date=date_to_fetch_from,
next_cursor=next_cursor,
)
elif fetch_type == PaperFetchLog.FETCH_UPDATE:
works, next_cursor = open_alex.get_works(
types=["article"],
from_updated_date=date_to_fetch_from,
next_cursor=next_cursor,
)

# if we've reached the end of the results, exit the loop
if next_cursor is None or works is None or len(works) == 0:
break
Expand Down
178 changes: 176 additions & 2 deletions src/paper/tests/test_pull_openalex_works.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
from django.test import TestCase, override_settings
from django.utils import timezone

from paper.models import PaperFetchLog
from paper.tasks import pull_new_openalex_works
from paper.models import Paper, PaperFetchLog
from paper.tasks import pull_new_openalex_works, pull_updated_openalex_works


class TestPullNewOpenAlexWorks(TestCase):
Expand Down Expand Up @@ -172,3 +172,177 @@ def test_pull_new_openalex_works_successful_retry(

self.assertTrue(result.successful())
self.assertEqual(result.result, True)


class TestPullUpdatedOpenAlexWorks(TestCase):
@override_settings(CELERY_TASK_ALWAYS_EAGER=True, CELERY_TASK_EAGER_PROPAGATES=True)
@patch("paper.tasks.OpenAlex")
@patch("paper.tasks.process_openalex_works")
def test_pull_updated_openalex_works_success(
self, mock_process_works, mock_openalex
):
# Mock OpenAlex.get_works to return some test data
mock_openalex_instance = mock_openalex.return_value
mock_openalex_instance.get_works.side_effect = [
(
[{"id": "W1", "doi": "10.1234/1"}, {"id": "W2", "doi": "10.1234/2"}],
"next_cursor_1",
),
(
[{"id": "W3", "doi": "10.1234/3"}, {"id": "W4", "doi": "10.1234/4"}],
"next_cursor_2",
),
([], None),
]

# Call the function
pull_updated_openalex_works.apply()

# Check that get_works was called thrice
self.assertEqual(mock_openalex_instance.get_works.call_count, 3)

# Check that process_openalex_works was called twice
self.assertEqual(mock_process_works.call_count, 2)

# Check that a PaperFetchLog was created and updated correctly
log = PaperFetchLog.objects.latest("id")
self.assertEqual(log.source, PaperFetchLog.OPENALEX)
self.assertEqual(log.fetch_type, PaperFetchLog.FETCH_UPDATE)
self.assertEqual(log.status, PaperFetchLog.SUCCESS)
self.assertEqual(log.total_papers_processed, 4)

@override_settings(CELERY_TASK_ALWAYS_EAGER=True, CELERY_TASK_EAGER_PROPAGATES=True)
@patch("paper.tasks.OpenAlex")
@patch("paper.tasks.process_openalex_works")
def test_pull_updated_openalex_works_no_results(
self, mock_process_works, mock_openalex
):
# Mock OpenAlex.get_works to return no results
mock_openalex_instance = mock_openalex.return_value
mock_openalex_instance.get_works.return_value = ([], None)

# Call the function
pull_updated_openalex_works.apply()

# Check that get_works was called once
mock_openalex_instance.get_works.assert_called_once()

# Check that process_openalex_works was not called
mock_process_works.assert_not_called()

# Check that a PaperFetchLog was created and updated correctly
log = PaperFetchLog.objects.latest("id")
self.assertEqual(log.source, PaperFetchLog.OPENALEX)
self.assertEqual(log.fetch_type, PaperFetchLog.FETCH_UPDATE)
self.assertEqual(log.status, PaperFetchLog.SUCCESS)
self.assertEqual(log.total_papers_processed, 0)

@override_settings(CELERY_TASK_ALWAYS_EAGER=True, CELERY_TASK_EAGER_PROPAGATES=True)
@patch("paper.tasks.OpenAlex")
@patch("paper.tasks.process_openalex_works")
def test_pull_updated_openalex_works_existing_pending_log(
self, mock_process_works, mock_openalex
):
# Create a pending log
PaperFetchLog.objects.create(
source=PaperFetchLog.OPENALEX,
fetch_type=PaperFetchLog.FETCH_UPDATE,
status=PaperFetchLog.PENDING,
started_date=timezone.now(),
)

# Call the function
pull_updated_openalex_works.apply()

# Check that get_works was not called
mock_openalex.assert_not_called()

# Check that process_openalex_works was not called
mock_process_works.assert_not_called()

# Check that no new PaperFetchLog was created
self.assertEqual(PaperFetchLog.objects.count(), 1)

@override_settings(CELERY_TASK_ALWAYS_EAGER=True, CELERY_TASK_EAGER_PROPAGATES=True)
@patch("paper.tasks.OpenAlex")
@patch("paper.tasks.process_openalex_works")
@patch("paper.tasks.pull_updated_openalex_works.retry")
def test_pull_updated_openalex_works_retry_and_fail(
self, mock_retry, mock_process_works, mock_openalex
):
# Mock OpenAlex.get_works to always raise an exception
mock_openalex_instance = mock_openalex.return_value
mock_openalex_instance.get_works.side_effect = Exception("Test exception")

# Mock the retry method to raise MaxRetriesExceededError on the 4th call
mock_retry.side_effect = [Retry(), Retry(), Retry(), MaxRetriesExceededError()]

# Create a PaperFetchLog
paper_fetch_log = PaperFetchLog.objects.create(
source=PaperFetchLog.OPENALEX,
fetch_type=PaperFetchLog.FETCH_UPDATE,
status=PaperFetchLog.PENDING,
)

# Call the task and expect it to raise Retry exceptions
with self.assertRaises(Retry):
pull_updated_openalex_works(retry=0, paper_fetch_log_id=paper_fetch_log.id)
with self.assertRaises(Retry):
pull_updated_openalex_works(retry=1, paper_fetch_log_id=paper_fetch_log.id)
with self.assertRaises(Retry):
pull_updated_openalex_works(retry=2, paper_fetch_log_id=paper_fetch_log.id)

# On the 4th attempt, it should raise the original exception (Test exception)
with self.assertRaises(Exception) as cm:
pull_updated_openalex_works(retry=3, paper_fetch_log_id=paper_fetch_log.id)
self.assertEqual(str(cm.exception), "Test exception")

# Check that get_works was called 4 times
self.assertEqual(mock_openalex_instance.get_works.call_count, 4)

# Check that process_openalex_works was not called
mock_process_works.assert_not_called()

# Refresh the log from the database
paper_fetch_log.refresh_from_db()

# Check that the log status was updated to FAILED
self.assertEqual(paper_fetch_log.status, PaperFetchLog.FAILED)
self.assertEqual(paper_fetch_log.total_papers_processed, 0)
self.assertIsNotNone(paper_fetch_log.completed_date)

@override_settings(CELERY_TASK_ALWAYS_EAGER=True, CELERY_TASK_EAGER_PROPAGATES=True)
@patch("paper.tasks.OpenAlex")
@patch("paper.tasks.process_openalex_works")
def test_pull_updated_openalex_works_successful_retry(
self, mock_process_works, mock_openalex
):
# Mock OpenAlex.get_works to raise an exception on the second call, then succeed
mock_openalex_instance = mock_openalex.return_value
mock_openalex_instance.get_works.side_effect = [
Exception("Test exception"),
([{"id": "W1"}, {"id": "W2"}], "next_cursor_1"),
([{"id": "W3"}, {"id": "W4"}], "next_cursor_2"),
([], None),
]

task = pull_updated_openalex_works.s()
task.retry = MagicMock(side_effect=Retry())

with self.assertRaises(Retry):
task.apply()

paper_fetch_log = PaperFetchLog.objects.latest("id")
result = task.apply(args=[1, paper_fetch_log.id])

self.assertEqual(mock_openalex_instance.get_works.call_count, 4)
self.assertEqual(mock_process_works.call_count, 2)

log = PaperFetchLog.objects.latest("id")
self.assertEqual(log.source, PaperFetchLog.OPENALEX)
self.assertEqual(log.fetch_type, PaperFetchLog.FETCH_UPDATE)
self.assertEqual(log.status, PaperFetchLog.SUCCESS)
self.assertEqual(log.total_papers_processed, 4)

self.assertTrue(result.successful())
self.assertEqual(result.result, True)
8 changes: 8 additions & 0 deletions src/researchhub/celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,14 @@
"queue": QUEUE_PULL_PAPERS,
},
},
"paper_pull-updated-openalex-works": {
"task": "paper.tasks.pull_updated_openalex_works",
"schedule": crontab(minute=0, hour=0),
"options": {
"priority": 3,
"queue": QUEUE_PULL_PAPERS,
},
},
# Purchase
"purchase_update-purchases": {
"task": "purchase.tasks.update_purchases",
Expand Down

0 comments on commit 2a71433

Please sign in to comment.