diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index d29fff6f33510..27aeca71e886b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -2300,6 +2300,13 @@ protected void internalGetMessageById(AsyncResponse asyncResponse, long ledgerId } PersistentTopic topic = (PersistentTopic) getTopicReference(topicName); ManagedLedgerImpl ledger = (ManagedLedgerImpl) topic.getManagedLedger(); + if (null == ledger.getLedgerInfo(ledgerId).get()) { + log.error("[{}] Failed to get message with ledgerId {} entryId {} from {}, " + + "the ledgerId does not belong to this topic.", + clientAppId(), ledgerId, entryId, topicName); + asyncResponse.resume(new RestException(Status.NOT_FOUND, + "Message not found, the ledgerId does not belong to this topic")); + } ledger.asyncReadEntry(new PositionImpl(ledgerId, entryId), new AsyncCallbacks.ReadEntryCallback() { @Override public void readEntryFailed(ManagedLedgerException exception, Object ctx) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java index c089dfe74c02c..d6ad1ac8ae60e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java @@ -857,6 +857,37 @@ public void testSetReplicatedSubscriptionStatus() { Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode()); } + public void testGetMessageById() throws Exception { + TenantInfoImpl tenantInfo = new TenantInfoImpl(Sets.newHashSet("role1", "role2"), Sets.newHashSet("test")); + admin.tenants().createTenant("tenant-xyz", tenantInfo); + admin.namespaces().createNamespace("tenant-xyz/ns-abc", Sets.newHashSet("test")); + final String topicName1 = "persistent://tenant-xyz/ns-abc/testGetMessageById1"; + final String topicName2 = "persistent://tenant-xyz/ns-abc/testGetMessageById2"; + admin.topics().createNonPartitionedTopic(topicName1); + admin.topics().createNonPartitionedTopic(topicName2); + ProducerBase producer1 = (ProducerBase) pulsarClient.newProducer().topic(topicName1) + .enableBatching(false).create(); + String data1 = "test1"; + MessageIdImpl id1 = (MessageIdImpl) producer1.send(data1.getBytes()); + + ProducerBase producer2 = (ProducerBase) pulsarClient.newProducer().topic(topicName2) + .enableBatching(false).create(); + String data2 = "test2"; + MessageIdImpl id2 = (MessageIdImpl) producer2.send(data2.getBytes()); + + Message message1 = admin.topics().getMessageById(topicName1, id1.getLedgerId(), id1.getEntryId()); + Assert.assertEquals(message1.getData(), data1.getBytes()); + + Message message2 = admin.topics().getMessageById(topicName2, id2.getLedgerId(), id2.getEntryId()); + Assert.assertEquals(message2.getData(), data2.getBytes()); + + Message message3 = admin.topics().getMessageById(topicName2, id1.getLedgerId(), id1.getEntryId()); + Assert.assertNull(message3); + + Message message4 = admin.topics().getMessageById(topicName1, id2.getLedgerId(), id2.getEntryId()); + Assert.assertNull(message4); + } + @Test public void testGetMessageIdByTimestamp() throws Exception { TenantInfoImpl tenantInfo = new TenantInfoImpl(Sets.newHashSet("role1", "role2"), Sets.newHashSet("test"));