Skip to content

Commit

Permalink
ManagedCursor: manually serialise PositionInfo (apache#270)
Browse files Browse the repository at this point in the history
* ManagedCursor: manually serialise PositionInfo
* Add tests and save last serialized side to prevent reallocations
  • Loading branch information
eolivelli committed May 16, 2024
1 parent 98a3d25 commit 8a365d0
Show file tree
Hide file tree
Showing 4 changed files with 1,646 additions and 48 deletions.
1 change: 1 addition & 0 deletions buildtools/src/main/resources/pulsar/suppressions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
<suppress checks=".*" files=".+[\\/]generated[\\/].+\.java"/>
<suppress checks=".*" files=".+[\\/]generated-sources[\\/].+\.java"/>
<suppress checks=".*" files=".+[\\/]generated-test-sources[\\/].+\.java"/>
<suppress checks=".*" files=".+PositionInfoUtils.java"/>

<!-- suppress most all checks expect below-->
<suppress checks="^(?!.*(UnusedImports|IllegalImport)).*$" files=".*[\\/]src[\\/]test[\\/].*"/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ protected LightMLDataFormats.PositionInfo initialValue() {
private final BookKeeper.DigestType digestType;

protected volatile PositionImpl markDeletePosition;
private int lastSerializedSize;

// this position is have persistent mark delete position
protected volatile PositionImpl persistentMarkDeletePosition;
Expand Down Expand Up @@ -255,7 +256,7 @@ protected LightMLDataFormats.PositionInfo initialValue() {
// active state cache in ManagedCursor. It should be in sync with the state in activeCursors in ManagedLedger.
private volatile boolean isActive = false;

class MarkDeleteEntry {
static class MarkDeleteEntry {
final PositionImpl newPosition;
final MarkDeleteCallback callback;
final Object ctx;
Expand Down Expand Up @@ -700,8 +701,14 @@ private void completeCursorRecovery(VoidCallback callback, LedgerHandle lh, byt
try {
positionInfo = PositionInfo.parseFrom(data);
} catch (InvalidProtocolBufferException e) {
callback.operationFailed(new ManagedLedgerException(e));
return;
log.error("[{}] Failed to parse position info from ledger {} for cursor {}: {}", ledger.getName(),
lh.getId(), name, e);
// Rewind to oldest entry available
positionInfo = PositionInfo
.newBuilder()
.setLedgerId(-1)
.setEntryId(-1)
.build();
}

Map<String, Long> recoveredProperties = Collections.emptyMap();
Expand Down Expand Up @@ -3176,46 +3183,42 @@ private List<MLDataFormats.MessageRange> buildIndividualDeletedMessageRanges() {
+ "individualDeletedMessagesSerializedSize {} rangeListSize {} "
+ "maxUnackedRangesToPersist {}",
ledger.getName(), name, individualDeletedMessages.size(),
individualDeletedMessagesSerializedSize, rangeList.size(), config.getMaxUnackedRangesToPersist());
individualDeletedMessagesSerializedSize, rangeList.size(),
config.getMaxUnackedRangesToPersist());

return rangeList;
} finally {
lock.readLock().unlock();
}
}

private void addIndividualDeletedMessageRanges(LightMLDataFormats.PositionInfo lpi) {
private void scanIndividualDeletedMessageRanges(
PositionInfoUtils.IndividuallyDeletedMessagesRangeConsumer consumer) {
final int maxUnackedRangesToPersist = config.getMaxUnackedRangesToPersist();
AtomicInteger acksSerializedSize = new AtomicInteger(0);
AtomicInteger rangeCount = new AtomicInteger(0);

lock.readLock().lock();
try {
if (individualDeletedMessages.isEmpty()) {
this.individualDeletedMessagesSerializedSize = 0;
return;
}

AtomicInteger acksSerializedSize = new AtomicInteger(0);
AtomicInteger rangeCount = new AtomicInteger(0);

individualDeletedMessages.forEachRawRange((lowerKey, lowerValue, upperKey, upperValue) -> {
LightMLDataFormats.MessageRange messageRange = lpi.addIndividualDeletedMessage();
messageRange.setLowerEndpoint()
.setLedgerId(lowerKey)
.setEntryId(lowerValue);
messageRange.setUpperEndpoint()
.setLedgerId(upperKey)
.setEntryId(upperValue);

acksSerializedSize.addAndGet(messageRange.getSerializedSize());

return rangeCount.incrementAndGet() <= config.getMaxUnackedRangesToPersist();
acksSerializedSize.addAndGet(16 * 4);
consumer.acceptRange(lowerKey, lowerValue, upperKey, upperValue);
return rangeCount.incrementAndGet() <= maxUnackedRangesToPersist;
});

this.individualDeletedMessagesSerializedSize = acksSerializedSize.get();
individualDeletedMessages.resetDirtyKeys();
log.info("[{}] [{}] buildIndividualDeletedMessageRanges, numRanges {} "
log.info("[{}] [{}] scanIndividualDeletedMessageRanges, numRanges {} "
+ "individualDeletedMessagesSerializedSize {} rangeListSize {} "
+ "maxUnackedRangesToPersist {}",
ledger.getName(), name, individualDeletedMessages.size(),
individualDeletedMessagesSerializedSize, rangeCount.get(), config.getMaxUnackedRangesToPersist());
individualDeletedMessagesSerializedSize, rangeCount.get(),
config.getMaxUnackedRangesToPersist());
} finally {
lock.readLock().unlock();
}
Expand All @@ -3235,9 +3238,6 @@ private List<MLDataFormats.BatchedEntryDeletionIndexInfo> buildBatchEntryDeletio
Iterator<Map.Entry<PositionImpl, BitSetRecyclable>> iterator = batchDeletedIndexes.entrySet().iterator();
while (iterator.hasNext() && result.size() < config.getMaxBatchDeletedIndexToPersist()) {
Map.Entry<PositionImpl, BitSetRecyclable> entry = iterator.next();
nestedPositionBuilder.setLedgerId(entry.getKey().getLedgerId());
nestedPositionBuilder.setEntryId(entry.getKey().getEntryId());
batchDeletedIndexInfoBuilder.setPosition(nestedPositionBuilder.build());
long[] array = entry.getValue().toLongArray();
List<Long> deleteSet = new ArrayList<>(array.length);
for (long l : array) {
Expand All @@ -3253,27 +3253,23 @@ private List<MLDataFormats.BatchedEntryDeletionIndexInfo> buildBatchEntryDeletio
}
}

private void addAllBatchedEntryDeletionIndexInfo(LightMLDataFormats.PositionInfo lpi) {
private void buildBatchEntryDeletionIndexInfoList(
PositionInfoUtils.BatchedEntryDeletionIndexInfoConsumer consumer) {
if (!config.isDeletionAtBatchIndexLevelEnabled()) {
return;
}
int maxBatchDeletedIndexToPersist = config.getMaxBatchDeletedIndexToPersist();
lock.readLock().lock();
try {
if (!config.isDeletionAtBatchIndexLevelEnabled() || batchDeletedIndexes.isEmpty()) {
if (batchDeletedIndexes.isEmpty()) {
return;
}
Iterator<Map.Entry<PositionImpl, BitSetRecyclable>> iterator = batchDeletedIndexes.entrySet().iterator();
int count = 0;
while (iterator.hasNext() && count < config.getMaxBatchDeletedIndexToPersist()) {
Iterator<Map.Entry<PositionImpl, BitSetRecyclable>> iterator = batchDeletedIndexes.entrySet().iterator();
while (iterator.hasNext() && count < maxBatchDeletedIndexToPersist) {
Map.Entry<PositionImpl, BitSetRecyclable> entry = iterator.next();

LightMLDataFormats.BatchedEntryDeletionIndexInfo batchInfo = lpi.addBatchedEntryDeletionIndexInfo();
batchInfo.setPosition()
.setLedgerId(entry.getKey().getLedgerId())
.setEntryId(entry.getKey().getEntryId());

long[] array = entry.getValue().toLongArray();
List<Long> deleteSet = new ArrayList<>(array.length);
for (long l : array) {
batchInfo.addDeleteSet(l);
}
consumer.acceptRange(entry.getKey().getLedgerId(), entry.getKey().getEntryId(), array);
count++;
}
} finally {
Expand All @@ -3292,23 +3288,17 @@ void persistPositionToLedger(final LedgerHandle lh, MarkDeleteEntry mdEntry, fin
long now = System.nanoTime();
PositionImpl position = mdEntry.newPosition;

LightMLDataFormats.PositionInfo pi = piThreadLocal.get();
pi.clear();

pi.setLedgerId(position.getLedgerId())
.setEntryId(position.getEntryId());
addIndividualDeletedMessageRanges(pi);
addAllBatchedEntryDeletionIndexInfo(pi);
addAllProperties(pi, mdEntry.properties);

if (log.isDebugEnabled()) {
log.debug("[{}] Cursor {} Appending to ledger={} position={}", ledger.getName(), name, lh.getId(),
position);
}

requireNonNull(lh);
ByteBuf rawData = toByteBuf(pi);
ByteBuf rawData = PositionInfoUtils.serializePositionInfo(mdEntry, position,
this::scanIndividualDeletedMessageRanges, this::buildBatchEntryDeletionIndexInfoList,
lastSerializedSize);
long endSer = System.nanoTime();
this.lastSerializedSize = rawData.readableBytes();

// rawData is released by compressDataIfNeeded if needed
ByteBuf data = compressDataIfNeeded(rawData, lh);
Expand Down Expand Up @@ -3379,6 +3369,7 @@ void persistPositionToLedger(final LedgerHandle lh, MarkDeleteEntry mdEntry, fin
}
}


private void writeToBookKeeperLastChunk(LedgerHandle lh,
MarkDeleteEntry mdEntry,
VoidCallback callback,
Expand Down Expand Up @@ -4019,4 +4010,5 @@ public ManagedCursor duplicateNonDurableCursor(String nonDurableCursorName) thro
}
return newNonDurableCursor;
}

}
Loading

0 comments on commit 8a365d0

Please sign in to comment.