Skip to content

Commit

Permalink
Make notify interval configurable
Browse files Browse the repository at this point in the history
Signed-off-by: Brad Davidson <brad.davidson@rancher.com>
  • Loading branch information
brandond committed Jan 19, 2024
1 parent f0b2a70 commit bffa8c9
Show file tree
Hide file tree
Showing 5 changed files with 25 additions and 13 deletions.
6 changes: 6 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,12 @@ func main() {
Usage: "Enable net/http/pprof handlers on the metrics bind address. Default is false.",
Destination: &metricsConfig.EnableProfiling,
},
cli.DurationFlag{
Name: "watch-progress-notify-interval",
Usage: "Interval between periodic watch progress notifications. Default is 10m.",
Destination: &config.NotifyInterval,
Value: time.Minute * 10,
},
cli.BoolFlag{Name: "debug"},
}
app.Action = run
Expand Down
4 changes: 3 additions & 1 deletion pkg/endpoint/endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"net"
"os"
"strings"
"time"

"github.com/k3s-io/kine/pkg/drivers/dqlite"
"github.com/k3s-io/kine/pkg/drivers/generic"
Expand Down Expand Up @@ -44,6 +45,7 @@ type Config struct {
ServerTLSConfig tls.Config
BackendTLSConfig tls.Config
MetricsRegisterer prometheus.Registerer
NotifyInterval time.Duration
}

type ETCDConfig struct {
Expand Down Expand Up @@ -80,7 +82,7 @@ func Listen(ctx context.Context, config Config) (ETCDConfig, error) {
}

// set up GRPC server and register services
b := server.New(backend, endpointScheme(config))
b := server.New(backend, endpointScheme(config), config.NotifyInterval)
grpcServer, err := grpcServer(config)
if err != nil {
return ETCDConfig{}, errors.Wrap(err, "creating GRPC server")
Expand Down
6 changes: 4 additions & 2 deletions pkg/server/limited.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,15 @@ package server

import (
"context"
"time"

"go.etcd.io/etcd/api/v3/etcdserverpb"
)

type LimitedServer struct {
backend Backend
scheme string
notifyInterval time.Duration
backend Backend
scheme string
}

func (l *LimitedServer) Range(ctx context.Context, r *etcdserverpb.RangeRequest) (*RangeResponse, error) {
Expand Down
9 changes: 6 additions & 3 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package server

import (
"time"

"go.etcd.io/etcd/api/v3/etcdserverpb"
"google.golang.org/grpc"
"google.golang.org/grpc/health"
Expand All @@ -12,11 +14,12 @@ type KVServerBridge struct {
limited *LimitedServer
}

func New(backend Backend, scheme string) *KVServerBridge {
func New(backend Backend, scheme string, notifyInterval time.Duration) *KVServerBridge {
return &KVServerBridge{
limited: &LimitedServer{
backend: backend,
scheme: scheme,
notifyInterval: notifyInterval,
backend: backend,
scheme: scheme,
},
}
}
Expand Down
13 changes: 6 additions & 7 deletions pkg/server/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,17 @@ import (
)

var watchID int64
var progressReportInterval = 10 * time.Minute

// explicit interface check
var _ etcdserverpb.WatchServer = (*KVServerBridge)(nil)

// GetProgressReportInterval returns the current progress report interval, with some jitter
func GetProgressReportInterval() time.Duration {
// add rand(1/10*progressReportInterval) as jitter so that kine will not
// getProgressReportInterval returns the configured progress report interval, with some jitter
func (s *KVServerBridge) getProgressReportInterval() time.Duration {
// add rand(1/10*notifyInterval) as jitter so that kine will not
// send progress notifications to watchers at the same time even when watchers
// are created at the same time.
jitter := time.Duration(rand.Int63n(int64(progressReportInterval) / 10))
return progressReportInterval + jitter
jitter := time.Duration(rand.Int63n(int64(s.limited.notifyInterval) / 10))
return s.limited.notifyInterval + jitter
}

func (s *KVServerBridge) Watch(ws etcdserverpb.Watch_WatchServer) error {
Expand All @@ -40,7 +39,7 @@ func (s *KVServerBridge) Watch(ws etcdserverpb.Watch_WatchServer) error {

logrus.Tracef("WATCH SERVER CREATE")

go wait.PollInfiniteWithContext(ws.Context(), GetProgressReportInterval(), w.ProgressIfSynced)
go wait.PollInfiniteWithContext(ws.Context(), s.getProgressReportInterval(), w.ProgressIfSynced)

for {
msg, err := ws.Recv()
Expand Down

0 comments on commit bffa8c9

Please sign in to comment.