From 1eb7b54d29f36cc7781ede0cc66024f9c2810016 Mon Sep 17 00:00:00 2001 From: scottf Date: Mon, 25 Sep 2023 10:27:51 -0400 Subject: [PATCH 1/3] in progress --- .../io/synadia/{ => flink}/Constants.java | 2 +- .../java/io/synadia/{ => flink}/Utils.java | 2 +- .../{ => flink}/common/ConnectionFactory.java | 6 +- .../common/NatsConnectionBuilder.java | 4 +- .../NatsSubjectsAndConnectionBuilder.java | 2 +- .../common/NatsSubjectsConnection.java | 2 +- .../synadia/{ => flink}/payload/Payload.java | 2 +- .../payload/PayloadDeserializer.java | 2 +- .../payload/PayloadSerializer.java | 2 +- .../payload/StringPayloadDeserializer.java | 2 +- .../payload/StringPayloadSerializer.java | 2 +- .../io/synadia/{ => flink}/sink/NatsSink.java | 8 +- .../{ => flink}/sink/NatsSinkBuilder.java | 6 +- .../{ => flink}/sink/NatsSinkWriter.java | 8 +- .../io/synadia/flink/source/NatsSource.java | 86 +++++++++++++++ .../source/NatsSourceBuilder.java} | 14 +-- .../{ => flink}/source/NoOpCheckpoint.java | 2 +- .../enumerator/NatsSourceEnumerator.java | 104 ++++++++++++++++++ .../NatsSourceEnumeratorStateSerializer.java | 92 ++++++++++++++++ .../NatsSubjectSourceEnumeratorState.java | 23 ++++ .../source/reader}/NatsRecordEmitter.java | 12 +- .../split/NatSubjectStateSplitState.java | 16 +++ .../source/split/NatsSubjectSplit.java | 16 +-- .../split/NatsSubjectSplitSerializer.java | 67 +++++++++++ .../java/io/synadia/source/NatsSource.java | 67 ----------- .../enumerator/NatsSourceEnumState.java | 76 ------------- .../NatsSourceEnumStateSerializer.java | 71 ------------ .../enumerator/NatsSourceEnumerator.java | 90 --------------- .../source/split/NoStateSplitState.java | 7 -- .../{ => io/synadia/flink}/TestBase.java | 10 +- .../synadia/flink}/WordSubscriber.java | 4 +- .../SerializersDeserializersTests.java | 11 +- .../synadia/flink/sink}/SinkTests.java | 8 +- .../synadia/flink/source}/SourceTests.java | 11 +- .../flink/source/StateSerializerTest.java | 42 +++++++ 35 files changed, 495 insertions(+), 384 deletions(-) rename src/main/java/io/synadia/{ => flink}/Constants.java (96%) rename src/main/java/io/synadia/{ => flink}/Utils.java (98%) rename src/main/java/io/synadia/{ => flink}/common/ConnectionFactory.java (94%) rename src/main/java/io/synadia/{ => flink}/common/NatsConnectionBuilder.java (98%) rename src/main/java/io/synadia/{ => flink}/common/NatsSubjectsAndConnectionBuilder.java (97%) rename src/main/java/io/synadia/{ => flink}/common/NatsSubjectsConnection.java (95%) rename src/main/java/io/synadia/{ => flink}/payload/Payload.java (91%) rename src/main/java/io/synadia/{ => flink}/payload/PayloadDeserializer.java (92%) rename src/main/java/io/synadia/{ => flink}/payload/PayloadSerializer.java (92%) rename src/main/java/io/synadia/{ => flink}/payload/StringPayloadDeserializer.java (98%) rename src/main/java/io/synadia/{ => flink}/payload/StringPayloadSerializer.java (98%) rename src/main/java/io/synadia/{ => flink}/sink/NatsSink.java (87%) rename src/main/java/io/synadia/{ => flink}/sink/NatsSinkBuilder.java (94%) rename src/main/java/io/synadia/{ => flink}/sink/NatsSinkWriter.java (92%) create mode 100644 src/main/java/io/synadia/flink/source/NatsSource.java rename src/main/java/io/synadia/{source/NatsPayloadSourceBuilder.java => flink/source/NatsSourceBuilder.java} (81%) rename src/main/java/io/synadia/{ => flink}/source/NoOpCheckpoint.java (86%) create mode 100644 src/main/java/io/synadia/flink/source/enumerator/NatsSourceEnumerator.java create mode 100644 src/main/java/io/synadia/flink/source/enumerator/NatsSourceEnumeratorStateSerializer.java create mode 100644 src/main/java/io/synadia/flink/source/enumerator/NatsSubjectSourceEnumeratorState.java rename src/main/java/io/synadia/{source => flink/source/reader}/NatsRecordEmitter.java (63%) create mode 100644 src/main/java/io/synadia/flink/source/split/NatSubjectStateSplitState.java rename src/main/java/io/synadia/{ => flink}/source/split/NatsSubjectSplit.java (62%) create mode 100644 src/main/java/io/synadia/flink/source/split/NatsSubjectSplitSerializer.java delete mode 100644 src/main/java/io/synadia/source/NatsSource.java delete mode 100644 src/main/java/io/synadia/source/enumerator/NatsSourceEnumState.java delete mode 100644 src/main/java/io/synadia/source/enumerator/NatsSourceEnumStateSerializer.java delete mode 100644 src/main/java/io/synadia/source/enumerator/NatsSourceEnumerator.java delete mode 100644 src/main/java/io/synadia/source/split/NoStateSplitState.java rename src/test/java/io/synadia/{ => io/synadia/flink}/TestBase.java (96%) rename src/test/java/io/synadia/{ => io/synadia/flink}/WordSubscriber.java (92%) rename src/test/java/io/synadia/{ => io/synadia/flink/payload}/SerializersDeserializersTests.java (94%) rename src/test/java/io/synadia/{ => io/synadia/flink/sink}/SinkTests.java (93%) rename src/test/java/io/synadia/{ => io/synadia/flink/source}/SourceTests.java (86%) create mode 100644 src/test/java/io/synadia/io/synadia/flink/source/StateSerializerTest.java diff --git a/src/main/java/io/synadia/Constants.java b/src/main/java/io/synadia/flink/Constants.java similarity index 96% rename from src/main/java/io/synadia/Constants.java rename to src/main/java/io/synadia/flink/Constants.java index 535a7d2..9562448 100644 --- a/src/main/java/io/synadia/Constants.java +++ b/src/main/java/io/synadia/flink/Constants.java @@ -1,7 +1,7 @@ // Copyright (c) 2023 Synadia Communications Inc. All Rights Reserved. // See LICENSE and NOTICE file for details. -package io.synadia; +package io.synadia.flink; public interface Constants { String SOURCE_SUBJECTS = "source.subjects"; diff --git a/src/main/java/io/synadia/Utils.java b/src/main/java/io/synadia/flink/Utils.java similarity index 98% rename from src/main/java/io/synadia/Utils.java rename to src/main/java/io/synadia/flink/Utils.java index 299e5da..2aa58fc 100644 --- a/src/main/java/io/synadia/Utils.java +++ b/src/main/java/io/synadia/flink/Utils.java @@ -1,7 +1,7 @@ // Copyright (c) 2023 Synadia Communications Inc. All Rights Reserved. // See LICENSE and NOTICE file for details. -package io.synadia; +package io.synadia.flink; import io.nats.client.NUID; import org.apache.flink.util.FlinkRuntimeException; diff --git a/src/main/java/io/synadia/common/ConnectionFactory.java b/src/main/java/io/synadia/flink/common/ConnectionFactory.java similarity index 94% rename from src/main/java/io/synadia/common/ConnectionFactory.java rename to src/main/java/io/synadia/flink/common/ConnectionFactory.java index 111cd1b..2822a92 100644 --- a/src/main/java/io/synadia/common/ConnectionFactory.java +++ b/src/main/java/io/synadia/flink/common/ConnectionFactory.java @@ -1,4 +1,4 @@ -package io.synadia.common; +package io.synadia.flink.common; import io.nats.client.Connection; import io.nats.client.Nats; @@ -8,8 +8,8 @@ import java.io.Serializable; import java.util.Properties; -import static io.synadia.Utils.jitter; -import static io.synadia.Utils.loadPropertiesFromFile; +import static io.synadia.flink.Utils.jitter; +import static io.synadia.flink.Utils.loadPropertiesFromFile; public class ConnectionFactory implements Serializable { private final Properties connectionProperties; diff --git a/src/main/java/io/synadia/common/NatsConnectionBuilder.java b/src/main/java/io/synadia/flink/common/NatsConnectionBuilder.java similarity index 98% rename from src/main/java/io/synadia/common/NatsConnectionBuilder.java rename to src/main/java/io/synadia/flink/common/NatsConnectionBuilder.java index b5e3dfd..d8b2444 100644 --- a/src/main/java/io/synadia/common/NatsConnectionBuilder.java +++ b/src/main/java/io/synadia/flink/common/NatsConnectionBuilder.java @@ -1,9 +1,9 @@ // Copyright (c) 2023 Synadia Communications Inc. All Rights Reserved. // See LICENSE and NOTICE file for details. -package io.synadia.common; +package io.synadia.flink.common; -import io.synadia.Utils; +import io.synadia.flink.Utils; import java.io.IOException; import java.util.Properties; diff --git a/src/main/java/io/synadia/common/NatsSubjectsAndConnectionBuilder.java b/src/main/java/io/synadia/flink/common/NatsSubjectsAndConnectionBuilder.java similarity index 97% rename from src/main/java/io/synadia/common/NatsSubjectsAndConnectionBuilder.java rename to src/main/java/io/synadia/flink/common/NatsSubjectsAndConnectionBuilder.java index 0697244..638628b 100644 --- a/src/main/java/io/synadia/common/NatsSubjectsAndConnectionBuilder.java +++ b/src/main/java/io/synadia/flink/common/NatsSubjectsAndConnectionBuilder.java @@ -1,7 +1,7 @@ // Copyright (c) 2023 Synadia Communications Inc. All Rights Reserved. // See LICENSE and NOTICE file for details. -package io.synadia.common; +package io.synadia.flink.common; import java.util.ArrayList; import java.util.Arrays; diff --git a/src/main/java/io/synadia/common/NatsSubjectsConnection.java b/src/main/java/io/synadia/flink/common/NatsSubjectsConnection.java similarity index 95% rename from src/main/java/io/synadia/common/NatsSubjectsConnection.java rename to src/main/java/io/synadia/flink/common/NatsSubjectsConnection.java index 40e0c4f..a91c511 100644 --- a/src/main/java/io/synadia/common/NatsSubjectsConnection.java +++ b/src/main/java/io/synadia/flink/common/NatsSubjectsConnection.java @@ -1,7 +1,7 @@ // Copyright (c) 2023 Synadia Communications Inc. All Rights Reserved. // See LICENSE and NOTICE file for details. -package io.synadia.common; +package io.synadia.flink.common; import java.io.Serializable; import java.util.List; diff --git a/src/main/java/io/synadia/payload/Payload.java b/src/main/java/io/synadia/flink/payload/Payload.java similarity index 91% rename from src/main/java/io/synadia/payload/Payload.java rename to src/main/java/io/synadia/flink/payload/Payload.java index 0747e69..d5cda8a 100644 --- a/src/main/java/io/synadia/payload/Payload.java +++ b/src/main/java/io/synadia/flink/payload/Payload.java @@ -1,4 +1,4 @@ -package io.synadia.payload; +package io.synadia.flink.payload; import io.nats.client.impl.Headers; diff --git a/src/main/java/io/synadia/payload/PayloadDeserializer.java b/src/main/java/io/synadia/flink/payload/PayloadDeserializer.java similarity index 92% rename from src/main/java/io/synadia/payload/PayloadDeserializer.java rename to src/main/java/io/synadia/flink/payload/PayloadDeserializer.java index 14ec8eb..f4b02a2 100644 --- a/src/main/java/io/synadia/payload/PayloadDeserializer.java +++ b/src/main/java/io/synadia/flink/payload/PayloadDeserializer.java @@ -1,7 +1,7 @@ // Copyright (c) 2023 Synadia Communications Inc. All Rights Reserved. // See LICENSE and NOTICE file for details. -package io.synadia.payload; +package io.synadia.flink.payload; import java.io.Serializable; diff --git a/src/main/java/io/synadia/payload/PayloadSerializer.java b/src/main/java/io/synadia/flink/payload/PayloadSerializer.java similarity index 92% rename from src/main/java/io/synadia/payload/PayloadSerializer.java rename to src/main/java/io/synadia/flink/payload/PayloadSerializer.java index 730d08b..f83ce91 100644 --- a/src/main/java/io/synadia/payload/PayloadSerializer.java +++ b/src/main/java/io/synadia/flink/payload/PayloadSerializer.java @@ -1,7 +1,7 @@ // Copyright (c) 2023 Synadia Communications Inc. All Rights Reserved. // See LICENSE and NOTICE file for details. -package io.synadia.payload; +package io.synadia.flink.payload; import java.io.Serializable; diff --git a/src/main/java/io/synadia/payload/StringPayloadDeserializer.java b/src/main/java/io/synadia/flink/payload/StringPayloadDeserializer.java similarity index 98% rename from src/main/java/io/synadia/payload/StringPayloadDeserializer.java rename to src/main/java/io/synadia/flink/payload/StringPayloadDeserializer.java index da75623..6eb7da3 100644 --- a/src/main/java/io/synadia/payload/StringPayloadDeserializer.java +++ b/src/main/java/io/synadia/flink/payload/StringPayloadDeserializer.java @@ -1,7 +1,7 @@ // Copyright (c) 2023 Synadia Communications Inc. All Rights Reserved. // See LICENSE and NOTICE file for details. -package io.synadia.payload; +package io.synadia.flink.payload; import java.io.IOException; import java.io.ObjectInputStream; diff --git a/src/main/java/io/synadia/payload/StringPayloadSerializer.java b/src/main/java/io/synadia/flink/payload/StringPayloadSerializer.java similarity index 98% rename from src/main/java/io/synadia/payload/StringPayloadSerializer.java rename to src/main/java/io/synadia/flink/payload/StringPayloadSerializer.java index 6cbb3d6..a2584be 100644 --- a/src/main/java/io/synadia/payload/StringPayloadSerializer.java +++ b/src/main/java/io/synadia/flink/payload/StringPayloadSerializer.java @@ -1,7 +1,7 @@ // Copyright (c) 2023 Synadia Communications Inc. All Rights Reserved. // See LICENSE and NOTICE file for details. -package io.synadia.payload; +package io.synadia.flink.payload; import java.io.IOException; import java.io.ObjectInputStream; diff --git a/src/main/java/io/synadia/sink/NatsSink.java b/src/main/java/io/synadia/flink/sink/NatsSink.java similarity index 87% rename from src/main/java/io/synadia/sink/NatsSink.java rename to src/main/java/io/synadia/flink/sink/NatsSink.java index 4789a9d..43e6bfe 100644 --- a/src/main/java/io/synadia/sink/NatsSink.java +++ b/src/main/java/io/synadia/flink/sink/NatsSink.java @@ -1,11 +1,11 @@ // Copyright (c) 2023 Synadia Communications Inc. All Rights Reserved. // See LICENSE and NOTICE file for details. -package io.synadia.sink; +package io.synadia.flink.sink; -import io.synadia.common.ConnectionFactory; -import io.synadia.common.NatsSubjectsConnection; -import io.synadia.payload.PayloadSerializer; +import io.synadia.flink.common.ConnectionFactory; +import io.synadia.flink.common.NatsSubjectsConnection; +import io.synadia.flink.payload.PayloadSerializer; import org.apache.flink.api.connector.sink2.Sink; import org.apache.flink.api.connector.sink2.SinkWriter; diff --git a/src/main/java/io/synadia/sink/NatsSinkBuilder.java b/src/main/java/io/synadia/flink/sink/NatsSinkBuilder.java similarity index 94% rename from src/main/java/io/synadia/sink/NatsSinkBuilder.java rename to src/main/java/io/synadia/flink/sink/NatsSinkBuilder.java index 9a87037..de8e035 100644 --- a/src/main/java/io/synadia/sink/NatsSinkBuilder.java +++ b/src/main/java/io/synadia/flink/sink/NatsSinkBuilder.java @@ -1,10 +1,10 @@ // Copyright (c) 2023 Synadia Communications Inc. All Rights Reserved. // See LICENSE and NOTICE file for details. -package io.synadia.sink; +package io.synadia.flink.sink; -import io.synadia.common.NatsSubjectsAndConnectionBuilder; -import io.synadia.payload.PayloadSerializer; +import io.synadia.flink.common.NatsSubjectsAndConnectionBuilder; +import io.synadia.flink.payload.PayloadSerializer; /** * Builder to construct {@link NatsSink}. diff --git a/src/main/java/io/synadia/sink/NatsSinkWriter.java b/src/main/java/io/synadia/flink/sink/NatsSinkWriter.java similarity index 92% rename from src/main/java/io/synadia/sink/NatsSinkWriter.java rename to src/main/java/io/synadia/flink/sink/NatsSinkWriter.java index c6cda10..003078a 100644 --- a/src/main/java/io/synadia/sink/NatsSinkWriter.java +++ b/src/main/java/io/synadia/flink/sink/NatsSinkWriter.java @@ -1,11 +1,11 @@ // Copyright (c) 2023 Synadia Communications Inc. All Rights Reserved. // See LICENSE and NOTICE file for details. -package io.synadia.sink; +package io.synadia.flink.sink; import io.nats.client.Connection; -import io.synadia.common.ConnectionFactory; -import io.synadia.payload.PayloadSerializer; +import io.synadia.flink.common.ConnectionFactory; +import io.synadia.flink.payload.PayloadSerializer; import org.apache.flink.api.connector.sink2.Sink; import org.apache.flink.api.connector.sink2.SinkWriter; @@ -14,7 +14,7 @@ import java.io.Serializable; import java.util.List; -import static io.synadia.Utils.generateId; +import static io.synadia.flink.Utils.generateId; /** * This class is responsible to publish to one or more NATS subjects diff --git a/src/main/java/io/synadia/flink/source/NatsSource.java b/src/main/java/io/synadia/flink/source/NatsSource.java new file mode 100644 index 0000000..48adede --- /dev/null +++ b/src/main/java/io/synadia/flink/source/NatsSource.java @@ -0,0 +1,86 @@ +// Copyright (c) 2023 Synadia Communications Inc. All Rights Reserved. +// See LICENSE and NOTICE file for details. + +package io.synadia.flink.source; + +import io.synadia.flink.common.ConnectionFactory; +import io.synadia.flink.common.NatsSubjectsConnection; +import io.synadia.flink.payload.PayloadDeserializer; +import io.synadia.flink.source.enumerator.NatsSubjectSourceEnumeratorState; +import io.synadia.flink.source.split.NatsSubjectSplit; +import org.apache.flink.api.connector.source.*; +import org.apache.flink.core.io.SimpleVersionedSerializer; + +import java.util.List; + +/** + * Flink Source to consumer data from one or more NATS subjects + * @param the type of object to convert message payload data to + */ +public class NatsSource extends NatsSubjectsConnection implements Source { + private final PayloadDeserializer payloadDeserializer; + + /** + * Create a {@link NatsSourceBuilder} to allow the fluent construction of a new {@link NatsSource}. + * @param type of records being read + * @return {@link NatsSourceBuilder} + */ + public static NatsSourceBuilder builder() { + return new NatsSourceBuilder<>(); + } + + NatsSource(List subjects, + PayloadDeserializer payloadDeserializer, + ConnectionFactory connectionFactory) { + super(subjects, connectionFactory); + this.payloadDeserializer = payloadDeserializer; + } + + @Override + public Boundedness getBoundedness() { + return Boundedness.CONTINUOUS_UNBOUNDED; + } + + @Override + public SplitEnumerator createEnumerator(SplitEnumeratorContext enumContext) throws Exception { + return restoreEnumerator(enumContext, null); + } + + @Override + public SplitEnumerator + restoreEnumerator(SplitEnumeratorContext enumContext, + NatsSubjectSourceEnumeratorState checkpoint) throws Exception + { +// return new NatsSourceEnumerator( +// enumContext, +// streamArn, +// sourceConfig, +// createKinesisStreamProxy(sourceConfig), +// kinesisShardAssigner, +// checkpoint); + return null; + } + + @Override + public SimpleVersionedSerializer getSplitSerializer() { + return null; + } + + @Override + public SimpleVersionedSerializer getEnumeratorCheckpointSerializer() { + return null; + } + + @Override + public SourceReader createReader(SourceReaderContext readerContext) throws Exception { + return null; + } + + /** + * Get the payload deserializer registered for this source + * @return the deserializer + */ + public PayloadDeserializer getPayloadDeserializer() { + return payloadDeserializer; + } +} diff --git a/src/main/java/io/synadia/source/NatsPayloadSourceBuilder.java b/src/main/java/io/synadia/flink/source/NatsSourceBuilder.java similarity index 81% rename from src/main/java/io/synadia/source/NatsPayloadSourceBuilder.java rename to src/main/java/io/synadia/flink/source/NatsSourceBuilder.java index f240e55..7664e4f 100644 --- a/src/main/java/io/synadia/source/NatsPayloadSourceBuilder.java +++ b/src/main/java/io/synadia/flink/source/NatsSourceBuilder.java @@ -1,10 +1,10 @@ // Copyright (c) 2023 Synadia Communications Inc. All Rights Reserved. // See LICENSE and NOTICE file for details. -package io.synadia.source; +package io.synadia.flink.source; -import io.synadia.common.NatsSubjectsAndConnectionBuilder; -import io.synadia.payload.PayloadDeserializer; +import io.synadia.flink.common.NatsSubjectsAndConnectionBuilder; +import io.synadia.flink.payload.PayloadDeserializer; /** * Builder to construct {@link NatsSource}. @@ -23,12 +23,12 @@ * @see NatsSource * @param type of the records written to Kafka */ -public class NatsPayloadSourceBuilder extends NatsSubjectsAndConnectionBuilder> { +public class NatsSourceBuilder extends NatsSubjectsAndConnectionBuilder> { private PayloadDeserializer payloadDeserializer; private String payloadDeserializerClass; @Override - protected NatsPayloadSourceBuilder getThis() { + protected NatsSourceBuilder getThis() { return null; } @@ -37,7 +37,7 @@ protected NatsPayloadSourceBuilder getThis() { * @param payloadDeserializer the deserializer. * @return the builder */ - public NatsPayloadSourceBuilder payloadDeserializer(PayloadDeserializer payloadDeserializer) { + public NatsSourceBuilder payloadDeserializer(PayloadDeserializer payloadDeserializer) { this.payloadDeserializer = payloadDeserializer; this.payloadDeserializerClass = null; return this; @@ -48,7 +48,7 @@ public NatsPayloadSourceBuilder payloadDeserializer(PayloadDeserializer * @param payloadDeserializerClass the serializer class name. * @return the builder */ - public NatsPayloadSourceBuilder payloadDeserializerClass(String payloadDeserializerClass) { + public NatsSourceBuilder payloadDeserializerClass(String payloadDeserializerClass) { this.payloadDeserializer = null; this.payloadDeserializerClass = payloadDeserializerClass; return this; diff --git a/src/main/java/io/synadia/source/NoOpCheckpoint.java b/src/main/java/io/synadia/flink/source/NoOpCheckpoint.java similarity index 86% rename from src/main/java/io/synadia/source/NoOpCheckpoint.java rename to src/main/java/io/synadia/flink/source/NoOpCheckpoint.java index 724dcf1..ef211ea 100644 --- a/src/main/java/io/synadia/source/NoOpCheckpoint.java +++ b/src/main/java/io/synadia/flink/source/NoOpCheckpoint.java @@ -1,7 +1,7 @@ // Copyright (c) 2023 Synadia Communications Inc. All Rights Reserved. // See LICENSE and NOTICE file for details. -package io.synadia.source; +package io.synadia.flink.source; import java.io.Serializable; diff --git a/src/main/java/io/synadia/flink/source/enumerator/NatsSourceEnumerator.java b/src/main/java/io/synadia/flink/source/enumerator/NatsSourceEnumerator.java new file mode 100644 index 0000000..c2703f9 --- /dev/null +++ b/src/main/java/io/synadia/flink/source/enumerator/NatsSourceEnumerator.java @@ -0,0 +1,104 @@ +// Copyright (c) 2023 Synadia Communications Inc. All Rights Reserved. +// See LICENSE and NOTICE file for details. + +package io.synadia.flink.source.enumerator; + +import io.nats.client.Connection; +import io.nats.client.Dispatcher; +import io.nats.client.support.Debug; +import io.synadia.flink.source.split.NatsSubjectSplit; +import org.apache.flink.api.connector.source.SplitEnumerator; +import org.apache.flink.api.connector.source.SplitEnumeratorContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; +import java.io.IOException; +import java.util.*; + +public class NatsSourceEnumerator implements SplitEnumerator { + private static final Logger LOG = LoggerFactory.getLogger(NatsSourceEnumerator.class); + + private final SplitEnumeratorContext context; + + private final NatsSubjectSourceEnumeratorState state; + private final Map> splitAssignment = new HashMap<>(); + private final Set assignedSplitIds = new HashSet<>(); + private final Set unassignedSplits; + + private Connection connection; + private Dispatcher dispatcher; + + public NatsSourceEnumerator(SplitEnumeratorContext context, + Set subjects) + { + this.context = context; + this.state = null; // new NatsSubjectSourceEnumeratorState(subjects); + unassignedSplits = new HashSet<>(); + } + +// public NatsSourceEnumerator(ConnectionFactory connectionFactory, +// SplitEnumeratorContext context, +// NatsSubjectSourceEnumeratorState natsSubjectSourceEnumeratorState) +// { +// this.connectionFactory = connectionFactory; +// this.context = context; +// this.state = natsSubjectSourceEnumeratorState; +// } + + @Override + public void start() { + Debug.dbg("NatsSourceEnumerator start"); + } + +// private List periodicallyDiscoverSplits() { +// return mapToSplits(shards, InitialPosition.TRIM_HORIZON); +// } + + @Override + public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) { + Debug.dbg("NatsSourceEnumerator.start | subtaskId:" + subtaskId + " | requesterHostname:" + requesterHostname); + NatsSubjectSplit split = null; + if (split == null) { + context.signalNoMoreSplits(subtaskId); + } + else { + context.assignSplit(split, subtaskId); + } + } + + @Override + public void addSplitsBack(List splits, int subtaskId) { + Debug.dbg("NatsSourceEnumerator.addSplitsBack | splits:" + (splits == null ? -1 : splits.size()) + " | subtaskId:" + subtaskId); + if (!splitAssignment.containsKey(subtaskId)) { + LOG.warn( + "Unable to add splits back for subtask {} since it is not assigned any splits. Splits: {}", + subtaskId, + splits); + return; + } +// state.addSplitsBack(splits); + } + + @Override + public void addReader(int subtaskId) { + Debug.dbg("NatsSourceEnumerator.start | subtaskId:" + subtaskId); + splitAssignment.putIfAbsent(subtaskId, new HashSet<>()); + } + + @Override + public NatsSubjectSourceEnumeratorState snapshotState(long checkpointId) throws Exception { + Debug.dbg("NatsSourceEnumerator.snapshotState | checkpointId:" + checkpointId); + return new NatsSubjectSourceEnumeratorState(unassignedSplits); + } + + @Override + public void close() throws IOException { + try { + connection.close(); + } + catch (InterruptedException e) { + throw new IOException(e); + } + } +} diff --git a/src/main/java/io/synadia/flink/source/enumerator/NatsSourceEnumeratorStateSerializer.java b/src/main/java/io/synadia/flink/source/enumerator/NatsSourceEnumeratorStateSerializer.java new file mode 100644 index 0000000..bf8e935 --- /dev/null +++ b/src/main/java/io/synadia/flink/source/enumerator/NatsSourceEnumeratorStateSerializer.java @@ -0,0 +1,92 @@ +// Copyright (c) 2023 Synadia Communications Inc. All Rights Reserved. +// See LICENSE and NOTICE file for details. + +package io.synadia.flink.source.enumerator; + +import io.synadia.flink.source.split.NatsSubjectSplit; +import io.synadia.flink.source.split.NatsSubjectSplitSerializer; +import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.core.io.VersionMismatchException; + +import java.io.*; +import java.util.HashSet; +import java.util.Set; + +public class NatsSourceEnumeratorStateSerializer + implements SimpleVersionedSerializer { + + private static final int CURRENT_VERSION = 0; + + private final NatsSubjectSplitSerializer splitSerializer; + + public NatsSourceEnumeratorStateSerializer(NatsSubjectSplitSerializer splitSerializer) { + this.splitSerializer = splitSerializer; + } + + @Override + public int getVersion() { + return CURRENT_VERSION; + } + + @Override + public byte[] serialize(NatsSubjectSourceEnumeratorState enumState) throws IOException { + try (ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputStream out = new DataOutputStream(baos)) { + + out.writeInt(enumState.getUnassignedSplits().size()); + out.writeInt(splitSerializer.getVersion()); + for (NatsSubjectSplit split : enumState.getUnassignedSplits()) { + byte[] serializedSplit = splitSerializer.serialize(split); + out.writeInt(serializedSplit.length); + out.write(serializedSplit); + } + + out.flush(); + + return baos.toByteArray(); + } + } + + @Override + public NatsSubjectSourceEnumeratorState deserialize(int version, byte[] serializedEnumeratorState) throws IOException { + try (ByteArrayInputStream bais = new ByteArrayInputStream(serializedEnumeratorState); + DataInputStream in = new DataInputStream(bais)) { + + if (version != getVersion()) { + throw new VersionMismatchException( + "Trying to deserialize NatsSubjectSourceEnumeratorState serialized with unsupported version " + + version + + ". Serializer version is " + + getVersion()); + } + + final int numUnassignedSplits = in.readInt(); + final int splitSerializerVersion = in.readInt(); + if (splitSerializerVersion != splitSerializer.getVersion()) { + throw new VersionMismatchException( + "Trying to deserialize NatsSubjectSplit serialized with unsupported version " + + splitSerializerVersion + + ". Serializer version is " + + splitSerializer.getVersion()); + } + Set unassignedSplits = new HashSet<>(numUnassignedSplits); + for (int i = 0; i < numUnassignedSplits; i++) { + int serializedLength = in.readInt(); + byte[] serializedSplit = new byte[serializedLength]; + if (in.read(serializedSplit) != -1) { + unassignedSplits.add( + splitSerializer.deserialize(splitSerializerVersion, serializedSplit)); + } else { + throw new IOException( + "Unexpectedly reading more bytes than is present in stream."); + } + } + + if (in.available() > 0) { + throw new IOException("Unexpected trailing bytes when deserializing."); + } + + return new NatsSubjectSourceEnumeratorState(unassignedSplits); + } + } +} diff --git a/src/main/java/io/synadia/flink/source/enumerator/NatsSubjectSourceEnumeratorState.java b/src/main/java/io/synadia/flink/source/enumerator/NatsSubjectSourceEnumeratorState.java new file mode 100644 index 0000000..4f76a25 --- /dev/null +++ b/src/main/java/io/synadia/flink/source/enumerator/NatsSubjectSourceEnumeratorState.java @@ -0,0 +1,23 @@ +// Copyright (c) 2023 Synadia Communications Inc. All Rights Reserved. +// See LICENSE and NOTICE file for details. + +package io.synadia.flink.source.enumerator; + +import io.synadia.flink.source.split.NatsSubjectSplit; +import org.apache.flink.annotation.Internal; + +import java.util.Set; + +/** The state of Nats source enumerator. */ +@Internal +public class NatsSubjectSourceEnumeratorState { + private final Set unassignedSplits; + + public NatsSubjectSourceEnumeratorState(Set unassignedSplits) { + this.unassignedSplits = unassignedSplits; + } + + public Set getUnassignedSplits() { + return unassignedSplits; + } +} diff --git a/src/main/java/io/synadia/source/NatsRecordEmitter.java b/src/main/java/io/synadia/flink/source/reader/NatsRecordEmitter.java similarity index 63% rename from src/main/java/io/synadia/source/NatsRecordEmitter.java rename to src/main/java/io/synadia/flink/source/reader/NatsRecordEmitter.java index 90f91b1..b408ce1 100644 --- a/src/main/java/io/synadia/source/NatsRecordEmitter.java +++ b/src/main/java/io/synadia/flink/source/reader/NatsRecordEmitter.java @@ -1,22 +1,22 @@ // Copyright (c) 2023 Synadia Communications Inc. All Rights Reserved. // See LICENSE and NOTICE file for details. -package io.synadia.source; +package io.synadia.flink.source.reader; -import io.synadia.payload.PayloadDeserializer; -import io.synadia.source.split.NoStateSplitState; +import io.synadia.flink.payload.PayloadDeserializer; +import io.synadia.flink.source.split.NatSubjectStateSplitState; import org.apache.flink.api.connector.source.SourceOutput; import org.apache.flink.connector.base.source.reader.RecordEmitter; -public class NatsRecordEmitter implements RecordEmitter { - private PayloadDeserializer payloadDeserializer; +public class NatsRecordEmitter implements RecordEmitter { + private final PayloadDeserializer payloadDeserializer; public NatsRecordEmitter(PayloadDeserializer payloadDeserializer) { this.payloadDeserializer = payloadDeserializer; } @Override - public void emitRecord(byte[] bytes, SourceOutput sourceOutput, NoStateSplitState noStateSplitState) throws Exception { + public void emitRecord(byte[] bytes, SourceOutput sourceOutput, NatSubjectStateSplitState natSubjectStateSplitState) throws Exception { sourceOutput.collect(payloadDeserializer.getObject(bytes)); } } diff --git a/src/main/java/io/synadia/flink/source/split/NatSubjectStateSplitState.java b/src/main/java/io/synadia/flink/source/split/NatSubjectStateSplitState.java new file mode 100644 index 0000000..2f97375 --- /dev/null +++ b/src/main/java/io/synadia/flink/source/split/NatSubjectStateSplitState.java @@ -0,0 +1,16 @@ +// Copyright (c) 2023 Synadia Communications Inc. All Rights Reserved. +// See LICENSE and NOTICE file for details. + +package io.synadia.flink.source.split; + +public class NatSubjectStateSplitState { + private final NatsSubjectSplit natsSubjectSplit; + + public NatSubjectStateSplitState(NatsSubjectSplit natsSubjectSplit) { + this.natsSubjectSplit = natsSubjectSplit; + } + + public String getSplitId() { + return natsSubjectSplit.splitId(); + } +} diff --git a/src/main/java/io/synadia/source/split/NatsSubjectSplit.java b/src/main/java/io/synadia/flink/source/split/NatsSubjectSplit.java similarity index 62% rename from src/main/java/io/synadia/source/split/NatsSubjectSplit.java rename to src/main/java/io/synadia/flink/source/split/NatsSubjectSplit.java index 83b1ae9..e223ce0 100644 --- a/src/main/java/io/synadia/source/split/NatsSubjectSplit.java +++ b/src/main/java/io/synadia/flink/source/split/NatsSubjectSplit.java @@ -1,26 +1,14 @@ // Copyright (c) 2023 Synadia Communications Inc. All Rights Reserved. // See LICENSE and NOTICE file for details. -package io.synadia.source.split; +package io.synadia.flink.source.split; -import io.synadia.source.enumerator.NatsSourceEnumStateSerializer; import org.apache.flink.api.connector.source.SourceSplit; -import javax.annotation.Nullable; -import java.io.Serializable; - -public class NatsSubjectSplit implements SourceSplit, Serializable { - private static final long serialVersionUID = 1L; +public class NatsSubjectSplit implements SourceSplit { private final String subject; - /** - * The splits are frequently serialized into checkpoints. Caching the byte representation makes - * repeated serialization cheap. This field is used by {@link NatsSourceEnumStateSerializer}. - */ - @Nullable - transient byte[] serializedFormCache; - public NatsSubjectSplit(String subject) { this.subject = subject; } diff --git a/src/main/java/io/synadia/flink/source/split/NatsSubjectSplitSerializer.java b/src/main/java/io/synadia/flink/source/split/NatsSubjectSplitSerializer.java new file mode 100644 index 0000000..58f2b9b --- /dev/null +++ b/src/main/java/io/synadia/flink/source/split/NatsSubjectSplitSerializer.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.synadia.flink.source.split; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.core.io.VersionMismatchException; + +import java.io.*; + +/** + * Serializes and deserializes the {@link NatsSubjectSplit}. This class needs to handle + * deserializing splits from older versions. + */ +@Internal +public class NatsSubjectSplitSerializer implements SimpleVersionedSerializer { + + private static final int CURRENT_VERSION = 0; + + @Override + public int getVersion() { + return CURRENT_VERSION; + } + + @Override + public byte[] serialize(NatsSubjectSplit split) throws IOException { + try (ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputStream out = new DataOutputStream(baos)) + { + out.writeUTF(split.getSubject()); + out.flush(); + return baos.toByteArray(); + } + } + + @Override + public NatsSubjectSplit deserialize(int version, byte[] serialized) throws IOException { + try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized); + DataInputStream in = new DataInputStream(bais)) { + if (version != getVersion()) { + throw new VersionMismatchException( + "Trying to deserialize NatsSubjectSplit serialized with unsupported version " + + version + + ". Version of serializer is " + + getVersion()); + } + + return new NatsSubjectSplit(in.readUTF()); + } + } +} diff --git a/src/main/java/io/synadia/source/NatsSource.java b/src/main/java/io/synadia/source/NatsSource.java deleted file mode 100644 index eac0e2f..0000000 --- a/src/main/java/io/synadia/source/NatsSource.java +++ /dev/null @@ -1,67 +0,0 @@ -// Copyright (c) 2023 Synadia Communications Inc. All Rights Reserved. -// See LICENSE and NOTICE file for details. - -package io.synadia.source; - -import io.synadia.common.ConnectionFactory; -import io.synadia.common.NatsSubjectsConnection; -import io.synadia.payload.PayloadDeserializer; -import io.synadia.source.split.NatsSubjectSplit; -import org.apache.flink.api.connector.source.*; -import org.apache.flink.core.io.SimpleVersionedSerializer; - -import java.util.Collection; -import java.util.List; - -/** - * Flink Source to consumer data from one or more NATS subjects - * @param the type of object to convert message payload data to - */ -public class NatsSource extends NatsSubjectsConnection implements Source> { - private final PayloadDeserializer payloadDeserializer; - - NatsSource(List subjects, - PayloadDeserializer payloadDeserializer, - ConnectionFactory connectionFactory) { - super(subjects, connectionFactory); - this.payloadDeserializer = payloadDeserializer; - } - - @Override - public Boundedness getBoundedness() { - return Boundedness.CONTINUOUS_UNBOUNDED; - } - - @Override - public SplitEnumerator> createEnumerator(SplitEnumeratorContext enumContext) throws Exception { - return null; - } - - @Override - public SplitEnumerator> restoreEnumerator(SplitEnumeratorContext enumContext, Collection checkpoint) throws Exception { - return null; - } - - @Override - public SimpleVersionedSerializer getSplitSerializer() { - return null; - } - - @Override - public SimpleVersionedSerializer> getEnumeratorCheckpointSerializer() { - return null; - } - - @Override - public SourceReader createReader(SourceReaderContext readerContext) throws Exception { - return null; - } - - /** - * Get the payload deserializer registered for this source - * @return the deserializer - */ - public PayloadDeserializer getPayloadDeserializer() { - return payloadDeserializer; - } -} diff --git a/src/main/java/io/synadia/source/enumerator/NatsSourceEnumState.java b/src/main/java/io/synadia/source/enumerator/NatsSourceEnumState.java deleted file mode 100644 index fdc9075..0000000 --- a/src/main/java/io/synadia/source/enumerator/NatsSourceEnumState.java +++ /dev/null @@ -1,76 +0,0 @@ -// Copyright (c) 2023 Synadia Communications Inc. All Rights Reserved. -// See LICENSE and NOTICE file for details. - -package io.synadia.source.enumerator; - -import io.synadia.source.split.NatsSubjectSplit; -import org.apache.flink.annotation.Internal; - -import java.util.Collections; -import java.util.HashSet; -import java.util.List; -import java.util.Set; - -/** The state of Nats source enumerator. */ -@Internal -public class NatsSourceEnumState { - private final Set assignedSplits; - private final Set unassignedSplits; - - public NatsSourceEnumState(Set unassigned) { - this(Collections.emptySet(), unassigned); - } - - NatsSourceEnumState(Set assigned, Set unassigned) { - assignedSplits = new HashSet<>(); - for (String s : assigned) { - assignedSplits.add(new NatsSubjectSplit(s)); - } - unassignedSplits = new HashSet<>(); - for (String s : unassigned) { - unassignedSplits.add(new NatsSubjectSplit(s)); - } - } - - // used for copy method - private NatsSourceEnumState() { - this(Collections.emptySet(), Collections.emptySet()); - } - - public NatsSourceEnumState copy() { - NatsSourceEnumState state = new NatsSourceEnumState(); - state.assignedSplits.addAll(assignedSplits); - state.unassignedSplits.addAll(unassignedSplits); - return state; - } - - public boolean hasUnassigned() { - return !unassignedSplits.isEmpty(); - } - - public NatsSubjectSplit assign() { - if (unassignedSplits.isEmpty()) { - return null; - } - NatsSubjectSplit subject = unassignedSplits.iterator().next(); - unassignedSplits.remove(subject); - assignedSplits.add(subject); - return subject; - } - - public void addSplitsBack(List splits) { - for (NatsSubjectSplit split : splits) { - if (assignedSplits.remove(split)) { - unassignedSplits.add(split); - } - } - } - - public Set getAssignedSplits() { - return assignedSplits; - } - - public Set getUnassignedSplits() { - return unassignedSplits; - } -} diff --git a/src/main/java/io/synadia/source/enumerator/NatsSourceEnumStateSerializer.java b/src/main/java/io/synadia/source/enumerator/NatsSourceEnumStateSerializer.java deleted file mode 100644 index 862487e..0000000 --- a/src/main/java/io/synadia/source/enumerator/NatsSourceEnumStateSerializer.java +++ /dev/null @@ -1,71 +0,0 @@ -// Copyright (c) 2023 Synadia Communications Inc. All Rights Reserved. -// See LICENSE and NOTICE file for details. - -package io.synadia.source.enumerator; - -import io.synadia.source.split.NatsSubjectSplit; -import org.apache.flink.core.io.SimpleVersionedSerializer; - -import java.io.*; -import java.util.HashSet; -import java.util.Set; - -public class NatsSourceEnumStateSerializer - implements SimpleVersionedSerializer { - - private static final int VERSION = 1; - - @Override - public int getVersion() { - return VERSION; - } - - @Override - public byte[] serialize(NatsSourceEnumState enumState) throws IOException { - try (ByteArrayOutputStream baos = new ByteArrayOutputStream(); - DataOutputStream out = new DataOutputStream(baos)) - { - Set asplits = enumState.getAssignedSplits(); - Set usplits = enumState.getUnassignedSplits(); - out.writeInt(asplits.size()); - out.writeInt(usplits.size()); - for (NatsSubjectSplit split : asplits) { - out.writeUTF(split.getSubject()); - } - for (NatsSubjectSplit split : usplits) { - out.writeUTF(split.getSubject()); - } - out.flush(); - return baos.toByteArray(); - } - } - - @Override - public NatsSourceEnumState deserialize(int version, byte[] serialized) throws IOException { - if (version == 1) { - return deserializeV1(serialized); - } - throw new IOException("Unknown version: " + version); - } - - private static NatsSourceEnumState deserializeV1(byte[] serialized) throws IOException { - try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized); - DataInputStream in = new DataInputStream(bais)) { - - int anum = in.readInt(); - int unum = in.readInt(); - Set assigned = new HashSet<>(anum); - Set unassigned = new HashSet<>(unum); - for (int i = 0; i < anum; i++) { - assigned.add(in.readUTF()); - } - for (int i = 0; i < unum; i++) { - unassigned.add(in.readUTF()); - } - if (in.available() > 0) { - throw new IOException("Unexpected trailing bytes in serialized topic partitions"); - } - return new NatsSourceEnumState(assigned, unassigned); - } - } -} diff --git a/src/main/java/io/synadia/source/enumerator/NatsSourceEnumerator.java b/src/main/java/io/synadia/source/enumerator/NatsSourceEnumerator.java deleted file mode 100644 index 36fd6df..0000000 --- a/src/main/java/io/synadia/source/enumerator/NatsSourceEnumerator.java +++ /dev/null @@ -1,90 +0,0 @@ -// Copyright (c) 2023 Synadia Communications Inc. All Rights Reserved. -// See LICENSE and NOTICE file for details. - -package io.synadia.source.enumerator; - -import io.nats.client.Connection; -import io.nats.client.Dispatcher; -import io.nats.client.support.Debug; -import io.synadia.common.ConnectionFactory; -import io.synadia.source.split.NatsSubjectSplit; -import org.apache.flink.api.connector.source.SplitEnumerator; -import org.apache.flink.api.connector.source.SplitEnumeratorContext; - -import javax.annotation.Nullable; -import java.io.IOException; -import java.util.List; -import java.util.Set; - -public class NatsSourceEnumerator implements SplitEnumerator { - private final ConnectionFactory connectionFactory; - private final SplitEnumeratorContext context; - - private final NatsSourceEnumState state; - - // Lazily instantiated or mutable fields. - private Connection connection; - private Dispatcher dispatcher; - - public NatsSourceEnumerator(Set unassigned, - ConnectionFactory connectionFactory, - SplitEnumeratorContext context) - { - this.connectionFactory = connectionFactory; - this.context = context; - this.state = new NatsSourceEnumState(unassigned); - } - - public NatsSourceEnumerator(ConnectionFactory connectionFactory, - SplitEnumeratorContext context, - NatsSourceEnumState natsSourceEnumState) - { - this.connectionFactory = connectionFactory; - this.context = context; - this.state = natsSourceEnumState; - } - - @Override - public void start() { - Debug.dbg("NatsSourceEnumerator start"); - } - - @Override - public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) { - Debug.dbg("NatsSourceEnumerator.start | subtaskId:" + subtaskId + " | requesterHostname:" + requesterHostname); - NatsSubjectSplit split = state.assign(); - if (split == null) { - context.signalNoMoreSplits(subtaskId); - } - else { - context.assignSplit(state.assign(), subtaskId); - } - } - - @Override - public void addSplitsBack(List splits, int subtaskId) { - Debug.dbg("NatsSourceEnumerator.addSplitsBack | splits:" + (splits == null ? -1 : splits.size()) + " | subtaskId:" + subtaskId); - state.addSplitsBack(splits); - } - - @Override - public void addReader(int subtaskId) { - Debug.dbg("NatsSourceEnumerator.start | subtaskId:" + subtaskId); - } - - @Override - public NatsSourceEnumState snapshotState(long checkpointId) throws Exception { - Debug.dbg("NatsSourceEnumerator.snapshotState | checkpointId:" + checkpointId); - return state.copy(); - } - - @Override - public void close() throws IOException { - try { - connection.close(); - } - catch (InterruptedException e) { - throw new IOException(e); - } - } -} diff --git a/src/main/java/io/synadia/source/split/NoStateSplitState.java b/src/main/java/io/synadia/source/split/NoStateSplitState.java deleted file mode 100644 index b585f67..0000000 --- a/src/main/java/io/synadia/source/split/NoStateSplitState.java +++ /dev/null @@ -1,7 +0,0 @@ -// Copyright (c) 2023 Synadia Communications Inc. All Rights Reserved. -// See LICENSE and NOTICE file for details. - -package io.synadia.source.split; - -public class NoStateSplitState { -} diff --git a/src/test/java/io/synadia/TestBase.java b/src/test/java/io/synadia/io/synadia/flink/TestBase.java similarity index 96% rename from src/test/java/io/synadia/TestBase.java rename to src/test/java/io/synadia/io/synadia/flink/TestBase.java index 61d31f0..fff4049 100644 --- a/src/test/java/io/synadia/TestBase.java +++ b/src/test/java/io/synadia/io/synadia/flink/TestBase.java @@ -1,4 +1,4 @@ -package io.synadia; +package io.synadia.io.synadia.flink; import io.nats.client.*; import io.nats.client.api.StorageType; @@ -20,10 +20,10 @@ import java.util.logging.Level; public class TestBase { - static final String PLAIN_ASCII = "hello world ascii"; - static final List UTF8_TEST_STRINGS = new ArrayList<>(); - static final List WORD_COUNT_JSONS = new ArrayList<>(); - static final Map WORD_COUNT_MAP = new HashMap<>(); + public static final String PLAIN_ASCII = "hello world ascii"; + public static final List UTF8_TEST_STRINGS = new ArrayList<>(); + public static final List WORD_COUNT_JSONS = new ArrayList<>(); + public static final Map WORD_COUNT_MAP = new HashMap<>(); static { NatsServerRunner.setDefaultOutputSupplier(ConsoleOutput::new); diff --git a/src/test/java/io/synadia/WordSubscriber.java b/src/test/java/io/synadia/io/synadia/flink/WordSubscriber.java similarity index 92% rename from src/test/java/io/synadia/WordSubscriber.java rename to src/test/java/io/synadia/io/synadia/flink/WordSubscriber.java index d50984b..ba38a73 100644 --- a/src/test/java/io/synadia/WordSubscriber.java +++ b/src/test/java/io/synadia/io/synadia/flink/WordSubscriber.java @@ -1,7 +1,7 @@ // Copyright (c) 2023 Synadia Communications Inc. All Rights Reserved. // See LICENSE and NOTICE file for details. -package io.synadia; +package io.synadia.io.synadia.flink; import io.nats.client.Connection; import io.nats.client.Dispatcher; @@ -14,7 +14,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; -class WordSubscriber implements MessageHandler { +public class WordSubscriber implements MessageHandler { public final Dispatcher d; final Map resultMap = new HashMap<>(); diff --git a/src/test/java/io/synadia/SerializersDeserializersTests.java b/src/test/java/io/synadia/io/synadia/flink/payload/SerializersDeserializersTests.java similarity index 94% rename from src/test/java/io/synadia/SerializersDeserializersTests.java rename to src/test/java/io/synadia/io/synadia/flink/payload/SerializersDeserializersTests.java index 18a1c6d..1f16b11 100644 --- a/src/test/java/io/synadia/SerializersDeserializersTests.java +++ b/src/test/java/io/synadia/io/synadia/flink/payload/SerializersDeserializersTests.java @@ -1,13 +1,14 @@ // Copyright (c) 2023 Synadia Communications Inc. All Rights Reserved. // See LICENSE and NOTICE file for details. -package io.synadia; +package io.synadia.io.synadia.flink.payload; import io.nats.client.support.*; -import io.synadia.payload.PayloadDeserializer; -import io.synadia.payload.PayloadSerializer; -import io.synadia.payload.StringPayloadDeserializer; -import io.synadia.payload.StringPayloadSerializer; +import io.synadia.flink.payload.PayloadDeserializer; +import io.synadia.flink.payload.PayloadSerializer; +import io.synadia.flink.payload.StringPayloadDeserializer; +import io.synadia.flink.payload.StringPayloadSerializer; +import io.synadia.io.synadia.flink.TestBase; import org.apache.flink.util.FlinkRuntimeException; import org.junit.jupiter.api.Test; diff --git a/src/test/java/io/synadia/SinkTests.java b/src/test/java/io/synadia/io/synadia/flink/sink/SinkTests.java similarity index 93% rename from src/test/java/io/synadia/SinkTests.java rename to src/test/java/io/synadia/io/synadia/flink/sink/SinkTests.java index 768b2b8..7a51036 100644 --- a/src/test/java/io/synadia/SinkTests.java +++ b/src/test/java/io/synadia/io/synadia/flink/sink/SinkTests.java @@ -1,13 +1,15 @@ // Copyright (c) 2023 Synadia Communications Inc. All Rights Reserved. // See LICENSE and NOTICE file for details. -package io.synadia; +package io.synadia.io.synadia.flink.sink; import io.nats.client.Connection; import io.nats.client.Nats; import io.nats.client.Options; -import io.synadia.payload.StringPayloadSerializer; -import io.synadia.sink.NatsSinkBuilder; +import io.synadia.flink.payload.StringPayloadSerializer; +import io.synadia.flink.sink.NatsSinkBuilder; +import io.synadia.io.synadia.flink.TestBase; +import io.synadia.io.synadia.flink.WordSubscriber; import nats.io.NatsServerRunner; import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.api.common.time.Time; diff --git a/src/test/java/io/synadia/SourceTests.java b/src/test/java/io/synadia/io/synadia/flink/source/SourceTests.java similarity index 86% rename from src/test/java/io/synadia/SourceTests.java rename to src/test/java/io/synadia/io/synadia/flink/source/SourceTests.java index b15a992..e7c5d5c 100644 --- a/src/test/java/io/synadia/SourceTests.java +++ b/src/test/java/io/synadia/io/synadia/flink/source/SourceTests.java @@ -1,13 +1,14 @@ // Copyright (c) 2023 Synadia Communications Inc. All Rights Reserved. // See LICENSE and NOTICE file for details. -package io.synadia; +package io.synadia.io.synadia.flink.source; import io.nats.client.Connection; import io.nats.client.Options; -import io.synadia.payload.StringPayloadDeserializer; -import io.synadia.source.NatsPayloadSourceBuilder; -import io.synadia.source.NatsSource; +import io.synadia.flink.payload.StringPayloadDeserializer; +import io.synadia.flink.source.NatsSource; +import io.synadia.flink.source.NatsSourceBuilder; +import io.synadia.io.synadia.flink.TestBase; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.api.common.time.Time; @@ -34,7 +35,7 @@ private static void _testSource(Connection nc, String subject, String connectionPropertiesFile) throws Exception { final StringPayloadDeserializer deserializer = new StringPayloadDeserializer(); - NatsPayloadSourceBuilder builder = new NatsPayloadSourceBuilder() + NatsSourceBuilder builder = new NatsSourceBuilder() .subjects(subject) .payloadDeserializer(deserializer); diff --git a/src/test/java/io/synadia/io/synadia/flink/source/StateSerializerTest.java b/src/test/java/io/synadia/io/synadia/flink/source/StateSerializerTest.java new file mode 100644 index 0000000..8834eda --- /dev/null +++ b/src/test/java/io/synadia/io/synadia/flink/source/StateSerializerTest.java @@ -0,0 +1,42 @@ +package io.synadia.io.synadia.flink.source; + +import io.synadia.flink.source.enumerator.NatsSourceEnumeratorStateSerializer; +import io.synadia.flink.source.enumerator.NatsSubjectSourceEnumeratorState; +import io.synadia.flink.source.split.NatsSubjectSplit; +import io.synadia.flink.source.split.NatsSubjectSplitSerializer; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.util.HashSet; +import java.util.Set; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class StateSerializerTest { + + @Test + void testSerializeAndDeserialize() throws IOException { + NatsSubjectSplit nss1 = new NatsSubjectSplit("sub1"); + NatsSubjectSplit nss2 = new NatsSubjectSplit("sub2"); + Set unassignedSplits = new HashSet<>(); + unassignedSplits.add(nss1); + unassignedSplits.add(nss2); + + NatsSubjectSourceEnumeratorState initialState = + new NatsSubjectSourceEnumeratorState(unassignedSplits); + + NatsSubjectSplitSerializer splitSerializer = new NatsSubjectSplitSerializer(); + NatsSourceEnumeratorStateSerializer serializer = + new NatsSourceEnumeratorStateSerializer(splitSerializer); + + byte[] serialized = serializer.serialize(initialState); + NatsSubjectSourceEnumeratorState deserializedState = + serializer.deserialize(serializer.getVersion(), serialized); + + unassignedSplits = deserializedState.getUnassignedSplits(); + assertEquals(2, unassignedSplits.size()); + assertTrue(unassignedSplits.contains(nss1)); + assertTrue(unassignedSplits.contains(nss2)); + } +} From 3eb10d50f9df77b949c1f6e84c86d79236325d36 Mon Sep 17 00:00:00 2001 From: scottf Date: Thu, 28 Sep 2023 09:27:34 -0400 Subject: [PATCH 2/3] in progress --- .../source/enumerator/NatsSourceEnumerator.java | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/main/java/io/synadia/flink/source/enumerator/NatsSourceEnumerator.java b/src/main/java/io/synadia/flink/source/enumerator/NatsSourceEnumerator.java index c2703f9..6ff8638 100644 --- a/src/main/java/io/synadia/flink/source/enumerator/NatsSourceEnumerator.java +++ b/src/main/java/io/synadia/flink/source/enumerator/NatsSourceEnumerator.java @@ -3,9 +3,9 @@ package io.synadia.flink.source.enumerator; +import com.esotericsoftware.minlog.Log; import io.nats.client.Connection; import io.nats.client.Dispatcher; -import io.nats.client.support.Debug; import io.synadia.flink.source.split.NatsSubjectSplit; import org.apache.flink.api.connector.source.SplitEnumerator; import org.apache.flink.api.connector.source.SplitEnumeratorContext; @@ -48,7 +48,7 @@ public NatsSourceEnumerator(SplitEnumeratorContext context, @Override public void start() { - Debug.dbg("NatsSourceEnumerator start"); + Log.debug("NatsSourceEnumerator start"); } // private List periodicallyDiscoverSplits() { @@ -57,7 +57,7 @@ public void start() { @Override public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) { - Debug.dbg("NatsSourceEnumerator.start | subtaskId:" + subtaskId + " | requesterHostname:" + requesterHostname); + Log.debug("NatsSourceEnumerator.start | subtaskId:" + subtaskId + " | requesterHostname:" + requesterHostname); NatsSubjectSplit split = null; if (split == null) { context.signalNoMoreSplits(subtaskId); @@ -69,7 +69,7 @@ public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname @Override public void addSplitsBack(List splits, int subtaskId) { - Debug.dbg("NatsSourceEnumerator.addSplitsBack | splits:" + (splits == null ? -1 : splits.size()) + " | subtaskId:" + subtaskId); + Log.debug("NatsSourceEnumerator.addSplitsBack | splits:" + (splits == null ? -1 : splits.size()) + " | subtaskId:" + subtaskId); if (!splitAssignment.containsKey(subtaskId)) { LOG.warn( "Unable to add splits back for subtask {} since it is not assigned any splits. Splits: {}", @@ -82,13 +82,13 @@ public void addSplitsBack(List splits, int subtaskId) { @Override public void addReader(int subtaskId) { - Debug.dbg("NatsSourceEnumerator.start | subtaskId:" + subtaskId); + Log.debug("NatsSourceEnumerator.start | subtaskId:" + subtaskId); splitAssignment.putIfAbsent(subtaskId, new HashSet<>()); } @Override public NatsSubjectSourceEnumeratorState snapshotState(long checkpointId) throws Exception { - Debug.dbg("NatsSourceEnumerator.snapshotState | checkpointId:" + checkpointId); + Log.debug("NatsSourceEnumerator.snapshotState | checkpointId:" + checkpointId); return new NatsSubjectSourceEnumeratorState(unassignedSplits); } From 97a577e1e8ac46464ad549e4f79f36079cfd9b9b Mon Sep 17 00:00:00 2001 From: scottf Date: Thu, 28 Sep 2023 09:33:29 -0400 Subject: [PATCH 3/3] in progress --- .../flink/source/NatsSourceBuilder.java | 2 +- .../io/synadia/flink/source/SourceTests.java | 37 ++++++++----------- 2 files changed, 16 insertions(+), 23 deletions(-) diff --git a/src/main/java/io/synadia/flink/source/NatsSourceBuilder.java b/src/main/java/io/synadia/flink/source/NatsSourceBuilder.java index 7664e4f..465144e 100644 --- a/src/main/java/io/synadia/flink/source/NatsSourceBuilder.java +++ b/src/main/java/io/synadia/flink/source/NatsSourceBuilder.java @@ -29,7 +29,7 @@ public class NatsSourceBuilder extends NatsSubjectsAndConnectionBuilder @Override protected NatsSourceBuilder getThis() { - return null; + return this; } /** diff --git a/src/test/java/io/synadia/io/synadia/flink/source/SourceTests.java b/src/test/java/io/synadia/io/synadia/flink/source/SourceTests.java index e7c5d5c..be7952f 100644 --- a/src/test/java/io/synadia/io/synadia/flink/source/SourceTests.java +++ b/src/test/java/io/synadia/io/synadia/flink/source/SourceTests.java @@ -6,15 +6,8 @@ import io.nats.client.Connection; import io.nats.client.Options; import io.synadia.flink.payload.StringPayloadDeserializer; -import io.synadia.flink.source.NatsSource; import io.synadia.flink.source.NatsSourceBuilder; import io.synadia.io.synadia.flink.TestBase; -import org.apache.flink.api.common.eventtime.WatermarkStrategy; -import org.apache.flink.api.common.restartstrategy.RestartStrategies; -import org.apache.flink.api.common.time.Time; -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.functions.sink.PrintSink; import org.junit.jupiter.api.Test; import java.util.Properties; @@ -39,20 +32,20 @@ private static void _testSource(Connection nc, String subject, .subjects(subject) .payloadDeserializer(deserializer); - if (connectionProperties == null) { - builder.connectionPropertiesFile(connectionPropertiesFile); - } - else { - builder.connectionProperties(connectionProperties); - } - - NatsSource natsSource = builder.build(); - - StreamExecutionEnvironment env = getStreamExecutionEnvironment(); - DataStream ds = env.fromSource(builder.build(), WatermarkStrategy.noWatermarks(), "nats-source-input"); - ds.sinkTo(new PrintSink()); - - env.setRestartStrategy(RestartStrategies.fixedDelayRestart(5, Time.seconds(5))); - env.execute("TestSource"); +// if (connectionProperties == null) { +// builder.connectionPropertiesFile(connectionPropertiesFile); +// } +// else { +// builder.connectionProperties(connectionProperties); +// } +// +// NatsSource natsSource = builder.build(); +// +// StreamExecutionEnvironment env = getStreamExecutionEnvironment(); +// DataStream ds = env.fromSource(builder.build(), WatermarkStrategy.noWatermarks(), "nats-source-input"); +// ds.sinkTo(new PrintSink()); +// +// env.setRestartStrategy(RestartStrategies.fixedDelayRestart(5, Time.seconds(5))); +// env.execute("TestSource"); } }