Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ClientAPI]Fix hasMessageAvailable() #6362

Merged
merged 12 commits into from
Mar 3, 2020
Original file line number Diff line number Diff line change
Expand Up @@ -34,19 +34,26 @@
import io.netty.channel.ChannelOption;
import io.netty.handler.ssl.SslHandler;

import java.io.IOException;
import java.net.SocketAddress;

import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import javax.naming.AuthenticationException;
import javax.net.ssl.SSLSession;

import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.bookkeeper.mledger.util.SafeRun;
import org.apache.commons.lang3.StringUtils;
Expand All @@ -59,6 +66,7 @@
import org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException;
import org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException;
import org.apache.pulsar.broker.service.BrokerServiceException.TopicNotFoundException;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
import org.apache.pulsar.broker.web.RestException;
Expand All @@ -68,6 +76,8 @@
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.common.api.AuthData;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandNewTxn;
import org.apache.pulsar.common.api.raw.MessageParser;
import org.apache.pulsar.common.api.raw.RawMessageImpl;
import org.apache.pulsar.common.protocol.CommandUtils;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.PulsarHandler;
Expand Down Expand Up @@ -1396,6 +1406,7 @@ protected void handleGetLastMessageId(CommandGetLastMessageId getLastMessageId)
Topic topic = consumer.getSubscription().getTopic();
Position position = topic.getLastMessageId();
int partitionIndex = TopicName.getPartitionIndex(topic.getName());
int largestBatchIndex = getLargestBatchIndex(topic, (PositionImpl) position, requestId);
if (log.isDebugEnabled()) {
log.debug("[{}] [{}][{}] Get LastMessageId {} partitionIndex {}", remoteAddress,
topic.getName(), consumer.getSubscription().getName(), position, partitionIndex);
Expand All @@ -1404,6 +1415,7 @@ protected void handleGetLastMessageId(CommandGetLastMessageId getLastMessageId)
.setLedgerId(((PositionImpl)position).getLedgerId())
.setEntryId(((PositionImpl)position).getEntryId())
.setPartition(partitionIndex)
.setBatchIndex(largestBatchIndex)
.build();

ctx.writeAndFlush(Commands.newGetLastMessageIdResponse(requestId, messageId));
Expand All @@ -1412,6 +1424,51 @@ protected void handleGetLastMessageId(CommandGetLastMessageId getLastMessageId)
}
}

private int getLargestBatchIndex(Topic topic, PositionImpl position, long requestId) {
PersistentTopic persistentTopic = (PersistentTopic) topic;
ManagedLedgerImpl ml = (ManagedLedgerImpl) persistentTopic.getManagedLedger();

CompletableFuture<Entry> entryFuture = new CompletableFuture<>();
ml.asyncReadEntry(position, new AsyncCallbacks.ReadEntryCallback() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ideally we should be avoiding this read by just recording the number of messages in the last batch, along with the current position.

@Override
public void readEntryComplete(Entry entry, Object ctx) {
entryFuture.complete(entry);
}

@Override
public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
entryFuture.completeExceptionally(exception);
}
}, null);

CompletableFuture<Integer> batchSizeFuture = entryFuture.thenApply(entry -> {
int[] sizeHolder = new int[1];
try {
MessageParser.parseMessage(TopicName.get(topic.getName()), entry.getLedgerId(), entry.getEntryId(),
entry.getDataBuffer(), (message) -> {
sizeHolder[0] = ((RawMessageImpl) message).getBatchSize();
message.release();
}, Commands.DEFAULT_MAX_MESSAGE_SIZE);
yjshen marked this conversation as resolved.
Show resolved Hide resolved
entry.release();
} catch (IOException e) {
throw new CompletionException(e);
}
return sizeHolder[0];
});

try {
int batchSize = batchSizeFuture.get();
yjshen marked this conversation as resolved.
Show resolved Hide resolved
if (batchSize > 1) {
return batchSize - 1;
} else {
return -1;
}
} catch (InterruptedException | ExecutionException e) {
ctx.writeAndFlush(Commands.newError(requestId, ServerError.MetadataError, "Failed to open entry for batch size check"));;
}
return -1;
}

@Override
protected void handleGetTopicsOfNamespace(CommandGetTopicsOfNamespace commandGetTopicsOfNamespace) {
final long requestId = commandGetTopicsOfNamespace.getRequestId();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.nio.file.Paths;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.client.impl.MessageImpl;
Expand Down Expand Up @@ -86,6 +87,17 @@ public static Object[][] variationsForResetOnLatestMsg() {
};
}

@DataProvider
public static Object[][] variationsForHasMessageAvailable() {
return new Object[][] {
// batching / start-inclusive
{true, true},
{true, false},
{false, true},
{false, false},
};
}

@Test
public void testSimpleReader() throws Exception {
Reader<byte[]> reader = pulsarClient.newReader().topic("persistent://my-property/my-ns/testSimpleReader")
Expand Down Expand Up @@ -526,6 +538,68 @@ public void testMessageAvailableAfterRestart() throws Exception {

}

@Test(dataProvider = "variationsForHasMessageAvailable")
public void testHasMessageAvailable(boolean enableBatch, boolean startInclusive) throws Exception {
final String topicName = "persistent://my-property/my-ns/HasMessageAvailable";
final int numOfMessage = 100;

ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer()
.topic(topicName);

if (enableBatch) {
producerBuilder
.enableBatching(true)
.batchingMaxMessages(10);
} else {
producerBuilder
.enableBatching(false);
}

Producer<byte[]> producer = producerBuilder.create();

CountDownLatch latch = new CountDownLatch(100);
yjshen marked this conversation as resolved.
Show resolved Hide resolved

List<MessageId> allIds = Collections.synchronizedList(new ArrayList<>());

for (int i = 0; i < numOfMessage; i++) {
producer.sendAsync(String.format("msg num %d", i).getBytes()).whenComplete((mid, e) -> {
if (e != null) {
Assert.fail();
} else {
allIds.add(mid);
}
latch.countDown();
});
}

latch.await();

allIds.sort(null); // make sure the largest mid appears at last.

for (MessageId id : allIds) {
Reader<byte[]> reader;

if (startInclusive) {
reader = pulsarClient.newReader().topic(topicName)
.startMessageId(id).startMessageIdInclusive().create();
} else {
reader = pulsarClient.newReader().topic(topicName)
.startMessageId(id).create();
}

if (startInclusive) {
assertTrue(reader.hasMessageAvailable());
} else if (id != allIds.get(allIds.size() - 1)) {
assertTrue(reader.hasMessageAvailable());
} else {
assertFalse(reader.hasMessageAvailable());
}
reader.close();
}

producer.close();
}

@Test
public void testReaderNonDurableIsAbleToSeekRelativeTime() throws Exception {
final int numOfMessage = 10;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1581,8 +1581,9 @@ private boolean hasMoreMessages(MessageId lastMessageIdInBroker, MessageId lastD
return true;
} else {
// Make sure batching message can be read completely.
return lastMessageIdInBroker.compareTo(lastDequeuedMessage) == 0
&& incomingMessages.size() > 0;
return resetIncludeHead ?
lastMessageIdInBroker.compareTo(lastDequeuedMessage) >= 0:
lastMessageIdInBroker.compareTo(lastDequeuedMessage) > 0;
}
}

Expand Down Expand Up @@ -1628,8 +1629,13 @@ private void internalGetLastMessageIdAsync(final Backoff backoff,
cnx.sendGetLastMessageId(getLastIdCmd, requestId).thenAccept((result) -> {
log.info("[{}][{}] Successfully getLastMessageId {}:{}",
topic, subscription, result.getLedgerId(), result.getEntryId());
future.complete(new MessageIdImpl(result.getLedgerId(),
result.getEntryId(), result.getPartition()));
if (result.getBatchIndex() < 0) {
future.complete(new MessageIdImpl(result.getLedgerId(),
result.getEntryId(), result.getPartition()));
} else {
future.complete(new BatchMessageIdImpl(result.getLedgerId(),
result.getEntryId(), result.getPartition(), result.getBatchIndex()));
}
}).exceptionally(e -> {
log.error("[{}][{}] Failed getLastMessageId command", topic, subscription);
future.completeExceptionally(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,4 +162,7 @@ public boolean hasBase64EncodedKey() {
return msgMetadata.get().getPartitionKeyB64Encoded();
}

public int getBatchSize() {
return msgMetadata.get().getNumMessagesInBatch();
}
}