diff --git a/CHANGELOG.md b/CHANGELOG.md
index 6a05b420490..78adc1a295d 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,5 +1,7 @@
 ## master / unreleased
 
+* [ENHANCEMENT] Add a Shutdown handler to flush data to backend, at "/shutdown". [#526](https://github.com/grafana/tempo/pull/526)
+
 ## v0.6.0
 
 * [CHANGE] Fixed ingester latency spikes on read [#461](https://github.com/grafana/tempo/pull/461)
diff --git a/cmd/tempo/app/modules.go b/cmd/tempo/app/modules.go
index b903fdf5a00..ff9d3167595 100644
--- a/cmd/tempo/app/modules.go
+++ b/cmd/tempo/app/modules.go
@@ -125,6 +125,7 @@ func (t *App) initIngester() (services.Service, error) {
 	tempopb.RegisterPusherServer(t.server.GRPC, t.ingester)
 	tempopb.RegisterQuerierServer(t.server.GRPC, t.ingester)
 	t.server.HTTP.Path("/flush").Handler(http.HandlerFunc(t.ingester.FlushHandler))
+	t.server.HTTP.Path("/shutdown").Handler(http.HandlerFunc(t.ingester.ShutdownHandler))
 	return t.ingester, nil
 }
 
diff --git a/integration/microservices/docker-compose.yaml b/integration/microservices/docker-compose.yaml
index 6e6475862ff..de17cf7ff16 100644
--- a/integration/microservices/docker-compose.yaml
+++ b/integration/microservices/docker-compose.yaml
@@ -51,7 +51,6 @@ services:
 
   synthetic-load-generator:
     image: omnition/synthetic-load-generator:1.0.25
-    scale: 4 # every container = 1000 spans/s
     volumes:
       - ./load-generator.json:/etc/load-generator.json
     environment:
diff --git a/modules/ingester/flush.go b/modules/ingester/flush.go
index fb8ef58c358..99e300cbe7e 100644
--- a/modules/ingester/flush.go
+++ b/modules/ingester/flush.go
@@ -3,10 +3,15 @@ package ingester
 import (
 	"context"
 	"fmt"
+	"math"
 	"net/http"
+	"strconv"
 	"time"
 
+	"github.com/google/uuid"
+
 	"github.com/cortexproject/cortex/pkg/util/log"
+	"github.com/cortexproject/cortex/pkg/util/services"
 	"github.com/go-kit/kit/log/level"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/client_golang/prometheus/promauto"
@@ -38,6 +43,11 @@ const (
 	flushBackoff = 1 * time.Second
 )
 
+const (
+	opKindComplete = iota
+	opKindFlush
+)
+
 // Flush triggers a flush of all in memory traces to disk. This is called
 // by the lifecycler on shutdown and will put our traces in the WAL to be
 // replayed.
@@ -52,28 +62,59 @@ func (i *Ingester) Flush() {
 	}
 }
 
-// FlushHandler calls sweepUsers(true) which will force push all traces into the WAL and force
+// ShutdownHandler handles a graceful shutdown for an ingester. It does the following things in order
+// * Stop incoming writes by exiting from the ring
+// * Flush all blocks to backend
+func (i *Ingester) ShutdownHandler(w http.ResponseWriter, _ *http.Request) {
+	go func() {
+		level.Info(log.Logger).Log("msg", "shutdown handler called")
+
+		// lifecycler should exit the ring on shutdown
+		i.lifecycler.SetUnregisterOnShutdown(true)
+
+		// stop accepting new writes
+		i.markUnavailable()
+
+		// move all data into flushQueue
+		i.sweepAllInstances(true)
+
+		for !i.flushQueues.IsEmpty() {
+			time.Sleep(100 * time.Millisecond)
+		}
+
+		// stop ingester service
+		_ = services.StopAndAwaitTerminated(context.Background(), i)
+
+		level.Info(log.Logger).Log("msg", "shutdown handler complete")
+	}()
+
+	_, _ = w.Write([]byte("shutdown job acknowledged"))
+}
+
+// FlushHandler calls sweepAllInstances(true) which will force push all traces into the WAL and force
 // mark all head blocks as ready to flush.
 func (i *Ingester) FlushHandler(w http.ResponseWriter, _ *http.Request) {
-	i.sweepUsers(true)
+	i.sweepAllInstances(true)
 	w.WriteHeader(http.StatusNoContent)
 }
 
 type flushOp struct {
-	from   int64
-	userID string
+	kind    int
+	from    int64
+	userID  string
+	blockID uuid.UUID
 }
 
 func (o *flushOp) Key() string {
-	return o.userID
+	return o.userID + "/" + strconv.Itoa(o.kind) + "/" + o.blockID.String()
 }
 
 func (o *flushOp) Priority() int64 {
 	return -o.from
 }
 
-// sweepUsers periodically schedules series for flushing and garbage collects users with no series
-func (i *Ingester) sweepUsers(immediate bool) {
+// sweepAllInstances periodically schedules series for flushing and garbage collects instances with no series
+func (i *Ingester) sweepAllInstances(immediate bool) {
 	instances := i.getInstances()
 
 	for _, instance := range instances {
@@ -89,26 +130,27 @@ func (i *Ingester) sweepInstance(instance *instance, immediate bool) {
 		return
 	}
 
-	// see if it's ready to cut a block?
-	err = instance.CutBlockIfReady(i.cfg.MaxBlockDuration, i.cfg.MaxBlockBytes, immediate)
+	// see if it's ready to cut a block
+	blockID, err := instance.CutBlockIfReady(i.cfg.MaxBlockDuration, i.cfg.MaxBlockBytes, immediate)
 	if err != nil {
 		level.Error(log.WithUserID(instance.instanceID, log.Logger)).Log("msg", "failed to cut block", "err", err)
 		return
 	}
 
+	if blockID != uuid.Nil {
+		i.flushQueues.Enqueue(&flushOp{
+			kind:    opKindComplete,
+			from:    math.MaxInt64,
+			userID:  instance.instanceID,
+			blockID: blockID,
+		})
+	}
+
 	// dump any blocks that have been flushed for awhile
 	err = instance.ClearFlushedBlocks(i.cfg.CompleteBlockTimeout)
 	if err != nil {
 		level.Error(log.WithUserID(instance.instanceID, log.Logger)).Log("msg", "failed to complete block", "err", err)
 	}
-
-	// see if any complete blocks are ready to be flushed
-	if instance.GetBlockToBeFlushed() != nil {
-		i.flushQueues.Enqueue(&flushOp{
-			time.Now().Unix(),
-			instance.instanceID,
-		})
-	}
 }
 
 func (i *Ingester) flushLoop(j int) {
@@ -124,13 +166,39 @@
 		}
 		op := o.(*flushOp)
 
-		level.Debug(log.Logger).Log("msg", "flushing block", "userid", op.userID, "fp")
+		var completeBlockID uuid.UUID
+		var err error
+		if op.kind == opKindComplete {
+			level.Debug(log.Logger).Log("msg", "completing block", "userid", op.userID)
+			instance, exists := i.getInstanceByID(op.userID)
+			if !exists {
+				// instance no longer exists? that's bad, log and continue
+				level.Error(log.Logger).Log("msg", "instance not found", "tenantID", op.userID)
+				continue
+			}
+
+			completeBlockID, err = instance.CompleteBlock(op.blockID)
+			if completeBlockID != uuid.Nil {
+				// add a flushOp for the block we just completed
+				i.flushQueues.Enqueue(&flushOp{
+					kind:    opKindFlush,
+					from:    time.Now().Unix(),
+					userID:  instance.instanceID,
+					blockID: completeBlockID,
+				})
+			}
+
+		} else {
+			level.Debug(log.Logger).Log("msg", "flushing block", "userid", op.userID, "fp")
+
+			err = i.flushBlock(op.userID, op.blockID)
+		}
 
-		err := i.flushUserTraces(op.userID)
 		if err != nil {
-			level.Error(log.WithUserID(op.userID, log.Logger)).Log("msg", "failed to flush user", "err", err)
-
-			// re-queue failed flush
+			level.Error(log.WithUserID(op.userID, log.Logger)).Log("msg", "error performing op in flushQueue",
+				"op", op.kind, "block", op.blockID.String(), "err", err)
+			metricFailedFlushes.Inc()
+			// re-queue op with backoff
 			op.from += int64(flushBackoff)
 			i.flushQueues.Requeue(op)
 			continue
@@ -140,7 +208,7 @@
 	}
 }
 
-func (i *Ingester) flushUserTraces(userID string) error {
+func (i *Ingester) flushBlock(userID string, blockID uuid.UUID) error {
 	instance, err := i.getOrCreateInstance(userID)
 	if err != nil {
 		return err
@@ -150,12 +218,7 @@
 		return fmt.Errorf("instance id %s not found", userID)
 	}
 
-	for {
-		block := instance.GetBlockToBeFlushed()
-		if block == nil {
-			break
-		}
-
+	if block := instance.GetBlockToBeFlushed(blockID); block != nil {
 		ctx := user.InjectOrgID(context.Background(), userID)
 		ctx, cancel := context.WithTimeout(ctx, i.cfg.FlushOpTimeout)
 		defer cancel()
@@ -164,10 +227,11 @@
 		err = i.store.WriteBlock(ctx, block)
 		metricFlushDuration.Observe(time.Since(start).Seconds())
 		if err != nil {
-			metricFailedFlushes.Inc()
 			return err
 		}
 		metricBlocksFlushed.Inc()
+	} else {
+		return fmt.Errorf("error getting block to flush")
 	}
 
 	return nil
diff --git a/modules/ingester/ingester.go b/modules/ingester/ingester.go
index 42fa4ac834c..aadcbb45091 100644
--- a/modules/ingester/ingester.go
+++ b/modules/ingester/ingester.go
@@ -112,7 +112,7 @@ func (i *Ingester) loop(ctx context.Context) error {
 	for {
 		select {
 		case <-flushTicker.C:
-			i.sweepUsers(false)
+			i.sweepAllInstances(false)
 
 		case <-ctx.Done():
 			return nil
@@ -125,14 +125,7 @@ func (i *Ingester) loop(ctx context.Context) error {
 
 // stopping is run when ingester is asked to stop
 func (i *Ingester) stopping(_ error) error {
-	// This will prevent us accepting any more samples
-	i.stopIncomingRequests()
-
-	// Lifecycler can be nil if the ingester is for a flusher.
-	if i.lifecycler != nil {
-		// Next initiate our graceful exit from the ring.
-		return services.StopAndAwaitTerminated(context.Background(), i.lifecycler)
-	}
+	i.markUnavailable()
 
 	if i.flushQueues != nil {
 		i.flushQueues.Stop()
@@ -142,6 +135,19 @@ func (i *Ingester) stopping(_ error) error {
 	return nil
 }
 
+func (i *Ingester) markUnavailable() {
+	// Lifecycler can be nil if the ingester is for a flusher.
+	if i.lifecycler != nil {
+		// Next initiate our graceful exit from the ring.
+		if err := services.StopAndAwaitTerminated(context.Background(), i.lifecycler); err != nil {
+			level.Warn(log.Logger).Log("msg", "failed to stop ingester lifecycler", "err", err)
+		}
+	}
+
+	// This will prevent us accepting any more samples
+	i.stopIncomingRequests()
+}
+
 // Push implements tempopb.Pusher.Push
 func (i *Ingester) Push(ctx context.Context, req *tempopb.PushRequest) (*tempopb.PushResponse, error) {
 	instanceID, err := user.ExtractOrgID(ctx)
diff --git a/modules/ingester/instance.go b/modules/ingester/instance.go
index 99fa2306719..2d6aaffc9be 100644
--- a/modules/ingester/instance.go
+++ b/modules/ingester/instance.go
@@ -54,10 +54,10 @@ type instance struct {
 	tracesMtx sync.Mutex
 	traces    map[uint32]*trace
 
-	blocksMtx       sync.RWMutex
-	headBlock       *wal.AppendBlock
-	completingBlock *wal.AppendBlock
-	completeBlocks  []*encoding.CompleteBlock
+	blocksMtx        sync.RWMutex
+	headBlock        *wal.AppendBlock
+	completingBlocks []*wal.AppendBlock
+	completeBlocks   []*encoding.CompleteBlock
 
 	lastBlockCut time.Time
 
@@ -133,54 +133,81 @@ func (i *instance) CutCompleteTraces(cutoff time.Duration, immediate bool) error
 	return nil
 }
 
-func (i *instance) CutBlockIfReady(maxBlockLifetime time.Duration, maxBlockBytes uint64, immediate bool) error {
+// CutBlockIfReady cuts a completingBlock from the headBlock if it is ready.
+// Returns the ID of the block that was cut, or uuid.Nil if no block was cut, along with the error (if any).
+func (i *instance) CutBlockIfReady(maxBlockLifetime time.Duration, maxBlockBytes uint64, immediate bool) (uuid.UUID, error) {
 	i.blocksMtx.Lock()
 	defer i.blocksMtx.Unlock()
 
 	if i.headBlock == nil || i.headBlock.DataLength() == 0 {
-		return nil
+		return uuid.Nil, nil
 	}
 
 	now := time.Now()
 	if i.lastBlockCut.Add(maxBlockLifetime).Before(now) || i.headBlock.DataLength() >= maxBlockBytes || immediate {
-		if i.completingBlock != nil {
-			return fmt.Errorf("unable to complete head block for %s b/c there is already a completing block. Will try again next cycle", i.instanceID)
-		}
+		completingBlock := i.headBlock
+
+		i.completingBlocks = append(i.completingBlocks, completingBlock)
 
-		i.completingBlock = i.headBlock
 		err := i.resetHeadBlock()
 		if err != nil {
-			return fmt.Errorf("failed to resetHeadBlock: %w", err)
+			return uuid.Nil, fmt.Errorf("failed to resetHeadBlock: %w", err)
 		}
 
-		// todo : this should be a queue of blocks to complete with workers
-		go func() {
-			completeBlock, err := i.writer.CompleteBlock(i.completingBlock, i)
-			i.blocksMtx.Lock()
-			defer i.blocksMtx.Unlock()
-
-			if err != nil {
-				// this is a really bad error that results in data loss. most likely due to disk full
-				_ = i.completingBlock.Clear()
-				metricFailedFlushes.Inc()
-				i.completingBlock = nil
-				level.Error(log.Logger).Log("msg", "unable to complete block. THIS BLOCK WAS LOST", "tenantID", i.instanceID, "err", err)
-				return
-			}
-			i.completingBlock = nil
-			i.completeBlocks = append(i.completeBlocks, completeBlock)
-		}()
+		return completingBlock.BlockID(), nil
 	}
 
-	return nil
+	return uuid.Nil, nil
 }
 
-func (i *instance) GetBlockToBeFlushed() *encoding.CompleteBlock {
+// CompleteBlock moves a completingBlock to a completeBlock
+func (i *instance) CompleteBlock(blockID uuid.UUID) (uuid.UUID, error) {
+	i.blocksMtx.Lock()
+
+	var completingBlock *wal.AppendBlock
+	for _, iterBlock := range i.completingBlocks {
+		if iterBlock.BlockID() == blockID {
+			completingBlock = iterBlock
+			break
+		}
+	}
+	i.blocksMtx.Unlock()
+
+	if completingBlock == nil {
+		return uuid.Nil, fmt.Errorf("error finding completingBlock")
+	}
+
+	// potentially long running operation placed outside blocksMtx
+	completeBlock, err := i.writer.CompleteBlock(completingBlock, i)
+
+	i.blocksMtx.Lock()
+	if err != nil {
+		metricFailedFlushes.Inc()
+		level.Error(log.Logger).Log("msg", "unable to complete block.", "tenantID", i.instanceID, "err", err)
+		i.blocksMtx.Unlock()
+		return uuid.Nil, err
+	}
+	// remove completingBlock from list
+	for j, iterBlock := range i.completingBlocks {
+		if iterBlock.BlockID() == blockID {
+			i.completingBlocks = append(i.completingBlocks[:j], i.completingBlocks[j+1:]...)
+			break
+		}
+	}
+	completeBlockID := completeBlock.BlockMeta().BlockID
+	i.completeBlocks = append(i.completeBlocks, completeBlock)
+	i.blocksMtx.Unlock()
+
+	return completeBlockID, nil
+}
+
+// GetBlockToBeFlushed returns the complete block with the given id that has not yet been flushed to the backend
+func (i *instance) GetBlockToBeFlushed(blockID uuid.UUID) *encoding.CompleteBlock {
 	i.blocksMtx.Lock()
 	defer i.blocksMtx.Unlock()
 
 	for _, c := range i.completeBlocks {
-		if c.FlushedTime().IsZero() {
+		if c.BlockMeta().BlockID == blockID && c.FlushedTime().IsZero() {
 			return c
 		}
 	}
@@ -240,8 +267,8 @@ func (i *instance) FindTraceByID(id []byte) (*tempopb.Trace, error) {
 	allBytes = i.Combine(foundBytes, allBytes)
 
 	// completingBlock
-	if i.completingBlock != nil {
-		foundBytes, err = i.completingBlock.Find(id, i)
+	for _, c := range i.completingBlocks {
+		foundBytes, err = c.Find(id, i)
 		if err != nil {
 			return nil, fmt.Errorf("completingBlock.Find failed: %w", err)
 		}
diff --git a/modules/ingester/instance_test.go b/modules/ingester/instance_test.go
index b4f241a186d..9aad0b32bf3 100644
--- a/modules/ingester/instance_test.go
+++ b/modules/ingester/instance_test.go
@@ -9,6 +9,8 @@ import (
 	"testing"
 	"time"
 
+	"github.com/google/uuid"
+
 	"github.com/go-kit/kit/log"
 	"github.com/grafana/tempo/modules/overrides"
 	"github.com/grafana/tempo/modules/storage"
@@ -52,20 +54,17 @@ func TestInstance(t *testing.T) {
 	err = i.CutCompleteTraces(0, true)
 	assert.NoError(t, err)
 
-	err = i.CutBlockIfReady(0, 0, false)
+	blockID, err := i.CutBlockIfReady(0, 0, false)
 	assert.NoError(t, err, "unexpected error cutting block")
+	assert.NotEqual(t, blockID, uuid.Nil)
 
-	// try a few times while the block gets completed
-	block := i.GetBlockToBeFlushed()
-	for j := 0; j < 5; j++ {
-		if block != nil {
-			continue
-		}
-		time.Sleep(100 * time.Millisecond)
-		block = i.GetBlockToBeFlushed()
-	}
-	assert.NotNil(t, block)
-	assert.Nil(t, i.completingBlock)
+	completeBlockID, err := i.CompleteBlock(blockID)
+	assert.NoError(t, err, "unexpected error completing block")
+	assert.NotEqual(t, completeBlockID, uuid.Nil)
+
+	block := i.GetBlockToBeFlushed(completeBlockID)
+	require.NotNil(t, block)
+	assert.Len(t, i.completingBlocks, 0)
 	assert.Len(t, i.completeBlocks, 1)
 
 	err = ingester.store.WriteBlock(context.Background(), block)
@@ -83,6 +82,33 @@ func TestInstance(t *testing.T) {
 	assert.NoError(t, err, "unexpected error resetting block")
 }
 
+func pushAndQuery(t *testing.T, i *instance, request *tempopb.PushRequest) uuid.UUID {
+	traceID := test.MustTraceID(request)
+	err := i.Push(context.Background(), request)
+	assert.NoError(t, err)
+
+	trace, err := i.FindTraceByID(traceID)
+	assert.NotNil(t, trace)
+	assert.NoError(t, err)
+
+	err = i.CutCompleteTraces(0, true)
+	assert.NoError(t, err)
+
+	trace, err = i.FindTraceByID(traceID)
+	assert.NotNil(t, trace)
+	assert.NoError(t, err)
+
+	blockID, err := i.CutBlockIfReady(0, 0, false)
+	assert.NoError(t, err, "unexpected error cutting block")
+	assert.NotEqual(t, blockID, uuid.Nil)
+
+	trace, err = i.FindTraceByID(traceID)
+	assert.NotNil(t, trace)
+	assert.NoError(t, err)
+
+	return blockID
+}
+
 func TestInstanceFind(t *testing.T) {
 	limits, err := overrides.NewOverrides(overrides.Limits{})
 	assert.NoError(t, err, "unexpected error creating limits")
@@ -93,31 +119,30 @@ func TestInstanceFind(t *testing.T) {
 	defer os.RemoveAll(tempDir)
 	ingester, _, _ := defaultIngester(t, tempDir)
 
-	request := test.MakeRequest(10, []byte{})
-	traceID := test.MustTraceID(request)
-
 	i, err := newInstance("fake", limiter, ingester.store)
 	assert.NoError(t, err, "unexpected error creating new instance")
 
-	err = i.Push(context.Background(), request)
-	assert.NoError(t, err)
-	trace, err := i.FindTraceByID(traceID)
-	assert.NotNil(t, trace)
-	assert.NoError(t, err)
+	request := test.MakeRequest(10, []byte{})
+	blockID := pushAndQuery(t, i, request)
 
-	err = i.CutCompleteTraces(0, true)
-	assert.NoError(t, err)
+	// make another completingBlock
+	request2 := test.MakeRequest(10, []byte{})
+	pushAndQuery(t, i, request2)
+	assert.Len(t, i.completingBlocks, 2)
 
-	trace, err = i.FindTraceByID(traceID)
-	assert.NotNil(t, trace)
-	assert.NoError(t, err)
+	_, err = i.CompleteBlock(blockID)
+	assert.NoError(t, err, "unexpected error completing block")
 
-	err = i.CutBlockIfReady(0, 0, false)
-	assert.NoError(t, err)
+	assert.Len(t, i.completingBlocks, 1)
 
-	trace, err = i.FindTraceByID(traceID)
+	traceID := test.MustTraceID(request)
+	trace, err := i.FindTraceByID(traceID)
 	assert.NotNil(t, trace)
 	assert.NoError(t, err)
+
+	completeBlockID, err := i.CompleteBlock(blockID)
+	assert.EqualError(t, err, "error finding completingBlock")
+	assert.Equal(t, completeBlockID, uuid.Nil)
 }
 
 func TestInstanceDoesNotRace(t *testing.T) {
@@ -158,14 +183,16 @@
 	})
 
 	go concurrent(func() {
-		_ = i.CutBlockIfReady(0, 0, false)
-	})
-
-	go concurrent(func() {
-		block := i.GetBlockToBeFlushed()
-		if block != nil {
-			err := ingester.store.WriteBlock(context.Background(), block)
-			assert.NoError(t, err, "error writing block")
+		blockID, _ := i.CutBlockIfReady(0, 0, false)
+		if blockID != uuid.Nil {
+			completeBlockID, err := i.CompleteBlock(blockID)
+			assert.NoError(t, err, "unexpected error completing block")
+			if completeBlockID != uuid.Nil {
+				block := i.GetBlockToBeFlushed(completeBlockID)
+				require.NotNil(t, block)
+				err := ingester.store.WriteBlock(context.Background(), block)
+				assert.NoError(t, err, "error writing block")
+			}
 		}
 	})
 
@@ -183,7 +210,7 @@
 	close(end)
 	// Wait for go funcs to quit before
 	// exiting and cleaning up
-	time.Sleep(100 * time.Millisecond)
+	time.Sleep(2 * time.Second)
 }
 
 func TestInstanceLimits(t *testing.T) {
@@ -414,9 +441,14 @@ func TestInstanceCutBlockIfReady(t *testing.T) {
 			err := instance.CutCompleteTraces(0, true)
 			require.NoError(t, err)
 
-			err = instance.CutBlockIfReady(tc.maxBlockLifetime, tc.maxBlockBytes, tc.immediate)
+			blockID, err := instance.CutBlockIfReady(tc.maxBlockLifetime, tc.maxBlockBytes, tc.immediate)
 			require.NoError(t, err)
 
+			_, err = instance.CompleteBlock(blockID)
+			if tc.expectedToCutBlock {
+				assert.NoError(t, err, "unexpected error completing block")
+			}
+
 			// Wait for goroutine to finish flushing to avoid test flakiness
 			if tc.expectedToCutBlock {
 				time.Sleep(time.Millisecond * 250)
diff --git a/pkg/flushqueues/exclusivequeues.go b/pkg/flushqueues/exclusivequeues.go
index f802e4bac80..97eb288488c 100644
--- a/pkg/flushqueues/exclusivequeues.go
+++ b/pkg/flushqueues/exclusivequeues.go
@@ -56,6 +56,17 @@ func (f *ExclusiveQueues) Clear(op util.Op) {
 	f.activeKeys.Delete(op.Key())
 }
 
+func (f *ExclusiveQueues) IsEmpty() bool {
+	length := 0
+
+	f.activeKeys.Range(func(_, _ interface{}) bool {
+		length++
+		return false
+	})
+
+	return length <= 0
+}
+
 // Stop closes all queues
 func (f *ExclusiveQueues) Stop() {
 	for _, q := range f.queues {
diff --git a/tempodb/wal/append_block.go b/tempodb/wal/append_block.go
index daccf978faf..94b0597af9f 100644
--- a/tempodb/wal/append_block.go
+++ b/tempodb/wal/append_block.go
@@ -59,6 +59,10 @@ func (h *AppendBlock) Write(id common.ID, b []byte) error {
 	return nil
 }
 
+func (h *AppendBlock) BlockID() uuid.UUID {
+	return h.block.meta.BlockID
+}
+
 func (h *AppendBlock) DataLength() uint64 {
 	return h.appender.DataLength()
 }
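
Reviewer note: a minimal sketch of how the new endpoint could be exercised once this change is deployed. It is not part of the patch. Per the handler above, "/shutdown" replies with "shutdown job acknowledged" and performs the flush-and-exit sequence in a background goroutine, so a successful response only means the job was accepted. The listen address (localhost:3100) and the use of a plain GET are assumptions; the handler does not restrict the HTTP method, and the port depends on the server config.

package main

import (
	"fmt"
	"io"
	"log"
	"net/http"
	"time"
)

func main() {
	// Assumed ingester HTTP listen address; substitute the real one for your deployment.
	const ingesterAddr = "http://localhost:3100"

	client := &http.Client{Timeout: 10 * time.Second}

	// The shutdown work runs asynchronously on the ingester, so this call returns quickly.
	resp, err := client.Get(ingesterAddr + "/shutdown")
	if err != nil {
		log.Fatalf("calling /shutdown: %v", err)
	}
	defer resp.Body.Close()

	body, err := io.ReadAll(resp.Body)
	if err != nil {
		log.Fatalf("reading response: %v", err)
	}

	// Expected body per the handler in this diff: "shutdown job acknowledged".
	fmt.Printf("status=%d body=%q\n", resp.StatusCode, body)
}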