This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Switch the JSON byte producer from a pull to a push producer #8116

Merged · 3 commits · Aug 19, 2020
1 change: 1 addition & 0 deletions changelog.d/8116.feature
@@ -0,0 +1 @@
Iteratively encode JSON to avoid blocking the reactor.
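
For context, a brief sketch of what "iteratively encode" means here, using only the stdlib json module (the helper name is illustrative, not Synapse's actual encoder): JSONEncoder.iterencode yields the document as many small fragments rather than one large string, so the reactor can regain control between writes instead of blocking on a single json.dumps call.

import json


def iter_json_bytes(obj):
    """Yield the JSON encoding of `obj` as a stream of small byte chunks."""
    for fragment in json.JSONEncoder().iterencode(obj):
        yield fragment.encode("utf-8")


# Even a small object is emitted as several fragments; the exact split
# points are an implementation detail of the encoder.
print(list(iter_json_bytes({"a": [1, 2]})))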
75 changes: 43 additions & 32 deletions synapse/http/server.py
@@ -500,7 +500,7 @@ class RootOptionsRedirectResource(OptionsResource, RootRedirect):
    pass


-@implementer(interfaces.IPullProducer)
+@implementer(interfaces.IPushProducer)
class _ByteProducer:
    """
    Iteratively write bytes to the request.
@@ -515,52 +515,64 @@ def __init__(
    ):
        self._request = request
        self._iterator = iterator
+        self._paused = False

-    def start(self) -> None:
-        self._request.registerProducer(self, False)
+        # Register the producer and start producing data.
+        self._request.registerProducer(self, True)
+        self.resumeProducing()

    def _send_data(self, data: List[bytes]) -> None:
        """
-        Send a list of strings as a response to the request.
+        Send a list of bytes as a chunk of a response.
        """
        if not data:
            return
        self._request.write(b"".join(data))

+    def pauseProducing(self) -> None:
+        self._paused = True

    def resumeProducing(self) -> None:
        # We've stopped producing in the meantime (note that this might be
        # re-entrant after calling write).
        if not self._request:
            return

-        # Get the next chunk and write it to the request.
-        #
-        # The output of the JSON encoder is coalesced until min_chunk_size is
-        # reached. (This is because JSON encoders produce a very small output
-        # per iteration.)
-        #
-        # Note that buffer stores a list of bytes (instead of appending to
-        # bytes) to hopefully avoid many allocations.
-        buffer = []
-        buffered_bytes = 0
-        while buffered_bytes < self.min_chunk_size:
-            try:
-                data = next(self._iterator)
-                buffer.append(data)
-                buffered_bytes += len(data)
-            except StopIteration:
-                # The entire JSON object has been serialized, write any
-                # remaining data, finalize the producer and the request, and
-                # clean-up any references.
-                self._send_data(buffer)
-                self._request.unregisterProducer()
-                self._request.finish()
-                self.stopProducing()
-                return
-
-        self._send_data(buffer)
+        self._paused = False
+
+        # Write until there's backpressure telling us to stop.
+        while not self._paused:
clokep (Member, Author) commented on Aug 18, 2020:

This definitely looks like an infinite loop, but I don't think it is (see a previous discussion about this description here). I think when we call write() we give up control to the reactor, which might call pauseProducing() before control is returned to resumeProducing().

clokep (Member, Author) commented:

When you call write() you go through the Request -> HTTPChannel -> iocpreactor.tcp.Connection (this is HTTPChannel.transport) -> iocpreactor.abstract.FileHandle, which might call producer.pauseProducing() (which goes back through the chain in the opposite direction, essentially).

A reviewer (Member) commented:

This does mean that if the TCP buffer drains faster than we can fill it, we will write everything in one go. I don't know whether we want to put some limit on the amount written in one reactor tick? Maybe something like 64KB (or even 512KB)? A reactor tick generally takes less than 1ms, so those values would mean maxing out at ~64MB/s (or 512MB/s).

A reviewer (Member) commented:

Aha, but FileHandle has a write buffer of 64KB and doesn't actually pull from the buffer until a reactor tick, so we don't need to worry about it. The flow becomes: 1) write data, 2) pauseProducing() is called if over 64KB of data has been written, 3) we exit the loop until the FileHandle buffer has been drained.

clokep (Member, Author) commented:

That was my understanding as well! 👍 This is some fun spelunking into Twisted.
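
To make the write/backpressure dance described in this thread concrete, here is a minimal, self-contained sketch of the push-producer pattern (names are illustrative and it is simplified from the diff above, not the exact Synapse implementation):

from zope.interface import implementer
from twisted.internet import interfaces


@implementer(interfaces.IPushProducer)
class SketchedProducer:
    """Minimal push producer: write until the transport pushes back."""

    def __init__(self, request, iterator):
        self._request = request
        self._iterator = iterator
        self._paused = False
        # streaming=True registers us as a push producer: the transport
        # calls pauseProducing()/resumeProducing() to apply backpressure.
        self._request.registerProducer(self, True)
        self.resumeProducing()

    def pauseProducing(self):
        self._paused = True

    def resumeProducing(self):
        if not self._request:
            return
        self._paused = False
        # Not an infinite loop: write() can re-entrantly trigger
        # pauseProducing() (via HTTPChannel -> transport) once the
        # transport's write buffer fills, which flips _paused.
        while not self._paused:
            try:
                self._request.write(next(self._iterator))
            except StopIteration:
                self._request.unregisterProducer()
                self._request.finish()
                self.stopProducing()
                return

    def stopProducing(self):
        # Break the producer <-> request circular reference.
        self._request = None

Registering with True is what makes this a push producer; with False (the old pull-producer mode) the transport instead calls resumeProducing() each time it wants more data.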

+            # Get the next chunk and write it to the request.
+            #
+            # The output of the JSON encoder is buffered and coalesced until
+            # min_chunk_size is reached. This is because JSON encoders produce
+            # very small output per iteration and the Request object converts
+            # each call to write() to a separate chunk. Without this there would
+            # be an explosion in bytes written (e.g. b"{" becoming b"1\r\n{\r\n").
+            #
+            # Note that buffer stores a list of bytes (instead of appending to
+            # bytes) to hopefully avoid many allocations.
+            buffer = []
+            buffered_bytes = 0
+            while buffered_bytes < self.min_chunk_size:
+                try:
+                    data = next(self._iterator)
+                    buffer.append(data)
+                    buffered_bytes += len(data)
A reviewer (Member) commented:

Looking at the code, this is basically what iocpreactor.abstract.FileHandle is doing, so I think we can just skip all this and call self._request.write(data) for each chunk?

clokep (Member, Author) commented on Aug 19, 2020:

The issue is that http.Request calls toChunk, so each time we call write() it will get turned into an HTTP chunk.

The goal of this code is to avoid b"{" being emitted from the JSON producer becoming the HTTP chunk of b"1\r\n{\r\n", which is a large explosion in the streamed bytes.

A reviewer (Member) commented:

Oh gah, I hate that http.Request gets in the way :/

A reviewer (Member) commented:

Can you add a comment to that effect please?

clokep (Member, Author) commented:

I expanded the comment above this section in 2afe301.
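
To illustrate the overhead clokep describes, a rough sketch of HTTP/1.1 chunked transfer-encoding framing (the helper below is illustrative, not Twisted's actual toChunk):

def to_chunk(data: bytes) -> bytes:
    """Frame data as an HTTP/1.1 chunk: hex length, CRLF, payload, CRLF."""
    return b"%x\r\n%s\r\n" % (len(data), data)


print(to_chunk(b"{"))                  # b'1\r\n{\r\n' -- 1 payload byte, 6 on the wire
print(to_chunk(b'{"name": "value"}'))  # b'11\r\n{"name": "value"}\r\n'

At one fragment of JSON per write() the framing multiplies the stream size several times over, which is exactly the "explosion in the streamed bytes" that the buffering above avoids.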

+                except StopIteration:
+                    # The entire JSON object has been serialized, write any
+                    # remaining data, finalize the producer and the request, and
+                    # clean-up any references.
+                    self._send_data(buffer)
+                    self._request.unregisterProducer()
+                    self._request.finish()
+                    self.stopProducing()
+                    return
+
+            self._send_data(buffer)

    def stopProducing(self) -> None:
        # Clear a circular reference.
        self._request = None
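
As a standalone illustration of the buffering strategy in resumeProducing() (the hypothetical send() stands in for request.write()): fragments are collected in a list and joined once per chunk, rather than repeatedly concatenating bytes objects, which would reallocate the accumulated data on every fragment.

def send_coalesced(fragments, send, min_chunk_size=1024):
    """Group an iterable of byte fragments into chunks of at least
    min_chunk_size bytes, joining each group with a single allocation."""
    buffer = []
    buffered_bytes = 0
    for data in fragments:
        buffer.append(data)
        buffered_bytes += len(data)
        if buffered_bytes >= min_chunk_size:
            send(b"".join(buffer))
            buffer, buffered_bytes = [], 0
    if buffer:
        # Flush the remainder; the final chunk may be smaller.
        send(b"".join(buffer))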


@@ -620,8 +632,7 @@ def respond_with_json(
    if send_cors:
        set_cors_headers(request)

-    producer = _ByteProducer(request, encoder(json_object))
-    producer.start()
+    _ByteProducer(request, encoder(json_object))
    return NOT_DONE_YET


16 changes: 5 additions & 11 deletions tests/rest/client/v1/test_login.py
@@ -62,8 +62,7 @@ def test_POST_ratelimiting_per_address(self):
                "identifier": {"type": "m.id.user", "user": "kermit" + str(i)},
                "password": "monkey",
            }
-            request_data = json.dumps(params)
-            request, channel = self.make_request(b"POST", LOGIN_URL, request_data)
+            request, channel = self.make_request(b"POST", LOGIN_URL, params)
            self.render(request)

            if i == 5:
@@ -76,14 +75,13 @@ def test_POST_ratelimiting_per_address(self):
            # than 1min.
            self.assertTrue(retry_after_ms < 6000)

-        self.reactor.advance(retry_after_ms / 1000.0)
+        self.reactor.advance(retry_after_ms / 1000.0 + 1.0)

        params = {
            "type": "m.login.password",
            "identifier": {"type": "m.id.user", "user": "kermit" + str(i)},
            "password": "monkey",
        }
-        request_data = json.dumps(params)
-        request, channel = self.make_request(b"POST", LOGIN_URL, request_data)
+        request, channel = self.make_request(b"POST", LOGIN_URL, params)
        self.render(request)

@@ -111,8 +109,7 @@ def test_POST_ratelimiting_per_account(self):
                "identifier": {"type": "m.id.user", "user": "kermit"},
                "password": "monkey",
            }
-            request_data = json.dumps(params)
-            request, channel = self.make_request(b"POST", LOGIN_URL, request_data)
+            request, channel = self.make_request(b"POST", LOGIN_URL, params)
            self.render(request)

            if i == 5:
@@ -132,7 +129,6 @@ def test_POST_ratelimiting_per_account(self):
            "identifier": {"type": "m.id.user", "user": "kermit"},
            "password": "monkey",
        }
-        request_data = json.dumps(params)
-        request, channel = self.make_request(b"POST", LOGIN_URL, request_data)
+        request, channel = self.make_request(b"POST", LOGIN_URL, params)
        self.render(request)

@@ -160,8 +156,7 @@ def test_POST_ratelimiting_per_account_failed_attempts(self):
                "identifier": {"type": "m.id.user", "user": "kermit"},
                "password": "notamonkey",
            }
-            request_data = json.dumps(params)
-            request, channel = self.make_request(b"POST", LOGIN_URL, request_data)
+            request, channel = self.make_request(b"POST", LOGIN_URL, params)
            self.render(request)

            if i == 5:
@@ -174,14 +169,13 @@ def test_POST_ratelimiting_per_account_failed_attempts(self):
            # than 1min.
            self.assertTrue(retry_after_ms < 6000)

-        self.reactor.advance(retry_after_ms / 1000.0)
+        self.reactor.advance(retry_after_ms / 1000.0 + 1.0)

        params = {
            "type": "m.login.password",
            "identifier": {"type": "m.id.user", "user": "kermit"},
            "password": "notamonkey",
        }
-        request_data = json.dumps(params)
-        request, channel = self.make_request(b"POST", LOGIN_URL, request_data)
+        request, channel = self.make_request(b"POST", LOGIN_URL, params)
        self.render(request)

4 changes: 2 additions & 2 deletions tests/rest/client/v2_alpha/test_register.py
@@ -160,7 +160,7 @@ def test_POST_ratelimiting_guest(self):
            else:
                self.assertEquals(channel.result["code"], b"200", channel.result)

-        self.reactor.advance(retry_after_ms / 1000.0)
+        self.reactor.advance(retry_after_ms / 1000.0 + 1.0)

        request, channel = self.make_request(b"POST", self.url + b"?kind=guest", b"{}")
        self.render(request)
@@ -186,7 +186,7 @@ def test_POST_ratelimiting(self):
            else:
                self.assertEquals(channel.result["code"], b"200", channel.result)

-        self.reactor.advance(retry_after_ms / 1000.0)
+        self.reactor.advance(retry_after_ms / 1000.0 + 1.0)

        request, channel = self.make_request(b"POST", self.url + b"?kind=guest", b"{}")
        self.render(request)
3 changes: 2 additions & 1 deletion tests/storage/test_cleanup_extrems.py
@@ -353,6 +353,7 @@ def test_expiry_logic(self):
        self.event_creator_handler._rooms_to_exclude_from_dummy_event_insertion[
            "3"
        ] = 300000
+
        self.event_creator_handler._expire_rooms_to_exclude_from_dummy_event_insertion()
        # All entries within time frame
        self.assertEqual(
@@ -362,7 +363,7 @@ def test_expiry_logic(self):
            3,
        )
        # Oldest room to expire
-        self.pump(1)
+        self.pump(1.01)
        self.event_creator_handler._expire_rooms_to_exclude_from_dummy_event_insertion()
        self.assertEqual(
            len(