Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add retries to copying files in GCS #2111

Merged
merged 6 commits into from
Feb 28, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
* [BUGFIX] Unescape query parameters in AWS Lambda to allow TraceQL queries to work. [#2114](https://github.com/grafana/tempo/issues/2114) (@joe-elliott)
* [CHANGE] Update Go to 1.20 [#2079](https://github.com/grafana/tempo/pull/2079) (@scalalang2)
* [CHANGE] Removing leading zeroes in span id [#2062](https://github.com/grafana/tempo/pull/2062) (@ie-pham)
* [BUGFIX] Retry idempotent operations in GCS backend [#2111](https://github.com/grafana/tempo/pull/2111) (@mapno)

## v2.0.0 / 2023-01-31

Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ require (
)

require (
github.com/googleapis/gax-go/v2 v2.6.0
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
golang.org/x/exp v0.0.0-20221002003631-540bb7301a08
)
Expand Down Expand Up @@ -152,7 +153,6 @@ require (
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/google/btree v1.0.1 // indirect
github.com/googleapis/enterprise-certificate-proxy v0.2.0 // indirect
github.com/googleapis/gax-go/v2 v2.6.0 // indirect
github.com/gorilla/handlers v1.5.1 // indirect
github.com/grafana/regexp v0.0.0-20221005093135-b4c2bcb0a4b6 // indirect
github.com/grpc-ecosystem/grpc-gateway v1.16.0 // indirect
Expand Down
6 changes: 5 additions & 1 deletion tempodb/backend/gcs/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

"cloud.google.com/go/storage"
"github.com/google/uuid"
"github.com/googleapis/gax-go/v2"
"github.com/grafana/tempo/tempodb/backend"
"google.golang.org/api/iterator"
)
Expand All @@ -17,7 +18,10 @@ func (rw *readerWriter) MarkBlockCompacted(blockID uuid.UUID, tenantID string) e
compactedMetaFilename := backend.CompactedMetaFileName(blockID, tenantID)

src := rw.bucket.Object(metaFilename)
dst := rw.bucket.Object(compactedMetaFilename)
dst := rw.bucket.Object(compactedMetaFilename).Retryer(
storage.WithBackoff(gax.Backoff{}),
storage.WithPolicy(storage.RetryAlways),
)

ctx := context.TODO()
_, err := dst.CopierFrom(src).Run(ctx)
Expand Down
4 changes: 2 additions & 2 deletions tempodb/backend/gcs/gcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ func createBucket(ctx context.Context, cfg *Config, hedge bool) (*storage.Bucket
instrumentation.PublishHedgedMetrics(stats)
}

// build client
// Build client
storageClientOptions := []option.ClientOption{
option.WithHTTPClient(&http.Client{
Transport: transport,
Expand All @@ -285,7 +285,7 @@ func createBucket(ctx context.Context, cfg *Config, hedge bool) (*storage.Bucket
return nil, errors.Wrap(err, "creating storage client")
}

// build bucket
// Build bucket
return client.Bucket(cfg.BucketName), nil
}

Expand Down
34 changes: 34 additions & 0 deletions tempodb/backend/gcs/gcs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"time"

"cloud.google.com/go/storage"
"github.com/google/uuid"
"github.com/grafana/tempo/tempodb/backend"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -142,6 +143,39 @@ func TestObjectConfigAttributes(t *testing.T) {
}
}

// TestRetry_MarkBlockCompacted verifies that MarkBlockCompacted retries
// transient backend failures (HTTP 503) and eventually succeeds, by
// pointing the GCS client at a fake server that fails the first two
// non-bucket requests.
func TestRetry_MarkBlockCompacted(t *testing.T) {
	var failures int32
	srv := httptest.NewUnstartedServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if r.URL.Path == "/b/blerg" {
			// Bucket metadata request: reply with an empty object.
			_, _ = w.Write([]byte(`{}`))
			return
		}
		// All other requests: fail the first two attempts with a
		// retryable 503, then let the next attempt succeed.
		if atomic.LoadInt32(&failures) < 2 {
			atomic.AddInt32(&failures, 1)
			w.WriteHeader(503)
			return
		}
		_, _ = w.Write([]byte(`{"done": true}`))
	}))
	srv.StartTLS()
	t.Cleanup(srv.Close)

	_, _, c, err := New(&Config{
		BucketName: "blerg",
		Insecure:   true,
		Endpoint:   srv.URL,
	})
	require.NoError(t, err)

	id, err := uuid.NewUUID()
	require.NoError(t, err)

	// The operation must succeed despite the injected failures, and the
	// server must have observed exactly two failed attempts.
	require.NoError(t, c.MarkBlockCompacted(id, "tenant"))
	require.Equal(t, int32(2), atomic.LoadInt32(&failures))
}

func fakeServer(t *testing.T, returnIn time.Duration, counter *int32) *httptest.Server {
server := httptest.NewUnstartedServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
time.Sleep(returnIn)
Expand Down
39 changes: 32 additions & 7 deletions tempodb/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,14 @@ import (
"strconv"
"time"

"github.com/opentracing/opentracing-go"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"

"github.com/go-kit/log/level"
"github.com/grafana/tempo/pkg/util"
"github.com/grafana/tempo/tempodb/backend"
"github.com/grafana/tempo/tempodb/encoding"
"github.com/grafana/tempo/tempodb/encoding/common"
"github.com/opentracing/opentracing-go"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)

const (
Expand Down Expand Up @@ -75,7 +74,10 @@ func (rw *readerWriter) compactionLoop() {
}
}

// doCompaction runs a compaction cycle every 30s
func (rw *readerWriter) doCompaction() {
// List of all tenants in the block list
// The block list is updated by constantly polling the storage for tenant indexes and/or tenant blocks (and building the index)
tenants := rw.blocklist.Tenants()
if len(tenants) == 0 {
return
Expand All @@ -86,9 +88,18 @@ func (rw *readerWriter) doCompaction() {
sort.Slice(tenants, func(i, j int) bool { return tenants[i] < tenants[j] })
rw.compactorTenantOffset = (rw.compactorTenantOffset + 1) % uint(len(tenants))

// Select the next tenant to run compaction for
tenantID := tenants[rw.compactorTenantOffset]
// Get the meta file of all non-compacted blocks for the given tenant
blocklist := rw.blocklist.Metas(tenantID)

// Select which blocks to compact.
//
// Blocks are first divided by the active compaction window (default: most recent 24h).
// 1. If blocks are inside the active window, they're grouped by compaction level (how many times they've been compacted),
// favoring lower compaction levels, and compacting blocks only from the same tenant.
// 2. If blocks are outside the active window, they're grouped only by window, ignoring compaction level.
// More recent windows are picked first, again compacting blocks only from the same tenant.
blockSelector := newTimeWindowBlockSelector(blocklist,
rw.compactorCfg.MaxCompactionRange,
rw.compactorCfg.MaxCompactionObjects,
Expand All @@ -100,6 +111,7 @@ func (rw *readerWriter) doCompaction() {

level.Info(rw.logger).Log("msg", "starting compaction cycle", "tenantID", tenantID, "offset", rw.compactorTenantOffset)
for {
// Pick up to defaultMaxInputBlocks (4) blocks to compact into a single one
toBeCompacted, hashString := blockSelector.BlocksToCompact()
if len(toBeCompacted) == 0 {
measureOutstandingBlocks(tenantID, blockSelector, rw.compactorSharder.Owns)
Expand All @@ -112,6 +124,7 @@ func (rw *readerWriter) doCompaction() {
continue
}
level.Info(rw.logger).Log("msg", "Compacting hash", "hashString", hashString)
// Compact selected blocks into a larger one
err := rw.compact(toBeCompacted, tenantID)

if err == backend.ErrDoesNotExist {
Expand Down Expand Up @@ -201,13 +214,16 @@ func (rw *readerWriter) compact(blockMetas []*backend.BlockMeta, tenantID string

compactor := enc.NewCompactor(opts)

// Compact selected blocks into a larger one
newCompactedBlocks, err := compactor.Compact(ctx, rw.logger, rw.r, rw.getWriterForBlock, blockMetas)
if err != nil {
return err
}

// mark old blocks compacted so they don't show up in polling
markCompacted(rw, tenantID, blockMetas, newCompactedBlocks)
// mark old blocks compacted, so they don't show up in polling
if err := markCompacted(rw, tenantID, blockMetas, newCompactedBlocks); err != nil {
return err
}

metricCompactionBlocks.WithLabelValues(compactionLevelLabel).Add(float64(len(blockMetas)))

Expand All @@ -225,10 +241,13 @@ func (rw *readerWriter) compact(blockMetas []*backend.BlockMeta, tenantID string
return nil
}

func markCompacted(rw *readerWriter, tenantID string, oldBlocks []*backend.BlockMeta, newBlocks []*backend.BlockMeta) {
func markCompacted(rw *readerWriter, tenantID string, oldBlocks []*backend.BlockMeta, newBlocks []*backend.BlockMeta) error {
// Check if we have any errors, but continue marking the blocks as compacted
var errCount int
for _, meta := range oldBlocks {
// Mark in the backend
if err := rw.c.MarkBlockCompacted(meta.BlockID, tenantID); err != nil {
errCount++
level.Error(rw.logger).Log("msg", "unable to mark block compacted", "blockID", meta.BlockID, "tenantID", tenantID, "err", err)
metricCompactionErrors.Inc()
}
Expand All @@ -245,6 +264,12 @@ func markCompacted(rw *readerWriter, tenantID string, oldBlocks []*backend.Block

// Update blocklist in memory
rw.blocklist.Update(tenantID, newBlocks, oldBlocks, newCompactions, nil)

if errCount > 0 {
return fmt.Errorf("unable to mark %d blocks compacted", errCount)
}

return nil
}

func measureOutstandingBlocks(tenantID string, blockSelector CompactionBlockSelector, owned func(hash string) bool) {
Expand Down