Skip to content

Commit

Permalink
Go: add support for stream handling of attachments
Browse files Browse the repository at this point in the history
Adds support for streaming reads and writes of attachment data. With
this change, ParseAttachment is dropped as a public function and
replaced with a private function, parseAttachmentReader.

A new type AttachmentReader is introduced, which exposes the Data portion
of an attachment as a reader that can be streamed. The Attachment type
is modified to include a new DataSize field, to allow the writer to
write the attachment length before stream-processing the body.

A new option, AttachmentCallback, is exposed in the Lexer. Rather than
emitting attachments, the Lexer now parses an attachment reader, calls
the callback if one is supplied, and then consumes any remaining
unconsumed bytes from the attachment record (whether or not a callback
was supplied).

This is a breaking change in the Go SDK, and it introduces some
awkwardness due to the presence of the new DataSize field in the
Attachment record. In the specification, that value is folded into the
Data field; however, that is not a tenable approach for stream handling.
The actual binary format is not affected by this change — only the way
in which the binary format is described.

This change is required for readers and writers to be able to handle
attachments bigger than available RAM. Without it, attaching a
sufficiently large attachment will crash the reader/writer with an OOM.
  • Loading branch information
wkalt committed Nov 21, 2022
1 parent 75503df commit d6f8b05
Show file tree
Hide file tree
Showing 18 changed files with 533 additions and 184 deletions.
12 changes: 10 additions & 2 deletions go/cli/mcap/cmd/attachment.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,10 +134,17 @@ var addAttachmentCmd = &cobra.Command{
if err != nil {
die("failed to create temp file: %s", err)
}
attachment, err := os.ReadFile(addAttachmentFilename)
attachment, err := os.Open(addAttachmentFilename)
if err != nil {
die("failed to read attachment: %s", err)
die("failed to open attachment file: %s", err)
}
defer attachment.Close()

stat, err := attachment.Stat()
if err != nil {
die("failed to stat file: %s", err)
}
contentLength := stat.Size()
err = utils.WithReader(ctx, filename, func(remote bool, rs io.ReadSeeker) error {
if remote {
die("not supported on remote MCAP files")
Expand All @@ -160,6 +167,7 @@ var addAttachmentCmd = &cobra.Command{
CreateTime: createTime,
Name: addAttachmentFilename,
MediaType: addAttachmentMediaType,
DataSize: uint64(contentLength),
Data: attachment,
})
})
Expand Down
17 changes: 6 additions & 11 deletions go/cli/mcap/cmd/doctor.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,9 +105,9 @@ func (doctor *mcapDoctor) examineChunk(chunk *mcap.Chunk) {
uncompressedBytesReader := bytes.NewReader(uncompressedBytes)

lexer, err := mcap.NewLexer(uncompressedBytesReader, &mcap.LexerOptions{
SkipMagic: true,
ValidateCRC: true,
EmitChunks: true,
SkipMagic: true,
ValidateChunkCRCs: true,
EmitChunks: true,
})
if err != nil {
doctor.error("Failed to make lexer for chunk bytes", err)
Expand Down Expand Up @@ -209,9 +209,9 @@ func (doctor *mcapDoctor) examineChunk(chunk *mcap.Chunk) {

func (doctor *mcapDoctor) Examine() error {
lexer, err := mcap.NewLexer(doctor.reader, &mcap.LexerOptions{
SkipMagic: false,
ValidateCRC: true,
EmitChunks: true,
SkipMagic: false,
ValidateChunkCRCs: true,
EmitChunks: true,
})
if err != nil {
doctor.fatal(err)
Expand Down Expand Up @@ -334,11 +334,6 @@ func (doctor *mcapDoctor) Examine() error {
doctor.error("Multiple chunk indexes found for chunk at offset", chunkIndex.ChunkStartOffset)
}
doctor.chunkIndexes[chunkIndex.ChunkStartOffset] = chunkIndex
case mcap.TokenAttachment:
_, err := mcap.ParseAttachment(data)
if err != nil {
doctor.error("Failed to parse attachment:", err)
}
case mcap.TokenAttachmentIndex:
_, err := mcap.ParseAttachmentIndex(data)
if err != nil {
Expand Down
54 changes: 32 additions & 22 deletions go/cli/mcap/cmd/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,10 +168,6 @@ func filter(
w io.Writer,
opts *filterOpts,
) error {
lexer, err := mcap.NewLexer(r, &mcap.LexerOptions{ValidateCRC: true, EmitInvalidChunks: opts.recover})
if err != nil {
return err
}
mcapWriter, err := mcap.NewWriter(w, &mcap.WriterOptions{
Compression: opts.compressionFormat,
Chunked: true,
Expand All @@ -183,6 +179,38 @@ func filter(

var numMessages, numAttachments, numMetadata uint64

lexer, err := mcap.NewLexer(r, &mcap.LexerOptions{
ValidateChunkCRCs: true,
EmitInvalidChunks: opts.recover,
AttachmentCallback: func(ar *mcap.AttachmentReader) error {
if !opts.includeAttachments {
return nil
}
if ar.LogTime < opts.start {
return nil
}
if ar.LogTime >= opts.end {
return nil
}
err = mcapWriter.WriteAttachment(&mcap.Attachment{
LogTime: ar.LogTime,
CreateTime: ar.CreateTime,
Name: ar.Name,
MediaType: ar.MediaType,
DataSize: ar.DataSize,
Data: ar.Data(),
})
if err != nil {
return err
}
numAttachments++
return nil
},
})
if err != nil {
return err
}

defer func() {
err := mcapWriter.Close()
if err != nil {
Expand Down Expand Up @@ -297,24 +325,6 @@ func filter(
return err
}
numMessages++
case mcap.TokenAttachment:
if !opts.includeAttachments {
continue
}
attachment, err := mcap.ParseAttachment(data)
if err != nil {
return err
}
if attachment.LogTime < opts.start {
continue
}
if attachment.LogTime >= opts.end {
continue
}
if err = mcapWriter.WriteAttachment(attachment); err != nil {
return err
}
numAttachments++
case mcap.TokenMetadata:
if !opts.includeMetadata {
continue
Expand Down
38 changes: 23 additions & 15 deletions go/cli/mcap/cmd/filter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ func writeFilterTestInput(t *testing.T, w io.Writer) {
assert.Nil(t, writer.WriteAttachment(&mcap.Attachment{
LogTime: 50,
Name: "attachment",
Data: bytes.NewReader(nil),
}))
assert.Nil(t, writer.WriteMetadata(&mcap.Metadata{
Name: "metadata",
Expand Down Expand Up @@ -154,16 +155,20 @@ func TestFiltering(t *testing.T) {

writeFilterTestInput(t, &readBuf)
assert.Nil(t, filter(&readBuf, &writeBuf, c.opts))

lexer, err := mcap.NewLexer(&writeBuf)
attachmentCounter := 0
metadataCounter := 0
lexer, err := mcap.NewLexer(&writeBuf, &mcap.LexerOptions{
AttachmentCallback: func(ar *mcap.AttachmentReader) error {
attachmentCounter++
return nil
},
})
assert.Nil(t, err)
messageCounter := map[uint16]int{
1: 0,
2: 0,
3: 0,
}
attachmentCounter := 0
metadataCounter := 0
for {
token, record, err := lexer.Next(nil)
if err != nil {
Expand All @@ -175,8 +180,6 @@ func TestFiltering(t *testing.T) {
message, err := mcap.ParseMessage(record)
assert.Nil(t, err)
messageCounter[message.ChannelID]++
case mcap.TokenAttachment:
attachmentCounter++
case mcap.TokenMetadata:
metadataCounter++
}
Expand All @@ -202,15 +205,20 @@ func TestRecover(t *testing.T) {
includeMetadata: true,
}))

lexer, err := mcap.NewLexer(&writeBuf)
assert.Nil(t, err)
messageCounter := map[uint16]int{
1: 0,
2: 0,
3: 0,
}
attachmentCounter := 0
metadataCounter := 0
lexer, err := mcap.NewLexer(&writeBuf, &mcap.LexerOptions{
AttachmentCallback: func(ar *mcap.AttachmentReader) error {
attachmentCounter++
return nil
},
})
assert.Nil(t, err)
for {
token, record, err := lexer.Next(nil)
if err != nil {
Expand All @@ -222,8 +230,6 @@ func TestRecover(t *testing.T) {
message, err := mcap.ParseMessage(record)
assert.Nil(t, err)
messageCounter[message.ChannelID]++
case mcap.TokenAttachment:
attachmentCounter++
case mcap.TokenMetadata:
metadataCounter++
}
Expand All @@ -249,16 +255,20 @@ func TestRecover(t *testing.T) {
includeAttachments: true,
includeMetadata: true,
}))

lexer, err := mcap.NewLexer(&writeBuf)
assert.Nil(t, err)
messageCounter := map[uint16]int{
1: 0,
2: 0,
3: 0,
}
attachmentCounter := 0
metadataCounter := 0
lexer, err := mcap.NewLexer(&writeBuf, &mcap.LexerOptions{
AttachmentCallback: func(ar *mcap.AttachmentReader) error {
attachmentCounter++
return nil
},
})
assert.Nil(t, err)
for {
token, record, err := lexer.Next(nil)
if err != nil {
Expand All @@ -270,8 +280,6 @@ func TestRecover(t *testing.T) {
message, err := mcap.ParseMessage(record)
assert.Nil(t, err)
messageCounter[message.ChannelID]++
case mcap.TokenAttachment:
attachmentCounter++
case mcap.TokenMetadata:
metadataCounter++
}
Expand Down
25 changes: 13 additions & 12 deletions go/cli/mcap/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,9 +163,19 @@ func RewriteMCAP(w io.Writer, r io.ReadSeeker, fns ...func(writer *mcap.Writer)
return fmt.Errorf("failed to seek to reader start: %w", err)
}
lexer, err := mcap.NewLexer(r, &mcap.LexerOptions{
SkipMagic: false,
ValidateCRC: false,
EmitChunks: false,
SkipMagic: false,
ValidateChunkCRCs: false,
EmitChunks: false,
AttachmentCallback: func(ar *mcap.AttachmentReader) error {
return writer.WriteAttachment(&mcap.Attachment{
LogTime: ar.LogTime,
CreateTime: ar.CreateTime,
Name: ar.Name,
MediaType: ar.MediaType,
DataSize: ar.DataSize,
Data: ar.Data(),
})
},
})
if err != nil {
return fmt.Errorf("failed to construct lexer: %w", err)
Expand Down Expand Up @@ -227,15 +237,6 @@ func RewriteMCAP(w io.Writer, r io.ReadSeeker, fns ...func(writer *mcap.Writer)
if err != nil {
return fmt.Errorf("failed to write metadata: %w", err)
}
case mcap.TokenAttachment:
record, err := mcap.ParseAttachment(token)
if err != nil {
return fmt.Errorf("failed to parse metadata: %w", err)
}
err = writer.WriteAttachment(record)
if err != nil {
return fmt.Errorf("failed to write metadata: %w", err)
}
default:
continue
}
Expand Down
51 changes: 39 additions & 12 deletions go/conformance/test-streamed-read-conformance/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,10 +135,12 @@ func (r Record) MarshalJSON() ([]byte, error) {
fields := make([]Field, 0, v.NumField())
for i := 0; i < v.NumField(); i++ {
if name := toSnakeCase(t.Field(i).Name); name != "crc" {
fields = append(fields, Field{
Name: toSnakeCase(t.Field(i).Name),
Value: v.Field(i).Interface(),
})
if v.Field(i).CanInterface() {
fields = append(fields, Field{
Name: toSnakeCase(t.Field(i).Name),
Value: v.Field(i).Interface(),
})
}
}
}
sort.Slice(fields, func(i, j int) bool {
Expand All @@ -158,17 +160,48 @@ func (r Record) MarshalJSON() ([]byte, error) {
return bytes, nil
}

// Attachment mirrors an MCAP attachment record with its payload and CRC fully
// materialized in memory, so it can be serialized into the conformance JSON
// output. (The SDK's mcap.Attachment now exposes Data as a streaming reader,
// which cannot be marshaled directly.)
type Attachment struct {
	LogTime    uint64 // attachment log time, copied from the AttachmentReader
	CreateTime uint64 // attachment create time, copied from the AttachmentReader
	Name       string // attachment name
	MediaType  string // media type of the attachment data (e.g. a MIME type)
	Data       []byte // full payload, read into memory via io.ReadAll(ar.Data())
	CRC        uint32 // record CRC, obtained from ar.ParsedCRC()
}

func mcapToJSON(w io.Writer, filepath string) error {
f, err := os.Open(filepath)
if err != nil {
return err
}
defer f.Close()
lexer, err := mcap.NewLexer(f)
records := []Record{}

lexer, err := mcap.NewLexer(f, &mcap.LexerOptions{
AttachmentCallback: func(ar *mcap.AttachmentReader) error {
data, err := io.ReadAll(ar.Data())
if err != nil {
return err
}
crc, err := ar.ParsedCRC()
if err != nil {
return err
}
parsed := Attachment{
LogTime: ar.LogTime,
CreateTime: ar.CreateTime,
Name: ar.Name,
MediaType: ar.MediaType,
Data: data,
CRC: crc,
}
records = append(records, Record{parsed})
return nil
},
})
if err != nil {
return err
}
records := []Record{}
for {
tokenType, data, err := lexer.Next(nil)
if err != nil {
Expand Down Expand Up @@ -227,12 +260,6 @@ func mcapToJSON(w io.Writer, filepath string) error {
return err
}
records = append(records, Record{*chunkIndex})
case mcap.TokenAttachment:
attachment, err := mcap.ParseAttachment(data)
if err != nil {
return err
}
records = append(records, Record{*attachment})
case mcap.TokenAttachmentIndex:
attachmentIndex, err := mcap.ParseAttachmentIndex(data)
if err != nil {
Expand Down
10 changes: 3 additions & 7 deletions go/conformance/test-streamed-write-conformance/main.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"bytes"
"encoding/json"
"fmt"
"io"
Expand Down Expand Up @@ -279,13 +280,8 @@ func parseAttachment(fields []InputField) (*mcap.Attachment, error) {
if err != nil {
return nil, err
}
attachment.Data = data
case "crc":
crc, err := parseUint32(field.Value.(string))
if err != nil {
return nil, err
}
attachment.CRC = crc
attachment.Data = bytes.NewReader(data)
attachment.DataSize = uint64(len(data))
default:
return nil, UnknownField(field.Name)
}
Expand Down
Loading

0 comments on commit d6f8b05

Please sign in to comment.