Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

in progress #2

Merged
merged 3 commits into from
Sep 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright (c) 2023 Synadia Communications Inc. All Rights Reserved.
// See LICENSE and NOTICE file for details.

package io.synadia;
package io.synadia.flink;

public interface Constants {
String SOURCE_SUBJECTS = "source.subjects";
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright (c) 2023 Synadia Communications Inc. All Rights Reserved.
// See LICENSE and NOTICE file for details.

package io.synadia;
package io.synadia.flink;

import io.nats.client.NUID;
import org.apache.flink.util.FlinkRuntimeException;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package io.synadia.common;
package io.synadia.flink.common;

import io.nats.client.Connection;
import io.nats.client.Nats;
Expand All @@ -8,8 +8,8 @@
import java.io.Serializable;
import java.util.Properties;

import static io.synadia.Utils.jitter;
import static io.synadia.Utils.loadPropertiesFromFile;
import static io.synadia.flink.Utils.jitter;
import static io.synadia.flink.Utils.loadPropertiesFromFile;

public class ConnectionFactory implements Serializable {
private final Properties connectionProperties;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
// Copyright (c) 2023 Synadia Communications Inc. All Rights Reserved.
// See LICENSE and NOTICE file for details.

package io.synadia.common;
package io.synadia.flink.common;

import io.synadia.Utils;
import io.synadia.flink.Utils;

import java.io.IOException;
import java.util.Properties;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright (c) 2023 Synadia Communications Inc. All Rights Reserved.
// See LICENSE and NOTICE file for details.

package io.synadia.common;
package io.synadia.flink.common;

import java.util.ArrayList;
import java.util.Arrays;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright (c) 2023 Synadia Communications Inc. All Rights Reserved.
// See LICENSE and NOTICE file for details.

package io.synadia.common;
package io.synadia.flink.common;

import java.io.Serializable;
import java.util.List;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package io.synadia.payload;
package io.synadia.flink.payload;

import io.nats.client.impl.Headers;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright (c) 2023 Synadia Communications Inc. All Rights Reserved.
// See LICENSE and NOTICE file for details.

package io.synadia.payload;
package io.synadia.flink.payload;

import java.io.Serializable;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright (c) 2023 Synadia Communications Inc. All Rights Reserved.
// See LICENSE and NOTICE file for details.

package io.synadia.payload;
package io.synadia.flink.payload;

import java.io.Serializable;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright (c) 2023 Synadia Communications Inc. All Rights Reserved.
// See LICENSE and NOTICE file for details.

package io.synadia.payload;
package io.synadia.flink.payload;

import java.io.IOException;
import java.io.ObjectInputStream;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright (c) 2023 Synadia Communications Inc. All Rights Reserved.
// See LICENSE and NOTICE file for details.

package io.synadia.payload;
package io.synadia.flink.payload;

import java.io.IOException;
import java.io.ObjectInputStream;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
// Copyright (c) 2023 Synadia Communications Inc. All Rights Reserved.
// See LICENSE and NOTICE file for details.

package io.synadia.sink;
package io.synadia.flink.sink;

import io.synadia.common.ConnectionFactory;
import io.synadia.common.NatsSubjectsConnection;
import io.synadia.payload.PayloadSerializer;
import io.synadia.flink.common.ConnectionFactory;
import io.synadia.flink.common.NatsSubjectsConnection;
import io.synadia.flink.payload.PayloadSerializer;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SinkWriter;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
// Copyright (c) 2023 Synadia Communications Inc. All Rights Reserved.
// See LICENSE and NOTICE file for details.

package io.synadia.sink;
package io.synadia.flink.sink;

import io.synadia.common.NatsSubjectsAndConnectionBuilder;
import io.synadia.payload.PayloadSerializer;
import io.synadia.flink.common.NatsSubjectsAndConnectionBuilder;
import io.synadia.flink.payload.PayloadSerializer;

/**
* Builder to construct {@link NatsSink}.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
// Copyright (c) 2023 Synadia Communications Inc. All Rights Reserved.
// See LICENSE and NOTICE file for details.

package io.synadia.sink;
package io.synadia.flink.sink;

import io.nats.client.Connection;
import io.synadia.common.ConnectionFactory;
import io.synadia.payload.PayloadSerializer;
import io.synadia.flink.common.ConnectionFactory;
import io.synadia.flink.payload.PayloadSerializer;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SinkWriter;

Expand All @@ -14,7 +14,7 @@
import java.io.Serializable;
import java.util.List;

import static io.synadia.Utils.generateId;
import static io.synadia.flink.Utils.generateId;

/**
* This class is responsible to publish to one or more NATS subjects
Expand Down
86 changes: 86 additions & 0 deletions src/main/java/io/synadia/flink/source/NatsSource.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
// Copyright (c) 2023 Synadia Communications Inc. All Rights Reserved.
// See LICENSE and NOTICE file for details.

package io.synadia.flink.source;

import io.synadia.flink.common.ConnectionFactory;
import io.synadia.flink.common.NatsSubjectsConnection;
import io.synadia.flink.payload.PayloadDeserializer;
import io.synadia.flink.source.enumerator.NatsSubjectSourceEnumeratorState;
import io.synadia.flink.source.split.NatsSubjectSplit;
import org.apache.flink.api.connector.source.*;
import org.apache.flink.core.io.SimpleVersionedSerializer;

import java.util.List;

/**
* Flink Source to consumer data from one or more NATS subjects
* @param <OutputT> the type of object to convert message payload data to
*/
public class NatsSource<OutputT> extends NatsSubjectsConnection implements Source<OutputT, NatsSubjectSplit, NatsSubjectSourceEnumeratorState> {
private final PayloadDeserializer<OutputT> payloadDeserializer;

/**
* Create a {@link NatsSourceBuilder} to allow the fluent construction of a new {@link NatsSource}.
* @param <T> type of records being read
* @return {@link NatsSourceBuilder}
*/
public static <T> NatsSourceBuilder<T> builder() {
return new NatsSourceBuilder<>();
}

NatsSource(List<String> subjects,
PayloadDeserializer<OutputT> payloadDeserializer,
ConnectionFactory connectionFactory) {
super(subjects, connectionFactory);
this.payloadDeserializer = payloadDeserializer;
}

@Override
public Boundedness getBoundedness() {
return Boundedness.CONTINUOUS_UNBOUNDED;
}

@Override
public SplitEnumerator<NatsSubjectSplit, NatsSubjectSourceEnumeratorState> createEnumerator(SplitEnumeratorContext<NatsSubjectSplit> enumContext) throws Exception {
return restoreEnumerator(enumContext, null);
}

@Override
public SplitEnumerator<NatsSubjectSplit, NatsSubjectSourceEnumeratorState>
restoreEnumerator(SplitEnumeratorContext<NatsSubjectSplit> enumContext,
NatsSubjectSourceEnumeratorState checkpoint) throws Exception
{
// return new NatsSourceEnumerator(
// enumContext,
// streamArn,
// sourceConfig,
// createKinesisStreamProxy(sourceConfig),
// kinesisShardAssigner,
// checkpoint);
return null;
}

@Override
public SimpleVersionedSerializer<NatsSubjectSplit> getSplitSerializer() {
return null;
}

@Override
public SimpleVersionedSerializer<NatsSubjectSourceEnumeratorState> getEnumeratorCheckpointSerializer() {
return null;
}

@Override
public SourceReader<OutputT, NatsSubjectSplit> createReader(SourceReaderContext readerContext) throws Exception {
return null;
}

/**
* Get the payload deserializer registered for this source
* @return the deserializer
*/
public PayloadDeserializer<OutputT> getPayloadDeserializer() {
return payloadDeserializer;
}
}
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
// Copyright (c) 2023 Synadia Communications Inc. All Rights Reserved.
// See LICENSE and NOTICE file for details.

package io.synadia.source;
package io.synadia.flink.source;

import io.synadia.common.NatsSubjectsAndConnectionBuilder;
import io.synadia.payload.PayloadDeserializer;
import io.synadia.flink.common.NatsSubjectsAndConnectionBuilder;
import io.synadia.flink.payload.PayloadDeserializer;

/**
* Builder to construct {@link NatsSource}.
Expand All @@ -23,21 +23,21 @@
* @see NatsSource
* @param <OutputT> type of the records written to Kafka
*/
public class NatsPayloadSourceBuilder<OutputT> extends NatsSubjectsAndConnectionBuilder<NatsPayloadSourceBuilder<OutputT>> {
public class NatsSourceBuilder<OutputT> extends NatsSubjectsAndConnectionBuilder<NatsSourceBuilder<OutputT>> {
private PayloadDeserializer<OutputT> payloadDeserializer;
private String payloadDeserializerClass;

@Override
protected NatsPayloadSourceBuilder<OutputT> getThis() {
return null;
protected NatsSourceBuilder<OutputT> getThis() {
return this;
}

/**
* Set the payload deserializer for the source.
* @param payloadDeserializer the deserializer.
* @return the builder
*/
public NatsPayloadSourceBuilder<OutputT> payloadDeserializer(PayloadDeserializer<OutputT> payloadDeserializer) {
public NatsSourceBuilder<OutputT> payloadDeserializer(PayloadDeserializer<OutputT> payloadDeserializer) {
this.payloadDeserializer = payloadDeserializer;
this.payloadDeserializerClass = null;
return this;
Expand All @@ -48,7 +48,7 @@ public NatsPayloadSourceBuilder<OutputT> payloadDeserializer(PayloadDeserializer
* @param payloadDeserializerClass the serializer class name.
* @return the builder
*/
public NatsPayloadSourceBuilder<OutputT> payloadDeserializerClass(String payloadDeserializerClass) {
public NatsSourceBuilder<OutputT> payloadDeserializerClass(String payloadDeserializerClass) {
this.payloadDeserializer = null;
this.payloadDeserializerClass = payloadDeserializerClass;
return this;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright (c) 2023 Synadia Communications Inc. All Rights Reserved.
// See LICENSE and NOTICE file for details.

package io.synadia.source;
package io.synadia.flink.source;

import java.io.Serializable;

Expand Down
Loading