Skip to content

Commit

Permalink
[ClientAPI]Fix hasMessageAvailable() (apache#6362)
Browse files Browse the repository at this point in the history
Fixes apache#6333 

Previously, `hasMoreMessages` is test against:
```
return lastMessageIdInBroker.compareTo(lastDequeuedMessage) == 0
                && incomingMessages.size() > 0;
```
However, the `incomingMessages` could be 0 when the consumer/reader has just started and hasn't received any messages yet. 

In this PR, the last entry is retrieved and decoded to get message metadata. for the batchIndex field population.
  • Loading branch information
yjshen committed Mar 3, 2020
1 parent 333888a commit baf155f
Show file tree
Hide file tree
Showing 7 changed files with 233 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,11 @@
import javax.naming.AuthenticationException;
import javax.net.ssl.SSLSession;

import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.bookkeeper.mledger.util.SafeRun;
import org.apache.commons.lang3.StringUtils;
Expand All @@ -59,6 +63,7 @@
import org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException;
import org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException;
import org.apache.pulsar.broker.service.BrokerServiceException.TopicNotFoundException;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
import org.apache.pulsar.broker.web.RestException;
Expand Down Expand Up @@ -1396,22 +1401,83 @@ protected void handleGetLastMessageId(CommandGetLastMessageId getLastMessageId)
Topic topic = consumer.getSubscription().getTopic();
Position position = topic.getLastMessageId();
int partitionIndex = TopicName.getPartitionIndex(topic.getName());
if (log.isDebugEnabled()) {
log.debug("[{}] [{}][{}] Get LastMessageId {} partitionIndex {}", remoteAddress,
topic.getName(), consumer.getSubscription().getName(), position, partitionIndex);
}
MessageIdData messageId = MessageIdData.newBuilder()
.setLedgerId(((PositionImpl)position).getLedgerId())
.setEntryId(((PositionImpl)position).getEntryId())
.setPartition(partitionIndex)
.build();

ctx.writeAndFlush(Commands.newGetLastMessageIdResponse(requestId, messageId));
getLargestBatchIndexWhenPossible(
topic,
(PositionImpl) position,
partitionIndex,
requestId,
consumer.getSubscription().getName());

} else {
ctx.writeAndFlush(Commands.newError(getLastMessageId.getRequestId(), ServerError.MetadataError, "Consumer not found"));
}
}

private void getLargestBatchIndexWhenPossible(
Topic topic,
PositionImpl position,
int partitionIndex,
long requestId,
String subscriptionName) {

PersistentTopic persistentTopic = (PersistentTopic) topic;
ManagedLedgerImpl ml = (ManagedLedgerImpl) persistentTopic.getManagedLedger();

// If it's not pointing to a valid entry, respond messageId of the current position.
if (position.getEntryId() == -1) {
MessageIdData messageId = MessageIdData.newBuilder()
.setLedgerId(position.getLedgerId())
.setEntryId(position.getEntryId())
.setPartition(partitionIndex).build();

ctx.writeAndFlush(Commands.newGetLastMessageIdResponse(requestId, messageId));
}

// For a valid position, we read the entry out and parse the batch size from its metadata.
CompletableFuture<Entry> entryFuture = new CompletableFuture<>();
ml.asyncReadEntry(position, new AsyncCallbacks.ReadEntryCallback() {
@Override
public void readEntryComplete(Entry entry, Object ctx) {
entryFuture.complete(entry);
}

@Override
public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
entryFuture.completeExceptionally(exception);
}
}, null);

CompletableFuture<Integer> batchSizeFuture = entryFuture.thenApply(entry -> {
MessageMetadata metadata = Commands.parseMessageMetadata(entry.getDataBuffer());
int batchSize = metadata.getNumMessagesInBatch();
entry.release();
return batchSize;
});

batchSizeFuture.whenComplete((batchSize, e) -> {
if (e != null) {
ctx.writeAndFlush(Commands.newError(
requestId, ServerError.MetadataError, "Failed to get batch size for entry " + e.getMessage()));
} else {
int largestBatchIndex = batchSize > 1 ? batchSize - 1 : -1;

if (log.isDebugEnabled()) {
log.debug("[{}] [{}][{}] Get LastMessageId {} partitionIndex {}", remoteAddress,
topic.getName(), subscriptionName, position, partitionIndex);
}

MessageIdData messageId = MessageIdData.newBuilder()
.setLedgerId(position.getLedgerId())
.setEntryId(position.getEntryId())
.setPartition(partitionIndex)
.setBatchIndex(largestBatchIndex).build();

ctx.writeAndFlush(Commands.newGetLastMessageIdResponse(requestId, messageId));
}
});
}

@Override
protected void handleGetTopicsOfNamespace(CommandGetTopicsOfNamespace commandGetTopicsOfNamespace) {
final long requestId = commandGetTopicsOfNamespace.getRequestId();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.RawMessage;
import org.apache.pulsar.client.api.RawReader;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.RawBatchConverter;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
Expand Down Expand Up @@ -95,7 +96,10 @@ private CompletableFuture<PhaseOneResult> phaseOne(RawReader reader) {
} else {
log.info("Commencing phase one of compaction for {}, reading to {}",
reader.getTopic(), lastMessageId);
phaseOneLoop(reader, Optional.empty(), Optional.empty(), lastMessageId, latestForKey,
// Each entry is processed as a whole, discard the batchIndex part deliberately.
MessageIdImpl lastImpl = (MessageIdImpl) lastMessageId;
MessageIdImpl lastEntryMessageId = new MessageIdImpl(lastImpl.getLedgerId(), lastImpl.getEntryId(), lastImpl.getPartitionIndex());
phaseOneLoop(reader, Optional.empty(), Optional.empty(), lastEntryMessageId, latestForKey,
loopPromise);
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,17 @@ public static Object[][] variationsForResetOnLatestMsg() {
};
}

@DataProvider
public static Object[][] variationsForHasMessageAvailable() {
return new Object[][] {
// batching / start-inclusive
{true, true},
{true, false},
{false, true},
{false, false},
};
}

@Test
public void testSimpleReader() throws Exception {
Reader<byte[]> reader = pulsarClient.newReader().topic("persistent://my-property/my-ns/testSimpleReader")
Expand Down Expand Up @@ -531,6 +542,68 @@ public void testMessageAvailableAfterRestart() throws Exception {

}

@Test(dataProvider = "variationsForHasMessageAvailable")
public void testHasMessageAvailable(boolean enableBatch, boolean startInclusive) throws Exception {
final String topicName = "persistent://my-property/my-ns/HasMessageAvailable";
final int numOfMessage = 100;

ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer()
.topic(topicName);

if (enableBatch) {
producerBuilder
.enableBatching(true)
.batchingMaxMessages(10);
} else {
producerBuilder
.enableBatching(false);
}

Producer<byte[]> producer = producerBuilder.create();

CountDownLatch latch = new CountDownLatch(numOfMessage);

List<MessageId> allIds = Collections.synchronizedList(new ArrayList<>());

for (int i = 0; i < numOfMessage; i++) {
producer.sendAsync(String.format("msg num %d", i).getBytes()).whenComplete((mid, e) -> {
if (e != null) {
Assert.fail();
} else {
allIds.add(mid);
}
latch.countDown();
});
}

latch.await();

allIds.sort(null); // make sure the largest mid appears at last.

for (MessageId id : allIds) {
Reader<byte[]> reader;

if (startInclusive) {
reader = pulsarClient.newReader().topic(topicName)
.startMessageId(id).startMessageIdInclusive().create();
} else {
reader = pulsarClient.newReader().topic(topicName)
.startMessageId(id).create();
}

if (startInclusive) {
assertTrue(reader.hasMessageAvailable());
} else if (id != allIds.get(allIds.size() - 1)) {
assertTrue(reader.hasMessageAvailable());
} else {
assertFalse(reader.hasMessageAvailable());
}
reader.close();
}

producer.close();
}

@Test
public void testReaderNonDurableIsAbleToSeekRelativeTime() throws Exception {
final int numOfMessage = 10;
Expand Down Expand Up @@ -794,7 +867,7 @@ public void testReaderStartInMiddleOfBatch() throws Exception {
.batchingMaxMessages(10)
.create();

CountDownLatch latch = new CountDownLatch(100);
CountDownLatch latch = new CountDownLatch(numOfMessage);

List<MessageId> allIds = Collections.synchronizedList(new ArrayList<>());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,9 @@ public void testReadMessageWithBatchingWithMessageInclusive() throws Exception {
while (reader.hasMessageAvailable()) {
Assert.assertTrue(keys.remove(reader.readNext().getKey()));
}
Assert.assertTrue(keys.isEmpty());
// start from latest with start message inclusive should only read the last message in batch
Assert.assertTrue(keys.size() == 9);
Assert.assertFalse(keys.contains("key9"));
Assert.assertFalse(reader.hasMessageAvailable());
}

Expand Down
Loading

0 comments on commit baf155f

Please sign in to comment.