Fix early cancel and make shutdown cleaner
hslatman committed Nov 11, 2023
1 parent 593ab44 commit 76de797
Showing 2 changed files with 46 additions and 42 deletions.
2 changes: 1 addition & 1 deletion crowdsec/crowdsec.go
@@ -182,7 +182,7 @@ func (c *CrowdSec) Start() error {
 
 // Stop stops the CrowdSec Caddy app
 func (c *CrowdSec) Stop() error {
-    return c.bouncer.ShutDown()
+    return c.bouncer.Shutdown()
 }
 
 // IsAllowed is used by the CrowdSec HTTP handler to check if
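For context: in Caddy, an app module's Stop method is invoked when the running config is unloaded or replaced, so it is the natural place to tear down background work, which is why Stop delegates to the bouncer's Shutdown. Below is a minimal, hypothetical sketch of that relationship; the package, type, and field names are illustrative and not taken from this repository, only the caddy.App Start/Stop shape is real.

package crowdsecapp

import "github.com/caddyserver/caddy/v2"

// worker stands in for the bouncer owned by the real module.
type worker interface {
	Run()
	Shutdown() error
}

// App sketches a Caddy app that owns a long-running worker.
type App struct {
	bouncer worker
}

// Start is called by Caddy after provisioning; it kicks off background work.
func (a *App) Start() error {
	a.bouncer.Run()
	return nil
}

// Stop is called by Caddy when the config is unloaded; it must release
// resources, so it delegates to the worker's Shutdown.
func (a *App) Stop() error {
	return a.bouncer.Shutdown()
}

// Interface guard: Caddy apps implement caddy.App (Start/Stop).
var _ caddy.App = (*App)(nil)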
86 changes: 45 additions & 41 deletions internal/bouncer/bouncer.go
@@ -36,6 +36,9 @@ type Bouncer struct {
     logger              *zap.Logger
     useStreamingBouncer bool
     shouldFailHard      bool
+
+    ctx    context.Context
+    cancel context.CancelFunc
 }
 
 // New creates a new (streaming) Bouncer with a storage based on immutable radix tree
@@ -96,12 +99,9 @@ func (b *Bouncer) Run() {
 
     // TODO: pass context from top, so that it can influence the running
     // bouncer, and possibly reload/restart it?
-    ctx, cancel := context.WithCancel(context.Background())
-    defer cancel()
-
+    b.ctx, b.cancel = context.WithCancel(context.Background())
     go func() {
-        b.streamingBouncer.Run(ctx)
-        cancel()
+        b.streamingBouncer.Run(b.ctx)
     }()
 
     // TODO: close the stream nicely when the bouncer needs to quit. This is not done
@@ -110,56 +110,60 @@
     // the CrowdSec API? The bouncer/client doesn't seem to give us that information
     // directly, but we could use the heartbeat service before starting to run?
     // That can also be useful for testing the LiveBouncer at startup.
 
     go func() {
         b.logger.Info("start processing new and deleted decisions")
-        for decisions := range b.streamingBouncer.Stream {
-            // TODO: deletions seem to include all old decisions that had already expired; CrowdSec bug or intended behavior?
-            // TODO: process in separate goroutines/waitgroup?
-            if numberOfDeletedDecisions := len(decisions.Deleted); numberOfDeletedDecisions > 0 {
-                b.logger.Debug(fmt.Sprintf("processing %d deleted decisions", numberOfDeletedDecisions))
-                for _, decision := range decisions.Deleted {
-                    if err := b.delete(decision); err != nil {
-                        b.logger.Error(fmt.Sprintf("unable to delete decision for %q: %s", *decision.Value, err))
-                    } else {
-                        if numberOfDeletedDecisions <= maxNumberOfDecisionsToLog {
-                            b.logger.Debug(fmt.Sprintf("deleted %q (scope: %s)", *decision.Value, *decision.Scope))
-                        }
-                    }
-                }
-                if numberOfDeletedDecisions > maxNumberOfDecisionsToLog {
-                    b.logger.Debug(fmt.Sprintf("skipped logging for %d deleted decisions", numberOfDeletedDecisions))
-                }
-                b.logger.Debug(fmt.Sprintf("finished processing %d deleted decisions", numberOfDeletedDecisions))
-            }
+        for {
+            select {
+            case <-b.ctx.Done():
+                b.logger.Info("processing new and deleted decisions stopped")
+                return
+            case decisions := <-b.streamingBouncer.Stream:
+                // TODO: deletions seem to include all old decisions that had already expired; CrowdSec bug or intended behavior?
+                // TODO: process in separate goroutines/waitgroup?
+                if numberOfDeletedDecisions := len(decisions.Deleted); numberOfDeletedDecisions > 0 {
+                    b.logger.Debug(fmt.Sprintf("processing %d deleted decisions", numberOfDeletedDecisions))
+                    for _, decision := range decisions.Deleted {
+                        if err := b.delete(decision); err != nil {
+                            b.logger.Error(fmt.Sprintf("unable to delete decision for %q: %s", *decision.Value, err))
+                        } else {
+                            if numberOfDeletedDecisions <= maxNumberOfDecisionsToLog {
+                                b.logger.Debug(fmt.Sprintf("deleted %q (scope: %s)", *decision.Value, *decision.Scope))
+                            }
+                        }
+                    }
+                    if numberOfDeletedDecisions > maxNumberOfDecisionsToLog {
+                        b.logger.Debug(fmt.Sprintf("skipped logging for %d deleted decisions", numberOfDeletedDecisions))
+                    }
+                    b.logger.Debug(fmt.Sprintf("finished processing %d deleted decisions", numberOfDeletedDecisions))
+                }
 
-            // TODO: process in separate goroutines/waitgroup?
-            if numberOfNewDecisions := len(decisions.New); numberOfNewDecisions > 0 {
-                b.logger.Debug(fmt.Sprintf("processing %d new decisions", numberOfNewDecisions))
-                for _, decision := range decisions.New {
-                    if err := b.add(decision); err != nil {
-                        b.logger.Error(fmt.Sprintf("unable to insert decision for %q: %s", *decision.Value, err))
-                    } else {
-                        if numberOfNewDecisions <= maxNumberOfDecisionsToLog {
-                            b.logger.Debug(fmt.Sprintf("adding %q (scope: %s) for %q", *decision.Value, *decision.Scope, *decision.Duration))
-                        }
-                    }
-                }
-                if numberOfNewDecisions > maxNumberOfDecisionsToLog {
-                    b.logger.Debug(fmt.Sprintf("skipped logging for %d new decisions", numberOfNewDecisions))
-                }
-                b.logger.Debug(fmt.Sprintf("finished processing %d new decisions", numberOfNewDecisions))
-            }
-        }
-
-        b.logger.Info("processing new and deleted decisions stopped")
+                // TODO: process in separate goroutines/waitgroup?
+                if numberOfNewDecisions := len(decisions.New); numberOfNewDecisions > 0 {
+                    b.logger.Debug(fmt.Sprintf("processing %d new decisions", numberOfNewDecisions))
+                    for _, decision := range decisions.New {
+                        if err := b.add(decision); err != nil {
+                            b.logger.Error(fmt.Sprintf("unable to insert decision for %q: %s", *decision.Value, err))
+                        } else {
+                            if numberOfNewDecisions <= maxNumberOfDecisionsToLog {
+                                b.logger.Debug(fmt.Sprintf("adding %q (scope: %s) for %q", *decision.Value, *decision.Scope, *decision.Duration))
+                            }
+                        }
+                    }
+                    if numberOfNewDecisions > maxNumberOfDecisionsToLog {
+                        b.logger.Debug(fmt.Sprintf("skipped logging for %d new decisions", numberOfNewDecisions))
+                    }
+                    b.logger.Debug(fmt.Sprintf("finished processing %d new decisions", numberOfNewDecisions))
+                }
+            }
+        }
     }()
 }
 
-// ShutDown stops the Bouncer
-func (b *Bouncer) ShutDown() error {
-    // TODO: persist the current state of the radix tree in some way, so that it can be used in startup again?
+// Shutdown stops the Bouncer
+func (b *Bouncer) Shutdown() error {
+    b.cancel()
     // TODO: clean shutdown of the streaming bouncer channel reading
-    b.store = nil
+    //b.store = nil // TODO(hs): setting this to nil without reinstantiating it, leads to errors; do this properly.
    return nil
 }
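Taken together, the pattern above is: Run stores a cancelable context on the Bouncer, the processing goroutine selects on ctx.Done() alongside the decision stream, and Shutdown calls cancel so the goroutine returns instead of blocking on the channel forever. A minimal, self-contained sketch of that pattern follows; names like worker and jobs are illustrative, not from this repository.

package main

import (
	"context"
	"fmt"
	"time"
)

// worker mimics the Bouncer's start/stop lifecycle.
type worker struct {
	ctx    context.Context
	cancel context.CancelFunc
	jobs   chan string
}

// Run stores the context/cancel pair on the struct and starts the processing loop.
func (w *worker) Run() {
	w.ctx, w.cancel = context.WithCancel(context.Background())
	go func() {
		for {
			select {
			case <-w.ctx.Done():
				fmt.Println("processing stopped")
				return
			case job := <-w.jobs:
				fmt.Println("processing", job)
			}
		}
	}()
}

// Shutdown cancels the context so the processing goroutine can return.
func (w *worker) Shutdown() error {
	w.cancel()
	return nil
}

func main() {
	w := &worker{jobs: make(chan string)}
	w.Run()
	w.jobs <- "decision batch 1"
	_ = w.Shutdown()
	time.Sleep(10 * time.Millisecond) // give the goroutine a moment to observe cancellation
}

Note that when both channels are ready, select picks a case at random, so a batch arriving at the same moment as cancellation may or may not be processed; that is presumably part of what the remaining TODO about a clean shutdown of the channel reading is getting at.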
