Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add an API to send multiple samples at once #235

Merged
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 46 additions & 0 deletions src/main/java/com/timgroup/statsd/DirectStatsDClient.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package com.timgroup.statsd;

/**
* DirectStatsDClient is an experimental extension of {@link StatsDClient} that allows for direct access to some
* dogstatsd features.
*
* <p>It is not recommended to use this client in production. This client might allow you to take advantage of
* new features in the agent before they are released, but it might also break your application.
*/
public interface DirectStatsDClient extends StatsDClient {

/**
* Records values for the specified named distribution.
*
* <p>The method doesn't take care of breaking down the values array if it is too large. It's up to the caller to
* make sure the size is kept reasonable.</p>
*
* <p>This method is a DataDog extension, and may not work with other servers.</p>
*
* <p>This method is non-blocking and is guaranteed not to throw an exception.</p>
vickenty marked this conversation as resolved.
Show resolved Hide resolved
*
* @param aspect the name of the distribution
* @param values the values to be incorporated in the distribution
* @param sampleRate percentage of time metric to be sent
* @param tags array of tags to be added to the data
*/
void recordDistributionValues(String aspect, double[] values, double sampleRate, String... tags);


/**
* Records values for the specified named distribution.
*
* <p>The method doesn't take care of breaking down the values array if it is too large. It's up to the caller to
* make sure the size is kept reasonable.</p>
*
* <p>This method is a DataDog extension, and may not work with other servers.</p>
*
* <p>This method is non-blocking and is guaranteed not to throw an exception.</p>
*
* @param aspect the name of the distribution
* @param values the values to be incorporated in the distribution
* @param sampleRate percentage of time metric to be sent
* @param tags array of tags to be added to the data
*/
void recordDistributionValues(String aspect, long[] values, double sampleRate, String... tags);
}
9 changes: 6 additions & 3 deletions src/main/java/com/timgroup/statsd/Message.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,13 @@ protected Message(String aspect, Message.Type type, String[] tags) {
* Write this message to the provided {@link StringBuilder}. Will
* be called from the sender threads.
*
* @param builder
* StringBuilder the text representation will be written to.
* @param builder StringBuilder the text representation will be written to.
* @param capacity The capacity of the send buffer.
* @param containerID The container ID to be appended to the message.
* @return boolean indicating whether the message was partially written to the builder.
* If true, the method will be called again with the same arguments to continue writing.
*/
abstract void writeTo(StringBuilder builder, String containerID);
abstract boolean writeTo(StringBuilder builder, int capacity, String containerID);

/**
* Aggregate message.
Expand Down
11 changes: 11 additions & 0 deletions src/main/java/com/timgroup/statsd/NoOpDirectStatsDClient.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package com.timgroup.statsd;

/**
* A No-Op {@link NonBlockingDirectStatsDClient}, which can be substituted in when metrics are not
* required.
*/
public final class NoOpDirectStatsDClient extends NoOpStatsDClient implements DirectStatsDClient {
@Override public void recordDistributionValues(String aspect, double[] values, double sampleRate, String... tags) { }

@Override public void recordDistributionValues(String aspect, long[] values, double sampleRate, String... tags) { }
}
5 changes: 4 additions & 1 deletion src/main/java/com/timgroup/statsd/NoOpStatsDClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@
* @author Tom Denley
*
*/
public final class NoOpStatsDClient implements StatsDClient {
public class NoOpStatsDClient implements StatsDClient {

NoOpStatsDClient() {}

@Override public void stop() { }

@Override public void close() { }
Expand Down
159 changes: 159 additions & 0 deletions src/main/java/com/timgroup/statsd/NonBlockingDirectStatsDClient.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
package com.timgroup.statsd;

final class NonBlockingDirectStatsDClient extends NonBlockingStatsDClient implements DirectStatsDClient {

public NonBlockingDirectStatsDClient(final NonBlockingStatsDClientBuilder builder) throws StatsDClientException {
super(builder);
}

@Override
public void recordDistributionValues(String aspect, double[] values, double sampleRate, String... tags) {
if ((Double.isNaN(sampleRate) || !isInvalidSample(sampleRate)) && values != null && values.length > 0) {
if (values.length == 1) {
recordDistributionValue(aspect, values[0], sampleRate, tags);
} else {
sendMetric(new DoublesStatsDMessage(aspect, Message.Type.DISTRIBUTION, values, sampleRate, 0, tags));
}
}
}

@Override
public void recordDistributionValues(String aspect, long[] values, double sampleRate, String... tags) {
if ((Double.isNaN(sampleRate) || !isInvalidSample(sampleRate)) && values != null && values.length > 0) {
if (values.length == 1) {
recordDistributionValue(aspect, values[0], sampleRate, tags);
} else {
sendMetric(new LongsStatsDMessage(aspect, Message.Type.DISTRIBUTION, values, sampleRate, 0, tags));
}
}
}

abstract class MultiValuedStatsDMessage extends Message {
private final double sampleRate; // NaN for none
private final long timestamp; // zero for none
private int metadataSize = -1; // Cache the size of the metadata, -1 means not calculated yet
private int offset = 0; // The index of the first value that has not been written

MultiValuedStatsDMessage(String aspect, Message.Type type, String[] tags, double sampleRate, long timestamp) {
super(aspect, type, tags);
this.sampleRate = sampleRate;
this.timestamp = timestamp;
}

@Override
public final boolean canAggregate() {
return false;
}

@Override
public final void aggregate(Message message) {
}

@Override
public final boolean writeTo(StringBuilder builder, int capacity, String containerID) {
int metadataSize = metadataSize(builder, containerID);
writeHeadMetadata(builder);
boolean partialWrite = writeValuesTo(builder, capacity - metadataSize);
writeTailMetadata(builder, containerID);
return partialWrite;

}

private int metadataSize(StringBuilder builder, String containerID) {
if (metadataSize == -1) {
int previousLength = builder.length();
writeHeadMetadata(builder);
writeTailMetadata(builder, containerID);
metadataSize = builder.length() - previousLength;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This counts characters, while the capacity passed into writeTo is in bytes, which can cause payload drops if the direct client is used with non-ascii data. Would it be possible to account for encoded length here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch! I have fixed that in 37f858d

Copy link
Contributor Author

@blemale blemale Feb 12, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have again pushed the commit to use the target encoding:

final int previousEncodedLength = builder.toString().getBytes(StandardCharsets.UTF_8).length;

https://github.com/DataDog/java-dogstatsd-client/pull/235/files#diff-363801336ae6e5f0955834149b9e550375970e5c0dcde6b72ce36c69abed7bc6R67

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code is allocating, if it is a blocker, I can intern this code from guava: https://github.com/google/guava/blob/master/guava/src/com/google/common/base/Utf8.java#L49

builder.setLength(previousLength);
}
return metadataSize;
}

private void writeHeadMetadata(StringBuilder builder) {
builder.append(prefix).append(aspect);
}

private void writeTailMetadata(StringBuilder builder, String containerID) {
builder.append('|').append(type);
if (!Double.isNaN(sampleRate)) {
builder.append('|').append('@').append(format(SAMPLE_RATE_FORMATTER, sampleRate));
}
if (timestamp != 0) {
builder.append("|T").append(timestamp);
}
tagString(tags, builder);
if (containerID != null && !containerID.isEmpty()) {
builder.append("|c:").append(containerID);
}

builder.append('\n');
}

private boolean writeValuesTo(StringBuilder builder, int remainingCapacity) {
int maxLength = builder.length() + remainingCapacity;

// Add at least one value
builder.append(':');
writeValueTo(builder, offset);
int previousLength = builder.length();

// Add remaining values up to the max length
for (int i = offset + 1; i < lengthOfValues(); i++) {
builder.append(':');
writeValueTo(builder, i);
if (builder.length() > maxLength) {
builder.setLength(previousLength);
offset += i;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I could be missing something, but this looks like it will skip values for chunks after second one: let's say we start with offset=3 and write 4 values at positions 3, 4, 5 and 6 before hitting overflow at i=7. This sets offset to 12 (3+7), loosing samples 7-11.

Copy link
Contributor Author

@blemale blemale Feb 13, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are right, I have updated the test and the implementation to fix that in d4f3cf8 as i is tracking the absolute index in the array not the relative one compare to the offset.

return true;
}
previousLength = builder.length();
}
offset = lengthOfValues();
return false;
}

protected abstract int lengthOfValues();

protected abstract void writeValueTo(StringBuilder buffer, int index);
}

final class LongsStatsDMessage extends MultiValuedStatsDMessage {
private final long[] values;

LongsStatsDMessage(String aspect, Message.Type type, long[] values, double sampleRate, long timestamp, String[] tags) {
super(aspect, type, tags, sampleRate, timestamp);
this.values = values;
}

@Override
protected int lengthOfValues() {
return values.length;
}

@Override
protected void writeValueTo(StringBuilder buffer, int index) {
buffer.append(values[index]);
}
}

final class DoublesStatsDMessage extends MultiValuedStatsDMessage {
private final double[] values;

DoublesStatsDMessage(String aspect, Message.Type type, double[] values, double sampleRate, long timestamp,
String[] tags) {
super(aspect, type, tags, sampleRate, timestamp);
this.values = values;
}

@Override
protected int lengthOfValues() {
return values.length;
}

@Override
protected void writeValueTo(StringBuilder buffer, int index) {
buffer.append(values[index]);
}
}
}
28 changes: 17 additions & 11 deletions src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ protected static String format(ThreadLocal<NumberFormat> formatter, Number value
return formatter.get().format(value);
}

private final String prefix;
final String prefix;
private final ClientChannel clientChannel;
private final ClientChannel telemetryClientChannel;
private final StatsDClientErrorHandler handler;
Expand Down Expand Up @@ -247,7 +247,7 @@ protected static String format(ThreadLocal<NumberFormat> formatter, Number value
* @throws StatsDClientException
* if the client could not be started
*/
private NonBlockingStatsDClient(final String prefix, final int queueSize, final String[] constantTags,
NonBlockingStatsDClient(final String prefix, final int queueSize, final String[] constantTags,
final StatsDClientErrorHandler errorHandler, final Callable<SocketAddress> addressLookup,
final Callable<SocketAddress> telemetryAddressLookup, final int timeout, final int bufferSize,
final int maxPacketSizeBytes, String entityID, final int poolSize, final int processorWorkers,
Expand Down Expand Up @@ -526,7 +526,7 @@ protected StatsDMessage(String aspect, Message.Type type, T value, double sample
}

@Override
public final void writeTo(StringBuilder builder, String containerID) {
public final boolean writeTo(StringBuilder builder, int capacity, String containerID) {
builder.append(prefix).append(aspect).append(':');
writeValue(builder);
builder.append('|').append(type);
Expand All @@ -542,6 +542,7 @@ public final void writeTo(StringBuilder builder, String containerID) {
}

builder.append('\n');
return false;
}

@Override
Expand All @@ -554,7 +555,7 @@ public boolean canAggregate() {
}


private boolean sendMetric(final Message message) {
boolean sendMetric(final Message message) {
return send(message);
}

Expand Down Expand Up @@ -1145,7 +1146,7 @@ private StringBuilder eventMap(final Event event, StringBuilder res) {
@Override
public void recordEvent(final Event event, final String... eventTags) {
statsDProcessor.send(new AlphaNumericMessage(Message.Type.EVENT, "") {
@Override public void writeTo(StringBuilder builder, String containerID) {
@Override public boolean writeTo(StringBuilder builder, int capacity, String containerID) {
final String title = escapeEventString(prefix + event.getTitle());
final String text = escapeEventString(event.getText());
builder.append(Message.Type.EVENT.toString())
Expand All @@ -1168,6 +1169,7 @@ public void recordEvent(final Event event, final String... eventTags) {
}

builder.append('\n');
return false;
}
});
this.telemetry.incrEventsSent(1);
Expand Down Expand Up @@ -1200,13 +1202,14 @@ private int getUtf8Length(final String text) {
@Override
public void recordServiceCheckRun(final ServiceCheck sc) {
statsDProcessor.send(new AlphaNumericMessage(Message.Type.SERVICE_CHECK, "") {
@Override public void writeTo(StringBuilder sb, String containerID) {
@Override
public boolean writeTo(StringBuilder sb, int capacity, String containerID) {
// see http://docs.datadoghq.com/guides/dogstatsd/#service-checks
sb.append(Message.Type.SERVICE_CHECK.toString())
.append("|")
.append(sc.getName())
.append("|")
.append(sc.getStatus());
.append("|")
.append(sc.getName())
.append("|")
.append(sc.getStatus());
if (sc.getTimestamp() > 0) {
sb.append("|d:").append(sc.getTimestamp());
}
Expand All @@ -1222,6 +1225,7 @@ public void recordServiceCheckRun(final ServiceCheck sc) {
}

sb.append('\n');
return false;
}
});
this.telemetry.incrServiceChecksSent(1);
Expand Down Expand Up @@ -1286,7 +1290,8 @@ protected void writeValue(StringBuilder builder) {
builder.append(getValue());
}

@Override protected final void writeTo(StringBuilder builder, String containerID) {
@Override
protected final boolean writeTo(StringBuilder builder, int capacity, String containerID) {
builder.append(prefix).append(aspect).append(':');
writeValue(builder);
builder.append('|').append(type);
Expand All @@ -1296,6 +1301,7 @@ protected void writeValue(StringBuilder builder) {
}

builder.append('\n');
return false;
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,18 @@ public NonBlockingStatsDClient build() throws StatsDClientException {
return new NonBlockingStatsDClient(resolve());
}

/**
* {@link DirectStatsDClient} factory method.
*
* <p>It is an experimental extension of {@link StatsDClient} that allows for direct access to some dogstatsd features.
* It is not recommended to use this client in production.
* @return the built DirectStatsDClient.
* @see DirectStatsDClient
*/
public DirectStatsDClient buildDirectStatsDClient() throws StatsDClientException {
return new NonBlockingDirectStatsDClient(resolve());
}

/**
* Creates a copy of this builder with any implicit elements resolved.
* @return the resolved copy of the builder.
Expand Down
Loading
Loading