
Adding a Shutdown Handler #526

Merged · 15 commits · Feb 23, 2021
1 change: 1 addition & 0 deletions cmd/tempo/app/modules.go
@@ -125,6 +125,7 @@ func (t *App) initIngester() (services.Service, error) {
tempopb.RegisterPusherServer(t.server.GRPC, t.ingester)
tempopb.RegisterQuerierServer(t.server.GRPC, t.ingester)
t.server.HTTP.Path("/flush").Handler(http.HandlerFunc(t.ingester.FlushHandler))
t.server.HTTP.Path("/shutdown").Handler(http.HandlerFunc(t.ingester.ShutdownHandler))
return t.ingester, nil
}

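With this change the ingester exposes a /shutdown endpoint alongside the existing /flush endpoint. A minimal sketch (not part of this PR) of calling it from a Go client, assuming the ingester's HTTP server listens on localhost:3100 (adjust to your deployment) and that a plain GET is acceptable since the route is registered without a method restriction:

package main

import (
	"fmt"
	"io"
	"net/http"
)

func main() {
	// localhost:3100 is an assumption for illustration; point this at the
	// ingester you want to drain before scaling it down.
	resp, err := http.Get("http://localhost:3100/shutdown")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	body, _ := io.ReadAll(resp.Body)
	// The handler above responds with "shutdown job acknowledged" and then
	// flushes and stops the ingester asynchronously.
	fmt.Println(resp.Status, string(body))
}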
1 change: 0 additions & 1 deletion integration/microservices/docker-compose.yaml
@@ -51,7 +51,6 @@ services:

synthetic-load-generator:
image: omnition/synthetic-load-generator:1.0.25
scale: 4 # every container = 1000 spans/s
volumes:
- ./load-generator.json:/etc/load-generator.json
environment:
116 changes: 87 additions & 29 deletions modules/ingester/flush.go
@@ -3,10 +3,15 @@ package ingester
import (
"context"
"fmt"
"math"
"net/http"
"strconv"
"time"

"github.com/google/uuid"

"github.com/cortexproject/cortex/pkg/util/log"
"github.com/cortexproject/cortex/pkg/util/services"
"github.com/go-kit/kit/log/level"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
@@ -38,6 +43,11 @@ const (
flushBackoff = 1 * time.Second
)

const (
opKindComplete = iota
opKindFlush
)

// Flush triggers a flush of all in memory traces to disk. This is called
// by the lifecycler on shutdown and will put our traces in the WAL to be
// replayed.
@@ -52,28 +62,55 @@ func (i *Ingester) Flush() {
}
}

// FlushHandler calls sweepUsers(true) which will force push all traces into the WAL and force
// ShutdownHandler handles a graceful shutdown for an ingester. It does the following things in order
// * Stop incoming writes by exiting from the ring
// * Flush all blocks to backend
func (i *Ingester) ShutdownHandler(w http.ResponseWriter, _ *http.Request) {
go func() {
// lifecycler should exit the ring on shutdown
i.lifecycler.SetUnregisterOnShutdown(true)

// stop accepting new writes
i.markUnavailable()

// move all data into flushQueue
i.sweepAllInstances(true)

for !i.flushQueues.IsEmpty() {
time.Sleep(100 * time.Millisecond)
}

// stop ingester service
_ = services.StopAndAwaitTerminated(context.Background(), i)
}()

_, _ = w.Write([]byte("shutdown job acknowledged"))
}

// FlushHandler calls sweepAllInstances(true) which will force push all traces into the WAL and force
// mark all head blocks as ready to flush.
func (i *Ingester) FlushHandler(w http.ResponseWriter, _ *http.Request) {
i.sweepUsers(true)
i.sweepAllInstances(true)
w.WriteHeader(http.StatusNoContent)
}

type flushOp struct {
from int64
userID string
kind int
from int64
userID string
blockID uuid.UUID
}

func (o *flushOp) Key() string {
return o.userID
return o.userID + "/" + strconv.Itoa(o.kind) + "/" + o.blockID.String()
}

func (o *flushOp) Priority() int64 {
return -o.from
}

// sweepUsers periodically schedules series for flushing and garbage collects users with no series
func (i *Ingester) sweepUsers(immediate bool) {
// sweepAllInstances periodically schedules series for flushing and garbage collects instances with no series
func (i *Ingester) sweepAllInstances(immediate bool) {
instances := i.getInstances()

for _, instance := range instances {
@@ -89,26 +126,27 @@ func (i *Ingester) sweepInstance(instance *instance, immediate bool) {
return
}

// see if it's ready to cut a block?
err = instance.CutBlockIfReady(i.cfg.MaxBlockDuration, i.cfg.MaxBlockBytes, immediate)
// see if it's ready to cut a block
blockID, err := instance.CutBlockIfReady(i.cfg.MaxBlockDuration, i.cfg.MaxBlockBytes, immediate)
if err != nil {
level.Error(log.WithUserID(instance.instanceID, log.Logger)).Log("msg", "failed to cut block", "err", err)
return
}

if blockID != uuid.Nil {
i.flushQueues.Enqueue(&flushOp{
kind: opKindComplete,
from: math.MaxInt64,
userID: instance.instanceID,
blockID: blockID,
})
}

// dump any blocks that have been flushed for awhile
err = instance.ClearFlushedBlocks(i.cfg.CompleteBlockTimeout)
if err != nil {
level.Error(log.WithUserID(instance.instanceID, log.Logger)).Log("msg", "failed to complete block", "err", err)
}

// see if any complete blocks are ready to be flushed
if instance.GetBlockToBeFlushed() != nil {
i.flushQueues.Enqueue(&flushOp{
time.Now().Unix(),
instance.instanceID,
})
}
}

func (i *Ingester) flushLoop(j int) {
@@ -124,13 +162,38 @@ func (i *Ingester) flushLoop(j int) {
}
op := o.(*flushOp)

level.Debug(log.Logger).Log("msg", "flushing block", "userid", op.userID, "fp")
var completeBlockID uuid.UUID
var err error
if op.kind == opKindComplete {
level.Debug(log.Logger).Log("msg", "completing block", "userid", op.userID)
instance, exists := i.getInstanceByID(op.userID)
if !exists {
// instance no longer exists? that's bad, log and continue
level.Error(log.Logger).Log("msg", "instance not found", "tenantID", op.userID)
continue
}

completeBlockID, err = instance.CompleteBlock(op.blockID)
if completeBlockID != uuid.Nil {
// add a flushOp for the block we just completed
i.flushQueues.Enqueue(&flushOp{
kind: opKindFlush,
from: time.Now().Unix(),
userID: instance.instanceID,
blockID: completeBlockID,
})
}

} else {
level.Debug(log.Logger).Log("msg", "flushing block", "userid", op.userID, "fp")

err = i.flushBlock(op.userID, op.blockID)
}

err := i.flushUserTraces(op.userID)
if err != nil {
level.Error(log.WithUserID(op.userID, log.Logger)).Log("msg", "failed to flush user", "err", err)

// re-queue failed flush
level.Error(log.WithUserID(op.userID, log.Logger)).Log("msg", "error performing op in flushQueue",
"op", op.kind, "block", op.blockID.String(), "err", err)
// re-queue op with backoff
op.from += int64(flushBackoff)
i.flushQueues.Requeue(op)
continue
@@ -140,7 +203,7 @@ func (i *Ingester) flushLoop(j int) {
}
}

func (i *Ingester) flushUserTraces(userID string) error {
func (i *Ingester) flushBlock(userID string, blockID uuid.UUID) error {
instance, err := i.getOrCreateInstance(userID)
if err != nil {
return err
@@ -150,12 +213,7 @@ func (i *Ingester) flushUserTraces(userID string) error {
return fmt.Errorf("instance id %s not found", userID)
}

for {
block := instance.GetBlockToBeFlushed()
if block == nil {
break
}

if block := instance.GetBlockToBeFlushed(blockID); block != nil {
ctx := user.InjectOrgID(context.Background(), userID)
ctx, cancel := context.WithTimeout(ctx, i.cfg.FlushOpTimeout)
defer cancel()
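The flush queue now carries two kinds of ops: opKindComplete turns a completing WAL block into a complete block, and opKindFlush writes a complete block to the backend. Because Key() now includes the kind and the block ID, an identical op can be deduplicated on requeue while the complete and flush ops for the same block stay distinct. A small self-contained sketch of that key scheme, mirroring (not reusing) the flushOp type above:

package main

import (
	"fmt"
	"strconv"

	"github.com/google/uuid"
)

const (
	opKindComplete = iota
	opKindFlush
)

// flushOp is a simplified mirror of the op type in this PR, for illustration only.
type flushOp struct {
	kind    int
	from    int64
	userID  string
	blockID uuid.UUID
}

func (o *flushOp) Key() string {
	return o.userID + "/" + strconv.Itoa(o.kind) + "/" + o.blockID.String()
}

func main() {
	blockID := uuid.New()
	complete := &flushOp{kind: opKindComplete, userID: "tenant-a", blockID: blockID}
	flush := &flushOp{kind: opKindFlush, userID: "tenant-a", blockID: blockID}

	// The kind and block ID keep the two ops for the same block distinct,
	// while requeuing an identical op yields the same key and can be deduped.
	fmt.Println(complete.Key()) // tenant-a/0/<block uuid>
	fmt.Println(flush.Key())    // tenant-a/1/<block uuid>
}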
24 changes: 15 additions & 9 deletions modules/ingester/ingester.go
@@ -112,7 +112,7 @@ func (i *Ingester) loop(ctx context.Context) error {
for {
select {
case <-flushTicker.C:
i.sweepUsers(false)
i.sweepAllInstances(false)

case <-ctx.Done():
return nil
@@ -125,14 +125,7 @@ func (i *Ingester) loop(ctx context.Context) error {

// stopping is run when ingester is asked to stop
func (i *Ingester) stopping(_ error) error {
// This will prevent us accepting any more samples
i.stopIncomingRequests()

// Lifecycler can be nil if the ingester is for a flusher.
if i.lifecycler != nil {
// Next initiate our graceful exit from the ring.
return services.StopAndAwaitTerminated(context.Background(), i.lifecycler)
}
i.markUnavailable()

if i.flushQueues != nil {
i.flushQueues.Stop()
@@ -142,6 +135,19 @@ func (i *Ingester) stopping(_ error) error {
return nil
}

func (i *Ingester) markUnavailable() {
// Lifecycler can be nil if the ingester is for a flusher.
if i.lifecycler != nil {
// Next initiate our graceful exit from the ring.
if err := services.StopAndAwaitTerminated(context.Background(), i.lifecycler); err != nil {
level.Warn(log.Logger).Log("msg", "failed to stop ingester lifecycler", "err", err)
}
}

// This will prevent us accepting any more samples
i.stopIncomingRequests()
}

// Push implements tempopb.Pusher.Push
func (i *Ingester) Push(ctx context.Context, req *tempopb.PushRequest) (*tempopb.PushResponse, error) {
instanceID, err := user.ExtractOrgID(ctx)
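markUnavailable is now shared between the normal stopping path and the new shutdown handler: it first leaves the ring via the lifecycler, then stops accepting writes. A rough sketch of that ordering, assuming the Cortex services helpers used above (NewIdleService, StartAndAwaitRunning, StopAndAwaitTerminated from pkg/util/services) behave as in cortexproject/cortex; the idle service below is only a stand-in for the real lifecycler:

package main

import (
	"context"
	"fmt"

	"github.com/cortexproject/cortex/pkg/util/services"
)

func main() {
	// Stand-in for the lifecycler: an idle service whose stop hook represents
	// "unregister from the ring". Illustration only; the real lifecycler does far more.
	lifecycler := services.NewIdleService(
		func(_ context.Context) error { return nil },
		func(_ error) error {
			fmt.Println("left the ring")
			return nil
		},
	)

	ctx := context.Background()
	if err := services.StartAndAwaitRunning(ctx, lifecycler); err != nil {
		panic(err)
	}

	// Mirrors markUnavailable: leave the ring first so no new writes are routed
	// here, then (in the real ingester) stop accepting in-flight pushes.
	if err := services.StopAndAwaitTerminated(ctx, lifecycler); err != nil {
		fmt.Println("failed to stop lifecycler:", err)
	}
	fmt.Println("ring left; now stop incoming requests and flush")
}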
89 changes: 56 additions & 33 deletions modules/ingester/instance.go
@@ -54,10 +54,10 @@ type instance struct {
tracesMtx sync.Mutex
traces map[uint32]*trace

blocksMtx sync.RWMutex
headBlock *wal.AppendBlock
completingBlock *wal.AppendBlock
completeBlocks []*encoding.CompleteBlock
blocksMtx sync.RWMutex
headBlock *wal.AppendBlock
completingBlocks []*wal.AppendBlock
completeBlocks []*encoding.CompleteBlock

lastBlockCut time.Time

@@ -133,54 +133,77 @@ func (i *instance) CutCompleteTraces(cutoff time.Duration, immediate bool) error
return nil
}

func (i *instance) CutBlockIfReady(maxBlockLifetime time.Duration, maxBlockBytes uint64, immediate bool) error {
// CutBlockIfReady cuts a completingBlock from the HeadBlock if ready
// Returns the ID of the block that was cut (uuid.Nil if no block was cut) along with the error (if any).
func (i *instance) CutBlockIfReady(maxBlockLifetime time.Duration, maxBlockBytes uint64, immediate bool) (uuid.UUID, error) {
i.blocksMtx.Lock()
defer i.blocksMtx.Unlock()

if i.headBlock == nil || i.headBlock.DataLength() == 0 {
return nil
return uuid.Nil, nil
}

now := time.Now()
if i.lastBlockCut.Add(maxBlockLifetime).Before(now) || i.headBlock.DataLength() >= maxBlockBytes || immediate {
if i.completingBlock != nil {
return fmt.Errorf("unable to complete head block for %s b/c there is already a completing block. Will try again next cycle", i.instanceID)
}
completingBlock := i.headBlock

i.completingBlocks = append(i.completingBlocks, completingBlock)

i.completingBlock = i.headBlock
err := i.resetHeadBlock()
if err != nil {
return fmt.Errorf("failed to resetHeadBlock: %w", err)
return uuid.Nil, fmt.Errorf("failed to resetHeadBlock: %w", err)
}

// todo : this should be a queue of blocks to complete with workers
go func() {
completeBlock, err := i.writer.CompleteBlock(i.completingBlock, i)
i.blocksMtx.Lock()
defer i.blocksMtx.Unlock()

if err != nil {
// this is a really bad error that results in data loss. most likely due to disk full
_ = i.completingBlock.Clear()
metricFailedFlushes.Inc()
i.completingBlock = nil
level.Error(log.Logger).Log("msg", "unable to complete block. THIS BLOCK WAS LOST", "tenantID", i.instanceID, "err", err)
return
}
i.completingBlock = nil
i.completeBlocks = append(i.completeBlocks, completeBlock)
}()
return completingBlock.BlockID(), nil
}

return nil
return uuid.Nil, nil
}

// CompleteBlock() moves a completingBlock to a completeBlock
func (i *instance) CompleteBlock(blockID uuid.UUID) (uuid.UUID, error) {
i.blocksMtx.Lock()

var completingBlock *wal.AppendBlock
for j, iterBlock := range i.completingBlocks {
if iterBlock.BlockID() == blockID {
completingBlock = iterBlock
i.completingBlocks = append(i.completingBlocks[:j], i.completingBlocks[j+1:]...)
break
}
}
i.blocksMtx.Unlock()

if completingBlock == nil {
return uuid.Nil, nil
}

// potentially long running operation placed outside blocksMtx
completeBlock, err := i.writer.CompleteBlock(completingBlock, i)

i.blocksMtx.Lock()
if err != nil {
// re-add completingBlock into list
i.completingBlocks = append(i.completingBlocks, completingBlock)
metricFailedFlushes.Inc()
level.Error(log.Logger).Log("msg", "unable to complete block.", "tenantID", i.instanceID, "err", err)
i.blocksMtx.Unlock()
return uuid.Nil, err
}
completeBlockID := completeBlock.BlockMeta().BlockID
i.completeBlocks = append(i.completeBlocks, completeBlock)
i.blocksMtx.Unlock()

return completeBlockID, nil
}

func (i *instance) GetBlockToBeFlushed() *encoding.CompleteBlock {
// GetBlockToBeFlushed gets a complete block that is ready to be flushed to the backend
func (i *instance) GetBlockToBeFlushed(blockID uuid.UUID) *encoding.CompleteBlock {
i.blocksMtx.Lock()
defer i.blocksMtx.Unlock()

for _, c := range i.completeBlocks {
if c.FlushedTime().IsZero() {
if c.BlockMeta().BlockID == blockID && c.FlushedTime().IsZero() {
return c
}
}
@@ -240,8 +263,8 @@ func (i *instance) FindTraceByID(id []byte) (*tempopb.Trace, error) {
allBytes = i.Combine(foundBytes, allBytes)

// completingBlock
if i.completingBlock != nil {
foundBytes, err = i.completingBlock.Find(id, i)
for _, c := range i.completingBlocks {
foundBytes, err = c.Find(id, i)
if err != nil {
return nil, fmt.Errorf("completingBlock.Find failed: %w", err)
}
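The instance now tracks a slice of completing blocks instead of a single completingBlock, and CompleteBlock pulls the matching block out of that slice by ID while holding blocksMtx, then performs the long-running completion outside the lock. A minimal sketch (illustration only, with a stand-in block type rather than *wal.AppendBlock) of that remove-by-ID pattern:

package main

import (
	"fmt"

	"github.com/google/uuid"
)

// block is a stand-in for *wal.AppendBlock, carrying only an ID for illustration.
type block struct {
	id uuid.UUID
}

// popBlock removes and returns the block with the given ID, or nil if absent.
// This mirrors the loop in CompleteBlock above that pulls one completingBlock
// out of the slice before completing it.
func popBlock(blocks []*block, id uuid.UUID) ([]*block, *block) {
	for j, b := range blocks {
		if b.id == id {
			return append(blocks[:j], blocks[j+1:]...), b
		}
	}
	return blocks, nil
}

func main() {
	a, b := &block{id: uuid.New()}, &block{id: uuid.New()}
	completing := []*block{a, b}

	completing, found := popBlock(completing, a.id)
	fmt.Println(found == a)      // true
	fmt.Println(len(completing)) // 1
}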