This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Bg update to populate new events table columns #13215

Merged 1 commit on Jul 15, 2022
1 change: 1 addition & 0 deletions changelog.d/13215.misc
@@ -0,0 +1 @@
Preparation for database schema simplifications: populate `state_key` and `rejection_reason` for existing rows in the `events` table.
87 changes: 87 additions & 0 deletions synapse/storage/databases/main/events_bg_updates.py
@@ -67,6 +67,8 @@ class _BackgroundUpdates:
    EVENT_EDGES_DROP_INVALID_ROWS = "event_edges_drop_invalid_rows"
    EVENT_EDGES_REPLACE_INDEX = "event_edges_replace_index"

    EVENTS_POPULATE_STATE_KEY_REJECTIONS = "events_populate_state_key_rejections"


@attr.s(slots=True, frozen=True, auto_attribs=True)
class _CalculateChainCover:
@@ -253,6 +255,11 @@ def __init__(
            replaces_index="ev_edges_id",
        )

        self.db_pool.updates.register_background_update_handler(
            _BackgroundUpdates.EVENTS_POPULATE_STATE_KEY_REJECTIONS,
            self._background_events_populate_state_key_rejections,
        )

    async def _background_reindex_fields_sender(
        self, progress: JsonDict, batch_size: int
    ) -> int:
@@ -1399,3 +1406,83 @@ def drop_invalid_event_edges_txn(txn: LoggingTransaction) -> bool:
        )

        return batch_size

    async def _background_events_populate_state_key_rejections(
        self, progress: JsonDict, batch_size: int
    ) -> int:
        """Back-populate `events.state_key` and `events.rejection_reason`."""

        min_stream_ordering_exclusive = progress["min_stream_ordering_exclusive"]
        max_stream_ordering_inclusive = progress["max_stream_ordering_inclusive"]

        def _populate_txn(txn: LoggingTransaction) -> bool:
            """Returns True if we're done."""

            # first we need to find an endpoint.
            # we need to find the final row in a batch of `batch_size`, which means
            # we need to skip over (batch_size-1) rows and get the next row.
Member:

events table can only have one row per stream ordering, so this isn't quite accurate.

Member (Author):

I don't really understand why it's not accurate.

suppose we have these rows in the db:

stream_ordering | event_id
----------------+-----------
 1              | $event1
 3              | $event2
 5              | $event3
 7              | $event4
 9              | $event5

... and we want to get a batch of size 3. So we need to skip over 2 rows and get the stream_ordering from the third.

Member:

Oh, hrmmm, I misread the rationale, I think. I was assuming we were doing this query to handle having multiple rows per stream ordering (which is why we often do it in other places, for other tables). But actually the reason we're doing this is because we can't use LIMIT with UPDATE?

Member (Author):

Even if we could use LIMIT with UPDATE, we'd need to know where we got up to, to persist back to the progress dict.
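
To make the batching concrete: below is a minimal, self-contained sketch of that endpoint query (plain sqlite3, illustrative only, not Synapse code), using the sample rows from the discussion above with batch_size = 3. PostgreSQL's UPDATE has no LIMIT clause, hence the separate SELECT to find the endpoint first.

import sqlite3

conn = sqlite3.connect(":memory:")
cur = conn.cursor()
cur.execute("CREATE TABLE events (stream_ordering INTEGER PRIMARY KEY, event_id TEXT)")
cur.executemany(
    "INSERT INTO events VALUES (?, ?)",
    [(1, "$event1"), (3, "$event2"), (5, "$event3"), (7, "$event4"), (9, "$event5")],
)

# skip over (batch_size - 1) rows; the next row is the final member of this
# batch, and its stream_ordering becomes the endpoint saved in the progress dict
batch_size = 3
cur.execute(
    """
    SELECT stream_ordering FROM events
    WHERE stream_ordering > ? AND stream_ordering <= ?
    ORDER BY stream_ordering
    LIMIT 1 OFFSET ?
    """,
    (0, 9, batch_size - 1),
)
print(cur.fetchone())  # (5,) -- the first batch covers stream orderings 1, 3 and 5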

            txn.execute(
                """
                SELECT stream_ordering FROM events
                WHERE stream_ordering > ? AND stream_ordering <= ?
Member:

Do we want to stop when we hit rows that already have state_key and rejections filled out? Or do we have to worry about holes where people have upgraded and then downgraded?

Member (Author):

Yeah, I'm basically worrying about that. I'm somewhat assuming that the gain from skipping such rows is outweighed by the risk of ending up with holes of incorrect rows.

                ORDER BY stream_ordering
                LIMIT 1 OFFSET ?
                """,
                (
                    min_stream_ordering_exclusive,
                    max_stream_ordering_inclusive,
                    batch_size - 1,
                ),
            )

            endpoint = None
            row = txn.fetchone()
            if row:
                endpoint = row[0]

            where_clause = "stream_ordering > ?"
            args = [min_stream_ordering_exclusive]
            if endpoint:
                where_clause += " AND stream_ordering <= ?"
                args.append(endpoint)

            # now do the updates.
            txn.execute(
                f"""
                UPDATE events
                SET state_key = (SELECT state_key FROM state_events se WHERE se.event_id = events.event_id),
                    rejection_reason = (SELECT reason FROM rejections rej WHERE rej.event_id = events.event_id)
                WHERE ({where_clause})
                """,
                args,
            )

            logger.info(
                "populated new `events` columns up to %s/%i: updated %i rows",
                endpoint,
                max_stream_ordering_inclusive,
                txn.rowcount,
            )

            if endpoint is None:
                # we're done
                return True

            progress["min_stream_ordering_exclusive"] = endpoint
            self.db_pool.updates._background_update_progress_txn(
                txn,
                _BackgroundUpdates.EVENTS_POPULATE_STATE_KEY_REJECTIONS,
                progress,
            )
            return False

        done = await self.db_pool.runInteraction(
            desc="events_populate_state_key_rejections", func=_populate_txn
        )

        if done:
            await self.db_pool.updates._end_background_update(
                _BackgroundUpdates.EVENTS_POPULATE_STATE_KEY_REJECTIONS
            )

        return batch_size
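
Putting the handler's control flow together: a rough, pure-Python simulation (illustrative only, not Synapse code, reusing the sample stream orderings from the review thread) of how the progress dict advances batch by batch until no endpoint row is found.

stream_orderings = [1, 3, 5, 7, 9]
progress = {"min_stream_ordering_exclusive": 0, "max_stream_ordering_inclusive": 9}
batch_size = 3

done = False
while not done:
    lo = progress["min_stream_ordering_exclusive"]
    hi = progress["max_stream_ordering_inclusive"]
    candidates = [so for so in stream_orderings if lo < so <= hi]
    if len(candidates) >= batch_size:
        # mirrors SELECT ... LIMIT 1 OFFSET (batch_size - 1)
        endpoint = candidates[batch_size - 1]
        print(f"UPDATE rows with stream_ordering > {lo} AND <= {endpoint}")
        progress["min_stream_ordering_exclusive"] = endpoint
    else:
        # no endpoint row found: the final batch updates everything above `lo`
        print(f"UPDATE rows with stream_ordering > {lo}; update complete")
        done = True

Note that the last batch's UPDATE runs without an upper bound, so rows newer than max_stream_ordering_inclusive may be re-written with the same values, which is harmless.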
@@ -0,0 +1,47 @@
# Copyright 2022 The Matrix.org Foundation C.I.C
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import json

from synapse.storage.types import Cursor


def run_create(cur: Cursor, database_engine, *args, **kwargs):
    """Add a bg update to populate the `state_key` and `rejection_reason` columns of `events`"""

    # we know that any new events will have the columns populated (and that has been
    # the case since schema_version 68, so there is no chance of rolling back now).
    #
    # So, we only need to make sure that existing rows are updated. We read the
    # current min and max stream orderings, since that is guaranteed to include all
    # the events that were stored before the new columns were added.
    cur.execute("SELECT MIN(stream_ordering), MAX(stream_ordering) FROM events")
    (min_stream_ordering, max_stream_ordering) = cur.fetchone()

    if min_stream_ordering is None:
        # no rows, nothing to do.
        return

    cur.execute(
        "INSERT INTO background_updates (ordering, update_name, progress_json)"
        " VALUES (7203, 'events_populate_state_key_rejections', ?)",
        (
            json.dumps(
                {
                    "min_stream_ordering_exclusive": min_stream_ordering - 1,
                    "max_stream_ordering_inclusive": max_stream_ordering,
                }
            ),
        ),
    )
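
For concreteness, the progress payload that this migration seeds can be previewed with plain Python (the stream ordering values here are illustrative, not from a real database):

import json

# illustrative: suppose MIN(stream_ordering) = 1 and MAX(stream_ordering) = 9
min_stream_ordering, max_stream_ordering = 1, 9

progress_json = json.dumps(
    {
        # minus one, so that the bound is exclusive and the first row is included
        "min_stream_ordering_exclusive": min_stream_ordering - 1,
        "max_stream_ordering_inclusive": max_stream_ordering,
    }
)
print(progress_json)
# {"min_stream_ordering_exclusive": 0, "max_stream_ordering_inclusive": 9}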