Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Refactor federation_sender and pusher configuration loading. #14496

Merged
merged 13 commits into from
Nov 22, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/14496.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Refactor `federation_sender` and `pusher` configuration loading.
139 changes: 70 additions & 69 deletions synapse/config/workers.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,20 +29,6 @@
)
from .server import DIRECT_TCP_ERROR, ListenerConfig, parse_listener_def

_FEDERATION_SENDER_WITH_SEND_FEDERATION_ENABLED_ERROR = """
The send_federation config option must be disabled in the main
synapse process before they can be run in a separate worker.

Please add ``send_federation: false`` to the main config
"""

_PUSHER_WITH_START_PUSHERS_ENABLED_ERROR = """
The start_pushers config option must be disabled in the main
synapse process before they can be run in a separate worker.

Please add ``start_pushers: false`` to the main config
"""

_DEPRECATED_WORKER_DUTY_OPTION_USED = """
The '%s' configuration option is deprecated and will be removed in a future
Synapse version. Please use ``%s: name_of_worker`` instead.
Expand Down Expand Up @@ -182,40 +168,12 @@ def read_config(self, config: JsonDict, **kwargs: Any) -> None:
)
)

# Handle federation sender configuration.
#
# There are two ways of configuring which instances handle federation
# sending:
# 1. The old way where "send_federation" is set to false and running a
# `synapse.app.federation_sender` worker app.
# 2. Specifying the workers sending federation in
# `federation_sender_instances`.
#

send_federation = config.get("send_federation", True)

federation_sender_instances = config.get("federation_sender_instances")
if federation_sender_instances is None:
# Default to an empty list, which means "another, unknown, worker is
# responsible for it".
federation_sender_instances = []

# If no federation sender instances are set we check if
# `send_federation` is set, which means use master
if send_federation:
federation_sender_instances = ["master"]

if self.worker_app == "synapse.app.federation_sender":
if send_federation:
# If we're running federation senders, and not using
# `federation_sender_instances`, then we should have
# explicitly set `send_federation` to false.
raise ConfigError(
_FEDERATION_SENDER_WITH_SEND_FEDERATION_ENABLED_ERROR
)

federation_sender_instances = [self.worker_name]

federation_sender_instances = self._worker_names_performing_this_duty(
config,
"send_federation",
"synapse.app.federation_sender",
"federation_sender_instances",
)
self.send_federation = self.instance_name in federation_sender_instances
self.federation_shard_config = ShardedWorkerHandlingConfig(
federation_sender_instances
Expand Down Expand Up @@ -282,27 +240,12 @@ def read_config(self, config: JsonDict, **kwargs: Any) -> None:
)

# Handle sharded push
start_pushers = config.get("start_pushers", True)
pusher_instances = config.get("pusher_instances")
if pusher_instances is None:
# Default to an empty list, which means "another, unknown, worker is
# responsible for it".
pusher_instances = []

# If no pushers instances are set we check if `start_pushers` is
# set, which means use master
if start_pushers:
pusher_instances = ["master"]

if self.worker_app == "synapse.app.pusher":
if start_pushers:
# If we're running pushers, and not using
# `pusher_instances`, then we should have explicitly set
# `start_pushers` to false.
raise ConfigError(_PUSHER_WITH_START_PUSHERS_ENABLED_ERROR)

pusher_instances = [self.instance_name]

pusher_instances = self._worker_names_performing_this_duty(
config,
"start_pushers",
"synapse.app.pusher",
"pusher_instances",
)
self.start_pushers = self.instance_name in pusher_instances
self.pusher_shard_config = ShardedWorkerHandlingConfig(pusher_instances)

Expand Down Expand Up @@ -425,6 +368,64 @@ def _should_this_worker_perform_duty(
# (By this point, these are either the same value or only one is not None.)
return bool(new_option_should_run_here or legacy_option_should_run_here)

def _worker_names_performing_this_duty(
self,
config: Dict[str, Any],
legacy_option_name: str,
legacy_app_name: str,
modern_instance_list_name: str,
) -> List[str]:
"""
Retrieves the names of the workers handling a given duty, by either legacy
option or instance list.

There are two ways of configuring which instances handle a given duty, e.g.
for configuring pushers:

1. The old way where "start_pushers" is set to false and running a
`synapse.app.pusher'` worker app.
2. Specifying the workers sending federation in `pusher_instances`.

Args:
config: settings read from yaml.
legacy_option_name: the old way of enabling options. e.g. 'start_pushers'
legacy_app_name: The historical app name. e.g. 'synapse.app.pusher'
modern_instance_list_name: the string name of the new instance_list. e.g.
'pusher_instances'

Returns:
A list of worker instance names handling the given duty.
"""
realtyem marked this conversation as resolved.
Show resolved Hide resolved

legacy_option = config.get(legacy_option_name, True)

worker_instances = config.get(modern_instance_list_name)
if worker_instances is None:
# Default to an empty list, which means "another, unknown, worker is
# responsible for it".
worker_instances = []

# If no worker instances are set we check if the legacy option
# is set, which means use the main process.
if legacy_option:
worker_instances = ["master"]

if self.worker_app == legacy_app_name:
if legacy_option:
# If we're using `legacy_app_name`, and not using
# `modern_instance_list_name`, then we should have
# explicitly set `legacy_option_name` to false.
raise ConfigError(
f"The '{legacy_option_name}' config option must be disabled in "
"the main synapse process before they can be run in a separate "
"worker.\n"
f"Please add `{legacy_option_name}: false` to the main config.\n",
realtyem marked this conversation as resolved.
Show resolved Hide resolved
)

worker_instances = [self.worker_name]

return worker_instances

def read_arguments(self, args: argparse.Namespace) -> None:
# We support a bunch of command line arguments that override options in
# the config. A lot of these options have a worker_* prefix when running
Expand Down