Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

(Hopefully) stop leaking file descriptors in media repo #9497

Merged
merged 4 commits into from
Mar 1, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/9497.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix a long-standing bug where the media repository could leak file descriptors while previewing media.
31 changes: 29 additions & 2 deletions synapse/http/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -750,7 +750,32 @@ class BodyExceededMaxSize(Exception):
"""The maximum allowed size of the HTTP body was exceeded."""


class _DiscardBodyWithMaxSizeProtocol(protocol.Protocol):
"""A protocol which immediately errors upon receiving data."""

def __init__(self, deferred: defer.Deferred):
self.deferred = deferred

def _maybe_fail(self):
"""
Report a max size exceed error and disconnect the first time this is called.
"""
if not self.deferred.called:
self.deferred.errback(BodyExceededMaxSize())
# Close the connection (forcefully) since all the data will get
# discarded anyway.
self.transport.abortConnection()

def dataReceived(self, data: bytes) -> None:
self._maybe_fail()

def connectionLost(self, reason: Failure) -> None:
self._maybe_fail()


class _ReadBodyWithMaxSizeProtocol(protocol.Protocol):
"""A protocol which reads body to a stream, erroring if the body exceeds a maximum size."""

def __init__(
self, stream: BinaryIO, deferred: defer.Deferred, max_size: Optional[int]
):
Expand Down Expand Up @@ -807,13 +832,15 @@ def read_body_with_max_size(
Returns:
A Deferred which resolves to the length of the read body.
"""
d = defer.Deferred()

# If the Content-Length header gives a size larger than the maximum allowed
# size, do not bother downloading the body.
if max_size is not None and response.length != UNKNOWN_LENGTH:
if response.length > max_size:
return defer.fail(BodyExceededMaxSize())
response.deliverBody(_DiscardBodyWithMaxSizeProtocol(d))
return d

d = defer.Deferred()
response.deliverBody(_ReadBodyWithMaxSizeProtocol(stream, d, max_size))
return d

Expand Down
91 changes: 55 additions & 36 deletions tests/http/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,77 +26,96 @@


class ReadBodyWithMaxSizeTests(TestCase):
def setUp(self):
def _build_response(self, length=UNKNOWN_LENGTH):
"""Start reading the body, returns the response, result and proto"""
response = Mock(length=UNKNOWN_LENGTH)
self.result = BytesIO()
self.deferred = read_body_with_max_size(response, self.result, 6)
response = Mock(length=length)
result = BytesIO()
deferred = read_body_with_max_size(response, result, 6)

# Fish the protocol out of the response.
self.protocol = response.deliverBody.call_args[0][0]
self.protocol.transport = Mock()
protocol = response.deliverBody.call_args[0][0]
protocol.transport = Mock()

def _cleanup_error(self):
return result, deferred, protocol

def _assert_error(self, deferred, protocol):
"""Ensure that the expected error is received."""
self.assertIsInstance(deferred.result, Failure)
self.assertIsInstance(deferred.result.value, BodyExceededMaxSize)
protocol.transport.abortConnection.assert_called_once()

def _cleanup_error(self, deferred):
"""Ensure that the error in the Deferred is handled gracefully."""
called = [False]

def errback(f):
called[0] = True

self.deferred.addErrback(errback)
deferred.addErrback(errback)
self.assertTrue(called[0])

def test_no_error(self):
"""A response that is NOT too large."""
result, deferred, protocol = self._build_response()

# Start sending data.
self.protocol.dataReceived(b"12345")
protocol.dataReceived(b"12345")
# Close the connection.
self.protocol.connectionLost(Failure(ResponseDone()))
protocol.connectionLost(Failure(ResponseDone()))

self.assertEqual(self.result.getvalue(), b"12345")
self.assertEqual(self.deferred.result, 5)
self.assertEqual(result.getvalue(), b"12345")
self.assertEqual(deferred.result, 5)

def test_too_large(self):
"""A response which is too large raises an exception."""
result, deferred, protocol = self._build_response()

# Start sending data.
self.protocol.dataReceived(b"1234567890")
# Close the connection.
self.protocol.connectionLost(Failure(ResponseDone()))
protocol.dataReceived(b"1234567890")

self.assertEqual(self.result.getvalue(), b"1234567890")
self.assertIsInstance(self.deferred.result, Failure)
self.assertIsInstance(self.deferred.result.value, BodyExceededMaxSize)
self._cleanup_error()
self.assertEqual(result.getvalue(), b"1234567890")
self._assert_error(deferred, protocol)
self._cleanup_error(deferred)

def test_multiple_packets(self):
"""Data should be accummulated through mutliple packets."""
"""Data should be accumulated through mutliple packets."""
result, deferred, protocol = self._build_response()

# Start sending data.
self.protocol.dataReceived(b"12")
self.protocol.dataReceived(b"34")
protocol.dataReceived(b"12")
protocol.dataReceived(b"34")
# Close the connection.
self.protocol.connectionLost(Failure(ResponseDone()))
protocol.connectionLost(Failure(ResponseDone()))

self.assertEqual(self.result.getvalue(), b"1234")
self.assertEqual(self.deferred.result, 4)
self.assertEqual(result.getvalue(), b"1234")
self.assertEqual(deferred.result, 4)

def test_additional_data(self):
"""A connection can receive data after being closed."""
result, deferred, protocol = self._build_response()

# Start sending data.
self.protocol.dataReceived(b"1234567890")
self.assertIsInstance(self.deferred.result, Failure)
self.assertIsInstance(self.deferred.result.value, BodyExceededMaxSize)
self.protocol.transport.abortConnection.assert_called_once()
protocol.dataReceived(b"1234567890")
self._assert_error(deferred, protocol)

# More data might have come in.
self.protocol.dataReceived(b"1234567890")
# Close the connection.
self.protocol.connectionLost(Failure(ResponseDone()))
protocol.dataReceived(b"1234567890")

self.assertEqual(result.getvalue(), b"1234567890")
self._assert_error(deferred, protocol)
self._cleanup_error(deferred)

def test_content_length(self):
"""The body shouldn't be read (at all) if the Content-Length header is too large."""
result, deferred, protocol = self._build_response(length=10)

# Deferred shouldn't be called yet.
self.assertFalse(deferred.called)

# Start sending data.
protocol.dataReceived(b"12345")
self._assert_error(deferred, protocol)
self._cleanup_error(deferred)

self.assertEqual(self.result.getvalue(), b"1234567890")
self.assertIsInstance(self.deferred.result, Failure)
self.assertIsInstance(self.deferred.result.value, BodyExceededMaxSize)
self._cleanup_error()
# The data is never consumed.
self.assertEqual(result.getvalue(), b"")
clokep marked this conversation as resolved.
Show resolved Hide resolved