Skip to content

Commit

Permalink
added DNS lookup interface (#63)
Browse files Browse the repository at this point in the history
* initial prototype for DnsResolver (server-side and client unit-tests)
* added low-level connection heart-beating
  follows the ZMTP/3.1 definition to detect faster TCP-timeouts
* changed DataSource Factory API to forward the ExecutorService from the superordinate DataSourcePublisher
* changed to re-use existing ZeroMQ enum to track connection state
* added some more URI type-safety
* change private to protected access for getClient(..) and other secondary necessary methods/fields
  • Loading branch information
RalphSteinhagen committed Mar 15, 2021
1 parent f1b56a4 commit 33c6411
Show file tree
Hide file tree
Showing 34 changed files with 1,653 additions and 685 deletions.
127 changes: 77 additions & 50 deletions client/src/main/java/io/opencmw/client/DataSource.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,13 @@

import java.net.URI;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ExecutorService;

import org.jetbrains.annotations.NotNull;
import org.zeromq.ZContext;
import org.zeromq.ZMQ.Socket;
import org.zeromq.ZMsg;
Expand All @@ -16,8 +21,8 @@
* Should provide a static boolean matches(String address) function to determine whether
* it is eligible for a given address.
*/
public abstract class DataSource {
private static final List<Factory> IMPLEMENTATIONS = new NoDuplicatesList<>();
public abstract class DataSource implements AutoCloseable {
private static final List<Factory> IMPLEMENTATIONS = Collections.synchronizedList(new NoDuplicatesList<>());

private DataSource() {
// prevent implementers from implementing default constructor
Expand All @@ -27,31 +32,29 @@ private DataSource() {
* Constructor
* @param endpoint Endpoint to subscribe to
*/
public DataSource(final URI endpoint) {
if (endpoint == null || !getFactory().matches(endpoint)) {
protected DataSource(final @NotNull URI endpoint) {
if (!getFactory().matches(endpoint)) {
throw new UnsupportedOperationException(this.getClass().getName() + " DataSource Implementation does not support endpoint: " + endpoint);
}
}

/**
* Factory method to get a DataSource for a given endpoint
* @param endpoint endpoint address
* @return if there is a DataSource implementation for the protocol of the endpoint return a Factory to create a new
* Instance of this DataSource
* @throws UnsupportedOperationException in case there is no valid implementation
* Perform a get request on this endpoint.
* @param requestId request id which later allows to match the returned value to this query.
* This is the only mandatory parameter, all the following may be null.
* @param endpoint extend the filters originally supplied to the endpoint e.g. "ctx=selector&amp;channel=channelA"
* @param data The serialised data which can be used by the get call
* @param rbacToken byte array containing signed body hash-key and corresponding RBAC role
*/
public static Factory getFactory(final URI endpoint) {
for (Factory factory : IMPLEMENTATIONS) {
if (factory.matches(endpoint)) {
return factory;
}
}
throw new UnsupportedOperationException("No DataSource implementation available for endpoint: " + endpoint);
}
public abstract void get(final String requestId, final URI endpoint, final byte[] data, final byte[] rbacToken);

public static void register(final Factory factory) {
IMPLEMENTATIONS.add(0, factory); // custom added implementations are added in front to be discovered first
}
/**
* Gets called whenever data is available on the DataSource's socket.
* Should then try to receive data and return any results back to the calling event loop.
* @return null if there is no more data available, a Zero length ZMsg if there was data which was only used internally
* or a ZMsg with [reqId, endpoint, byte[] data, [byte[] optional RBAC token]]
*/
public abstract ZMsg getMessage();

/**
* Get Socket to wait for in the event loop.
Expand All @@ -61,25 +64,26 @@ public static void register(final Factory factory) {
*/
public abstract Socket getSocket();

protected abstract Factory getFactory();

/**
* Gets called whenever data is available on the DataSoure's socket.
* Should then try to receive data and return any results back to the calling event loop.
* @return null if there is no more data available, a Zero length Zmsg if there was data which was only used internally
* or a ZMsg with [reqId, endpoint, byte[] data, [byte[] optional RBAC token]]
* Perform housekeeping tasks like connection management, heartbeats, subscriptions, etc
* @return UTC time-stamp in [ms] when the next housekeeping duties should be performed
*/
public abstract ZMsg getMessage();
public abstract long housekeeping();

/**
* Perform housekeeping tasks like connection management, heartbeats, subscriptions, etc
* @return next time housekeeping duties should be performed
* Perform a set request on this endpoint using additional filters
* @param requestId request id which later allows to match the returned value to this query.
* This is the only mandatory parameter, all the following may be null.
* @param endpoint extend the filters originally supplied to the endpoint e.g. "ctx=selector&amp;channel=channelA"
* @param data The serialised data which can be used by the get call
* @param rbacToken byte array containing signed body hash-key and corresponding RBAC role
*/
public abstract long housekeeping();
public abstract void set(final String requestId, final URI endpoint, final byte[] data, final byte[] rbacToken);

/**
* Subscribe to this endpoint
* @param reqId the id to join the result of this subscribe with
* @param endpoint endpoint URI to subscribe to
* @param rbacToken byte array containing signed body hash-key and corresponding RBAC role
*/
public abstract void subscribe(final String reqId, final URI endpoint, final byte[] rbacToken);
Expand All @@ -90,28 +94,51 @@ public static void register(final Factory factory) {
public abstract void unsubscribe(final String reqId);

/**
* Perform a get request on this endpoint.
* @param requestId request id which later allows to match the returned value to this query.
* This is the only mandatory parameter, all the following may be null.
* @param endpoint extend the filters originally supplied to the endpoint e.g. "ctx=selector&amp;channel=chanA"
* @param data The serialised data which can be used by the get call
* @param rbacToken byte array containing signed body hash-key and corresponding RBAC role
* Factory method to get a DataSource for a given endpoint
* @param endpoint endpoint address
* @return if there is a DataSource implementation for the protocol of the endpoint return a Factory to create a new
* Instance of this DataSource
* @throws UnsupportedOperationException in case there is no valid implementation
*/
public abstract void get(final String requestId, final URI endpoint, final byte[] data, final byte[] rbacToken);
public static Factory getFactory(final @NotNull URI endpoint) {
for (Factory factory : IMPLEMENTATIONS) {
if (factory.matches(endpoint)) {
return factory;
}
}
throw new UnsupportedOperationException("No DataSource implementation available for endpoint: " + endpoint);
}

/**
* Perform a set request on this endpoint using additional filters
* @param requestId request id which later allows to match the returned value to this query.
* This is the only mandatory parameter, all the following may be null.
* @param endpoint extend the filters originally supplied to the endpoint e.g. "ctx=selector&amp;channel=chanA"
* @param data The serialised data which can be used by the get call
* @param rbacToken byte array containing signed body hash-key and corresponding RBAC role
*/
public abstract void set(final String requestId, final URI endpoint, final byte[] data, final byte[] rbacToken);
public static void register(final Factory factory) {
IMPLEMENTATIONS.add(0, factory); // custom added implementations are added in front to be discovered first
}

protected interface Factory {
boolean matches(final URI endpoint);
Class<? extends IoSerialiser> getMatchingSerialiserType(final URI endpoint);
DataSource newInstance(final ZContext context, final URI endpoint, final Duration timeout, final String clientId);
protected abstract Factory getFactory();

public interface Factory {
/**
* @return returns the list of applicable schemes (and protocols this resolver can handle) this resolver can handle
*/
List<String> getApplicableSchemes();

Class<? extends IoSerialiser> getMatchingSerialiserType(final @NotNull URI endpoint);

List<DnsResolver> getRegisteredDnsResolver();

default boolean matches(final @NotNull URI endpoint) {
final String scheme = Objects.requireNonNull(endpoint.getScheme(), "required URI has no scheme defined: " + endpoint);
return getApplicableSchemes().stream().anyMatch(s -> s.equalsIgnoreCase(scheme));
}

DataSource newInstance(final ZContext context, final @NotNull URI endpoint, final @NotNull Duration timeout, final @NotNull ExecutorService executorService, final @NotNull String clientId);

default void registerDnsResolver(final @NotNull DnsResolver resolver) {
final ArrayList<String> list = new ArrayList<>(getApplicableSchemes());
list.retainAll(resolver.getApplicableSchemes());
if (list.isEmpty()) {
throw new IllegalArgumentException("resolver schemes not compatible with this DataSource: " + resolver);
}
getRegisteredDnsResolver().add(resolver);
}
}
}
77 changes: 44 additions & 33 deletions client/src/main/java/io/opencmw/client/DataSourcePublisher.java
Original file line number Diff line number Diff line change
Expand Up @@ -95,48 +95,47 @@
@SuppressWarnings({ "PMD.GodClass", "PMD.ExcessiveImports", "PMD.TooManyFields" })
public class DataSourcePublisher implements Runnable, Closeable {
public static final int MIN_FRAMES_INTERNAL_MSG = 3;
protected static final ZFrame EMPTY_ZFRAME = new ZFrame(EMPTY_FRAME);
private static final Logger LOGGER = LoggerFactory.getLogger(DataSourcePublisher.class);
private static final ZFrame EMPTY_ZFRAME = new ZFrame(EMPTY_FRAME);
private static final AtomicInteger INSTANCE_COUNT = new AtomicInteger();

static { // register default data sources
DataSource.register(CmwLightDataSource.FACTORY);
DataSource.register(RestDataSource.FACTORY);
DataSource.register(OpenCmwDataSource.FACTORY);
}

protected final long heartbeatInterval = SystemProperties.getValueIgnoreCase(HEARTBEAT, HEARTBEAT_DEFAULT); // [ms] time between to heartbeats in ms
private final String inprocCtrl = "inproc://dsPublisher#" + INSTANCE_COUNT.incrementAndGet();
private final Map<String, ThePromisedFuture<?, ?>> requests = new ConcurrentHashMap<>(); // <requestId, future for the get request>
private final Map<String, DataSource> clientMap = new ConcurrentHashMap<>(); // scheme://authority -> DataSource
protected final String inprocCtrl = "inproc://dsPublisher#" + INSTANCE_COUNT.incrementAndGet();
protected final Map<String, ThePromisedFuture<?, ?>> requests = new ConcurrentHashMap<>(); // <requestId, future for the get request>
protected final Map<String, DataSource> clientMap = new ConcurrentHashMap<>(); // scheme://authority -> DataSource
protected final AtomicInteger internalReqIdGenerator = new AtomicInteger(0);
protected final ExecutorService executor; // NOPMD - threads are ok, not a webapp
protected final ZContext context;
protected final ZMQ.Poller poller;
protected final ZMQ.Socket sourceSocket;
protected final String clientId;
private final IoBuffer byteBuffer = new FastByteBuffer(0, true, null); // never actually used
private final IoClassSerialiser ioClassSerialiser = new IoClassSerialiser(byteBuffer);
private final AtomicBoolean shallRun = new AtomicBoolean(false);
private final AtomicBoolean running = new AtomicBoolean(false);
private final AtomicInteger internalReqIdGenerator = new AtomicInteger(0);
private final EventStore rawDataEventStore;
private final boolean owningContext;
private final ZContext context;
private final ZMQ.Poller poller;
private final ZMQ.Socket sourceSocket;
private final IoBuffer byteBuffer = new FastByteBuffer(0, true, null); // never actually used
private final IoClassSerialiser ioClassSerialiser = new IoClassSerialiser(byteBuffer);
private final String clientId;
private final RbacProvider rbacProvider;
private final ExecutorService executor; // NOPMD - threads are ok, not a webapp
private final EventStore publicationTarget;
private final AtomicReference<Thread> threadReference = new AtomicReference<>();

static { // register default data sources
DataSource.register(CmwLightDataSource.FACTORY);
DataSource.register(RestDataSource.FACTORY);
DataSource.register(OpenCmwDataSource.FACTORY);
}

public DataSourcePublisher(final RbacProvider rbacProvider, final ExecutorService executorService, final String... clientId) {
this(null, null, rbacProvider, executorService, clientId);
start(); // NOPMD
}

public DataSourcePublisher(final ZContext ctx, final EventStore publicationTarget, final RbacProvider rbacProvider, final ExecutorService executorService, final String... clientId) {
owningContext = ctx == null;
this.context = Objects.requireNonNullElse(ctx, new ZContext(SystemProperties.getValueIgnoreCase(N_IO_THREADS, N_IO_THREADS_DEFAULT)));
this.executor = Objects.requireNonNullElse(executorService, Executors.newCachedThreadPool());
poller = context.createPoller(1);
// control socket for adding subscriptions / triggering requests from other threads
sourceSocket = context.createSocket(SocketType.DEALER);
sourceSocket.setHWM(SystemProperties.getValueIgnoreCase(HIGH_WATER_MARK, HIGH_WATER_MARK_DEFAULT));
setDefaultSocketParameters(sourceSocket);
sourceSocket.bind(inprocCtrl);
poller.register(sourceSocket, ZMQ.Poller.POLLIN);

Expand Down Expand Up @@ -180,9 +179,8 @@ public void close() {
if (running.get() && thread != null) {
thread.interrupt();
}
if (owningContext) {
context.destroy();
}
poller.close();
sourceSocket.close();
}

public void start() {
Expand Down Expand Up @@ -213,7 +211,10 @@ public void run() {
// event loop polling all data sources and performing regular housekeeping jobs
long nextHousekeeping = System.currentTimeMillis(); // immediately perform first housekeeping
long timeOut = 0L;
while (!Thread.interrupted() && shallRun.get() && (timeOut <= 0 || -1 != poller.poll(timeOut))) {
while (!Thread.interrupted() && shallRun.get() && !context.isClosed() && (timeOut <= 0 || -1 != poller.poll(timeOut))) {
if (context.isClosed()) {
break;
}
boolean dataAvailable = true;
while (dataAvailable && System.currentTimeMillis() < nextHousekeeping && shallRun.get()) {
dataAvailable = handleDataSourceSockets(); // get data from clients
Expand All @@ -229,6 +230,13 @@ public void run() {
LOGGER.atDebug().log("Shutting down DataSourcePublisher");
}
rawDataEventStore.stop();
for (DataSource dataSource : clientMap.values()) {
try {
dataSource.close();
} catch (Exception e) { // NOPMD
// shut-down close
}
}
running.set(false);
threadReference.set(null);
}
Expand Down Expand Up @@ -319,7 +327,7 @@ protected <R, C> ThePromisedFuture<R, C> newSubscriptionFuture(final URI endpoin
}

@SuppressWarnings({ "PMD.UnusedFormalParameter" }) // method signature is mandated by functional interface
private void internalEventHandler(final RingBufferEvent event, final long sequence, final boolean endOfBatch) {
protected void internalEventHandler(final RingBufferEvent event, final long sequence, final boolean endOfBatch) {
final EvtTypeFilter evtTypeFilter = event.getFilter(EvtTypeFilter.class);
final boolean notifyFuture;
switch (evtTypeFilter.updateType) {
Expand Down Expand Up @@ -369,11 +377,11 @@ private void internalEventHandler(final RingBufferEvent event, final long sequen
final StringWriter sw = new StringWriter();
final PrintWriter pw = new PrintWriter(sw);
e.printStackTrace(pw);
final ProtocolException protException = new ProtocolException(ANSI_RED + "error deserialising object:\n" + sw.toString() + ANSI_RESET);
final ProtocolException protocolException = new ProtocolException(ANSI_RED + "error deserialising object:\n" + sw.toString() + ANSI_RESET);
if (notifyFuture) {
domainObject.future.setException(protException);
domainObject.future.setException(protocolException);
} else {
executor.submit(() -> domainObject.future.listener.updateException(protException)); // NOPMD - threads are ok, not a webapp
executor.submit(() -> domainObject.future.listener.updateException(protocolException)); // NOPMD - threads are ok, not a webapp
}
}
} else if (notifyFuture) {
Expand All @@ -388,7 +396,7 @@ private void internalEventHandler(final RingBufferEvent event, final long sequen
}

@SuppressWarnings({ "PMD.UnusedFormalParameter" }) // method signature is mandated by functional interface
private void publishToExternalStore(final RingBufferEvent publishEvent, final long seq, final RingBufferEvent sourceEvent, final Object replyDomainObject, final String exception) {
protected void publishToExternalStore(final RingBufferEvent publishEvent, final long seq, final RingBufferEvent sourceEvent, final Object replyDomainObject, final String exception) {
sourceEvent.copyTo(publishEvent);
publishEvent.payload = new SharedPointer<>();
if (replyDomainObject != null) {
Expand All @@ -411,9 +419,12 @@ private void publishToExternalStore(final RingBufferEvent publishEvent, final lo
}
}

private DataSource getClient(final URI endpoint) {
return clientMap.computeIfAbsent(endpoint.getScheme() + "://" + getDeviceName(endpoint), requestedEndPoint -> {
final DataSource dataSource = DataSource.getFactory(URI.create(requestedEndPoint)).newInstance(context, endpoint, Duration.ofMillis(100), Long.toString(internalReqIdGenerator.incrementAndGet()));
protected DataSource getClient(final URI endpoint) {
// N.B. protected method so that knowledgeable/courageous developer can define their own multiplexing 'key' map-criteria
// e.g. a key including the volatile authority and/or a more specific 'device/property' path information, e.g.
// key := "<scheme>://authority/path" (N.B. usually the authority is resolved by the DnsResolver/any Broker)
return clientMap.computeIfAbsent(endpoint.getScheme() + ":/" + getDeviceName(endpoint), requestedEndPoint -> {
final DataSource dataSource = DataSource.getFactory(URI.create(requestedEndPoint)).newInstance(context, endpoint, Duration.ofMillis(100), executor, Long.toString(internalReqIdGenerator.incrementAndGet()));
poller.register(dataSource.getSocket(), ZMQ.Poller.POLLIN);
return dataSource;
});
Expand Down
Loading

0 comments on commit 33c6411

Please sign in to comment.