Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Reduce the number of tests using TCP replication #13543

Merged
merged 6 commits into from
Aug 19, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/13543.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Reduce the number of tests using legacy TCP replication.
4 changes: 2 additions & 2 deletions tests/handlers/test_room_member.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from synapse.types import UserID, create_requester
from synapse.util import Clock

from tests.replication._base import RedisMultiWorkerStreamTestCase
from tests.replication._base import BaseMultiWorkerStreamTestCase
from tests.server import make_request
from tests.test_utils import make_awaitable
from tests.unittest import FederatingHomeserverTestCase, override_config
Expand Down Expand Up @@ -216,7 +216,7 @@ def test_remote_joins_contribute_to_rate_limit(self) -> None:
# - trying to remote-join again.


class TestReplicatedJoinsLimitedByPerRoomRateLimiter(RedisMultiWorkerStreamTestCase):
class TestReplicatedJoinsLimitedByPerRoomRateLimiter(BaseMultiWorkerStreamTestCase):
servlets = [
synapse.rest.admin.register_servlets,
synapse.rest.client.login.register_servlets,
Expand Down
7 changes: 0 additions & 7 deletions tests/module_api/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
from tests.test_utils import simple_async_mock
from tests.test_utils.event_injection import inject_member_event
from tests.unittest import HomeserverTestCase, override_config
from tests.utils import USE_POSTGRES_FOR_TESTS


class ModuleApiTestCase(HomeserverTestCase):
Expand Down Expand Up @@ -738,11 +737,6 @@ def test_create_room(self) -> None:
class ModuleApiWorkerTestCase(BaseMultiWorkerStreamTestCase):
"""For testing ModuleApi functionality in a multi-worker setup"""

# Testing stream ID replication from the main to worker processes requires postgres
# (due to needing `MultiWriterIdGenerator`).
if not USE_POSTGRES_FOR_TESTS:
skip = "Requires Postgres"

servlets = [
admin.register_servlets,
login.register_servlets,
Expand All @@ -752,7 +746,6 @@ class ModuleApiWorkerTestCase(BaseMultiWorkerStreamTestCase):

def default_config(self):
conf = super().default_config()
conf["redis"] = {"enabled": "true"}
conf["stream_writers"] = {"presence": ["presence_writer"]}
conf["instance_map"] = {
"presence_writer": {"host": "testserv", "port": 1001},
Expand Down
90 changes: 31 additions & 59 deletions tests/replication/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@
from synapse.replication.http import ReplicationRestResource
from synapse.replication.tcp.client import ReplicationDataHandler
from synapse.replication.tcp.handler import ReplicationCommandHandler
from synapse.replication.tcp.protocol import ClientReplicationStreamProtocol
from synapse.replication.tcp.resource import (
ReplicationStreamProtocolFactory,
from synapse.replication.tcp.protocol import (
ClientReplicationStreamProtocol,
ServerReplicationStreamProtocol,
)
from synapse.replication.tcp.resource import ReplicationStreamProtocolFactory
from synapse.server import HomeServer

from tests import unittest
Expand Down Expand Up @@ -220,15 +220,34 @@ def assert_request_is_get_repl_stream_updates(
class BaseMultiWorkerStreamTestCase(unittest.HomeserverTestCase):
"""Base class for tests running multiple workers.

Enables Redis, providing a fake Redis server.

Automatically handle HTTP replication requests from workers to master,
unlike `BaseStreamTestCase`.
"""

if not hiredis:
skip = "Requires hiredis"

if not USE_POSTGRES_FOR_TESTS:
# Redis replication only takes place on Postgres
skip = "Requires Postgres"

def default_config(self) -> Dict[str, Any]:
"""
Overrides the default config to enable Redis.
Even if the test only uses make_worker_hs, the main process needs Redis
enabled otherwise it won't create a Fake Redis server to listen on the
Redis port and accept fake TCP connections.
"""
base = super().default_config()
base["redis"] = {"enabled": True}
return base

def setUp(self):
super().setUp()

# build a replication server
self.server_factory = ReplicationStreamProtocolFactory(self.hs)
self.streamer = self.hs.get_replication_streamer()

# Fake in memory Redis server that servers can connect to.
Expand All @@ -247,15 +266,14 @@ def setUp(self):
# handling inbound HTTP requests to that instance.
self._hs_to_site = {self.hs: self.site}

if self.hs.config.redis.redis_enabled:
# Handle attempts to connect to fake redis server.
self.reactor.add_tcp_client_callback(
"localhost",
6379,
self.connect_any_redis_attempts,
)
# Handle attempts to connect to fake redis server.
self.reactor.add_tcp_client_callback(
"localhost",
6379,
self.connect_any_redis_attempts,
)

self.hs.get_replication_command_handler().start_replication(self.hs)
self.hs.get_replication_command_handler().start_replication(self.hs)

# When we see a connection attempt to the master replication listener we
# automatically set up the connection. This is so that tests don't
Expand Down Expand Up @@ -339,27 +357,6 @@ def make_worker_hs(
store = worker_hs.get_datastores().main
store.db_pool._db_pool = self.database_pool._db_pool

# Set up TCP replication between master and the new worker if we don't
# have Redis support enabled.
if not worker_hs.config.redis.redis_enabled:
repl_handler = ReplicationCommandHandler(worker_hs)
client = ClientReplicationStreamProtocol(
worker_hs,
"client",
"test",
self.clock,
repl_handler,
)
server = self.server_factory.buildProtocol(
IPv4Address("TCP", "127.0.0.1", 0)
)

client_transport = FakeTransport(server, self.reactor)
client.makeConnection(client_transport)

server_transport = FakeTransport(client, self.reactor)
server.makeConnection(server_transport)

# Set up a resource for the worker
resource = ReplicationRestResource(worker_hs)

Expand All @@ -378,8 +375,7 @@ def make_worker_hs(
reactor=self.reactor,
)

if worker_hs.config.redis.redis_enabled:
worker_hs.get_replication_command_handler().start_replication(worker_hs)
worker_hs.get_replication_command_handler().start_replication(worker_hs)

return worker_hs

Expand Down Expand Up @@ -582,27 +578,3 @@ def encode(self, obj):

def connectionLost(self, reason):
self._server.remove_subscriber(self)


class RedisMultiWorkerStreamTestCase(BaseMultiWorkerStreamTestCase):
"""
A test case that enables Redis, providing a fake Redis server.
"""

if not hiredis:
skip = "Requires hiredis"

if not USE_POSTGRES_FOR_TESTS:
# Redis replication only takes place on Postgres
skip = "Requires Postgres"

def default_config(self) -> Dict[str, Any]:
"""
Overrides the default config to enable Redis.
Even if the test only uses make_worker_hs, the main process needs Redis
enabled otherwise it won't create a Fake Redis server to listen on the
Redis port and accept fake TCP connections.
"""
base = super().default_config()
base["redis"] = {"enabled": True}
return base
4 changes: 2 additions & 2 deletions tests/replication/tcp/test_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from tests.replication._base import RedisMultiWorkerStreamTestCase
from tests.replication._base import BaseMultiWorkerStreamTestCase


class ChannelsTestCase(RedisMultiWorkerStreamTestCase):
class ChannelsTestCase(BaseMultiWorkerStreamTestCase):
def test_subscribed_to_enough_redis_channels(self) -> None:
# The default main process is subscribed to the USER_IP channel.
self.assertCountEqual(
Expand Down
7 changes: 0 additions & 7 deletions tests/replication/test_sharded_event_persister.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,13 @@

from tests.replication._base import BaseMultiWorkerStreamTestCase
from tests.server import make_request
from tests.utils import USE_POSTGRES_FOR_TESTS

logger = logging.getLogger(__name__)


class EventPersisterShardTestCase(BaseMultiWorkerStreamTestCase):
"""Checks event persisting sharding works"""

# Event persister sharding requires postgres (due to needing
# `MultiWriterIdGenerator`).
if not USE_POSTGRES_FOR_TESTS:
skip = "Requires Postgres"

servlets = [
admin.register_servlets_for_client_rest_resource,
room.register_servlets,
Expand All @@ -50,7 +44,6 @@ def prepare(self, reactor, clock, hs):

def default_config(self):
conf = super().default_config()
conf["redis"] = {"enabled": "true"}
conf["stream_writers"] = {"events": ["worker1", "worker2"]}
conf["instance_map"] = {
"worker1": {"host": "testserv", "port": 1001},
Expand Down