diff --git a/.circleci/config.yml b/.circleci/config.yml index e370cd42..341ad39e 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -1,5 +1,8 @@ version: 2.1 +orbs: + win: circleci/windows@2.4.1 + commands: create_custom_cache_lock: description: "Create custom cache lock for java version." @@ -45,6 +48,14 @@ jobs: docker: - image: circleci/openjdk:14.0.2-buster <<: *default_steps + windows-openjdk12: + executor: win/default + steps: + - checkout + - run: | + choco install maven + - run: | + mvn clean install workflows: version: 2 @@ -55,3 +66,4 @@ workflows: - openjdk9 - openjdk11 - openjdk13 + - windows-openjdk12 diff --git a/pom.xml b/pom.xml index 8eb7930f..b74bdd6e 100644 --- a/pom.xml +++ b/pom.xml @@ -149,6 +149,12 @@ test 1.18.0 + + net.java.dev.jna + jna-platform + test + 5.10.0 + com.github.jnr jnr-unixsocket diff --git a/src/main/java/com/timgroup/statsd/ClientChannel.java b/src/main/java/com/timgroup/statsd/ClientChannel.java new file mode 100644 index 00000000..e0d4ee16 --- /dev/null +++ b/src/main/java/com/timgroup/statsd/ClientChannel.java @@ -0,0 +1,7 @@ +package com.timgroup.statsd; + +import java.nio.channels.WritableByteChannel; + +public interface ClientChannel extends WritableByteChannel { + String getTransportType(); +} diff --git a/src/main/java/com/timgroup/statsd/DatagramClientChannel.java b/src/main/java/com/timgroup/statsd/DatagramClientChannel.java new file mode 100644 index 00000000..13c81826 --- /dev/null +++ b/src/main/java/com/timgroup/statsd/DatagramClientChannel.java @@ -0,0 +1,55 @@ +package com.timgroup.statsd; + +import java.io.IOException; +import java.net.SocketAddress; +import java.nio.ByteBuffer; +import java.nio.channels.DatagramChannel; + +public class DatagramClientChannel implements ClientChannel { + protected final DatagramChannel delegate; + private final SocketAddress address; + + /** + * Creates a new DatagramClientChannel using the default DatagramChannel. + * @param address Address to connect the channel to + * @throws IOException if an I/O error occurs + */ + public DatagramClientChannel(SocketAddress address) throws IOException { + this(DatagramChannel.open(), address); + } + + /** + * Creates a new DatagramClientChannel that wraps the delegate. + * @param delegate Implementation this instance wraps + * @param address Address to connect the channel to + */ + public DatagramClientChannel(DatagramChannel delegate, SocketAddress address) { + this.delegate = delegate; + this.address = address; + } + + @Override + public boolean isOpen() { + return delegate.isOpen(); + } + + @Override + public int write(ByteBuffer src) throws IOException { + return delegate.send(src, address); + } + + @Override + public void close() throws IOException { + delegate.close(); + } + + @Override + public String getTransportType() { + return "udp"; + } + + @Override + public String toString() { + return "[" + getTransportType() + "] " + address; + } +} diff --git a/src/main/java/com/timgroup/statsd/NamedPipeClientChannel.java b/src/main/java/com/timgroup/statsd/NamedPipeClientChannel.java new file mode 100644 index 00000000..b38e343c --- /dev/null +++ b/src/main/java/com/timgroup/statsd/NamedPipeClientChannel.java @@ -0,0 +1,51 @@ +package com.timgroup.statsd; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.RandomAccessFile; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; + +public class NamedPipeClientChannel implements ClientChannel { + private final RandomAccessFile randomAccessFile; + private final FileChannel fileChannel; + private final String pipe; + + /** + * Creates a new NamedPipeClientChannel with the given address. + * + * @param address Location of named pipe + * @throws FileNotFoundException if pipe does not exist + */ + public NamedPipeClientChannel(NamedPipeSocketAddress address) throws FileNotFoundException { + pipe = address.getPipe(); + randomAccessFile = new RandomAccessFile(pipe, "rw"); + fileChannel = randomAccessFile.getChannel(); + } + + @Override + public boolean isOpen() { + return fileChannel.isOpen(); + } + + @Override + public int write(ByteBuffer src) throws IOException { + return fileChannel.write(src); + } + + @Override + public void close() throws IOException { + // closing the file also closes the channel + randomAccessFile.close(); + } + + @Override + public String getTransportType() { + return "namedpipe"; + } + + @Override + public String toString() { + return pipe; + } +} diff --git a/src/main/java/com/timgroup/statsd/NamedPipeSocketAddress.java b/src/main/java/com/timgroup/statsd/NamedPipeSocketAddress.java new file mode 100644 index 00000000..d4b4c4fa --- /dev/null +++ b/src/main/java/com/timgroup/statsd/NamedPipeSocketAddress.java @@ -0,0 +1,27 @@ +package com.timgroup.statsd; + +import java.net.SocketAddress; + +public class NamedPipeSocketAddress extends SocketAddress { + private static final String NAMED_PIPE_PREFIX = "\\\\.\\pipe\\"; + private final String pipe; + + public NamedPipeSocketAddress(String pipeName) { + this.pipe = normalizePipeName(pipeName); + } + + public String getPipe() { + return pipe; + } + + /** + * A normalized version of the pipe name that includes the `\\.\pipe\` prefix + */ + static String normalizePipeName(String pipeName) { + if (pipeName.startsWith(NAMED_PIPE_PREFIX)) { + return pipeName; + } else { + return NAMED_PIPE_PREFIX + pipeName; + } + } +} diff --git a/src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java b/src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java index 972a85f6..b88ee0d3 100644 --- a/src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java +++ b/src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java @@ -1,15 +1,13 @@ package com.timgroup.statsd; -import jnr.unixsocket.UnixDatagramChannel; import jnr.unixsocket.UnixSocketAddress; -import jnr.unixsocket.UnixSocketOptions; import java.io.IOException; -import java.lang.Double; import java.net.SocketAddress; import java.nio.ByteBuffer; -import java.nio.channels.DatagramChannel; +import java.nio.channels.WritableByteChannel; import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; import java.text.DecimalFormat; import java.text.DecimalFormatSymbols; import java.text.NumberFormat; @@ -23,7 +21,6 @@ import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; - /** * A simple StatsD client implementation facilitating metrics recording. * @@ -56,6 +53,7 @@ public class NonBlockingStatsDClient implements StatsDClient { public static final String DD_DOGSTATSD_PORT_ENV_VAR = "DD_DOGSTATSD_PORT"; public static final String DD_AGENT_HOST_ENV_VAR = "DD_AGENT_HOST"; + public static final String DD_NAMED_PIPE_ENV_VAR = "DD_DOGSTATSD_PIPE_NAME"; public static final String DD_ENTITY_ID_ENV_VAR = "DD_ENTITY_ID"; private static final String ENTITY_ID_TAG_NAME = "dd.internal.entity_id" ; @@ -100,7 +98,7 @@ String tag() { /** * UTF-8 is the expected encoding for data sent to the agent. */ - public static final Charset UTF_8 = Charset.forName("UTF-8"); + public static final Charset UTF_8 = StandardCharsets.UTF_8; private static final StatsDClientErrorHandler NO_OP_HANDLER = new StatsDClientErrorHandler() { @Override public void handle(final Exception ex) { /* No-op */ } @@ -154,7 +152,8 @@ protected static String format(ThreadLocal formatter, Number value } private final String prefix; - private final DatagramChannel clientChannel; + private final ClientChannel clientChannel; + private final ClientChannel telemetryClientChannel; private final StatsDClientErrorHandler handler; private final String constantTagsRendered; @@ -261,58 +260,27 @@ private NonBlockingStatsDClient(final String prefix, final int queueSize, final costantPreTags = null; } - String transportType = ""; try { - final SocketAddress address = addressLookup.call(); - if (address instanceof UnixSocketAddress) { - clientChannel = UnixDatagramChannel.open(); - // Set send timeout, to handle the case where the transmission buffer is full - // If no timeout is set, the send becomes blocking - if (timeout > 0) { - clientChannel.setOption(UnixSocketOptions.SO_SNDTIMEO, timeout); - } - if (bufferSize > 0) { - clientChannel.setOption(UnixSocketOptions.SO_SNDBUF, bufferSize); - } - transportType = "uds"; - } else { - clientChannel = DatagramChannel.open(); - transportType = "udp"; - } + clientChannel = createByteChannel(addressLookup, timeout, bufferSize); ThreadFactory threadFactory = customThreadFactory != null ? customThreadFactory : new StatsDThreadFactory(); statsDProcessor = createProcessor(queueSize, handler, maxPacketSizeBytes, poolSize, processorWorkers, blocking, aggregationFlushInterval, aggregationShards, threadFactory); - telemetryStatsDProcessor = statsDProcessor; Properties properties = new Properties(); properties.load(getClass().getClassLoader().getResourceAsStream( "dogstatsd/version.properties")); - String telemetryTags = tagString(new String[]{CLIENT_TRANSPORT_TAG + transportType, + String telemetryTags = tagString(new String[]{CLIENT_TRANSPORT_TAG + clientChannel.getTransportType(), CLIENT_VERSION_TAG + properties.getProperty("dogstatsd_client_version"), CLIENT_TAG}, new StringBuilder()).toString(); - DatagramChannel telemetryClientChannel = clientChannel; - if (addressLookup != telemetryAddressLookup) { - - final SocketAddress telemetryAddress = telemetryAddressLookup.call(); - if (telemetryAddress instanceof UnixSocketAddress) { - telemetryClientChannel = UnixDatagramChannel.open(); - // Set send timeout, to handle the case where the transmission buffer is full - // If no timeout is set, the send becomes blocking - if (timeout > 0) { - telemetryClientChannel.setOption(UnixSocketOptions.SO_SNDTIMEO, timeout); - } - if (bufferSize > 0) { - telemetryClientChannel.setOption(UnixSocketOptions.SO_SNDBUF, bufferSize); - } - } else if (transportType == "uds") { - // UDP clientChannel can submit to multiple addresses, we only need - // a new channel if transport type is UDS for main traffic. - telemetryClientChannel = DatagramChannel.open(); - } + if (addressLookup == telemetryAddressLookup) { + telemetryClientChannel = clientChannel; + telemetryStatsDProcessor = statsDProcessor; + } else { + telemetryClientChannel = createByteChannel(telemetryAddressLookup, timeout, bufferSize); // similar settings, but a single worker and non-blocking. telemetryStatsDProcessor = createProcessor(queueSize, handler, maxPacketSizeBytes, @@ -324,16 +292,15 @@ private NonBlockingStatsDClient(final String prefix, final int queueSize, final .processor(telemetryStatsDProcessor) .build(); - statsDSender = createSender(addressLookup, handler, clientChannel, statsDProcessor.getBufferPool(), + statsDSender = createSender(handler, clientChannel, statsDProcessor.getBufferPool(), statsDProcessor.getOutboundQueue(), senderWorkers, threadFactory); telemetryStatsDSender = statsDSender; if (telemetryStatsDProcessor != statsDProcessor) { // TODO: figure out why the hell telemetryClientChannel does not work here! - telemetryStatsDSender = createSender(telemetryAddressLookup, handler, telemetryClientChannel, + telemetryStatsDSender = createSender(handler, telemetryClientChannel, telemetryStatsDProcessor.getBufferPool(), telemetryStatsDProcessor.getOutboundQueue(), 1, threadFactory); - } // set telemetry @@ -389,10 +356,10 @@ protected StatsDProcessor createProcessor(final int queueSize, final StatsDClien } } - protected StatsDSender createSender(final Callable addressLookup, final StatsDClientErrorHandler handler, - final DatagramChannel clientChannel, BufferPool pool, BlockingQueue buffers, final int senderWorkers, + protected StatsDSender createSender(final StatsDClientErrorHandler handler, + final WritableByteChannel clientChannel, BufferPool pool, BlockingQueue buffers, final int senderWorkers, final ThreadFactory threadFactory) throws Exception { - return new StatsDSender(addressLookup, clientChannel, handler, pool, buffers, senderWorkers, threadFactory); + return new StatsDSender(clientChannel, handler, pool, buffers, senderWorkers, threadFactory); } /** @@ -427,6 +394,14 @@ public void stop() { handler.handle(e); } } + + if (telemetryClientChannel != null && telemetryClientChannel != clientChannel) { + try { + telemetryClientChannel.close(); + } catch (final IOException e) { + handler.handle(e); + } + } } } @@ -469,6 +444,17 @@ StringBuilder tagString(final String[] tags, StringBuilder builder) { return tagString(tags, constantTagsRendered, builder); } + ClientChannel createByteChannel(Callable addressLookup, int timeout, int bufferSize) throws Exception { + final SocketAddress address = addressLookup.call(); + if (address instanceof NamedPipeSocketAddress) { + return new NamedPipeClientChannel((NamedPipeSocketAddress) address); + } else if (address instanceof UnixSocketAddress) { + return new UnixDatagramClientChannel(address, timeout, bufferSize); + } else { + return new DatagramClientChannel(address); + } + } + abstract class StatsDMessage extends NumericMessage { final double sampleRate; // NaN for none diff --git a/src/main/java/com/timgroup/statsd/NonBlockingStatsDClientBuilder.java b/src/main/java/com/timgroup/statsd/NonBlockingStatsDClientBuilder.java index f0d75e86..4597d0e8 100644 --- a/src/main/java/com/timgroup/statsd/NonBlockingStatsDClientBuilder.java +++ b/src/main/java/com/timgroup/statsd/NonBlockingStatsDClientBuilder.java @@ -38,6 +38,7 @@ public class NonBlockingStatsDClientBuilder implements Cloneable { public String hostname; public String telemetryHostname; + public String namedPipe; public String prefix; public String entityID; public String[] constantTags; @@ -117,6 +118,11 @@ public NonBlockingStatsDClientBuilder telemetryHostname(String val) { return this; } + public NonBlockingStatsDClientBuilder namedPipe(String val) { + namedPipe = val; + return this; + } + public NonBlockingStatsDClientBuilder prefix(String val) { prefix = val; return this; @@ -190,10 +196,16 @@ NonBlockingStatsDClientBuilder resolve() { int packetSize = maxPacketSizeBytes; Callable lookup = addressLookup; - Callable telemetryLookup = telemetryAddressLookup; if (lookup == null) { - lookup = staticStatsDAddressResolution(hostname, port); + String namedPipeFromEnv = System.getenv(NonBlockingStatsDClient.DD_NAMED_PIPE_ENV_VAR); + String resolvedNamedPipe = namedPipe == null ? namedPipeFromEnv : namedPipe; + + if (resolvedNamedPipe == null) { + lookup = staticStatsDAddressResolution(hostname, port); + } else { + lookup = staticNamedPipeResolution(resolvedNamedPipe); + } } if (packetSize == 0) { @@ -201,7 +213,7 @@ NonBlockingStatsDClientBuilder resolve() { NonBlockingStatsDClient.DEFAULT_UDP_MAX_PACKET_SIZE_BYTES; } - + Callable telemetryLookup = telemetryAddressLookup; if (telemetryLookup == null) { if (telemetryHostname == null) { telemetryLookup = lookup; @@ -274,6 +286,15 @@ protected static Callable staticStatsDAddressResolution(String ho } } + protected static Callable staticNamedPipeResolution(String namedPipe) { + final NamedPipeSocketAddress socketAddress = new NamedPipeSocketAddress(namedPipe); + return new Callable() { + @Override public SocketAddress call() { + return socketAddress; + } + }; + } + /** * Retrieves host name from the environment variable "DD_AGENT_HOST". * diff --git a/src/main/java/com/timgroup/statsd/StatsDSender.java b/src/main/java/com/timgroup/statsd/StatsDSender.java index 4deadf55..55ddbbc0 100644 --- a/src/main/java/com/timgroup/statsd/StatsDSender.java +++ b/src/main/java/com/timgroup/statsd/StatsDSender.java @@ -4,6 +4,7 @@ import java.net.SocketAddress; import java.nio.ByteBuffer; import java.nio.channels.DatagramChannel; +import java.nio.channels.WritableByteChannel; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; @@ -12,9 +13,7 @@ import java.util.concurrent.atomic.AtomicInteger; public class StatsDSender { - private final Callable addressLookup; - private final SocketAddress address; - private final DatagramChannel clientChannel; + private final WritableByteChannel clientChannel; private final StatsDClientErrorHandler handler; private final BufferPool pool; @@ -30,9 +29,9 @@ public class StatsDSender { private volatile Telemetry telemetry; - StatsDSender(final Callable addressLookup, final DatagramChannel clientChannel, + StatsDSender(final WritableByteChannel clientChannel, final StatsDClientErrorHandler handler, BufferPool pool, BlockingQueue buffers, - final int workers, final ThreadFactory threadFactory) throws Exception { + final int workers, final ThreadFactory threadFactory) { this.pool = pool; this.buffers = buffers; @@ -40,8 +39,6 @@ public class StatsDSender { this.threadFactory = threadFactory; this.workers = new Thread[workers]; - this.addressLookup = addressLookup; - this.address = addressLookup.call(); this.clientChannel = clientChannel; this.endSignal = new CountDownLatch(workers); @@ -92,14 +89,14 @@ void sendLoop() { sizeOfBuffer = buffer.position(); buffer.flip(); - final int sentBytes = clientChannel.send(buffer, address); + final int sentBytes = clientChannel.write(buffer); buffer.clear(); if (sizeOfBuffer != sentBytes) { throw new IOException( String.format("Could not send stat %s entirely to %s. Only sent %d out of %d bytes", - buffer.toString(), - address.toString(), + buffer, + clientChannel, sentBytes, sizeOfBuffer)); } diff --git a/src/main/java/com/timgroup/statsd/UnixDatagramClientChannel.java b/src/main/java/com/timgroup/statsd/UnixDatagramClientChannel.java new file mode 100644 index 00000000..95877f99 --- /dev/null +++ b/src/main/java/com/timgroup/statsd/UnixDatagramClientChannel.java @@ -0,0 +1,34 @@ +package com.timgroup.statsd; + +import jnr.unixsocket.UnixDatagramChannel; +import jnr.unixsocket.UnixSocketOptions; + +import java.io.IOException; +import java.net.SocketAddress; + +public class UnixDatagramClientChannel extends DatagramClientChannel { + /** + * Creates a new UnixDatagramClientChannel. + * + * @param address Address to connect the channel to + * @param timeout Send timeout + * @param bufferSize Buffer size + * @throws IOException if socket options cannot be set + */ + public UnixDatagramClientChannel(SocketAddress address, int timeout, int bufferSize) throws IOException { + super(UnixDatagramChannel.open(), address); + // Set send timeout, to handle the case where the transmission buffer is full + // If no timeout is set, the send becomes blocking + if (timeout > 0) { + delegate.setOption(UnixSocketOptions.SO_SNDTIMEO, timeout); + } + if (bufferSize > 0) { + delegate.setOption(UnixSocketOptions.SO_SNDBUF, bufferSize); + } + } + + @Override + public String getTransportType() { + return "uds"; + } +} diff --git a/src/test/java/com/timgroup/statsd/DummyLowMemStatsDServer.java b/src/test/java/com/timgroup/statsd/DummyLowMemStatsDServer.java index fc749324..e5b77597 100644 --- a/src/test/java/com/timgroup/statsd/DummyLowMemStatsDServer.java +++ b/src/test/java/com/timgroup/statsd/DummyLowMemStatsDServer.java @@ -2,77 +2,28 @@ package com.timgroup.statsd; import java.io.IOException; -import java.net.InetSocketAddress; -import java.nio.Buffer; -import java.nio.ByteBuffer; -import java.nio.channels.DatagramChannel; import java.util.concurrent.atomic.AtomicInteger; -import java.util.ArrayList; -import java.util.List; -import jnr.unixsocket.UnixDatagramChannel; -import jnr.unixsocket.UnixSocketAddress; -import java.nio.charset.StandardCharsets; -class DummyLowMemStatsDServer extends DummyStatsDServer { - private final AtomicInteger packetCount = new AtomicInteger(0); +class DummyLowMemStatsDServer extends UDPDummyStatsDServer { private final AtomicInteger messageCount = new AtomicInteger(0); public DummyLowMemStatsDServer(int port) throws IOException { super(port); } - public DummyLowMemStatsDServer(String socketPath) throws IOException { - super(socketPath); - } - - @Override - protected void listen() { - Thread thread = new Thread(new Runnable() { - @Override - public void run() { - final ByteBuffer packet = ByteBuffer.allocateDirect(1500); - - while(server.isOpen()) { - if (freeze) { - try { - Thread.sleep(10); - } catch (InterruptedException e) { - } - } else { - try { - ((Buffer)packet).clear(); // Cast necessary to handle Java9 covariant return types - // see: https://jira.mongodb.org/browse/JAVA-2559 for ref. - server.receive(packet); - packet.flip(); - packetCount.incrementAndGet(); - - for (String msg : StandardCharsets.UTF_8.decode(packet).toString().split("\n")) { - messageCount.incrementAndGet(); - } - } catch (IOException e) { - } - } - } - } - }); - thread.setDaemon(true); - thread.start(); - } - @Override public void clear() { - packetCount.set(0); messageCount.set(0); super.clear(); } - public int getPacketCount() { - return packetCount.get(); - } - public int getMessageCount() { return messageCount.get(); } + @Override + protected void addMessage(String msg) { + messageCount.incrementAndGet(); + } } diff --git a/src/test/java/com/timgroup/statsd/DummyStatsDServer.java b/src/test/java/com/timgroup/statsd/DummyStatsDServer.java index 53ee8000..0609a7b9 100644 --- a/src/test/java/com/timgroup/statsd/DummyStatsDServer.java +++ b/src/test/java/com/timgroup/statsd/DummyStatsDServer.java @@ -1,48 +1,28 @@ package com.timgroup.statsd; +import java.io.Closeable; import java.io.IOException; -import java.net.InetSocketAddress; import java.nio.Buffer; import java.nio.ByteBuffer; -import java.nio.channels.DatagramChannel; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.List; -import java.util.Queue; import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.LinkedBlockingQueue; - -import jnr.unixsocket.UnixDatagramChannel; -import jnr.unixsocket.UnixSocketAddress; -import java.nio.charset.StandardCharsets; - -class DummyStatsDServer { +abstract class DummyStatsDServer implements Closeable { private final List messagesReceived = new ArrayList(); private AtomicInteger packetsReceived = new AtomicInteger(0); - protected final DatagramChannel server; protected volatile Boolean freeze = false; - public DummyStatsDServer(int port) throws IOException { - server = DatagramChannel.open(); - server.bind(new InetSocketAddress(port)); - this.listen(); - } - - public DummyStatsDServer(String socketPath) throws IOException { - server = UnixDatagramChannel.open(); - server.bind(new UnixSocketAddress(socketPath)); - this.listen(); - } - protected void listen() { Thread thread = new Thread(new Runnable() { @Override public void run() { final ByteBuffer packet = ByteBuffer.allocate(1500); - while(server.isOpen()) { + while(isOpen()) { if (freeze) { try { Thread.sleep(10); @@ -52,14 +32,12 @@ public void run() { try { ((Buffer)packet).clear(); // Cast necessary to handle Java9 covariant return types // see: https://jira.mongodb.org/browse/JAVA-2559 for ref. - server.receive(packet); + receive(packet); packetsReceived.addAndGet(1); packet.flip(); for (String msg : StandardCharsets.UTF_8.decode(packet).toString().split("\n")) { - synchronized(messagesReceived) { - messagesReceived.add(msg.trim()); - } + addMessage(msg); } } catch (IOException e) { } @@ -84,7 +62,7 @@ public void waitForMessage(String prefix) { done = !messagesReceived.isEmpty(); } - if (done && prefix != null && prefix != "") { + if (done && prefix != null && !prefix.isEmpty()) { done = false; List messages = this.messagesReceived(); for (String message : messages) { @@ -105,7 +83,7 @@ public List messagesReceived() { } } - public int packetsReceived() { + public int getPacketsReceived() { return packetsReceived.get(); } @@ -117,17 +95,22 @@ public void unfreeze() { freeze = false; } - public void close() throws IOException { - try { - server.close(); - } catch (Exception e) { - //ignore - } - } - public void clear() { packetsReceived.set(0); messagesReceived.clear(); } + protected abstract boolean isOpen(); + + protected abstract void receive(ByteBuffer packet) throws IOException; + + protected void addMessage(String msg) { + synchronized(messagesReceived) { + String trimmed = msg.trim(); + if (!trimmed.isEmpty()) { + messagesReceived.add(msg.trim()); + } + } + } + } diff --git a/src/test/java/com/timgroup/statsd/NamedPipeDummyStatsDServer.java b/src/test/java/com/timgroup/statsd/NamedPipeDummyStatsDServer.java new file mode 100644 index 00000000..bc59f8cb --- /dev/null +++ b/src/test/java/com/timgroup/statsd/NamedPipeDummyStatsDServer.java @@ -0,0 +1,80 @@ +package com.timgroup.statsd; + +import com.sun.jna.platform.win32.Kernel32; +import com.sun.jna.platform.win32.WinBase; +import com.sun.jna.platform.win32.WinError; +import com.sun.jna.platform.win32.WinNT.HANDLE; +import com.sun.jna.ptr.IntByReference; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.logging.Logger; + +// Template from https://github.com/java-native-access/jna/blob/master/contrib/platform/test/com/sun/jna/platform/win32/Kernel32NamedPipeTest.java +// And https://docs.microsoft.com/en-us/windows/win32/ipc/multithreaded-pipe-server +public class NamedPipeDummyStatsDServer extends DummyStatsDServer { + private static final Logger log = Logger.getLogger("NamedPipeDummyStatsDServer"); + private final HANDLE hNamedPipe; + private volatile boolean clientConnected = false; + private volatile boolean isOpen = true; + + public NamedPipeDummyStatsDServer(String pipeName) { + String normalizedPipeName = NamedPipeSocketAddress.normalizePipeName(pipeName); + + hNamedPipe= Kernel32.INSTANCE.CreateNamedPipe(normalizedPipeName, + WinBase.PIPE_ACCESS_DUPLEX, // dwOpenMode + WinBase.PIPE_TYPE_BYTE | WinBase.PIPE_READMODE_BYTE | WinBase.PIPE_WAIT, // dwPipeMode + 1, // nMaxInstances, + Byte.MAX_VALUE, // nOutBufferSize, + Byte.MAX_VALUE, // nInBufferSize, + 1000, // nDefaultTimeOut, + null); // lpSecurityAttributes + + if (WinBase.INVALID_HANDLE_VALUE.equals(hNamedPipe)) { + throw new RuntimeException("Unable to create named pipe"); + } + + listen(); + } + @Override + protected boolean isOpen() { + return isOpen; + } + + @Override + protected void receive(ByteBuffer packet) throws IOException { + if (!isOpen) { + throw new IOException("Server closed"); + } + if (!clientConnected) { + boolean connected = Kernel32.INSTANCE.ConnectNamedPipe(hNamedPipe, null); + // ERROR_PIPE_CONNECTED means the client connected before the server + // The connection is established + int lastError = Kernel32.INSTANCE.GetLastError(); + connected = connected || lastError == WinError.ERROR_PIPE_CONNECTED; + if (connected) { + clientConnected = true; + } else { + log.info("Failed to connect. Last error: " + lastError); + close(); + return; + } + } + + IntByReference bytesRead = new IntByReference(); + boolean success = Kernel32.INSTANCE.ReadFile( + hNamedPipe, // handle to pipe + packet.array(), // buffer to receive data + packet.remaining(), // size of buffer + bytesRead, // number of bytes read + null); // not overlapped I/O + + log.info("Read bytes. Result: " + success + ". Bytes read: " + bytesRead.getValue()); + packet.position(bytesRead.getValue()); + } + + @Override + public void close() throws IOException { + isOpen = false; + Kernel32.INSTANCE.CloseHandle(hNamedPipe); + } +} diff --git a/src/test/java/com/timgroup/statsd/NamedPipeTest.java b/src/test/java/com/timgroup/statsd/NamedPipeTest.java new file mode 100644 index 00000000..97b723e1 --- /dev/null +++ b/src/test/java/com/timgroup/statsd/NamedPipeTest.java @@ -0,0 +1,68 @@ +package com.timgroup.statsd; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.nullValue; + +import java.io.IOException; +import java.util.Random; +import java.util.logging.Logger; +import org.junit.After; +import org.junit.Assume; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +public class NamedPipeTest implements StatsDClientErrorHandler { + private static final Logger log = Logger.getLogger("NamedPipeTest"); + + private static final Random random = new Random(); + private NonBlockingStatsDClient client; + private DummyStatsDServer server; + private volatile Exception lastException = new Exception(); + + public synchronized void handle(Exception exception) { + log.info("Got exception: " + exception.getMessage()); + lastException = exception; + } + + @BeforeClass + public static void supportedOnly() { + Assume.assumeTrue(System.getProperty("os.name").toLowerCase().contains("windows")); + } + + @Before + public void start() { + String pipeName = "testPipe-" + random.nextInt(10000); + + server = new NamedPipeDummyStatsDServer(pipeName); + client = new NonBlockingStatsDClientBuilder().prefix("my.prefix") + .namedPipe(pipeName) + .queueSize(1) + .enableAggregation(false) + .errorHandler(this) + .build(); + } + + @After + public void stop() throws IOException { + if (client != null) { + client.stop(); + } + if (server != null) { + server.close(); + } + } + + @Test(timeout = 5000L) + public void sends_to_statsd() { + for(long i = 0; i < 5 ; i++) { + client.gauge("mycount", i); + server.waitForMessage(); + String expected = String.format("my.prefix.mycount:%d|g", i); + assertThat(server.messagesReceived(), contains(expected)); + server.clear(); + } + assertThat(lastException.getMessage(), nullValue()); + } +} diff --git a/src/test/java/com/timgroup/statsd/NonBlockingStatsDClientMaxPerfTest.java b/src/test/java/com/timgroup/statsd/NonBlockingStatsDClientMaxPerfTest.java index cb6d6144..38c4cc4c 100644 --- a/src/test/java/com/timgroup/statsd/NonBlockingStatsDClientMaxPerfTest.java +++ b/src/test/java/com/timgroup/statsd/NonBlockingStatsDClientMaxPerfTest.java @@ -156,6 +156,6 @@ public void run() { executor.awaitTermination(1, TimeUnit.SECONDS); assertNotEquals(0, server.getMessageCount()); - log.info("Messages at server: " + server.getMessageCount() + " packets: " + server.getPacketCount()); + log.info("Messages at server: " + server.getMessageCount() + " packets: " + server.getPacketsReceived()); } } diff --git a/src/test/java/com/timgroup/statsd/NonBlockingStatsDClientPerfTest.java b/src/test/java/com/timgroup/statsd/NonBlockingStatsDClientPerfTest.java index 02e33c52..6a01d4e5 100644 --- a/src/test/java/com/timgroup/statsd/NonBlockingStatsDClientPerfTest.java +++ b/src/test/java/com/timgroup/statsd/NonBlockingStatsDClientPerfTest.java @@ -43,7 +43,7 @@ public final class NonBlockingStatsDClientPerfTest { @BeforeClass public static void start() throws IOException { - server = new DummyStatsDServer(STATSD_SERVER_PORT); + server = new UDPDummyStatsDServer(STATSD_SERVER_PORT); } @AfterClass @@ -92,25 +92,16 @@ public void run() { public void perfAggregatedTest() throws Exception { int expectedSize = 1; - long elapsed = 0, start = System.currentTimeMillis(); - boolean done = false; + long start = System.currentTimeMillis(); - while(!done) { + while(System.currentTimeMillis() - start < clientAggr.statsDProcessor.getAggregator().getFlushInterval() - 1) { clientAggr.count("myaggrcount", 1); - - elapsed = System.currentTimeMillis() - start; - if (elapsed > clientAggr.statsDProcessor.getAggregator().getFlushInterval() - 1) { - done = true; - } Thread.sleep(50); } - - int messages; - while((messages = server.messagesReceived().size()) < expectedSize) { - + while(server.messagesReceived().size() < expectedSize) { try { - Thread.sleep(500); + Thread.sleep(50); } catch (InterruptedException ex) {} } diff --git a/src/test/java/com/timgroup/statsd/NonBlockingStatsDClientTest.java b/src/test/java/com/timgroup/statsd/NonBlockingStatsDClientTest.java index 09993333..3bcba22d 100644 --- a/src/test/java/com/timgroup/statsd/NonBlockingStatsDClientTest.java +++ b/src/test/java/com/timgroup/statsd/NonBlockingStatsDClientTest.java @@ -41,7 +41,7 @@ public class NonBlockingStatsDClientTest { @BeforeClass public static void start() throws IOException { - server = new DummyStatsDServer(STATSD_SERVER_PORT); + server = new UDPDummyStatsDServer(STATSD_SERVER_PORT); client = new NonBlockingStatsDClientBuilder() .prefix("my.prefix") .hostname("localhost") @@ -1108,7 +1108,7 @@ public void sends_too_large_message() throws Exception { @Test(timeout=5000L) public void sends_telemetry_elsewhere() throws Exception { final RecordingErrorHandler errorHandler = new RecordingErrorHandler(); - final DummyStatsDServer telemetryServer = new DummyStatsDServer(STATSD_SERVER_PORT+10); + final DummyStatsDServer telemetryServer = new UDPDummyStatsDServer(STATSD_SERVER_PORT+10); final NonBlockingStatsDClient testClient = new NonBlockingStatsDClientBuilder() .prefix("my.prefix") .hostname("localhost") @@ -1379,7 +1379,7 @@ public void testMessageHashcode() throws Exception { public void shutdown_test() throws Exception { final int port = 17256; final int qSize = 256; - final DummyStatsDServer server = new DummyStatsDServer(port); + final DummyStatsDServer server = new UDPDummyStatsDServer(port); final NonBlockingStatsDClientBuilder builder = new SlowStatsDNonBlockingStatsDClientBuilder().prefix("") .hostname("localhost") @@ -1480,7 +1480,7 @@ public NonsamplingClient build() throws StatsDClientException { @Test(timeout = 5000L) public void nonsampling_client_test() throws Exception { final int port = 17256; - final DummyStatsDServer server = new DummyStatsDServer(port); + final DummyStatsDServer server = new UDPDummyStatsDServer(port); final NonBlockingStatsDClientBuilder builder = new NonsamplingClientBuilder() .prefix("") diff --git a/src/test/java/com/timgroup/statsd/TelemetryTest.java b/src/test/java/com/timgroup/statsd/TelemetryTest.java index 687f2f89..e82a0315 100644 --- a/src/test/java/com/timgroup/statsd/TelemetryTest.java +++ b/src/test/java/com/timgroup/statsd/TelemetryTest.java @@ -2,12 +2,11 @@ import org.junit.After; import org.junit.AfterClass; +import org.junit.Assume; import org.junit.BeforeClass; import org.junit.Test; import java.io.IOException; -import java.net.SocketAddress; -import java.util.concurrent.Callable; import java.util.List; import java.util.ArrayList; import java.util.Properties; @@ -15,8 +14,6 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.hasItem; import static org.hamcrest.Matchers.equalTo; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; public class TelemetryTest { @@ -73,88 +70,24 @@ public void clear() { } } - // StatsDNonBlockingTelemetry exposes the telemetry tor the outside - private static class StatsDNonBlockingTelemetry extends NonBlockingStatsDClient { - public StatsDNonBlockingTelemetry(final String prefix, final int queueSize, String[] constantTags, - final StatsDClientErrorHandler errorHandler, Callable addressLookup, - final int timeout, final int bufferSize, final int maxPacketSizeBytes, - String entityID, final int poolSize, final int processorWorkers, - final int senderWorkers, boolean blocking, final boolean enableTelemetry, - final int telemetryFlushInterval) - throws StatsDClientException { - - super(new NonBlockingStatsDClientBuilder() - .prefix(prefix) - .queueSize(queueSize) - .constantTags(constantTags) - .errorHandler(errorHandler) - .addressLookup(addressLookup) - .timeout(timeout) - .socketBufferSize(bufferSize) - .maxPacketSizeBytes(maxPacketSizeBytes) - .entityID(entityID) - .bufferPoolSize(poolSize) - .processorWorkers(processorWorkers) - .senderWorkers(senderWorkers) - .blocking(blocking) - .enableAggregation(false) - .enableTelemetry(enableTelemetry) - .telemetryFlushInterval(telemetryFlushInterval) - .resolve()); - } - }; - - private static class StatsDNonBlockingTelemetryBuilder extends NonBlockingStatsDClientBuilder { - - @Override - public StatsDNonBlockingTelemetry build() throws StatsDClientException { - - int packetSize = maxPacketSizeBytes; - if (packetSize == 0) { - packetSize = (port == 0) ? NonBlockingStatsDClient.DEFAULT_UDS_MAX_PACKET_SIZE_BYTES : - NonBlockingStatsDClient.DEFAULT_UDP_MAX_PACKET_SIZE_BYTES; - } - - if (addressLookup != null) { - return new StatsDNonBlockingTelemetry(prefix, queueSize, constantTags, errorHandler, - addressLookup, timeout, socketBufferSize, packetSize, entityID, - bufferPoolSize, processorWorkers, senderWorkers, blocking, enableTelemetry, - telemetryFlushInterval); - } else { - return new StatsDNonBlockingTelemetry(prefix, queueSize, constantTags, errorHandler, - staticStatsDAddressResolution(hostname, port), timeout, socketBufferSize, packetSize, - entityID, bufferPoolSize, processorWorkers, senderWorkers, blocking, enableTelemetry, - telemetryFlushInterval); - } - } - } - private static final int STATSD_SERVER_PORT = 17254; - private static final NonBlockingStatsDClientBuilder builder = new StatsDNonBlockingTelemetryBuilder() + private static final NonBlockingStatsDClientBuilder builder = new NonBlockingStatsDClientBuilder() .prefix("my.prefix") .hostname("localhost") .constantTags("test") .port(STATSD_SERVER_PORT) .enableTelemetry(false); // disable telemetry so we can control calls to "flush" - private static StatsDNonBlockingTelemetry client = ((StatsDNonBlockingTelemetryBuilder)builder).build(); + private static NonBlockingStatsDClient client = builder.build(); // telemetry client - private static final NonBlockingStatsDClientBuilder telemetryBuilder = new StatsDNonBlockingTelemetryBuilder() + private static final NonBlockingStatsDClientBuilder telemetryBuilder = new NonBlockingStatsDClientBuilder() .prefix("my.prefix") .hostname("localhost") .constantTags("test") .port(STATSD_SERVER_PORT) + .enableAggregation(false) .enableTelemetry(false); // disable telemetry so we can control calls to "flush" - private static StatsDNonBlockingTelemetry telemetryClient = ((StatsDNonBlockingTelemetryBuilder)telemetryBuilder).build(); - - // builderError fails to send any data on the network, producing packets dropped - private static final NonBlockingStatsDClientBuilder builderError = new StatsDNonBlockingTelemetryBuilder() - .prefix("my.prefix") - .hostname("localhost") - .constantTags("test") - .port(0) - .enableTelemetry(false); // disable telemetry so we can control calls to "flush" - private static StatsDNonBlockingTelemetry clientError = ((StatsDNonBlockingTelemetryBuilder)builderError).build(); + private static NonBlockingStatsDClient telemetryClient = telemetryBuilder.build(); private static DummyStatsDServer server; private static FakeProcessor fakeProcessor; @@ -170,7 +103,7 @@ private static String computeTelemetryTags() throws IOException, Exception { @BeforeClass public static void start() throws IOException, Exception { - server = new DummyStatsDServer(STATSD_SERVER_PORT); + server = new UDPDummyStatsDServer(STATSD_SERVER_PORT); fakeProcessor = new FakeProcessor(NO_OP_HANDLER); client.telemetry.processor = fakeProcessor; telemetryClient.telemetry.processor = fakeProcessor; @@ -182,7 +115,6 @@ public static void start() throws IOException, Exception { public static void stop() throws Exception { try { client.stop(); - clientError.stop(); server.close(); } catch (java.io.IOException e) { return; @@ -193,7 +125,6 @@ public static void stop() throws Exception { public void clear() { server.clear(); client.telemetry.reset(); - clientError.telemetry.reset(); telemetryClient.telemetry.reset(); fakeProcessor.clear(); } @@ -438,7 +369,18 @@ public void telemetry_flushInterval() throws Exception { @Test(timeout = 5000L) public void telemetry_droppedData() throws Exception { - clientError.telemetry.reset(); + boolean isLinux = System.getProperty("os.name").toLowerCase().contains("linux"); + boolean isMac = System.getProperty("os.name").toLowerCase().contains("mac"); + Assume.assumeTrue(isLinux || isMac); + + // fails to send any data on the network, producing packets dropped + NonBlockingStatsDClient clientError = new NonBlockingStatsDClientBuilder() + .prefix("my.prefix") + .hostname("localhost") + .constantTags("test") + .port(0) + .enableTelemetry(false) // disable telemetry so we can control calls to "flush" + .build(); assertThat(clientError.statsDProcessor.bufferPool.getBufferSize(), equalTo(8192)); @@ -453,6 +395,8 @@ public void telemetry_droppedData() throws Exception { } catch (InterruptedException e) {} } + clientError.stop(); + assertThat(clientError.telemetry.metricsSent.get(), equalTo(1)); assertThat(clientError.telemetry.packetsDropped.get(), equalTo(1)); assertThat(clientError.telemetry.bytesDropped.get(), equalTo(27)); diff --git a/src/test/java/com/timgroup/statsd/UDPDummyStatsDServer.java b/src/test/java/com/timgroup/statsd/UDPDummyStatsDServer.java new file mode 100644 index 00000000..181e6481 --- /dev/null +++ b/src/test/java/com/timgroup/statsd/UDPDummyStatsDServer.java @@ -0,0 +1,34 @@ +package com.timgroup.statsd; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.nio.channels.DatagramChannel; + +public class UDPDummyStatsDServer extends DummyStatsDServer { + private final DatagramChannel server; + + public UDPDummyStatsDServer(int port) throws IOException { + server = DatagramChannel.open(); + server.bind(new InetSocketAddress(port)); + this.listen(); + } + + @Override + protected boolean isOpen() { + return server.isOpen(); + } + + @Override + protected void receive(ByteBuffer packet) throws IOException { + server.receive(packet); + } + + public void close() throws IOException { + try { + server.close(); + } catch (Exception e) { + //ignore + } + } +} diff --git a/src/test/java/com/timgroup/statsd/UnixSocketDummyStatsDServer.java b/src/test/java/com/timgroup/statsd/UnixSocketDummyStatsDServer.java new file mode 100644 index 00000000..fde642d6 --- /dev/null +++ b/src/test/java/com/timgroup/statsd/UnixSocketDummyStatsDServer.java @@ -0,0 +1,34 @@ +package com.timgroup.statsd; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.DatagramChannel; +import jnr.unixsocket.UnixDatagramChannel; +import jnr.unixsocket.UnixSocketAddress; + +public class UnixSocketDummyStatsDServer extends DummyStatsDServer { + private final DatagramChannel server; + + public UnixSocketDummyStatsDServer(String socketPath) throws IOException { + server = UnixDatagramChannel.open(); + server.bind(new UnixSocketAddress(socketPath)); + this.listen(); + } + @Override + protected boolean isOpen() { + return server.isOpen(); + } + + @Override + protected void receive(ByteBuffer packet) throws IOException { + server.receive(packet); + } + + public void close() throws IOException { + try { + server.close(); + } catch (Exception e) { + //ignore + } + } +} diff --git a/src/test/java/com/timgroup/statsd/UnixSocketTest.java b/src/test/java/com/timgroup/statsd/UnixSocketTest.java index efe1f17d..c2d76111 100644 --- a/src/test/java/com/timgroup/statsd/UnixSocketTest.java +++ b/src/test/java/com/timgroup/statsd/UnixSocketTest.java @@ -47,7 +47,7 @@ public void start() throws IOException { socketFile = new File(tmpFolder, "socket.sock"); socketFile.deleteOnExit(); - server = new DummyStatsDServer(socketFile.toString()); + server = new UnixSocketDummyStatsDServer(socketFile.toString()); client = new NonBlockingStatsDClientBuilder().prefix("my.prefix") .hostname(socketFile.toString()) .port(0) @@ -120,7 +120,7 @@ public void resist_dsd_restart() throws Exception { // Re-open the server, next send should work OK lastException = new Exception(); - DummyStatsDServer server2 = new DummyStatsDServer(socketFile.toString()); + DummyStatsDServer server2 = new UnixSocketDummyStatsDServer(socketFile.toString()); client.gauge("mycount", 30); server2.waitForMessage();