Java Programming Guide

Martin Thompson edited this page Jan 10, 2020 · 52 revisions

The Aeron API is designed to be as simple as possible and no simpler. This guide walks through a set of sample applications, highlighting specific points along the way. The complete applications can be found in the locations below.

Note: The javadoc is the definitive source of documentation. Please consider this guide as only a starting point.

Embedded Media Driver

The Aeron Media Driver can be run standalone and handle many applications. However, in some situations, it is desirable to run the media driver within the application.

In this case, a MediaDriver can be instantiated in the process. Only a single one is needed, but it does require some resources as discussed here.

When running an embedded Media Driver, it is recommended to set the following via system properties or directly via MediaDriver.Context passed into MediaDriver.launch:

  • Log Buffer Locations, specified by MediaDriver.Context.aeronDirectoryName(), should point to a specific location so as not to interfere with other Media Driver instances.
  • Threading Modes should be considered carefully, as the driver threads will be spawned within the parent process.

An example of starting up an embedded Media Driver.

final MediaDriver driver = MediaDriver.launch();

To guarantee that an embedded Media Driver does not interfere with other Media Drivers, one can use the following launch method:

final MediaDriver driver = MediaDriver.launchEmbedded();

The difference is that the latter launches a Media Driver with a randomly generated aeronDirectoryName if it detects that the default value has not been changed. This is enough to isolate it from other instances of a Media Driver.

Aeron

Aeron client applications need to coordinate operation with a running Media Driver, either an embedded one or a standalone one. This interaction handles creating Publications and Subscriptions as well as housekeeping. The interaction point for the application is the Aeron class.

final Aeron aeron = Aeron.connect(new Aeron.Context());

Settings for the instance may be changed via an Aeron.Context instance that is passed into the Aeron.connect method, as mentioned here.

To establish a connection with a Media Driver, Aeron must know the Aeron directory name used by that Media Driver. This can be left unspecified (the default value is then used), passed as a system property, or set manually. When the Media Driver is launched in embedded mode and the directory is randomly generated, the convenience method MediaDriver.aeronDirectoryName() provides the directory name of the Media Driver. That value can then be passed to Aeron.Context.aeronDirectoryName(), and the resulting context passed to the Aeron.connect method, as shown below.

final MediaDriver driver = MediaDriver.launchEmbedded();
final Aeron aeron = Aeron.connect(new Aeron.Context().aeronDirectoryName(driver.aeronDirectoryName()));

Event Handling

Aeron instances have a set of handlers that might be called for some events. The application can specify these handlers via the Aeron.Context instance used to create the instance.

  • Aeron.Context.errorHandler lets the application specify a lambda to call when errors/exceptions occur.
  • Aeron.Context.availableImageHandler specifies a lambda to call when images are available. An image is the replication of the publication stream on the subscription side.
  • Aeron.Context.unavailableImageHandler specifies a lambda to call when an image becomes unavailable.

These handlers are called from the ClientConductor thread.

From BasicSubscriber:

final Aeron.Context ctx = new Aeron.Context()
    .availableImageHandler(SamplesUtil::printAvailableImage)
    .unavailableImageHandler(SamplesUtil::printUnavailableImage);

DirectBuffer

Accessing and modifying buffers that Aeron uses for sending and receiving of messages is done via a set of interfaces.

The methods should look familiar to anyone who uses ByteBuffer regularly. However, these interfaces extend that model and provide an implementation better suited to efficient handling of data layout.

In many cases, the use of UnsafeBuffer will allow for the most efficient operation. To be useful, a ByteBuffer, byte[], etc. must be wrapped. Once wrapped, the underlying data can be read or modified.

From BasicPublisher, putting some bytes into a buffer:

private static final UnsafeBuffer BUFFER = new UnsafeBuffer(BufferUtil.allocateDirectAligned(256, 64));

...

    final String message = "Hello World!";
    BUFFER.putBytes(0, message.getBytes());

For a subscriber, grabbing some bytes from a buffer:

(buffer, offset, length, header) ->
{
    final byte[] data = new byte[length];
    buffer.getBytes(offset, data);
    ...
}
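
The wrap-then-access pattern above can be tried without Aeron on the classpath. The sketch below uses java.nio.ByteBuffer as a stand-in for UnsafeBuffer, which is an assumption made purely so the example is self-contained; the class and method names are hypothetical. It writes a message at an absolute offset and copies it back out the way the subscriber lambda does, with an explicit charset so the round trip does not depend on the platform default.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical helper: ByteBuffer stands in for UnsafeBuffer so the sketch runs with the JDK alone.
final class BufferRoundTrip
{
    // Copy the encoded message into the buffer starting at the given absolute offset.
    static int putMessage(final ByteBuffer buffer, final int offset, final String message)
    {
        final byte[] bytes = message.getBytes(StandardCharsets.UTF_8);
        final ByteBuffer view = buffer.duplicate(); // shares content, has its own position
        view.position(offset);
        view.put(bytes);
        return bytes.length;
    }

    // Copy length bytes starting at offset into a fresh array and decode them.
    static String getMessage(final ByteBuffer buffer, final int offset, final int length)
    {
        final byte[] data = new byte[length];
        final ByteBuffer view = buffer.duplicate();
        view.position(offset);
        view.get(data);
        return new String(data, StandardCharsets.UTF_8);
    }

    public static void main(final String[] args)
    {
        final ByteBuffer buffer = ByteBuffer.allocateDirect(256);
        final int length = putMessage(buffer, 32, "Hello World!");
        System.out.println(getMessage(buffer, 32, length));
    }
}
```

Passing the offset and length explicitly, rather than relying on a buffer's position and limit, is the same convention the FragmentHandler callback uses.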

Subscription

An application that wants to receive data needs to specify a channel and stream to listen on. A Subscription aggregates zero or more Images for the same channel and stream id. Each Image comes from a unique source and is identified by session id; the source is encoded in the opaque Image.sourceIdentity().

From BasicSubscriber, listen on a channel and a stream:

final Aeron aeron = Aeron.connect(new Aeron.Context());
final Subscription subscription = aeron.addSubscription(CHANNEL, STREAM_ID);

Note: The Aeron.addSubscription method will block until the Media Driver acknowledges the request or a timeout occurs.

Polling

Subscribing applications have full control over when data is delivered to the FragmentHandler methods, via the Subscription.poll or Image.poll methods. Subscriptions delegate polling to the matching Images. When called, poll determines whether there are any messages to deliver and delivers them via the FragmentHandler interface, up to the specified limit on the number of fragments, before returning.
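
The fragment limit can be pictured with a small model that needs no Aeron classes. In the sketch below a Queue of strings stands in for the fragments pending on an Image, and BoundedPoll is a hypothetical name; it only illustrates the "deliver up to the limit, then return the count" contract described above and is not how Aeron implements polling.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

// Hypothetical model of a bounded poll; a Queue stands in for an Image's pending fragments.
final class BoundedPoll
{
    // Deliver at most fragmentLimit pending fragments to the handler and report how many were delivered.
    static int poll(final Queue<String> pending, final Consumer<String> handler, final int fragmentLimit)
    {
        int fragmentsRead = 0;
        while (fragmentsRead < fragmentLimit && !pending.isEmpty())
        {
            handler.accept(pending.remove());
            fragmentsRead++;
        }

        return fragmentsRead;
    }

    public static void main(final String[] args)
    {
        final Queue<String> pending = new ArrayDeque<>();
        for (int i = 0; i < 25; i++)
        {
            pending.add("fragment-" + i);
        }

        final List<String> delivered = new ArrayList<>();
        int fragmentsRead;
        while ((fragmentsRead = poll(pending, delivered::add, 10)) > 0)
        {
            System.out.println("poll delivered " + fragmentsRead + " fragments");
        }
    }
}
```

A return value of zero is the signal the caller can use to back off, since nothing was available on that call.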

Example of polling for new messages with a per poll limit of 10 fragments and an Idle Strategy.

final FragmentHandler fragmentHandler = ... // defined below

final IdleStrategy idleStrategy = new BackoffIdleStrategy(
    100, 10, TimeUnit.MICROSECONDS.toNanos(1), TimeUnit.MICROSECONDS.toNanos(100));

while (...)
{
    final int fragmentsRead = subscription.poll(fragmentHandler, 10);
    idleStrategy.idle(fragmentsRead);
}
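
To give a feel for what the four constructor arguments in the loop above control, here is a simplified model of a back-off idle strategy: spin a few times, then yield a few times, then park for a period that doubles up to a maximum, and reset as soon as work is done. BackoffSketch is a hypothetical class that reports the action it would take instead of performing it; it is an illustration of the idea under those assumptions and not the BackoffIdleStrategy source.

```java
// Simplified, hypothetical model of a back-off idle strategy; it reports the action instead of performing it.
final class BackoffSketch
{
    private final long maxSpins;
    private final long maxYields;
    private final long minParkNs;
    private final long maxParkNs;
    private long spins;
    private long yields;
    private long parkNs;

    BackoffSketch(final long maxSpins, final long maxYields, final long minParkNs, final long maxParkNs)
    {
        this.maxSpins = maxSpins;
        this.maxYields = maxYields;
        this.minParkNs = minParkNs;
        this.maxParkNs = maxParkNs;
        reset();
    }

    // Mirror of idle(workCount): any work resets the back-off, no work escalates it.
    String idle(final int workCount)
    {
        if (workCount > 0)
        {
            reset();
            return "reset";
        }

        if (spins < maxSpins)
        {
            spins++;
            return "spin";
        }

        if (yields < maxYields)
        {
            yields++;
            return "yield";
        }

        final long currentParkNs = parkNs;
        parkNs = Math.min(parkNs * 2, maxParkNs); // double the next park period, capped at the maximum
        return "park " + currentParkNs + "ns";
    }

    private void reset()
    {
        spins = 0;
        yields = 0;
        parkNs = minParkNs;
    }
}
```

The trade-off being modelled is latency against CPU usage: spinning reacts fastest to new data, while parking gives the core back to the operating system when the stream is quiet.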

FragmentHandler

Messages are read from Image instances via FragmentHandler callbacks. This interface is a functional interface. The arguments are:

  • buffer holding the data
  • offset indicating the offset in the buffer that starts the message
  • length of the message
  • header holding the metadata of the message

Example of printing the contents of a message as a string along with some metadata:

final FragmentHandler fragmentHandler = (buffer, offset, length, header) ->
{
    final byte[] data = new byte[length];
    buffer.getBytes(offset, data);

    System.out.println(String.format(
        "Message to stream %d from session %d (%d@%d) <<%s>>",
        header.streamId(), header.sessionId(), length, offset, new String(data)));
};

Message Reassembly

Publication instances automatically fragment large messages into data frames that Aeron sends. Subscription instances that desire these fragments to be reassembled prior to delivery to the FragmentHandler can chain an instance of FragmentAssembler to do this by composition.

final FragmentHandler fragmentAssembler = new FragmentAssembler(fragmentHandler);
final int fragmentsRead = subscription.poll(fragmentAssembler, 10);
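
What "reassembly by composition" means can be sketched in plain Java. The class below is a hypothetical model, not the FragmentAssembler implementation: it assumes each fragment carries begin and end flags (the flag values here are assumptions for the sketch), buffers fragments until the end flag arrives, and only then calls the delegate. It also tracks a single stream and ignores session ids, which a real assembler cannot do.

```java
import java.io.ByteArrayOutputStream;
import java.util.function.Consumer;

// Hypothetical reassembly model keyed only on begin/end flags; not Aeron's FragmentAssembler.
final class ReassemblySketch
{
    static final int BEGIN_FLAG = 0x80; // assumed marker: first fragment of a message
    static final int END_FLAG = 0x40;   // assumed marker: last fragment of a message

    private final Consumer<byte[]> delegate;
    private final ByteArrayOutputStream partial = new ByteArrayOutputStream();

    ReassemblySketch(final Consumer<byte[]> delegate)
    {
        this.delegate = delegate;
    }

    void onFragment(final byte[] payload, final int flags)
    {
        final boolean begin = (flags & BEGIN_FLAG) != 0;
        final boolean end = (flags & END_FLAG) != 0;

        if (begin && end)
        {
            delegate.accept(payload); // unfragmented message: pass straight through
            return;
        }

        if (begin)
        {
            partial.reset(); // a new message discards any stale partial data
        }

        partial.write(payload, 0, payload.length);

        if (end)
        {
            delegate.accept(partial.toByteArray()); // complete message: hand it to the wrapped handler
            partial.reset();
        }
    }
}
```

Because the assembler wraps the handler rather than replacing it, the wrapped handler stays unaware of fragmentation and sees only whole messages.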

Note: Reassembly has been shown to have minimal impact on latency, but the impact is not totally negligible. If the lowest latency is desired, then limiting message sizes to the MTU size is a good practice.

Note: There is a maximum length allowed for messages, which is the minimum of 1/8th of the term length or 16MB. Messages larger than this should be chunked using an application-level chunking protocol. Chunking has better recovery properties from failure and streams with mechanical sympathy.
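
As a worked example of the limit in the note above, the sketch below applies the stated rule (the smaller of one eighth of the term length and 16MB) to a few term lengths and computes how many application-level chunks a larger payload would need. MessageLimits and its methods are hypothetical helpers written for this guide, not part of the Aeron API.

```java
// Hypothetical helper applying the rule stated above; not part of the Aeron API.
final class MessageLimits
{
    static final int MAX_MESSAGE_CAP = 16 * 1024 * 1024; // the 16MB cap from the note

    // The smaller of one eighth of the term length and the 16MB cap.
    static int maxMessageLength(final int termLength)
    {
        return Math.min(termLength / 8, MAX_MESSAGE_CAP);
    }

    // Number of chunks needed to carry payloadLength bytes, rounding up.
    static int chunkCount(final long payloadLength, final int maxChunkLength)
    {
        return (int)((payloadLength + maxChunkLength - 1) / maxChunkLength);
    }

    public static void main(final String[] args)
    {
        final int[] termLengths = { 64 * 1024, 16 * 1024 * 1024, 1024 * 1024 * 1024 };
        for (final int termLength : termLengths)
        {
            System.out.println(termLength + " byte term -> max message " + maxMessageLength(termLength));
        }
    }
}
```

With a 16MB term, for instance, the limit works out to 2MB, so a 5,000,000 byte payload would need three chunks.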

Advanced Polling

At times you may wish to take more control over how a Subscription/Image is polled. For example, if you wish to archive a stream of messages in parallel, then Image.blockPoll or Image.rawPoll can be used to efficiently copy available ranges of messages in a stream to another location.

It is also possible to control the polling action with the Image.controlledPoll or Subscription.controlledPoll method. This method takes a ControlledFragmentHandler that returns the action which should be taken after the message fragment is handled.

When handling a fragment with the ControlledFragmentHandler the following return codes can be used to control the polling action:

  • ABORT the current polling operation and do not advance the position for this fragment.
  • BREAK from the current polling operation and commit the position as of the end of the current fragment being handled.
  • COMMIT Continue processing but commit the position as of the end of the current fragment so that flow control is applied to this point.
  • CONTINUE Continue processing, taking the same approach as in the standard FragmentHandler.
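
The four actions above can be modelled with a toy poll loop. The sketch below is one reading of the list, under stated assumptions: fragments are represented only by their lengths, the "position" is a running byte count, and whatever was consumed is committed when the poll returns. ControlledPollSketch and its Action enum are hypothetical stand-ins, not the Aeron classes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntFunction;

// Hypothetical model of the four actions; fragment lengths stand in for real frames.
final class ControlledPollSketch
{
    enum Action { ABORT, BREAK, COMMIT, CONTINUE }

    final List<Long> commits = new ArrayList<>(); // every value the committed position has taken
    private final int[] fragmentLengths;
    private int nextIndex;
    private long position;

    ControlledPollSketch(final int... fragmentLengths)
    {
        this.fragmentLengths = fragmentLengths;
    }

    long position()
    {
        return position;
    }

    // The handler receives the fragment index and returns the action to take.
    int poll(final IntFunction<Action> handler, final int fragmentLimit)
    {
        int fragmentsRead = 0;
        long pollPosition = position; // progress made within this poll, not yet committed

        while (fragmentsRead < fragmentLimit && nextIndex < fragmentLengths.length)
        {
            final Action action = handler.apply(nextIndex);
            if (action == Action.ABORT)
            {
                break; // do not advance: this fragment is delivered again on the next poll
            }

            pollPosition += fragmentLengths[nextIndex];
            nextIndex++;
            fragmentsRead++;

            if (action == Action.BREAK)
            {
                break; // stop here; the end-of-poll commit covers this fragment
            }

            if (action == Action.COMMIT)
            {
                commit(pollPosition); // make progress visible now and keep going
            }
        }

        commit(pollPosition);
        return fragmentsRead;
    }

    private void commit(final long newPosition)
    {
        if (newPosition > position)
        {
            position = newPosition;
            commits.add(newPosition);
        }
    }
}
```

In this model the difference between COMMIT and CONTINUE is only when the position becomes visible: COMMIT publishes it mid-poll, while CONTINUE defers it to the end of the poll.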

Publication

An application that desires to send data needs to specify a channel and stream to send to.

From BasicPublisher, send to a channel and a stream:

final Aeron aeron = Aeron.connect(new Aeron.Context());
final Publication publication = aeron.addPublication(CHANNEL, STREAM_ID);

Note: The Aeron.addPublication method will block until the Media Driver acknowledges the request or a timeout occurs.

Afterwards, the application is free to send data via the Publication.offer method.

private static final UnsafeBuffer BUFFER = new UnsafeBuffer(BufferUtil.allocateDirectAligned(256, 64));
...
final String message = "Hello World!";
final byte[] messageBytes = message.getBytes(); // encode once and reuse for both the copy and the length
BUFFER.putBytes(0, messageBytes);

final long resultingPosition = publication.offer(BUFFER, 0, messageBytes.length);

Handling Back Pressure

Aeron has built-in back pressure for a publisher. It will not allow a publisher to send data that exceeds the prescribed flow control limits.

When calling Publication.offer, a return value greater than 0 is the resulting stream position and indicates the message was accepted for sending. Negative values indicate that the message has not been enqueued for sending. Constants for negative values are as follows:

  • NOT_CONNECTED means no subscriber is connected to the publication. This can be a transient state as subscribers come and go.
  • BACK_PRESSURED indicates the message was not sent due to back pressure from Subscribers, but can be retried if desired.
  • ADMIN_ACTION indicates the message was not sent due to an administration action, such as log rotation, but can be retried if desired.
  • CLOSED indicates the Publication has been closed, whether by another client thread, because the channel is invalid, or because the client has timed out.
  • MAX_POSITION_EXCEEDED indicates that the Publication has reached the maximum possible position given the term-length. This is possible with a small term-length. Max position is 2^31 * term-length for a Publication.
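
To see why a small term-length makes MAX_POSITION_EXCEEDED reachable, the formula above can be evaluated directly. The sketch below is a hypothetical helper, not an Aeron API; the publishing rate used in main is an arbitrary figure chosen only for illustration.

```java
// Hypothetical helper evaluating the max position formula quoted above.
final class MaxPosition
{
    // Max position = 2^31 * term-length.
    static long maxPosition(final int termLength)
    {
        return (1L << 31) * termLength;
    }

    // Whole seconds until the max position is reached at a steady rate in bytes per second.
    static long secondsToExhaust(final int termLength, final long bytesPerSecond)
    {
        return maxPosition(termLength) / bytesPerSecond;
    }

    public static void main(final String[] args)
    {
        final int termLength = 64 * 1024;
        final long assumedRate = 1_000_000_000L; // illustrative rate of 10^9 bytes per second

        System.out.println("max position: " + maxPosition(termLength));
        System.out.println("seconds to exhaust: " + secondsToExhaust(termLength, assumedRate));
    }
}
```

With a 64KB term the limit is 2^47 bytes, which a stream sustaining 10^9 bytes per second would reach in roughly 39 hours, whereas a 1GB term raises the limit to 2^61 bytes.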

The ways that an application may handle back pressure are, by necessity, dependent on the application semantics. Here are a few options. This is not an exhaustive list.

  • Retry until success. Keep calling Publication.offer until it succeeds. This may spin or have some sort of idle strategy. Many examples do this.
  • Ignore failure and continue. Ignore that the data didn't send and move on. This is usually appropriate for situations where the data being sent has some lifetime and it would be better to not send stale data.
  • Retry until success or timeout. Retry as above, with or without an idle strategy, but give up once a timeout expires.
  • Retry asynchronously. Retry periodically, but instead of idling, do some other work.
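
As an illustration of the "retry until success or timeout" option, the sketch below wraps an offer call in a deadline loop. To keep it runnable without Aeron, the offer is passed in as a LongSupplier, and the negative constants are local stand-ins whose values are assumptions for this sketch; OfferRetry is a hypothetical helper. It treats CLOSED and MAX_POSITION_EXCEEDED as not worth retrying, which is a design choice of the sketch rather than something Aeron mandates.

```java
import java.util.function.LongSupplier;

// Hypothetical retry helper; the LongSupplier stands in for a call to Publication.offer.
final class OfferRetry
{
    // Local stand-ins for the constants listed above; the values are assumed for this sketch.
    static final long NOT_CONNECTED = -1;
    static final long BACK_PRESSURED = -2;
    static final long ADMIN_ACTION = -3;
    static final long CLOSED = -4;
    static final long MAX_POSITION_EXCEEDED = -5;

    // Returns the positive position on success, otherwise the last negative result seen.
    static long offerWithTimeout(final LongSupplier offer, final long timeoutNs)
    {
        final long deadlineNs = System.nanoTime() + timeoutNs;
        long result;

        do
        {
            result = offer.getAsLong();
            if (result > 0 || result == CLOSED || result == MAX_POSITION_EXCEEDED)
            {
                return result; // success, or a failure that retrying cannot fix
            }

            Thread.yield(); // placeholder idle; an IdleStrategy could be used here instead
        }
        while (System.nanoTime() - deadlineNs < 0);

        return result;
    }
}
```

A caller would pass something like () -> publication.offer(BUFFER, 0, length) as the supplier and then decide what to do with a negative result, for example dropping the message or logging it.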

The needs of an application, or system, can be quite complex. The common use case, though, is a non-blocking offer, and more complex scenarios can be built on top of it.

Monitoring

The Aeron Media Driver and the status of various buffers may be monitored outside of the driver via the counter files in use by the driver. Below is an example application that reads this data and prints it periodically. Full source can be found here.

/**
 * Tool for printing out Aeron counters. A command-and-control (CnC) file is maintained by media driver
 * in shared memory. This application reads the CnC file and prints the counters. Layout of the CnC file is
 * described in {@link CncFileDescriptor}.
 * <p>
 * This tool accepts filters on the command line, e.g. for connections only see example below:
 * <p>
 * <code>
 * java -cp aeron-samples/build/libs/samples.jar io.aeron.samples.AeronStat type=[1-9] identity=12345
 * </code>
 */
public class AeronStat
{
    private static final String ANSI_CLS = "\u001b[2J";
    private static final String ANSI_HOME = "\u001b[H";

    /**
     * The delay in seconds between each update.
     */
    private static final String DELAY = "delay";

    /**
     * Whether to watch for updates or run once.
     */
    private static final String WATCH = "watch";

    /**
     * Types of the counters.
     * <ul>
     * <li>0: System Counters</li>
     * <li>1 - 5, 9, 10, 11: Stream Positions and Indicators</li>
     * <li>6 - 7: Channel Endpoint Status</li>
     * </ul>
     */
    private static final String COUNTER_TYPE_ID = "type";

    /**
     * The identity of each counter that can either be the system counter id or registration id for positions.
     */
    private static final String COUNTER_IDENTITY = "identity";

    /**
     * Session id filter to be used for position counters.
     */
    private static final String COUNTER_SESSION_ID = "session";

    /**
     * Stream id filter to be used for position counters.
     */
    private static final String COUNTER_STREAM_ID = "stream";

    /**
     * Channel filter to be used for position counters.
     */
    private static final String COUNTER_CHANNEL = "channel";

    public static void main(final String[] args) throws Exception
    {
        long delayMs = 1000L;
        boolean watch = true;
        Pattern typeFilter = null;
        Pattern identityFilter = null;
        Pattern sessionFilter = null;
        Pattern streamFilter = null;
        Pattern channelFilter = null;

        if (0 != args.length)
        {
            checkForHelp(args);

            for (final String arg : args)
            {
                final int equalsIndex = arg.indexOf('=');
                if (-1 == equalsIndex)
                {
                    System.out.println("Arguments must be in name=pattern format: Invalid '" + arg + "'");
                    return;
                }

                final String argName = arg.substring(0, equalsIndex);
                final String argValue = arg.substring(equalsIndex + 1);

                switch (argName)
                {
                    case WATCH:
                        watch = Boolean.parseBoolean(argValue);
                        break;

                    case DELAY:
                        delayMs = Long.parseLong(argValue) * 1000L;
                        break;

                    case COUNTER_TYPE_ID:
                        typeFilter = Pattern.compile(argValue);
                        break;

                    case COUNTER_IDENTITY:
                        identityFilter = Pattern.compile(argValue);
                        break;

                    case COUNTER_SESSION_ID:
                        sessionFilter = Pattern.compile(argValue);
                        break;

                    case COUNTER_STREAM_ID:
                        streamFilter = Pattern.compile(argValue);
                        break;

                    case COUNTER_CHANNEL:
                        channelFilter = Pattern.compile(argValue);
                        break;

                    default:
                        System.out.println("Unrecognised argument: '" + arg + "'");
                        return;
                }
            }
        }

        final CncFileReader cncFileReader = CncFileReader.map();

        final CounterFilter counterFilter = new CounterFilter(
            typeFilter, identityFilter, sessionFilter, streamFilter, channelFilter);

        if (watch)
        {
            workLoop(delayMs, () -> printOutput(cncFileReader, counterFilter));
        }
        else
        {
            printOutput(cncFileReader, counterFilter);
        }
    }

    private static void workLoop(final long delayMs, final Runnable outputPrinter) throws Exception
    {
        final AtomicBoolean running = new AtomicBoolean(true);
        SigInt.register(() -> running.set(false));

        do
        {
            clearScreen();
            outputPrinter.run();
            Thread.sleep(delayMs);
        }
        while (running.get());
    }

    private static void printOutput(final CncFileReader cncFileReader, final CounterFilter counterFilter)
    {
        final SimpleDateFormat dateFormat = new SimpleDateFormat("HH:mm:ss");

        System.out.print(dateFormat.format(new Date()));
        System.out.println(
            " - Aeron Stat (CnC v" + cncFileReader.semanticVersion() + ")" +
            ", pid " + SystemUtil.getPid() +
            ", heartbeat age " + cncFileReader.driverHeartbeatAgeMs() + "ms");
        System.out.println("======================================================================");

        final CountersReader counters = cncFileReader.countersReader();

        counters.forEach(
            (counterId, typeId, keyBuffer, label) ->
            {
                if (counterFilter.filter(typeId, keyBuffer))
                {
                    final long value = counters.getCounterValue(counterId);
                    System.out.format("%3d: %,20d - %s%n", counterId, value, label);
                }
            }
        );

        System.out.println("--");
    }

    private static void checkForHelp(final String[] args)
    {
        for (final String arg : args)
        {
            if ("-?".equals(arg) || "-h".equals(arg) || "-help".equals(arg))
            {
                System.out.format(
                    "Usage: [-Daeron.dir=<directory containing CnC file>] AeronStat%n" +
                    "\t[delay=<seconds between updates>]%n" +
                    "\t[watch=<true|false>]%n" +
                    "filter by optional regex patterns:%n" +
                    "\t[type=<pattern>]%n" +
                    "\t[identity=<pattern>]%n" +
                    "\t[session=<pattern>]%n" +
                    "\t[stream=<pattern>]%n" +
                    "\t[channel=<pattern>]%n");

                System.exit(0);
            }
        }
    }

    private static void clearScreen() throws Exception
    {
        if (SystemUtil.isWindows())
        {
            new ProcessBuilder("cmd", "/c", "cls").inheritIO().start().waitFor();
        }
        else
        {
            System.out.print(ANSI_CLS + ANSI_HOME);
        }
    }

    static class CounterFilter
    {
        private final Pattern typeFilter;
        private final Pattern identityFilter;
        private final Pattern sessionFilter;
        private final Pattern streamFilter;
        private final Pattern channelFilter;

        CounterFilter(
            final Pattern typeFilter,
            final Pattern identityFilter,
            final Pattern sessionFilter,
            final Pattern streamFilter,
            final Pattern channelFilter)
        {
            this.typeFilter = typeFilter;
            this.identityFilter = identityFilter;
            this.sessionFilter = sessionFilter;
            this.streamFilter = streamFilter;
            this.channelFilter = channelFilter;
        }

        private static boolean match(final Pattern pattern, final Supplier<String> supplier)
        {
            return null == pattern || pattern.matcher(supplier.get()).find();
        }

        boolean filter(final int typeId, final DirectBuffer keyBuffer)
        {
            if (!match(typeFilter, () -> Integer.toString(typeId)))
            {
                return false;
            }

            if (SYSTEM_COUNTER_TYPE_ID == typeId && !match(identityFilter, () -> Integer.toString(keyBuffer.getInt(0))))
            {
                return false;
            }
            else if ((typeId >= PUBLISHER_LIMIT_TYPE_ID && typeId <= RECEIVER_POS_TYPE_ID) ||
                typeId == SENDER_LIMIT_TYPE_ID || typeId == PER_IMAGE_TYPE_ID || typeId == PUBLISHER_POS_TYPE_ID)
            {
                return
                    match(identityFilter, () -> Long.toString(keyBuffer.getLong(REGISTRATION_ID_OFFSET))) &&
                    match(sessionFilter, () -> Integer.toString(keyBuffer.getInt(SESSION_ID_OFFSET))) &&
                    match(streamFilter, () -> Integer.toString(keyBuffer.getInt(STREAM_ID_OFFSET))) &&
                    match(channelFilter, () -> keyBuffer.getStringAscii(CHANNEL_OFFSET));
            }
            else if (typeId >= SEND_CHANNEL_STATUS_TYPE_ID && typeId <= RECEIVE_CHANNEL_STATUS_TYPE_ID)
            {
                return match(channelFilter, () -> keyBuffer.getStringAscii(ChannelEndpointStatus.CHANNEL_OFFSET));
            }

            return true;
        }
    }
}

The AeronStat application above does the following:

  1. Parse the optional name=pattern arguments (watch, delay, and the counter filters)
  2. Map the CnC file via CncFileReader.map()
  3. Use a CountersReader to read the value and label of each counter
  4. Set up a SigInt to handle control-C out of the application
  5. While running, in a loop do the following:
    1. Clear the screen and print a header with the current time, CnC version, pid, and driver heartbeat age
    2. For each counter that passes the filter, print out a line with the counter id, value, and label.
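
The filter arguments deserve a closer look, because AeronStat compiles each value as a regular expression and tests it with Matcher.find(), which accepts any value containing a match rather than requiring the whole value to match. The sketch below reproduces just the argument splitting and matching from the listing above using only java.util.regex; FilterArgSketch is a hypothetical class name.

```java
import java.util.regex.Pattern;

// Hypothetical extract of the name=pattern handling, using only the JDK.
final class FilterArgSketch
{
    // Split on the first '=' so that values may themselves contain '='.
    static String[] split(final String arg)
    {
        final int equalsIndex = arg.indexOf('=');
        if (-1 == equalsIndex)
        {
            throw new IllegalArgumentException("Arguments must be in name=pattern format: " + arg);
        }

        return new String[]{ arg.substring(0, equalsIndex), arg.substring(equalsIndex + 1) };
    }

    // A null pattern accepts everything; otherwise find() accepts any value containing a match.
    static boolean match(final Pattern pattern, final String value)
    {
        return null == pattern || pattern.matcher(value).find();
    }

    public static void main(final String[] args)
    {
        final Pattern typeFilter = Pattern.compile(split("type=[1-9]")[1]);
        for (final String typeId : new String[]{ "0", "1", "10" })
        {
            System.out.println("type " + typeId + " accepted: " + match(typeFilter, typeId));
        }
    }
}
```

Two consequences follow from the use of find(). A filter such as type=[1-9] also accepts type ids 10 and 11, and anchors like ^1$ are needed to select exactly one id. A channel filter containing regex metacharacters, such as the ? in a channel URI, will not match literally unless it is escaped.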