Skip to content

Commit

Permalink
Simplify ttlEvents ListWatch
Browse files Browse the repository at this point in the history
There is no reason to do the list in one goroutine and the watch in another, as the watcher just blocks on startup waiting for the list to finish and send the revision on the channel. This can be simplified to one goroutine, which lets us drop the channel and waitgroup entirely.

Signed-off-by: Brad Davidson <brad.davidson@rancher.com>
  • Loading branch information
brandond committed Jul 23, 2024
1 parent b527bdd commit 863b9f7
Showing 1 changed file with 8 additions and 22 deletions.
30 changes: 8 additions & 22 deletions pkg/logstructured/logstructured.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,26 +350,21 @@ func (l *LogStructured) deleteTTLEvent(ctx context.Context, rwMutex *sync.RWMute
delete(store, curEventKV.key)
}

// ttlEvents starts a goroutine to do a ListWatch on the root prefix. First it lists
// all non-deleted keys with a page size of 1000, then it starts watching at the
// revision returned by the initial list. Any keys that have a Lease associated with
// them are sent into the result channel for deferred handling of TTL expiration.
func (l *LogStructured) ttlEvents(ctx context.Context) chan *server.Event {
result := make(chan *server.Event)
lastListRevision := make(chan int64)
wg := sync.WaitGroup{}
wg.Add(2)

go func() {
wg.Wait()
close(result)
close(lastListRevision)
}()

go func() {
defer wg.Done()
defer close(result)

rev, events, err := l.log.List(ctx, "/", "", 1000, 0, false)
for len(events) > 0 {
if err != nil {
logrus.Errorf("TTL event list failed: %v", err)
break
return
}

for _, event := range events {
Expand All @@ -378,19 +373,10 @@ func (l *LogStructured) ttlEvents(ctx context.Context) chan *server.Event {
}
}

rev, events, err = l.log.List(ctx, "/", events[len(events)-1].KV.Key, 1000, rev, false)
_, events, err = l.log.List(ctx, "/", events[len(events)-1].KV.Key, 1000, rev, false)
}
lastListRevision <- rev
}()

go func() {
defer wg.Done()
revision := <-lastListRevision
if revision == 0 {
logrus.Error("TTL event watch failed to get start revision")
return
}
wr := l.Watch(ctx, "/", revision)
wr := l.Watch(ctx, "/", rev)
if wr.CompactRevision != 0 {
logrus.Errorf("TTL event watch failed: %v", server.ErrCompacted)
return
Expand Down

0 comments on commit 863b9f7

Please sign in to comment.