Skip to content

Commit

Permalink
Directly use the unix timestamp instead of time
Browse files Browse the repository at this point in the history
  • Loading branch information
codebien committed May 30, 2023
1 parent 986db71 commit 7c2ddb0
Show file tree
Hide file tree
Showing 8 changed files with 65 additions and 25 deletions.
15 changes: 9 additions & 6 deletions output/cloud/expv2/collect.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,7 @@ import (
)

type timeBucket struct {
// TODO: for performance reasons, use directly a unix time here
// so we can avoid time->unix->time
Time time.Time
Time int64
Sinks map[metrics.TimeSeries]metricValue
}

Expand Down Expand Up @@ -68,12 +66,18 @@ func newCollector(aggrPeriod, waitPeriod time.Duration) (*collector, error) {
if aggrPeriod == 0 {
return nil, errors.New("aggregation period is not allowed to be zero")
}
if aggrPeriod != aggrPeriod.Truncate(time.Second) {
return nil, errors.New("aggregation period is not allowed to have sub-second precision")
}
if waitPeriod == 0 {
// TODO: we could simplify the expiring logic
// just having an internal static logic.
// Like skip only not closed buckets bucketEnd > now.
return nil, errors.New("aggregation wait period is not allowed to be zero")
}
if waitPeriod != waitPeriod.Truncate(time.Second) {
return nil, errors.New("aggregation wait period is not allowed to have sub-second precision")
}
return &collector{
bq: bucketQ{},
nowFunc: time.Now,
Expand Down Expand Up @@ -152,9 +156,8 @@ func (c *collector) bucketID(t time.Time) int64 {
return t.UnixNano() / int64(c.aggregationPeriod)
}

func (c *collector) timeFromBucketID(id int64) time.Time {
return time.Unix(0, id*int64(c.aggregationPeriod)).
Truncate(time.Microsecond).UTC()
func (c *collector) timeFromBucketID(id int64) int64 {
return id * int64(c.aggregationPeriod)
}

func (c *collector) bucketCutoffID() int64 {
Expand Down
21 changes: 15 additions & 6 deletions output/cloud/expv2/collect_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,15 @@ import (
"go.k6.io/k6/metrics"
)

func TestNewCollectorError(t *testing.T) {
// TODO: more cases
_, err := newCollector(4*time.Second+300*time.Millisecond, 1*time.Second)
require.ErrorContains(t, err, "sub-second precision")

_, err = newCollector(4*time.Second, 1*time.Second+300*time.Millisecond)
require.ErrorContains(t, err, "sub-second precision")
}

func TestCollectorCollectSample(t *testing.T) {
t.Parallel()

Expand Down Expand Up @@ -174,7 +183,7 @@ func TestCollectorExpiredBucketsCutoff(t *testing.T) {
assert.NotContains(t, c.timeBuckets, 3)

require.Len(t, expired, 1)
expDateTime := time.Unix(9, 0).UTC()
expDateTime := time.Unix(9, 0).UTC().UnixNano()
assert.Equal(t, expDateTime, expired[0].Time)
}

Expand Down Expand Up @@ -205,7 +214,7 @@ func TestCollectorTimeFromBucketID(t *testing.T) {
c := collector{aggregationPeriod: 3 * time.Second}

// exp = TimeFromUnix(bucketID * aggregationPeriod) = Time(49 * 3s)
exp := time.Date(1970, time.January, 1, 0, 2, 27, 0, time.UTC)
exp := time.Date(1970, time.January, 1, 0, 2, 27, 0, time.UTC).UnixNano()
assert.Equal(t, exp, c.timeFromBucketID(49))
}

Expand All @@ -228,16 +237,16 @@ func TestBucketQPush(t *testing.T) {
t.Parallel()

bq := bucketQ{}
bq.Push([]timeBucket{{Time: time.Unix(1, 0)}})
bq.Push([]timeBucket{{Time: int64(1 * time.Second)}})
require.Len(t, bq.buckets, 1)
}

func TestBucketQPopAll(t *testing.T) {
t.Parallel()
bq := bucketQ{
buckets: []timeBucket{
{Time: time.Unix(1, 0)},
{Time: time.Unix(2, 0)},
{Time: int64(1 * time.Second)},
{Time: int64(2 * time.Second)},
},
}
buckets := bq.PopAll()
Expand Down Expand Up @@ -273,7 +282,7 @@ func TestBucketQPushPopConcurrency(t *testing.T) {
}
}()

now := time.Now()
now := time.Now().Truncate(time.Second).UnixNano()
for {
select {
case <-stop:
Expand Down
3 changes: 1 addition & 2 deletions output/cloud/expv2/flush_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package expv2

import (
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand All @@ -24,7 +23,7 @@ func TestMetricSetBuilderAddTimeBucket(t *testing.T) {
}

tb := timeBucket{
Time: time.Unix(1, 0),
Time: 1,
Sinks: map[metrics.TimeSeries]metricValue{
timeSeries: &counter{},
},
Expand Down
6 changes: 2 additions & 4 deletions output/cloud/expv2/hdr.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,8 @@ package expv2
import (
"math"
"math/bits"
"time"

"go.k6.io/k6/output/cloud/expv2/pbcloud"
"google.golang.org/protobuf/types/known/timestamppb"
)

const (
Expand Down Expand Up @@ -183,9 +181,9 @@ func (h *histogram) appendBuckets(index uint32) {
}

// histogramAsProto converts the histogram into the equivalent Protobuf version.
func histogramAsProto(h *histogram, time time.Time) *pbcloud.TrendHdrValue {
func histogramAsProto(h *histogram, time int64) *pbcloud.TrendHdrValue {
hval := &pbcloud.TrendHdrValue{
Time: timestamppb.New(time),
Time: timestampAsProto(time),
MinResolution: 1.0,
SignificantDigits: 2,
LowerCounterIndex: h.FirstNotZeroBucket,
Expand Down
2 changes: 1 addition & 1 deletion output/cloud/expv2/hdr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,6 @@ func TestHistogramAsProto(t *testing.T) {
tc.exp.MinResolution = 1.0
tc.exp.SignificantDigits = 2
tc.exp.Time = &timestamppb.Timestamp{Seconds: 1}
assert.Equal(t, tc.exp, histogramAsProto(&h, time.Unix(1, 0)), tc.name)
assert.Equal(t, tc.exp, histogramAsProto(&h, time.Unix(1, 0).UnixNano()), tc.name)
}
}
22 changes: 17 additions & 5 deletions output/cloud/expv2/mapping.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package expv2

import (
"fmt"
"time"

"github.com/mstoykov/atlas"
"go.k6.io/k6/metrics"
Expand Down Expand Up @@ -51,7 +50,7 @@ func mapMetricTypeProto(mt metrics.MetricType) pbcloud.MetricType {
func addBucketToTimeSeriesProto(
timeSeries *pbcloud.TimeSeries,
mt metrics.MetricType,
time time.Time,
time int64,
value metricValue,
) {
if timeSeries.Samples == nil {
Expand All @@ -62,13 +61,13 @@ func addBucketToTimeSeriesProto(
case *counter:
samples := timeSeries.GetCounterSamples()
samples.Values = append(samples.Values, &pbcloud.CounterValue{
Time: timestamppb.New(time),
Time: timestampAsProto(time),
Value: typedMetricValue.Sum,
})
case *gauge:
samples := timeSeries.GetGaugeSamples()
samples.Values = append(samples.Values, &pbcloud.GaugeValue{
Time: timestamppb.New(time),
Time: timestampAsProto(time),
Last: typedMetricValue.Last,
Min: typedMetricValue.Max,
Max: typedMetricValue.Min,
Expand All @@ -78,7 +77,7 @@ func addBucketToTimeSeriesProto(
case *rate:
samples := timeSeries.GetRateSamples()
samples.Values = append(samples.Values, &pbcloud.RateValue{
Time: timestamppb.New(time),
Time: timestampAsProto(time),
NonzeroCount: typedMetricValue.NonZeroCount,
TotalCount: typedMetricValue.Total,
})
Expand Down Expand Up @@ -111,3 +110,16 @@ func initTimeSeriesSamples(timeSeries *pbcloud.TimeSeries, mt metrics.MetricType
}
}
}

func timestampAsProto(unixnano int64) *timestamppb.Timestamp {
sec := unixnano / 1e9
return &timestamppb.Timestamp{
Seconds: sec,
// sub-second precision for aggregation is not expected
// so we don't waste cpu-time computing nanos.
//
// In the case this assumption is changed then enable them
// by int32(unixnano - (sec * 1e9))
Nanos: 0,
}
}
19 changes: 19 additions & 0 deletions output/cloud/expv2/mapping_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package expv2

import (
"testing"
"time"

"github.com/stretchr/testify/assert"
)

func TestTimestampAsProto(t *testing.T) {
date := time.Unix(10, 0).UTC()
timestamp := timestampAsProto(date.UnixNano())
assert.Equal(t, date, timestamp.AsTime())

// sub-second precision is not supported
date = time.Unix(10, 282).UTC()
timestamp = timestampAsProto(date.UnixNano())
assert.Equal(t, time.Unix(10, 0).UTC(), timestamp.AsTime())
}
2 changes: 1 addition & 1 deletion output/cloud/expv2/output_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func TestOutputCollectSamples(t *testing.T) {
require.True(t, ok)
assert.Equal(t, 5.0, counter.Sum)

expTime := time.Date(2023, time.May, 1, 1, 1, 15, 0, time.UTC)
expTime := time.Date(2023, time.May, 1, 1, 1, 15, 0, time.UTC).UnixNano()
assert.Equal(t, expTime, buckets[0].Time)
}

Expand Down

0 comments on commit 7c2ddb0

Please sign in to comment.