Skip to content

Commit

Permalink
Introduced interface on KinesisProducer for testability/mockability (#…
Browse files Browse the repository at this point in the history
…136)

Introduced UserRecord object and KinesisProducer.addUserRecord(UserRecord) method.
  • Loading branch information
JeffAtDeere authored and pfifer committed Nov 13, 2017
1 parent 14f4cce commit a73efe7
Show file tree
Hide file tree
Showing 5 changed files with 201 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public class Attempt {
private String errorCode;
private boolean success;

private Attempt(int delay, int duration, String errorMessage, String errorCode, boolean success) {
public Attempt(int delay, int duration, String errorMessage, String errorCode, boolean success) {
this.delay = delay;
this.duration = duration;
this.errorMessage = errorMessage;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package com.amazonaws.services.kinesis.producer;

import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.ExecutionException;
import com.google.common.util.concurrent.ListenableFuture;


public interface IKinesisProducer {
ListenableFuture<UserRecordResult> addUserRecord(String stream, String partitionKey, ByteBuffer data);

ListenableFuture<UserRecordResult> addUserRecord(UserRecord userRecord);

ListenableFuture<UserRecordResult> addUserRecord(String stream, String partitionKey, String explicitHashKey, ByteBuffer data);

int getOutstandingRecordsCount();

List<Metric> getMetrics(String metricName, int windowSeconds) throws InterruptedException, ExecutionException;

List<Metric> getMetrics(String metricName) throws InterruptedException, ExecutionException;

List<Metric> getMetrics() throws InterruptedException, ExecutionException;

List<Metric> getMetrics(int windowSeconds) throws InterruptedException, ExecutionException;

void destroy();

void flush(String stream);

void flush();

void flushSync();
}
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@
* @author chaodeng
*
*/
public class KinesisProducer {
public class KinesisProducer implements IKinesisProducer {
private static final Logger log = LoggerFactory.getLogger(KinesisProducer.class);

private static final BigInteger UINT_128_MAX = new BigInteger(StringUtils.repeat("FF", 16), 16);
Expand Down Expand Up @@ -344,10 +344,68 @@ protected KinesisProducer(File inPipe, File outPipe) {
* @see KinesisProducerConfiguration#setRecordTtl(long)
* @see UserRecordFailedException
*/
@Override
public ListenableFuture<UserRecordResult> addUserRecord(String stream, String partitionKey, ByteBuffer data) {
return addUserRecord(stream, partitionKey, null, data);
}


/**
* Put a record asynchronously. A {@link ListenableFuture} is returned that
* can be used to retrieve the result, either by polling or by registering a
* callback.
*
* <p>
* The return value can be disregarded if you do not wish to process the
* result. Under the covers, the KPL will automatically re-attempt puts in
* case of transient errors (including throttling). A failed result is
* generally returned only if an irrecoverable error is detected (e.g.
* trying to put to a stream that doesn't exist), or if the record expires.
*
* <p>
* <b>Thread safe.</b>
*
* <p>
* To add a listener to the future:
* <p>
* <code>
* ListenableFuture&lt;PutRecordResult&gt; f = myKinesisProducer.addUserRecord(...);
* com.google.common.util.concurrent.Futures.addCallback(f, callback, executor);
* </code>
* <p>
* where <code>callback</code> is an instance of
* {@link com.google.common.util.concurrent.FutureCallback} and
* <code>executor</code> is an instance of
* {@link java.util.concurrent.Executor}.
* <p>
* <b>Important:</b>
* <p>
* If long-running tasks are performed in the callbacks, it is recommended
* that a custom executor be provided when registering callbacks to ensure
* that there are enough threads to achieve the desired level of
* parallelism. By default, the KPL will use an internal thread pool to
* execute callbacks, but this pool may not have a sufficient number of
* threads if a large number is desired.
* <p>
* Another option would be to hand the result off to a different component
* for processing and keep the callback routine fast.
*
* @param userRecord
* All data necessary to write to the stream.
* @return A future for the result of the put.
* @throws IllegalArgumentException
* if input does not meet stated constraints
* @throws DaemonException
* if the child process is dead
* @see ListenableFuture
* @see UserRecordResult
* @see KinesisProducerConfiguration#setRecordTtl(long)
* @see UserRecordFailedException
*/
@Override
public ListenableFuture<UserRecordResult> addUserRecord(UserRecord userRecord) {
return addUserRecord(userRecord.getStreamName(), userRecord.getPartitionKey(), userRecord.getExplicitHashKey(), userRecord.getData());
}

/**
* Put a record asynchronously. A {@link ListenableFuture} is returned that
* can be used to retrieve the result, either by polling or by registering a
Expand Down Expand Up @@ -410,6 +468,7 @@ public ListenableFuture<UserRecordResult> addUserRecord(String stream, String pa
* @see KinesisProducerConfiguration#setRecordTtl(long)
* @see UserRecordFailedException
*/
@Override
public ListenableFuture<UserRecordResult> addUserRecord(String stream, String partitionKey, String explicitHashKey, ByteBuffer data) {
if (stream == null) {
throw new IllegalArgumentException("Stream name cannot be null");
Expand Down Expand Up @@ -490,6 +549,7 @@ public ListenableFuture<UserRecordResult> addUserRecord(String stream, String pa
*
* @return The number of unfinished records currently being processed.
*/
@Override
public int getOutstandingRecordsCount() {
return futures.size();
}
Expand Down Expand Up @@ -530,6 +590,7 @@ public int getOutstandingRecordsCount() {
* from the child process.
* @see Metric
*/
@Override
public List<Metric> getMetrics(String metricName, int windowSeconds) throws InterruptedException, ExecutionException {
MetricsRequest.Builder mrb = MetricsRequest.newBuilder();
if (metricName != null) {
Expand Down Expand Up @@ -587,6 +648,7 @@ public List<Metric> getMetrics(String metricName, int windowSeconds) throws Inte
* from the child process.
* @see Metric
*/
@Override
public List<Metric> getMetrics(String metricName) throws InterruptedException, ExecutionException {
return getMetrics(metricName, -1);
}
Expand Down Expand Up @@ -624,6 +686,7 @@ public List<Metric> getMetrics(String metricName) throws InterruptedException, E
* from the child process.
* @see Metric
*/
@Override
public List<Metric> getMetrics() throws InterruptedException, ExecutionException {
return getMetrics(null);
}
Expand Down Expand Up @@ -661,6 +724,7 @@ public List<Metric> getMetrics() throws InterruptedException, ExecutionException
* from the child process.
* @see Metric
*/
@Override
public List<Metric> getMetrics(int windowSeconds) throws InterruptedException, ExecutionException {
return getMetrics(null, windowSeconds);
}
Expand All @@ -684,6 +748,7 @@ public List<Metric> getMetrics(int windowSeconds) throws InterruptedException, E
* terminate the child process. If you are terminating the JVM then calling
* destroy is unnecessary since it will be done automatically.
*/
@Override
public void destroy() {
destroyed = true;
child.destroy();
Expand All @@ -706,6 +771,7 @@ public void destroy() {
* @throws DaemonException
* if the child process is dead
*/
@Override
public void flush(String stream) {
Flush.Builder f = Flush.newBuilder();
if (stream != null) {
Expand Down Expand Up @@ -733,6 +799,7 @@ public void flush(String stream) {
* @throws DaemonException
* if the child process is dead
*/
@Override
public void flush() {
flush(null);
}
Expand All @@ -757,6 +824,7 @@ public void flush() {
* @see KinesisProducerConfiguration#setRecordTtl(long)
* @see KinesisProducerConfiguration#setRequestTimeout(long)
*/
@Override
public void flushSync() {
while (getOutstandingRecordsCount() > 0) {
flush();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
package com.amazonaws.services.kinesis.producer;

import java.nio.ByteBuffer;

public class UserRecord {
/**
* Stream to put to.
*/
private String streamName;

/**
* Partition key. Length must be at least one, and at most 256 (inclusive).
*/
private String partitionKey;

/**
* The hash value used to explicitly determine the shard the data
* record is assigned to by overriding the partition key hash.
* Must be a valid string representation of a positive integer
* with value between 0 and <tt>2^128 - 1</tt> (inclusive).
*/
private String explicitHashKey;

/**
* Binary data of the record. Maximum size 1MiB.
*/
private ByteBuffer data;

public UserRecord() {
}

public UserRecord(String streamName, String partitionKey, ByteBuffer data) {
this.streamName = streamName;
this.partitionKey = partitionKey;
this.data = data;
}

public UserRecord(String streamName, String partitionKey, String explicitHashKey, ByteBuffer data) {
this.streamName = streamName;
this.partitionKey = partitionKey;
this.explicitHashKey = explicitHashKey;
this.data = data;
}

public String getStreamName() {
return streamName;
}

public void setStreamName(String streamName) {
this.streamName = streamName;
}

public UserRecord withStreamName(String streamName) {
this.streamName = streamName;
return this;
}

public String getPartitionKey() {
return partitionKey;
}

public void setPartitionKey(String partitionKey) {
this.partitionKey = partitionKey;
}

public UserRecord withPartitionKey(String partitionKey) {
this.partitionKey = partitionKey;
return this;
}

public ByteBuffer getData() {
return data;
}

public void setData(ByteBuffer data) {
this.data = data;
}

public UserRecord withData(ByteBuffer byteBuffer) {
this.data = byteBuffer;
return this;
}

public String getExplicitHashKey() {
return explicitHashKey;
}

public void setExplicitHashKey(String explicitHashKey) {
this.explicitHashKey = explicitHashKey;
}

public UserRecord withExplicitHashKey(String explicitHashKey) {
this.explicitHashKey = explicitHashKey;
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public class UserRecordResult {
private String shardId;
private boolean successful;

private UserRecordResult(List<Attempt> attempts, String sequenceNumber, String shardId, boolean successful) {
public UserRecordResult(List<Attempt> attempts, String sequenceNumber, String shardId, boolean successful) {
this.attempts = attempts;
this.sequenceNumber = sequenceNumber;
this.shardId = shardId;
Expand Down

0 comments on commit a73efe7

Please sign in to comment.