Skip to content

Commit

Permalink
Networking & CGI optimizations (#50)
Browse files Browse the repository at this point in the history
More efficient TCP packets, miscellaneous fixes to CGI handling.
  • Loading branch information
michael-lazar committed Jan 6, 2021
1 parent 135dbda commit bfa68c6
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 35 deletions.
18 changes: 8 additions & 10 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,18 @@

### v0.8.0 (Unreleased)

#### Spec Changes

- Added support for international domain names using IDN encoding.

#### New Features

- Several fixes & improvements to python type hinting coverage.
- Several improvements to internal python type hinting coverage.
- Added a ``py.typed`` file to indicate project support for type hints.
#### Bug Fixes

- Optimized TCP packets when streaming directory listings.
- Optimized TCP packets when streaming large CGI responses.
- Improved error handling to catch invalid responses from CGI scripts.
- Fixed a bug where TLS_CLIENT_AUTHORISED would sometimes be set to
``True``/``False`` instead of ``1``/``0``.

- Fixed error handling edge case when the client killed the connection
before all data has been sent. A `CancelledError` exception will now
be raised internally instead of a ``ConnectionClosed`` exception.

### v0.7.0 (2020-12-06)

#### Spec Changes
Expand Down
75 changes: 59 additions & 16 deletions jetforce/app/static.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
import codecs
import mimetypes
import os
import pathlib
import subprocess
import typing
import urllib.parse

from twisted.internet import reactor
from twisted.internet.task import deferLater
from twisted.internet.defer import Deferred

from .base import (
EnvironDict,
JetforceApplication,
Expand Down Expand Up @@ -34,6 +37,12 @@ class StaticDirectoryApplication(JetforceApplication):
# Chunk size for streaming files, taken from the twisted FileSender class
CHUNK_SIZE = 2 ** 14

# Length of time to defer while waiting for more data from a CGI script
CGI_POLLING_PERIOD = 0.05

# Maximum size in bytes of the first line of a server response
CGI_MAX_RESPONSE_HEADER_SIZE = 2048

mimetypes: mimetypes.MimeTypes

def __init__(
Expand Down Expand Up @@ -157,27 +166,54 @@ def run_cgi_script(
cgi_env = {k: str(v) for k, v in environ.items() if k.isupper()}
cgi_env["GATEWAY_INTERFACE"] = "CGI/1.1"

# Decode the stream as unicode so we can parse the status line
# Use surrogateescape to preserve any non-UTF8 byte sequences.
out = subprocess.Popen(
proc = subprocess.Popen(
[str(filesystem_path)],
stdout=subprocess.PIPE,
env=cgi_env,
bufsize=1,
universal_newlines=True,
errors="surrogateescape",
bufsize=0,
)

status_line = out.stdout.readline().strip()
status_parts = status_line.split(maxsplit=1)
status_line = proc.stdout.readline(self.CGI_MAX_RESPONSE_HEADER_SIZE)
if len(status_line) == self.CGI_MAX_RESPONSE_HEADER_SIZE:
# Too large response header line received from the CGI script.
return Response(Status.CGI_ERROR, "Unexpected Error")

status_parts = status_line.decode().strip().split(maxsplit=1)
if len(status_parts) != 2 or not status_parts[0].isdecimal():
# Malformed header line received from the CGI script.
return Response(Status.CGI_ERROR, "Unexpected Error")

status, meta = status_parts
return Response(int(status), meta, self.cgi_body_generator(proc))

def cgi_body_generator(
self,
proc: subprocess.Popen[bytes],
) -> typing.Iterator[typing.Union[bytes, Deferred]]:
"""
Non-blocking read from the stdout of the CGI process and pipe it
to the socket transport.
"""
while True:
proc.poll()

# Re-encode the rest of the body as bytes
body = codecs.iterencode(out.stdout, encoding="utf-8", errors="surrogateescape")
return Response(int(status), meta, body)
data = proc.stdout.read(self.CHUNK_SIZE)
if len(data) == self.CHUNK_SIZE:
# Send the chunk and yield control of the event loop
yield data
elif proc.returncode is None:
# We didn't get a full chunk's worth of data from the
# subprocess. Send what we have, but add a delay before
# attempting to read again to allow time for more bytes
# to buffer in stdout.
if data:
yield data
yield deferLater(reactor, self.CGI_POLLING_PERIOD)
else:
# Subprocess has finished, send everything that's left.
if data:
yield data
break

def load_file(self, filesystem_path: pathlib.Path) -> typing.Iterator[bytes]:
"""
Expand All @@ -196,9 +232,9 @@ def list_directory(
"""
Auto-generate a text/gemini document based on the contents of the file system.
"""
yield f"Directory: /{url_path}\r\n".encode()
buffer = f"Directory: /{url_path}]\r\n".encode()
if url_path.parent != url_path:
yield f"=>/{url_path.parent}\t..\r\n".encode()
buffer += f"=>/{url_path.parent}\t..\r\n".encode()

for file in sorted(filesystem_path.iterdir()):
if file.name.startswith("."):
Expand All @@ -207,9 +243,16 @@ def list_directory(

encoded_path = urllib.parse.quote(str(url_path / file.name))
if file.is_dir():
yield f"=>/{encoded_path}/\t{file.name}/\r\n".encode()
buffer += f"=>/{encoded_path}/\t{file.name}/\r\n".encode()
else:
yield f"=>/{encoded_path}\t{file.name}\r\n".encode()
buffer += f"=>/{encoded_path}\t{file.name}\r\n".encode()

if len(buffer) >= self.CHUNK_SIZE:
data, buffer = buffer[: self.CHUNK_SIZE], buffer[self.CHUNK_SIZE :]
yield data

if buffer:
yield buffer

def guess_mimetype(self, filename: str) -> str:
"""
Expand Down
29 changes: 20 additions & 9 deletions jetforce/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,7 @@
import urllib.parse

from twisted.internet.address import IPv4Address, IPv6Address
from twisted.internet.defer import Deferred, ensureDeferred
from twisted.internet.error import ConnectionClosed
from twisted.internet.defer import Deferred, ensureDeferred, CancelledError
from twisted.internet.protocol import connectionDone
from twisted.internet.task import deferLater
from twisted.protocols.basic import LineOnlyReceiver
Expand Down Expand Up @@ -40,6 +39,7 @@ class GeminiProtocol(LineOnlyReceiver):
"""

TIMESTAMP_FORMAT = "%d/%b/%Y:%H:%M:%S %z"
DEBUG = False

client_addr: typing.Union[IPv4Address, IPv6Address]
connected_timestamp: time.struct_time
Expand Down Expand Up @@ -69,8 +69,7 @@ def connectionLost(self, reason: Failure = connectionDone) -> None:
This is invoked by twisted after the connection has been closed.
"""
if self._currently_deferred:
self._currently_deferred.errback(reason)
self._currently_deferred = None
self._currently_deferred.cancel()

def lineReceived(self, line: bytes) -> Deferred:
"""
Expand Down Expand Up @@ -150,7 +149,8 @@ async def _handle_request_noblock(self) -> None:
response_generator = await self.track_deferred(response_generator)
else:
# Yield control of the event loop
await deferLater(self.server.reactor, 0)
deferred = deferLater(self.server.reactor, 0)
await self.track_deferred(deferred)

for data in response_generator:
if isinstance(data, Deferred):
Expand All @@ -159,9 +159,9 @@ async def _handle_request_noblock(self) -> None:
else:
self.write_body(data)
# Yield control of the event loop
await deferLater(self.server.reactor, 0)

except ConnectionClosed:
deferred = deferLater(self.server.reactor, 0)
await self.track_deferred(deferred)
except CancelledError:
pass
except Exception:
self.server.log_message(traceback.format_exc())
Expand All @@ -172,6 +172,10 @@ async def _handle_request_noblock(self) -> None:
self.finish_connection()

async def track_deferred(self, deferred: Deferred) -> typing.Union[str, bytes]:
"""
Keep track of the deferred that we're waiting on so we can send an
error back to it if the connection is abruptly killed.
"""
self._currently_deferred = deferred
try:
return await deferred
Expand Down Expand Up @@ -252,15 +256,20 @@ def write_status(self, status: int, meta: str) -> None:
self.meta = meta
self.response_buffer = f"{status} {meta}\r\n"

def write_body(self, data: typing.Union[str, bytes]) -> None:
def write_body(self, data: typing.Union[str, bytes, None]) -> None:
"""
Write bytes to the gemini response body.
"""
if data is None:
return

if isinstance(data, str):
data = data.encode()

self.flush_status()
self.response_size += len(data)
if self.DEBUG:
print(f"Writing body: {len(data)} bytes")
self.transport.write(data)

def flush_status(self) -> None:
Expand All @@ -270,6 +279,8 @@ def flush_status(self) -> None:
if self.response_buffer and not self.response_size:
data = self.response_buffer.encode()
self.response_size += len(data)
if self.DEBUG:
print(f"Writing status: {len(data)} bytes")
self.transport.write(data)
self.response_buffer = ""

Expand Down

0 comments on commit bfa68c6

Please sign in to comment.