Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(perf): scan only new data while looking for end of event tokens #173

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 43 additions & 8 deletions event.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,19 @@ func NewEventStreamReader(eventStream io.Reader, maxBufferSize int) *EventStream
initBufferSize := minPosInt(4096, maxBufferSize)
scanner.Buffer(make([]byte, initBufferSize), maxBufferSize)

// this ensures we don't keep checking data we've already scanned within one Split
newDataIndex := 0
split := func(data []byte, atEOF bool) (int, []byte, error) {
if atEOF && len(data) == 0 {
return 0, nil, nil
}

// We have a full event payload to parse.
if i, nlen := containsDoubleNewline(data); i >= 0 {
if i, nlen := containsDoubleNewline(data, newDataIndex); i >= 0 {
newDataIndex = 0 // reset for next token
return i + nlen, data[0:i], nil
}
newDataIndex = len(data) // we've already scanned the entire data up to this point
// If we're at EOF, we have all of the data.
if atEOF {
return len(data), data, nil
Expand All @@ -64,16 +68,38 @@ func NewEventStreamReader(eventStream io.Reader, maxBufferSize int) *EventStream
// Returns a tuple containing the index of a double newline, and the number of bytes
// represented by that sequence. If no double newline is present, the first value
// will be negative.
func containsDoubleNewline(data []byte) (int, int) {
func containsDoubleNewline(data []byte, newDataIndex int) (int, int) {
// Start the search slightly before the data we have already scanned: back up by the
// length of the longest search string minus one, so that a newline sequence straddling
// the boundary between old and new data is still found. This prevents n^2 lookups when
// data repeatedly does not contain the search string and slowly grows, especially to a
// large size.
lookBackStart := max(0, newDataIndex-3) // len(cr lf cr lf) - 1
lookBack := data[lookBackStart:]

// Search for each potentially valid sequence of newline characters
crcr := bytes.Index(data, []byte("\r\r"))
lflf := bytes.Index(data, []byte("\n\n"))
crlflf := bytes.Index(data, []byte("\r\n\n"))
lfcrlf := bytes.Index(data, []byte("\n\r\n"))
crlfcrlf := bytes.Index(data, []byte("\r\n\r\n"))
crcr := bytes.Index(lookBack, []byte("\r\r"))
if crcr >= 0 {
crcr += lookBackStart
}
lflf := bytes.Index(lookBack, []byte("\n\n"))
if lflf >= 0 {
lflf += lookBackStart
}
crlflf := bytes.Index(lookBack, []byte("\r\n\n"))
if crlflf >= 0 {
crlflf += lookBackStart
}
lfcrlf := bytes.Index(lookBack, []byte("\n\r\n"))
if lfcrlf >= 0 {
lfcrlf += lookBackStart
}
crlfcrlf := bytes.Index(lookBack, []byte("\r\n\r\n"))
if crlfcrlf >= 0 {
crlfcrlf += lookBackStart
}
// Find the earliest position of a double newline combination
minPos := minPosInt(crcr, minPosInt(lflf, minPosInt(crlflf, minPosInt(lfcrlf, crlfcrlf))))
// Detemine the length of the sequence
// Determine the length of the sequence
nlen := 2
if minPos == crlfcrlf {
nlen = 4
Expand All @@ -98,6 +124,15 @@ func minPosInt(a, b int) int {
return a
}

// max reports the larger of the two integers a and b. When both are equal,
// that shared value is returned.
// TODO: remove once the minimum supported Go version is 1.21, where max is a built-in function.
func max(a, b int) int {
	if b >= a {
		return b
	}
	return a
}

// ReadEvent scans the EventStream for events.
func (e *EventStreamReader) ReadEvent() ([]byte, error) {
if e.scanner.Scan() {
Expand Down