Skip to content

FMWK-788 Improved bandwidth limiter #315

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 11 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,11 @@ func (c *Client) Restore(

switch config.EncoderType {
case EncoderTypeASB:
handler := newRestoreHandler[*models.Token](ctx, config, c.aerospikeClient, c.logger, streamingReader)
handler, err := newRestoreHandler[*models.Token](ctx, config, c.aerospikeClient, c.logger, streamingReader)
if err != nil {
return nil, fmt.Errorf("failed to create restore handler: %w", err)
}

handler.run()

return handler, nil
Expand All @@ -294,7 +298,11 @@ func (c *Client) Restore(
return nil, fmt.Errorf("failed to validate restore config: %w", err)
}

handler := newRestoreHandler[*models.ASBXToken](ctx, config, c.aerospikeClient, c.logger, streamingReader)
handler, err := newRestoreHandler[*models.ASBXToken](ctx, config, c.aerospikeClient, c.logger, streamingReader)
if err != nil {
return nil, fmt.Errorf("failed to create restore handler: %w", err)
}

handler.run()

return handler, nil
Expand Down
2 changes: 1 addition & 1 deletion cmd/internal/config/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func NewBackupConfigs(params *BackupParams, logger *slog.Logger,
slog.Bool("no_indexes", backupConfig.NoIndexes),
slog.Bool("no_udfs", backupConfig.NoUDFs),
slog.Int("records_per_second", backupConfig.RecordsPerSecond),
slog.Int("bandwidth", backupConfig.Bandwidth),
slog.Int64("bandwidth", backupConfig.Bandwidth),
slog.Uint64("file_limit", backupConfig.FileLimit),
slog.Bool("compact", backupConfig.Compact),
slog.Bool("not_ttl_only", backupConfig.NoTTLOnly),
Expand Down
2 changes: 1 addition & 1 deletion cmd/internal/config/configs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ func TestMapBackupConfig_Success(t *testing.T) {

assert.Equal(t, 5, config.ParallelWrite, "The ParallelWrite should be set correctly")
assert.Equal(t, 5, config.ParallelRead, "The ParallelRead should be set correctly")
assert.Equal(t, 10*1024*1024, config.Bandwidth, "The Bandwidth should be set to 10 MiB in bytes")
assert.Equal(t, int64(10*1024*1024), config.Bandwidth, "The Bandwidth should be set to 10 MiB in bytes")
assert.True(t, config.ParallelNodes, "The ParallelNodes flag should be set correctly")
assert.True(t, config.Compact, "The Compact flag should be set correctly")
assert.ElementsMatch(t, []string{"node1", "node2"}, config.NodeList, "The NodeList should be set correctly")
Expand Down
2 changes: 1 addition & 1 deletion cmd/internal/flags/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ func (f *Common) NewFlagSet() *pflag.FlagSet {
10000,
"Socket timeout in milliseconds. If this value is 0, it's set to --total-timeout.\n"+
"If both this and --total-timeout are 0, there is no socket idle time limit.")
flagSet.IntVarP(&f.Nice, "nice", "N",
flagSet.Int64VarP(&f.Nice, "nice", "N",
0,
"The limits for read/write storage bandwidth in MiB/s.\n"+
"The lower bound is 8MiB (maximum size of the Aerospike record). Default is 0 (no limit).")
Expand Down
2 changes: 1 addition & 1 deletion cmd/internal/models/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,5 +33,5 @@ type Common struct {

// Nice is mapped to config.Bandwidth
// Is set in MiB then converted to bytes.
Nice int
Nice int64
}
1 change: 1 addition & 0 deletions cmd/internal/restore/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ func NewService(
slog.Int("max_asynx_batches", restoreConfig.MaxAsyncBatches),
slog.Int64("extra_ttl", restoreConfig.ExtraTTL),
slog.Bool("ignore_records_error", restoreConfig.IgnoreRecordError),
slog.Int64("bandwidth", restoreConfig.Bandwidth),
)

warmUp := GetWarmUp(params.Restore.WarmUp, params.Restore.MaxAsyncBatches)
Expand Down
7 changes: 1 addition & 6 deletions config_backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"time"

a "github.com/aerospike/aerospike-client-go/v8"
"github.com/aerospike/backup-go/internal/bandwidth"
"github.com/aerospike/backup-go/models"
)

Expand Down Expand Up @@ -106,7 +105,7 @@ type ConfigBackup struct {
// Bandwidth * base64ratio + metaOverhead
// Where: base64ratio = 1.34, metaOverhead = 16 * 1024
// Will not apply a bandwidth limit if Bandwidth is zero (default).
Bandwidth int
Bandwidth int64
// File size limit (in bytes) for the backup. If a backup file exceeds this
// size threshold, a new file will be created. 0 for no file size limit.
FileLimit uint64
Expand Down Expand Up @@ -205,10 +204,6 @@ func (c *ConfigBackup) validate() error {
return fmt.Errorf("bandwidth value must not be negative, got %d", c.Bandwidth)
}

if c.Bandwidth != 0 && c.Bandwidth < bandwidth.MinLimit {
return fmt.Errorf("bandwidth value must be greater than %d, got %d", bandwidth.MinLimit, c.Bandwidth)
}

if c.StateFile != "" && c.PageSize == 0 {
return fmt.Errorf("page size must be set if saving state to state file is enabled")
}
Expand Down
7 changes: 1 addition & 6 deletions config_restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"fmt"

a "github.com/aerospike/aerospike-client-go/v8"
"github.com/aerospike/backup-go/internal/bandwidth"
"github.com/aerospike/backup-go/models"
)

Expand Down Expand Up @@ -62,7 +61,7 @@ type ConfigRestore struct {
// Bandwidth * base64ratio + metaOverhead
// Where: base64ratio = 1.34, metaOverhead = 16 * 1024
// Will not apply rps limit if Bandwidth is zero (default).
Bandwidth int
Bandwidth int64
// Don't restore any records.
NoRecords bool
// Don't restore any secondary indexes.
Expand Down Expand Up @@ -120,10 +119,6 @@ func (c *ConfigRestore) validate() error {
return fmt.Errorf("bandwidth value must not be negative, got %d", c.Bandwidth)
}

if c.Bandwidth != 0 && c.Bandwidth < bandwidth.MinLimit {
return fmt.Errorf("bandwidth value must be greater than %d, got %d", bandwidth.MinLimit, c.Bandwidth)
}

if c.RecordsPerSecond < 0 {
return fmt.Errorf("rps value must not be negative, got %d", c.RecordsPerSecond)
}
Expand Down
8 changes: 7 additions & 1 deletion handler_backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,12 @@ func newBackupHandler(

recCounter := newRecordCounter(ac, infoCLient, config, readerProcessor, logger)

limiter, err := bandwidth.NewLimiter(config.Bandwidth)
if err != nil {
cancel()
return nil, fmt.Errorf("failed to create bandwidth limiter: %w", err)
}

bh := &BackupHandler{
ctx: ctx,
cancel: cancel,
Expand All @@ -181,7 +187,7 @@ func newBackupHandler(
encoder: encoder,
readerProcessor: readerProcessor,
recordCounter: recCounter,
limiter: bandwidth.NewLimiter(config.Bandwidth),
limiter: limiter,
infoClient: infoCLient,
scanLimiter: scanLimiter,
state: state,
Expand Down
14 changes: 11 additions & 3 deletions handler_restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func newRestoreHandler[T models.TokenConstraint](
aerospikeClient AerospikeClient,
logger *slog.Logger,
reader StreamingReader,
) *RestoreHandler[T] {
) (*RestoreHandler[T], error) {
id := uuid.NewString()
logger = logging.WithHandler(logger, id, logging.HandlerTypeRestore, reader.GetType())
metricMessage := fmt.Sprintf("%s metrics %s", logging.HandlerTypeRestore, id)
Expand All @@ -102,13 +102,15 @@ func newRestoreHandler[T models.TokenConstraint](
errorsCh := make(chan error, 1)

stats := models.NewRestoreStats()

rpsCollector := metrics.NewCollector(
ctx,
logger,
metrics.RecordsPerSecond,
metricMessage,
config.MetricsEnabled,
)

kbpsCollector := metrics.NewCollector(
ctx,
logger,
Expand All @@ -134,6 +136,12 @@ func newRestoreHandler[T models.TokenConstraint](
logger,
)

limiter, err := bandwidth.NewLimiter(config.Bandwidth)
if err != nil {
cancel()
return nil, fmt.Errorf("failed to create bandwidth limiter: %w", err)
}

return &RestoreHandler[T]{
ctx: ctx,
cancel: cancel,
Expand All @@ -143,12 +151,12 @@ func newRestoreHandler[T models.TokenConstraint](
stats: stats,
id: id,
logger: logger,
limiter: bandwidth.NewLimiter(config.Bandwidth),
limiter: limiter,
errors: errorsCh,
done: make(chan struct{}, 1),
rpsCollector: rpsCollector,
kbpsCollector: kbpsCollector,
}
}, nil
}

func (rh *RestoreHandler[T]) run() {
Expand Down
112 changes: 112 additions & 0 deletions internal/bandwidth/bucket.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
// Copyright 2024 Aerospike, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package bandwidth

import (
"fmt"
"sync"
"time"
)

// Bucket is a thread-safe rate limiter. Tokens are credited at a fixed rate
// up to limit, and Wait consumes them, blocking the caller whenever the
// balance is insufficient. The crediting step is called "leak" in this file.
type Bucket struct {
	// mu guards every field below. Wait keeps it locked while sleeping, so
	// concurrent callers are served one at a time. As the bucket exists to
	// slow callers down, a single mutex is not a throughput concern.
	mu sync.Mutex

	// limit is the maximum number of tokens the bucket can hold.
	limit int64

	// rate is the speed at which tokens are credited, in tokens per
	// nanosecond.
	rate float64

	// tokens is the current balance. It is fractional for precise accounting
	// and is temporarily negative while Wait is sleeping off a deficit.
	tokens float64

	// lastLeak is the last time tokens were credited.
	lastLeak time.Time
}

// NewBucket creates a Bucket that credits limit tokens per interval and holds
// at most limit tokens. The bucket starts full.
//
// It returns an error if limit or interval is not strictly positive.
func NewBucket(limit int64, interval time.Duration) (*Bucket, error) {
	if limit <= 0 {
		return nil, fmt.Errorf("limit must be greater than 0")
	}

	if interval <= 0 {
		return nil, fmt.Errorf("interval must be greater than 0")
	}

	// Express the rate per nanosecond so that it can be multiplied directly
	// by elapsed durations.
	rate := float64(limit) / float64(interval.Nanoseconds())

	return &Bucket{
		limit:    limit,
		rate:     rate,
		tokens:   float64(limit),
		lastLeak: time.Now(),
	}, nil
}

// Wait blocks until n tokens have been paid for and then returns.
// Values of n below 1 return immediately without consuming anything.
//
// n may exceed the bucket limit; in that case the caller sleeps exactly as
// long as it takes to accumulate the missing tokens at the configured rate.
func (rl *Bucket) Wait(n int64) {
	if n < 1 {
		return
	}

	rl.mu.Lock()
	defer rl.mu.Unlock()

	// Credit whatever has accumulated since the last call.
	rl.leak()

	// Debit first. A non-negative balance means the request was fully
	// covered by tokens already in the bucket.
	rl.tokens -= float64(n)
	if rl.tokens >= 0 {
		return
	}

	// The negative balance is the deficit: sleep for the time the bucket
	// needs to accumulate it.
	time.Sleep(time.Duration(-rl.tokens / rl.rate))

	// Credit the time spent sleeping. Debiting before the sleep (rather than
	// after this refill) matters: the refill is capped at limit, so debiting
	// afterwards would charge requests larger than limit twice.
	rl.leak()
}

// leak credits tokens for the time elapsed since lastLeak, capping the
// balance at limit. The caller must hold rl.mu.
func (rl *Bucket) leak() {
	now := time.Now()
	elapsed := now.Sub(rl.lastLeak)

	tokensToAdd := rl.rate * float64(elapsed.Nanoseconds())

	// Skip the update when no time has passed, so lastLeak only advances
	// together with an actual credit.
	if tokensToAdd > 0 {
		rl.tokens += tokensToAdd

		// Don't overflow the bucket.
		if rl.tokens > float64(rl.limit) {
			rl.tokens = float64(rl.limit)
		}

		rl.lastLeak = now
	}
}
Loading
Loading