Skip to content

Commit

Permalink
in progress
Browse files Browse the repository at this point in the history
  • Loading branch information
scottf committed Sep 28, 2023
1 parent 1eb7b54 commit 3eb10d5
Showing 1 changed file with 6 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@

package io.synadia.flink.source.enumerator;

import com.esotericsoftware.minlog.Log;
import io.nats.client.Connection;
import io.nats.client.Dispatcher;
import io.nats.client.support.Debug;
import io.synadia.flink.source.split.NatsSubjectSplit;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
Expand Down Expand Up @@ -48,7 +48,7 @@ public NatsSourceEnumerator(SplitEnumeratorContext<NatsSubjectSplit> context,

@Override
public void start() {
Debug.dbg("NatsSourceEnumerator start");
Log.debug("NatsSourceEnumerator start");
}

// private List<NatsSubjectSplit> periodicallyDiscoverSplits() {
Expand All @@ -57,7 +57,7 @@ public void start() {

@Override
public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) {
Debug.dbg("NatsSourceEnumerator.start | subtaskId:" + subtaskId + " | requesterHostname:" + requesterHostname);
Log.debug("NatsSourceEnumerator.start | subtaskId:" + subtaskId + " | requesterHostname:" + requesterHostname);
NatsSubjectSplit split = null;
if (split == null) {
context.signalNoMoreSplits(subtaskId);
Expand All @@ -69,7 +69,7 @@ public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname

@Override
public void addSplitsBack(List<NatsSubjectSplit> splits, int subtaskId) {
Debug.dbg("NatsSourceEnumerator.addSplitsBack | splits:" + (splits == null ? -1 : splits.size()) + " | subtaskId:" + subtaskId);
Log.debug("NatsSourceEnumerator.addSplitsBack | splits:" + (splits == null ? -1 : splits.size()) + " | subtaskId:" + subtaskId);
if (!splitAssignment.containsKey(subtaskId)) {
LOG.warn(
"Unable to add splits back for subtask {} since it is not assigned any splits. Splits: {}",
Expand All @@ -82,13 +82,13 @@ public void addSplitsBack(List<NatsSubjectSplit> splits, int subtaskId) {

@Override
public void addReader(int subtaskId) {
Debug.dbg("NatsSourceEnumerator.start | subtaskId:" + subtaskId);
Log.debug("NatsSourceEnumerator.start | subtaskId:" + subtaskId);
splitAssignment.putIfAbsent(subtaskId, new HashSet<>());
}

@Override
public NatsSubjectSourceEnumeratorState snapshotState(long checkpointId) throws Exception {
Debug.dbg("NatsSourceEnumerator.snapshotState | checkpointId:" + checkpointId);
Log.debug("NatsSourceEnumerator.snapshotState | checkpointId:" + checkpointId);
return new NatsSubjectSourceEnumeratorState(unassignedSplits);
}

Expand Down

0 comments on commit 3eb10d5

Please sign in to comment.