Skip to content

Commit

Permalink
signalfxexporter: remove usage of sync/atomic (open-telemetry#9779)
Browse files Browse the repository at this point in the history
Signed-off-by: Bogdan Drutu <bogdandrutu@gmail.com>
  • Loading branch information
bogdandrutu authored and djaglowski committed May 10, 2022
1 parent d1932cb commit e185491
Show file tree
Hide file tree
Showing 5 changed files with 23 additions and 65 deletions.
9 changes: 3 additions & 6 deletions exporter/signalfxexporter/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ require (
github.com/gobwas/glob v0.2.3
github.com/gogo/protobuf v1.3.2
github.com/open-telemetry/opentelemetry-collector-contrib/internal/common v0.50.0
github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.50.0
github.com/open-telemetry/opentelemetry-collector-contrib/internal/splunk v0.50.0
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/batchperresourceattr v0.50.0
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/experimentalmetricmetadata v0.50.0
Expand All @@ -14,13 +15,10 @@ require (
github.com/signalfx/signalfx-agent/pkg/apm v0.0.0-20201202163743-65b4fa925fc8
github.com/stretchr/testify v1.7.1
go.opentelemetry.io/collector v0.50.1-0.20220429151328-041f39835df7
go.opentelemetry.io/collector/semconv v0.50.1-0.20220429151328-041f39835df7
go.uber.org/atomic v1.9.0
go.uber.org/multierr v1.8.0
go.uber.org/zap v1.21.0
)

require (
github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.50.0
go.opentelemetry.io/collector/semconv v0.50.1-0.20220429151328-041f39835df7
gopkg.in/yaml.v2 v2.4.0
)

Expand Down Expand Up @@ -61,7 +59,6 @@ require (
go.opentelemetry.io/otel v1.7.0 // indirect
go.opentelemetry.io/otel/metric v0.30.0 // indirect
go.opentelemetry.io/otel/trace v1.7.0 // indirect
go.uber.org/atomic v1.9.0 // indirect
golang.org/x/net v0.0.0-20220225172249-27dd8689420f // indirect
golang.org/x/text v0.3.7 // indirect
google.golang.org/genproto v0.0.0-20211208223120-3a66f561d7aa // indirect
Expand Down
27 changes: 3 additions & 24 deletions exporter/signalfxexporter/internal/dimensions/dimclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"reflect"
"strings"
"sync"
"sync/atomic"
"time"

"go.uber.org/zap"
Expand Down Expand Up @@ -61,19 +60,9 @@ type DimensionClient struct {
// For easier unit testing
now func() time.Time

// TODO: Look into collecting these metrics and other traces via obsreport
DimensionsCurrentlyDelayed int64
TotalDimensionsDropped int64
// The number of dimension updates that happened to the same dimension
// within sendDelay.
TotalFlappyUpdates int64
TotalClientError4xxResponses int64
TotalRetriedUpdates int64
TotalInvalidDimensions int64
TotalSuccessfulUpdates int64
logUpdates bool
logger *zap.Logger
metricsConverter translation.MetricsConverter
logUpdates bool
logger *zap.Logger
metricsConverter translation.MetricsConverter
}

type queuedDimension struct {
Expand Down Expand Up @@ -139,15 +128,11 @@ func (dc *DimensionClient) acceptDimension(dimUpdate *DimensionUpdate) error {

if delayedDimUpdate := dc.delayedSet[dimUpdate.Key()]; delayedDimUpdate != nil {
if !reflect.DeepEqual(delayedDimUpdate, dimUpdate) {
dc.TotalFlappyUpdates++

// Merge the latest updates into existing one.
delayedDimUpdate.Properties = mergeProperties(delayedDimUpdate.Properties, dimUpdate.Properties)
delayedDimUpdate.Tags = mergeTags(delayedDimUpdate.Tags, dimUpdate.Tags)
}
} else {
atomic.AddInt64(&dc.DimensionsCurrentlyDelayed, int64(1))

dc.delayedSet[dimUpdate.Key()] = dimUpdate
select {
case dc.delayedQueue <- &queuedDimension{
Expand All @@ -156,8 +141,6 @@ func (dc *DimensionClient) acceptDimension(dimUpdate *DimensionUpdate) error {
}:
break
default:
dc.TotalDimensionsDropped++
atomic.AddInt64(&dc.DimensionsCurrentlyDelayed, int64(-1))
return errors.New("dropped dimension update, propertiesMaxBuffered exceeded")
}
}
Expand Down Expand Up @@ -204,8 +187,6 @@ func (dc *DimensionClient) processQueue() {
time.Sleep(delayedDimUpdate.TimeToSend.Sub(now))
}

atomic.AddInt64(&dc.DimensionsCurrentlyDelayed, int64(-1))

dc.Lock()
delete(dc.delayedSet, delayedDimUpdate.Key())
dc.Unlock()
Expand Down Expand Up @@ -237,7 +218,6 @@ func (dc *DimensionClient) handleDimensionUpdate(dimUpdate *DimensionUpdate) err
req = req.WithContext(
context.WithValue(req.Context(), RequestFailedCallbackKey, RequestFailedCallback(func(statusCode int, err error) {
if statusCode >= 400 && statusCode < 500 && statusCode != 404 {
atomic.AddInt64(&dc.TotalClientError4xxResponses, int64(1))
dc.logger.Error(
"Unable to update dimension, not retrying",
zap.Error(err),
Expand All @@ -261,7 +241,6 @@ func (dc *DimensionClient) handleDimensionUpdate(dimUpdate *DimensionUpdate) err
zap.String("dimensionUpdate", dimUpdate.String()),
zap.Int("statusCode", statusCode),
)
atomic.AddInt64(&dc.TotalRetriedUpdates, int64(1))
// The retry is meant to provide some measure of robustness against
// temporary API failures. If the API is down for significant
// periods of time, dimension updates will probably eventually back
Expand Down
25 changes: 7 additions & 18 deletions exporter/signalfxexporter/internal/dimensions/dimclient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,11 @@ import (
"net/url"
"regexp"
"strconv"
"sync/atomic"
"testing"
"time"

"github.com/stretchr/testify/require"
"go.uber.org/atomic"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -61,11 +61,11 @@ loop:
return dims
}

func makeHandler(dimCh chan<- dim, forcedResp *atomic.Value) http.HandlerFunc {
func makeHandler(dimCh chan<- dim, forcedResp *atomic.Int32) http.HandlerFunc {
forcedResp.Store(200)

return func(rw http.ResponseWriter, r *http.Request) {
forcedRespInt := forcedResp.Load().(int)
forcedRespInt := int(forcedResp.Load())
if forcedRespInt != 200 {
rw.WriteHeader(forcedRespInt)
return
Expand Down Expand Up @@ -98,11 +98,11 @@ func makeHandler(dimCh chan<- dim, forcedResp *atomic.Value) http.HandlerFunc {
}
}

func setup(t *testing.T) (*DimensionClient, chan dim, *atomic.Value, context.CancelFunc) {
func setup(t *testing.T) (*DimensionClient, chan dim, *atomic.Int32, context.CancelFunc) {
dimCh := make(chan dim)

var forcedResp atomic.Value
server := httptest.NewServer(makeHandler(dimCh, &forcedResp))
forcedResp := atomic.NewInt32(0)
server := httptest.NewServer(makeHandler(dimCh, forcedResp))

serverURL, err := url.Parse(server.URL)
require.NoError(t, err, "failed to get server URL", err)
Expand All @@ -122,7 +122,7 @@ func setup(t *testing.T) (*DimensionClient, chan dim, *atomic.Value, context.Can
})
client.Start()

return client, dimCh, &forcedResp, cancel
return client, dimCh, forcedResp, cancel
}

func TestDimensionClient(t *testing.T) {
Expand Down Expand Up @@ -354,16 +354,6 @@ func TestFlappyUpdates(t *testing.T) {
Properties: map[string]*string{"index": newString("4")},
},
}, dims)

// Give it enough time to run the counter updates which happen after the
// request is completed.
time.Sleep(1 * time.Second)

require.Equal(t, int64(8), atomic.LoadInt64(&client.TotalFlappyUpdates))
require.Equal(t, int64(0), atomic.LoadInt64(&client.DimensionsCurrentlyDelayed))
require.Equal(t, int64(2), atomic.LoadInt64(&client.requestSender.TotalRequestsStarted))
require.Equal(t, int64(2), atomic.LoadInt64(&client.requestSender.TotalRequestsCompleted))
require.Equal(t, int64(0), atomic.LoadInt64(&client.requestSender.TotalRequestsFailed))
}

func TestInvalidUpdatesNotSent(t *testing.T) {
Expand Down Expand Up @@ -394,7 +384,6 @@ func TestInvalidUpdatesNotSent(t *testing.T) {

dims := waitForDims(dimCh, 2, 3)
require.Len(t, dims, 0)
require.Equal(t, int64(0), atomic.LoadInt64(&client.TotalInvalidDimensions))
}

func newString(s string) *string {
Expand Down
2 changes: 0 additions & 2 deletions exporter/signalfxexporter/internal/dimensions/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package dimensions // import "github.com/open-telemetry/opentelemetry-collector-
import (
"fmt"
"strings"
"sync/atomic"

"go.uber.org/multierr"

Expand Down Expand Up @@ -111,7 +110,6 @@ func (dc *DimensionClient) PushMetadata(metadata []*metadata.MetadataUpdate) err
dimensionUpdate := getDimensionUpdateFromMetadata(*m, dc.metricsConverter)

if dimensionUpdate.Name == "" || dimensionUpdate.Value == "" {
atomic.AddInt64(&dc.TotalInvalidDimensions, int64(1))
return fmt.Errorf("dimensionUpdate %v is missing Name or value, cannot send", dimensionUpdate)
}

Expand Down
25 changes: 10 additions & 15 deletions exporter/signalfxexporter/internal/dimensions/requests.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ import (
"fmt"
"io/ioutil"
"net/http"
"sync/atomic"

"go.uber.org/atomic"
)

// ReqSender is a direct port of
Expand All @@ -44,11 +45,7 @@ type ReqSender struct {
workerCount uint
ctx context.Context
additionalDimensions map[string]string

RunningWorkers int64
TotalRequestsStarted int64
TotalRequestsCompleted int64
TotalRequestsFailed int64
runningWorkers *atomic.Int64
}

func NewReqSender(ctx context.Context, client *http.Client,
Expand All @@ -57,9 +54,10 @@ func NewReqSender(ctx context.Context, client *http.Client,
client: client,
additionalDimensions: diagnosticDimensions,
// Unbuffered so that it blocks clients
requests: make(chan *http.Request),
workerCount: workerCount,
ctx: ctx,
requests: make(chan *http.Request),
workerCount: workerCount,
ctx: ctx,
runningWorkers: atomic.NewInt64(0),
}
}

Expand All @@ -72,7 +70,7 @@ func (rs *ReqSender) Send(req *http.Request) {
case rs.requests <- req:
return
default:
if atomic.LoadInt64(&rs.RunningWorkers) < int64(rs.workerCount) {
if rs.runningWorkers.Load() < int64(rs.workerCount) {
go rs.processRequests()
}

Expand All @@ -82,20 +80,17 @@ func (rs *ReqSender) Send(req *http.Request) {
}

func (rs *ReqSender) processRequests() {
atomic.AddInt64(&rs.RunningWorkers, int64(1))
defer atomic.AddInt64(&rs.RunningWorkers, int64(-1))
rs.runningWorkers.Add(1)
defer rs.runningWorkers.Add(-1)

for {
select {
case <-rs.ctx.Done():
return
case req := <-rs.requests:
atomic.AddInt64(&rs.TotalRequestsStarted, int64(1))
if err := rs.sendRequest(req); err != nil {
atomic.AddInt64(&rs.TotalRequestsFailed, int64(1))
continue
}
atomic.AddInt64(&rs.TotalRequestsCompleted, int64(1))
}
}
}
Expand Down

0 comments on commit e185491

Please sign in to comment.