Merge branch 'main' into filter-out-old-read-only-instances-on-read
jhalterman committed Aug 16, 2024
2 parents f1fd192 + cbfe0ab commit 0187781
Showing 14 changed files with 533 additions and 27 deletions.
5 changes: 4 additions & 1 deletion CHANGELOG.md
@@ -78,6 +78,7 @@
* [CHANGE] Cache: Remove superfluous `cache.RemoteCacheClient` interface and unify all caches using the `cache.Cache` interface. #520
* [CHANGE] Updated the minimum required Go version to 1.21. #540
* [CHANGE] memberlist: Metric `memberlist_client_messages_in_broadcast_queue` is now split into `queue="local"` and `queue="gossip"` values. #539
* [CHANGE] `Service.AddListener` and `Manager.AddListener` now return a function for stopping the listener. #564
* [FEATURE] Cache: Add support for configuring a Redis cache backend. #268 #271 #276
* [FEATURE] Add support for waiting on the rate limiter using the new `WaitN` method. #279
* [FEATURE] Add `log.BufferedLogger` type. #338
@@ -219,8 +220,9 @@
* [ENHANCEMENT] memberlist: Added `-<prefix>memberlist.broadcast-timeout-for-local-updates-on-shutdown` option to set timeout for sending locally-generated updates on shutdown, instead of previously hardcoded 10s (which is still the default). #539
* [ENHANCEMENT] tracing: add ExtractTraceSpanID function.
* [ENHANCEMENT] crypto/tls: Support reloading client certificates #537 #552
* [ENHANCEMENT] Add read only support for ingesters in the ring and lifecycler. #553 #554
* [ENHANCEMENT] Add read only support for ingesters in the ring and lifecycler. #553 #554 #556
* [ENHANCEMENT] Added new ring methods to expose number of writable instances with tokens per zone, and overall. #560 #562
* [ENHANCEMENT] `services.FailureWatcher` can now be closed, which unregisters all service and manager listeners, and closes the channel used to receive errors. #564
* [CHANGE] Backoff: added `Backoff.ErrCause()` which is like `Backoff.Err()` but returns the context cause if backoff is terminated because the context has been canceled. #538
* [BUGFIX] spanlogger: Support multiple tenant IDs. #59
* [BUGFIX] Memberlist: fixed corrupted packets when sending compound messages with more than 255 messages or messages bigger than 64KB. #85
@@ -256,3 +258,4 @@
* [BUGFIX] Memcached: Don't truncate sub-second TTLs to 0 which results in them being cached forever. #530
* [BUGFIX] Cache: initialise the `operation_failures_total{reason="connect-timeout"}` metric to 0 for each cache operation type on startup. #545
* [BUGFIX] spanlogger: include correct caller information in log messages logged through a `SpanLogger`. #547
* [BUGFIX] Ring: shuffle shard without lookback no longer returns the entire ring when the shard size is >= the number of instances. Instead, a proper subring is computed with the correct number of instances in each zone. Returning the entire ring was a bug, because such a ring can contain instances that were not supposed to be used when zones are unbalanced. #554 #556
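
The `Service.AddListener` / `Manager.AddListener` change recorded in the CHANGELOG above means callers can now unregister a listener once they no longer need notifications. A minimal usage sketch, assuming the dskit `services` import path and its usual `Listener` interface; the `loggingListener` type and the printed messages are illustrative, not part of this commit:

package main

import (
	"context"
	"fmt"

	"github.com/grafana/dskit/services"
)

// loggingListener is an illustrative Listener that just prints state transitions.
type loggingListener struct{}

func (loggingListener) Starting()                                 { fmt.Println("starting") }
func (loggingListener) Running()                                  { fmt.Println("running") }
func (loggingListener) Stopping(from services.State)              { fmt.Println("stopping, was", from) }
func (loggingListener) Terminated(from services.State)            { fmt.Println("terminated, was", from) }
func (loggingListener) Failed(from services.State, failure error) { fmt.Println("failed:", failure) }

func main() {
	svc := services.NewIdleService(nil, nil)

	// AddListener now returns a function that unregisters the listener and
	// waits for its notification goroutine to exit. Calling it more than
	// once is a no-op.
	stop := svc.AddListener(loggingListener{})
	defer stop()

	_ = services.StartAndAwaitRunning(context.Background(), svc)
	_ = services.StopAndAwaitTerminated(context.Background(), svc)
}

As the services/basic_service.go diff further down shows, adding a listener to an already Terminated or Failed service simply returns a no-op stop function.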
3 changes: 2 additions & 1 deletion grpcutil/health_check_test.go
@@ -170,8 +170,9 @@ func (s *mockService) State() services.State {
return s.state
}

func (s *mockService) AddListener(listener services.Listener) {
func (s *mockService) AddListener(listener services.Listener) func() {
s.listeners = append(s.listeners, listener)
return func() {}
}

func (s *mockService) StartAsync(_ context.Context) error { return nil }
11 changes: 8 additions & 3 deletions ring/ring.go
@@ -708,8 +708,13 @@ func (r *Ring) updateRingMetrics(compareResult CompareResult) {
//
// The subring returned by this method does not contain instances that have the read-only field set.
func (r *Ring) ShuffleShard(identifier string, size int) ReadRing {
// size == 0 or size > num(ingesters) is handled in shuffleShard method.
// It is safe to use such size in caching key, because caches are invalidated when number of instances in the ring changes.
// Use all possible instances if shuffle sharding is disabled. We don't set size to r.InstancesCount(), because
// that could lead to not all instances being returned when ring zones are unbalanced.
// We don't return the entire ring directly because we still need to filter out read-only instances.
if size <= 0 {
size = math.MaxInt
}

if cached := r.getCachedShuffledSubring(identifier, size); cached != nil {
return cached
}
@@ -795,7 +800,7 @@ func (r *Ring) shuffleShard(identifier string, size int, lookbackPeriod time.Dur
actualZones = []string{""}
}

shard := make(map[string]InstanceDesc, min(r.InstancesCount(), size))
shard := make(map[string]InstanceDesc, min(len(r.ringDesc.Ingesters), size))

// We need to iterate zones always in the same order to guarantee stability.
for _, zone := range actualZones {
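
To make the sizing comment in ShuffleShard above concrete: setting `size = r.InstancesCount()` would cap each zone at `ceil(size / numZones)` instances, which silently drops instances from larger zones when zones are unbalanced, whereas `math.MaxInt` effectively removes the cap while still letting shuffleShard filter out read-only instances. A standalone illustrative calculation (not code from this commit), matching the new unbalanced-zone test cases below:

package main

import (
	"fmt"
	"math"
)

func main() {
	// Unbalanced ring: zone-a has 3 instances, zone-b has 2, zone-c has 1.
	zones := []int{3, 2, 1}
	instancesCount := 6

	// With size = InstancesCount() (6) the per-zone cap is ceil(6/3) = 2,
	// so zone-a contributes only 2 of its 3 instances and the "whole ring"
	// shard ends up with 2+2+1 = 5 instances instead of 6.
	perZone := int(math.Ceil(float64(instancesCount) / float64(len(zones))))
	total := 0
	for _, n := range zones {
		total += min(n, perZone) // min is a builtin since Go 1.21
	}
	fmt.Println(perZone, total) // 2 5

	// With size = math.MaxInt the per-zone cap is effectively unlimited
	// (see the ShuffleShardExpectedInstancesPerZone change below), so all
	// 3+2+1 = 6 instances are selected.
}

This is exactly the difference between the new "shard size == num instances, unbalanced zones" test case (expected size 5) and the "shard size = 0, unbalanced zones" case (expected size 6) in ring/ring_test.go below.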
93 changes: 91 additions & 2 deletions ring/ring_test.go
@@ -1461,6 +1461,83 @@ func TestRing_ShuffleShard(t *testing.T) {
expectedZoneCount: 2,
expectedInstancesInZoneCount: map[string]int{"zone-a": 1, "zone-b": 1, "zone-c": 0},
},
"multiple zones, shard size == num instances, balanced zones": {
ringInstances: map[string]InstanceDesc{
"instance-1": {Addr: "127.0.0.1", Zone: "zone-a", Tokens: gen.GenerateTokens(128, nil)},
"instance-2": {Addr: "127.0.0.2", Zone: "zone-a", Tokens: gen.GenerateTokens(128, nil)},
"instance-3": {Addr: "127.0.0.3", Zone: "zone-b", Tokens: gen.GenerateTokens(128, nil)},
"instance-4": {Addr: "127.0.0.4", Zone: "zone-b", Tokens: gen.GenerateTokens(128, nil)},
"instance-5": {Addr: "127.0.0.5", Zone: "zone-c", Tokens: gen.GenerateTokens(128, nil)},
"instance-6": {Addr: "127.0.0.6", Zone: "zone-c", Tokens: gen.GenerateTokens(128, nil)},
},
shardSize: 6,
zoneAwarenessEnabled: true,
expectedSize: 6,
expectedDistribution: []int{2, 2, 2},
expectedZoneCount: 3,
expectedInstancesInZoneCount: map[string]int{"zone-a": 2, "zone-b": 2, "zone-c": 2},
},
"multiple zones, shard size == num instances, unbalanced zones": {
ringInstances: map[string]InstanceDesc{
"instance-1": {Addr: "127.0.0.1", Zone: "zone-a", Tokens: gen.GenerateTokens(128, nil)},
"instance-2": {Addr: "127.0.0.2", Zone: "zone-a", Tokens: gen.GenerateTokens(128, nil)},
"instance-3": {Addr: "127.0.0.3", Zone: "zone-a", Tokens: gen.GenerateTokens(128, nil)},
"instance-4": {Addr: "127.0.0.4", Zone: "zone-b", Tokens: gen.GenerateTokens(128, nil)},
"instance-5": {Addr: "127.0.0.5", Zone: "zone-b", Tokens: gen.GenerateTokens(128, nil)},
"instance-6": {Addr: "127.0.0.6", Zone: "zone-c", Tokens: gen.GenerateTokens(128, nil)},
},
shardSize: 6,
zoneAwarenessEnabled: true,
expectedSize: 5,
expectedDistribution: []int{2, 2, 1},
expectedZoneCount: 3,
expectedInstancesInZoneCount: map[string]int{"zone-a": 2, "zone-b": 2, "zone-c": 1},
},
"multiple zones, shard size > num instances, balanced zones": {
ringInstances: map[string]InstanceDesc{
"instance-1": {Addr: "127.0.0.1", Zone: "zone-a", Tokens: gen.GenerateTokens(128, nil)},
"instance-2": {Addr: "127.0.0.2", Zone: "zone-b", Tokens: gen.GenerateTokens(128, nil)},
"instance-3": {Addr: "127.0.0.3", Zone: "zone-c", Tokens: gen.GenerateTokens(128, nil)},
},
shardSize: 3,
zoneAwarenessEnabled: true,
expectedSize: 3,
expectedDistribution: []int{1, 1, 1},
expectedZoneCount: 3,
expectedInstancesInZoneCount: map[string]int{"zone-a": 1, "zone-b": 1, "zone-c": 1},
},
"multiple zones, shard size > num instances, unbalanced zones": {
ringInstances: map[string]InstanceDesc{
"instance-1": {Addr: "127.0.0.1", Zone: "zone-a", Tokens: gen.GenerateTokens(128, nil)},
"instance-2": {Addr: "127.0.0.2", Zone: "zone-a", Tokens: gen.GenerateTokens(128, nil)},
"instance-3": {Addr: "127.0.0.3", Zone: "zone-a", Tokens: gen.GenerateTokens(128, nil)},
"instance-4": {Addr: "127.0.0.4", Zone: "zone-b", Tokens: gen.GenerateTokens(128, nil)},
"instance-5": {Addr: "127.0.0.5", Zone: "zone-b", Tokens: gen.GenerateTokens(128, nil)},
"instance-6": {Addr: "127.0.0.6", Zone: "zone-c", Tokens: gen.GenerateTokens(128, nil)},
},
shardSize: 9,
zoneAwarenessEnabled: true,
expectedSize: 6,
expectedDistribution: []int{3, 2, 1},
expectedZoneCount: 3,
expectedInstancesInZoneCount: map[string]int{"zone-a": 3, "zone-b": 2, "zone-c": 1},
},
"multiple zones, shard size = 0, unbalanced zones": {
ringInstances: map[string]InstanceDesc{
"instance-1": {Addr: "127.0.0.1", Zone: "zone-a", Tokens: gen.GenerateTokens(128, nil)},
"instance-2": {Addr: "127.0.0.2", Zone: "zone-a", Tokens: gen.GenerateTokens(128, nil)},
"instance-3": {Addr: "127.0.0.3", Zone: "zone-a", Tokens: gen.GenerateTokens(128, nil)},
"instance-4": {Addr: "127.0.0.4", Zone: "zone-b", Tokens: gen.GenerateTokens(128, nil)},
"instance-5": {Addr: "127.0.0.5", Zone: "zone-b", Tokens: gen.GenerateTokens(128, nil)},
"instance-6": {Addr: "127.0.0.6", Zone: "zone-c", Tokens: gen.GenerateTokens(128, nil)},
},
shardSize: 0,
zoneAwarenessEnabled: true,
expectedSize: 6,
expectedDistribution: []int{3, 2, 1},
expectedZoneCount: 3,
expectedInstancesInZoneCount: map[string]int{"zone-a": 3, "zone-b": 2, "zone-c": 1},
},
}

for testName, testData := range tests {
@@ -3162,7 +3239,7 @@ func TestRing_ShuffleShardWithLookback_CachingConcurrency(t *testing.T) {
func BenchmarkRing_ShuffleShard(b *testing.B) {
for _, numInstances := range []int{50, 100, 1000} {
for _, numZones := range []int{1, 3} {
for _, shardSize := range []int{3, 10, 30} {
for _, shardSize := range []int{0, 3, 10, 30} {
b.Run(fmt.Sprintf("num instances = %d, num zones = %d, shard size = %d", numInstances, numZones, shardSize), func(b *testing.B) {
benchmarkShuffleSharding(b, numInstances, numZones, 128, shardSize, false)
})
@@ -3174,7 +3251,7 @@ func BenchmarkRing_ShuffleShard(b *testing.B) {
func BenchmarkRing_ShuffleShardCached(b *testing.B) {
for _, numInstances := range []int{50, 100, 1000} {
for _, numZones := range []int{1, 3} {
for _, shardSize := range []int{3, 10, 30} {
for _, shardSize := range []int{0, 3, 10, 30} {
b.Run(fmt.Sprintf("num instances = %d, num zones = %d, shard size = %d", numInstances, numZones, shardSize), func(b *testing.B) {
benchmarkShuffleSharding(b, numInstances, numZones, 128, shardSize, true)
})
@@ -3207,6 +3284,18 @@ func BenchmarkRing_ShuffleShard_LargeShardSize(b *testing.B) {
benchmarkShuffleSharding(b, numInstances, numZones, numTokens, shardSize, cacheEnabled)
}

func BenchmarkRing_ShuffleShard_ShardSize_0(b *testing.B) {
const (
numInstances = 90
numZones = 3
numTokens = 512
shardSize = 0
cacheEnabled = false
)

benchmarkShuffleSharding(b, numInstances, numZones, numTokens, shardSize, cacheEnabled)
}

func benchmarkShuffleSharding(b *testing.B, numInstances, numZones, numTokens, shardSize int, cache bool) {
// Initialise the ring.
ringDesc := &Desc{Ingesters: generateRingInstances(initTokenGenerator(b), numInstances, numZones, numTokens)}
3 changes: 3 additions & 0 deletions ring/shard/shard.go
@@ -30,6 +30,9 @@ func ShuffleShardSeed(identifier, zone string) int64 {
// zone when zone-aware replication is enabled. The algorithm expects the shard size to be divisible
// by the number of zones, in order to have nodes balanced across zones. If it's not, we do round up.
func ShuffleShardExpectedInstancesPerZone(shardSize, numZones int) int {
if shardSize == math.MaxInt {
return math.MaxInt
}
return int(math.Ceil(float64(shardSize) / float64(numZones)))
}

6 changes: 6 additions & 0 deletions ring/shard/shard_test.go
@@ -1,6 +1,7 @@
package shard

import (
"math"
"testing"

"github.com/stretchr/testify/assert"
@@ -38,6 +39,11 @@ func TestShuffleShardExpectedInstancesPerZone(t *testing.T) {
numZones: 3,
expected: 2,
},
{
shardSize: math.MaxInt,
numZones: 3,
expected: math.MaxInt,
},
}

for _, test := range tests {
48 changes: 42 additions & 6 deletions services/basic_service.go
@@ -3,7 +3,10 @@ package services
import (
"context"
"fmt"
"slices"
"sync"

"go.uber.org/atomic"
)

// StartingFn is called when service enters Starting state. If StartingFn returns
@@ -325,26 +328,59 @@ func (b *BasicService) State() State {
}

// AddListener is part of Service interface.
func (b *BasicService) AddListener(listener Listener) {
func (b *BasicService) AddListener(listener Listener) func() {
b.stateMu.Lock()
defer b.stateMu.Unlock()

if b.state == Terminated || b.state == Failed {
// no more state transitions will be done, and channel wouldn't get closed
return
return func() {}
}

// There are max 4 state transitions. We use buffer to avoid blocking the sender,
// which holds service lock.
ch := make(chan func(l Listener), 4)
b.listeners = append(b.listeners, ch)
listenerCh := make(chan func(l Listener), 4)
b.listeners = append(b.listeners, listenerCh)

stop := make(chan struct{})
stopClosed := atomic.NewBool(false)

wg := sync.WaitGroup{}
wg.Add(1)

// each listener has its own goroutine, processing events.
go func() {
for lfn := range ch {
lfn(listener)
defer wg.Done()
for {
select {
// Process events from service.
case lfn, ok := <-listenerCh:
if !ok {
return
}
lfn(listener)

case <-stop:
return
}
}
}()

return func() {
if stopClosed.CompareAndSwap(false, true) {
// Tell listener goroutine to stop.
close(stop)
}

// Remove channel for notifications from service's list of listeners.
b.stateMu.Lock()
b.listeners = slices.DeleteFunc(b.listeners, func(c chan func(l Listener)) bool {
return listenerCh == c
})
b.stateMu.Unlock()

wg.Wait()
}
}

// lock must be held here. Read lock would be good enough, but since
74 changes: 74 additions & 0 deletions services/basic_service_test.go
@@ -8,6 +8,7 @@ import (
"time"

"github.com/stretchr/testify/require"
"go.uber.org/goleak"
)

var _ Service = &BasicService{} // just make sure that BasicService implements Service
@@ -326,3 +327,76 @@ func TestServiceName(t *testing.T) {
s.WithName("new")
require.Equal(t, "test name", DescribeService(s))
}

func TestListenerCancellationUnstartedService(t *testing.T) {
defer goleak.VerifyNone(t)

s := NewIdleService(nil, nil)

sl := newServiceListener()
for i := 0; i < 10; i++ {
stop := s.AddListener(sl)
stop()
// multiple stop() calls are ignored
stop()
}
}

func TestListenerCancellationTerminatedService(t *testing.T) {
defer goleak.VerifyNone(t)

s := NewIdleService(nil, nil)
require.NoError(t, StopAndAwaitTerminated(context.Background(), s))

sl := newServiceListener()
for i := 0; i < 10; i++ {
stop := s.AddListener(sl)
stop()
// multiple stop() calls are ignored
stop()
}
}

func TestListenerCancellationRunningService(t *testing.T) {
s := NewIdleService(nil, nil)
require.NoError(t, StartAndAwaitRunning(context.Background(), s))

// Ignore goroutine started by service -- we're testing for goroutines created by listeners.
defer goleak.VerifyNone(t, goleak.IgnoreCurrent(), goleak.Cleanup(func(_ int) {
require.NoError(t, StopAndAwaitTerminated(context.Background(), s))

// After stopping service, we can do another check.
goleak.VerifyNone(t)
}))

const count = 10
sl := newServiceListener()

// Now that the service is running, we add a single listener multiple times, and keep all functions to unregister it.
// This listener is unregistered before other state transitions of service "s",
// so it won't receive any notifications.

var stopFns []func()
for i := 0; i < count; i++ {
stop := s.AddListener(sl)
stopFns = append(stopFns, stop)
}

// Check for number of listeners in the service.
s.stateMu.Lock()
listenersCount := len(s.listeners)
s.stateMu.Unlock()
require.Equal(t, listenersCount, count)

// Unregister all listeners. Calling same function second time has no effect.
for _, stop := range stopFns {
stop()
stop()
}

// Check for number of listeners again.
s.stateMu.Lock()
listenersCount = len(s.listeners)
s.stateMu.Unlock()
require.Equal(t, listenersCount, 0)
}