Skip to content

Commit

Permalink
Improve Wal Replay (#668)
Browse files Browse the repository at this point in the history
* Added page length

Signed-off-by: Joe Elliott <number101010@gmail.com>

* Added len to NextPage

Signed-off-by: Joe Elliott <number101010@gmail.com>

* Added appendblockfromfile

Signed-off-by: Joe Elliott <number101010@gmail.com>

* Ditched replay block. Long live AppendBlock

Signed-off-by: Joe Elliott <number101010@gmail.com>

* Fixed bench

Signed-off-by: Joe Elliott <number101010@gmail.com>

* Use new replay code in ingester/instance

Signed-off-by: Joe Elliott <number101010@gmail.com>

* removed PushBytes

Signed-off-by: Joe Elliott <number101010@gmail.com>

* Added test for bad blocks

Signed-off-by: Joe Elliott <number101010@gmail.com>

* comment

Signed-off-by: Joe Elliott <number101010@gmail.com>

* Rename AllBlocks. Pass in logger

Signed-off-by: Joe Elliott <number101010@gmail.com>

* Added test for empty wal file

Signed-off-by: Joe Elliott <number101010@gmail.com>

* Added partial replay support and tests

Signed-off-by: Joe Elliott <number101010@gmail.com>

* lint

Signed-off-by: Joe Elliott <number101010@gmail.com>

* Added flush queue test

Signed-off-by: Joe Elliott <number101010@gmail.com>

* removed jitter on tests...horribly

Signed-off-by: Joe Elliott <number101010@gmail.com>
  • Loading branch information
joe-elliott committed Apr 23, 2021
1 parent bde2845 commit 0b72812
Show file tree
Hide file tree
Showing 23 changed files with 473 additions and 444 deletions.
90 changes: 23 additions & 67 deletions modules/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"github.com/grafana/tempo/pkg/tempopb"
"github.com/grafana/tempo/pkg/validation"
"github.com/grafana/tempo/tempodb/backend/local"
tempodb_wal "github.com/grafana/tempo/tempodb/wal"
)

// ErrReadOnly is returned when the ingester is shutting down and a push was
Expand All @@ -46,9 +45,10 @@ type Ingester struct {
instances map[string]*instance
readonly bool

lifecycler *ring.Lifecycler
store storage.Store
local *local.Backend
lifecycler *ring.Lifecycler
store storage.Store
local *local.Backend
replayJitter bool // this var exists so tests can remove jitter

flushQueues *flushqueues.ExclusiveQueues
flushQueuesDone sync.WaitGroup
Expand All @@ -61,10 +61,11 @@ type Ingester struct {
// New makes a new Ingester.
func New(cfg Config, store storage.Store, limits *overrides.Overrides) (*Ingester, error) {
i := &Ingester{
cfg: cfg,
instances: map[string]*instance{},
store: store,
flushQueues: flushqueues.New(cfg.ConcurrentFlushes, metricFlushQueueLength),
cfg: cfg,
instances: map[string]*instance{},
store: store,
flushQueues: flushqueues.New(cfg.ConcurrentFlushes, metricFlushQueueLength),
replayJitter: true,
}

i.local = store.WAL().LocalBackend()
Expand Down Expand Up @@ -286,76 +287,31 @@ func (i *Ingester) TransferOut(ctx context.Context) error {
}

func (i *Ingester) replayWal() error {
blocks, err := i.store.WAL().AllBlocks()
// todo: should this fail startup?
level.Info(log.Logger).Log("msg", "beginning wal replay")

blocks, err := i.store.WAL().RescanBlocks(log.Logger)
if err != nil {
level.Error(log.Logger).Log("msg", "error beginning wal replay", "err", err)
return nil
return fmt.Errorf("fatal error replaying wal %w", err)
}

level.Info(log.Logger).Log("msg", "beginning wal replay", "numBlocks", len(blocks))

for _, b := range blocks {
tenantID := b.TenantID()
level.Info(log.Logger).Log("msg", "beginning block replay", "tenantID", tenantID, "block", b.BlockID())
tenantID := b.Meta().TenantID

err = i.replayBlock(b)
if err != nil {
// there was an error, log and keep on keeping on
level.Error(log.Logger).Log("msg", "error replaying block. removing", "error", err)
}
err = b.Clear()
instance, err := i.getOrCreateInstance(tenantID)
if err != nil {
return err
}
}

level.Info(log.Logger).Log("msg", "wal replay complete")
instance.AddCompletingBlock(b)

return nil
}

func (i *Ingester) replayBlock(b *tempodb_wal.ReplayBlock) error {
iterator, err := b.Iterator()
if err != nil {
return err
}
defer iterator.Close()

ctx := context.Background()

// Pull first entry to see if block has any data
id, obj, err := iterator.Next(ctx)
if err != nil {
return err
}
if id == nil {
// Block is empty
return nil
i.enqueue(&flushOp{
kind: opKindComplete,
userID: tenantID,
blockID: b.Meta().BlockID,
}, i.replayJitter)
}

// Only create instance for tenant now that we know data exists
instance, err := i.getOrCreateInstance(b.TenantID())
if err != nil {
return err
}

for {
// obj gets written to disk immediately but the id escapes the iterator and needs to be copied
writeID := append([]byte(nil), id...)
err = instance.PushBytes(context.Background(), writeID, obj)
if err != nil {
return err
}

id, obj, err = iterator.Next(ctx)
if id == nil {
break
}
if err != nil {
return err
}
}
level.Info(log.Logger).Log("msg", "wal replay complete")

return nil
}
Expand Down Expand Up @@ -388,7 +344,7 @@ func (i *Ingester) rediscoverLocalBlocks() error {
kind: opKindFlush,
userID: t,
blockID: b.BlockMeta().BlockID,
}, true)
}, i.replayJitter)
}
}
}
Expand Down
20 changes: 20 additions & 0 deletions modules/ingester/ingester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,25 @@ func TestWal(t *testing.T) {
equal := proto.Equal(traces[i], foundTrace.Trace)
assert.True(t, equal)
}

// a block that has been replayed should have a flush queue entry to complete it
// wait for the flush queues to be empty and then confirm there is a complete block
for !ingester.flushQueues.IsEmpty() {
time.Sleep(100 * time.Millisecond)
}

assert.Len(t, ingester.instances["test"].completingBlocks, 0)
assert.Len(t, ingester.instances["test"].completeBlocks, 1)

// should be able to find old traces that were replayed
for i, traceID := range traceIDs {
foundTrace, err := ingester.FindTraceByID(ctx, &tempopb.TraceByIDRequest{
TraceID: traceID,
})
assert.NoError(t, err, "unexpected error querying")
equal := proto.Equal(traces[i], foundTrace.Trace)
assert.True(t, equal)
}
}

func TestFlush(t *testing.T) {
Expand Down Expand Up @@ -207,6 +226,7 @@ func defaultIngester(t *testing.T, tmpDir string) (*Ingester, []*tempopb.Trace,

ingester, err := New(ingesterConfig, s, limits)
require.NoError(t, err, "unexpected error creating ingester")
ingester.replayJitter = false

err = ingester.starting(context.Background())
require.NoError(t, err, "unexpected error starting ingester")
Expand Down
18 changes: 10 additions & 8 deletions modules/ingester/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,14 +109,6 @@ func (i *instance) Push(ctx context.Context, req *tempopb.PushRequest) error {
return nil
}

// PushBytes is used by the wal replay code so it can write an id/object pair directly into the
// head block with no additional processing. ctx is not used. The returned error is whatever
// the head block's Write returns.
func (i *instance) PushBytes(ctx context.Context, id []byte, object []byte) error {
// hold blocksMtx while writing to the head block
i.blocksMtx.Lock()
defer i.blocksMtx.Unlock()

return i.headBlock.Write(id, object)
}

// Moves any complete traces out of the map to complete traces
func (i *instance) CutCompleteTraces(cutoff time.Duration, immediate bool) error {
tracesToCut := i.tracesToCut(cutoff, immediate)
Expand Down Expand Up @@ -321,6 +313,16 @@ func (i *instance) FindTraceByID(id []byte) (*tempopb.Trace, error) {
return nil, nil
}

// AddCompletingBlock appends an AppendBlock directly to the instance's slice of completing blocks.
// This is used during wal replay. The block is only recorded here and nothing is enqueued, so it
// is expected that calling code will add the appropriate jobs to the queue to eventually flush it.
func (i *instance) AddCompletingBlock(b *wal.AppendBlock) {
// hold blocksMtx for the duration of the append to completingBlocks
i.blocksMtx.Lock()
defer i.blocksMtx.Unlock()

i.completingBlocks = append(i.completingBlocks, b)
}

// getOrCreateTrace will return a new trace object for the given request
// It must be called under the i.tracesMtx lock
func (i *instance) getOrCreateTrace(req *tempopb.PushRequest) (*trace, error) {
Expand Down
7 changes: 0 additions & 7 deletions modules/ingester/instance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,13 +172,6 @@ func TestInstanceDoesNotRace(t *testing.T) {
assert.NoError(t, err, "error pushing traces")
})

go concurrent(func() {
id := make([]byte, 16)
_, _ = rand.Read(id)
err := i.PushBytes(context.Background(), id, []byte{0x01})
assert.NoError(t, err, "error pushing traces")
})

go concurrent(func() {
err := i.CutCompleteTraces(0, true)
assert.NoError(t, err, "error cutting complete traces")
Expand Down
38 changes: 38 additions & 0 deletions tempodb/encoding/appender_record.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package encoding

import (
"github.com/grafana/tempo/tempodb/encoding/common"
)

// recordAppender is an Appender that only carries an already-built slice of
// records. It stores no object data and rejects every attempt to append.
type recordAppender struct {
	records []*common.Record
}

// NewRecordAppender returns an Appender that exposes the given records and
// nothing else. The slice is held as passed in; it is not copied.
func NewRecordAppender(records []*common.Record) Appender {
	appender := new(recordAppender)
	appender.records = records
	return appender
}

// Append is not supported by a record-only appender. It ignores both
// arguments and always returns common.ErrUnsupported.
func (r *recordAppender) Append(id common.ID, b []byte) error {
	return common.ErrUnsupported
}

// Records returns the slice of records the appender was constructed with.
func (r *recordAppender) Records() []*common.Record {
	return r.records
}

// Length returns the number of records held by the appender.
func (r *recordAppender) Length() int {
	count := len(r.records)
	return count
}

// DataLength always returns 0.
func (r *recordAppender) DataLength() uint64 {
	return 0
}

// Complete does nothing and always returns nil.
func (r *recordAppender) Complete() error {
	return nil
}
32 changes: 32 additions & 0 deletions tempodb/encoding/common/record.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package common

import (
"bytes"
"sort"
)

// recordSorter adapts a slice of record pointers to sort.Interface, ordering
// the records by the byte-wise comparison of their IDs.
type recordSorter struct {
	records []*Record
}

// SortRecords sorts the passed slice of record pointers in place, ascending by ID.
func SortRecords(records []*Record) {
	sorter := recordSorter{records: records}
	sort.Sort(&sorter)
}

// Len reports how many records are being sorted.
func (s *recordSorter) Len() int {
	return len(s.records)
}

// Less reports whether the ID of the record at i orders strictly before the
// ID of the record at j.
func (s *recordSorter) Less(i, j int) bool {
	return bytes.Compare(s.records[i].ID, s.records[j].ID) < 0
}

// Swap exchanges the records at positions i and j.
func (s *recordSorter) Swap(i, j int) {
	recs := s.records
	recs[i], recs[j] = recs[j], recs[i]
}
4 changes: 3 additions & 1 deletion tempodb/encoding/common/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@ type DataReader interface {

// NextPage can be used to iterate at a page at a time. May return ErrUnsupported for older formats
// NextPage takes a reusable buffer to read the page into and returns it in case it needs to resize
NextPage([]byte) ([]byte, error)
// NextPage returns the uncompressed page buffer, ready for object iteration, along with the length of the
// original page as recorded in the page header. The length of the returned buffer may differ from that page length.
NextPage([]byte) ([]byte, uint32, error)
}

// IndexReader is used to abstract away the details of an index. Currently
Expand Down
44 changes: 0 additions & 44 deletions tempodb/encoding/iterator_recordless.go

This file was deleted.

10 changes: 5 additions & 5 deletions tempodb/encoding/v0/data_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,17 +70,17 @@ func (r *dataReader) Close() {
}

// NextPage implements common.DataReader
func (r *dataReader) NextPage(buffer []byte) ([]byte, error) {
func (r *dataReader) NextPage(buffer []byte) ([]byte, uint32, error) {
reader, err := r.r.Reader()
if err != nil {
return nil, err
return nil, 0, err
}

// v0 pages are just single objects. this method will return one object at a time from the encapsulated reader
var totalLength uint32
err = binary.Read(reader, binary.LittleEndian, &totalLength)
if err != nil {
return nil, err
return nil, 0, err
}

if cap(buffer) < int(totalLength) {
Expand All @@ -92,8 +92,8 @@ func (r *dataReader) NextPage(buffer []byte) ([]byte, error) {

_, err = reader.Read(buffer[uint32Size:])
if err != nil {
return nil, err
return nil, 0, err
}

return buffer, nil
return buffer, totalLength, nil
}
Loading

0 comments on commit 0b72812

Please sign in to comment.