diff --git a/plugins/logs/buffer.go b/plugins/logs/buffer.go index 281aca2376..8479177e36 100644 --- a/plugins/logs/buffer.go +++ b/plugins/logs/buffer.go @@ -18,8 +18,7 @@ type logBuffer struct { } type logBufferElem struct { - bs []byte - isChunk bool + bs []byte } func newLogBuffer(limit int64) *logBuffer { @@ -30,7 +29,7 @@ func newLogBuffer(limit int64) *logBuffer { } } -func (lb *logBuffer) Push(bs []byte, isChunk bool) (dropped int) { +func (lb *logBuffer) Push(bs []byte) (dropped int) { size := int64(len(bs)) for elem := lb.l.Front(); elem != nil && (lb.usage+size > lb.limit); elem = elem.Next() { @@ -40,20 +39,24 @@ func (lb *logBuffer) Push(bs []byte, isChunk bool) (dropped int) { dropped++ } - elem := logBufferElem{bs, isChunk} + elem := logBufferElem{bs} lb.l.PushBack(elem) lb.usage += size return dropped } -func (lb *logBuffer) Pop() ([]byte, bool) { +func (lb *logBuffer) Pop() []byte { elem := lb.l.Front() if elem != nil { e := elem.Value.(logBufferElem) lb.usage -= int64(len(e.bs)) lb.l.Remove(elem) - return e.bs, e.isChunk + return e.bs } - return nil, false + return nil +} + +func (lb *logBuffer) Len() int { + return lb.l.Len() } diff --git a/plugins/logs/buffer_test.go b/plugins/logs/buffer_test.go index 1e0ba46631..b580d94385 100644 --- a/plugins/logs/buffer_test.go +++ b/plugins/logs/buffer_test.go @@ -13,44 +13,40 @@ func TestLogBuffer(t *testing.T) { buffer := newLogBuffer(int64(20)) // 20 byte limit for test purposes - dropped := buffer.Push(make([]byte, 20), false) + dropped := buffer.Push(make([]byte, 20)) if dropped != 0 { t.Fatal("Expected dropped to be zero") } - bs, chunk := buffer.Pop() + bs := buffer.Pop() if len(bs) != 20 { t.Fatal("Expected buffer size to be 20") - } else if chunk { - t.Fatal("Expected !chunk") } - bs, _ = buffer.Pop() + bs = buffer.Pop() if bs != nil { t.Fatal("Expected buffer to be nil") } - dropped = buffer.Push(bytes.Repeat([]byte(`1`), 10), false) + dropped = buffer.Push(bytes.Repeat([]byte(`1`), 10)) if dropped != 0 { t.Fatal("Expected dropped to be zero") } - dropped = buffer.Push(bytes.Repeat([]byte(`2`), 10), true) + dropped = buffer.Push(bytes.Repeat([]byte(`2`), 10)) if dropped != 0 { t.Fatal("Expected dropped to be zero") } - dropped = buffer.Push(bytes.Repeat([]byte(`3`), 10), false) + dropped = buffer.Push(bytes.Repeat([]byte(`3`), 10)) if dropped != 1 { t.Fatal("Expected dropped to be 1") } - bs, chunk = buffer.Pop() + bs = buffer.Pop() exp := bytes.Repeat([]byte(`2`), 10) if !bytes.Equal(bs, exp) { t.Fatalf("Expected %v but got %v", exp, bs) - } else if !chunk { - t.Fatal("Expected chunk to be true") } if buffer.usage != 10 { diff --git a/plugins/logs/encoder.go b/plugins/logs/encoder.go index 00c73ce1a6..5c74185633 100644 --- a/plugins/logs/encoder.go +++ b/plugins/logs/encoder.go @@ -7,6 +7,7 @@ package logs import ( "bytes" "compress/gzip" + "encoding/json" "fmt" ) @@ -25,10 +26,17 @@ func newChunkEncoder(limit int64) *chunkEncoder { limit: limit, } enc.reset() + return enc } -func (enc *chunkEncoder) Write(bs []byte) (result []byte, err error) { +func (enc *chunkEncoder) Write(event EventV1) (result []byte, err error) { + var buf bytes.Buffer + if err := json.NewEncoder(&buf).Encode(event); err != nil { + return nil, err + } + + bs := buf.Bytes() if len(bs) == 0 { return nil, nil @@ -40,6 +48,7 @@ func (enc *chunkEncoder) Write(bs []byte) (result []byte, err error) { if err := enc.writeClose(); err != nil { return nil, err } + result = enc.reset() } diff --git a/plugins/logs/encoder_test.go b/plugins/logs/encoder_test.go index 7ed4314188..5a10db2fef 100644 --- a/plugins/logs/encoder_test.go +++ b/plugins/logs/encoder_test.go @@ -5,30 +5,35 @@ package logs import ( - "bytes" "testing" + "time" ) func TestChunkEncoder(t *testing.T) { enc := newChunkEncoder(1000) - - bs, err := enc.Write(bytes.Repeat([]byte(`1`), 500)) - if bs != nil || err != nil { - t.Fatalf("Unexpected error or chunk produced: err: %v", err) + var result interface{} = false + var expInput interface{} = map[string]interface{}{"method": "GET"} + ts, err := time.Parse(time.RFC3339Nano, "2018-01-01T12:00:00.123456Z") + if err != nil { + panic(err) } - bs, err = enc.Write(bytes.Repeat([]byte(`1`), 498)) - if bs != nil || err != nil { - t.Fatalf("Unexpected error or chunk produced: err: %v", err) + event := EventV1{ + Labels: map[string]string{ + "id": "test-instance-id", + "app": "example-app", + }, + Revision: "a", + DecisionID: "a", + Path: "foo/bar", + Input: &expInput, + Result: &result, + RequestedBy: "test", + Timestamp: ts, } - bs, err = enc.Write(bytes.Repeat([]byte(`1`), 1)) - if bs == nil || err != nil { - t.Fatalf("Unexpected error or NO chunk produced: err: %v", err) - } - - bs, err = enc.Write(bytes.Repeat([]byte(`1`), 1)) + bs, err := enc.Write(event) if bs != nil || err != nil { t.Fatalf("Unexpected error or chunk produced: err: %v", err) } @@ -43,9 +48,4 @@ func TestChunkEncoder(t *testing.T) { t.Fatalf("Unexpected error chunk produced: err: %v", err) } - bs, err = enc.Write(nil) - if bs != nil || err != nil || enc.bytesWritten != 0 { - t.Fatalf("Unexpected error chunk produced or bytesWritten != 0: err: %v, bytesWritten: %v", err, enc.bytesWritten) - } - } diff --git a/plugins/logs/plugin.go b/plugins/logs/plugin.go index 88170500a3..7d2880aeff 100644 --- a/plugins/logs/plugin.go +++ b/plugins/logs/plugin.go @@ -6,9 +6,7 @@ package logs import ( - "bytes" "context" - "encoding/json" "fmt" "math/rand" "net/http" @@ -17,6 +15,7 @@ import ( "time" "github.com/open-policy-agent/opa/plugins" + "github.com/open-policy-agent/opa/plugins/rest" "github.com/open-policy-agent/opa/server" "github.com/open-policy-agent/opa/util" "github.com/pkg/errors" @@ -40,8 +39,8 @@ const ( minRetryDelay = time.Millisecond * 100 defaultMinDelaySeconds = int64(300) defaultMaxDelaySeconds = int64(600) - defaultUploadSizeLimitBytes = int64(32768) // 32KB limit - defaultBufferSizeLimitBytes = int64(1048576) // 1MB limit + defaultUploadSizeLimitBytes = int64(32768) // 32KB limit + defaultBufferSizeLimitBytes = int64(104857600) // 100MB limit ) // ReportingConfig represents configuration for the plugin's reporting behaviour. @@ -121,6 +120,7 @@ type Plugin struct { manager *plugins.Manager config Config buffer *logBuffer + enc *chunkEncoder mtx sync.Mutex stop chan chan struct{} } @@ -143,6 +143,7 @@ func New(config []byte, manager *plugins.Manager) (*Plugin, error) { config: parsedConfig, stop: make(chan chan struct{}), buffer: newLogBuffer(*parsedConfig.Reporting.BufferSizeLimitBytes), + enc: newChunkEncoder(*parsedConfig.Reporting.UploadSizeLimitBytes), } return plugin, nil @@ -163,9 +164,6 @@ func (p *Plugin) Stop(ctx context.Context) { // Log appends a decision log event to the buffer for uploading. func (p *Plugin) Log(ctx context.Context, decision *server.Info) { - - var buf bytes.Buffer - path := strings.Replace(strings.TrimPrefix(decision.Query, "data."), ".", "/", -1) event := EventV1{ @@ -179,16 +177,17 @@ func (p *Plugin) Log(ctx context.Context, decision *server.Info) { Timestamp: decision.Timestamp, } - if err := json.NewEncoder(&buf).Encode(event); err != nil { - p.logError("Log serialization failed: %v.", err) + p.mtx.Lock() + defer p.mtx.Unlock() + + result, err := p.enc.Write(event) + if err != nil { + p.logError("Log encoding failed: %v.", err) return } - p.mtx.Lock() - defer p.mtx.Unlock() - dropped := p.buffer.Push(buf.Bytes(), false) - if dropped > 0 { - p.logInfo("Dropped %v events from buffer. Reduce reporting interval or increase buffer size.") + if result != nil { + p.bufferChunk(p.buffer, result) } } @@ -238,67 +237,75 @@ func (p *Plugin) loop() { } func (p *Plugin) oneShot(ctx context.Context) (ok bool, err error) { - - chunks, err := p.chunkBuffer() + // Make a local copy of the plugins's encoder and buffer and create + // a new encoder and buffer. This is needed as locking the buffer for + // the upload duration will block policy evaluation and result in + // increased latency for OPA clients + p.mtx.Lock() + oldChunkEnc := p.enc + oldBuffer := p.buffer + p.buffer = newLogBuffer(*p.config.Reporting.BufferSizeLimitBytes) + p.enc = newChunkEncoder(*p.config.Reporting.UploadSizeLimitBytes) + p.mtx.Unlock() + + // Along with uploading the compressed events in the buffer + // to the remote server, flush any pending compressed data to the + // underlying writer and add to the buffer. + chunk, err := oldChunkEnc.Flush() if err != nil { - return false, errors.Wrap(err, "Log processing failed") + return false, err + } else if chunk != nil { + p.bufferChunk(oldBuffer, chunk) } - var chunkIndex int - - defer func() { - if err != nil { - p.requeueChunks(chunks, chunkIndex) - } - }() - - for chunkIndex = range chunks { - - resp, err := p.manager.Client(p.config.Service). - WithHeader("Content-Type", "application/json"). - WithHeader("Content-Encoding", "gzip"). - WithBytes(chunks[chunkIndex]). - Do(ctx, "POST", fmt.Sprintf("/logs/%v", p.config.PartitionName)) + if oldBuffer.Len() == 0 { + return false, nil + } + for bs := oldBuffer.Pop(); bs != nil; bs = oldBuffer.Pop() { + err := uploadChunk(ctx, p.manager.Client(p.config.Service), p.config.PartitionName, bs) if err != nil { - return false, errors.Wrap(err, "Log upload failed") - } - - defer util.Close(resp) - - switch resp.StatusCode { - case http.StatusOK: - break - case http.StatusNotFound: - return false, fmt.Errorf("Log upload failed, server replied with not found") - case http.StatusUnauthorized: - return false, fmt.Errorf("Log upload failed, server replied with not authorized") - default: - return false, fmt.Errorf("Log upload failed, server replied with HTTP %v", resp.StatusCode) + // requeue the chunk + p.mtx.Lock() + p.bufferChunk(p.buffer, bs) + p.mtx.Unlock() + return false, err } } - return len(chunks) > 0, nil + return true, nil } -func (p *Plugin) chunkBuffer() ([][]byte, error) { - p.mtx.Lock() - defer p.mtx.Unlock() - return chunk(*p.config.Reporting.UploadSizeLimitBytes, p.buffer) +func (p *Plugin) bufferChunk(buffer *logBuffer, bs []byte) { + dropped := buffer.Push(bs) + if dropped > 0 { + p.logError("Dropped %v chunks from buffer. Reduce reporting interval or increase buffer size.", dropped) + } } -func (p *Plugin) requeueChunks(chunks [][]byte, idx int) { - p.mtx.Lock() - defer p.mtx.Unlock() +func uploadChunk(ctx context.Context, client rest.Client, partitionName string, data []byte) error { - var dropped int + resp, err := client. + WithHeader("Content-Type", "application/json"). + WithHeader("Content-Encoding", "gzip"). + WithBytes(data). + Do(ctx, "POST", fmt.Sprintf("/logs/%v", partitionName)) - for ; idx < len(chunks); idx++ { - dropped += p.buffer.Push(chunks[idx], true) + if err != nil { + return errors.Wrap(err, "Log upload failed") } - if dropped > 0 { - p.logInfo("Dropped %v events from buffer. Reduce reporting interval or increase buffer size.") + defer util.Close(resp) + + switch resp.StatusCode { + case http.StatusOK: + return nil + case http.StatusNotFound: + return fmt.Errorf("Log upload failed, server replied with not found") + case http.StatusUnauthorized: + return fmt.Errorf("Log upload failed, server replied with not authorized") + default: + return fmt.Errorf("Log upload failed, server replied with HTTP %v", resp.StatusCode) } } @@ -319,31 +326,3 @@ func (p *Plugin) logrusFields() logrus.Fields { "plugin": "decision_logs", } } - -func chunk(limit int64, buffer *logBuffer) ([][]byte, error) { - - enc := newChunkEncoder(limit) - chunks := [][]byte{} - - for bs, isChunk := buffer.Pop(); bs != nil; bs, isChunk = buffer.Pop() { - if !isChunk { - chunk, err := enc.Write(bs) - if err != nil { - return nil, err - } else if chunk != nil { - chunks = append(chunks, chunk) - } - } else { - chunks = append(chunks, bs) - } - } - - chunk, err := enc.Flush() - if err != nil { - return nil, err - } else if chunk != nil { - chunks = append(chunks, chunk) - } - - return chunks, nil -} diff --git a/plugins/logs/plugin_test.go b/plugins/logs/plugin_test.go index d410abb68d..2ed5425a49 100644 --- a/plugins/logs/plugin_test.go +++ b/plugins/logs/plugin_test.go @@ -20,13 +20,13 @@ import ( "github.com/open-policy-agent/opa/storage/inmem" ) -func TestPluginStart(t *testing.T) { +func TestPluginStartSameInput(t *testing.T) { ctx := context.Background() fixture := newTestFixture(t) defer fixture.server.stop() - fixture.server.ch = make(chan []EventV1, 2) + fixture.server.ch = make(chan []EventV1, 4) var result interface{} = false ts, err := time.Parse(time.RFC3339Nano, "2018-01-01T12:00:00.123456Z") @@ -34,9 +34,9 @@ func TestPluginStart(t *testing.T) { panic(err) } - for i := 0; i < 154; i++ { // first chunk fits exactly n-1 events + for i := 0; i < 500; i++ { fixture.plugin.Log(ctx, &server.Info{ - Revision: "a", + Revision: fmt.Sprint(i), DecisionID: fmt.Sprint(i), Query: "data.tda.bar", Input: map[string]interface{}{"method": "GET"}, @@ -53,11 +53,15 @@ func TestPluginStart(t *testing.T) { chunk1 := <-fixture.server.ch chunk2 := <-fixture.server.ch - expLen1 := 153 - expLen2 := 1 - - if len(chunk1) != expLen1 || len(chunk2) != expLen2 { - t.Fatalf("Expected chunk lens %v and %v but got: %v and %v", expLen1, expLen2, len(chunk1), len(chunk2)) + chunk3 := <-fixture.server.ch + chunk4 := <-fixture.server.ch + expLen1 := 152 + expLen2 := 151 + expLen3 := 151 + expLen4 := 46 + + if len(chunk1) != expLen1 || len(chunk2) != expLen2 || len((chunk3)) != expLen3 || len(chunk4) != expLen4 { + t.Fatalf("Expected chunk lens %v, %v, %v and %v but got: %v, %v, %v and %v", expLen1, expLen2, expLen3, expLen4, len(chunk1), len(chunk2), len(chunk3), len(chunk4)) } var expInput interface{} = map[string]interface{}{"method": "GET"} @@ -67,8 +71,8 @@ func TestPluginStart(t *testing.T) { "id": "test-instance-id", "app": "example-app", }, - Revision: "a", - DecisionID: "153", + Revision: "499", + DecisionID: "499", Path: "tda/bar", Input: &expInput, Result: &result, @@ -76,8 +80,136 @@ func TestPluginStart(t *testing.T) { Timestamp: ts, } - if !reflect.DeepEqual(chunk2[0], exp) { - t.Fatalf("Expected %v but got %v", exp, chunk2[0]) + if !reflect.DeepEqual(chunk4[expLen4-1], exp) { + t.Fatalf("Expected %+v but got %+v", exp, chunk4[expLen4-1]) + } +} + +func TestPluginStartChangingInputValues(t *testing.T) { + ctx := context.Background() + + fixture := newTestFixture(t) + defer fixture.server.stop() + + fixture.server.ch = make(chan []EventV1, 4) + var result interface{} = false + + ts, err := time.Parse(time.RFC3339Nano, "2018-01-01T12:00:00.123456Z") + if err != nil { + panic(err) + } + + var input map[string]interface{} + + for i := 0; i < 500; i++ { + input = map[string]interface{}{"method": getValueForMethod(i), "path": getValueForPath(i), "user": getValueForUser(i)} + + fixture.plugin.Log(ctx, &server.Info{ + Revision: fmt.Sprint(i), + DecisionID: fmt.Sprint(i), + Query: "data.foo.bar", + Input: input, + Results: &result, + RemoteAddr: "test", + Timestamp: ts, + }) + } + + _, err = fixture.plugin.oneShot(ctx) + if err != nil { + t.Fatal(err) + } + + chunk1 := <-fixture.server.ch + chunk2 := <-fixture.server.ch + chunk3 := <-fixture.server.ch + chunk4 := <-fixture.server.ch + expLen1 := 133 + expLen2 := 132 + expLen3 := 132 + expLen4 := 103 + + if len(chunk1) != expLen1 || len(chunk2) != expLen2 || len((chunk3)) != expLen3 || len(chunk4) != expLen4 { + t.Fatalf("Expected chunk lens %v, %v, %v and %v but got: %v, %v, %v and %v", expLen1, expLen2, expLen3, expLen4, len(chunk1), len(chunk2), len(chunk3), len(chunk4)) + } + + var expInput interface{} = input + + exp := EventV1{ + Labels: map[string]string{ + "id": "test-instance-id", + "app": "example-app", + }, + Revision: "499", + DecisionID: "499", + Path: "foo/bar", + Input: &expInput, + Result: &result, + RequestedBy: "test", + Timestamp: ts, + } + + if !reflect.DeepEqual(chunk4[expLen4-1], exp) { + t.Fatalf("Expected %+v but got %+v", exp, chunk4[expLen4-1]) + } +} + +func TestPluginStartChangingInputKeysAndValues(t *testing.T) { + ctx := context.Background() + + fixture := newTestFixture(t) + defer fixture.server.stop() + + fixture.server.ch = make(chan []EventV1, 5) + var result interface{} = false + + ts, err := time.Parse(time.RFC3339Nano, "2018-01-01T12:00:00.123456Z") + if err != nil { + panic(err) + } + + var input map[string]interface{} + + for i := 0; i < 250; i++ { + input = generateInputMap(i) + + fixture.plugin.Log(ctx, &server.Info{ + Revision: fmt.Sprint(i), + DecisionID: fmt.Sprint(i), + Query: "data.foo.bar", + Input: input, + Results: &result, + RemoteAddr: "test", + Timestamp: ts, + }) + } + + _, err = fixture.plugin.oneShot(ctx) + if err != nil { + t.Fatal(err) + } + + <-fixture.server.ch + chunk2 := <-fixture.server.ch + + var expInput interface{} = input + + exp := EventV1{ + Labels: map[string]string{ + "id": "test-instance-id", + "app": "example-app", + }, + Revision: "249", + DecisionID: "249", + Path: "foo/bar", + Input: &expInput, + Result: &result, + RequestedBy: "test", + Timestamp: ts, + } + + if !reflect.DeepEqual(chunk2[len(chunk2)-1], exp) { + t.Fatalf("Expected %+v but got %+v", exp, chunk2[len(chunk2)-1]) } } @@ -211,3 +343,31 @@ func (t *testServer) start() { func (t *testServer) stop() { t.server.Close() } + +func getValueForMethod(idx int) string { + methods := []string{"GET", "POST", "PUT", "DELETE", "PATCH"} + return methods[idx%len(methods)] +} + +func getValueForPath(idx int) string { + paths := []string{"/blah1", "/blah2", "/blah3", "/blah4"} + return paths[idx%len(paths)] +} + +func getValueForUser(idx int) string { + users := []string{"Alice", "Bob", "Charlie", "David", "Ed"} + return users[idx%len(users)] +} + +func generateInputMap(idx int) map[string]interface{} { + var letters = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ") + result := make(map[string]interface{}) + + for i := 0; i < 20; i++ { + n := idx % len(letters) + key := string(letters[n]) + result[key] = fmt.Sprint(idx) + } + return result + +}