Skip to content

Commit

Permalink
cli list blocks usability improvements (#403)
Browse files Browse the repository at this point in the history
* Update list blocks command to now show compacted blocks unless flag is specified. When flag specified then inlude compacted y/n column. Sort by block end time and add age column.

* Add new --include-compacted flag and columns to cli documentation

* List blocks: load meta in parallel to speed up command

* Apply copy/edit suggestions

Co-authored-by: achatterjee-grafana <70489351+achatterjee-grafana@users.noreply.github.com>

* Update changelog

Co-authored-by: achatterjee-grafana <70489351+achatterjee-grafana@users.noreply.github.com>
  • Loading branch information
mdisibio and achatterjee-grafana committed Dec 10, 2020
1 parent 40abd5d commit 37e596e
Show file tree
Hide file tree
Showing 5 changed files with 171 additions and 78 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
* [CHANGE] Redo tempo-cli with basic command structure and improvements [#385](https://github.com/grafana/tempo/pull/385)
* [CHANGE] Add content negotiation support and sharding parameters to Querier [#375](https://github.com/grafana/tempo/pull/375)
* [ENHANCEMENT] Add docker-compose example for GCS along with new backend options [#397](https://github.com/grafana/tempo/pull/397)
* [ENHANCEMENT] tempo-cli list blocks usability improvements [#403](https://github.com/grafana/tempo/pull/403)
* [BUGFIX] Compactor without GCS permissions fail silently [#379](https://github.com/grafana/tempo/issues/379)

## v0.4.0
Expand Down
16 changes: 8 additions & 8 deletions cmd/tempo-cli/cmd-list-block.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,14 @@ func dumpBlock(r tempodb_backend.Reader, c tempodb_backend.Compactor, tenantID s
return nil
}

objects, lvl, window, start, end := blockStats(meta, compactedMeta, windowRange)

fmt.Println("ID : ", id)
fmt.Println("Total Objects : ", objects)
fmt.Println("Level : ", lvl)
fmt.Println("Window : ", window)
fmt.Println("Start : ", start)
fmt.Println("End : ", end)
unifiedMeta := getMeta(meta, compactedMeta, windowRange)

fmt.Println("ID : ", unifiedMeta.id)
fmt.Println("Total Objects : ", unifiedMeta.objects)
fmt.Println("Level : ", unifiedMeta.compactionLevel)
fmt.Println("Window : ", unifiedMeta.window)
fmt.Println("Start : ", unifiedMeta.start)
fmt.Println("End : ", unifiedMeta.end)

if checkDupes {
fmt.Println("Searching for dupes ...")
Expand Down
147 changes: 86 additions & 61 deletions cmd/tempo-cli/cmd-list-blocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@ import (
)

type listBlocksCmd struct {
TenantID string `arg:"" help:"tenant-id within the bucket"`
LoadIndex bool `help:"load block indexes and display additional information"`
TenantID string `arg:"" help:"tenant-id within the bucket"`
LoadIndex bool `help:"load block indexes and display additional information"`
IncludeCompacted bool `help:"include compacted blocks"`

backendOptions
}
Expand All @@ -30,104 +31,119 @@ func (l *listBlocksCmd) Run(ctx *globalOptions) error {

windowDuration := time.Hour

results, err := loadBucket(r, c, l.TenantID, windowDuration, l.LoadIndex)
results, err := loadBucket(r, c, l.TenantID, windowDuration, l.LoadIndex, l.IncludeCompacted)
if err != nil {
return err
}

displayResults(results, windowDuration, l.LoadIndex)
displayResults(results, windowDuration, l.LoadIndex, l.IncludeCompacted)
return nil
}

type bucketStats struct {
id uuid.UUID
compactionLevel uint8
objects int
window int64
start time.Time
end time.Time
type blockStats struct {
unifiedBlockMeta

totalIDs int
duplicateIDs int
}

func loadBucket(r tempodb_backend.Reader, c tempodb_backend.Compactor, tenantID string, windowRange time.Duration, loadIndex bool) ([]bucketStats, error) {
func loadBucket(r tempodb_backend.Reader, c tempodb_backend.Compactor, tenantID string, windowRange time.Duration, loadIndex bool, includeCompacted bool) ([]blockStats, error) {
blockIDs, err := r.Blocks(context.Background(), tenantID)
if err != nil {
return nil, err
}

fmt.Println("total blocks: ", len(blockIDs))
results := make([]bucketStats, 0)

for _, id := range blockIDs {
fmt.Print(".")

meta, err := r.BlockMeta(context.Background(), id, tenantID)
if err != nil && err != tempodb_backend.ErrMetaDoesNotExist {
return nil, err
}
// Load in parallel
wg := newBoundedWaitGroup(10)
resultsCh := make(chan blockStats, len(blockIDs))

compactedMeta, err := c.CompactedBlockMeta(id, tenantID)
if err != nil && err != tempodb_backend.ErrMetaDoesNotExist {
return nil, err
}
for _, id := range blockIDs {
wg.Add(1)

totalIDs := -1
duplicateIDs := -1
go func(id2 uuid.UUID) {
defer wg.Done()

if loadIndex {
indexBytes, err := r.Index(context.Background(), id, tenantID)
if err == nil {
records, err := encoding.UnmarshalRecords(indexBytes)
if err != nil {
return nil, err
}
duplicateIDs = 0
totalIDs = len(records)
for i := 1; i < len(records); i++ {
if bytes.Equal(records[i-1].ID, records[i].ID) {
duplicateIDs++
}
}
b, err := loadBlock(r, c, tenantID, id2, windowRange, loadIndex, includeCompacted)
if err != nil {
fmt.Println("Error loading block:", id2, err)
return
}
}

objects, lvl, window, start, end := blockStats(meta, compactedMeta, windowRange)
if b != nil {
resultsCh <- *b
}
}(id)
}

results = append(results, bucketStats{
id: id,
compactionLevel: lvl,
objects: objects,
window: window,
start: start,
end: end,
wg.Wait()
close(resultsCh)

totalIDs: totalIDs,
duplicateIDs: duplicateIDs,
})
results := make([]blockStats, 0)
for b := range resultsCh {
results = append(results, b)
}

sort.Slice(results, func(i, j int) bool {
bI := results[i]
bJ := results[j]
return results[i].end.Before(results[j].end)
})

return results, nil
}

func loadBlock(r tempodb_backend.Reader, c tempodb_backend.Compactor, tenantID string, id uuid.UUID, windowRange time.Duration, loadIndex bool, includeCompacted bool) (*blockStats, error) {
fmt.Print(".")

if bI.window == bJ.window {
return bI.compactionLevel < bJ.compactionLevel
meta, err := r.BlockMeta(context.Background(), id, tenantID)
if err == tempodb_backend.ErrMetaDoesNotExist && !includeCompacted {
return nil, nil
} else if err != nil && err != tempodb_backend.ErrMetaDoesNotExist {
return nil, err
}

compactedMeta, err := c.CompactedBlockMeta(id, tenantID)
if err != nil && err != tempodb_backend.ErrMetaDoesNotExist {
return nil, err
}

totalIDs := -1
duplicateIDs := -1

if loadIndex {
indexBytes, err := r.Index(context.Background(), id, tenantID)
if err == nil {
records, err := encoding.UnmarshalRecords(indexBytes)
if err != nil {
return nil, err
}
duplicateIDs = 0
totalIDs = len(records)
for i := 1; i < len(records); i++ {
if bytes.Equal(records[i-1].ID, records[i].ID) {
duplicateIDs++
}
}
}
}

return bI.window < bJ.window
})
return &blockStats{
unifiedBlockMeta: getMeta(meta, compactedMeta, windowRange),

return results, nil
totalIDs: totalIDs,
duplicateIDs: duplicateIDs,
}, nil
}

func displayResults(results []bucketStats, windowDuration time.Duration, includeIndexInfo bool) {
func displayResults(results []blockStats, windowDuration time.Duration, includeIndexInfo bool, includeCompacted bool) {

columns := []string{"id", "lvl", "count", "window", "start", "end", "duration"}
columns := []string{"id", "lvl", "count", "window", "start", "end", "duration", "age"}
if includeIndexInfo {
columns = append(columns, "idx", "dupe")
}
if includeCompacted {
columns = append(columns, "cmp")
}

totalObjects := 0
out := make([][]string, 0)
Expand Down Expand Up @@ -155,12 +171,21 @@ func displayResults(results []bucketStats, windowDuration time.Duration, include
case "duration":
// Time range included in bucket
s = fmt.Sprint(r.end.Sub(r.start).Round(time.Second))
case "age":
s = fmt.Sprint(time.Since(r.end).Round(time.Second))
case "idx":
// Number of entries in the index (may not be the same as the block when index downsampling enabled)
s = strconv.Itoa(r.totalIDs)
case "dupe":
// Number of duplicate IDs found in the index
s = strconv.Itoa(r.duplicateIDs)
case "cmp":
// Compacted?
if r.compacted {
s = "Y"
} else {
s = " "
}
}

line = append(line, s)
Expand Down
74 changes: 69 additions & 5 deletions cmd/tempo-cli/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@ import (
"flag"
"fmt"
"io/ioutil"
"sync"
"time"

"github.com/google/uuid"
"github.com/grafana/tempo/cmd/tempo/app"
tempodb_backend "github.com/grafana/tempo/tempodb/backend"
"github.com/grafana/tempo/tempodb/backend/local"
Expand Down Expand Up @@ -107,12 +109,74 @@ func loadBackend(b *backendOptions, g *globalOptions) (tempodb_backend.Reader, t
return r, c, nil
}

func blockStats(meta *encoding.BlockMeta, compactedMeta *encoding.CompactedBlockMeta, windowRange time.Duration) (int, uint8, int64, time.Time, time.Time) {
type unifiedBlockMeta struct {
id uuid.UUID
compactionLevel uint8
objects int
window int64
start time.Time
end time.Time
compacted bool
}

func getMeta(meta *encoding.BlockMeta, compactedMeta *encoding.CompactedBlockMeta, windowRange time.Duration) unifiedBlockMeta {
if meta != nil {
return meta.TotalObjects, meta.CompactionLevel, meta.EndTime.Unix() / int64(windowRange/time.Second), meta.StartTime, meta.EndTime
} else if compactedMeta != nil {
return compactedMeta.TotalObjects, compactedMeta.CompactionLevel, compactedMeta.EndTime.Unix() / int64(windowRange/time.Second), compactedMeta.StartTime, compactedMeta.EndTime
return unifiedBlockMeta{
id: meta.BlockID,
compactionLevel: meta.CompactionLevel,
objects: meta.TotalObjects,
window: meta.EndTime.Unix() / int64(windowRange/time.Second),
start: meta.StartTime,
end: meta.EndTime,
compacted: false,
}
}
if compactedMeta != nil {
return unifiedBlockMeta{
id: compactedMeta.BlockID,
compactionLevel: compactedMeta.CompactionLevel,
objects: compactedMeta.TotalObjects,
window: compactedMeta.EndTime.Unix() / int64(windowRange/time.Second),
start: compactedMeta.StartTime,
end: compactedMeta.EndTime,
compacted: true,
}
}
return unifiedBlockMeta{
id: uuid.UUID{},
compactionLevel: 0,
objects: -1,
window: -1,
start: time.Unix(0, 0),
end: time.Unix(0, 0),
compacted: false,
}
}

// boundedWaitGroup like a normal wait group except limits number of active goroutines to given capacity.
type boundedWaitGroup struct {
wg sync.WaitGroup
ch chan struct{} // Chan buffer size is used to limit concurrency.
}

func newBoundedWaitGroup(cap int) boundedWaitGroup {
return boundedWaitGroup{ch: make(chan struct{}, cap)}
}

func (bwg *boundedWaitGroup) Add(delta int) {
for i := 0; i > delta; i-- {
<-bwg.ch
}
for i := 0; i < delta; i++ {
bwg.ch <- struct{}{}
}
bwg.wg.Add(delta)
}

func (bwg *boundedWaitGroup) Done() {
bwg.Add(-1)
}

return -1, 0, -1, time.Unix(0, 0), time.Unix(0, 0)
func (bwg *boundedWaitGroup) Wait() {
bwg.wg.Wait()
}
11 changes: 7 additions & 4 deletions docs/tempo/website/cli/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -74,19 +74,22 @@ Arguments:
- `tenant-id` The tenant ID. Use `single-tenant` for single tenant setups.

Options:
- `--load-index` Also load the block indexes and perform integrity checks for duplicates. **Note:** can be intense.
- `--include-compacted` Include blocks that have been compacted. Default behavior is to display only active blocks.
- `--load-index` Also load the block indexes and perform integrity checks for duplicates. **Note:** This can be a resource intensive process.

**Output:**
Explanation of output:
- `ID` Block ID.
- `Lvl` Compaction level of the block.
- `Count` Number of objects stored in the block.
- `Window` The time window considered for compaction purposes.
- `Window` The window of time that was considered for compaction purposes.
- `Start` The earliest timestamp stored in the block.
- `End` The latest timestamp stored in the block.
- `Duration` Time duration between start and end.
- `Age` The age of the block.
- `Duration`Duration between the start and end time.
- `Idx` Number of records stored in the index (present when --load-index is specified).
- `Dupe` Number of duplicate entries in the index. Should be zero. (present when --load-index is specified).
- `Dupe` Number of duplicate entries in the index (present when --load-index is specified). Should be zero.
- `Cmp` Whether the block has been compacted (present when --include-compacted is specified).

**Example:**
```bash
Expand Down

0 comments on commit 37e596e

Please sign in to comment.