From 3eba11100e0223c58ffd1d25541c0109702360e3 Mon Sep 17 00:00:00 2001 From: "Andrew S. Brown" Date: Tue, 16 Jul 2019 20:18:28 -0700 Subject: [PATCH 1/3] Allow setting initial timestamp --- consumer.go | 5 +++++ options.go | 13 ++++++++++++- 2 files changed, 17 insertions(+), 1 deletion(-) diff --git a/consumer.go b/consumer.go index 1ea5c5cd..50086646 100644 --- a/consumer.go +++ b/consumer.go @@ -6,6 +6,7 @@ import ( "fmt" "io/ioutil" "log" + "time" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/session" @@ -60,6 +61,7 @@ func New(streamName string, opts ...Option) (*Consumer, error) { type Consumer struct { streamName string initialShardIteratorType string + initialTimestamp *time.Time client kinesisiface.KinesisAPI logger Logger group Group @@ -199,6 +201,9 @@ func (c *Consumer) getShardIterator(streamName, shardID, seqNum string) (*string if seqNum != "" { params.ShardIteratorType = aws.String(kinesis.ShardIteratorTypeAfterSequenceNumber) params.StartingSequenceNumber = aws.String(seqNum) + } else if c.initialTimestamp != nil { + params.ShardIteratorType = aws.String(kinesis.ShardIteratorTypeAtTimestamp) + params.Timestamp = c.initialTimestamp } else { params.ShardIteratorType = aws.String(c.initialShardIteratorType) } diff --git a/options.go b/options.go index 08769314..79afc3b9 100644 --- a/options.go +++ b/options.go @@ -1,6 +1,10 @@ package consumer -import "github.com/aws/aws-sdk-go/service/kinesis/kinesisiface" +import ( + "time" + + "github.com/aws/aws-sdk-go/service/kinesis/kinesisiface" +) // Option is used to override defaults when creating a new Consumer type Option func(*Consumer) @@ -39,3 +43,10 @@ func WithShardIteratorType(t string) Option { c.initialShardIteratorType = t } } + +// Timestamp overrides the starting point for the consumer +func WithTimestamp(t time.Time) Option { + return func(c *Consumer) { + c.initialTimestamp = &t + } +} From 4f407a30e82bac9d3d3da51aacce1a42530bbbd8 Mon Sep 17 00:00:00 2001 From: "Andrew S. Brown" Date: Tue, 16 Jul 2019 23:13:01 -0700 Subject: [PATCH 2/3] Fix writing to closed channel --- consumer.go | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/consumer.go b/consumer.go index 50086646..c2340b84 100644 --- a/consumer.go +++ b/consumer.go @@ -6,6 +6,7 @@ import ( "fmt" "io/ioutil" "log" + "sync" "time" "github.com/aws/aws-sdk-go/aws" @@ -99,22 +100,29 @@ func (c *Consumer) Scan(ctx context.Context, fn ScanFunc) error { close(shardc) }() + wg := new(sync.WaitGroup) // process each of the shards for shard := range shardc { + wg.Add(1) go func(shardID string) { + defer wg.Done() if err := c.ScanShard(ctx, shardID, fn); err != nil { select { case errc <- fmt.Errorf("shard %s error: %v", shardID, err): // first error to occur cancel() default: - // error has already occured + // error has already occurred } } }(aws.StringValue(shard.ShardId)) } - close(errc) + go func() { + wg.Wait() + close(errc) + }() + return <-errc } From 1f35253fe5d0b23663388df2ce0da33009f3b70b Mon Sep 17 00:00:00 2001 From: "Andrew S. Brown" Date: Fri, 19 Jul 2019 20:48:15 -0700 Subject: [PATCH 3/3] Allow cancelling of request --- consumer.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/consumer.go b/consumer.go index c2340b84..c646b185 100644 --- a/consumer.go +++ b/consumer.go @@ -136,7 +136,7 @@ func (c *Consumer) ScanShard(ctx context.Context, shardID string, fn ScanFunc) e } // get shard iterator - shardIterator, err := c.getShardIterator(c.streamName, shardID, lastSeqNum) + shardIterator, err := c.getShardIterator(ctx, c.streamName, shardID, lastSeqNum) if err != nil { return fmt.Errorf("get shard iterator error: %v", err) } @@ -157,7 +157,7 @@ func (c *Consumer) ScanShard(ctx context.Context, shardID string, fn ScanFunc) e // attempt to recover from GetRecords error by getting new shard iterator if err != nil { - shardIterator, err = c.getShardIterator(c.streamName, shardID, lastSeqNum) + shardIterator, err = c.getShardIterator(ctx, c.streamName, shardID, lastSeqNum) if err != nil { return fmt.Errorf("get shard iterator error: %v", err) } @@ -200,7 +200,7 @@ func isShardClosed(nextShardIterator, currentShardIterator *string) bool { return nextShardIterator == nil || currentShardIterator == nextShardIterator } -func (c *Consumer) getShardIterator(streamName, shardID, seqNum string) (*string, error) { +func (c *Consumer) getShardIterator(ctx context.Context, streamName, shardID, seqNum string) (*string, error) { params := &kinesis.GetShardIteratorInput{ ShardId: aws.String(shardID), StreamName: aws.String(streamName), @@ -216,6 +216,6 @@ func (c *Consumer) getShardIterator(streamName, shardID, seqNum string) (*string params.ShardIteratorType = aws.String(c.initialShardIteratorType) } - res, err := c.client.GetShardIterator(params) + res, err := c.client.GetShardIteratorWithContext(aws.Context(ctx), params) return res.ShardIterator, err }