Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable get precise backlog and backlog without delayed messages. #6310

Merged
merged 6 commits into from
Feb 16, 2020
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -765,6 +765,11 @@ exposePublisherStats=true
statsUpdateFrequencyInSecs=60
statsUpdateInitialDelayInSecs=60

# Enable expose the precise backlog metrics.
# Set false to use published counter and consumed counter to calculate, this would be more efficient but may be inaccurate.
# Default is false.
exposePreciseBacklogEnabled=false

### --- Schema storage --- ###
# The schema storage implementation used by this broker
schemaRegistryStorageClassName=org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorageFactory
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,9 +193,10 @@ void asyncGetNthEntry(int n, IndividualDeletedEntries deletedEntries, ReadEntryC
*
* <p/>This method has linear time complexity on the number of ledgers included in the managed ledger.
*
* @param isPrecise set to true to get precise backlog count
* @return the number of entries
*/
long getNumberOfEntriesInBacklog();
long getNumberOfEntriesInBacklog(boolean isPrecise);

/**
* This signals that the reader is done with all the entries up to "position" (included). This can potentially
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -734,12 +734,16 @@ public long getEstimatedSizeSinceMarkDeletePosition() {
}

@Override
public long getNumberOfEntriesInBacklog() {
public long getNumberOfEntriesInBacklog(boolean isPrecise) {
if (log.isDebugEnabled()) {
log.debug("[{}] Consumer {} cursor ml-entries: {} -- deleted-counter: {} other counters: mdPos {} rdPos {}",
ledger.getName(), name, ManagedLedgerImpl.ENTRIES_ADDED_COUNTER_UPDATER.get(ledger),
messagesConsumedCounter, markDeletePosition, readPosition);
}
if (isPrecise) {
return getNumberOfEntries(Range.closed(markDeletePosition, ledger.getLastPosition())) - 1;
}

long backlog = ManagedLedgerImpl.ENTRIES_ADDED_COUNTER_UPDATER.get(ledger) - messagesConsumedCounter;
if (backlog < 0) {
// In some case the counters get incorrect values, fall back to the precise backlog count
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import static org.apache.bookkeeper.mledger.util.Errors.isNoSuchLedgerExistsException;
import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.BoundType;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
Expand Down Expand Up @@ -142,6 +143,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
private final ManagedCursorContainer activeCursors = new ManagedCursorContainer();

// Ever increasing counter of entries added
@VisibleForTesting
static final AtomicLongFieldUpdater<ManagedLedgerImpl> ENTRIES_ADDED_COUNTER_UPDATER = AtomicLongFieldUpdater
.newUpdater(ManagedLedgerImpl.class, "entriesAddedCounter");
@SuppressWarnings("unused")
Expand Down Expand Up @@ -3176,6 +3178,11 @@ public long getOffloadedSize() {
return offloadedSize;
}

@VisibleForTesting
public void setEntriesAddedCounter(long count) {
ENTRIES_ADDED_COUNTER_UPDATER.set(this, count);
}

private static final Logger log = LoggerFactory.getLogger(ManagedLedgerImpl.class);

}
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ public long getNumberOfMessagesInBacklog() {
long count = 0;

for (ManagedCursor cursor : managedLedger.getCursors()) {
count += cursor.getNumberOfEntriesInBacklog();
count += cursor.getNumberOfEntriesInBacklog(false);
}

return count;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ public long getNumberOfEntries() {
}

@Override
public long getNumberOfEntriesInBacklog() {
public long getNumberOfEntriesInBacklog(boolean isPrecise) {
return 0;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,24 +51,24 @@ void testMultiPositionDelete() throws Exception {
Position p7 = ledger.addEntry("dummy-entry-7".getBytes(Encoding));

assertEquals(c1.getNumberOfEntries(), 7);
assertEquals(c1.getNumberOfEntriesInBacklog(), 7);
assertEquals(c1.getNumberOfEntriesInBacklog(false), 7);

c1.delete(Lists.newArrayList(p2, p3, p5, p7));

assertEquals(c1.getNumberOfEntries(), 3);
assertEquals(c1.getNumberOfEntriesInBacklog(), 3);
assertEquals(c1.getNumberOfEntriesInBacklog(false), 3);
assertEquals(c1.getMarkDeletedPosition(), p0);

c1.delete(Lists.newArrayList(p1));

assertEquals(c1.getNumberOfEntries(), 2);
assertEquals(c1.getNumberOfEntriesInBacklog(), 2);
assertEquals(c1.getNumberOfEntriesInBacklog(false), 2);
assertEquals(c1.getMarkDeletedPosition(), p3);

c1.delete(Lists.newArrayList(p4, p6, p7));

assertEquals(c1.getNumberOfEntries(), 0);
assertEquals(c1.getNumberOfEntriesInBacklog(), 0);
assertEquals(c1.getNumberOfEntriesInBacklog(false), 0);
assertEquals(c1.getMarkDeletedPosition(), p7);
}

Expand Down
Loading