Skip to content

Commit

Permalink
Seek to the first one >= timestamp (apache#6393)
Browse files Browse the repository at this point in the history
The current logic for `resetCursor` by timestamp is odd. The first message it returns is the last message earlier or equal to the designated timestamp. This "earlier" message should be avoided to emit.
  • Loading branch information
yjshen committed Feb 24, 2020
1 parent f862961 commit 81f8afd
Show file tree
Hide file tree
Showing 7 changed files with 27 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public void findMessages(final long timestamp, AsyncCallbacks.FindEntryCallback
MessageImpl msg = null;
try {
msg = MessageImpl.deserialize(entry.getDataBuffer());
return msg.getPublishTime() <= timestamp;
return msg.getPublishTime() < timestamp;
} catch (Exception e) {
log.error("[{}][{}] Error deserializing message for message position find", topicName, subName, e);
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -654,7 +654,7 @@ public void findEntryComplete(Position position, Object ctx) {
"[{}][{}] Unable to find position for timestamp {}. Resetting cursor to first position {} in ledger",
topicName, subName, timestamp, finalPosition);
} else {
finalPosition = position;
finalPosition = position.getNext();
}
resetCursor(finalPosition, future);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1595,14 +1595,14 @@ public void persistentTopicsCursorReset(String topicName) throws Exception {

int receivedAfterReset = 0;

for (int i = 4; i < 10; i++) {
for (int i = 5; i < 10; i++) {
Message<byte[]> message = consumer.receive();
consumer.acknowledge(message);
++receivedAfterReset;
String expected = "message-" + i;
assertEquals(message.getData(), expected.getBytes());
}
assertEquals(receivedAfterReset, 6);
assertEquals(receivedAfterReset, 5);

consumer.close();

Expand Down Expand Up @@ -1654,29 +1654,29 @@ public void persistentTopicsCursorResetAfterReset(String topicName) throws Excep

int receivedAfterReset = 0;

// Should received messages from 4-9
for (int i = 4; i < 10; i++) {
// Should received messages from 5-9
for (int i = 5; i < 10; i++) {
Message<byte[]> message = consumer.receive();
consumer.acknowledge(message);
++receivedAfterReset;
String expected = "message-" + i;
assertEquals(new String(message.getData()), expected);
}
assertEquals(receivedAfterReset, 6);
assertEquals(receivedAfterReset, 5);

// Reset at 2nd timestamp
receivedAfterReset = 0;
admin.topics().resetCursor(topicName, "my-sub", secondTimestamp);

// Should received messages from 7-9
for (int i = 7; i < 10; i++) {
// Should received messages from 8-9
for (int i = 8; i < 10; i++) {
Message<byte[]> message = consumer.receive();
consumer.acknowledge(message);
++receivedAfterReset;
String expected = "message-" + i;
assertEquals(new String(message.getData()), expected);
}
assertEquals(receivedAfterReset, 3);
assertEquals(receivedAfterReset, 2);

consumer.close();
admin.topics().deleteSubscription(topicName, "my-sub");
Expand Down Expand Up @@ -1724,14 +1724,14 @@ public void persistentTopicsCursorResetAndFailover() throws Exception {
.acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe();

int receivedAfterReset = 0;
for (int i = 4; i < 10; i++) {
for (int i = 5; i < 10; i++) {
Message<byte[]> message = consumerA.receive(5, TimeUnit.SECONDS);
consumerA.acknowledge(message);
++receivedAfterReset;
String expected = "message-" + i;
assertEquals(message.getData(), expected.getBytes());
}
assertEquals(receivedAfterReset, 6);
assertEquals(receivedAfterReset, 5);

// Closing consumerA activates consumerB
consumerA.close();
Expand Down Expand Up @@ -1787,7 +1787,7 @@ public void partitionedTopicsCursorReset(String topicName) throws Exception {

Set<String> expectedMessages = Sets.newHashSet();
Set<String> receivedMessages = Sets.newHashSet();
for (int i = 4; i < 10; i++) {
for (int i = 5; i < 10; i++) {
Message<byte[]> message = consumer.receive();
consumer.acknowledge(message);
expectedMessages.add("message-" + i);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1490,14 +1490,14 @@ public void persistentTopicsCursorReset(String topicName) throws Exception {

int receivedAfterReset = 0;

for (int i = 4; i < 10; i++) {
for (int i = 5; i < 10; i++) {
Message<byte[]> message = consumer.receive();
consumer.acknowledge(message);
++receivedAfterReset;
String expected = "message-" + i;
assertEquals(message.getData(), expected.getBytes());
}
assertEquals(receivedAfterReset, 6);
assertEquals(receivedAfterReset, 5);

consumer.close();

Expand Down Expand Up @@ -1549,29 +1549,29 @@ public void persistentTopicsCursorResetAfterReset(String topicName) throws Excep

int receivedAfterReset = 0;

// Should received messages from 4-9
for (int i = 4; i < 10; i++) {
// Should received messages from 5-9
for (int i = 5; i < 10; i++) {
Message<byte[]> message = consumer.receive();
consumer.acknowledge(message);
++receivedAfterReset;
String expected = "message-" + i;
assertEquals(new String(message.getData()), expected);
}
assertEquals(receivedAfterReset, 6);
assertEquals(receivedAfterReset, 5);

// Reset at 2nd timestamp
receivedAfterReset = 0;
admin.topics().resetCursor(topicName, "my-sub", secondTimestamp);

// Should received messages from 7-9
for (int i = 7; i < 10; i++) {
// Should received messages from 8-9
for (int i = 8; i < 10; i++) {
Message<byte[]> message = consumer.receive();
consumer.acknowledge(message);
++receivedAfterReset;
String expected = "message-" + i;
assertEquals(new String(message.getData()), expected);
}
assertEquals(receivedAfterReset, 3);
assertEquals(receivedAfterReset, 2);

consumer.close();
admin.topics().deleteSubscription(topicName, "my-sub");
Expand Down Expand Up @@ -1614,7 +1614,7 @@ public void partitionedTopicsCursorReset(String topicName) throws Exception {

Set<String> expectedMessages = Sets.newHashSet();
Set<String> receivedMessages = Sets.newHashSet();
for (int i = 4; i < 10; i++) {
for (int i = 5; i < 10; i++) {
Message<byte[]> message = consumer.receive();
consumer.acknowledge(message);
expectedMessages.add("message-" + i);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ public void testSeekTime() throws Exception {

long currentTimestamp = System.currentTimeMillis();
consumer.seek(currentTimestamp);
assertEquals(sub.getNumberOfEntriesInBacklog(false), 1);
assertEquals(sub.getNumberOfEntriesInBacklog(false), 0);

// Wait for consumer to reconnect
Thread.sleep(1000);
Expand Down Expand Up @@ -187,7 +187,7 @@ public void testSeekTimeOnPartitionedTopic() throws Exception {
for (PersistentSubscription sub : subs) {
backlogs += sub.getNumberOfEntriesInBacklog(false);
}
assertEquals(backlogs, 2);
assertEquals(backlogs, 0);

// Wait for consumer to reconnect
Thread.sleep(1000);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -676,7 +676,7 @@ public void testReaderIsAbleToSeekWithTimeOnMiddleOfTopic() throws Exception {
reader.seek(l + plusTime);

Set<String> messageSet = Sets.newHashSet();
for (int i = halfMessages; i < numOfMessage; i++) {
for (int i = halfMessages + 1; i < numOfMessage; i++) {
Message<byte[]> message = reader.readNext();
String receivedMessage = new String(message.getData());
String expectedMessage = String.format("msg num %d", i);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -256,8 +256,8 @@ public void testReaderWithTimeLong() throws Exception {
receivedMessageIds.add(msg.getMessageId());
}

assertEquals(receivedMessageIds.size(), totalMsg + 1);
assertEquals(receivedMessageIds.get(0), lastMsgId);
assertEquals(receivedMessageIds.size(), totalMsg);
assertEquals(receivedMessageIds.get(0), firstMsgId);

restartBroker();

Expand Down

0 comments on commit 81f8afd

Please sign in to comment.