Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Handle federation inbound instances being killed more gracefully #11262

Merged
merged 3 commits into from
Nov 8, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/11262.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix a bug where if a remote event is being processed by a worker when it gets killed then it won't get processed on restart. Introduced in v1.37.1.
5 changes: 5 additions & 0 deletions synapse/federation/federation_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,11 @@ async def on_incoming_transaction(
self._started_handling_of_staged_events = True
self._handle_old_staged_events()

# Start a periodic check for old staged events. This is to handle
# the case where locks time out, e.g. if another process gets killed
# without dropping its locks.
self._clock.looping_call(self._handle_old_staged_events, 60 * 1000)

# keep this as early as possible to make the calculated origin ts as
# accurate as possible.
request_time = self._clock.time_msec()
Expand Down
31 changes: 21 additions & 10 deletions synapse/storage/databases/main/lock.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import logging
from types import TracebackType
from typing import TYPE_CHECKING, Dict, Optional, Tuple, Type
from weakref import WeakValueDictionary

from twisted.internet.interfaces import IReactorCore

Expand Down Expand Up @@ -61,7 +62,7 @@ def __init__(self, database: DatabasePool, db_conn: Connection, hs: "HomeServer"

# A map from `(lock_name, lock_key)` to the token of any locks that we
# think we currently hold.
self._live_tokens: Dict[Tuple[str, str], str] = {}
self._live_tokens: Dict[Tuple[str, str], Lock] = WeakValueDictionary()

# When we shut down we want to remove the locks. Technically this can
# lead to a race, as we may drop the lock while we are still processing.
Expand All @@ -80,10 +81,10 @@ async def _on_shutdown(self) -> None:

# We need to take a copy of the tokens dict as dropping the locks will
# cause the dictionary to change.
tokens = dict(self._live_tokens)
locks = dict(self._live_tokens)

for (lock_name, lock_key), token in tokens.items():
await self._drop_lock(lock_name, lock_key, token)
for lock in locks.values():
await lock.release()

logger.info("Dropped locks due to shutdown")

Expand All @@ -93,14 +94,21 @@ async def try_acquire_lock(self, lock_name: str, lock_key: str) -> Optional["Loc
used (otherwise the lock will leak).
"""

# Check if this process has taken out a lock and if it's still valid.
lock = self._live_tokens.get((lock_name, lock_key))
if lock and await lock.is_still_valid():
return None

now = self._clock.time_msec()
token = random_string(6)

if self.db_pool.engine.can_native_upsert:

def _try_acquire_lock_txn(txn: LoggingTransaction) -> bool:
# We take out the lock if either a) there is no row for the lock
# already or b) the existing row has timed out.
# already, b) the existing row has timed out, or c) the row is
# for this instance (which means the process got killed and
# restarted)
sql = """
INSERT INTO worker_locks (lock_name, lock_key, instance_name, token, last_renewed_ts)
VALUES (?, ?, ?, ?, ?)
Expand All @@ -112,6 +120,7 @@ def _try_acquire_lock_txn(txn: LoggingTransaction) -> bool:
last_renewed_ts = EXCLUDED.last_renewed_ts
WHERE
worker_locks.last_renewed_ts < ?
OR worker_locks.instance_name = EXCLUDED.instance_name
Comment on lines 121 to +123
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does the DO UPDATE above only apply to the row(s?) that's caused a conflict? If not I'm not sure I understand what's happening here.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yup, so it's basically equivalent to doing the INSERT if there is no conflict, or applying the UPDATE SET ... WHERE ... if there is a conflict (with the special EXCLUDED.* table matching the VALUES in the INSERT).

I.e. in this specific case we're either inserting a new row, or updating the existing row so long as either a) the lock has timed out OR b) the instance name matches.

"""
txn.execute(
sql,
Expand Down Expand Up @@ -148,11 +157,11 @@ def _try_acquire_lock_emulated_txn(txn: LoggingTransaction) -> bool:
WHERE
lock_name = ?
AND lock_key = ?
AND last_renewed_ts < ?
AND (last_renewed_ts < ? OR instance_name = ?)
"""
txn.execute(
sql,
(lock_name, lock_key, now - _LOCK_TIMEOUT_MS),
(lock_name, lock_key, now - _LOCK_TIMEOUT_MS, self._instance_name),
)

inserted = self.db_pool.simple_upsert_txn_emulated(
Expand All @@ -179,9 +188,7 @@ def _try_acquire_lock_emulated_txn(txn: LoggingTransaction) -> bool:
if not did_lock:
return None

self._live_tokens[(lock_name, lock_key)] = token

return Lock(
lock = Lock(
self._reactor,
self._clock,
self,
Expand All @@ -190,6 +197,10 @@ def _try_acquire_lock_emulated_txn(txn: LoggingTransaction) -> bool:
token=token,
)

self._live_tokens[(lock_name, lock_key)] = lock

return lock

async def _is_lock_still_valid(
self, lock_name: str, lock_key: str, token: str
) -> bool:
Expand Down