This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit f594e43

Switch the JSON byte producer from a pull to a push producer. (#8116)

clokep authored Aug 19, 2020
1 parent cfeb37f commit f594e43
Showing 5 changed files with 53 additions and 46 deletions.
1 change: 1 addition & 0 deletions changelog.d/8116.feature
@@ -0,0 +1 @@
+Iteratively encode JSON to avoid blocking the reactor.
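
The changelog entry above captures the core idea: rather than serializing the whole response in one json.dumps() call, which can occupy the reactor for a long time on large responses, the encoder yields its output a fragment at a time, so encoding can be interleaved with other reactor work. A minimal sketch of that idea using only the standard library follows; the encoder options and helper name are illustrative assumptions, not Synapse's exact configuration:

import json

# A fragment-at-a-time JSON encoder. iterencode() yields many small pieces
# (often single tokens such as '{' or '"a"'), which lets the caller write a
# few pieces and then yield control back to the reactor.
_encoder = json.JSONEncoder(ensure_ascii=False, separators=(",", ":"))

def encode_json_bytes(obj):
    # Yield the JSON encoding of `obj` as a series of small byte chunks.
    for fragment in _encoder.iterencode(obj):
        yield fragment.encode("utf-8")

chunks = list(encode_json_bytes({"a": [1, 2, 3], "b": "c"}))
# Each chunk is tiny, which is why _ByteProducer (below) coalesces them
# before calling Request.write().
assert b"".join(chunks) == b'{"a":[1,2,3],"b":"c"}'

The synapse/http/server.py diff below is about how those fragments are driven onto the wire.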
75 changes: 43 additions & 32 deletions synapse/http/server.py
@@ -500,7 +500,7 @@ class RootOptionsRedirectResource(OptionsResource, RootRedirect):
     pass
 
 
-@implementer(interfaces.IPullProducer)
+@implementer(interfaces.IPushProducer)
 class _ByteProducer:
     """
     Iteratively write bytes to the request.
@@ -515,52 +515,64 @@ def __init__(
     ):
         self._request = request
         self._iterator = iterator
+        self._paused = False
 
-    def start(self) -> None:
-        self._request.registerProducer(self, False)
+        # Register the producer and start producing data.
+        self._request.registerProducer(self, True)
+        self.resumeProducing()
 
     def _send_data(self, data: List[bytes]) -> None:
         """
-        Send a list of strings as a response to the request.
+        Send a list of bytes as a chunk of a response.
         """
         if not data:
             return
         self._request.write(b"".join(data))
 
+    def pauseProducing(self) -> None:
+        self._paused = True
+
     def resumeProducing(self) -> None:
         # We've stopped producing in the meantime (note that this might be
         # re-entrant after calling write).
         if not self._request:
             return
 
-        # Get the next chunk and write it to the request.
-        #
-        # The output of the JSON encoder is coalesced until min_chunk_size is
-        # reached. (This is because JSON encoders produce a very small output
-        # per iteration.)
-        #
-        # Note that buffer stores a list of bytes (instead of appending to
-        # bytes) to hopefully avoid many allocations.
-        buffer = []
-        buffered_bytes = 0
-        while buffered_bytes < self.min_chunk_size:
-            try:
-                data = next(self._iterator)
-                buffer.append(data)
-                buffered_bytes += len(data)
-            except StopIteration:
-                # The entire JSON object has been serialized, write any
-                # remaining data, finalize the producer and the request, and
-                # clean-up any references.
-                self._send_data(buffer)
-                self._request.unregisterProducer()
-                self._request.finish()
-                self.stopProducing()
-                return
-
-        self._send_data(buffer)
+        self._paused = False
+
+        # Write until there's backpressure telling us to stop.
+        while not self._paused:
+            # Get the next chunk and write it to the request.
+            #
+            # The output of the JSON encoder is buffered and coalesced until
+            # min_chunk_size is reached. This is because JSON encoders produce
+            # very small output per iteration and the Request object converts
+            # each call to write() to a separate chunk. Without this there would
+            # be an explosion in bytes written (e.g. b"{" becoming "1\r\n{\r\n").
+            #
+            # Note that buffer stores a list of bytes (instead of appending to
+            # bytes) to hopefully avoid many allocations.
+            buffer = []
+            buffered_bytes = 0
+            while buffered_bytes < self.min_chunk_size:
+                try:
+                    data = next(self._iterator)
+                    buffer.append(data)
+                    buffered_bytes += len(data)
+                except StopIteration:
+                    # The entire JSON object has been serialized, write any
+                    # remaining data, finalize the producer and the request, and
+                    # clean-up any references.
+                    self._send_data(buffer)
+                    self._request.unregisterProducer()
+                    self._request.finish()
+                    self.stopProducing()
+                    return
+
+            self._send_data(buffer)
 
     def stopProducing(self) -> None:
+        # Clear a circular reference.
         self._request = None
@@ -620,8 +632,7 @@ def respond_with_json(
     if send_cors:
         set_cors_headers(request)
 
-    producer = _ByteProducer(request, encoder(json_object))
-    producer.start()
+    _ByteProducer(request, encoder(json_object))
     return NOT_DONE_YET
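
To make the pull-to-push difference concrete, here is a condensed, self-contained sketch of the contract the new class follows. Both FakeRequest and PushByteProducer are illustrative stand-ins written for this note (FakeRequest models only the four Request methods that _ByteProducer calls); neither is Twisted or Synapse code. With a pull producer (registerProducer(..., False)) the transport asks for data by calling resumeProducing() once per chunk; with a push producer (registerProducer(..., True)) the producer writes freely and is interrupted by pauseProducing() only when the transport needs backpressure. This is also why respond_with_json no longer calls start(): the constructor now registers the producer and kicks off the first write itself.

from typing import Iterator, List

class FakeRequest:
    # Stand-in for the four twisted.web Request methods used above
    # (illustrative; a real transport decides when to pause on its own).
    def __init__(self) -> None:
        self.chunks: List[bytes] = []
        self.finished = False
        self._producer = None

    def registerProducer(self, producer, streaming: bool) -> None:
        # streaming=True registers a push producer; False would mean pull.
        self._producer = producer

    def write(self, data: bytes) -> None:
        self.chunks.append(data)
        # Simulate backpressure after every write, as a busy socket might.
        if self._producer is not None:
            self._producer.pauseProducing()

    def unregisterProducer(self) -> None:
        self._producer = None

    def finish(self) -> None:
        self.finished = True

class PushByteProducer:
    # Condensed version of the _ByteProducer logic in the diff above.
    min_chunk_size = 8  # tiny, so this example produces several chunks

    def __init__(self, request: FakeRequest, iterator: Iterator[bytes]) -> None:
        self._request = request
        self._iterator = iterator
        self._paused = False
        request.registerProducer(self, True)
        self.resumeProducing()

    def pauseProducing(self) -> None:
        self._paused = True

    def resumeProducing(self) -> None:
        if not self._request:
            return
        self._paused = False
        # Write until backpressure (or the end of the data) stops us.
        while not self._paused:
            buffer, buffered_bytes = [], 0
            while buffered_bytes < self.min_chunk_size:
                try:
                    data = next(self._iterator)
                except StopIteration:
                    if buffer:
                        self._request.write(b"".join(buffer))
                    self._request.unregisterProducer()
                    self._request.finish()
                    self.stopProducing()
                    return
                buffer.append(data)
                buffered_bytes += len(data)
            self._request.write(b"".join(buffer))

    def stopProducing(self) -> None:
        self._request = None

request = FakeRequest()
producer = PushByteProducer(
    request, iter([b"{", b'"key"', b": ", b'"', b"x" * 10, b'"', b"}"])
)
# The fake transport paused us after the first write; a real reactor would
# call resumeProducing() once the socket drains. Here we do it by hand.
while not request.finished:
    producer.resumeProducing()
assert b"".join(request.chunks) == b'{"key": "xxxxxxxxxx"}'

Run as-is, the sketch finishes with three coalesced chunks on the fake request; a pull producer carrying the same data would have needed the transport to schedule it once per chunk.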


16 changes: 5 additions & 11 deletions tests/rest/client/v1/test_login.py
@@ -62,8 +62,7 @@ def test_POST_ratelimiting_per_address(self):
                 "identifier": {"type": "m.id.user", "user": "kermit" + str(i)},
                 "password": "monkey",
             }
-            request_data = json.dumps(params)
-            request, channel = self.make_request(b"POST", LOGIN_URL, request_data)
+            request, channel = self.make_request(b"POST", LOGIN_URL, params)
             self.render(request)
 
             if i == 5:
@@ -76,14 +75,13 @@ def test_POST_ratelimiting_per_address(self):
         # than 1min.
         self.assertTrue(retry_after_ms < 6000)
 
-        self.reactor.advance(retry_after_ms / 1000.0)
+        self.reactor.advance(retry_after_ms / 1000.0 + 1.0)
 
         params = {
             "type": "m.login.password",
             "identifier": {"type": "m.id.user", "user": "kermit" + str(i)},
             "password": "monkey",
         }
-        request_data = json.dumps(params)
         request, channel = self.make_request(b"POST", LOGIN_URL, params)
         self.render(request)
 
@@ -111,8 +109,7 @@ def test_POST_ratelimiting_per_account(self):
                 "identifier": {"type": "m.id.user", "user": "kermit"},
                 "password": "monkey",
             }
-            request_data = json.dumps(params)
-            request, channel = self.make_request(b"POST", LOGIN_URL, request_data)
+            request, channel = self.make_request(b"POST", LOGIN_URL, params)
             self.render(request)
 
             if i == 5:
@@ -132,7 +129,6 @@ def test_POST_ratelimiting_per_account(self):
             "identifier": {"type": "m.id.user", "user": "kermit"},
             "password": "monkey",
         }
-        request_data = json.dumps(params)
         request, channel = self.make_request(b"POST", LOGIN_URL, params)
         self.render(request)
 
@@ -160,8 +156,7 @@ def test_POST_ratelimiting_per_account_failed_attempts(self):
                 "identifier": {"type": "m.id.user", "user": "kermit"},
                 "password": "notamonkey",
             }
-            request_data = json.dumps(params)
-            request, channel = self.make_request(b"POST", LOGIN_URL, request_data)
+            request, channel = self.make_request(b"POST", LOGIN_URL, params)
             self.render(request)
 
             if i == 5:
@@ -174,14 +169,13 @@ def test_POST_ratelimiting_per_account_failed_attempts(self):
         # than 1min.
         self.assertTrue(retry_after_ms < 6000)
 
-        self.reactor.advance(retry_after_ms / 1000.0)
+        self.reactor.advance(retry_after_ms / 1000.0 + 1.0)
 
         params = {
             "type": "m.login.password",
             "identifier": {"type": "m.id.user", "user": "kermit"},
             "password": "notamonkey",
        }
-        request_data = json.dumps(params)
         request, channel = self.make_request(b"POST", LOGIN_URL, params)
         self.render(request)
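
One note on the timing tweaks in these tests: advancing the reactor by exactly retry_after_ms / 1000.0 can land right on the rate-limit expiry boundary, where a limiter may still reject the request, so the tests now advance an extra second (the pump(1) to pump(1.01) change in tests/storage/test_cleanup_extrems.py below is in the same spirit). A toy illustration of the boundary failure mode follows; the limiter is hypothetical, written for this note, and is not Synapse's Ratelimiter:

class ToyClock:
    def __init__(self) -> None:
        self.now = 0.0

    def advance(self, seconds: float) -> None:
        self.now += seconds

class ToyRateLimiter:
    # Hypothetical limiter: blocks until `retry_after` seconds have fully
    # elapsed. The strict comparison is the point: a clock sitting exactly
    # on the boundary is still rejected.
    def __init__(self, clock: ToyClock, retry_after: float) -> None:
        self._clock = clock
        self._allowed_from = clock.now + retry_after

    def allowed(self) -> bool:
        return self._clock.now > self._allowed_from

clock = ToyClock()
limiter = ToyRateLimiter(clock, retry_after=30.0)

clock.advance(30.0)   # exactly at the boundary: still rejected
assert not limiter.allowed()

clock.advance(1.0)    # the extra second the tests now add
assert limiter.allowed()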

4 changes: 2 additions & 2 deletions tests/rest/client/v2_alpha/test_register.py
@@ -160,7 +160,7 @@ def test_POST_ratelimiting_guest(self):
             else:
                 self.assertEquals(channel.result["code"], b"200", channel.result)
 
-        self.reactor.advance(retry_after_ms / 1000.0)
+        self.reactor.advance(retry_after_ms / 1000.0 + 1.0)
 
         request, channel = self.make_request(b"POST", self.url + b"?kind=guest", b"{}")
         self.render(request)
@@ -186,7 +186,7 @@ def test_POST_ratelimiting(self):
             else:
                 self.assertEquals(channel.result["code"], b"200", channel.result)
 
-        self.reactor.advance(retry_after_ms / 1000.0)
+        self.reactor.advance(retry_after_ms / 1000.0 + 1.0)
 
         request, channel = self.make_request(b"POST", self.url + b"?kind=guest", b"{}")
         self.render(request)
3 changes: 2 additions & 1 deletion tests/storage/test_cleanup_extrems.py
@@ -353,6 +353,7 @@ def test_expiry_logic(self):
         self.event_creator_handler._rooms_to_exclude_from_dummy_event_insertion[
             "3"
         ] = 300000
+
         self.event_creator_handler._expire_rooms_to_exclude_from_dummy_event_insertion()
         # All entries within time frame
         self.assertEqual(
@@ -362,7 +363,7 @@ def test_expiry_logic(self):
             3,
         )
         # Oldest room to expire
-        self.pump(1)
+        self.pump(1.01)
         self.event_creator_handler._expire_rooms_to_exclude_from_dummy_event_insertion()
         self.assertEqual(
             len(
