Skip to content

Commit

Permalink
[refactor] - Update Write method signature in contentWriter interface (#2721)
Browse files Browse the repository at this point in the history

* Update write method in contentWriter interface

* fix lint
  • Loading branch information
ahrav authored Apr 23, 2024
1 parent 642fce5 commit 4a5fbf8
Show file tree
Hide file tree
Showing 8 changed files with 43 additions and 74 deletions.
10 changes: 5 additions & 5 deletions pkg/gitparse/gitparse.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ const (
// based on performance needs or resource constraints, providing a unified way to interact with different content types.
type contentWriter interface {
// Write appends data to the content storage.
Write(ctx context.Context, data []byte) (int, error)
Write(data []byte) (int, error)
// ReadCloser provides a reader for accessing stored content.
ReadCloser() (io.ReadCloser, error)
// CloseForWriting closes the content storage for writing.
Expand Down Expand Up @@ -92,8 +92,8 @@ func (d *Diff) Len() int { return d.contentWriter.Len() }
func (d *Diff) ReadCloser() (io.ReadCloser, error) { return d.contentWriter.ReadCloser() }

// write delegates to the contentWriter.
func (d *Diff) write(ctx context.Context, p []byte) error {
_, err := d.contentWriter.Write(ctx, p)
func (d *Diff) write(p []byte) error {
_, err := d.contentWriter.Write(p)
return err
}

Expand Down Expand Up @@ -483,15 +483,15 @@ func (c *Parser) FromReader(ctx context.Context, stdOut io.Reader, diffChan chan
latestState = HunkContentLine
}
// TODO: Why do we care about this? It creates empty lines in the diff. If there are no plusLines, it's just newlines.
if err := currentDiff.write(ctx, []byte("\n")); err != nil {
if err := currentDiff.write([]byte("\n")); err != nil {
ctx.Logger().Error(err, "failed to write to diff")
}
case isHunkPlusLine(latestState, line):
if latestState != HunkContentLine {
latestState = HunkContentLine
}

if err := currentDiff.write(ctx, line[1:]); err != nil {
if err := currentDiff.write(line[1:]); err != nil {
ctx.Logger().Error(err, "failed to write to diff")
}
// NoOp. We only care about additions.
Expand Down
4 changes: 2 additions & 2 deletions pkg/gitparse/gitparse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -694,7 +694,7 @@ func TestCommitParsing(t *testing.T) {
func newBufferedFileWriterWithContent(content []byte) *bufferedfilewriter.BufferedFileWriter {
ctx := context.Background()
b := bufferedfilewriter.New(ctx)
_, err := b.Write(ctx, content) // Using Write method to add content
_, err := b.Write(content) // Using Write method to add content
if err != nil {
panic(err)
}
Expand All @@ -704,7 +704,7 @@ func newBufferedFileWriterWithContent(content []byte) *bufferedfilewriter.Buffer
func newBufferWithContent(content []byte) *bufferwriter.BufferWriter {
ctx := context.Background()
b := bufferwriter.New(ctx)
_, _ = b.Write(ctx, content) // Using Write method to add content
_, _ = b.Write(content) // Using Write method to add content
return b
}

Expand Down
17 changes: 2 additions & 15 deletions pkg/writers/buffer/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,34 +118,21 @@ func (b *Buffer) recordGrowth(size int) {
}

// Write data to the buffer.
func (b *Buffer) Write(ctx context.Context, data []byte) (int, error) {
func (b *Buffer) Write(data []byte) (int, error) {
if b.Buffer == nil {
// This case should ideally never occur if buffers are properly managed.
ctx.Logger().Error(fmt.Errorf("buffer is nil, initializing a new buffer"), "action", "initializing_new_buffer")
b.Buffer = bytes.NewBuffer(make([]byte, 0, defaultBufferSize))
b.resetMetric()
}

size := len(data)
bufferLength := b.Buffer.Len()
totalSizeNeeded := bufferLength + size
// If the total size is within the threshold, write to the buffer.
ctx.Logger().V(4).Info(
"writing to buffer",
"data_size", size,
"content_size", bufferLength,
)

// If the total size is within the threshold, write to the buffer.
availableSpace := b.Buffer.Cap() - bufferLength
growSize := totalSizeNeeded - bufferLength
if growSize > availableSpace {
ctx.Logger().V(4).Info(
"buffer size exceeded, growing buffer",
"current_size", bufferLength,
"new_size", totalSizeNeeded,
"available_space", availableSpace,
"grow_size", growSize,
)
// We are manually growing the buffer so we can track the growth via metrics.
// Knowing the exact data size, we directly resize to fit it, rather than exponential growth
// which may require multiple allocations and copies if the size required is much larger
Expand Down
2 changes: 1 addition & 1 deletion pkg/writers/buffer/buffer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func TestBufferWrite(t *testing.T) {
buf := &Buffer{Buffer: bytes.NewBuffer(make([]byte, 0, tc.initialCapacity))}
totalWritten := 0
for _, data := range tc.writeDataSequence {
n, err := buf.Write(context.Background(), data)
n, err := buf.Write(data)
assert.NoError(t, err)

totalWritten += n
Expand Down
14 changes: 3 additions & 11 deletions pkg/writers/buffer_writer/bufferwriter.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,8 @@ func New(ctx context.Context) *BufferWriter {
return &BufferWriter{buf: buf, state: writeOnly, bufPool: bufferPool}
}

// Write delegates the writing operation to the underlying bytes.Buffer, ignoring the context.
// The context is included to satisfy the contentWriter interface, allowing for future extensions
// where context handling might be necessary (e.g., for timeouts or cancellation).
func (b *BufferWriter) Write(ctx context.Context, data []byte) (int, error) {
// Write delegates the writing operation to the underlying bytes.Buffer.
func (b *BufferWriter) Write(data []byte) (int, error) {
if b.state != writeOnly {
return 0, fmt.Errorf("buffer must be in write-only mode to write data; current state: %d", b.state)
}
Expand All @@ -67,14 +65,8 @@ func (b *BufferWriter) Write(ctx context.Context, data []byte) (int, error) {
bufferLength := uint64(b.buf.Len())
b.metrics.recordDataProcessed(bufferLength, time.Since(start))

ctx.Logger().V(4).Info(
"write complete",
"data_size", size,
"buffer_len", bufferLength,
"buffer_size", b.buf.Cap(),
)
}(start)
return b.buf.Write(ctx, data)
return b.buf.Write(data)
}

// ReadCloser provides a read-closer for the buffer's content.
Expand Down
4 changes: 2 additions & 2 deletions pkg/writers/buffer_writer/bufferwriter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func TestBufferWriterWrite(t *testing.T) {
writer := New(context.Background())
writer.state = tc.initialState

_, err := writer.Write(context.Background(), tc.input)
_, err := writer.Write(tc.input)
if tc.expectedError {
assert.Error(t, err)
} else {
Expand Down Expand Up @@ -121,7 +121,7 @@ func TestBufferWriterString(t *testing.T) {
{
name: "String with data",
prepareBuffer: func(bw *BufferWriter) {
_, _ = bw.Write(context.Background(), []byte("test data"))
_, _ = bw.Write([]byte("test data"))
},
expectedStr: "test data",
expectedError: false,
Expand Down
18 changes: 4 additions & 14 deletions pkg/writers/buffered_file_writer/bufferedfilewriter.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,10 @@ func (bufferedFileWriterMetrics) recordDataProcessed(size uint64, dur time.Durat
totalWriteDuration.Add(float64(dur.Microseconds()))
}

func (bufferedFileWriterMetrics) recordDiskWrite(ctx context.Context, f *os.File) {
func (bufferedFileWriterMetrics) recordDiskWrite(f *os.File) {
diskWriteCount.Inc()
size, err := f.Stat()
if err != nil {
ctx.Logger().Error(err, "failed to get file size for metric")
return
}
fileSizeHistogram.Observe(float64(size.Size()))
Expand Down Expand Up @@ -122,7 +121,7 @@ func (w *BufferedFileWriter) String() (string, error) {
}

// Write writes data to the buffer or a file, depending on the size.
func (w *BufferedFileWriter) Write(ctx context.Context, data []byte) (int, error) {
func (w *BufferedFileWriter) Write(data []byte) (int, error) {
if w.state != writeOnly {
return 0, fmt.Errorf("BufferedFileWriter must be in write-only mode to write")
}
Expand All @@ -132,19 +131,12 @@ func (w *BufferedFileWriter) Write(ctx context.Context, data []byte) (int, error
start := time.Now()
defer func(start time.Time) {
w.metrics.recordDataProcessed(size, time.Since(start))

w.size += size
ctx.Logger().V(4).Info(
"write complete",
"data_size", size,
"content_size", bufferLength,
"total_size", w.size,
)
}(start)

totalSizeNeeded := uint64(bufferLength) + size
if totalSizeNeeded <= w.threshold {
return w.buf.Write(ctx, data)
return w.buf.Write(data)
}

// Switch to file writing if threshold is exceeded.
Expand All @@ -157,19 +149,17 @@ func (w *BufferedFileWriter) Write(ctx context.Context, data []byte) (int, error

w.filename = file.Name()
w.file = file
w.metrics.recordDiskWrite(ctx, file)
w.metrics.recordDiskWrite(file)

// Transfer existing data in buffer to the file, then clear the buffer.
// This ensures all the data is in one place - either entirely in the buffer or the file.
if bufferLength > 0 {
ctx.Logger().V(4).Info("writing buffer to file", "content_size", bufferLength)
if _, err := w.buf.WriteTo(w.file); err != nil {
return 0, err
}
w.bufPool.Put(w.buf)
}
}
ctx.Logger().V(4).Info("writing to file", "data_size", size)

return w.file.Write(data)
}
Expand Down
Loading

0 comments on commit 4a5fbf8

Please sign in to comment.