
Make event persisters periodically announce position over replication. #8499

Merged · 10 commits · Oct 12, 2020
1 change: 1 addition & 0 deletions changelog.d/8499.misc
@@ -0,0 +1 @@
Allow events to be sent to clients sooner when using sharded event persisters.
13 changes: 9 additions & 4 deletions docs/tcp_replication.md
@@ -15,7 +15,7 @@ example flow would be (where '>' indicates master to worker and

> SERVER example.com
< REPLICATE
> POSITION events master 53
> POSITION events master 53 53
> RDATA events master 54 ["$foo1:bar.com", ...]
> RDATA events master 55 ["$foo4:bar.com", ...]

@@ -138,9 +138,9 @@ the wire:
< NAME synapse.app.appservice
< PING 1490197665618
< REPLICATE
> POSITION events master 1
> POSITION backfill master 1
> POSITION caches master 1
> POSITION events master 1 1
> POSITION backfill master 1 1
> POSITION caches master 1 1
> RDATA caches master 2 ["get_user_by_id",["@01register-user:localhost:8823"],1490197670513]
> RDATA events master 14 ["$149019767112vOHxz:localhost:8823",
"!AFDCvgApUmpdfVjIXm:localhost:8823","m.room.guest_access","",null]
@@ -185,6 +185,11 @@ client (C):
updates via HTTP API, rather than via the DB, then processes should make the
request to the appropriate process.

Two positions are included: the last position sent by the instance and the
new position, respectively. This allows an instance to tell others that the
position has advanced but that no data has been written, without clients
needlessly checking to see if they have missed any updates.
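
As a minimal sketch of the receiving side (not code from this PR; fetch_updates_since is a hypothetical helper), where current_token is the receiver's view of the sending instance's position:

def fetch_updates_since(from_token: int, to_token: int) -> None:
    ...  # out-of-band catch-up elided (hypothetical helper)

def handle_position(current_token: int, prev_token: int, new_token: int) -> int:
    # No rows were written between the two tokens, so simply fast-forward.
    if prev_token == current_token:
        return new_token
    # Otherwise updates in (current_token, new_token] may have been missed;
    # fetch them out of band before advancing.
    fetch_updates_since(current_token, new_token)
    return new_token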

#### ERROR (S, C)

There was an error
4 changes: 4 additions & 0 deletions synapse/replication/tcp/client.py
@@ -191,6 +191,10 @@ async def on_rdata(
async def on_position(self, stream_name: str, instance_name: str, token: int):
self.store.process_replication_rows(stream_name, instance_name, token, [])

# We poke the generic "replication" notifier to wake anything up that
# may be streaming.
self.notifier.notify_replication()

def on_remote_server_up(self, server: str):
"""Called when get a new REMOTE_SERVER_UP command."""

36 changes: 26 additions & 10 deletions synapse/replication/tcp/commands.py
@@ -141,34 +141,50 @@ def get_logcontext_id(self):


class PositionCommand(Command):
"""Sent by the server to tell the client the stream position without
needing to send an RDATA.
"""Sent by an instance to tell others the stream position without needing to
send an RDATA.

Two tokens are sent, the new position and the last position sent by the
instance (in an RDATA or other POSITION). The tokens are chosen so that *no*
rows were written by the instance between the `prev_token` and `new_token`.
(If an instance hasn't sent a position before then the new position can be
used for both.)

Format::

POSITION <stream_name> <instance_name> <token>
POSITION <stream_name> <instance_name> <prev_token> <new_token>

On receipt of a POSITION command clients should check if they have missed
any updates, and if so then fetch them out of band.
On receipt of a POSITION command instances should check if they have missed
any updates, and if so then fetch them out of band. Instances can check this
by comparing their view of the current token for the sending instance with
the included `prev_token`.

The `<instance_name>` is the process that sent the command and is the source
of the stream.
"""

NAME = "POSITION"

def __init__(self, stream_name, instance_name, token):
def __init__(self, stream_name, instance_name, prev_token, new_token):
self.stream_name = stream_name
self.instance_name = instance_name
self.token = token
self.prev_token = prev_token
self.new_token = new_token

@classmethod
def from_line(cls, line):
stream_name, instance_name, token = line.split(" ", 2)
return cls(stream_name, instance_name, int(token))
stream_name, instance_name, prev_token, new_token = line.split(" ", 3)
return cls(stream_name, instance_name, int(prev_token), int(new_token))

def to_line(self):
return " ".join((self.stream_name, self.instance_name, str(self.token)))
return " ".join(
(
self.stream_name,
self.instance_name,
str(self.prev_token),
str(self.new_token),
)
)
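
A quick standalone round trip of the new wire format (mirroring the split/join logic above; a sketch, not a test from this PR):

line = "events master 53 60"
stream_name, instance_name, prev_token, new_token = line.split(" ", 3)
assert (stream_name, instance_name) == ("events", "master")
assert (int(prev_token), int(new_token)) == (53, 60)
# Re-serialising, as to_line does, reproduces the original line.
assert " ".join((stream_name, instance_name, prev_token, new_token)) == line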


class ErrorCommand(_SimpleCommand):
24 changes: 14 additions & 10 deletions synapse/replication/tcp/handler.py
@@ -101,8 +101,9 @@ def __init__(self, hs):
self._streams_to_replicate = [] # type: List[Stream]

for stream in self._streams.values():
if stream.NAME == CachesStream.NAME:
# All workers can write to the cache invalidation stream.
if hs.config.redis.redis_enabled and stream.NAME == CachesStream.NAME:
# All workers can write to the cache invalidation stream when
# using redis.
self._streams_to_replicate.append(stream)
continue

@@ -313,11 +314,14 @@ def send_positions_to_connection(self, conn: AbstractConnection):
# We respond with current position of all streams this instance
# replicates.
for stream in self.get_streams_to_replicate():
# Note that we use the current token as the prev token here (rather
# than stream.last_token), as we can't be sure that there have been
# no rows written between last token and the current token (since we
# might be racing with the replication sending bg process).
current_token = stream.current_token(self._instance_name)
self.send_command(
PositionCommand(
stream.NAME,
self._instance_name,
stream.current_token(self._instance_name),
stream.NAME, self._instance_name, current_token, current_token,
)
)

@@ -511,16 +515,16 @@ async def _process_position(
# If the position token matches our current token then we're up to
# date and there's nothing to do. Otherwise, fetch all updates
# between then and now.
missing_updates = cmd.token != current_token
missing_updates = cmd.prev_token != current_token
while missing_updates:
logger.info(
"Fetching replication rows for '%s' between %i and %i",
stream_name,
current_token,
cmd.token,
cmd.new_token,
)
(updates, current_token, missing_updates) = await stream.get_updates_since(
cmd.instance_name, current_token, cmd.token
cmd.instance_name, current_token, cmd.new_token
)

# TODO: add some tests for this
@@ -536,11 +540,11 @@ async def _process_position(
[stream.parse_row(row) for row in rows],
)

logger.info("Caught up with stream '%s' to %i", stream_name, cmd.token)
logger.info("Caught up with stream '%s' to %i", stream_name, cmd.new_token)

# We've now caught up to position sent to us, notify handler.
await self._replication_data_handler.on_position(
cmd.stream_name, cmd.instance_name, cmd.token
cmd.stream_name, cmd.instance_name, cmd.new_token
)

self._streams_by_connection.setdefault(conn, set()).add(stream_name)
47 changes: 46 additions & 1 deletion synapse/replication/tcp/resource.py
@@ -23,7 +23,9 @@
from twisted.internet.protocol import Factory

from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.replication.tcp.commands import PositionCommand
from synapse.replication.tcp.protocol import ServerReplicationStreamProtocol
from synapse.replication.tcp.streams import EventsStream
from synapse.util.metrics import Measure

stream_updates_counter = Counter(
@@ -84,14 +86,31 @@ def __init__(self, hs):
# Set of streams to replicate.
self.streams = self.command_handler.get_streams_to_replicate()

# If we have streams then we must have redis enabled or on master
assert (
not self.streams
or hs.config.redis.redis_enabled
or not hs.config.worker.worker_app
)

# If we are replicating an event stream we want to periodically check if
# we should send updated POSITIONs. We do this as a looping call rather
# than explicitly poking when the position advances (without new data to
# replicate) to reduce replication traffic (otherwise each writer would
# likely send a POSITION for each new event received over replication).
#
# Note that if the position hasn't advanced then we won't send anything.
if any(EventsStream.NAME == s.NAME for s in self.streams):
self.clock.looping_call(self.on_notifier_poke, 1000)

def on_notifier_poke(self):
"""Checks if there is actually any new data and sends it to the
connections if there are.

This should get called each time new data is available, even if it
is currently being executed, so that nothing gets missed
"""
if not self.command_handler.connected():
if not self.command_handler.connected() or not self.streams:
# Don't bother if nothing is listening. We still need to advance
# the stream tokens otherwise they'll fall behind forever
for stream in self.streams:
@@ -136,6 +155,8 @@ async def _run_notifier_loop(self):
self._replication_torture_level / 1000.0
)

last_token = stream.last_token

logger.debug(
"Getting stream: %s: %s -> %s",
stream.NAME,
@@ -159,6 +180,30 @@
)
stream_updates_counter.labels(stream.NAME).inc(len(updates))

else:
# The token has advanced but there is no data to
# send, so we send a `POSITION` to inform other
# workers of the updated position.
if stream.NAME == EventsStream.NAME:
# XXX: We only do this for the EventStream as it
# turns out that e.g. account data streams share
# their "current token" with each other, meaning
# that it is *not* safe to send a POSITION.
Comment on lines +188 to +191

Member

Why exactly is it not safe to share positions with each other? Can you only have one worker advancing the account data stream?

Are you only doing the EventsStream here out of caution? Would this be a problem for streams other than account data?

Member Author

Take the account data streams as an example: say we persist a bunch of room account data rows at 3, 4, 5. If, before we handle the room account data stream, we send a POSITION global_account_data 2 5, the workers will update the shared account data current token to 5, so when they finally do receive RDATA room_account_data 3 ... etc. they will compare the stream ID with the current token (i.e. 3 vs 5) and drop the RDATA as old.

Really, we should have one replication stream per stream ID generator, and I plan to do that, but didn't want to change the world in this PR.

Member

Got it, that makes sense. For the event persisters this is fine as an event only needs to be persisted once.

Member Author

Yeeeah, specifically it's fine as we're using an ID gen that supports sharded streams (rather than reusing the same ID gen across different replication streams).
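
A runnable toy version of that race (hypothetical names, heavily simplified; not Synapse code), where two streams share one current token:

shared_current_token = 2  # receiver's view of the shared account data token
processed_rows = []

def on_position(prev_token: int, new_token: int) -> None:
    # POSITION global_account_data 2 5 fast-forwards the shared token to 5.
    global shared_current_token
    shared_current_token = max(shared_current_token, new_token)

def on_rdata(stream_id: int, row: str) -> None:
    # The later RDATA room_account_data 3 compares 3 <= 5 and is dropped
    # as if it had already been processed.
    if stream_id <= shared_current_token:
        return
    processed_rows.append(row)

on_position(2, 5)
on_rdata(3, "room account data row")
assert processed_rows == []  # the row was wrongly discarded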

logger.info(
"Sending position: %s -> %s",
stream.NAME,
current_token,
)
self.command_handler.send_command(
PositionCommand(
stream.NAME,
self._instance_name,
last_token,
current_token,
)
)
continue

# Some streams return multiple rows with the same stream IDs,
# we need to make sure they get sent out in batches. We do
# this by setting the current token to all but the last of
10 changes: 10 additions & 0 deletions synapse/storage/util/id_generators.py
@@ -524,6 +524,16 @@ def _add_persisted_position(self, new_id: int):

heapq.heappush(self._known_persisted_positions, new_id)

# If we're a writer and we don't have any active writes we update our
# current position to the latest position seen. This allows the instance
# to report a recent position when asked, rather than a potentially old
# one (if this instance hasn't written anything for a while).
our_current_position = self._current_positions.get(self._instance_name)
if our_current_position and not self._unfinished_ids:
self._current_positions[self._instance_name] = max(
our_current_position, new_id
)

# We move the current min position up if the minimum current positions
# of all instances is higher (since by definition all positions less
# that that have been persisted).
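Mirroring the updated tests below, a standalone sketch (not code from the PR) of the effect: a writer with no in-flight writes adopts the highest position it has observed.

current_positions = {"first": 3, "second": 7}
unfinished_ids = set()  # "first" has no writes in flight

# "first" learns that position 7 has been persisted (e.g. by "second").
new_id = 7
ours = current_positions.get("first")
if ours and not unfinished_ids:
    current_positions["first"] = max(ours, new_id)

assert current_positions == {"first": 7, "second": 7}
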
2 changes: 2 additions & 0 deletions synapse/storage/util/sequence.py
@@ -126,6 +126,8 @@ def check_consistency(
if max_stream_id > last_value:
logger.warning(
"Postgres sequence %s is behind table %s: %d < %d",
self._sequence_name,
table,
last_value,
max_stream_id,
)
25 changes: 16 additions & 9 deletions tests/storage/test_id_generators.py
@@ -199,10 +199,17 @@ def test_multi_instance(self):
first_id_gen = self._create_id_generator("first", writers=["first", "second"])
second_id_gen = self._create_id_generator("second", writers=["first", "second"])

self.assertEqual(first_id_gen.get_positions(), {"first": 3, "second": 7})
self.assertEqual(first_id_gen.get_current_token_for_writer("first"), 3)
# The first ID gen will notice that it can advance its token to 7 as it
# has no in progress writes...
self.assertEqual(first_id_gen.get_positions(), {"first": 7, "second": 7})
self.assertEqual(first_id_gen.get_current_token_for_writer("first"), 7)
self.assertEqual(first_id_gen.get_current_token_for_writer("second"), 7)

# ... but the second ID gen doesn't know that.
self.assertEqual(second_id_gen.get_positions(), {"first": 3, "second": 7})
self.assertEqual(second_id_gen.get_current_token_for_writer("first"), 3)
self.assertEqual(second_id_gen.get_current_token_for_writer("second"), 7)

# Try allocating a new ID gen and check that we only see position
# advanced after we leave the context manager.

@@ -211,7 +218,7 @@ async def _get_next_async():
self.assertEqual(stream_id, 8)

self.assertEqual(
first_id_gen.get_positions(), {"first": 3, "second": 7}
first_id_gen.get_positions(), {"first": 7, "second": 7}
)

self.get_success(_get_next_async())
@@ -279,7 +286,7 @@ def test_get_persisted_upto_position(self):
self._insert_row_with_id("first", 3)
self._insert_row_with_id("second", 5)

id_gen = self._create_id_generator("first", writers=["first", "second"])
id_gen = self._create_id_generator("worker", writers=["first", "second"])

self.assertEqual(id_gen.get_positions(), {"first": 3, "second": 5})

@@ -319,14 +326,14 @@ def test_get_persisted_upto_position_get_next(self):

id_gen = self._create_id_generator("first", writers=["first", "second"])

self.assertEqual(id_gen.get_positions(), {"first": 3, "second": 5})
self.assertEqual(id_gen.get_positions(), {"first": 5, "second": 5})

self.assertEqual(id_gen.get_persisted_upto_position(), 3)
self.assertEqual(id_gen.get_persisted_upto_position(), 5)

async def _get_next_async():
async with id_gen.get_next() as stream_id:
self.assertEqual(stream_id, 6)
self.assertEqual(id_gen.get_persisted_upto_position(), 3)
self.assertEqual(id_gen.get_persisted_upto_position(), 5)

self.get_success(_get_next_async())

@@ -388,7 +395,7 @@ def test_writer_config_change(self):
self._insert_row_with_id("second", 5)

# Initial config has two writers
id_gen = self._create_id_generator("first", writers=["first", "second"])
id_gen = self._create_id_generator("worker", writers=["first", "second"])
self.assertEqual(id_gen.get_persisted_upto_position(), 3)
self.assertEqual(id_gen.get_current_token_for_writer("first"), 3)
self.assertEqual(id_gen.get_current_token_for_writer("second"), 5)
@@ -568,7 +575,7 @@ async def _get_next_async2():

self.get_success(_get_next_async2())

self.assertEqual(id_gen_1.get_positions(), {"first": -1, "second": -2})
self.assertEqual(id_gen_1.get_positions(), {"first": -2, "second": -2})
self.assertEqual(id_gen_2.get_positions(), {"first": -1, "second": -2})
self.assertEqual(id_gen_1.get_persisted_upto_position(), -2)
self.assertEqual(id_gen_2.get_persisted_upto_position(), -2)