From d6f8b05261287f53093f4550970fa37ff7556a30 Mon Sep 17 00:00:00 2001
From: Wyatt Alt
Date: Sun, 20 Nov 2022 15:17:40 -0800
Subject: [PATCH] Go: add support for stream handling of attachments

Adds support for streaming reads and writes of attachment data. With this
change, ParseAttachment is dropped as a public function and replaced with a
private function, parseAttachmentReader. A new type, AttachmentReader,
exposes the Data portion of an attachment as a reader that can be streamed.
The Attachment type is modified to include a new DataSize field, to allow
the writer to write the attachment length before streaming the body.

A new option, AttachmentCallback, is exposed in the Lexer. Rather than
emitting attachments, the Lexer now parses an attachment reader, calls the
callback if one is supplied, and consumes any remaining unread bytes of the
attachment record whether or not a callback was supplied.

This is a breaking change in the Go SDK, and introduces some awkwardness due
to the presence of the new DataSize field in the Attachment record. In the
specification that value is folded into the Data field; however, that is not
a tenable approach for stream handling. The actual binary format is not
affected by this - only the way in which the binary format is described.

This change is required for readers and writers to handle attachments bigger
than available RAM. Without it, a sufficiently large attachment will crash
the reader or writer with an OOM.
---
 go/cli/mcap/cmd/attachment.go                 |  12 +-
 go/cli/mcap/cmd/doctor.go                     |  17 +--
 go/cli/mcap/cmd/filter.go                     |  54 +++++----
 go/cli/mcap/cmd/filter_test.go                |  38 ++++---
 go/cli/mcap/utils/utils.go                    |  25 +++--
 .../test-streamed-read-conformance/main.go    |  51 +++++++--
 .../test-streamed-write-conformance/main.go   |  10 +-
 go/mcap/crc_reader.go                         |  33 ++++++
 go/mcap/lexer.go                              |  86 +++++++++++----
 go/mcap/lexer_test.go                         | 104 ++++++++++++++----
 go/mcap/mcap.go                               |  58 +++++++++-
 go/mcap/parse.go                              |  65 ++++++-----
 go/mcap/reader.go                             |  10 +-
 go/mcap/reader_test.go                        |   7 +-
 go/mcap/testutils.go                          |  10 +-
 go/mcap/utils.go                              |  34 ++++++
 go/mcap/writer.go                             |  53 +++++++--
 go/mcap/writer_test.go                        |  50 +++++++--
 18 files changed, 533 insertions(+), 184 deletions(-)
 create mode 100644 go/mcap/crc_reader.go

diff --git a/go/cli/mcap/cmd/attachment.go b/go/cli/mcap/cmd/attachment.go
index bba70f3474..7ebca3e02c 100644
--- a/go/cli/mcap/cmd/attachment.go
+++ b/go/cli/mcap/cmd/attachment.go
@@ -134,10 +134,17 @@ var addAttachmentCmd = &cobra.Command{
 		if err != nil {
 			die("failed to create temp file: %s", err)
 		}
-		attachment, err := os.ReadFile(addAttachmentFilename)
+		attachment, err := os.Open(addAttachmentFilename)
 		if err != nil {
-			die("failed to read attachment: %s", err)
+			die("failed to open attachment file: %s", err)
 		}
+		defer attachment.Close()
+
+		stat, err := attachment.Stat()
+		if err != nil {
+			die("failed to stat file: %s", err)
+		}
+		contentLength := stat.Size()
 		err = utils.WithReader(ctx, filename, func(remote bool, rs io.ReadSeeker) error {
 			if remote {
 				die("not supported on remote MCAP files")
@@ -160,6 +167,7 @@ var addAttachmentCmd = &cobra.Command{
 				CreateTime: createTime,
 				Name:       addAttachmentFilename,
 				MediaType:  addAttachmentMediaType,
+				DataSize:   uint64(contentLength),
 				Data:       attachment,
 			})
 		})
diff --git a/go/cli/mcap/cmd/doctor.go b/go/cli/mcap/cmd/doctor.go
index e78790b330..3477f14ff0 100644
--- a/go/cli/mcap/cmd/doctor.go
+++ b/go/cli/mcap/cmd/doctor.go
@@ -105,9 +105,9 @@ func (doctor *mcapDoctor) examineChunk(chunk 
*mcap.Chunk) { uncompressedBytesReader := bytes.NewReader(uncompressedBytes) lexer, err := mcap.NewLexer(uncompressedBytesReader, &mcap.LexerOptions{ - SkipMagic: true, - ValidateCRC: true, - EmitChunks: true, + SkipMagic: true, + ValidateChunkCRCs: true, + EmitChunks: true, }) if err != nil { doctor.error("Failed to make lexer for chunk bytes", err) @@ -209,9 +209,9 @@ func (doctor *mcapDoctor) examineChunk(chunk *mcap.Chunk) { func (doctor *mcapDoctor) Examine() error { lexer, err := mcap.NewLexer(doctor.reader, &mcap.LexerOptions{ - SkipMagic: false, - ValidateCRC: true, - EmitChunks: true, + SkipMagic: false, + ValidateChunkCRCs: true, + EmitChunks: true, }) if err != nil { doctor.fatal(err) @@ -334,11 +334,6 @@ func (doctor *mcapDoctor) Examine() error { doctor.error("Multiple chunk indexes found for chunk at offset", chunkIndex.ChunkStartOffset) } doctor.chunkIndexes[chunkIndex.ChunkStartOffset] = chunkIndex - case mcap.TokenAttachment: - _, err := mcap.ParseAttachment(data) - if err != nil { - doctor.error("Failed to parse attachment:", err) - } case mcap.TokenAttachmentIndex: _, err := mcap.ParseAttachmentIndex(data) if err != nil { diff --git a/go/cli/mcap/cmd/filter.go b/go/cli/mcap/cmd/filter.go index d229dc41f3..dda14b7293 100644 --- a/go/cli/mcap/cmd/filter.go +++ b/go/cli/mcap/cmd/filter.go @@ -168,10 +168,6 @@ func filter( w io.Writer, opts *filterOpts, ) error { - lexer, err := mcap.NewLexer(r, &mcap.LexerOptions{ValidateCRC: true, EmitInvalidChunks: opts.recover}) - if err != nil { - return err - } mcapWriter, err := mcap.NewWriter(w, &mcap.WriterOptions{ Compression: opts.compressionFormat, Chunked: true, @@ -183,6 +179,38 @@ func filter( var numMessages, numAttachments, numMetadata uint64 + lexer, err := mcap.NewLexer(r, &mcap.LexerOptions{ + ValidateChunkCRCs: true, + EmitInvalidChunks: opts.recover, + AttachmentCallback: func(ar *mcap.AttachmentReader) error { + if !opts.includeAttachments { + return nil + } + if ar.LogTime < opts.start { + return nil + } + if ar.LogTime >= opts.end { + return nil + } + err = mcapWriter.WriteAttachment(&mcap.Attachment{ + LogTime: ar.LogTime, + CreateTime: ar.CreateTime, + Name: ar.Name, + MediaType: ar.MediaType, + DataSize: ar.DataSize, + Data: ar.Data(), + }) + if err != nil { + return err + } + numAttachments++ + return nil + }, + }) + if err != nil { + return err + } + defer func() { err := mcapWriter.Close() if err != nil { @@ -297,24 +325,6 @@ func filter( return err } numMessages++ - case mcap.TokenAttachment: - if !opts.includeAttachments { - continue - } - attachment, err := mcap.ParseAttachment(data) - if err != nil { - return err - } - if attachment.LogTime < opts.start { - continue - } - if attachment.LogTime >= opts.end { - continue - } - if err = mcapWriter.WriteAttachment(attachment); err != nil { - return err - } - numAttachments++ case mcap.TokenMetadata: if !opts.includeMetadata { continue diff --git a/go/cli/mcap/cmd/filter_test.go b/go/cli/mcap/cmd/filter_test.go index b6a4a67aae..f1b707aee9 100644 --- a/go/cli/mcap/cmd/filter_test.go +++ b/go/cli/mcap/cmd/filter_test.go @@ -54,6 +54,7 @@ func writeFilterTestInput(t *testing.T, w io.Writer) { assert.Nil(t, writer.WriteAttachment(&mcap.Attachment{ LogTime: 50, Name: "attachment", + Data: bytes.NewReader(nil), })) assert.Nil(t, writer.WriteMetadata(&mcap.Metadata{ Name: "metadata", @@ -154,16 +155,20 @@ func TestFiltering(t *testing.T) { writeFilterTestInput(t, &readBuf) assert.Nil(t, filter(&readBuf, &writeBuf, c.opts)) - - lexer, err := mcap.NewLexer(&writeBuf) + 
attachmentCounter := 0 + metadataCounter := 0 + lexer, err := mcap.NewLexer(&writeBuf, &mcap.LexerOptions{ + AttachmentCallback: func(ar *mcap.AttachmentReader) error { + attachmentCounter++ + return nil + }, + }) assert.Nil(t, err) messageCounter := map[uint16]int{ 1: 0, 2: 0, 3: 0, } - attachmentCounter := 0 - metadataCounter := 0 for { token, record, err := lexer.Next(nil) if err != nil { @@ -175,8 +180,6 @@ func TestFiltering(t *testing.T) { message, err := mcap.ParseMessage(record) assert.Nil(t, err) messageCounter[message.ChannelID]++ - case mcap.TokenAttachment: - attachmentCounter++ case mcap.TokenMetadata: metadataCounter++ } @@ -202,8 +205,6 @@ func TestRecover(t *testing.T) { includeMetadata: true, })) - lexer, err := mcap.NewLexer(&writeBuf) - assert.Nil(t, err) messageCounter := map[uint16]int{ 1: 0, 2: 0, @@ -211,6 +212,13 @@ func TestRecover(t *testing.T) { } attachmentCounter := 0 metadataCounter := 0 + lexer, err := mcap.NewLexer(&writeBuf, &mcap.LexerOptions{ + AttachmentCallback: func(ar *mcap.AttachmentReader) error { + attachmentCounter++ + return nil + }, + }) + assert.Nil(t, err) for { token, record, err := lexer.Next(nil) if err != nil { @@ -222,8 +230,6 @@ func TestRecover(t *testing.T) { message, err := mcap.ParseMessage(record) assert.Nil(t, err) messageCounter[message.ChannelID]++ - case mcap.TokenAttachment: - attachmentCounter++ case mcap.TokenMetadata: metadataCounter++ } @@ -249,9 +255,6 @@ func TestRecover(t *testing.T) { includeAttachments: true, includeMetadata: true, })) - - lexer, err := mcap.NewLexer(&writeBuf) - assert.Nil(t, err) messageCounter := map[uint16]int{ 1: 0, 2: 0, @@ -259,6 +262,13 @@ func TestRecover(t *testing.T) { } attachmentCounter := 0 metadataCounter := 0 + lexer, err := mcap.NewLexer(&writeBuf, &mcap.LexerOptions{ + AttachmentCallback: func(ar *mcap.AttachmentReader) error { + attachmentCounter++ + return nil + }, + }) + assert.Nil(t, err) for { token, record, err := lexer.Next(nil) if err != nil { @@ -270,8 +280,6 @@ func TestRecover(t *testing.T) { message, err := mcap.ParseMessage(record) assert.Nil(t, err) messageCounter[message.ChannelID]++ - case mcap.TokenAttachment: - attachmentCounter++ case mcap.TokenMetadata: metadataCounter++ } diff --git a/go/cli/mcap/utils/utils.go b/go/cli/mcap/utils/utils.go index 9a98b09f10..3372d97ad8 100644 --- a/go/cli/mcap/utils/utils.go +++ b/go/cli/mcap/utils/utils.go @@ -163,9 +163,19 @@ func RewriteMCAP(w io.Writer, r io.ReadSeeker, fns ...func(writer *mcap.Writer) return fmt.Errorf("failed to seek to reader start: %w", err) } lexer, err := mcap.NewLexer(r, &mcap.LexerOptions{ - SkipMagic: false, - ValidateCRC: false, - EmitChunks: false, + SkipMagic: false, + ValidateChunkCRCs: false, + EmitChunks: false, + AttachmentCallback: func(ar *mcap.AttachmentReader) error { + return writer.WriteAttachment(&mcap.Attachment{ + LogTime: ar.LogTime, + CreateTime: ar.CreateTime, + Name: ar.Name, + MediaType: ar.MediaType, + DataSize: ar.DataSize, + Data: ar.Data(), + }) + }, }) if err != nil { return fmt.Errorf("failed to construct lexer: %w", err) @@ -227,15 +237,6 @@ func RewriteMCAP(w io.Writer, r io.ReadSeeker, fns ...func(writer *mcap.Writer) if err != nil { return fmt.Errorf("failed to write metadata: %w", err) } - case mcap.TokenAttachment: - record, err := mcap.ParseAttachment(token) - if err != nil { - return fmt.Errorf("failed to parse metadata: %w", err) - } - err = writer.WriteAttachment(record) - if err != nil { - return fmt.Errorf("failed to write metadata: %w", err) - } default: continue 
} diff --git a/go/conformance/test-streamed-read-conformance/main.go b/go/conformance/test-streamed-read-conformance/main.go index 25f0f9ec34..7291c8acd9 100644 --- a/go/conformance/test-streamed-read-conformance/main.go +++ b/go/conformance/test-streamed-read-conformance/main.go @@ -135,10 +135,12 @@ func (r Record) MarshalJSON() ([]byte, error) { fields := make([]Field, 0, v.NumField()) for i := 0; i < v.NumField(); i++ { if name := toSnakeCase(t.Field(i).Name); name != "crc" { - fields = append(fields, Field{ - Name: toSnakeCase(t.Field(i).Name), - Value: v.Field(i).Interface(), - }) + if v.Field(i).CanInterface() { + fields = append(fields, Field{ + Name: toSnakeCase(t.Field(i).Name), + Value: v.Field(i).Interface(), + }) + } } } sort.Slice(fields, func(i, j int) bool { @@ -158,17 +160,48 @@ func (r Record) MarshalJSON() ([]byte, error) { return bytes, nil } +type Attachment struct { + LogTime uint64 + CreateTime uint64 + Name string + MediaType string + Data []byte + CRC uint32 +} + func mcapToJSON(w io.Writer, filepath string) error { f, err := os.Open(filepath) if err != nil { return err } defer f.Close() - lexer, err := mcap.NewLexer(f) + records := []Record{} + + lexer, err := mcap.NewLexer(f, &mcap.LexerOptions{ + AttachmentCallback: func(ar *mcap.AttachmentReader) error { + data, err := io.ReadAll(ar.Data()) + if err != nil { + return err + } + crc, err := ar.ParsedCRC() + if err != nil { + return err + } + parsed := Attachment{ + LogTime: ar.LogTime, + CreateTime: ar.CreateTime, + Name: ar.Name, + MediaType: ar.MediaType, + Data: data, + CRC: crc, + } + records = append(records, Record{parsed}) + return nil + }, + }) if err != nil { return err } - records := []Record{} for { tokenType, data, err := lexer.Next(nil) if err != nil { @@ -227,12 +260,6 @@ func mcapToJSON(w io.Writer, filepath string) error { return err } records = append(records, Record{*chunkIndex}) - case mcap.TokenAttachment: - attachment, err := mcap.ParseAttachment(data) - if err != nil { - return err - } - records = append(records, Record{*attachment}) case mcap.TokenAttachmentIndex: attachmentIndex, err := mcap.ParseAttachmentIndex(data) if err != nil { diff --git a/go/conformance/test-streamed-write-conformance/main.go b/go/conformance/test-streamed-write-conformance/main.go index 683ffb3028..f65316b73a 100644 --- a/go/conformance/test-streamed-write-conformance/main.go +++ b/go/conformance/test-streamed-write-conformance/main.go @@ -1,6 +1,7 @@ package main import ( + "bytes" "encoding/json" "fmt" "io" @@ -279,13 +280,8 @@ func parseAttachment(fields []InputField) (*mcap.Attachment, error) { if err != nil { return nil, err } - attachment.Data = data - case "crc": - crc, err := parseUint32(field.Value.(string)) - if err != nil { - return nil, err - } - attachment.CRC = crc + attachment.Data = bytes.NewReader(data) + attachment.DataSize = uint64(len(data)) default: return nil, UnknownField(field.Name) } diff --git a/go/mcap/crc_reader.go b/go/mcap/crc_reader.go new file mode 100644 index 0000000000..7edec193fa --- /dev/null +++ b/go/mcap/crc_reader.go @@ -0,0 +1,33 @@ +package mcap + +import ( + "hash" + "hash/crc32" + "io" +) + +type crcReader struct { + r io.Reader + crc hash.Hash32 + computeCRC bool +} + +func (r *crcReader) Read(p []byte) (int, error) { + n, err := r.r.Read(p) + if r.computeCRC { + _, _ = r.crc.Write(p[:n]) + } + return n, err +} + +func (r *crcReader) Checksum() uint32 { + return r.crc.Sum32() +} + +func newCRCReader(r io.Reader, computeCRC bool) *crcReader { + return &crcReader{ + r: r, 
+ crc: crc32.NewIEEE(), + computeCRC: computeCRC, + } +} diff --git a/go/mcap/lexer.go b/go/mcap/lexer.go index 9cd17bf62c..47790ce95a 100644 --- a/go/mcap/lexer.go +++ b/go/mcap/lexer.go @@ -80,8 +80,6 @@ const ( TokenMessageIndex // TokenChunkIndex represents a chunk index token. TokenChunkIndex - // TokenAttachment represents an attachment token. - TokenAttachment // TokenAttachmentIndex represents an attachment index token. TokenAttachmentIndex // TokenStatistics represents a statistics token. @@ -122,8 +120,6 @@ func (t TokenType) String() string { return "message index" case TokenChunkIndex: return "chunk index" - case TokenAttachment: - return "attachment" case TokenAttachmentIndex: return "attachment index" case TokenStatistics: @@ -155,10 +151,12 @@ type Lexer struct { inChunk bool buf []byte uncompressedChunk []byte - validateCRC bool + validateChunkCRCs bool + computeAttachmentCRCs bool emitInvalidChunks bool maxRecordSize int maxDecompressedChunkSize int + attachmentCallback func(*AttachmentReader) error } // Next returns the next token from the lexer as a byte array. The result will @@ -191,16 +189,52 @@ func (l *Lexer) Next(p []byte) (TokenType, []byte, error) { if l.maxRecordSize > 0 && recordLen > uint64(l.maxRecordSize) { return TokenError, nil, ErrRecordTooLarge } - if opcode == OpChunk && !l.emitChunks { - err := loadChunk(l, recordLen) - if err != nil { - if l.emitInvalidChunks { - var invalidCrc *errInvalidChunkCrc - if errors.As(err, &invalidCrc) { - return TokenInvalidChunk, nil, err + + // Chunks and attachments require special handling to avoid + // materialization into RAM. If it's a chunk, open up a decompressor and + // swap it in as the active reader, then continue on the next message + // (which will be from the chunk data). If it's an attachment, parse the + // record into an AttachmentReader and call any user-supplied callback. + // Then discard any remaining data and continue to the next record. + switch opcode { + case OpChunk: + if !l.emitChunks { + err := loadChunk(l, recordLen) + if err != nil { + if l.emitInvalidChunks { + var invalidCrc *errInvalidChunkCrc + if errors.As(err, &invalidCrc) { + return TokenInvalidChunk, nil, err + } } + return TokenError, nil, err } - return TokenError, nil, err + continue + } + case OpAttachment: + limitReader := &io.LimitedReader{ + R: l.reader, + N: int64(recordLen), + } + + if l.attachmentCallback != nil { + attachmentReader, err := parseAttachmentReader( + limitReader, + l.computeAttachmentCRCs, + ) + if err != nil { + return TokenError, nil, fmt.Errorf("failed to parse attachment: %w", err) + } + err = l.attachmentCallback(attachmentReader) + if err != nil { + return TokenError, nil, fmt.Errorf("failed to handle attachment: %w", err) + } + } + + // skip the base reader ahead to cover any unconsumed bytes of the attachment + err := skipReader(limitReader.R, limitReader.N) + if err != nil { + return TokenError, nil, fmt.Errorf("failed to consume unhandled attachment data: %w", err) } continue } @@ -238,8 +272,6 @@ func (l *Lexer) Next(p []byte) (TokenType, []byte, error) { return TokenChannel, record, nil case OpFooter: return TokenFooter, record, nil - case OpAttachment: - return TokenAttachment, record, nil case OpAttachmentIndex: return TokenAttachmentIndex, record, nil case OpChunkIndex: @@ -394,7 +426,7 @@ func loadChunk(l *Lexer, recordLen uint64) error { // validation. If we are not validating CRCs, we can use incremental // decompression for the chunk's data, which may be beneficial to streaming // readers. 
-	if l.validateChunkCRCs {
+	if l.validateChunkCRCs {
 		if l.maxDecompressedChunkSize > 0 && uncompressedSize > uint64(l.maxDecompressedChunkSize) {
 			return ErrChunkTooLarge
 		}
@@ -437,8 +469,13 @@ type LexerOptions struct {
 	// SkipMagic instructs the lexer not to perform validation of the leading magic bytes.
 	SkipMagic bool
-	// ValidateCRC instructs the lexer to validate CRC checksums for chunks.
-	ValidateCRC bool
+	// ValidateChunkCRCs instructs the lexer to validate CRC checksums for
+	// chunks.
+	ValidateChunkCRCs bool
+	// ComputeAttachmentCRCs instructs the lexer to compute CRCs for any
+	// attachments parsed from the file. Consumers should only set this to true
+	// if they intend to validate those CRCs in their attachment callback.
+	ComputeAttachmentCRCs bool
 	// EmitChunks instructs the lexer to emit chunk records without de-chunking.
-	// It is incompatible with ValidateCRC.
+	// It is incompatible with ValidateChunkCRCs.
 	EmitChunks bool
@@ -451,19 +488,24 @@ type LexerOptions struct {
 	// MaxRecordSize defines the maximum size record the lexer will read.
 	// Records larger than this will result in an error.
 	MaxRecordSize int
+	// AttachmentCallback is a function to execute on attachments encountered in the file.
+	AttachmentCallback func(*AttachmentReader) error
 }

 // NewLexer returns a new lexer for the given reader.
 func NewLexer(r io.Reader, opts ...*LexerOptions) (*Lexer, error) {
 	var maxRecordSize, maxDecompressedChunkSize int
-	var validateCRC, emitChunks, emitInvalidChunks, skipMagic bool
+	var computeAttachmentCRCs, validateChunkCRCs, emitChunks, emitInvalidChunks, skipMagic bool
+	var attachmentCallback func(*AttachmentReader) error
 	if len(opts) > 0 {
-		validateCRC = opts[0].ValidateCRC
+		validateChunkCRCs = opts[0].ValidateChunkCRCs
+		computeAttachmentCRCs = opts[0].ComputeAttachmentCRCs
 		emitChunks = opts[0].EmitChunks
 		emitInvalidChunks = opts[0].EmitInvalidChunks
 		skipMagic = opts[0].SkipMagic
 		maxRecordSize = opts[0].MaxRecordSize
 		maxDecompressedChunkSize = opts[0].MaxDecompressedChunkSize
+		attachmentCallback = opts[0].AttachmentCallback
 	}
 	if !skipMagic {
 		err := validateMagic(r)
@@ -475,10 +517,12 @@ func NewLexer(r io.Reader, opts ...*LexerOptions) (*Lexer, error) {
 		basereader:               r,
 		reader:                   r,
 		buf:                      make([]byte, 32),
-		validateCRC:              validateCRC,
+		validateChunkCRCs:        validateChunkCRCs,
+		computeAttachmentCRCs:    computeAttachmentCRCs,
 		emitChunks:               emitChunks,
 		emitInvalidChunks:        emitInvalidChunks,
 		maxRecordSize:            maxRecordSize,
 		maxDecompressedChunkSize: maxDecompressedChunkSize,
+		attachmentCallback:       attachmentCallback,
 	}, nil
 }
diff --git a/go/mcap/lexer_test.go b/go/mcap/lexer_test.go
index 79abbda4eb..cc0c7c9824 100644
--- a/go/mcap/lexer_test.go
+++ b/go/mcap/lexer_test.go
@@ -20,8 +20,8 @@ func TestLexUnchunkedFile(t *testing.T) {
 		channelInfo(),
 		message(),
 		message(),
-		record(OpAttachment),
-		record(OpAttachment),
+		attachment(),
+		attachment(),
 		channelInfo(),
 		record(OpAttachmentIndex),
 		footer(),
@@ -33,8 +33,6 @@ func TestLexUnchunkedFile(t *testing.T) {
 		TokenHeader,
 		TokenChannel,
 		TokenMessage,
 		TokenMessage,
-		TokenAttachment,
-		TokenAttachment,
 		TokenChannel,
 		TokenAttachmentIndex,
 		TokenFooter,
@@ -75,7 +73,7 @@ func TestRejectsTooLargeChunks(t *testing.T) {
 	file := file(header(), bigChunk, footer())
 	lexer, err := NewLexer(bytes.NewReader(file), &LexerOptions{
 		MaxDecompressedChunkSize: 999,
-		ValidateCRC: true,
+		ValidateChunkCRCs: true,
 	})
 	assert.Nil(t, err)
 	_, _, err = lexer.Next(nil)
@@ -165,7 +163,7 @@ func TestLexChunkedFile(t *testing.T) {
 				footer(),
 			)
 			lexer, err := 
NewLexer(bytes.NewReader(file), &LexerOptions{ - ValidateCRC: validateCRC, + ValidateChunkCRCs: validateCRC, }) assert.Nil(t, err) expected := []TokenType{ @@ -176,8 +174,6 @@ func TestLexChunkedFile(t *testing.T) { TokenChannel, TokenMessage, TokenMessage, - TokenAttachment, - TokenAttachment, TokenFooter, } for i, expectedTokenType := range expected { @@ -224,7 +220,7 @@ func TestChunkCRCValidation(t *testing.T) { footer(), ) lexer, err := NewLexer(bytes.NewReader(file), &LexerOptions{ - ValidateCRC: true, + ValidateChunkCRCs: true, }) assert.Nil(t, err) expected := []TokenType{ @@ -235,8 +231,6 @@ func TestChunkCRCValidation(t *testing.T) { TokenChannel, TokenMessage, TokenMessage, - TokenAttachment, - TokenAttachment, TokenFooter, } for i, expectedTokenType := range expected { @@ -254,7 +248,7 @@ func TestChunkCRCValidation(t *testing.T) { footer(), ) lexer, err := NewLexer(bytes.NewReader(file), &LexerOptions{ - ValidateCRC: true, + ValidateChunkCRCs: true, }) assert.Nil(t, err) expected := []TokenType{ @@ -265,8 +259,6 @@ func TestChunkCRCValidation(t *testing.T) { TokenChannel, TokenMessage, TokenMessage, - TokenAttachment, - TokenAttachment, TokenFooter, } for i, expectedTokenType := range expected { @@ -289,7 +281,7 @@ func TestChunkCRCValidation(t *testing.T) { footer(), ) lexer, err := NewLexer(bytes.NewReader(file), &LexerOptions{ - ValidateCRC: true, + ValidateChunkCRCs: true, }) assert.Nil(t, err) expected := []TokenType{ @@ -309,6 +301,80 @@ func TestChunkCRCValidation(t *testing.T) { }) } +func TestAttachmentHandling(t *testing.T) { + cases := []struct { + assertion string + attachmentData []byte + attachment *Attachment + }{ + { + "empty attachment", + []byte{}, + &Attachment{ + LogTime: 0, + CreateTime: 0, + Name: "empty", + MediaType: "mediaType", + DataSize: 0, + }, + }, + { + "nonempty attachment", + []byte{0x01, 0x02, 0x03, 0x04}, + &Attachment{ + LogTime: 0, + CreateTime: 0, + Name: "nonempty", + MediaType: "media", + DataSize: 4, + }, + }, + } + for _, c := range cases { + t.Run(c.assertion, func(t *testing.T) { + c.attachment.Data = bytes.NewReader(c.attachmentData) + file := &bytes.Buffer{} + writer, err := NewWriter(file, &WriterOptions{}) + assert.Nil(t, err) + assert.Nil(t, writer.WriteHeader(&Header{ + Profile: "", + Library: "", + })) + assert.Nil(t, writer.WriteAttachment(c.attachment)) + assert.Nil(t, writer.Close()) + + var called bool + lexer, err := NewLexer(file, &LexerOptions{ + ComputeAttachmentCRCs: true, + AttachmentCallback: func(ar *AttachmentReader) error { + assert.Equal(t, c.attachment.LogTime, ar.LogTime) + assert.Equal(t, c.attachment.CreateTime, ar.CreateTime) + assert.Equal(t, c.attachment.Name, ar.Name) + assert.Equal(t, c.attachment.MediaType, ar.MediaType) + data, err := io.ReadAll(ar.Data()) + assert.Nil(t, err) + assert.Equal(t, c.attachmentData, data) + computedCRC, err := ar.ComputedCRC() + assert.Nil(t, err) + parsedCRC, err := ar.ParsedCRC() + assert.Nil(t, err) + assert.Equal(t, computedCRC, parsedCRC) + called = true + return nil + }, + }) + + for !errors.Is(err, io.EOF) { + _, _, err = lexer.Next(nil) + if !errors.Is(err, io.EOF) { + assert.Nil(t, err) + } + } + assert.True(t, called) + }) + } +} + func TestChunkEmission(t *testing.T) { for _, validateCRC := range []bool{ true, @@ -329,16 +395,14 @@ func TestChunkEmission(t *testing.T) { footer(), ) lexer, err := NewLexer(bytes.NewReader(file), &LexerOptions{ - ValidateCRC: validateCRC, - EmitChunks: true, + ValidateChunkCRCs: validateCRC, + EmitChunks: true, }) assert.Nil(t, 
err)
			expected := []TokenType{
				TokenHeader,
				TokenChunk,
				TokenChunk,
-				TokenAttachment,
-				TokenAttachment,
				TokenFooter,
			}
			for i, expectedTokenType := range expected {
@@ -385,7 +449,7 @@ func BenchmarkLexer(b *testing.B) {
 			var tokens, bytecount int64
 			reader.Reset(input)
 			lexer, err := NewLexer(reader, &LexerOptions{
-				ValidateCRC: validateCRC,
+				ValidateChunkCRCs: validateCRC,
 			})
 			assert.Nil(b, err)
 			for {
diff --git a/go/mcap/mcap.go b/go/mcap/mcap.go
index ac26d218a5..f097b38857 100644
--- a/go/mcap/mcap.go
+++ b/go/mcap/mcap.go
@@ -1,8 +1,10 @@
 package mcap

 import (
+	"encoding/binary"
 	"errors"
 	"fmt"
+	"io"
 	"math"
 )
@@ -218,8 +220,60 @@ type Attachment struct {
 	CreateTime uint64
 	Name       string
 	MediaType  string
-	Data       []byte
-	CRC        uint32
+	DataSize   uint64
+	Data       io.Reader
+}
+
+// AttachmentReader represents an attachment for handling in a streaming manner.
+type AttachmentReader struct {
+	LogTime    uint64
+	CreateTime uint64
+	Name       string
+	MediaType  string
+	DataSize   uint64
+
+	data       *io.LimitedReader
+	baseReader io.Reader
+	crcReader  *crcReader
+	crc        *uint32
+}
+
+// ComputedCRC returns the checksum computed over the fields of the
+// attachment up to the CRC. If it is called before the data portion of the
+// reader has been fully consumed, an error is returned. If the
+// AttachmentReader was created with CRC computation disabled, this returns a
+// CRC of zero.
+func (ar *AttachmentReader) ComputedCRC() (uint32, error) {
+	if ar.data.N > 0 {
+		return 0, fmt.Errorf("attachment CRC requested with unhandled data")
+	}
+	return ar.crcReader.Checksum(), nil
+}
+
+// ParsedCRC returns the CRC from the crc field of the record. It must be called
+// after the data field has been handled. If ParsedCRC is called before the data
+// reader is exhausted, an error is returned.
+func (ar *AttachmentReader) ParsedCRC() (uint32, error) {
+	if ar.crc != nil {
+		return *ar.crc, nil
+	}
+	if ar.data.N > 0 {
+		return 0, fmt.Errorf("attachment CRC requested with unhandled data")
+	}
+	buf := make([]byte, 4)
+	_, err := io.ReadFull(ar.baseReader, buf)
+	if err != nil {
+		return 0, fmt.Errorf("failed to read CRC: %w", err)
+	}
+	crc := binary.LittleEndian.Uint32(buf)
+	ar.crc = &crc
+	return crc, nil
+}
+
+// Data returns a reader over the data section of the attachment.
+func (ar *AttachmentReader) Data() io.Reader {
+	return ar.data
 }

 // AttachmentIndex records contain the location of attachments in the file. An
diff --git a/go/mcap/parse.go b/go/mcap/parse.go
index 31830fb94a..992b35eaf7 100644
--- a/go/mcap/parse.go
+++ b/go/mcap/parse.go
@@ -7,11 +7,11 @@ import (
 // ParseHeader parses a header record.
func ParseHeader(buf []byte) (*Header, error) { - profile, offset, err := readPrefixedString(buf, 0) + profile, offset, err := getPrefixedString(buf, 0) if err != nil { return nil, fmt.Errorf("failed to read profile: %w", err) } - library, _, err := readPrefixedString(buf, offset) + library, _, err := getPrefixedString(buf, offset) if err != nil { return nil, fmt.Errorf("failed to read library: %w", err) } @@ -48,15 +48,15 @@ func ParseSchema(buf []byte) (*Schema, error) { if err != nil { return nil, fmt.Errorf("failed to read schema ID: %w", err) } - name, offset, err := readPrefixedString(buf, offset) + name, offset, err := getPrefixedString(buf, offset) if err != nil { return nil, fmt.Errorf("failed to read schema name: %w", err) } - encoding, offset, err := readPrefixedString(buf, offset) + encoding, offset, err := getPrefixedString(buf, offset) if err != nil { return nil, fmt.Errorf("failed to read encoding: %w", err) } - data, _, err := readPrefixedBytes(buf, offset) + data, _, err := getPrefixedBytes(buf, offset) if err != nil { return nil, fmt.Errorf("failed to read schema data: %w", err) } @@ -78,15 +78,15 @@ func ParseChannel(buf []byte) (*Channel, error) { if err != nil { return nil, fmt.Errorf("failed to read schema ID: %w", err) } - topic, offset, err := readPrefixedString(buf, offset) + topic, offset, err := getPrefixedString(buf, offset) if err != nil { return nil, fmt.Errorf("failed to read topic name: %w", err) } - messageEncoding, offset, err := readPrefixedString(buf, offset) + messageEncoding, offset, err := getPrefixedString(buf, offset) if err != nil { return nil, fmt.Errorf("failed to read message encoding: %w", err) } - metadata, _, err := readPrefixedMap(buf, offset) + metadata, _, err := getPrefixedMap(buf, offset) if err != nil { return nil, fmt.Errorf("failed to read metadata: %w", err) } @@ -145,7 +145,7 @@ func ParseChunk(buf []byte) (*Chunk, error) { if err != nil { return nil, fmt.Errorf("failed to read uncompressed CRC: %w", err) } - compression, offset, err := readPrefixedString(buf, offset) + compression, offset, err := getPrefixedString(buf, offset) if err != nil { return nil, fmt.Errorf("failed to read compression: %w", err) } @@ -239,7 +239,7 @@ func ParseChunkIndex(buf []byte) (*ChunkIndex, error) { if err != nil { return nil, fmt.Errorf("failed to read message index length: %w", err) } - compression, offset, err := readPrefixedString(buf, offset) + compression, offset, err := getPrefixedString(buf, offset) if err != nil { return nil, fmt.Errorf("failed to read compression: %w", err) } @@ -264,41 +264,46 @@ func ParseChunkIndex(buf []byte) (*ChunkIndex, error) { }, nil } -// ParseAttachment parses an attachment record. 
-func ParseAttachment(buf []byte) (*Attachment, error) { - logTime, offset, err := getUint64(buf, 0) +func parseAttachmentReader( + r io.Reader, + computeCRC bool, +) (*AttachmentReader, error) { + buf := make([]byte, 8) + crcReader := newCRCReader(r, computeCRC) + logTime, err := readUint64(buf, crcReader) if err != nil { return nil, fmt.Errorf("failed to read record time: %w", err) } - createTime, offset, err := getUint64(buf, offset) + createTime, err := readUint64(buf, crcReader) if err != nil { return nil, fmt.Errorf("failed to read create time: %w", err) } - name, offset, err := readPrefixedString(buf, offset) + name, err := readPrefixedString(buf, crcReader) if err != nil { return nil, fmt.Errorf("failed to read attachment name: %w", err) } - mediaType, offset, err := readPrefixedString(buf, offset) + mediaType, err := readPrefixedString(buf, crcReader) if err != nil { return nil, fmt.Errorf("failed to read media type: %w", err) } - dataSize, offset, err := getUint64(buf, offset) + dataSize, err := readUint64(buf, crcReader) if err != nil { return nil, fmt.Errorf("failed to read attachment data size: %w", err) } - data := buf[offset : offset+int(dataSize)] - offset += int(dataSize) - crc, _, err := getUint32(buf, offset) - if err != nil { - return nil, fmt.Errorf("failed to read CRC: %w", err) + limitReader := &io.LimitedReader{ + R: crcReader, + N: int64(dataSize), } - return &Attachment{ + return &AttachmentReader{ LogTime: logTime, CreateTime: createTime, Name: name, MediaType: mediaType, - Data: data, - CRC: crc, + DataSize: dataSize, + + baseReader: r, + crcReader: crcReader, + data: limitReader, }, nil } @@ -324,11 +329,11 @@ func ParseAttachmentIndex(buf []byte) (*AttachmentIndex, error) { if err != nil { return nil, fmt.Errorf("failed to read data size: %w", err) } - name, offset, err := readPrefixedString(buf, offset) + name, offset, err := getPrefixedString(buf, offset) if err != nil { return nil, fmt.Errorf("failed to read attachment name: %w", err) } - mediaType, _, err := readPrefixedString(buf, offset) + mediaType, _, err := getPrefixedString(buf, offset) if err != nil { return nil, fmt.Errorf("failed to read media type: %w", err) } @@ -417,11 +422,11 @@ func ParseStatistics(buf []byte) (*Statistics, error) { // ParseMetadata parses a metadata record. 
func ParseMetadata(buf []byte) (*Metadata, error) { - name, offset, err := readPrefixedString(buf, 0) + name, offset, err := getPrefixedString(buf, 0) if err != nil { return nil, fmt.Errorf("failed to read metadata name: %w", err) } - metadata, _, err := readPrefixedMap(buf, offset) + metadata, _, err := getPrefixedMap(buf, offset) if err != nil { return nil, fmt.Errorf("failed to read metadata: %w", err) } @@ -441,7 +446,7 @@ func ParseMetadataIndex(buf []byte) (*MetadataIndex, error) { if err != nil { return nil, fmt.Errorf("failed to read metadata length: %w", err) } - name, _, err := readPrefixedString(buf, offset) + name, _, err := getPrefixedString(buf, offset) if err != nil { return nil, fmt.Errorf("failed to read metadata name: %w", err) } diff --git a/go/mcap/reader.go b/go/mcap/reader.go index 620f4220c8..f0a3419b99 100644 --- a/go/mcap/reader.go +++ b/go/mcap/reader.go @@ -10,7 +10,7 @@ import ( "github.com/foxglove/mcap/go/mcap/readopts" ) -func readPrefixedString(data []byte, offset int) (s string, newoffset int, err error) { +func getPrefixedString(data []byte, offset int) (s string, newoffset int, err error) { if len(data[offset:]) < 4 { return "", 0, io.ErrShortBuffer } @@ -21,7 +21,7 @@ func readPrefixedString(data []byte, offset int) (s string, newoffset int, err e return string(data[offset+4 : offset+length+4]), offset + 4 + length, nil } -func readPrefixedBytes(data []byte, offset int) (s []byte, newoffset int, err error) { +func getPrefixedBytes(data []byte, offset int) (s []byte, newoffset int, err error) { if len(data[offset:]) < 4 { return nil, 0, io.ErrShortBuffer } @@ -32,7 +32,7 @@ func readPrefixedBytes(data []byte, offset int) (s []byte, newoffset int, err er return data[offset+4 : offset+length+4], offset + 4 + length, nil } -func readPrefixedMap(data []byte, offset int) (result map[string]string, newoffset int, err error) { +func getPrefixedMap(data []byte, offset int) (result map[string]string, newoffset int, err error) { var key, value string var inset int m := make(map[string]string) @@ -41,11 +41,11 @@ func readPrefixedMap(data []byte, offset int) (result map[string]string, newoffs return nil, 0, fmt.Errorf("failed to read map length: %w", err) } for uint32(offset+inset) < uint32(offset)+maplen { - key, inset, err = readPrefixedString(data[offset:], inset) + key, inset, err = getPrefixedString(data[offset:], inset) if err != nil { return nil, 0, fmt.Errorf("failed to read map key: %w", err) } - value, inset, err = readPrefixedString(data[offset:], inset) + value, inset, err = getPrefixedString(data[offset:], inset) if err != nil { return nil, 0, fmt.Errorf("failed to read map value: %w", err) } diff --git a/go/mcap/reader_test.go b/go/mcap/reader_test.go index abe8ddb75a..c6e0b8190c 100644 --- a/go/mcap/reader_test.go +++ b/go/mcap/reader_test.go @@ -107,7 +107,7 @@ func TestReadPrefixedBytes(t *testing.T) { } for _, c := range cases { t.Run(c.assertion, func(t *testing.T) { - s, off, err := readPrefixedBytes(c.data, 0) + s, off, err := getPrefixedBytes(c.data, 0) assert.ErrorIs(t, c.expectedError, err) assert.Equal(t, c.expectedBytes, s) assert.Equal(t, c.expectedOffset, off) @@ -169,7 +169,7 @@ func TestReadPrefixedMap(t *testing.T) { } for _, c := range cases { t.Run(c.assertion, func(t *testing.T) { - output, offset, err := readPrefixedMap(c.input, 0) + output, offset, err := getPrefixedMap(c.input, 0) assert.ErrorIs(t, err, c.err) assert.Equal(t, offset, c.newOffset) assert.Equal(t, output, c.output) @@ -209,7 +209,7 @@ func TestReadPrefixedString(t 
*testing.T) { } for _, c := range cases { t.Run(c.assertion, func(t *testing.T) { - s, off, err := readPrefixedString(c.data, 0) + s, off, err := getPrefixedString(c.data, 0) assert.ErrorIs(t, c.expectedError, err) assert.Equal(t, c.expectedString, s) assert.Equal(t, c.expectedOffset, off) @@ -484,6 +484,7 @@ func TestMCAPInfo(t *testing.T) { []*Attachment{ { Name: "my attachment", + Data: &bytes.Buffer{}, }, }, }, diff --git a/go/mcap/testutils.go b/go/mcap/testutils.go index 346eb228e9..8bbbc870f7 100644 --- a/go/mcap/testutils.go +++ b/go/mcap/testutils.go @@ -142,7 +142,15 @@ func record(op OpCode) []byte { } func attachment() []byte { - buf := make([]byte, 9) + recordLen := 9 + // opcode + record length + 8 + // record time + 8 + // create time + 4 + // attachment name length + 4 + // media type length + 8 + // data size + 4 // crc length + buf := make([]byte, recordLen) buf[0] = byte(OpAttachment) + putUint64(buf[1:], uint64(recordLen-9)) return buf } diff --git a/go/mcap/utils.go b/go/mcap/utils.go index a63866980a..f8a913f0c7 100644 --- a/go/mcap/utils.go +++ b/go/mcap/utils.go @@ -5,6 +5,25 @@ import ( "io" ) +func readUint64(buf []byte, r io.Reader) (uint64, error) { + if _, err := io.ReadFull(r, buf[:8]); err != nil { + return 0, err + } + return binary.LittleEndian.Uint64(buf[:8]), nil +} + +func readPrefixedString(buf []byte, r io.Reader) (string, error) { + if _, err := io.ReadFull(r, buf[:4]); err != nil { + return "", err + } + strlen := binary.LittleEndian.Uint32(buf[:4]) + s := make([]byte, strlen) + if _, err := io.ReadFull(r, s); err != nil { + return "", err + } + return string(s), nil +} + func putByte(buf []byte, x byte) (int, error) { if len(buf) < 1 { return 0, io.ErrShortBuffer @@ -60,3 +79,18 @@ func putPrefixedBytes(buf []byte, s []byte) int { offset += copy(buf[offset:], s) return offset } + +func skipReader(r io.Reader, n int64) error { + if rs, ok := r.(io.ReadSeeker); ok { + _, err := rs.Seek(n, io.SeekCurrent) + if err != nil { + return err + } + return nil + } + _, err := io.CopyN(io.Discard, r, n) + if err != nil { + return err + } + return nil +} diff --git a/go/mcap/writer.go b/go/mcap/writer.go index b4e83add7a..89221dba17 100644 --- a/go/mcap/writer.go +++ b/go/mcap/writer.go @@ -4,7 +4,6 @@ import ( "bytes" "errors" "fmt" - "hash/crc32" "io" "math" "sort" @@ -16,6 +15,10 @@ import ( // ErrUnknownSchema is returned when a schema ID is not known to the writer. var ErrUnknownSchema = errors.New("unknown schema") +// ErrAttachmentDataSizeIncorrect is returned when the length of a written +// attachment does not match the length supplied. +var ErrAttachmentDataSizeIncorrect = errors.New("attachment content length incorrect") + // Writer is a writer for the MCAP format. type Writer struct { // Statistics collected over the course of the recording. @@ -245,27 +248,55 @@ func (w *Writer) WriteMessageIndex(idx *MessageIndex) error { // contain auxiliary artifacts such as text, core dumps, calibration data, or // other arbitrary data. Attachment records must not appear within a chunk. 
func (w *Writer) WriteAttachment(a *Attachment) error {
-	msglen := 4 + len(a.Name) + 8 + 8 + 4 + len(a.MediaType) + 8 + len(a.Data) + 4
-	w.ensureSized(msglen)
-	offset := putUint64(w.msg, a.LogTime)
+	bufferLen := 1 + // opcode
+		8 + // record length
+		8 + // log time
+		8 + // create time
+		4 + len(a.Name) + // name
+		4 + len(a.MediaType) + // media type
+		8 // data size
+	w.ensureSized(bufferLen)
+
+	offset, err := putByte(w.msg, byte(OpAttachment))
+	if err != nil {
+		return err
+	}
+	offset += putUint64(w.msg[offset:], uint64(bufferLen)+a.DataSize+4-9)
+	offset += putUint64(w.msg[offset:], a.LogTime)
 	offset += putUint64(w.msg[offset:], a.CreateTime)
 	offset += putPrefixedString(w.msg[offset:], a.Name)
 	offset += putPrefixedString(w.msg[offset:], a.MediaType)
-	offset += putUint64(w.msg[offset:], uint64(len(a.Data)))
-	offset += copy(w.msg[offset:], a.Data)
-	crc := crc32.ChecksumIEEE(w.msg[:offset])
-	offset += putUint32(w.msg[offset:], crc)
+	offset += putUint64(w.msg[offset:], a.DataSize)
+
 	attachmentOffset := w.w.Size()
-	c, err := w.writeRecord(w.w, OpAttachment, w.msg[:offset])
+
+	// leading 9 bytes not included in CRC
+	_, err = w.w.Write(w.msg[:9])
 	if err != nil {
 		return err
 	}
+	crcWriter := newCRCWriter(w.w)
+	_, err = crcWriter.Write(w.msg[9:offset])
+	if err != nil {
+		return fmt.Errorf("failed to write attachment metadata: %w", err)
+	}
+	bytesWritten, err := io.Copy(crcWriter, a.Data)
+	if err != nil {
+		return fmt.Errorf("failed to write attachment data: %w", err)
+	}
+	if uint64(bytesWritten) != a.DataSize {
+		return ErrAttachmentDataSizeIncorrect
+	}
+	putUint32(w.msg[:4], crcWriter.Checksum())
+	_, err = w.w.Write(w.msg[:4])
+	if err != nil {
+		return fmt.Errorf("failed to write attachment crc: %w", err)
+	}
 	w.AttachmentIndexes = append(w.AttachmentIndexes, &AttachmentIndex{
 		Offset:     attachmentOffset,
-		Length:     uint64(c),
+		Length:     uint64(bufferLen) + a.DataSize + 4,
 		LogTime:    a.LogTime,
 		CreateTime: a.CreateTime,
-		DataSize:   uint64(len(a.Data)),
+		DataSize:   a.DataSize,
 		Name:       a.Name,
 		MediaType:  a.MediaType,
 	})
diff --git a/go/mcap/writer_test.go b/go/mcap/writer_test.go
index 3317165941..f4fb769ec1 100644
--- a/go/mcap/writer_test.go
+++ b/go/mcap/writer_test.go
@@ -28,10 +28,10 @@ func TestMCAPReadWrite(t *testing.T) {
 		assert.Nil(t, err)
 		// body of the header is the profile, followed by the metadata map
 		offset := 0
-		profile, offset, err := readPrefixedString(record, offset)
+		profile, offset, err := getPrefixedString(record, offset)
 		assert.Nil(t, err)
 		assert.Equal(t, "ros1", profile)
-		library, _, err := readPrefixedString(record, offset)
+		library, _, err := getPrefixedString(record, offset)
 		assert.Nil(t, err)
 		assert.Equal(t, "libfoo v0", library)
 		assert.Equal(t, TokenHeader, tokenType)
@@ -118,13 +118,15 @@ func TestOutputDeterminism(t *testing.T) {
 			Name:      "file.jpg",
 			LogTime:   0,
 			MediaType: "image/jpeg",
-			Data:      []byte{0x01, 0x02, 0x03, 0x04},
+			DataSize:  4,
+			Data:      bytes.NewReader([]byte{0x01, 0x02, 0x03, 0x04}),
 		}))
 		assert.Nil(t, w.WriteAttachment(&Attachment{
 			Name:      "file2.jpg",
 			LogTime:   0,
 			MediaType: "image/jpeg",
-			Data:      []byte{0x01, 0x02, 0x03, 0x04},
+			DataSize:  4,
+			Data:      bytes.NewReader([]byte{0x01, 0x02, 0x03, 0x04}),
 		}))
 		assert.Nil(t, w.Close())
 		if i == 0 {
@@ -312,7 +314,8 @@ func TestIndexStructures(t *testing.T) {
 		LogTime:    100,
 		CreateTime: 99,
 		MediaType:  "image/jpeg",
-		Data:       []byte{0x01, 0x02, 0x03, 0x04},
+		DataSize:   4,
+		Data:       bytes.NewReader([]byte{0x01, 0x02, 0x03, 0x04}),
 	}))
 	assert.Nil(t, w.Close())
 	t.Run("chunk indexes correct", func(t 
*testing.T) { @@ -386,7 +389,8 @@ func TestStatistics(t *testing.T) { Name: "file.jpg", LogTime: 0, MediaType: "image/jpeg", - Data: []byte{0x01, 0x02, 0x03, 0x04}, + DataSize: 4, + Data: bytes.NewReader([]byte{0x01, 0x02, 0x03, 0x04}), })) assert.Nil(t, w.Close()) assert.Equal(t, uint64(1000), w.Statistics.MessageCount) @@ -441,7 +445,8 @@ func TestUnchunkedReadWrite(t *testing.T) { Name: "file.jpg", LogTime: 0, MediaType: "image/jpeg", - Data: []byte{0x01, 0x02, 0x03, 0x04}, + DataSize: 4, + Data: bytes.NewReader([]byte{0x01, 0x02, 0x03, 0x04}), }) assert.Nil(t, err) assert.Nil(t, w.Close()) @@ -461,7 +466,6 @@ func TestUnchunkedReadWrite(t *testing.T) { TokenSchema, TokenChannel, TokenMessage, - TokenAttachment, TokenDataEnd, TokenSchema, TokenChannel, @@ -506,10 +510,10 @@ func TestLibraryString(t *testing.T) { assert.Nil(t, err) assert.Equal(t, tokenType, TokenHeader) offset := 0 - profile, offset, err := readPrefixedString(record, offset) + profile, offset, err := getPrefixedString(record, offset) assert.Nil(t, err) assert.Equal(t, "ros1", profile) - library, _, err := readPrefixedString(record, offset) + library, _, err := getPrefixedString(record, offset) assert.Nil(t, err) assert.Equal(t, library, c.output) }) @@ -618,3 +622,29 @@ func BenchmarkWriterAllocs(b *testing.B) { }) } } + +func TestWriteAttachment(t *testing.T) { + cases := []struct { + assertion string + attachment *Attachment + err error + }{ + { + "incorrect content size", + &Attachment{ + DataSize: 2, + Data: bytes.NewReader([]byte{0x01, 0x02, 0x03, 0x04}), + }, + ErrAttachmentDataSizeIncorrect, + }, + } + for _, c := range cases { + t.Run(c.assertion, func(t *testing.T) { + buf := &bytes.Buffer{} + writer, err := NewWriter(buf, &WriterOptions{}) + assert.Nil(t, err) + err = writer.WriteAttachment(c.attachment) + assert.ErrorIs(t, err, c.err) + }) + } +}
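
Example usage (an illustrative sketch, not part of this patch): the snippet
below shows how a consumer of the new API might stream each attachment in a
file to disk via the AttachmentCallback option, verifying CRCs along the way.
The input name "demo.mcap" is hypothetical and error handling is abbreviated;
only identifiers introduced by this patch are relied on.

	package main

	import (
		"errors"
		"fmt"
		"io"
		"os"

		"github.com/foxglove/mcap/go/mcap"
	)

	func main() {
		f, err := os.Open("demo.mcap") // hypothetical input file
		if err != nil {
			panic(err)
		}
		defer f.Close()

		lexer, err := mcap.NewLexer(f, &mcap.LexerOptions{
			// Required for ComputedCRC to return a meaningful value.
			ComputeAttachmentCRCs: true,
			AttachmentCallback: func(ar *mcap.AttachmentReader) error {
				// Stream the data section to disk without buffering the
				// whole attachment in memory.
				out, err := os.Create(ar.Name)
				if err != nil {
					return err
				}
				defer out.Close()
				if _, err := io.Copy(out, ar.Data()); err != nil {
					return err
				}
				// Both CRC accessors require the data reader to be fully
				// consumed first.
				computed, err := ar.ComputedCRC()
				if err != nil {
					return err
				}
				parsed, err := ar.ParsedCRC()
				if err != nil {
					return err
				}
				// A parsed CRC of zero means the writer did not record one.
				if parsed != 0 && computed != parsed {
					return fmt.Errorf("CRC mismatch in %s", ar.Name)
				}
				return nil
			},
		})
		if err != nil {
			panic(err)
		}
		for {
			if _, _, err := lexer.Next(nil); err != nil {
				if errors.Is(err, io.EOF) {
					break
				}
				panic(err)
			}
		}
	}

If the callback returns without consuming all of Data(), the lexer discards
the remainder of the record itself, so handlers that only care about
attachment metadata can simply return nil.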