Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Switch compactor to new BasicLifecycler with AutoForgetDelegate #1178

Merged
merged 6 commits into from
Dec 16, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
* [ENHANCEMENT] jsonnet: set rollingUpdate.maxSurge to 3 for distributor, frontend and queriers [#1164](https://github.com/grafana/tempo/pull/1164) (@kvrhdn)
* [ENHANCEMENT] Reduce search data file sizes by optimizing contents [#1165](https://github.com/grafana/tempo/pull/1165) (@mdisibio)
* [ENHANCEMENT] Add `tempo_ingester_live_traces` metric [#1170](https://github.com/grafana/tempo/pull/1170) (@mdisibio)
* [ENHANCEMENT] Update compactor ring to automatically forget unhealthy entries [#1178](https://github.com/grafana/tempo/pull/1178) (@mdisibio)
* [BUGFIX] Add process name to vulture traces to work around display issues [#1127](https://github.com/grafana/tempo/pull/1127) (@mdisibio)
* [BUGFIX] Fixed issue where compaction sometimes dropped spans. [#1130](https://github.com/grafana/tempo/pull/1130) (@joe-elliott)
* [BUGFIX] Ensure that the admin client jsonnet has correct S3 bucket property. (@hedss)
Expand Down
169 changes: 118 additions & 51 deletions modules/compactor/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/cortexproject/cortex/pkg/util/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/kv"
"github.com/grafana/dskit/ring"
"github.com/grafana/dskit/services"
"github.com/pkg/errors"
Expand All @@ -19,7 +20,17 @@ import (
)

const (
waitOnStartup = 90 * time.Second
// ringAutoForgetUnhealthyPeriods is how many consecutive heartbeat timeout periods an instance
// must remain unhealthy in the ring before it is automatically removed.
ringAutoForgetUnhealthyPeriods = 2

// We use a safe default instead of exposing a config option to the user
// in order to simplify the config.
ringNumTokens = 512
mdisibio marked this conversation as resolved.
Show resolved Hide resolved
)

var (
ringOp = ring.NewOp([]ring.InstanceState{ring.ACTIVE}, nil)
mdisibio marked this conversation as resolved.
Show resolved Hide resolved
)

type Compactor struct {
Expand All @@ -30,7 +41,7 @@ type Compactor struct {
overrides *overrides.Overrides

// Ring used for sharding compactions.
ringLifecycler *ring.Lifecycler
ringLifecycler *ring.BasicLifecycler
Ring *ring.Ring

subservices *services.Manager
Expand All @@ -45,50 +56,94 @@ func New(cfg Config, store storage.Store, overrides *overrides.Overrides, reg pr
overrides: overrides,
}

subservices := []services.Service(nil)
if c.isSharded() {
lifecyclerCfg := c.cfg.ShardingRing.ToLifecyclerConfig()
lifecycler, err := ring.NewLifecycler(lifecyclerCfg, ring.NewNoopFlushTransferer(), "compactor", cfg.OverrideRingKey, false, log.Logger, prometheus.WrapRegistererWithPrefix("cortex_", reg))
reg = prometheus.WrapRegistererWithPrefix("cortex_", reg)

lifecyclerStore, err := kv.NewClient(
cfg.ShardingRing.KVStore,
ring.GetCodec(),
joe-elliott marked this conversation as resolved.
Show resolved Hide resolved
kv.RegistererWithKVName(reg, ring.CompactorRingKey+"-lifecycler"),
log.Logger,
)
if err != nil {
return nil, errors.Wrap(err, "unable to initialize compactor ring lifecycler")
return nil, err
}
c.ringLifecycler = lifecycler
subservices = append(subservices, c.ringLifecycler)

ring, err := ring.New(lifecyclerCfg.RingConfig, "compactor", cfg.OverrideRingKey, log.Logger, prometheus.WrapRegistererWithPrefix("cortex_", reg))
delegate := ring.BasicLifecyclerDelegate(c)
delegate = ring.NewLeaveOnStoppingDelegate(delegate, log.Logger)
delegate = ring.NewAutoForgetDelegate(ringAutoForgetUnhealthyPeriods*cfg.ShardingRing.HeartbeatTimeout, delegate, log.Logger)

bcfg, err := toBasicLifecyclerConfig(cfg.ShardingRing, log.Logger)
if err != nil {
return nil, errors.Wrap(err, "unable to initialize compactor ring")
return nil, err
}
c.Ring = ring
subservices = append(subservices, c.Ring)

c.subservices, err = services.NewManager(subservices...)
c.ringLifecycler, err = ring.NewBasicLifecycler(bcfg, ring.CompactorRingKey, cfg.OverrideRingKey, lifecyclerStore, delegate, log.Logger, reg)
if err != nil {
return nil, fmt.Errorf("failed to create subservices %w", err)
return nil, errors.Wrap(err, "unable to initialize compactor ring lifecycler")
}

c.Ring, err = ring.New(c.cfg.ShardingRing.ToLifecyclerConfig().RingConfig, ring.CompactorRingKey, cfg.OverrideRingKey, log.Logger, reg)
if err != nil {
return nil, errors.Wrap(err, "unable to initialize compactor ring")
}
c.subservicesWatcher = services.NewFailureWatcher()
c.subservicesWatcher.WatchManager(c.subservices)
}

c.Service = services.NewBasicService(c.starting, c.running, c.stopping)

return c, nil
}

func (c *Compactor) starting(ctx context.Context) error {
if c.subservices != nil {
func (c *Compactor) starting(ctx context.Context) (err error) {
// In case this function will return error we want to unregister the instance
// from the ring. We do it ensuring dependencies are gracefully stopped if they
// were already started.
defer func() {
if err == nil || c.subservices == nil {
mdisibio marked this conversation as resolved.
Show resolved Hide resolved
return
}

if stopErr := services.StopManagerAndAwaitStopped(context.Background(), c.subservices); stopErr != nil {
level.Error(log.Logger).Log("msg", "failed to gracefully stop compactor dependencies", "err", stopErr)
}
}()

if c.isSharded() {
c.subservices, err = services.NewManager(c.ringLifecycler, c.Ring)
if err != nil {
return fmt.Errorf("failed to create subservices %w", err)
}
c.subservicesWatcher = services.NewFailureWatcher()
c.subservicesWatcher.WatchManager(c.subservices)

err := services.StartManagerAndAwaitHealthy(ctx, c.subservices)
if err != nil {
return fmt.Errorf("failed to start subservices %w", err)
}

ctx := context.Background()

level.Info(log.Logger).Log("msg", "waiting to be active in the ring")
err = c.waitRingActive(ctx)
if err != nil {
// Wait until the ring client detected this instance in the ACTIVE state.
level.Info(log.Logger).Log("msg", "waiting until compactor is ACTIVE in the ring")
ctxWithTimeout, cancel := context.WithTimeout(ctx, c.cfg.ShardingRing.WaitActiveInstanceTimeout)
defer cancel()
if err := ring.WaitInstanceState(ctxWithTimeout, c.Ring, c.ringLifecycler.GetInstanceID(), ring.ACTIVE); err != nil {
return err
}
level.Info(log.Logger).Log("msg", "compactor is ACTIVE in the ring")

// In the event of a cluster cold start we may end up in a situation where each new compactor
// instance starts at a slightly different time and thus each one starts with a different state
// of the ring. It's better to just wait the ring stability for a short time.
if c.cfg.ShardingRing.WaitStabilityMinDuration > 0 {
minWaiting := c.cfg.ShardingRing.WaitStabilityMinDuration
maxWaiting := c.cfg.ShardingRing.WaitStabilityMaxDuration

level.Info(log.Logger).Log("msg", "waiting until compactor ring topology is stable", "min_waiting", minWaiting.String(), "max_waiting", maxWaiting.String())
if err := ring.WaitRingStability(ctx, c.Ring, ringOp, minWaiting, maxWaiting); err != nil {
level.Warn(log.Logger).Log("msg", "compactor ring topology is not stable after the max waiting time, proceeding anyway")
} else {
level.Info(log.Logger).Log("msg", "compactor ring topology is stable")
}
}
}

// this will block until one poll cycle is complete
Expand All @@ -98,12 +153,8 @@ func (c *Compactor) starting(ctx context.Context) error {
}

func (c *Compactor) running(ctx context.Context) error {
go func() {
level.Info(log.Logger).Log("msg", "waiting for compaction ring to settle", "waitDuration", waitOnStartup)
time.Sleep(waitOnStartup)
level.Info(log.Logger).Log("msg", "enabling compaction")
c.store.EnableCompaction(&c.cfg.Compactor, c, c)
}()
level.Info(log.Logger).Log("msg", "enabling compaction")
c.store.EnableCompaction(&c.cfg.Compactor, c, c)

if c.subservices != nil {
select {
Expand Down Expand Up @@ -140,7 +191,7 @@ func (c *Compactor) Owns(hash string) bool {
_, _ = hasher.Write([]byte(hash))
hash32 := hasher.Sum32()

rs, err := c.Ring.Get(hash32, ring.Read, []ring.InstanceDesc{}, nil, nil)
rs, err := c.Ring.Get(hash32, ringOp, []ring.InstanceDesc{}, nil, nil)
if err != nil {
level.Error(log.Logger).Log("msg", "failed to get ring", "err", err)
return false
Expand All @@ -151,9 +202,11 @@ func (c *Compactor) Owns(hash string) bool {
return false
}

level.Debug(log.Logger).Log("msg", "checking addresses", "owning_addr", rs.Instances[0].Addr, "this_addr", c.ringLifecycler.Addr)
ringAddr := c.ringLifecycler.GetInstanceAddr()

level.Debug(log.Logger).Log("msg", "checking addresses", "owning_addr", rs.Instances[0].Addr, "this_addr", ringAddr)

return rs.Instances[0].Addr == c.ringLifecycler.Addr
return rs.Instances[0].Addr == ringAddr
}

// Combine implements common.ObjectCombiner
Expand All @@ -166,27 +219,41 @@ func (c *Compactor) BlockRetentionForTenant(tenantID string) time.Duration {
return c.overrides.BlockRetention(tenantID)
}

func (c *Compactor) waitRingActive(ctx context.Context) error {
for {
// Check if the ingester is ACTIVE in the ring and our ring client
// has detected it.
if rs, err := c.Ring.GetAllHealthy(ring.Reporting); err == nil {
for _, i := range rs.Instances {
if i.GetAddr() == c.ringLifecycler.Addr && i.GetState() == ring.ACTIVE {
return nil
}
}
}
// isSharded reports whether a KV store has been configured for the sharding
// ring, i.e. whether the configured store name is non-empty.
func (c *Compactor) isSharded() bool {
	kvStoreName := c.cfg.ShardingRing.KVStore.Store
	return kvStoreName != ""
}

select {
case <-time.After(time.Second):
// Nothing to do
case <-ctx.Done():
return ctx.Err()
}
// OnRingInstanceRegister is invoked while the lifecycler registers this
// instance in the ring. It returns the state and the set of tokens the
// instance should be registered with.
func (c *Compactor) OnRingInstanceRegister(lifecycler *ring.BasicLifecycler, ringDesc ring.Desc, instanceExists bool, instanceID string, instanceDesc ring.InstanceDesc) (ring.InstanceState, ring.Tokens) {
	// Regardless of the state a previous registration left behind, the
	// instance always comes up as ACTIVE. Tokens held by an already
	// existing entry are kept.
	var existing []uint32
	if instanceExists {
		existing = instanceDesc.GetTokens()
	}

	// Top up to ringNumTokens, handing the generator the tokens currently
	// present in the ring.
	missing := ringNumTokens - len(existing)
	generated := ring.GenerateTokens(missing, ringDesc.GetTokens())

	// The combined tokens are not sorted here; sorting is enforced by the
	// caller.
	return ring.ACTIVE, append(existing, generated...)
}

func (c *Compactor) isSharded() bool {
return c.cfg.ShardingRing.KVStore.Store != ""
// OnRingInstanceTokens is called once the instance tokens are set and are
// stable within the ring (honoring the observe period, if set).
// The compactor takes no action here; the body is intentionally empty.
func (c *Compactor) OnRingInstanceTokens(lifecycler *ring.BasicLifecycler, tokens ring.Tokens) {}

// OnRingInstanceStopping is called while the lifecycler is stopping. The lifecycler
// will continue to heartbeat the ring while this function is executing and will proceed
// to unregister the instance from the ring only after this function has returned.
// The compactor takes no action here; the body is intentionally empty.
func (c *Compactor) OnRingInstanceStopping(lifecycler *ring.BasicLifecycler) {}

// OnRingInstanceHeartbeat is called while the instance is updating its heartbeat
// in the ring.
// The compactor takes no action here; the body is intentionally empty.
func (c *Compactor) OnRingInstanceHeartbeat(lifecycler *ring.BasicLifecycler, ringDesc *ring.Desc, instanceDesc *ring.InstanceDesc) {
}
18 changes: 18 additions & 0 deletions modules/compactor/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ package compactor

import (
	"flag"
	"fmt"
	"net"
	"time"

	cortex_compactor "github.com/cortexproject/cortex/pkg/compactor"
	"github.com/go-kit/log"
	"github.com/grafana/dskit/flagext"
	"github.com/grafana/dskit/ring"
	"github.com/grafana/tempo/pkg/util"
Expand Down Expand Up @@ -37,3 +39,19 @@ func (cfg *Config) RegisterFlagsAndApplyDefaults(prefix string, f *flag.FlagSet)
f.DurationVar(&cfg.Compactor.MaxCompactionRange, util.PrefixConfig(prefix, "compaction.compaction-window"), time.Hour, "Maximum time window across which to compact blocks.")
cfg.OverrideRingKey = ring.CompactorRingKey
}

// toBasicLifecyclerConfig builds the ring.BasicLifecyclerConfig for the
// compactor from its sharding ring configuration.
//
// The instance address is resolved with ring.GetInstanceAddr from
// cfg.InstanceAddr and cfg.InstanceInterfaceNames, and the port with
// ring.GetInstancePort from cfg.InstancePort and cfg.ListenPort. The number of
// tokens is always ringNumTokens.
//
// It returns a zero-value config and the underlying error, unchanged, if the
// instance address cannot be resolved.
func toBasicLifecyclerConfig(cfg cortex_compactor.RingConfig, logger log.Logger) (ring.BasicLifecyclerConfig, error) {
	instanceAddr, err := ring.GetInstanceAddr(cfg.InstanceAddr, cfg.InstanceInterfaceNames, logger)
	if err != nil {
		return ring.BasicLifecyclerConfig{}, err
	}

	instancePort := ring.GetInstancePort(cfg.InstancePort, cfg.ListenPort)

	// net.JoinHostPort brackets IPv6 literals ("[::1]:9095"); a plain
	// "%s:%d" format would produce an ambiguous host:port for them. For
	// hostnames and IPv4 addresses the result is unchanged.
	addr := net.JoinHostPort(instanceAddr, fmt.Sprint(instancePort))

	return ring.BasicLifecyclerConfig{
		ID:              cfg.InstanceID,
		Addr:            addr,
		HeartbeatPeriod: cfg.HeartbeatPeriod,
		NumTokens:       ringNumTokens,
	}, nil
}