Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improvements on Nats Serdes #5222

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,13 @@
import io.nats.client.Options;
import io.nats.client.PullSubscribeOptions;
import io.nats.client.api.ConsumerConfiguration;
import io.nats.client.api.RetentionPolicy;
import io.nats.client.api.StreamConfiguration;
import io.quarkus.test.junit.QuarkusIntegrationTest;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.GenericContainer;
Expand All @@ -43,12 +42,12 @@ public class AvroNatsSerdeIT extends ApicurioRegistryBaseIT {

public static final Integer NATS_PORT = 4222;

public static final Integer NATS_MGMT_PORT = 8222;
public static final Integer NATS_MNTR_PORT = 8222;

@BeforeAll
void setupEnvironment() {
if (nats == null || !nats.isRunning()) {
nats = new GenericContainer<>("nats:2.10.20").withExposedPorts(NATS_PORT, NATS_MGMT_PORT)
nats = new GenericContainer<>("nats:2.10.20").withExposedPorts(NATS_PORT, NATS_MNTR_PORT)
.withCommand("--jetstream");
nats.start();
}
Expand Down Expand Up @@ -76,10 +75,10 @@ public void testNatsJsonSchema() throws IOException, InterruptedException, JetSt
jsm = connection.jetStreamManagement();

StreamConfiguration stream = new StreamConfiguration.Builder().subjects(subjectId).name(subjectId)
.build();
.retentionPolicy(RetentionPolicy.WorkQueue).build();

ConsumerConfiguration consumerConfiguration = ConsumerConfiguration.builder().durable(subjectId)
.durable(subjectId).filterSubject(subjectId).build();
.durable(subjectId).filterSubject(subjectId).ackWait(2000).build();

jsm.addStream(stream); // Create Stream in advance
jsm.addOrUpdateConsumer(stream.getName(), consumerConfiguration); // Create Consumer in advance
Expand All @@ -95,16 +94,38 @@ public void testNatsJsonSchema() throws IOException, InterruptedException, JetSt
NatsConsumer<GenericRecord> consumer = new NatsConsumerImpl<>(connection, subjectId, options,
configs);

producer.send(record);
producer.publish(record);

NatsConsumerRecord<GenericRecord> message = consumer.receive();
NatsConsumerRecord<GenericRecord> message = consumer.fetch();

if (message.getPayload() != null) {
GenericRecord event1 = message.getPayload();
Assertions.assertEquals(record, event1);
}

message.ack();

producer.publish(record);
consumer.fetch().nak(); // Nak will redeliver the message until ack'd so message should be left in
// stream
Assertions.assertTrue(jsm.getStreamInfo(stream.getName()).getStreamState().getMsgCount() == 1);

jsm.purgeStream(stream.getName());
producer.publish(record);
consumer.fetch().term(); // this will terminate the message, since there was only one message in
// stream and after calling terminate we should not have any message left
// in stream
Assertions.assertTrue(jsm.getStreamInfo(stream.getName()).getStreamState().getMsgCount() == 0);
producer.publish(record);

NatsConsumerRecord<GenericRecord> newMessage = consumer.fetch();
Thread.sleep(1000); // Ack wait is set to 2second for consumer,after 2 second if ack is not
// received consumer will redeliver the message
newMessage.inProgress(); // with this we are resetting the ackwait to 2 second again so consumer
// do not redeliver the message. we should have a message in ack pending
// state
Assertions.assertTrue(jsm.getConsumerInfo(stream.getName(), consumerConfiguration.getDurable())
.getNumAckPending() == 1);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@ public interface NatsConsumer<T> extends AutoCloseable {

String getSubject();

NatsConsumerRecord<T> receive() throws JetStreamApiException, IOException;
NatsConsumerRecord<T> fetch() throws JetStreamApiException, IOException;

NatsConsumerRecord<T> receive(Duration timeout) throws JetStreamApiException, IOException;
NatsConsumerRecord<T> fetch(Duration timeout) throws JetStreamApiException, IOException;

Collection<NatsConsumerRecord<T>> receive(int batchSize, Duration timeout)
Collection<NatsConsumerRecord<T>> fetch(int batchSize, Duration timeout)
throws JetStreamApiException, IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,19 +58,19 @@ public String getSubject() {
}

@Override
public NatsConsumerRecord<DATA> receive() throws JetStreamApiException, IOException {
return receive(Duration.ofSeconds(3));
public NatsConsumerRecord<DATA> fetch() throws JetStreamApiException, IOException {
return fetch(Duration.ofSeconds(3));
}

@Override
public NatsConsumerRecord<DATA> receive(Duration timeout) throws JetStreamApiException, IOException {
Collection<NatsConsumerRecord<DATA>> messages = receive(1, timeout);
public NatsConsumerRecord<DATA> fetch(Duration timeout) throws JetStreamApiException, IOException {
Collection<NatsConsumerRecord<DATA>> messages = fetch(1, timeout);
Optional<NatsConsumerRecord<DATA>> record = messages.stream().findFirst();
return record.orElse(null);
}

@Override
public List<NatsConsumerRecord<DATA>> receive(int batchSize, Duration timeout)
public List<NatsConsumerRecord<DATA>> fetch(int batchSize, Duration timeout)
throws JetStreamApiException, IOException {
List<Message> messages = getLazySubscription().fetch(batchSize, timeout);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,10 @@ public interface NatsConsumerRecord<T> {
T getPayload();

void ack();

void nak();

void term();

void inProgress();
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,19 @@ public T getPayload() {
public void ack() {
natsMessage.ack();
}

@Override
public void nak() {
natsMessage.nak();
}

@Override
public void term() {
natsMessage.term();
}

@Override
public void inProgress() {
natsMessage.inProgress();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,5 @@

public interface NatsProducer<T> extends AutoCloseable {

void send(T message) throws ApicurioNatsException;
void publish(T message) throws ApicurioNatsException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public NatsProducerImpl(Connection connection, String subject, Map<String, Objec
}

@Override
public void send(DATA message) throws ApicurioNatsException {
public void publish(DATA message) throws ApicurioNatsException {
byte[] data = serializer.serializeData(subject, message);

try {
Expand Down