Skip to content

Commit

Permalink
Reduce memory overhead of decision logs
Browse files Browse the repository at this point in the history
Fixes #705

Signed-off-by: Ashutosh Narkar <anarkar4387@gmail.com>
  • Loading branch information
ashutosh-narkar authored and tsandall committed Jun 15, 2018
1 parent db708b8 commit 1b33bf3
Show file tree
Hide file tree
Showing 6 changed files with 285 additions and 138 deletions.
17 changes: 10 additions & 7 deletions plugins/logs/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@ type logBuffer struct {
}

type logBufferElem struct {
bs []byte
isChunk bool
bs []byte
}

func newLogBuffer(limit int64) *logBuffer {
Expand All @@ -30,7 +29,7 @@ func newLogBuffer(limit int64) *logBuffer {
}
}

func (lb *logBuffer) Push(bs []byte, isChunk bool) (dropped int) {
func (lb *logBuffer) Push(bs []byte) (dropped int) {
size := int64(len(bs))

for elem := lb.l.Front(); elem != nil && (lb.usage+size > lb.limit); elem = elem.Next() {
Expand All @@ -40,20 +39,24 @@ func (lb *logBuffer) Push(bs []byte, isChunk bool) (dropped int) {
dropped++
}

elem := logBufferElem{bs, isChunk}
elem := logBufferElem{bs}

lb.l.PushBack(elem)
lb.usage += size
return dropped
}

func (lb *logBuffer) Pop() ([]byte, bool) {
func (lb *logBuffer) Pop() []byte {
elem := lb.l.Front()
if elem != nil {
e := elem.Value.(logBufferElem)
lb.usage -= int64(len(e.bs))
lb.l.Remove(elem)
return e.bs, e.isChunk
return e.bs
}
return nil, false
return nil
}

func (lb *logBuffer) Len() int {
return lb.l.Len()
}
18 changes: 7 additions & 11 deletions plugins/logs/buffer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,44 +13,40 @@ func TestLogBuffer(t *testing.T) {

buffer := newLogBuffer(int64(20)) // 20 byte limit for test purposes

dropped := buffer.Push(make([]byte, 20), false)
dropped := buffer.Push(make([]byte, 20))
if dropped != 0 {
t.Fatal("Expected dropped to be zero")
}

bs, chunk := buffer.Pop()
bs := buffer.Pop()
if len(bs) != 20 {
t.Fatal("Expected buffer size to be 20")
} else if chunk {
t.Fatal("Expected !chunk")
}

bs, _ = buffer.Pop()
bs = buffer.Pop()
if bs != nil {
t.Fatal("Expected buffer to be nil")
}

dropped = buffer.Push(bytes.Repeat([]byte(`1`), 10), false)
dropped = buffer.Push(bytes.Repeat([]byte(`1`), 10))
if dropped != 0 {
t.Fatal("Expected dropped to be zero")
}

dropped = buffer.Push(bytes.Repeat([]byte(`2`), 10), true)
dropped = buffer.Push(bytes.Repeat([]byte(`2`), 10))
if dropped != 0 {
t.Fatal("Expected dropped to be zero")
}

dropped = buffer.Push(bytes.Repeat([]byte(`3`), 10), false)
dropped = buffer.Push(bytes.Repeat([]byte(`3`), 10))
if dropped != 1 {
t.Fatal("Expected dropped to be 1")
}

bs, chunk = buffer.Pop()
bs = buffer.Pop()
exp := bytes.Repeat([]byte(`2`), 10)
if !bytes.Equal(bs, exp) {
t.Fatalf("Expected %v but got %v", exp, bs)
} else if !chunk {
t.Fatal("Expected chunk to be true")
}

if buffer.usage != 10 {
Expand Down
11 changes: 10 additions & 1 deletion plugins/logs/encoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package logs
import (
"bytes"
"compress/gzip"
"encoding/json"
"fmt"
)

Expand All @@ -25,10 +26,17 @@ func newChunkEncoder(limit int64) *chunkEncoder {
limit: limit,
}
enc.reset()

return enc
}

func (enc *chunkEncoder) Write(bs []byte) (result []byte, err error) {
func (enc *chunkEncoder) Write(event EventV1) (result []byte, err error) {
var buf bytes.Buffer
if err := json.NewEncoder(&buf).Encode(event); err != nil {
return nil, err
}

bs := buf.Bytes()

if len(bs) == 0 {
return nil, nil
Expand All @@ -40,6 +48,7 @@ func (enc *chunkEncoder) Write(bs []byte) (result []byte, err error) {
if err := enc.writeClose(); err != nil {
return nil, err
}

result = enc.reset()
}

Expand Down
38 changes: 19 additions & 19 deletions plugins/logs/encoder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,30 +5,35 @@
package logs

import (
"bytes"
"testing"
"time"
)

func TestChunkEncoder(t *testing.T) {

enc := newChunkEncoder(1000)

bs, err := enc.Write(bytes.Repeat([]byte(`1`), 500))
if bs != nil || err != nil {
t.Fatalf("Unexpected error or chunk produced: err: %v", err)
var result interface{} = false
var expInput interface{} = map[string]interface{}{"method": "GET"}
ts, err := time.Parse(time.RFC3339Nano, "2018-01-01T12:00:00.123456Z")
if err != nil {
panic(err)
}

bs, err = enc.Write(bytes.Repeat([]byte(`1`), 498))
if bs != nil || err != nil {
t.Fatalf("Unexpected error or chunk produced: err: %v", err)
event := EventV1{
Labels: map[string]string{
"id": "test-instance-id",
"app": "example-app",
},
Revision: "a",
DecisionID: "a",
Path: "foo/bar",
Input: &expInput,
Result: &result,
RequestedBy: "test",
Timestamp: ts,
}

bs, err = enc.Write(bytes.Repeat([]byte(`1`), 1))
if bs == nil || err != nil {
t.Fatalf("Unexpected error or NO chunk produced: err: %v", err)
}

bs, err = enc.Write(bytes.Repeat([]byte(`1`), 1))
bs, err := enc.Write(event)
if bs != nil || err != nil {
t.Fatalf("Unexpected error or chunk produced: err: %v", err)
}
Expand All @@ -43,9 +48,4 @@ func TestChunkEncoder(t *testing.T) {
t.Fatalf("Unexpected error chunk produced: err: %v", err)
}

bs, err = enc.Write(nil)
if bs != nil || err != nil || enc.bytesWritten != 0 {
t.Fatalf("Unexpected error chunk produced or bytesWritten != 0: err: %v, bytesWritten: %v", err, enc.bytesWritten)
}

}
Loading

0 comments on commit 1b33bf3

Please sign in to comment.