diff --git a/conf/broker.conf b/conf/broker.conf index 55838f43f849d..cf01b2d2fddc4 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -790,6 +790,11 @@ exposePublisherStats=true statsUpdateFrequencyInSecs=60 statsUpdateInitialDelayInSecs=60 +# Enable expose the precise backlog stats. +# Set false to use published counter and consumed counter to calculate, this would be more efficient but may be inaccurate. +# Default is false. +exposePreciseBacklogInPrometheus=false + ### --- Schema storage --- ### # The schema storage implementation used by this broker schemaRegistryStorageClassName=org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorageFactory diff --git a/conf/standalone.conf b/conf/standalone.conf index 3f31b422c5ebd..7dff0c3c913ab 100644 --- a/conf/standalone.conf +++ b/conf/standalone.conf @@ -531,6 +531,11 @@ exposeTopicLevelMetricsInPrometheus=true # Enable topic level metrics exposePublisherStats=true +# Enable expose the precise backlog stats. +# Set false to use published counter and consumed counter to calculate, this would be more efficient but may be inaccurate. +# Default is false. +exposePreciseBacklogInPrometheus=false + ### --- Deprecated config variables --- ### # Deprecated. Use configurationStoreServers diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java index ae0426935637c..6f12fb2cb20dc 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java @@ -193,9 +193,10 @@ void asyncGetNthEntry(int n, IndividualDeletedEntries deletedEntries, ReadEntryC * *

This method has linear time complexity on the number of ledgers included in the managed ledger. * + * @param isPrecise set to true to get precise backlog count * @return the number of entries */ - long getNumberOfEntriesInBacklog(); + long getNumberOfEntriesInBacklog(boolean isPrecise); /** * This signals that the reader is done with all the entries up to "position" (included). This can potentially diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index c75c45a64a579..9498145de2823 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -734,12 +734,16 @@ public long getEstimatedSizeSinceMarkDeletePosition() { } @Override - public long getNumberOfEntriesInBacklog() { + public long getNumberOfEntriesInBacklog(boolean isPrecise) { if (log.isDebugEnabled()) { log.debug("[{}] Consumer {} cursor ml-entries: {} -- deleted-counter: {} other counters: mdPos {} rdPos {}", ledger.getName(), name, ManagedLedgerImpl.ENTRIES_ADDED_COUNTER_UPDATER.get(ledger), messagesConsumedCounter, markDeletePosition, readPosition); } + if (isPrecise) { + return getNumberOfEntries(Range.closed(markDeletePosition, ledger.getLastPosition())) - 1; + } + long backlog = ManagedLedgerImpl.ENTRIES_ADDED_COUNTER_UPDATER.get(ledger) - messagesConsumedCounter; if (backlog < 0) { // In some case the counters get incorrect values, fall back to the precise backlog count diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 982e588286b53..ecf99a426dbd4 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -24,6 +24,7 @@ import static org.apache.bookkeeper.mledger.util.Errors.isNoSuchLedgerExistsException; import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.BoundType; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; @@ -142,6 +143,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { private final ManagedCursorContainer activeCursors = new ManagedCursorContainer(); // Ever increasing counter of entries added + @VisibleForTesting static final AtomicLongFieldUpdater ENTRIES_ADDED_COUNTER_UPDATER = AtomicLongFieldUpdater .newUpdater(ManagedLedgerImpl.class, "entriesAddedCounter"); @SuppressWarnings("unused") @@ -3176,6 +3178,11 @@ public long getOffloadedSize() { return offloadedSize; } + @VisibleForTesting + public void setEntriesAddedCounter(long count) { + ENTRIES_ADDED_COUNTER_UPDATER.set(this, count); + } + private static final Logger log = LoggerFactory.getLogger(ManagedLedgerImpl.class); } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerMBeanImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerMBeanImpl.java index 38c67f49f8085..ec1dc74a290bd 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerMBeanImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerMBeanImpl.java @@ -263,7 +263,7 @@ public long getNumberOfMessagesInBacklog() { long count = 0; for (ManagedCursor cursor : managedLedger.getCursors()) { - count += cursor.getNumberOfEntriesInBacklog(); + count += cursor.getNumberOfEntriesInBacklog(false); } return count; diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java index 21f5747919b2f..99e7aec740bed 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java @@ -92,7 +92,7 @@ public long getNumberOfEntries() { } @Override - public long getNumberOfEntriesInBacklog() { + public long getNumberOfEntriesInBacklog(boolean isPrecise) { return 0; } diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorListAckTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorListAckTest.java index aaf7cd1c6f282..dc876e19eb1ab 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorListAckTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorListAckTest.java @@ -51,24 +51,24 @@ void testMultiPositionDelete() throws Exception { Position p7 = ledger.addEntry("dummy-entry-7".getBytes(Encoding)); assertEquals(c1.getNumberOfEntries(), 7); - assertEquals(c1.getNumberOfEntriesInBacklog(), 7); + assertEquals(c1.getNumberOfEntriesInBacklog(false), 7); c1.delete(Lists.newArrayList(p2, p3, p5, p7)); assertEquals(c1.getNumberOfEntries(), 3); - assertEquals(c1.getNumberOfEntriesInBacklog(), 3); + assertEquals(c1.getNumberOfEntriesInBacklog(false), 3); assertEquals(c1.getMarkDeletedPosition(), p0); c1.delete(Lists.newArrayList(p1)); assertEquals(c1.getNumberOfEntries(), 2); - assertEquals(c1.getNumberOfEntriesInBacklog(), 2); + assertEquals(c1.getNumberOfEntriesInBacklog(false), 2); assertEquals(c1.getMarkDeletedPosition(), p3); c1.delete(Lists.newArrayList(p4, p6, p7)); assertEquals(c1.getNumberOfEntries(), 0); - assertEquals(c1.getNumberOfEntriesInBacklog(), 0); + assertEquals(c1.getNumberOfEntriesInBacklog(false), 0); assertEquals(c1.getMarkDeletedPosition(), p7); } diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java index 94faa9931d391..75d0280ca52b7 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java @@ -265,31 +265,31 @@ void testNumberOfEntriesInBacklog() throws Exception { Position p4 = ledger.addEntry("dummy-entry-4".getBytes(Encoding)); ManagedCursor c5 = ledger.openCursor("c5"); - assertEquals(c1.getNumberOfEntriesInBacklog(), 4); - assertEquals(c2.getNumberOfEntriesInBacklog(), 3); - assertEquals(c3.getNumberOfEntriesInBacklog(), 2); - assertEquals(c4.getNumberOfEntriesInBacklog(), 1); - assertEquals(c5.getNumberOfEntriesInBacklog(), 0); + assertEquals(c1.getNumberOfEntriesInBacklog(false), 4); + assertEquals(c2.getNumberOfEntriesInBacklog(false), 3); + assertEquals(c3.getNumberOfEntriesInBacklog(false), 2); + assertEquals(c4.getNumberOfEntriesInBacklog(false), 1); + assertEquals(c5.getNumberOfEntriesInBacklog(false), 0); List entries = c1.readEntries(2); assertEquals(entries.size(), 2); entries.forEach(e -> e.release()); assertEquals(c1.getNumberOfEntries(), 2); - assertEquals(c1.getNumberOfEntriesInBacklog(), 4); + assertEquals(c1.getNumberOfEntriesInBacklog(false), 4); c1.markDelete(p1); assertEquals(c1.getNumberOfEntries(), 2); - assertEquals(c1.getNumberOfEntriesInBacklog(), 3); + assertEquals(c1.getNumberOfEntriesInBacklog(false), 3); c1.delete(p3); assertEquals(c1.getNumberOfEntries(), 1); - assertEquals(c1.getNumberOfEntriesInBacklog(), 2); + assertEquals(c1.getNumberOfEntriesInBacklog(false), 2); c1.markDelete(p4); assertEquals(c1.getNumberOfEntries(), 0); - assertEquals(c1.getNumberOfEntriesInBacklog(), 0); + assertEquals(c1.getNumberOfEntriesInBacklog(false), 0); } @Test(timeOut = 20000) @@ -315,11 +315,11 @@ void testNumberOfEntriesInBacklogWithFallback() throws Exception { field.setLong(c4, counter); field.setLong(c5, counter); - assertEquals(c1.getNumberOfEntriesInBacklog(), 4); - assertEquals(c2.getNumberOfEntriesInBacklog(), 3); - assertEquals(c3.getNumberOfEntriesInBacklog(), 2); - assertEquals(c4.getNumberOfEntriesInBacklog(), 1); - assertEquals(c5.getNumberOfEntriesInBacklog(), 0); + assertEquals(c1.getNumberOfEntriesInBacklog(false), 4); + assertEquals(c2.getNumberOfEntriesInBacklog(false), 3); + assertEquals(c3.getNumberOfEntriesInBacklog(false), 2); + assertEquals(c4.getNumberOfEntriesInBacklog(false), 1); + assertEquals(c5.getNumberOfEntriesInBacklog(false), 0); } @Test(timeOut = 20000) @@ -811,34 +811,34 @@ void rewind() throws Exception { log.debug("p4: {}", p4); assertEquals(c1.getNumberOfEntries(), 4); - assertEquals(c1.getNumberOfEntriesInBacklog(), 4); + assertEquals(c1.getNumberOfEntriesInBacklog(false), 4); c1.markDelete(p1); assertEquals(c1.getNumberOfEntries(), 3); - assertEquals(c1.getNumberOfEntriesInBacklog(), 3); + assertEquals(c1.getNumberOfEntriesInBacklog(false), 3); List entries = c1.readEntries(10); assertEquals(entries.size(), 3); entries.forEach(e -> e.release()); assertEquals(c1.getNumberOfEntries(), 0); - assertEquals(c1.getNumberOfEntriesInBacklog(), 3); + assertEquals(c1.getNumberOfEntriesInBacklog(false), 3); c1.rewind(); assertEquals(c1.getNumberOfEntries(), 3); - assertEquals(c1.getNumberOfEntriesInBacklog(), 3); + assertEquals(c1.getNumberOfEntriesInBacklog(false), 3); c1.markDelete(p2); assertEquals(c1.getNumberOfEntries(), 2); - assertEquals(c1.getNumberOfEntriesInBacklog(), 2); + assertEquals(c1.getNumberOfEntriesInBacklog(false), 2); entries = c1.readEntries(10); assertEquals(entries.size(), 2); entries.forEach(e -> e.release()); assertEquals(c1.getNumberOfEntries(), 0); - assertEquals(c1.getNumberOfEntriesInBacklog(), 2); + assertEquals(c1.getNumberOfEntriesInBacklog(false), 2); c1.rewind(); assertEquals(c1.getNumberOfEntries(), 2); c1.markDelete(p4); assertEquals(c1.getNumberOfEntries(), 0); - assertEquals(c1.getNumberOfEntriesInBacklog(), 0); + assertEquals(c1.getNumberOfEntriesInBacklog(false), 0); c1.rewind(); assertEquals(c1.getNumberOfEntries(), 0); ledger.addEntry("dummy-entry-5".getBytes(Encoding)); @@ -1324,26 +1324,26 @@ void testFilteringReadEntries() throws Exception { /* Position p6 = */ledger.addEntry("entry6".getBytes()); assertEquals(cursor.getNumberOfEntries(), 6); - assertEquals(cursor.getNumberOfEntriesInBacklog(), 6); + assertEquals(cursor.getNumberOfEntriesInBacklog(false), 6); List entries = cursor.readEntries(3); assertEquals(entries.size(), 3); entries.forEach(e -> e.release()); assertEquals(cursor.getNumberOfEntries(), 3); - assertEquals(cursor.getNumberOfEntriesInBacklog(), 6); + assertEquals(cursor.getNumberOfEntriesInBacklog(false), 6); log.info("Deleting {}", p5); cursor.delete(p5); assertEquals(cursor.getNumberOfEntries(), 2); - assertEquals(cursor.getNumberOfEntriesInBacklog(), 5); + assertEquals(cursor.getNumberOfEntriesInBacklog(false), 5); entries = cursor.readEntries(3); assertEquals(entries.size(), 2); entries.forEach(e -> e.release()); assertEquals(cursor.getNumberOfEntries(), 0); - assertEquals(cursor.getNumberOfEntriesInBacklog(), 5); + assertEquals(cursor.getNumberOfEntriesInBacklog(false), 5); } @Test(timeOut = 20000) @@ -1384,21 +1384,21 @@ void testCountingWithDeletedEntries() throws Exception { Position p8 = ledger.addEntry("entry8".getBytes()); assertEquals(cursor.getNumberOfEntries(), 8); - assertEquals(cursor.getNumberOfEntriesInBacklog(), 8); + assertEquals(cursor.getNumberOfEntriesInBacklog(false), 8); cursor.delete(p8); assertEquals(cursor.getNumberOfEntries(), 7); - assertEquals(cursor.getNumberOfEntriesInBacklog(), 7); + assertEquals(cursor.getNumberOfEntriesInBacklog(false), 7); cursor.delete(p1); assertEquals(cursor.getNumberOfEntries(), 6); - assertEquals(cursor.getNumberOfEntriesInBacklog(), 6); + assertEquals(cursor.getNumberOfEntriesInBacklog(false), 6); cursor.delete(p7); cursor.delete(p6); cursor.delete(p5); assertEquals(cursor.getNumberOfEntries(), 3); - assertEquals(cursor.getNumberOfEntriesInBacklog(), 3); + assertEquals(cursor.getNumberOfEntriesInBacklog(false), 3); } @Test(timeOut = 20000, dataProvider = "useOpenRangeSet") @@ -1504,22 +1504,22 @@ void testClearBacklog(boolean useOpenRangeSet) throws Exception { ManagedCursor c3 = ledger.openCursor("c3"); ledger.addEntry("dummy-entry-3".getBytes(Encoding)); - assertEquals(c1.getNumberOfEntriesInBacklog(), 3); + assertEquals(c1.getNumberOfEntriesInBacklog(false), 3); assertEquals(c1.getNumberOfEntries(), 3); assertTrue(c1.hasMoreEntries()); c1.clearBacklog(); c3.clearBacklog(); - assertEquals(c1.getNumberOfEntriesInBacklog(), 0); + assertEquals(c1.getNumberOfEntriesInBacklog(false), 0); assertEquals(c1.getNumberOfEntries(), 0); assertFalse(c1.hasMoreEntries()); - assertEquals(c2.getNumberOfEntriesInBacklog(), 2); + assertEquals(c2.getNumberOfEntriesInBacklog(false), 2); assertEquals(c2.getNumberOfEntries(), 2); assertTrue(c2.hasMoreEntries()); - assertEquals(c3.getNumberOfEntriesInBacklog(), 0); + assertEquals(c3.getNumberOfEntriesInBacklog(false), 0); assertEquals(c3.getNumberOfEntries(), 0); assertFalse(c3.hasMoreEntries()); @@ -1530,15 +1530,15 @@ void testClearBacklog(boolean useOpenRangeSet) throws Exception { c2 = ledger.openCursor("c2"); c3 = ledger.openCursor("c3"); - assertEquals(c1.getNumberOfEntriesInBacklog(), 0); + assertEquals(c1.getNumberOfEntriesInBacklog(false), 0); assertEquals(c1.getNumberOfEntries(), 0); assertFalse(c1.hasMoreEntries()); - assertEquals(c2.getNumberOfEntriesInBacklog(), 2); + assertEquals(c2.getNumberOfEntriesInBacklog(false), 2); assertEquals(c2.getNumberOfEntries(), 2); assertTrue(c2.hasMoreEntries()); - assertEquals(c3.getNumberOfEntriesInBacklog(), 0); + assertEquals(c3.getNumberOfEntriesInBacklog(false), 0); assertEquals(c3.getNumberOfEntries(), 0); assertFalse(c3.hasMoreEntries()); factory2.shutdown(); @@ -1555,12 +1555,12 @@ void testRateLimitMarkDelete(boolean useOpenRangeSet) throws Exception { Position p2 = ledger.addEntry("dummy-entry-2".getBytes(Encoding)); Position p3 = ledger.addEntry("dummy-entry-3".getBytes(Encoding)); - assertEquals(c1.getNumberOfEntriesInBacklog(), 3); + assertEquals(c1.getNumberOfEntriesInBacklog(false), 3); c1.markDelete(p1); c1.markDelete(p2); c1.markDelete(p3); - assertEquals(c1.getNumberOfEntriesInBacklog(), 0); + assertEquals(c1.getNumberOfEntriesInBacklog(false), 0); // Re-open to recover from storage ManagedLedgerFactory factory2 = new ManagedLedgerFactoryImpl(bkc, bkc.getZkHandle()); @@ -1569,7 +1569,7 @@ void testRateLimitMarkDelete(boolean useOpenRangeSet) throws Exception { c1 = ledger.openCursor("c1"); // Only the 1st mark-delete was persisted - assertEquals(c1.getNumberOfEntriesInBacklog(), 2); + assertEquals(c1.getNumberOfEntriesInBacklog(false), 2); factory2.shutdown(); } @@ -1585,57 +1585,57 @@ void deleteSingleMessageTwice(boolean useOpenRangeSet) throws Exception { Position p3 = ledger.addEntry("entry-3".getBytes(Encoding)); Position p4 = ledger.addEntry("entry-4".getBytes(Encoding)); - assertEquals(c1.getNumberOfEntriesInBacklog(), 4); + assertEquals(c1.getNumberOfEntriesInBacklog(false), 4); assertEquals(c1.getNumberOfEntries(), 4); c1.delete(p1); - assertEquals(c1.getNumberOfEntriesInBacklog(), 3); + assertEquals(c1.getNumberOfEntriesInBacklog(false), 3); assertEquals(c1.getNumberOfEntries(), 3); assertEquals(c1.getMarkDeletedPosition(), p1); assertEquals(c1.getReadPosition(), p2); // Should have not effect since p1 is already deleted c1.delete(p1); - assertEquals(c1.getNumberOfEntriesInBacklog(), 3); + assertEquals(c1.getNumberOfEntriesInBacklog(false), 3); assertEquals(c1.getNumberOfEntries(), 3); assertEquals(c1.getMarkDeletedPosition(), p1); assertEquals(c1.getReadPosition(), p2); c1.delete(p2); - assertEquals(c1.getNumberOfEntriesInBacklog(), 2); + assertEquals(c1.getNumberOfEntriesInBacklog(false), 2); assertEquals(c1.getNumberOfEntries(), 2); assertEquals(c1.getMarkDeletedPosition(), p2); assertEquals(c1.getReadPosition(), p3); // Should have not effect since p2 is already deleted c1.delete(p2); - assertEquals(c1.getNumberOfEntriesInBacklog(), 2); + assertEquals(c1.getNumberOfEntriesInBacklog(false), 2); assertEquals(c1.getNumberOfEntries(), 2); assertEquals(c1.getMarkDeletedPosition(), p2); assertEquals(c1.getReadPosition(), p3); c1.delete(p3); - assertEquals(c1.getNumberOfEntriesInBacklog(), 1); + assertEquals(c1.getNumberOfEntriesInBacklog(false), 1); assertEquals(c1.getNumberOfEntries(), 1); assertEquals(c1.getMarkDeletedPosition(), p3); assertEquals(c1.getReadPosition(), p4); // Should have not effect since p3 is already deleted c1.delete(p3); - assertEquals(c1.getNumberOfEntriesInBacklog(), 1); + assertEquals(c1.getNumberOfEntriesInBacklog(false), 1); assertEquals(c1.getNumberOfEntries(), 1); assertEquals(c1.getMarkDeletedPosition(), p3); assertEquals(c1.getReadPosition(), p4); c1.delete(p4); - assertEquals(c1.getNumberOfEntriesInBacklog(), 0); + assertEquals(c1.getNumberOfEntriesInBacklog(false), 0); assertEquals(c1.getNumberOfEntries(), 0); assertEquals(c1.getMarkDeletedPosition(), p4); assertEquals(c1.getReadPosition(), p4.getNext()); // Should have not effect since p4 is already deleted c1.delete(p4); - assertEquals(c1.getNumberOfEntriesInBacklog(), 0); + assertEquals(c1.getNumberOfEntriesInBacklog(false), 0); assertEquals(c1.getNumberOfEntries(), 0); assertEquals(c1.getMarkDeletedPosition(), p4); assertEquals(c1.getReadPosition(), p4.getNext()); @@ -2367,19 +2367,19 @@ void outOfOrderAcks() throws Exception { positions.add(ledger.addEntry("entry".getBytes())); } - assertEquals(c1.getNumberOfEntriesInBacklog(), N); + assertEquals(c1.getNumberOfEntriesInBacklog(false), N); c1.delete(positions.get(3)); - assertEquals(c1.getNumberOfEntriesInBacklog(), N - 1); + assertEquals(c1.getNumberOfEntriesInBacklog(false), N - 1); c1.delete(positions.get(2)); - assertEquals(c1.getNumberOfEntriesInBacklog(), N - 2); + assertEquals(c1.getNumberOfEntriesInBacklog(false), N - 2); c1.delete(positions.get(1)); - assertEquals(c1.getNumberOfEntriesInBacklog(), N - 3); + assertEquals(c1.getNumberOfEntriesInBacklog(false), N - 3); c1.delete(positions.get(0)); - assertEquals(c1.getNumberOfEntriesInBacklog(), N - 4); + assertEquals(c1.getNumberOfEntriesInBacklog(false), N - 4); } @Test(timeOut = 20000) @@ -2394,17 +2394,17 @@ void randomOrderAcks() throws Exception { positions.add(ledger.addEntry("entry".getBytes())); } - assertEquals(c1.getNumberOfEntriesInBacklog(), N); + assertEquals(c1.getNumberOfEntriesInBacklog(false), N); // Randomize the ack sequence Collections.shuffle(positions); int toDelete = N; for (Position p : positions) { - assertEquals(c1.getNumberOfEntriesInBacklog(), toDelete); + assertEquals(c1.getNumberOfEntriesInBacklog(false), toDelete); c1.delete(p); --toDelete; - assertEquals(c1.getNumberOfEntriesInBacklog(), toDelete); + assertEquals(c1.getNumberOfEntriesInBacklog(false), toDelete); } } @@ -2572,7 +2572,7 @@ public void testOutOfOrderDeletePersistenceWithClose() throws Exception { c1.delete(addedPositions.get(8)); c1.delete(addedPositions.get(9)); - assertEquals(c1.getNumberOfEntriesInBacklog(), 20 - 5); + assertEquals(c1.getNumberOfEntriesInBacklog(false), 20 - 5); ledger.close(); factory.shutdown(); @@ -2581,7 +2581,7 @@ public void testOutOfOrderDeletePersistenceWithClose() throws Exception { factory = new ManagedLedgerFactoryImpl(bkc, bkc.getZkHandle()); ledger = factory.open("my_test_ledger", new ManagedLedgerConfig()); c1 = ledger.openCursor("c1"); - assertEquals(c1.getNumberOfEntriesInBacklog(), 20 - 5); + assertEquals(c1.getNumberOfEntriesInBacklog(false), 20 - 5); List entries = c1.readEntries(20); assertEquals(entries.size(), 20 - 5); @@ -2617,13 +2617,13 @@ public void testOutOfOrderDeletePersistenceAfterCrash() throws Exception { c1.delete(addedPositions.get(8)); c1.delete(addedPositions.get(9)); - assertEquals(c1.getNumberOfEntriesInBacklog(), 20 - 5); + assertEquals(c1.getNumberOfEntriesInBacklog(false), 20 - 5); // Re-Open ManagedLedgerFactory factory2 = new ManagedLedgerFactoryImpl(bkc, bkc.getZkHandle()); ledger = factory2.open("my_test_ledger", new ManagedLedgerConfig()); c1 = ledger.openCursor("c1"); - assertEquals(c1.getNumberOfEntriesInBacklog(), 20 - 5); + assertEquals(c1.getNumberOfEntriesInBacklog(false), 20 - 5); List entries = c1.readEntries(20); assertEquals(entries.size(), 20 - 5); @@ -2736,7 +2736,7 @@ public void testOutOfOrderDeletePersistenceIntoLedgerWithClose() throws Exceptio } } - assertEquals(c1.getNumberOfEntriesInBacklog(), totalAddEntries / 2); + assertEquals(c1.getNumberOfEntriesInBacklog(false), totalAddEntries / 2); // Close ledger to persist individual-deleted positions into cursor-ledger ledger.close(); @@ -2788,7 +2788,7 @@ public void operationFailed(MetaStoreException e) { ledger = (ManagedLedgerImpl) factory.open(ledgerName, managedLedgerConfig); c1 = (ManagedCursorImpl) ledger.openCursor("c1"); // verify cursor has been recovered - assertEquals(c1.getNumberOfEntriesInBacklog(), totalAddEntries / 2); + assertEquals(c1.getNumberOfEntriesInBacklog(false), totalAddEntries / 2); // try to read entries which should only read non-deleted positions List entries = c1.readEntries(totalAddEntries); @@ -2820,7 +2820,7 @@ public void testOutOfOrderDeletePersistenceIntoZkWithClose() throws Exception { } } - assertEquals(c1.getNumberOfEntriesInBacklog(), totalAddEntries / 2); + assertEquals(c1.getNumberOfEntriesInBacklog(false), totalAddEntries / 2); // Close ledger to persist individual-deleted positions into cursor-ledger ledger.close(); @@ -2848,7 +2848,7 @@ public void operationFailed(MetaStoreException e) { ledger = (ManagedLedgerImpl) factory.open(ledgerName, managedLedgerConfig); c1 = (ManagedCursorImpl) ledger.openCursor(cursorName); // verify cursor has been recovered - assertEquals(c1.getNumberOfEntriesInBacklog(), totalAddEntries / 2); + assertEquals(c1.getNumberOfEntriesInBacklog(false), totalAddEntries / 2); // try to read entries which should only read non-deleted positions List entries = c1.readEntries(totalAddEntries); @@ -3019,7 +3019,7 @@ public void deleteMessagesCheckhMarkDelete() throws Exception { totalDeletedMessages += 1; } } - assertEquals(c1.getNumberOfEntriesInBacklog(), totalEntries - totalDeletedMessages); + assertEquals(c1.getNumberOfEntriesInBacklog(false), totalEntries - totalDeletedMessages); assertEquals(c1.getNumberOfEntries(), totalEntries - totalDeletedMessages); assertEquals(c1.getMarkDeletedPosition(), positions[0]); assertEquals(c1.getReadPosition(), positions[1]); @@ -3033,7 +3033,7 @@ public void deleteMessagesCheckhMarkDelete() throws Exception { } } int markDelete = totalEntries / 2 - 1; - assertEquals(c1.getNumberOfEntriesInBacklog(), totalEntries - totalDeletedMessages); + assertEquals(c1.getNumberOfEntriesInBacklog(false), totalEntries - totalDeletedMessages); assertEquals(c1.getNumberOfEntries(), totalEntries - totalDeletedMessages); assertEquals(c1.getMarkDeletedPosition(), positions[markDelete]); assertEquals(c1.getReadPosition(), positions[markDelete + 1]); diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerBkTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerBkTest.java index ea69c63f2c21c..5a9e8d4bafc7b 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerBkTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerBkTest.java @@ -124,7 +124,7 @@ public void testBookieFailure() throws Exception { // Next add should succeed ledger.addEntry("entry-2".getBytes()); - assertEquals(3, cursor.getNumberOfEntriesInBacklog()); + assertEquals(3, cursor.getNumberOfEntriesInBacklog(false)); List entries = cursor.readEntries(1); assertEquals(1, entries.size()); @@ -357,13 +357,13 @@ public void ledgerFencedByAutoReplication() throws Exception { ledger.addEntry("entry-2".getBytes()); assertEquals(2, c1.getNumberOfEntries()); - assertEquals(2, c1.getNumberOfEntriesInBacklog()); + assertEquals(2, c1.getNumberOfEntriesInBacklog(false)); PositionImpl p3 = (PositionImpl) ledger.addEntry("entry-3".getBytes()); // Now entry-2 should have been written before entry-3 assertEquals(3, c1.getNumberOfEntries()); - assertEquals(3, c1.getNumberOfEntriesInBacklog()); + assertEquals(3, c1.getNumberOfEntriesInBacklog(false)); assertTrue(p1.getLedgerId() != p3.getLedgerId()); factory.shutdown(); } @@ -402,7 +402,7 @@ public void ledgerFencedByFailover() throws Exception { // Ok } - assertEquals(2, c2.getNumberOfEntriesInBacklog()); + assertEquals(2, c2.getNumberOfEntriesInBacklog(false)); factory1.shutdown(); factory2.shutdown(); } @@ -459,12 +459,12 @@ void testResetCursorAfterRecovery() throws Exception { assertEquals(cursor.getMarkDeletedPosition(), p3); assertEquals(cursor.getReadPosition(), p4); - assertEquals(cursor.getNumberOfEntriesInBacklog(), 1); + assertEquals(cursor.getNumberOfEntriesInBacklog(false), 1); cursor.resetCursor(p2); assertEquals(cursor.getMarkDeletedPosition(), p1); assertEquals(cursor.getReadPosition(), p2); - assertEquals(cursor.getNumberOfEntriesInBacklog(), 3); + assertEquals(cursor.getNumberOfEntriesInBacklog(false), 3); factory2.shutdown(); factory.shutdown(); @@ -531,7 +531,7 @@ public void testChangeCrcType() throws Exception { ledger.addEntry("entry-3".getBytes()); assertEquals(c1.getNumberOfEntries(), 4); - assertEquals(c1.getNumberOfEntriesInBacklog(), 4); + assertEquals(c1.getNumberOfEntriesInBacklog(false), 4); List entries = c1.readEntries(4); assertEquals(entries.size(), 4); diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerErrorsTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerErrorsTest.java index 416a58cc256ca..99b6bd59e91f8 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerErrorsTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerErrorsTest.java @@ -366,7 +366,7 @@ public void recoverAfterWriteError() throws Exception { // With one single error, the write should succeed ledger.addEntry("entry-1".getBytes()); - assertEquals(cursor.getNumberOfEntriesInBacklog(), 1); + assertEquals(cursor.getNumberOfEntriesInBacklog(false), 1); bkc.failNow(BKException.Code.BookieHandleNotAvailableException); zkc.failNow(Code.CONNECTIONLOSS); @@ -385,7 +385,7 @@ public void recoverAfterWriteError() throws Exception { // ok } - assertEquals(cursor.getNumberOfEntriesInBacklog(), 1); + assertEquals(cursor.getNumberOfEntriesInBacklog(false), 1); // Signal that ManagedLedger has recovered from write error and will be availbe for writes again ledger.readyToCreateNewLedger(); @@ -393,7 +393,7 @@ public void recoverAfterWriteError() throws Exception { // Next add should succeed, and the previous write should not appear ledger.addEntry("entry-4".getBytes()); - assertEquals(cursor.getNumberOfEntriesInBacklog(), 2); + assertEquals(cursor.getNumberOfEntriesInBacklog(false), 2); List entries = cursor.readEntries(10); assertEquals(entries.size(), 2); @@ -435,7 +435,7 @@ public void addFailed(ManagedLedgerException exception, Object ctx) { counter.await(); assertNull(ex.get()); - assertEquals(cursor.getNumberOfEntriesInBacklog(), 2); + assertEquals(cursor.getNumberOfEntriesInBacklog(false), 2); // Ensure that we are only creating one new ledger // even when there are multiple (here, 2) add entry failed ops diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java index 8ffd7383f563a..a31416f8dbd03 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java @@ -176,14 +176,14 @@ public void simple() throws Exception { assertFalse(cursor.hasMoreEntries()); assertEquals(cursor.getNumberOfEntries(), 0); - assertEquals(cursor.getNumberOfEntriesInBacklog(), 0); + assertEquals(cursor.getNumberOfEntriesInBacklog(false), 0); assertEquals(cursor.readEntries(100), new ArrayList()); ledger.addEntry("dummy-entry-2".getBytes(Encoding)); assertTrue(cursor.hasMoreEntries()); assertEquals(cursor.getNumberOfEntries(), 1); - assertEquals(cursor.getNumberOfEntriesInBacklog(), 1); + assertEquals(cursor.getNumberOfEntriesInBacklog(false), 1); assertEquals(ledger.getNumberOfActiveEntries(), 1); List entries = cursor.readEntries(100); @@ -243,7 +243,7 @@ public void acknowledge1() throws Exception { assertEquals(entries.size(), 2); assertEquals(cursor.getNumberOfEntries(), 0); - assertEquals(cursor.getNumberOfEntriesInBacklog(), 2); + assertEquals(cursor.getNumberOfEntriesInBacklog(false), 2); assertFalse(cursor.hasMoreEntries()); assertEquals(ledger.getNumberOfEntries(), 2); @@ -252,7 +252,7 @@ public void acknowledge1() throws Exception { entries.forEach(e -> e.release()); assertEquals(cursor.getNumberOfEntries(), 0); - assertEquals(cursor.getNumberOfEntriesInBacklog(), 1); + assertEquals(cursor.getNumberOfEntriesInBacklog(false), 1); assertFalse(cursor.hasMoreEntries()); assertEquals(ledger.getNumberOfActiveEntries(), 1); @@ -267,7 +267,7 @@ public void acknowledge1() throws Exception { assertEquals(ledger.getTotalSize(), "dummy-entry-1".getBytes(Encoding).length * 2); assertEquals(cursor.getNumberOfEntries(), 1); - assertEquals(cursor.getNumberOfEntriesInBacklog(), 1); + assertEquals(cursor.getNumberOfEntriesInBacklog(false), 1); assertTrue(cursor.hasMoreEntries()); entries = cursor.readEntries(100); @@ -2314,7 +2314,7 @@ public void testConsumerSubscriptionInitializePosition() throws Exception{ assertEquals(earliestPositionAndCounter.getLeft().getNext(), p2); assertEquals(latestPositionAndCounter.getRight().longValue(), totalInsertedEntries); - assertEquals(earliestPositionAndCounter.getRight().longValue(), totalInsertedEntries - earliestCursor.getNumberOfEntriesInBacklog()); + assertEquals(earliestPositionAndCounter.getRight().longValue(), totalInsertedEntries - earliestCursor.getNumberOfEntriesInBacklog(false)); ledger.close(); diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java index 957e63bfa3e6a..26cad0a02260f 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java @@ -221,31 +221,31 @@ void testNumberOfEntriesInBacklog() throws Exception { Position p4 = ledger.addEntry("dummy-entry-4".getBytes(Encoding)); ManagedCursor c5 = ledger.newNonDurableCursor(PositionImpl.latest); - assertEquals(c1.getNumberOfEntriesInBacklog(), 4); - assertEquals(c2.getNumberOfEntriesInBacklog(), 3); - assertEquals(c3.getNumberOfEntriesInBacklog(), 2); - assertEquals(c4.getNumberOfEntriesInBacklog(), 1); - assertEquals(c5.getNumberOfEntriesInBacklog(), 0); + assertEquals(c1.getNumberOfEntriesInBacklog(false), 4); + assertEquals(c2.getNumberOfEntriesInBacklog(false), 3); + assertEquals(c3.getNumberOfEntriesInBacklog(false), 2); + assertEquals(c4.getNumberOfEntriesInBacklog(false), 1); + assertEquals(c5.getNumberOfEntriesInBacklog(false), 0); List entries = c1.readEntries(2); assertEquals(entries.size(), 2); entries.forEach(e -> e.release()); assertEquals(c1.getNumberOfEntries(), 2); - assertEquals(c1.getNumberOfEntriesInBacklog(), 4); + assertEquals(c1.getNumberOfEntriesInBacklog(false), 4); c1.markDelete(p1); assertEquals(c1.getNumberOfEntries(), 2); - assertEquals(c1.getNumberOfEntriesInBacklog(), 3); + assertEquals(c1.getNumberOfEntriesInBacklog(false), 3); c1.delete(p3); assertEquals(c1.getNumberOfEntries(), 1); - assertEquals(c1.getNumberOfEntriesInBacklog(), 2); + assertEquals(c1.getNumberOfEntriesInBacklog(false), 2); c1.markDelete(p4); assertEquals(c1.getNumberOfEntries(), 0); - assertEquals(c1.getNumberOfEntriesInBacklog(), 0); + assertEquals(c1.getNumberOfEntriesInBacklog(false), 0); } @Test(timeOut = 20000) @@ -371,34 +371,34 @@ void rewind() throws Exception { log.debug("p4: {}", p4); assertEquals(c1.getNumberOfEntries(), 4); - assertEquals(c1.getNumberOfEntriesInBacklog(), 4); + assertEquals(c1.getNumberOfEntriesInBacklog(false), 4); c1.markDelete(p1); assertEquals(c1.getNumberOfEntries(), 3); - assertEquals(c1.getNumberOfEntriesInBacklog(), 3); + assertEquals(c1.getNumberOfEntriesInBacklog(false), 3); List entries = c1.readEntries(10); assertEquals(entries.size(), 3); entries.forEach(e -> e.release()); assertEquals(c1.getNumberOfEntries(), 0); - assertEquals(c1.getNumberOfEntriesInBacklog(), 3); + assertEquals(c1.getNumberOfEntriesInBacklog(false), 3); c1.rewind(); assertEquals(c1.getNumberOfEntries(), 3); - assertEquals(c1.getNumberOfEntriesInBacklog(), 3); + assertEquals(c1.getNumberOfEntriesInBacklog(false), 3); c1.markDelete(p2); assertEquals(c1.getNumberOfEntries(), 2); - assertEquals(c1.getNumberOfEntriesInBacklog(), 2); + assertEquals(c1.getNumberOfEntriesInBacklog(false), 2); entries = c1.readEntries(10); assertEquals(entries.size(), 2); entries.forEach(e -> e.release()); assertEquals(c1.getNumberOfEntries(), 0); - assertEquals(c1.getNumberOfEntriesInBacklog(), 2); + assertEquals(c1.getNumberOfEntriesInBacklog(false), 2); c1.rewind(); assertEquals(c1.getNumberOfEntries(), 2); c1.markDelete(p4); assertEquals(c1.getNumberOfEntries(), 0); - assertEquals(c1.getNumberOfEntriesInBacklog(), 0); + assertEquals(c1.getNumberOfEntriesInBacklog(false), 0); c1.rewind(); assertEquals(c1.getNumberOfEntries(), 0); ledger.addEntry("dummy-entry-5".getBytes(Encoding)); @@ -573,13 +573,13 @@ void subscribeToEarliestPositionWithDeferredDeletion() throws Exception { assertEquals(c1.getReadPosition(), p1); assertEquals(c1.getMarkDeletedPosition(), new PositionImpl(3, -1)); assertEquals(c1.getNumberOfEntries(), 6); - assertEquals(c1.getNumberOfEntriesInBacklog(), 6); + assertEquals(c1.getNumberOfEntriesInBacklog(false), 6); ManagedCursor c2 = ledger.newNonDurableCursor(p1); assertEquals(c2.getReadPosition(), p2); assertEquals(c2.getMarkDeletedPosition(), p1); assertEquals(c2.getNumberOfEntries(), 5); - assertEquals(c2.getNumberOfEntriesInBacklog(), 5); + assertEquals(c2.getNumberOfEntriesInBacklog(false), 5); } @Test diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index f015a92b66952..86da1027f34d6 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -1376,6 +1376,14 @@ public class ServiceConfiguration implements PulsarConfiguration { doc = "Classname of Pluggable JVM GC metrics logger that can log GC specific metrics") private String jvmGCMetricsLoggerClassName; + @FieldContext( + category = CATEGORY_METRICS, + doc = "Enable expose the precise backlog stats.\n" + + " Set false to use published counter and consumed counter to calculate,\n" + + " this would be more efficient but may be inaccurate. Default is false." + ) + private boolean exposePreciseBacklogInPrometheus = false; + /**** --- Functions --- ****/ @FieldContext( category = CATEGORY_FUNCTIONS, diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 4a1021f9aa366..e33f94a3f744d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -808,14 +808,14 @@ protected void internalGetSubscriptions(AsyncResponse asyncResponse, boolean aut } } - protected TopicStats internalGetStats(boolean authoritative) { + protected TopicStats internalGetStats(boolean authoritative, boolean getPreciseBacklog) { validateAdminAndClientPermission(); if (topicName.isGlobal()) { validateGlobalNamespaceOwnership(namespaceName); } validateTopicOwnership(topicName, authoritative); Topic topic = getTopicReference(topicName); - return topic.getStats(); + return topic.getStats(getPreciseBacklog); } protected PersistentTopicInternalStats internalGetInternalStats(boolean authoritative) { @@ -850,7 +850,7 @@ public void getInfoFailed(ManagedLedgerException exception, Object ctx) { } protected void internalGetPartitionedStats(AsyncResponse asyncResponse, boolean authoritative, - boolean perPartition) { + boolean perPartition, boolean getPreciseBacklog) { PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(topicName, authoritative, false); if (partitionMetadata.partitions == 0) { throw new RestException(Status.NOT_FOUND, "Partitioned Topic not found"); @@ -864,7 +864,7 @@ protected void internalGetPartitionedStats(AsyncResponse asyncResponse, boolean for (int i = 0; i < partitionMetadata.partitions; i++) { try { topicStatsFutureList - .add(pulsar().getAdminClient().topics().getStatsAsync((topicName.getPartition(i).toString()))); + .add(pulsar().getAdminClient().topics().getStatsAsync((topicName.getPartition(i).toString()), getPreciseBacklog)); } catch (PulsarServerException e) { asyncResponse.resume(new RestException(e)); return; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java index 81759cc840675..4bc0ddfd154d9 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java @@ -99,7 +99,7 @@ public NonPersistentTopicStats getStats(@PathParam("property") String property, validateTopicName(property, cluster, namespace, encodedTopic); validateAdminOperationOnTopic(authoritative); Topic topic = getTopicReference(topicName); - return ((NonPersistentTopic) topic).getStats(); + return ((NonPersistentTopic) topic).getStats(false); } @GET diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java index ae139a1f0894a..836ca14282781 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java @@ -282,7 +282,7 @@ public TopicStats getStats(@PathParam("property") String property, @PathParam("c @PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic, @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { validateTopicName(property, cluster, namespace, encodedTopic); - return internalGetStats(authoritative); + return internalGetStats(authoritative, false); } @GET @@ -327,7 +327,7 @@ public void getPartitionedStats(@Suspended final AsyncResponse asyncResponse, @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { try { validateTopicName(property, cluster, namespace, encodedTopic); - internalGetPartitionedStats(asyncResponse, authoritative, perPartition); + internalGetPartitionedStats(asyncResponse, authoritative, perPartition, false); } catch (WebApplicationException wae) { asyncResponse.resume(wae); } catch (Exception e) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java index 480e63ba0da8a..7e88eed2fe5ab 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java @@ -115,11 +115,13 @@ public NonPersistentTopicStats getStats( @ApiParam(value = "Specify topic name", required = true) @PathParam("topic") @Encoded String encodedTopic, @ApiParam(value = "Is authentication required to perform this operation") - @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { + @QueryParam("authoritative") @DefaultValue("false") boolean authoritative, + @ApiParam(value = "Is return precise backlog or imprecise backlog") + @QueryParam("getPreciseBacklog") @DefaultValue("false") boolean getPreciseBacklog) { validateTopicName(tenant, namespace, encodedTopic); validateAdminOperationOnTopic(topicName, authoritative); Topic topic = getTopicReference(topicName); - return ((NonPersistentTopic) topic).getStats(); + return ((NonPersistentTopic) topic).getStats(getPreciseBacklog); } @GET diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java index fbf74c971cdf1..8c59fa5b6b079 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java @@ -46,8 +46,6 @@ import org.apache.pulsar.client.impl.MessageIdImpl; import org.apache.pulsar.common.partition.PartitionedTopicMetadata; import org.apache.pulsar.common.policies.data.AuthAction; -import org.apache.pulsar.common.policies.data.PartitionedTopicInternalStats; -import org.apache.pulsar.common.policies.data.PartitionedTopicStats; import org.apache.pulsar.common.policies.data.PersistentOfflineTopicStats; import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats; import org.apache.pulsar.common.policies.data.TopicStats; @@ -469,9 +467,11 @@ public TopicStats getStats( @ApiParam(value = "Specify topic name", required = true) @PathParam("topic") @Encoded String encodedTopic, @ApiParam(value = "Is authentication required to perform this operation") - @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { + @QueryParam("authoritative") @DefaultValue("false") boolean authoritative, + @ApiParam(value = "Is return precise backlog or imprecise backlog") + @QueryParam("getPreciseBacklog") @DefaultValue("false") boolean getPreciseBacklog) { validateTopicName(tenant, namespace, encodedTopic); - return internalGetStats(authoritative); + return internalGetStats(authoritative, getPreciseBacklog); } @GET @@ -542,10 +542,12 @@ public void getPartitionedStats( @ApiParam(value = "Get per partition stats") @QueryParam("perPartition") @DefaultValue("true") boolean perPartition, @ApiParam(value = "Is authentication required to perform this operation") - @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { + @QueryParam("authoritative") @DefaultValue("false") boolean authoritative, + @ApiParam(value = "Is return precise backlog or imprecise backlog") + @QueryParam("getPreciseBacklog") @DefaultValue("false") boolean getPreciseBacklog) { try { validatePartitionedTopicName(tenant, namespace, encodedTopic); - internalGetPartitionedStats(asyncResponse, authoritative, perPartition); + internalGetPartitionedStats(asyncResponse, authoritative, perPartition, getPreciseBacklog); } catch (WebApplicationException wae) { asyncResponse.resume(wae); } catch (Exception e) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java index b0afc73620abc..fd47425bfaac5 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java @@ -144,7 +144,7 @@ private void dropBacklog(PersistentTopic persistentTopic, BacklogQuota quota) { } // Calculate number of messages to be skipped using the current backlog and the skip factor. - long entriesInBacklog = slowestConsumer.getNumberOfEntriesInBacklog(); + long entriesInBacklog = slowestConsumer.getNumberOfEntriesInBacklog(false); int messagesToSkip = (int) (messageSkipFactor * entriesInBacklog); try { // If there are no messages to skip, break out of the loop diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index db055b69b9ddc..7d48f76b71e90 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -1407,7 +1407,7 @@ public String generateUniqueProducerName() { public Map getTopicStats() { HashMap stats = new HashMap<>(); - forEachTopic(topic -> stats.put(topic.getName(), topic.getStats())); + forEachTopic(topic -> stats.put(topic.getName(), topic.getStats(false))); return stats; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index 50a5107c05fd9..bef63fc7f5b0a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -454,7 +454,7 @@ CommandConsumerStatsResponse.Builder createConsumerStatsResponse(Consumer consum commandConsumerStatsResponseBuilder.setConnectedSince(consumerStats.getConnectedSince()); Subscription subscription = consumer.getSubscription(); - commandConsumerStatsResponseBuilder.setMsgBacklog(subscription.getNumberOfEntriesInBacklog()); + commandConsumerStatsResponseBuilder.setMsgBacklog(subscription.getNumberOfEntriesInBacklog(false)); commandConsumerStatsResponseBuilder.setMsgRateExpired(subscription.getExpiredMessageRate()); commandConsumerStatsResponseBuilder.setType(subscription.getTypeString()); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java index 18c7c49ae37f5..f7d9687523960 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java @@ -53,7 +53,7 @@ default void removeConsumer(Consumer consumer) throws BrokerServiceException { Dispatcher getDispatcher(); - long getNumberOfEntriesInBacklog(); + long getNumberOfEntriesInBacklog(boolean getPreciseBacklog); default long getNumberOfEntriesDelayed() { return 0; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java index 26af1c1c5c8bf..6b3685deba927 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java @@ -162,7 +162,7 @@ void updateRates(NamespaceStats nsStats, NamespaceBundleStats currentBundleStats ConcurrentOpenHashMap getReplicators(); - TopicStats getStats(); + TopicStats getStats(boolean getPreciseBacklog); PersistentTopicInternalStats getInternalStats(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java index 652042000b6ac..f653ee52b4953 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java @@ -250,7 +250,7 @@ public CompletableFuture peekNthMessage(int messagePosition) { } @Override - public long getNumberOfEntriesInBacklog() { + public long getNumberOfEntriesInBacklog(boolean getPreciseBacklog) { // No-op return 0; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java index 8e3b14fa7f43a..27dc4bb3111bd 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java @@ -697,7 +697,7 @@ public void updateRates(NamespaceStats nsStats, NamespaceBundleStats bundleStats topicStatsStream.endList(); // Populate subscription specific stats here - topicStatsStream.writePair("msgBacklog", subscription.getNumberOfEntriesInBacklog()); + topicStatsStream.writePair("msgBacklog", subscription.getNumberOfEntriesInBacklog(false)); topicStatsStream.writePair("msgRateExpired", subscription.getExpiredMessageRate()); topicStatsStream.writePair("msgRateOut", subMsgRateOut); topicStatsStream.writePair("msgThroughputOut", subMsgThroughputOut); @@ -714,7 +714,7 @@ public void updateRates(NamespaceStats nsStats, NamespaceBundleStats bundleStats topicStats.aggMsgRateOut += subMsgRateOut; topicStats.aggMsgThroughputOut += subMsgThroughputOut; - nsStats.msgBacklog += subscription.getNumberOfEntriesInBacklog(); + nsStats.msgBacklog += subscription.getNumberOfEntriesInBacklog(false); } catch (Exception e) { log.error("Got exception when creating consumer stats for subscription {}: {}", subscriptionName, e.getMessage(), e); @@ -751,7 +751,7 @@ public void updateRates(NamespaceStats nsStats, NamespaceBundleStats bundleStats topicStatsStream.endObject(); } - public NonPersistentTopicStats getStats() { + public NonPersistentTopicStats getStats(boolean getPreciseBacklog) { NonPersistentTopicStats stats = new NonPersistentTopicStats(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/CompactorSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/CompactorSubscription.java index 0c3feb31bfd86..5d39c20a2441d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/CompactorSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/CompactorSubscription.java @@ -89,7 +89,7 @@ public void markDeleteFailed(ManagedLedgerException exception, Object ctx) { } }, null); - if (topic.getManagedLedger().isTerminated() && cursor.getNumberOfEntriesInBacklog() == 0) { + if (topic.getManagedLedger().isTerminated() && cursor.getNumberOfEntriesInBacklog(false) == 0) { // Notify all consumer that the end of topic was reached dispatcher.getConsumers().forEach(Consumer::reachedEndOfTopic); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java index 9c6b1efa9810f..9457af38d398a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java @@ -540,7 +540,7 @@ public synchronized void readEntriesFailed(ManagedLedgerException exception, Obj long waitTimeMillis = readFailureBackoff.next(); if (exception instanceof NoMoreEntriesToReadException) { - if (cursor.getNumberOfEntriesInBacklog() == 0) { + if (cursor.getNumberOfEntriesInBacklog(false) == 0) { // Topic has been terminated and there are no more entries to read // Notify the consumer only if all the messages were already acknowledged consumerList.forEach(Consumer::reachedEndOfTopic); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java index c29fbc85b7879..fe73760171286 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java @@ -470,7 +470,7 @@ private synchronized void internalReadEntriesFailed(ManagedLedgerException excep long waitTimeMillis = readFailureBackoff.next(); if (exception instanceof NoMoreEntriesToReadException) { - if (cursor.getNumberOfEntriesInBacklog() == 0) { + if (cursor.getNumberOfEntriesInBacklog(false) == 0) { // Topic has been terminated and there are no more entries to read // Notify the consumer only if all the messages were already acknowledged consumers.forEach(Consumer::reachedEndOfTopic); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java index deb274457bf92..1b761d8fb7af6 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java @@ -99,7 +99,7 @@ public double getMessageExpiryRate() { private final MarkDeleteCallback markDeleteCallback = new MarkDeleteCallback() { @Override public void markDeleteComplete(Object ctx) { - long numMessagesExpired = (long) ctx - cursor.getNumberOfEntriesInBacklog(); + long numMessagesExpired = (long) ctx - cursor.getNumberOfEntriesInBacklog(false); msgExpired.recordMultipleEvents(numMessagesExpired, 0 /* no value stats */); updateRates(); @@ -119,7 +119,7 @@ public void markDeleteFailed(ManagedLedgerException exception, Object ctx) { public void findEntryComplete(Position position, Object ctx) { if (position != null) { log.info("[{}][{}] Expiring all messages until position {}", topicName, subName, position); - cursor.asyncMarkDelete(position, markDeleteCallback, cursor.getNumberOfEntriesInBacklog()); + cursor.asyncMarkDelete(position, markDeleteCallback, cursor.getNumberOfEntriesInBacklog(false)); } else { if (log.isDebugEnabled()) { log.debug("[{}][{}] No messages to expire", topicName, subName); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java index 2bc18ca384709..c4a19c460b529 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java @@ -149,7 +149,7 @@ protected Position getReplicatorReadPosition() { @Override protected long getNumberOfEntriesInBacklog() { - return cursor.getNumberOfEntriesInBacklog(); + return cursor.getNumberOfEntriesInBacklog(false); } @Override @@ -507,7 +507,7 @@ public CompletableFuture clearBacklog() { if (log.isDebugEnabled()) { log.debug("[{}][{} -> {}] Backlog size before clearing: {}", topicName, localCluster, remoteCluster, - cursor.getNumberOfEntriesInBacklog()); + cursor.getNumberOfEntriesInBacklog(false)); } cursor.asyncClearBacklog(new ClearBacklogCallback() { @@ -515,7 +515,7 @@ public CompletableFuture clearBacklog() { public void clearBacklogComplete(Object ctx) { if (log.isDebugEnabled()) { log.debug("[{}][{} -> {}] Backlog size after clearing: {}", topicName, localCluster, remoteCluster, - cursor.getNumberOfEntriesInBacklog()); + cursor.getNumberOfEntriesInBacklog(false)); } future.complete(null); } @@ -535,7 +535,7 @@ public CompletableFuture skipMessages(int numMessagesToSkip) { if (log.isDebugEnabled()) { log.debug("[{}][{} -> {}] Skipping {} messages, current backlog {}", topicName, localCluster, remoteCluster, - numMessagesToSkip, cursor.getNumberOfEntriesInBacklog()); + numMessagesToSkip, cursor.getNumberOfEntriesInBacklog(false)); } cursor.asyncSkipEntries(numMessagesToSkip, IndividualDeletedEntries.Exclude, new AsyncCallbacks.SkipEntriesCallback() { @@ -543,7 +543,7 @@ public CompletableFuture skipMessages(int numMessagesToSkip) { public void skipEntriesComplete(Object ctx) { if (log.isDebugEnabled()) { log.debug("[{}][{} -> {}] Skipped {} messages, new backlog {}", topicName, localCluster, - remoteCluster, numMessagesToSkip, cursor.getNumberOfEntriesInBacklog()); + remoteCluster, numMessagesToSkip, cursor.getNumberOfEntriesInBacklog(false)); } future.complete(null); } @@ -605,7 +605,7 @@ public void updateRates() { } public ReplicatorStats getStats() { - stats.replicationBacklog = cursor.getNumberOfEntriesInBacklog(); + stats.replicationBacklog = cursor.getNumberOfEntriesInBacklog(false); stats.connected = producer != null && producer.isConnected(); stats.replicationDelayInSeconds = getReplicationDelayInSeconds(); @@ -633,8 +633,8 @@ private long getReplicationDelayInSeconds() { } public void expireMessages(int messageTTLInSeconds) { - if ((cursor.getNumberOfEntriesInBacklog() == 0) - || (cursor.getNumberOfEntriesInBacklog() < MINIMUM_BACKLOG_FOR_EXPIRY_CHECK + if ((cursor.getNumberOfEntriesInBacklog(false) == 0) + || (cursor.getNumberOfEntriesInBacklog(false) < MINIMUM_BACKLOG_FOR_EXPIRY_CHECK && !topic.isOldestMessageExpired(cursor, messageTTLInSeconds))) { // don't do anything for almost caught-up connected subscriptions return; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java index 88d8e45c8b3cc..7055ac0071e62 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java @@ -31,6 +31,7 @@ import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import java.util.stream.Collectors; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.MoreObjects; import org.apache.bookkeeper.mledger.AsyncCallbacks; import org.apache.bookkeeper.mledger.AsyncCallbacks.ClearBacklogCallback; @@ -391,7 +392,7 @@ public void acknowledgeMessage(List positions, AckType ackType, Map clearBacklog() { if (log.isDebugEnabled()) { log.debug("[{}][{}] Backlog size before clearing: {}", topicName, subName, - cursor.getNumberOfEntriesInBacklog()); + cursor.getNumberOfEntriesInBacklog(false)); } cursor.asyncClearBacklog(new ClearBacklogCallback() { @@ -581,7 +582,7 @@ public CompletableFuture clearBacklog() { public void clearBacklogComplete(Object ctx) { if (log.isDebugEnabled()) { log.debug("[{}][{}] Backlog size after clearing: {}", topicName, subName, - cursor.getNumberOfEntriesInBacklog()); + cursor.getNumberOfEntriesInBacklog(false)); } future.complete(null); } @@ -602,7 +603,7 @@ public CompletableFuture skipMessages(int numMessagesToSkip) { if (log.isDebugEnabled()) { log.debug("[{}][{}] Skipping {} messages, current backlog {}", topicName, subName, numMessagesToSkip, - cursor.getNumberOfEntriesInBacklog()); + cursor.getNumberOfEntriesInBacklog(false)); } cursor.asyncSkipEntries(numMessagesToSkip, IndividualDeletedEntries.Exclude, new AsyncCallbacks.SkipEntriesCallback() { @@ -610,7 +611,7 @@ public CompletableFuture skipMessages(int numMessagesToSkip) { public void skipEntriesComplete(Object ctx) { if (log.isDebugEnabled()) { log.debug("[{}][{}] Skipped {} messages, new backlog {}", topicName, subName, - numMessagesToSkip, cursor.getNumberOfEntriesInBacklog()); + numMessagesToSkip, cursor.getNumberOfEntriesInBacklog(false)); } future.complete(null); } @@ -776,8 +777,8 @@ public void readEntryComplete(Entry entry, Object ctx) { } @Override - public long getNumberOfEntriesInBacklog() { - return cursor.getNumberOfEntriesInBacklog(); + public long getNumberOfEntriesInBacklog(boolean getPreciseBacklog) { + return cursor.getNumberOfEntriesInBacklog(getPreciseBacklog); } @Override @@ -917,8 +918,8 @@ public List getConsumers() { @Override public void expireMessages(int messageTTLInSeconds) { this.lastExpireTimestamp = System.currentTimeMillis(); - if ((getNumberOfEntriesInBacklog() == 0) || (dispatcher != null && dispatcher.isConsumerConnected() - && getNumberOfEntriesInBacklog() < MINIMUM_BACKLOG_FOR_EXPIRY_CHECK + if ((getNumberOfEntriesInBacklog(false) == 0) || (dispatcher != null && dispatcher.isConsumerConnected() + && getNumberOfEntriesInBacklog(false) < MINIMUM_BACKLOG_FOR_EXPIRY_CHECK && !topic.isOldestMessageExpired(cursor, messageTTLInSeconds))) { // don't do anything for almost caught-up connected subscriptions return; @@ -934,7 +935,7 @@ public long estimateBacklogSize() { return cursor.getEstimatedSizeSinceMarkDeletePosition(); } - public SubscriptionStats getStats() { + public SubscriptionStats getStats(Boolean getPreciseBacklog) { SubscriptionStats subStats = new SubscriptionStats(); subStats.lastExpireTimestamp = lastExpireTimestamp; subStats.lastConsumedFlowTimestamp = lastConsumedFlowTimestamp; @@ -967,7 +968,8 @@ public SubscriptionStats getStats() { subStats.msgDelayed = d.getNumberOfDelayedMessages(); } } - subStats.msgBacklog = getNumberOfEntriesInBacklog(); + subStats.msgBacklog = getNumberOfEntriesInBacklog(getPreciseBacklog); + subStats.msgBacklogNoDelayed = subStats.msgBacklog - subStats.msgDelayed; subStats.msgRateExpired = expiryMonitor.getMessageExpiryRate(); subStats.isReplicated = isReplicated(); return subStats; @@ -1047,7 +1049,7 @@ public void markTopicWithBatchMessagePublished() { } void topicTerminated() { - if (cursor.getNumberOfEntriesInBacklog() == 0) { + if (cursor.getNumberOfEntriesInBacklog(false) == 0) { // notify the consumers if there are consumers connected to this topic. if (null != dispatcher) { // Immediately notify the consumer that there are no more available messages @@ -1197,5 +1199,10 @@ public void processReplicatedSubscriptionSnapshot(ReplicatedSubscriptionsSnapsho } } + @VisibleForTesting + public ManagedCursor getCursor() { + return cursor; + } + private static final Logger log = LoggerFactory.getLogger(PersistentSubscription.class); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index f624ae8011268..335be8cf3a847 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -1435,7 +1435,7 @@ public void updateRates(NamespaceStats nsStats, NamespaceBundleStats bundleStats topicStatsStream.endList(); // Populate subscription specific stats here - topicStatsStream.writePair("msgBacklog", subscription.getNumberOfEntriesInBacklog()); + topicStatsStream.writePair("msgBacklog", subscription.getNumberOfEntriesInBacklog(false)); topicStatsStream.writePair("msgRateExpired", subscription.getExpiredMessageRate()); topicStatsStream.writePair("msgRateOut", subMsgRateOut); topicStatsStream.writePair("msgThroughputOut", subMsgThroughputOut); @@ -1456,7 +1456,7 @@ public void updateRates(NamespaceStats nsStats, NamespaceBundleStats bundleStats topicStatsHelper.aggMsgRateOut += subMsgRateOut; topicStatsHelper.aggMsgThroughputOut += subMsgThroughputOut; - nsStats.msgBacklog += subscription.getNumberOfEntriesInBacklog(); + nsStats.msgBacklog += subscription.getNumberOfEntriesInBacklog(false); } catch (Exception e) { log.error("Got exception when creating consumer stats for subscription {}: {}", subscriptionName, e.getMessage(), e); @@ -1508,7 +1508,7 @@ public double getLastUpdatedAvgPublishRateInByte() { return lastUpdatedAvgPublishRateInByte; } - public TopicStats getStats() { + public TopicStats getStats(boolean getPreciseBacklog) { TopicStats stats = new TopicStats(); @@ -1531,7 +1531,7 @@ public TopicStats getStats() { stats.bytesInCounter = getBytesInCounter(); subscriptions.forEach((name, subscription) -> { - SubscriptionStats subStats = subscription.getStats(); + SubscriptionStats subStats = subscription.getStats(getPreciseBacklog); stats.msgRateOut += subStats.msgRateOut; stats.msgThroughputOut += subStats.msgThroughputOut; @@ -1641,7 +1641,7 @@ public boolean isActive(InactiveTopicDeleteMode deleteMode) { } private boolean hasBacklogs() { - return subscriptions.values().stream().anyMatch(sub -> sub.getNumberOfEntriesInBacklog() > 0); + return subscriptions.values().stream().anyMatch(sub -> sub.getNumberOfEntriesInBacklog(false) > 0); } @Override diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java index cc5388e8ced02..ea05ed074c09e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java @@ -93,6 +93,7 @@ void updateStats(TopicStats stats) { msgDelayed += as.msgDelayed; subsStats.blockedSubscriptionOnUnackedMsgs = as.blockedSubscriptionOnUnackedMsgs; subsStats.msgBacklog += as.msgBacklog; + subsStats.msgBacklogNoDelayed += as.msgBacklogNoDelayed; subsStats.msgDelayed += as.msgDelayed; subsStats.msgRateRedeliver += as.msgRateRedeliver; subsStats.unackedMessages += as.unackedMessages; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedSubscriptionStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedSubscriptionStats.java index 1f3c51302e980..d5b53537dcc9a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedSubscriptionStats.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedSubscriptionStats.java @@ -27,6 +27,8 @@ public class AggregatedSubscriptionStats { public long msgBacklog; + public long msgBacklogNoDelayed; + public boolean blockedSubscriptionOnUnackedMsgs; public double msgRateRedeliver; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java index 19aa043ae823a..a7b35b8c63337 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java @@ -61,7 +61,7 @@ public static void generate(PulsarService pulsar, boolean includeTopicMetrics, b bundlesMap.forEach((bundle, topicsMap) -> { topicsMap.forEach((name, topic) -> { - getTopicStats(topic, topicStats, includeConsumerMetrics); + getTopicStats(topic, topicStats, includeConsumerMetrics, pulsar.getConfiguration().isExposePreciseBacklogInPrometheus()); if (includeTopicMetrics) { topicsCount.add(1); @@ -82,7 +82,7 @@ public static void generate(PulsarService pulsar, boolean includeTopicMetrics, b }); } - private static void getTopicStats(Topic topic, TopicStats stats, boolean includeConsumerMetrics) { + private static void getTopicStats(Topic topic, TopicStats stats, boolean includeConsumerMetrics, boolean getPreciseBacklog) { stats.reset(); if (topic instanceof PersistentTopic) { @@ -104,8 +104,8 @@ private static void getTopicStats(Topic topic, TopicStats stats, boolean include stats.storageReadRate = mlStats.getReadEntriesRate(); } - stats.msgInCounter = topic.getStats().msgInCounter; - stats.bytesInCounter = topic.getStats().bytesInCounter; + stats.msgInCounter = topic.getStats(getPreciseBacklog).msgInCounter; + stats.bytesInCounter = topic.getStats(getPreciseBacklog).bytesInCounter; stats.producersCount = 0; topic.getProducers().values().forEach(producer -> { @@ -125,12 +125,13 @@ private static void getTopicStats(Topic topic, TopicStats stats, boolean include topic.getSubscriptions().forEach((name, subscription) -> { stats.subscriptionsCount++; - stats.msgBacklog += subscription.getNumberOfEntriesInBacklog(); + stats.msgBacklog += subscription.getNumberOfEntriesInBacklog(getPreciseBacklog); AggregatedSubscriptionStats subsStats = stats.subscriptionStats .computeIfAbsent(name, k -> new AggregatedSubscriptionStats()); - subsStats.msgBacklog = subscription.getNumberOfEntriesInBacklog(); + subsStats.msgBacklog = subscription.getNumberOfEntriesInBacklog(getPreciseBacklog); subsStats.msgDelayed = subscription.getNumberOfEntriesDelayed(); + subsStats.msgBacklogNoDelayed = subsStats.msgBacklog - subsStats.msgDelayed; subscription.getConsumers().forEach(consumer -> { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java index 846104441fac3..caf0ce842e2b3 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java @@ -134,6 +134,7 @@ static void printTopicStats(SimpleTextOutputStream stream, String cluster, Strin stats.subscriptionStats.forEach((n, subsStats) -> { metric(stream, cluster, namespace, topic, n, "pulsar_subscription_back_log", subsStats.msgBacklog); + metric(stream, cluster, namespace, topic, n, "pulsar_subscription_back_log_no_delayed", subsStats.msgBacklogNoDelayed); metric(stream, cluster, namespace, topic, n, "pulsar_subscription_delayed", subsStats.msgDelayed); metric(stream, cluster, namespace, topic, n, "pulsar_subscription_msg_rate_redeliver", subsStats.msgRateRedeliver); metric(stream, cluster, namespace, topic, n, "pulsar_subscription_unacked_messages", subsStats.unackedMessages); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java index fb61825ff86e3..558ce39d09f27 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java @@ -53,12 +53,14 @@ import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl; import org.apache.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl; import org.apache.pulsar.broker.service.Topic; +import org.apache.pulsar.broker.service.persistent.PersistentSubscription; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.admin.PulsarAdminException.PreconditionFailedException; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.MessageRoutingMode; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClient; @@ -1062,4 +1064,185 @@ public void testConsumerStatsLastTimestamp() throws PulsarClientException, Pulsa consumer.close(); producer.close(); } + + @Test(timeOut = 30000) + public void testPreciseBacklog() throws PulsarClientException, PulsarAdminException, InterruptedException { + final String topic = "persistent://prop-xyz/ns1/precise-back-log"; + final String subName = "sub-name"; + + @Cleanup + PulsarClient client = PulsarClient.builder().serviceUrl(pulsar.getWebServiceAddress()).build(); + + @Cleanup + Consumer consumer = client.newConsumer() + .topic(topic) + .subscriptionName(subName) + .subscribe(); + + @Cleanup + Producer producer = client.newProducer() + .topic(topic) + .enableBatching(false) + .create(); + + producer.send("message-1".getBytes(StandardCharsets.UTF_8)); + Message message = consumer.receive(); + assertNotNull(message); + + // Mock the entries added count. Default is disable the precise backlog, so the backlog is entries added count - consumed count + // Since message have not acked, so the backlog is 10 + PersistentSubscription subscription = (PersistentSubscription)pulsar.getBrokerService().getTopicReference(topic).get().getSubscription(subName); + assertNotNull(subscription); + ((ManagedLedgerImpl)subscription.getCursor().getManagedLedger()).setEntriesAddedCounter(10L); + TopicStats topicStats = admin.topics().getStats(topic); + assertEquals(topicStats.subscriptions.get(subName).msgBacklog, 10); + + topicStats = admin.topics().getStats(topic, true); + assertEquals(topicStats.subscriptions.get(subName).msgBacklog, 1); + consumer.acknowledge(message); + + // wait for ack send + Thread.sleep(500); + + // Consumer acks the message, so the precise backlog is 0 + topicStats = admin.topics().getStats(topic, true); + assertEquals(topicStats.subscriptions.get(subName).msgBacklog, 0); + + topicStats = admin.topics().getStats(topic); + assertEquals(topicStats.subscriptions.get(subName).msgBacklog, 9); + } + + @Test(timeOut = 30000) + public void testBacklogNoDelayed() throws PulsarClientException, PulsarAdminException, InterruptedException { + final String topic = "persistent://prop-xyz/ns1/precise-back-log-no-delayed"; + final String subName = "sub-name"; + + @Cleanup + PulsarClient client = PulsarClient.builder().serviceUrl(pulsar.getWebServiceAddress()).build(); + + @Cleanup + Consumer consumer = client.newConsumer() + .topic(topic) + .subscriptionName(subName) + .subscriptionType(SubscriptionType.Shared) + .subscribe(); + + @Cleanup + Producer producer = client.newProducer() + .topic(topic) + .enableBatching(false) + .create(); + + for (int i = 0; i < 10; i++) { + if (i > 4) { + producer.newMessage() + .value("message-1".getBytes(StandardCharsets.UTF_8)) + .deliverAfter(10, TimeUnit.SECONDS) + .send(); + } else { + producer.send("message-1".getBytes(StandardCharsets.UTF_8)); + } + } + + TopicStats topicStats = admin.topics().getStats(topic, true); + assertEquals(topicStats.subscriptions.get(subName).msgBacklog, 10); + assertEquals(topicStats.subscriptions.get(subName).msgBacklogNoDelayed, 5); + + for (int i = 0; i < 5; i++) { + consumer.acknowledge(consumer.receive()); + } + // Wait the ack send. + Thread.sleep(500); + topicStats = admin.topics().getStats(topic, true); + assertEquals(topicStats.subscriptions.get(subName).msgBacklog, 5); + assertEquals(topicStats.subscriptions.get(subName).msgBacklogNoDelayed, 0); + } + + @Test + public void testPreciseBacklogForPartitionedTopic() throws PulsarClientException, PulsarAdminException, InterruptedException { + final String topic = "persistent://prop-xyz/ns1/precise-back-log-for-partitioned-topic"; + admin.topics().createPartitionedTopic(topic, 2); + final String subName = "sub-name"; + + @Cleanup + PulsarClient client = PulsarClient.builder().serviceUrl(pulsar.getWebServiceAddress()).build(); + + @Cleanup + Consumer consumer = client.newConsumer() + .topic(topic) + .subscriptionName(subName) + .subscribe(); + + @Cleanup + Producer producer = client.newProducer() + .topic(topic) + .enableBatching(false) + .create(); + + producer.send("message-1".getBytes(StandardCharsets.UTF_8)); + Message message = consumer.receive(); + assertNotNull(message); + + // Mock the entries added count. Default is disable the precise backlog, so the backlog is entries added count - consumed count + // Since message have not acked, so the backlog is 10 + for (int i = 0; i < 2; i++) { + PersistentSubscription subscription = (PersistentSubscription)pulsar.getBrokerService().getTopicReference(topic + "-partition-" + i).get().getSubscription(subName); + assertNotNull(subscription); + ((ManagedLedgerImpl)subscription.getCursor().getManagedLedger()).setEntriesAddedCounter(10L); + } + + TopicStats topicStats = admin.topics().getPartitionedStats(topic, false); + assertEquals(topicStats.subscriptions.get(subName).msgBacklog, 20); + + topicStats = admin.topics().getPartitionedStats(topic, false, true); + assertEquals(topicStats.subscriptions.get(subName).msgBacklog, 1); + } + + @Test(timeOut = 30000) + public void testBacklogNoDelayedForPartitionedTopic() throws PulsarClientException, PulsarAdminException, InterruptedException { + final String topic = "persistent://prop-xyz/ns1/precise-back-log-no-delayed-partitioned-topic"; + admin.topics().createPartitionedTopic(topic, 2); + final String subName = "sub-name"; + + @Cleanup + PulsarClient client = PulsarClient.builder().serviceUrl(pulsar.getWebServiceAddress()).build(); + + @Cleanup + Consumer consumer = client.newConsumer() + .topic(topic) + .subscriptionName(subName) + .subscriptionType(SubscriptionType.Shared) + .subscribe(); + + @Cleanup + Producer producer = client.newProducer() + .topic(topic) + .enableBatching(false) + .create(); + + for (int i = 0; i < 10; i++) { + if (i > 4) { + producer.newMessage() + .value("message-1".getBytes(StandardCharsets.UTF_8)) + .deliverAfter(10, TimeUnit.SECONDS) + .send(); + } else { + producer.send("message-1".getBytes(StandardCharsets.UTF_8)); + } + } + + TopicStats topicStats = admin.topics().getPartitionedStats(topic, false, true); + assertEquals(topicStats.subscriptions.get(subName).msgBacklog, 10); + assertEquals(topicStats.subscriptions.get(subName).msgBacklogNoDelayed, 5); + + for (int i = 0; i < 5; i++) { + consumer.acknowledge(consumer.receive()); + } + // Wait the ack send. + Thread.sleep(500); + topicStats = admin.topics().getPartitionedStats(topic, false, true); + assertEquals(topicStats.subscriptions.get(subName).msgBacklog, 5); + assertEquals(topicStats.subscriptions.get(subName).msgBacklogNoDelayed, 0); + } + } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java index 7dd152827ad4d..3d5f95603b057 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java @@ -121,7 +121,7 @@ public void testSimpleBatchProducerWithFixedBatchSize(CompressionType compressio assertTrue(topic.getProducers().values().iterator().next().getStats().msgRateIn > 0.0); // we expect 2 messages in the backlog since we sent 50 messages with the batch size set to 25. We have set the // batch time high enough for it to not affect the number of messages in the batch - assertEquals(topic.getSubscription(subscriptionName).getNumberOfEntriesInBacklog(), 2); + assertEquals(topic.getSubscription(subscriptionName).getNumberOfEntriesInBacklog(false), 2); consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName).subscribe(); for (int i = 0; i < numMsgs; i++) { @@ -170,7 +170,7 @@ public void testSimpleBatchProducerWithFixedBatchBytes(CompressionType compressi assertTrue(topic.getProducers().values().iterator().next().getStats().msgRateIn > 0.0); // we expect 2 messages in the backlog since we sent 50 messages with the batch size set to 25. We have set the // batch time high enough for it to not affect the number of messages in the batch - assertEquals(topic.getSubscription(subscriptionName).getNumberOfEntriesInBacklog(), 2); + assertEquals(topic.getSubscription(subscriptionName).getNumberOfEntriesInBacklog(false), 2); consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName).subscribe(); for (int i = 0; i < numMsgs; i++) { @@ -215,8 +215,8 @@ public void testSimpleBatchProducerWithFixedBatchTime(CompressionType compressio rolloverPerIntervalStats(); assertTrue(topic.getProducers().values().iterator().next().getStats().msgRateIn > 0.0); LOG.info("Sent {} messages, backlog is {} messages", numMsgs, - topic.getSubscription(subscriptionName).getNumberOfEntriesInBacklog()); - assertTrue(topic.getSubscription(subscriptionName).getNumberOfEntriesInBacklog() < numMsgs); + topic.getSubscription(subscriptionName).getNumberOfEntriesInBacklog(false)); + assertTrue(topic.getSubscription(subscriptionName).getNumberOfEntriesInBacklog(false) < numMsgs); producer.close(); } @@ -251,8 +251,8 @@ public void testSimpleBatchProducerWithFixedBatchSizeAndTime(CompressionType com rolloverPerIntervalStats(); assertTrue(topic.getProducers().values().iterator().next().getStats().msgRateIn > 0.0); LOG.info("Sent {} messages, backlog is {} messages", numMsgs, - topic.getSubscription(subscriptionName).getNumberOfEntriesInBacklog()); - assertTrue(topic.getSubscription(subscriptionName).getNumberOfEntriesInBacklog() < numMsgs); + topic.getSubscription(subscriptionName).getNumberOfEntriesInBacklog(false)); + assertTrue(topic.getSubscription(subscriptionName).getNumberOfEntriesInBacklog(false) < numMsgs); producer.close(); } @@ -298,7 +298,7 @@ public void testBatchProducerWithLargeMessage(CompressionType compressionType, B assertTrue(topic.getProducers().values().iterator().next().getStats().msgRateIn > 0.0); // we expect 3 messages in the backlog since the large message in the middle should // close out the batch and be sent in a batch of its own - assertEquals(topic.getSubscription(subscriptionName).getNumberOfEntriesInBacklog(), 3); + assertEquals(topic.getSubscription(subscriptionName).getNumberOfEntriesInBacklog(false), 3); consumer = pulsarClient.newConsumer() .topic(topicName) .subscriptionName(subscriptionName) @@ -312,7 +312,7 @@ public void testBatchProducerWithLargeMessage(CompressionType compressionType, B consumer.acknowledge(msg); } Thread.sleep(100); - assertEquals(topic.getSubscription(subscriptionName).getNumberOfEntriesInBacklog(), 0); + assertEquals(topic.getSubscription(subscriptionName).getNumberOfEntriesInBacklog(false), 0); consumer.close(); producer.close(); } @@ -354,7 +354,7 @@ public void testSimpleBatchProducerConsumer(CompressionType compressionType, Bat rolloverPerIntervalStats(); assertTrue(topic.getProducers().values().iterator().next().getStats().msgRateIn > 0.0); - assertEquals(topic.getSubscription(subscriptionName).getNumberOfEntriesInBacklog(), numMsgs / numMsgsInBatch); + assertEquals(topic.getSubscription(subscriptionName).getNumberOfEntriesInBacklog(false), numMsgs / numMsgsInBatch); consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName).subscribe(); Message lastunackedMsg = null; @@ -371,7 +371,7 @@ public void testSimpleBatchProducerConsumer(CompressionType compressionType, Bat consumer.acknowledgeCumulative(lastunackedMsg); } Thread.sleep(100); - assertEquals(topic.getSubscription(subscriptionName).getNumberOfEntriesInBacklog(), 0); + assertEquals(topic.getSubscription(subscriptionName).getNumberOfEntriesInBacklog(false), 0); consumer.close(); producer.close(); } @@ -403,7 +403,7 @@ public void testSimpleBatchSyncProducerWithFixedBatchSize(BatcherBuilder builder assertTrue(topic.getProducers().values().iterator().next().getStats().msgRateIn > 0.0); // we expect 10 messages in the backlog since we sent 10 messages with the batch size set to 5. // However, we are using synchronous send and so each message will go as an individual message - assertEquals(topic.getSubscription(subscriptionName).getNumberOfEntriesInBacklog(), 10); + assertEquals(topic.getSubscription(subscriptionName).getNumberOfEntriesInBacklog(false), 10); consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName).subscribe(); for (int i = 0; i < numMsgs; i++) { @@ -458,7 +458,7 @@ public void testSimpleBatchProducerConsumer1kMessages(BatcherBuilder builder) th // allow stats to be updated.. LOG.info("[{}] checking backlog stats.."); rolloverPerIntervalStats(); - assertEquals(topic.getSubscription(subscriptionName).getNumberOfEntriesInBacklog(), numMsgs / numMsgsInBatch); + assertEquals(topic.getSubscription(subscriptionName).getNumberOfEntriesInBacklog(false), numMsgs / numMsgsInBatch); consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName).subscribe(); Message lastunackedMsg = null; @@ -473,7 +473,7 @@ public void testSimpleBatchProducerConsumer1kMessages(BatcherBuilder builder) th consumer.close(); producer.close(); - assertEquals(topic.getSubscription(subscriptionName).getNumberOfEntriesInBacklog(), 0); + assertEquals(topic.getSubscription(subscriptionName).getNumberOfEntriesInBacklog(false), 0); } // test for ack holes @@ -507,7 +507,7 @@ public void testOutOfOrderAcksForBatchMessage() throws Exception { PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get(); rolloverPerIntervalStats(); - assertEquals(topic.getSubscription(subscriptionName).getNumberOfEntriesInBacklog(), numMsgs / numMsgsInBatch); + assertEquals(topic.getSubscription(subscriptionName).getNumberOfEntriesInBacklog(false), numMsgs / numMsgsInBatch); consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName).subscribe(); Set individualAcks = new HashSet<>(); for (int i = 15; i < 20; i++) { @@ -528,7 +528,7 @@ public void testOutOfOrderAcksForBatchMessage() throws Exception { Thread.sleep(1000); rolloverPerIntervalStats(); Thread.sleep(1000); - assertEquals(topic.getSubscription(subscriptionName).getNumberOfEntriesInBacklog(), 3); + assertEquals(topic.getSubscription(subscriptionName).getNumberOfEntriesInBacklog(false), 3); } else if (individualAcks.contains(i)) { consumer.acknowledge(msg); } else { @@ -537,12 +537,12 @@ public void testOutOfOrderAcksForBatchMessage() throws Exception { } Thread.sleep(1000); rolloverPerIntervalStats(); - assertEquals(topic.getSubscription(subscriptionName).getNumberOfEntriesInBacklog(), 2); + assertEquals(topic.getSubscription(subscriptionName).getNumberOfEntriesInBacklog(false), 2); if (lastunackedMsg != null) { consumer.acknowledgeCumulative(lastunackedMsg); } Thread.sleep(100); - assertEquals(topic.getSubscription(subscriptionName).getNumberOfEntriesInBacklog(), 0); + assertEquals(topic.getSubscription(subscriptionName).getNumberOfEntriesInBacklog(false), 0); consumer.close(); producer.close(); } @@ -580,7 +580,7 @@ public void testNonBatchCumulativeAckAfterBatchPublish(BatcherBuilder builder) t rolloverPerIntervalStats(); assertTrue(topic.getProducers().values().iterator().next().getStats().msgRateIn > 0.0); - assertEquals(topic.getSubscription(subscriptionName).getNumberOfEntriesInBacklog(), 2); + assertEquals(topic.getSubscription(subscriptionName).getNumberOfEntriesInBacklog(false), 2); consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName).subscribe(); Message lastunackedMsg = null; @@ -594,7 +594,7 @@ public void testNonBatchCumulativeAckAfterBatchPublish(BatcherBuilder builder) t } Thread.sleep(100); rolloverPerIntervalStats(); - assertEquals(topic.getSubscription(subscriptionName).getNumberOfEntriesInBacklog(), 0); + assertEquals(topic.getSubscription(subscriptionName).getNumberOfEntriesInBacklog(false), 0); consumer.close(); producer.close(); noBatchProducer.close(); @@ -637,7 +637,7 @@ public void testBatchAndNonBatchCumulativeAcks(BatcherBuilder builder) throws Ex rolloverPerIntervalStats(); assertTrue(topic.getProducers().values().iterator().next().getStats().msgRateIn > 0.0); - assertEquals(topic.getSubscription(subscriptionName).getNumberOfEntriesInBacklog(), + assertEquals(topic.getSubscription(subscriptionName).getNumberOfEntriesInBacklog(false), (numMsgs / 2) / numMsgsInBatch + numMsgs / 2); consumer = pulsarClient.newConsumer() .topic(topicName) @@ -662,7 +662,7 @@ public void testBatchAndNonBatchCumulativeAcks(BatcherBuilder builder) throws Ex consumer.acknowledgeCumulative(lastunackedMsg); } - retryStrategically(t -> topic.getSubscription(subscriptionName).getNumberOfEntriesInBacklog() == 0, 100, 100); + retryStrategically(t -> topic.getSubscription(subscriptionName).getNumberOfEntriesInBacklog(false) == 0, 100, 100); consumer.close(); producer.close(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java index 505038bede387..a2f27822d0008 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java @@ -153,7 +153,7 @@ public void testBrokerServicePersistentTopicStats() throws Exception { assertNotNull(topicRef); rolloverPerIntervalStats(); - stats = topicRef.getStats(); + stats = topicRef.getStats(false); subStats = stats.subscriptions.values().iterator().next(); // subscription stats @@ -171,7 +171,7 @@ public void testBrokerServicePersistentTopicStats() throws Exception { Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT); rolloverPerIntervalStats(); - stats = topicRef.getStats(); + stats = topicRef.getStats(false); subStats = stats.subscriptions.values().iterator().next(); // publisher stats @@ -208,7 +208,7 @@ public void testBrokerServicePersistentTopicStats() throws Exception { Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT); rolloverPerIntervalStats(); - stats = topicRef.getStats(); + stats = topicRef.getStats(false); subStats = stats.subscriptions.values().iterator().next(); assertEquals(subStats.msgBacklog, 0); @@ -221,13 +221,13 @@ public void testStatsOfStorageSizeWithSubscription() throws Exception { PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get(); assertNotNull(topicRef); - assertEquals(topicRef.getStats().storageSize, 0); + assertEquals(topicRef.getStats(false).storageSize, 0); for (int i = 0; i < 10; i++) { producer.send(new byte[10]); } - assertTrue(topicRef.getStats().storageSize > 0); + assertTrue(topicRef.getStats(false).storageSize > 0); } @Test @@ -246,7 +246,7 @@ public void testBrokerServicePersistentRedeliverTopicStats() throws Exception { assertNotNull(topicRef); rolloverPerIntervalStats(); - stats = topicRef.getStats(); + stats = topicRef.getStats(false); subStats = stats.subscriptions.values().iterator().next(); // subscription stats @@ -264,7 +264,7 @@ public void testBrokerServicePersistentRedeliverTopicStats() throws Exception { Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT); rolloverPerIntervalStats(); - stats = topicRef.getStats(); + stats = topicRef.getStats(false); subStats = stats.subscriptions.values().iterator().next(); // publisher stats @@ -299,7 +299,7 @@ public void testBrokerServicePersistentRedeliverTopicStats() throws Exception { Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT); rolloverPerIntervalStats(); - stats = topicRef.getStats(); + stats = topicRef.getStats(false); subStats = stats.subscriptions.values().iterator().next(); assertTrue(subStats.msgRateRedeliver > 0.0); assertEquals(subStats.msgRateRedeliver, subStats.consumers.get(0).msgRateRedeliver); @@ -313,7 +313,7 @@ public void testBrokerServicePersistentRedeliverTopicStats() throws Exception { Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT); rolloverPerIntervalStats(); - stats = topicRef.getStats(); + stats = topicRef.getStats(false); subStats = stats.subscriptions.values().iterator().next(); assertEquals(subStats.msgBacklog, 0); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java index 21f3c03fa6097..eccaea6237244 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java @@ -187,7 +187,7 @@ public void testSimpleConsumerEventsWithoutPartition() throws Exception { rolloverPerIntervalStats(); - assertEquals(subRef.getNumberOfEntriesInBacklog(), numMsgs); + assertEquals(subRef.getNumberOfEntriesInBacklog(false), numMsgs); Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT); // 3. consumer1 should have all the messages while consumer2 should have no messages @@ -204,7 +204,7 @@ public void testSimpleConsumerEventsWithoutPartition() throws Exception { // 4. messages deleted on individual acks Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT); - assertEquals(subRef.getNumberOfEntriesInBacklog(), 0); + assertEquals(subRef.getNumberOfEntriesInBacklog(false), 0); for (int i = 0; i < numMsgs; i++) { String message = "my-message-" + i; @@ -241,7 +241,7 @@ public void testSimpleConsumerEventsWithoutPartition() throws Exception { rolloverPerIntervalStats(); Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT); - assertEquals(subRef.getNumberOfEntriesInBacklog(), 0); + assertEquals(subRef.getNumberOfEntriesInBacklog(false), 0); // 8. unsubscribe not allowed if multiple consumers connected try { @@ -517,7 +517,7 @@ public void testActiveConsumerFailoverWithDelay() throws Exception { // wait for all messages to be dequeued int retry = 20; for (int i = 0; i < retry; i++) { - if (receivedMessages.size() >= numMsgs && subRef.getNumberOfEntriesInBacklog() == 0) { + if (receivedMessages.size() >= numMsgs && subRef.getNumberOfEntriesInBacklog(false) == 0) { break; } else if (i != retry - 1) { Thread.sleep(100); @@ -526,7 +526,7 @@ public void testActiveConsumerFailoverWithDelay() throws Exception { // check if message duplication has occurred assertEquals(receivedMessages.size(), numMsgs); - assertEquals(subRef.getNumberOfEntriesInBacklog(), 0); + assertEquals(subRef.getNumberOfEntriesInBacklog(false), 0); for (int i = 0; i < receivedMessages.size(); i++) { Assert.assertNotNull(receivedMessages.get(i)); Assert.assertEquals(new String(receivedMessages.get(i).getData()), "my-message-" + i); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentQueueE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentQueueE2ETest.java index 7ffb00565c63c..495a9973b5322 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentQueueE2ETest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentQueueE2ETest.java @@ -117,7 +117,7 @@ public void testSimpleConsumerEvents() throws Exception { rolloverPerIntervalStats(); - assertEquals(subRef.getNumberOfEntriesInBacklog(), numMsgs * 2); + assertEquals(subRef.getNumberOfEntriesInBacklog(false), numMsgs * 2); Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT); // both consumers will together consumer all messages @@ -141,7 +141,7 @@ public void testSimpleConsumerEvents() throws Exception { // 3. messages deleted on individual acks Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT); - assertEquals(subRef.getNumberOfEntriesInBacklog(), 0); + assertEquals(subRef.getNumberOfEntriesInBacklog(false), 0); // 4. shared consumer unsubscribe not allowed try { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java index 38a080f0035e0..42190c7a18024 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java @@ -161,7 +161,7 @@ public void testSimpleConsumerEvents() throws Exception { assertTrue(subRef.getDispatcher().isConsumerConnected()); rolloverPerIntervalStats(); - assertEquals(subRef.getNumberOfEntriesInBacklog(), numMsgs * 2); + assertEquals(subRef.getNumberOfEntriesInBacklog(false), numMsgs * 2); // 2. messages pushed before client receive Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT); @@ -179,7 +179,7 @@ public void testSimpleConsumerEvents() throws Exception { // 4. messages deleted on individual acks Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT); - assertEquals(subRef.getNumberOfEntriesInBacklog(), numMsgs); + assertEquals(subRef.getNumberOfEntriesInBacklog(false), numMsgs); for (int i = 0; i < numMsgs; i++) { msg = consumer.receive(); @@ -192,7 +192,7 @@ public void testSimpleConsumerEvents() throws Exception { // 5. messages deleted on cumulative acks Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT); - assertEquals(subRef.getNumberOfEntriesInBacklog(), 0); + assertEquals(subRef.getNumberOfEntriesInBacklog(false), 0); // 6. consumer unsubscribe consumer.unsubscribe(); @@ -365,7 +365,7 @@ public Void call() throws Exception { PersistentSubscription subRef = topicRef.getSubscription(subName); // 1. cumulatively all threads drain the backlog - assertEquals(subRef.getNumberOfEntriesInBacklog(), 0); + assertEquals(subRef.getNumberOfEntriesInBacklog(false), 0); // 2. flow control works the same as single consumer single thread Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT); @@ -848,13 +848,13 @@ public void testMessageExpiry() throws Exception { } rolloverPerIntervalStats(); - assertEquals(subRef.getNumberOfEntriesInBacklog(), numMsgs); + assertEquals(subRef.getNumberOfEntriesInBacklog(false), numMsgs); Thread.sleep(TimeUnit.SECONDS.toMillis(messageTTLSecs)); runMessageExpiryCheck(); // 1. check all messages expired for this unconnected subscription - assertEquals(subRef.getNumberOfEntriesInBacklog(), 0); + assertEquals(subRef.getNumberOfEntriesInBacklog(false), 0); // clean-up producer.close(); @@ -895,17 +895,17 @@ public void testMessageExpiryWithFewExpiredBacklog() throws Exception { } rolloverPerIntervalStats(); - assertEquals(subRef.getNumberOfEntriesInBacklog(), numMsgs); + assertEquals(subRef.getNumberOfEntriesInBacklog(false), numMsgs); Thread.sleep(TimeUnit.SECONDS.toMillis(messageTTLSecs)); runMessageExpiryCheck(); - assertEquals(subRef.getNumberOfEntriesInBacklog(), numMsgs); + assertEquals(subRef.getNumberOfEntriesInBacklog(false), numMsgs); Thread.sleep(TimeUnit.SECONDS.toMillis(messageTTLSecs / 2)); runMessageExpiryCheck(); - assertEquals(subRef.getNumberOfEntriesInBacklog(), 0); + assertEquals(subRef.getNumberOfEntriesInBacklog(false), 0); } @Test diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java index bbcb6b194b19b..c9cc07ccaed00 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java @@ -76,19 +76,19 @@ public void testSeek() throws Exception { } PersistentSubscription sub = topicRef.getSubscription("my-subscription"); - assertEquals(sub.getNumberOfEntriesInBacklog(), 10); + assertEquals(sub.getNumberOfEntriesInBacklog(false), 10); consumer.seek(MessageId.latest); - assertEquals(sub.getNumberOfEntriesInBacklog(), 0); + assertEquals(sub.getNumberOfEntriesInBacklog(false), 0); // Wait for consumer to reconnect Thread.sleep(500); consumer.seek(MessageId.earliest); - assertEquals(sub.getNumberOfEntriesInBacklog(), 10); + assertEquals(sub.getNumberOfEntriesInBacklog(false), 10); Thread.sleep(500); consumer.seek(messageIds.get(5)); - assertEquals(sub.getNumberOfEntriesInBacklog(), 5); + assertEquals(sub.getNumberOfEntriesInBacklog(false), 5); } @Test @@ -131,16 +131,16 @@ public void testSeekTime() throws Exception { producer.send(message.getBytes()); } - assertEquals(sub.getNumberOfEntriesInBacklog(), 10); + assertEquals(sub.getNumberOfEntriesInBacklog(false), 10); long currentTimestamp = System.currentTimeMillis(); consumer.seek(currentTimestamp); - assertEquals(sub.getNumberOfEntriesInBacklog(), 1); + assertEquals(sub.getNumberOfEntriesInBacklog(false), 1); // Wait for consumer to reconnect Thread.sleep(1000); consumer.seek(currentTimestamp - resetTimeInMillis); - assertEquals(sub.getNumberOfEntriesInBacklog(), 10); + assertEquals(sub.getNumberOfEntriesInBacklog(false), 10); } @Test @@ -176,7 +176,7 @@ public void testSeekTimeOnPartitionedTopic() throws Exception { long backlogs = 0; for (PersistentSubscription sub : subs) { - backlogs += sub.getNumberOfEntriesInBacklog(); + backlogs += sub.getNumberOfEntriesInBacklog(false); } assertEquals(backlogs, 10); @@ -185,7 +185,7 @@ public void testSeekTimeOnPartitionedTopic() throws Exception { long currentTimestamp = System.currentTimeMillis(); consumer.seek(currentTimestamp); for (PersistentSubscription sub : subs) { - backlogs += sub.getNumberOfEntriesInBacklog(); + backlogs += sub.getNumberOfEntriesInBacklog(false); } assertEquals(backlogs, 2); @@ -195,7 +195,7 @@ public void testSeekTimeOnPartitionedTopic() throws Exception { backlogs = 0; for (PersistentSubscription sub : subs) { - backlogs += sub.getNumberOfEntriesInBacklog(); + backlogs += sub.getNumberOfEntriesInBacklog(false); } assertEquals(backlogs, 10); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStatsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStatsTest.java index bb010a61d05a2..c01173ee2511c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStatsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStatsTest.java @@ -54,6 +54,7 @@ public void testSimpleAggregation() throws Exception { subStats1.msgBacklog = 50; subStats1.msgRateRedeliver = 1.5; subStats1.unackedMessages = 2; + subStats1.msgBacklogNoDelayed = 30; topicStats1.subscriptionStats.put(namespace, subStats1); TopicStats topicStats2 = new TopicStats(); @@ -81,6 +82,7 @@ public void testSimpleAggregation() throws Exception { subStats2.msgBacklog = 27; subStats2.msgRateRedeliver = 0.7; subStats2.unackedMessages = 0; + subStats2.msgBacklogNoDelayed = 20; topicStats2.subscriptionStats.put(namespace, subStats2); AggregatedNamespaceStats nsStats = new AggregatedNamespaceStats(); @@ -111,6 +113,7 @@ public void testSimpleAggregation() throws Exception { AggregatedSubscriptionStats nsSubStats = nsStats.subscriptionStats.get(namespace); assertNotNull(nsSubStats); assertEquals(nsSubStats.msgBacklog, 77); + assertEquals(nsSubStats.msgBacklogNoDelayed, 50); assertEquals(nsSubStats.msgRateRedeliver, 2.2); assertEquals(nsSubStats.unackedMessages, 2); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DispatcherBlockConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DispatcherBlockConsumerTest.java index bb7aa65d2398a..5cb4fc1383dae 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DispatcherBlockConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DispatcherBlockConsumerTest.java @@ -544,7 +544,7 @@ public void testBlockDispatcherStats() throws Exception { assertNotNull(topicRef); rolloverPerIntervalStats(); - stats = topicRef.getStats(); + stats = topicRef.getStats(false); subStats = stats.subscriptions.values().iterator().next(); // subscription stats @@ -562,7 +562,7 @@ public void testBlockDispatcherStats() throws Exception { Thread.sleep(timeWaitToSync); rolloverPerIntervalStats(); - stats = topicRef.getStats(); + stats = topicRef.getStats(false); subStats = stats.subscriptions.values().iterator().next(); assertTrue(subStats.msgBacklog > 0); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java index 1e46fe86ca41e..9232642c02f9b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java @@ -438,7 +438,7 @@ public void testTopicStats() throws Exception { assertNotNull(topicRef); rolloverPerIntervalStats(pulsar); - stats = topicRef.getStats(); + stats = topicRef.getStats(false); subStats = stats.getSubscriptions().values().iterator().next(); // subscription stats @@ -456,7 +456,7 @@ public void testTopicStats() throws Exception { Thread.sleep(timeWaitToSync); rolloverPerIntervalStats(pulsar); - stats = topicRef.getStats(); + stats = topicRef.getStats(false); subStats = stats.getSubscriptions().values().iterator().next(); assertTrue(subStats.msgRateOut > 0); @@ -520,7 +520,7 @@ public void testReplicator() throws Exception { assertNotNull(replicatorR3); rolloverPerIntervalStats(replicationPulasr); - stats = topicRef.getStats(); + stats = topicRef.getStats(false); subStats = stats.getSubscriptions().values().iterator().next(); // subscription stats @@ -591,7 +591,7 @@ public void testReplicator() throws Exception { Thread.sleep(timeWaitToSync); rolloverPerIntervalStats(replicationPulasr); - stats = topicRef.getStats(); + stats = topicRef.getStats(false); subStats = stats.getSubscriptions().values().iterator().next(); assertTrue(subStats.msgRateOut > 0); @@ -812,7 +812,7 @@ public void testMsgDropStat() throws Exception { NonPersistentTopic topic = (NonPersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName).get(); pulsar.getBrokerService().updateRates(); - NonPersistentTopicStats stats = topic.getStats(); + NonPersistentTopicStats stats = topic.getStats(false); NonPersistentPublisherStats npStats = stats.getPublishers().get(0); NonPersistentSubscriptionStats sub1Stats = stats.getSubscriptions().get("subscriber-1"); NonPersistentSubscriptionStats sub2Stats = stats.getSubscriptions().get("subscriber-2"); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageParserTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageParserTest.java index 87278a4d0745b..9fcb129a373c8 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageParserTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageParserTest.java @@ -124,7 +124,7 @@ public void testWithBatches() throws Exception { producer.send("hello-" + (n - 1)); // Read through raw data - assertEquals(cursor.getNumberOfEntriesInBacklog(), 1); + assertEquals(cursor.getNumberOfEntriesInBacklog(false), 1); Entry entry = cursor.readEntriesOrWait(1).get(0); List messages = Lists.newArrayList(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java index c823e608dbe7d..c6883c63e0670 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java @@ -803,9 +803,9 @@ public void testDefaultBacklogTTL() throws Exception { topic.get().checkMessageExpiry(); - retryStrategically((test) -> subscription.getNumberOfEntriesInBacklog() == 0, 5, 200); + retryStrategically((test) -> subscription.getNumberOfEntriesInBacklog(false) == 0, 5, 200); - assertEquals(subscription.getNumberOfEntriesInBacklog(), 0); + assertEquals(subscription.getNumberOfEntriesInBacklog(false), 0); } @Test(timeOut = testTimeout) diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java index c5ec04120c022..0a004628de6b0 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java @@ -565,6 +565,8 @@ List getListInBundle(String namespace, String bundleRange) * * @param topic * topic name + * @param getPreciseBacklog + * Set to true to get precise backlog, Otherwise get imprecise backlog. * @return the topic statistics * * @throws NotAuthorizedException @@ -574,7 +576,11 @@ List getListInBundle(String namespace, String bundleRange) * @throws PulsarAdminException * Unexpected error */ - TopicStats getStats(String topic) throws PulsarAdminException; + TopicStats getStats(String topic, boolean getPreciseBacklog) throws PulsarAdminException; + + default TopicStats getStats(String topic) throws PulsarAdminException { + return getStats(topic, false); + } /** * Get the stats for the topic asynchronously. All the rates are computed over a 1 minute window and are relative @@ -582,11 +588,17 @@ List getListInBundle(String namespace, String bundleRange) * * @param topic * topic name + * @param getPreciseBacklog + * Set to true to get precise backlog, Otherwise get imprecise backlog. * * @return a future that can be used to track when the topic statistics are returned * */ - CompletableFuture getStatsAsync(String topic); + CompletableFuture getStatsAsync(String topic, boolean getPreciseBacklog); + + default CompletableFuture getStatsAsync(String topic) { + return getStatsAsync(topic, false); + } /** * Get the internal stats for the topic. @@ -716,7 +728,11 @@ List getListInBundle(String namespace, String bundleRange) * Unexpected error * */ - PartitionedTopicStats getPartitionedStats(String topic, boolean perPartition) throws PulsarAdminException; + PartitionedTopicStats getPartitionedStats(String topic, boolean perPartition, boolean getPreciseBacklog) throws PulsarAdminException; + + default PartitionedTopicStats getPartitionedStats(String topic, boolean perPartition) throws PulsarAdminException { + return getPartitionedStats(topic, perPartition, false); + } /** * Get the stats for the partitioned topic asynchronously @@ -727,7 +743,11 @@ List getListInBundle(String namespace, String bundleRange) * flag to get stats per partition * @return a future that can be used to track when the partitioned topic statistics are returned */ - CompletableFuture getPartitionedStatsAsync(String topic, boolean perPartition); + CompletableFuture getPartitionedStatsAsync(String topic, boolean perPartition, boolean getPreciseBacklog); + + default CompletableFuture getPartitionedStatsAsync(String topic, boolean perPartition) { + return getPartitionedStatsAsync(topic, perPartition, false); + } /** * Get the stats for the partitioned topic diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java index 75c3d59a55d7c..c61359a74f0b5 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java @@ -437,9 +437,9 @@ public void failed(Throwable throwable) { } @Override - public TopicStats getStats(String topic) throws PulsarAdminException { + public TopicStats getStats(String topic, boolean getPreciseBacklog) throws PulsarAdminException { try { - return getStatsAsync(topic).get(this.readTimeoutMs, TimeUnit.MILLISECONDS); + return getStatsAsync(topic, getPreciseBacklog).get(this.readTimeoutMs, TimeUnit.MILLISECONDS); } catch (ExecutionException e) { throw (PulsarAdminException) e.getCause(); } catch (InterruptedException e) { @@ -451,9 +451,9 @@ public TopicStats getStats(String topic) throws PulsarAdminException { } @Override - public CompletableFuture getStatsAsync(String topic) { + public CompletableFuture getStatsAsync(String topic, boolean getPreciseBacklog) { TopicName tn = validateTopic(topic); - WebTarget path = topicPath(tn, "stats"); + WebTarget path = topicPath(tn, "stats").queryParam("getPreciseBacklog", getPreciseBacklog); final CompletableFuture future = new CompletableFuture<>(); asyncGetRequest(path, new InvocationCallback() { @@ -542,10 +542,10 @@ public void failed(Throwable throwable) { } @Override - public PartitionedTopicStats getPartitionedStats(String topic, boolean perPartition) + public PartitionedTopicStats getPartitionedStats(String topic, boolean perPartition, boolean getPreciseBacklog) throws PulsarAdminException { try { - return getPartitionedStatsAsync(topic, perPartition).get(this.readTimeoutMs, TimeUnit.MILLISECONDS); + return getPartitionedStatsAsync(topic, perPartition, getPreciseBacklog).get(this.readTimeoutMs, TimeUnit.MILLISECONDS); } catch (ExecutionException e) { throw (PulsarAdminException) e.getCause(); } catch (InterruptedException e) { @@ -558,10 +558,10 @@ public PartitionedTopicStats getPartitionedStats(String topic, boolean perPartit @Override public CompletableFuture getPartitionedStatsAsync(String topic, - boolean perPartition) { + boolean perPartition, boolean getPreciseBacklog) { TopicName tn = validateTopic(topic); WebTarget path = topicPath(tn, "partitioned-stats"); - path = path.queryParam("perPartition", perPartition); + path = path.queryParam("perPartition", perPartition).queryParam("getPreciseBacklog", getPreciseBacklog); final CompletableFuture future = new CompletableFuture<>(); asyncGetRequest(path, new InvocationCallback() { diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java index 371f752138a3e..283c5fe7c12ac 100644 --- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java +++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java @@ -642,7 +642,7 @@ void topics() throws Exception { verify(mockTopics).deleteSubscription("persistent://myprop/clust/ns1/ds1", "sub1"); cmdTopics.run(split("stats persistent://myprop/clust/ns1/ds1")); - verify(mockTopics).getStats("persistent://myprop/clust/ns1/ds1"); + verify(mockTopics).getStats("persistent://myprop/clust/ns1/ds1", false); cmdTopics.run(split("stats-internal persistent://myprop/clust/ns1/ds1")); verify(mockTopics).getInternalStats("persistent://myprop/clust/ns1/ds1"); @@ -651,7 +651,7 @@ void topics() throws Exception { verify(mockTopics).getInternalInfo("persistent://myprop/clust/ns1/ds1"); cmdTopics.run(split("partitioned-stats persistent://myprop/clust/ns1/ds1 --per-partition")); - verify(mockTopics).getPartitionedStats("persistent://myprop/clust/ns1/ds1", true); + verify(mockTopics).getPartitionedStats("persistent://myprop/clust/ns1/ds1", true, false); cmdTopics.run(split("clear-backlog persistent://myprop/clust/ns1/ds1 -s sub1")); verify(mockTopics).skipAllMessages("persistent://myprop/clust/ns1/ds1", "sub1"); diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java index af5aa02e7a2a3..f5954cf38bac4 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java @@ -370,10 +370,14 @@ private class GetStats extends CliCommand { @Parameter(description = "persistent://tenant/namespace/topic\n", required = true) private java.util.List params; + @Parameter(names = { "-gpb", + "--get-precise-backlog" }, description = "Set true to get precise backlog") + private boolean getPreciseBacklog = false; + @Override void run() throws PulsarAdminException { String topic = validateTopicName(params); - print(topics.getStats(topic)); + print(topics.getStats(topic, getPreciseBacklog)); } } @@ -412,10 +416,14 @@ private class GetPartitionedStats extends CliCommand { @Parameter(names = "--per-partition", description = "Get per partition stats") private boolean perPartition = false; + @Parameter(names = { "-gpb", + "--get-precise-backlog" }, description = "Set true to get precise backlog") + private boolean getPreciseBacklog = false; + @Override void run() throws Exception { String topic = validateTopicName(params); - print(topics.getPartitionedStats(topic, perPartition)); + print(topics.getPartitionedStats(topic, perPartition, getPreciseBacklog)); } } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java index a4c299493be79..7064883a04c2a 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java @@ -40,6 +40,9 @@ public class SubscriptionStats { /** Number of messages in the subscription backlog. */ public long msgBacklog; + /** Number of messages in the subscription backlog that do not contain the delay messages. */ + public long msgBacklogNoDelayed; + /** Flag to verify if subscription is blocked due to reaching threshold of unacked messages. */ public boolean blockedSubscriptionOnUnackedMsgs; @@ -85,6 +88,7 @@ public void reset() { msgThroughputOut = 0; msgRateRedeliver = 0; msgBacklog = 0; + msgBacklogNoDelayed = 0; unackedMessages = 0; msgRateExpired = 0; lastExpireTimestamp = 0L; @@ -99,6 +103,7 @@ public SubscriptionStats add(SubscriptionStats stats) { this.msgThroughputOut += stats.msgThroughputOut; this.msgRateRedeliver += stats.msgRateRedeliver; this.msgBacklog += stats.msgBacklog; + this.msgBacklogNoDelayed += stats.msgBacklogNoDelayed; this.unackedMessages += stats.unackedMessages; this.msgRateExpired += stats.msgRateExpired; this.isReplicated |= stats.isReplicated; diff --git a/site2/docs/admin-api-non-partitioned-topics.md b/site2/docs/admin-api-non-partitioned-topics.md index 61f7409b47938..a8cfe0fc8d183 100644 --- a/site2/docs/admin-api-non-partitioned-topics.md +++ b/site2/docs/admin-api-non-partitioned-topics.md @@ -101,3 +101,59 @@ persistent://tenant/namespace/topic2 ```java admin.topics().getList(namespace); ``` + +### Stats + +It shows current statistics of a given topic. Here's an example payload: + +The following stats are available: + +|Stat|Description| +|----|-----------| +|msgRateIn|The sum of all local and replication publishers’ publish rates in messages per second| +|msgThroughputIn|Same as msgRateIn but in bytes per second instead of messages per second| +|msgRateOut|The sum of all local and replication consumers’ dispatch rates in messages per second| +|msgThroughputOut|Same as msgRateOut but in bytes per second instead of messages per second| +|averageMsgSize|Average message size, in bytes, from this publisher within the last interval| +|storageSize|The sum of the ledgers’ storage size for this topic| +|publishers|The list of all local publishers into the topic. There can be anywhere from zero to thousands.| +|producerId|Internal identifier for this producer on this topic| +|producerName|Internal identifier for this producer, generated by the client library| +|address|IP address and source port for the connection of this producer| +|connectedSince|Timestamp this producer was created or last reconnected| +|subscriptions|The list of all local subscriptions to the topic| +|my-subscription|The name of this subscription (client defined)| +|msgBacklog|The count of messages in backlog for this subscription| +|msgBacklogNoDelayed|The count of messages in backlog without delayed messages for this subscription| +|type|This subscription type| +|msgRateExpired|The rate at which messages were discarded instead of dispatched from this subscription due to TTL| +|consumers|The list of connected consumers for this subscription| +|consumerName|Internal identifier for this consumer, generated by the client library| +|availablePermits|The number of messages this consumer has space for in the client library’s listen queue. A value of 0 means the client library’s queue is full and receive() isn’t being called. A nonzero value means this consumer is ready to be dispatched messages.| +|replication|This section gives the stats for cross-colo replication of this topic| +|replicationBacklog|The outbound replication backlog in messages| +|connected|Whether the outbound replicator is connected| +|replicationDelayInSeconds|How long the oldest message has been waiting to be sent through the connection, if connected is true| +|inboundConnection|The IP and port of the broker in the remote cluster’s publisher connection to this broker| +|inboundConnectedSince|The TCP connection being used to publish messages to the remote cluster. If there are no local publishers connected, this connection is automatically closed after a minute.| + +#### pulsar-admin + +The stats for the topic and its connected producers and consumers can be fetched by using the +[`stats`](reference-pulsar-admin.md#stats) command, specifying the topic by name: + +```shell +$ pulsar-admin topics stats \ + persistent://test-tenant/namespace/topic \ + --get-precise-backlog +``` + +#### REST API + +{@inject: endpoint|GET|/admin/v2/persistent/:tenant/:namespace/:topic/stats|operation/getStats} + +#### Java + +```java +admin.topics().getStats(persistentTopic, false /* is precise backlog */); +``` diff --git a/site2/docs/admin-api-partitioned-topics.md b/site2/docs/admin-api-partitioned-topics.md index a6507e073b759..d413425c282f1 100644 --- a/site2/docs/admin-api-partitioned-topics.md +++ b/site2/docs/admin-api-partitioned-topics.md @@ -254,6 +254,7 @@ The following stats are available: |subscriptions|The list of all local subscriptions to the topic| |my-subscription|The name of this subscription (client defined)| |msgBacklog|The count of messages in backlog for this subscription| +|msgBacklogNoDelayed|The count of messages in backlog without delayed messages for this subscription| |type|This subscription type| |msgRateExpired|The rate at which messages were discarded instead of dispatched from this subscription due to TTL| |consumers|The list of connected consumers for this subscription| @@ -274,7 +275,7 @@ The stats for the partitioned topic and its connected producers and consumers ca ```shell $ pulsar-admin topics partitioned-stats \ persistent://test-tenant/namespace/topic \ - --per-partition + --per-partition ``` #### REST API @@ -284,7 +285,7 @@ $ pulsar-admin topics partitioned-stats \ #### Java ```java -admin.persistentTopics().getStats(persistentTopic); +admin.topics().getPartitionedStats(persistentTopic, true /* per partition */, false /* is precise backlog */); ``` ### Internal stats diff --git a/site2/docs/reference-configuration.md b/site2/docs/reference-configuration.md index d72778a4d8097..cdfe0d419a70d 100644 --- a/site2/docs/reference-configuration.md +++ b/site2/docs/reference-configuration.md @@ -165,6 +165,7 @@ Pulsar brokers are responsible for handling incoming messages from producers, di |brokerClientAuthenticationPlugin| Authentication settings of the broker itself. Used when the broker connects to other brokers, either in same or other clusters || |brokerClientAuthenticationParameters||| |athenzDomainNames| Supported Athenz provider domain names(comma separated) for authentication || +|exposePreciseBacklogInPrometheus| Enable expose the precise backlog stats, set false to use published counter and consumed counter to calculate, this would be more efficient but may be inaccurate. |false| |bookkeeperClientAuthenticationPlugin| Authentication plugin to use when connecting to bookies || |bookkeeperClientAuthenticationParametersName| BookKeeper auth plugin implementatation specifics parameters name and values || |bookkeeperClientAuthenticationParameters||| @@ -353,6 +354,7 @@ The [`pulsar-client`](reference-cli-tools.md#pulsar-client) CLI tool can be used |brokerClientAuthenticationPlugin| The authentication settings of the broker itself. Used when the broker connects to other brokers either in the same cluster or from other clusters. || |brokerClientAuthenticationParameters| The parameters that go along with the plugin specified using brokerClientAuthenticationPlugin. || |athenzDomainNames| Supported Athenz authentication provider domain names as a comma-separated list. || +|exposePreciseBacklogInPrometheus| Enable expose the precise backlog stats, set false to use published counter and consumed counter to calculate, this would be more efficient but may be inaccurate. |false| |bookkeeperClientAuthenticationPlugin| Authentication plugin to be used when connecting to bookies (BookKeeper servers). || |bookkeeperClientAuthenticationParametersName| BookKeeper authentication plugin implementation parameters and values. || |bookkeeperClientAuthenticationParameters| Parameters associated with the bookkeeperClientAuthenticationParametersName ||