From 76de797984e065f04981c23443a5aac91b70e8b5 Mon Sep 17 00:00:00 2001 From: Herman Slatman Date: Sat, 11 Nov 2023 20:50:26 +0100 Subject: [PATCH] Fix early cancel and make shutdown cleaner --- crowdsec/crowdsec.go | 2 +- internal/bouncer/bouncer.go | 86 +++++++++++++++++++------------------ 2 files changed, 46 insertions(+), 42 deletions(-) diff --git a/crowdsec/crowdsec.go b/crowdsec/crowdsec.go index 7313193c..fc3f1e8a 100644 --- a/crowdsec/crowdsec.go +++ b/crowdsec/crowdsec.go @@ -182,7 +182,7 @@ func (c *CrowdSec) Start() error { // Stop stops the CrowdSec Caddy app func (c *CrowdSec) Stop() error { - return c.bouncer.ShutDown() + return c.bouncer.Shutdown() } // IsAllowed is used by the CrowdSec HTTP handler to check if diff --git a/internal/bouncer/bouncer.go b/internal/bouncer/bouncer.go index 03aad092..0db88396 100644 --- a/internal/bouncer/bouncer.go +++ b/internal/bouncer/bouncer.go @@ -36,6 +36,9 @@ type Bouncer struct { logger *zap.Logger useStreamingBouncer bool shouldFailHard bool + + ctx context.Context + cancel context.CancelFunc } // New creates a new (streaming) Bouncer with a storage based on immutable radix tree @@ -96,12 +99,9 @@ func (b *Bouncer) Run() { // TODO: pass context from top, so that it can influence the running // bouncer, and possibly reload/restart it? - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - + b.ctx, b.cancel = context.WithCancel(context.Background()) go func() { - b.streamingBouncer.Run(ctx) - cancel() + b.streamingBouncer.Run(b.ctx) }() // TODO: close the stream nicely when the bouncer needs to quit. This is not done @@ -110,56 +110,60 @@ func (b *Bouncer) Run() { // the CrowdSec API? The bouncer/client doesn't seem to give us that information // directly, but we could use the heartbeat service before starting to run? // That can also be useful for testing the LiveBouncer at startup. + go func() { - b.logger.Info("start processing new and deleted decisions") - for decisions := range b.streamingBouncer.Stream { - // TODO: deletions seem to include all old decisions that had already expired; CrowdSec bug or intended behavior? - // TODO: process in separate goroutines/waitgroup? - if numberOfDeletedDecisions := len(decisions.Deleted); numberOfDeletedDecisions > 0 { - b.logger.Debug(fmt.Sprintf("processing %d deleted decisions", numberOfDeletedDecisions)) - for _, decision := range decisions.Deleted { - if err := b.delete(decision); err != nil { - b.logger.Error(fmt.Sprintf("unable to delete decision for %q: %s", *decision.Value, err)) - } else { - if numberOfDeletedDecisions <= maxNumberOfDecisionsToLog { - b.logger.Debug(fmt.Sprintf("deleted %q (scope: %s)", *decision.Value, *decision.Scope)) + for { + select { + case <-b.ctx.Done(): + b.logger.Info("processing new and deleted decisions stopped") + return + case decisions := <-b.streamingBouncer.Stream: + // TODO: deletions seem to include all old decisions that had already expired; CrowdSec bug or intended behavior? + // TODO: process in separate goroutines/waitgroup? + if numberOfDeletedDecisions := len(decisions.Deleted); numberOfDeletedDecisions > 0 { + b.logger.Debug(fmt.Sprintf("processing %d deleted decisions", numberOfDeletedDecisions)) + for _, decision := range decisions.Deleted { + if err := b.delete(decision); err != nil { + b.logger.Error(fmt.Sprintf("unable to delete decision for %q: %s", *decision.Value, err)) + } else { + if numberOfDeletedDecisions <= maxNumberOfDecisionsToLog { + b.logger.Debug(fmt.Sprintf("deleted %q (scope: %s)", *decision.Value, *decision.Scope)) + } } } + if numberOfDeletedDecisions > maxNumberOfDecisionsToLog { + b.logger.Debug(fmt.Sprintf("skipped logging for %d deleted decisions", numberOfDeletedDecisions)) + } + b.logger.Debug(fmt.Sprintf("finished processing %d deleted decisions", numberOfDeletedDecisions)) } - if numberOfDeletedDecisions > maxNumberOfDecisionsToLog { - b.logger.Debug(fmt.Sprintf("skipped logging for %d deleted decisions", numberOfDeletedDecisions)) - } - b.logger.Debug(fmt.Sprintf("finished processing %d deleted decisions", numberOfDeletedDecisions)) - } - // TODO: process in separate goroutines/waitgroup? - if numberOfNewDecisions := len(decisions.New); numberOfNewDecisions > 0 { - b.logger.Debug(fmt.Sprintf("processing %d new decisions", numberOfNewDecisions)) - for _, decision := range decisions.New { - if err := b.add(decision); err != nil { - b.logger.Error(fmt.Sprintf("unable to insert decision for %q: %s", *decision.Value, err)) - } else { - if numberOfNewDecisions <= maxNumberOfDecisionsToLog { - b.logger.Debug(fmt.Sprintf("adding %q (scope: %s) for %q", *decision.Value, *decision.Scope, *decision.Duration)) + // TODO: process in separate goroutines/waitgroup? + if numberOfNewDecisions := len(decisions.New); numberOfNewDecisions > 0 { + b.logger.Debug(fmt.Sprintf("processing %d new decisions", numberOfNewDecisions)) + for _, decision := range decisions.New { + if err := b.add(decision); err != nil { + b.logger.Error(fmt.Sprintf("unable to insert decision for %q: %s", *decision.Value, err)) + } else { + if numberOfNewDecisions <= maxNumberOfDecisionsToLog { + b.logger.Debug(fmt.Sprintf("adding %q (scope: %s) for %q", *decision.Value, *decision.Scope, *decision.Duration)) + } } } + if numberOfNewDecisions > maxNumberOfDecisionsToLog { + b.logger.Debug(fmt.Sprintf("skipped logging for %d new decisions", numberOfNewDecisions)) + } + b.logger.Debug(fmt.Sprintf("finished processing %d new decisions", numberOfNewDecisions)) } - if numberOfNewDecisions > maxNumberOfDecisionsToLog { - b.logger.Debug(fmt.Sprintf("skipped logging for %d new decisions", numberOfNewDecisions)) - } - b.logger.Debug(fmt.Sprintf("finished processing %d new decisions", numberOfNewDecisions)) } } - - b.logger.Info("processing new and deleted decisions stopped") }() } -// ShutDown stops the Bouncer -func (b *Bouncer) ShutDown() error { - // TODO: persist the current state of the radix tree in some way, so that it can be used in startup again? +// Shutdown stops the Bouncer +func (b *Bouncer) Shutdown() error { + b.cancel() // TODO: clean shutdown of the streaming bouncer channel reading - b.store = nil + //b.store = nil // TODO(hs): setting this to nil without reinstantiating it, leads to errors; do this properly. return nil }