Skip to content

Commit

Permalink
Prevent ZK connection loss in case of huge cursor status (apache#273)
Browse files (browse the repository at this point in the history)
  • Loading branch information
eolivelli committed May 20, 2024
1 parent 5f07f0c commit 6d2e494
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 30 deletions.
Original file line number | Diff line number | Diff line change
Expand Up @@ -2779,6 +2779,8 @@ private void persistPositionMetaStore(long cursorsLedgerId, PositionImpl positio
new CursorAlreadyClosedException(name + " cursor already closed"))));
return;
}
log.info("[{}][{}] Persisting cursor metadata into metadata store (persistIndividualDeletedMessageRanges: {})",
ledger.getName(), name, persistIndividualDeletedMessageRanges);

final Stat lastCursorLedgerStat = cursorLedgerStat;

Expand All @@ -2793,7 +2795,7 @@ private void persistPositionMetaStore(long cursorsLedgerId, PositionImpl positio
info.addAllProperties(buildPropertiesMap(properties));
info.addAllCursorProperties(buildStringPropertiesMap(cursorProperties));
if (persistIndividualDeletedMessageRanges) {
info.addAllIndividualDeletedMessages(buildIndividualDeletedMessageRanges());
info.addAllIndividualDeletedMessages(buildIndividualDeletedMessageRanges(true));
if (config.isDeletionAtBatchIndexLevelEnabled()) {
info.addAllBatchedEntryDeletionIndexInfo(buildBatchEntryDeletionIndexInfoList());
}
Expand Down Expand Up @@ -3130,7 +3132,7 @@ private static List<StringProperty> buildStringPropertiesMap(Map<String, String>
return stringProperties;
}

private List<MLDataFormats.MessageRange> buildIndividualDeletedMessageRanges() {
private List<MLDataFormats.MessageRange> buildIndividualDeletedMessageRanges(boolean forMetastore) {
lock.readLock().lock();
try {
if (individualDeletedMessages.isEmpty()) {
Expand Down Expand Up @@ -3163,19 +3165,28 @@ private List<MLDataFormats.MessageRange> buildIndividualDeletedMessageRanges() {
.setUpperEndpoint(upperPosition)
.build();

acksSerializedSize.addAndGet(messageRange.getSerializedSize());
int currentSize = acksSerializedSize.addAndGet(messageRange.getSerializedSize());
rangeList.add(messageRange);

if (forMetastore && currentSize > (1024 * 1024 - 10 * 1024)) {
log.warn("[{}] [{}] buildIndividualDeletedMessageRanges, "
+ "rangeListSize {} "
+ "maxUnackedRangesToPersist {}, "
+ "reached {} bytes that is too big for the metastore",
ledger.getName(), name,
rangeList.size(),
config.getMaxUnackedRangesToPersist(), currentSize);
return false;
}

return rangeList.size() <= config.getMaxUnackedRangesToPersist();
});

this.individualDeletedMessagesSerializedSize = acksSerializedSize.get();
individualDeletedMessages.resetDirtyKeys();
log.info("[{}] [{}] buildIndividualDeletedMessageRanges, numRanges {} "
+ "individualDeletedMessagesSerializedSize {} rangeListSize {} "
log.info("[{}] [{}] buildIndividualDeletedMessageRanges, rangeListSize {} "
+ "maxUnackedRangesToPersist {}",
ledger.getName(), name, individualDeletedMessages.size(),
individualDeletedMessagesSerializedSize, rangeList.size(),
ledger.getName(), name, rangeList.size(),
config.getMaxUnackedRangesToPersist());

return rangeList;
Expand Down Expand Up @@ -3205,11 +3216,11 @@ private void scanIndividualDeletedMessageRanges(

this.individualDeletedMessagesSerializedSize = acksSerializedSize.get();
individualDeletedMessages.resetDirtyKeys();
log.info("[{}] [{}] scanIndividualDeletedMessageRanges, numRanges {} "
+ "individualDeletedMessagesSerializedSize {} rangeListSize {} "
log.info("[{}] [{}] scanIndividualDeletedMessageRanges, "
+ "rangeListSize {} "
+ "maxUnackedRangesToPersist {}",
ledger.getName(), name, individualDeletedMessages.size(),
individualDeletedMessagesSerializedSize, rangeCount.get(),
ledger.getName(), name,
rangeCount.get(),
config.getMaxUnackedRangesToPersist());
} finally {
lock.readLock().unlock();
Expand Down Expand Up @@ -3362,27 +3373,30 @@ private void writeToBookKeeperLastChunk(LedgerHandle lh,
PositionImpl position,
Runnable onFinished) {
lh.asyncAddEntry(data, (rc, lh1, entryId, ctx) -> {
if (rc == BKException.Code.OK) {
if (log.isDebugEnabled()) {
log.debug("[{}] Updated cursor {} position {} in meta-ledger {}", ledger.getName(), name, position,
lh1.getId());
}
try {
if (rc == BKException.Code.OK) {
if (log.isDebugEnabled()) {
log.debug("[{}] Updated cursor {} position {} in meta-ledger {}", ledger.getName(), name,
position,
lh1.getId());
}

rolloverLedgerIfNeeded(lh1);
rolloverLedgerIfNeeded(lh1);

mbean.persistToLedger(true);
mbean.addWriteCursorLedgerSize(data.readableBytes());
callback.operationComplete();
onFinished.run();
} else {
log.warn("[{}] Error updating cursor {} position {} in meta-ledger {}: {}", ledger.getName(), name,
position, lh1.getId(), BKException.getMessage(rc));
// If we've had a write error, the ledger will be automatically closed, we need to create a new one,
// in the meantime the mark-delete will be queued.
STATE_UPDATER.compareAndSet(ManagedCursorImpl.this, State.Open, State.NoLedger);

// Before giving up, try to persist the position in the metadata store.
persistPositionToMetaStore(mdEntry, callback);
mbean.persistToLedger(true);
mbean.addWriteCursorLedgerSize(data.readableBytes());
callback.operationComplete();
} else {
log.warn("[{}] Error updating cursor {} position {} in meta-ledger {}: {}", ledger.getName(), name,
position, lh1.getId(), BKException.getMessage(rc));
// If we've had a write error, the ledger will be automatically closed, we need to create a new one,
// in the meantime the mark-delete will be queued.
STATE_UPDATER.compareAndSet(ManagedCursorImpl.this, State.Open, State.NoLedger);

// Before giving up, try to persist the position in the metadata store.
persistPositionToMetaStore(mdEntry, callback);
}
} finally {
onFinished.run();
}
}, null);
Expand Down Expand Up @@ -3480,6 +3494,7 @@ boolean rolloverLedgerIfNeeded(LedgerHandle lh1) {
}

void persistPositionToMetaStore(MarkDeleteEntry mdEntry, final VoidCallback callback) {
log.info("[{}][{}] Persisting cursor metadata into metadata store", ledger.getName(), name);
final PositionImpl newPosition = mdEntry.newPosition;
STATE_UPDATER.compareAndSet(ManagedCursorImpl.this, State.Open, State.NoLedger);
mbean.persistToLedger(false);
Expand Down
Original file line number | Diff line number | Diff line change
Expand Up @@ -249,6 +249,8 @@ public void asyncUpdateCursorInfo(String ledgerName, String cursorName, ManagedC

String path = PREFIX + ledgerName + "/" + cursorName;
byte[] content = compressCursorInfo(info);
log.info("[{}] Persisting cursor={} info with content size {} bytes to metastore",
ledgerName, cursorName, content.length);

long expectedVersion;

Expand All @@ -267,6 +269,7 @@ public void asyncUpdateCursorInfo(String ledgerName, String cursorName, ManagedC
.thenAcceptAsync(optStat -> callback.operationComplete(null, optStat), executor
.chooseThread(ledgerName))
.exceptionally(ex -> {
log.error("[{}] [{}] Failed to update cursor info", ledgerName, cursorName, ex);
executor.executeOrdered(ledgerName,
() -> callback.operationFailed(getException(ex)));
return null;
Expand Down Expand Up @@ -525,6 +528,8 @@ private byte[] compressManagedInfo(byte[] info, byte[] metadata, int metadataSer
compositeByteBuf.addComponent(true, encodeByteBuf);
byte[] dataBytes = new byte[compositeByteBuf.readableBytes()];
compositeByteBuf.readBytes(dataBytes);
log.info("Compressed cursor info, info size {}, metadata size {}, compressed size: {}",
info.length, metadata.length, dataBytes.length);
return dataBytes;
} finally {
if (metadataByteBuf != null) {
Expand Down

0 comments on commit 6d2e494

Please sign in to comment.