From da709a6200d9faabe60defc4a2f32ff2f08a9b98 Mon Sep 17 00:00:00 2001 From: Varun Bansal Date: Mon, 11 Mar 2024 14:42:44 +0530 Subject: [PATCH 1/5] handle unexpected exception on success callback of translog upload Signed-off-by: Varun Bansal --- .../index/translog/transfer/FileTransferTracker.java | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/FileTransferTracker.java b/server/src/main/java/org/opensearch/index/translog/transfer/FileTransferTracker.java index 9c2304f809f46..cb723e743f2eb 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/FileTransferTracker.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/FileTransferTracker.java @@ -64,10 +64,14 @@ long getTotalBytesToUpload() { @Override public void onSuccess(TransferFileSnapshot fileSnapshot) { - long durationInMillis = (System.nanoTime() - fileTransferStartTime) / 1_000_000L; - remoteTranslogTransferTracker.addUploadTimeInMillis(durationInMillis); - remoteTranslogTransferTracker.addUploadBytesSucceeded(bytesForTlogCkpFileToUpload.get(fileSnapshot.getName())); - add(fileSnapshot.getName(), TransferState.SUCCESS); + try { + long durationInMillis = (System.nanoTime() - fileTransferStartTime) / 1_000_000L; + remoteTranslogTransferTracker.addUploadTimeInMillis(durationInMillis); + remoteTranslogTransferTracker.addUploadBytesSucceeded(bytesForTlogCkpFileToUpload.get(fileSnapshot.getName())); + add(fileSnapshot.getName(), TransferState.SUCCESS); + } catch (Exception ex) { + throw new FileTransferException(fileSnapshot, ex); + } } void add(String file, boolean success) { From dde1609a0d21194e15bfcef40a9a8e94021c97a9 Mon Sep 17 00:00:00 2001 From: Varun Bansal Date: Mon, 11 Mar 2024 15:40:20 +0530 Subject: [PATCH 2/5] better exception handling Signed-off-by: Varun Bansal --- .../index/translog/transfer/FileTransferTracker.java | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/FileTransferTracker.java b/server/src/main/java/org/opensearch/index/translog/transfer/FileTransferTracker.java index cb723e743f2eb..2087a69baef00 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/FileTransferTracker.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/FileTransferTracker.java @@ -8,6 +8,8 @@ package org.opensearch.index.translog.transfer; +import org.apache.logging.log4j.Logger; +import org.opensearch.common.logging.Loggers; import org.opensearch.core.index.shard.ShardId; import org.opensearch.index.remote.RemoteTranslogTransferTracker; import org.opensearch.index.translog.transfer.FileSnapshot.TransferFileSnapshot; @@ -33,11 +35,13 @@ public class FileTransferTracker implements FileTransferListener { private final RemoteTranslogTransferTracker remoteTranslogTransferTracker; private Map bytesForTlogCkpFileToUpload; private long fileTransferStartTime = -1; + private final Logger logger; public FileTransferTracker(ShardId shardId, RemoteTranslogTransferTracker remoteTranslogTransferTracker) { this.shardId = shardId; this.fileTransferTracker = new ConcurrentHashMap<>(); this.remoteTranslogTransferTracker = remoteTranslogTransferTracker; + this.logger = Loggers.getLogger(getClass(), shardId); } void recordFileTransferStartTime(long uploadStartTime) { @@ -68,8 +72,14 @@ public void onSuccess(TransferFileSnapshot fileSnapshot) { long durationInMillis = (System.nanoTime() - fileTransferStartTime) / 1_000_000L; remoteTranslogTransferTracker.addUploadTimeInMillis(durationInMillis); remoteTranslogTransferTracker.addUploadBytesSucceeded(bytesForTlogCkpFileToUpload.get(fileSnapshot.getName())); - add(fileSnapshot.getName(), TransferState.SUCCESS); } catch (Exception ex) { + logger.error("Failure to update translog upload success stats", ex); + } + + try { + // add() gets a dedicated try/catch as its on the critical path + add(fileSnapshot.getName(), TransferState.SUCCESS); + } catch (IllegalStateException ex) { throw new FileTransferException(fileSnapshot, ex); } } From 1406e1668b4160c19d3e5b4d4fd85e1908dbd466 Mon Sep 17 00:00:00 2001 From: Varun Bansal Date: Thu, 14 Mar 2024 14:33:02 +0530 Subject: [PATCH 3/5] add unit tests Signed-off-by: Varun Bansal --- .../transfer/FileTransferTracker.java | 7 +---- .../transfer/FileTransferTrackerTests.java | 30 +++++++++++++++++++ 2 files changed, 31 insertions(+), 6 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/FileTransferTracker.java b/server/src/main/java/org/opensearch/index/translog/transfer/FileTransferTracker.java index 2087a69baef00..f3c17cdaa0054 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/FileTransferTracker.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/FileTransferTracker.java @@ -76,12 +76,7 @@ public void onSuccess(TransferFileSnapshot fileSnapshot) { logger.error("Failure to update translog upload success stats", ex); } - try { - // add() gets a dedicated try/catch as its on the critical path - add(fileSnapshot.getName(), TransferState.SUCCESS); - } catch (IllegalStateException ex) { - throw new FileTransferException(fileSnapshot, ex); - } + add(fileSnapshot.getName(), TransferState.SUCCESS); } void add(String file, boolean success) { diff --git a/server/src/test/java/org/opensearch/index/translog/transfer/FileTransferTrackerTests.java b/server/src/test/java/org/opensearch/index/translog/transfer/FileTransferTrackerTests.java index b96ada1f6bbff..9665b8e6fd646 100644 --- a/server/src/test/java/org/opensearch/index/translog/transfer/FileTransferTrackerTests.java +++ b/server/src/test/java/org/opensearch/index/translog/transfer/FileTransferTrackerTests.java @@ -20,6 +20,10 @@ import java.util.List; import java.util.Set; +import static org.mockito.Mockito.anyLong; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.spy; + public class FileTransferTrackerTests extends OpenSearchTestCase { protected final ShardId shardId = new ShardId("index", "_na_", 1); @@ -94,6 +98,32 @@ public void testOnFailure() throws IOException { } } + public void testOnSuccessStatsFailure() throws IOException { + RemoteTranslogTransferTracker localRemoteTranslogTransferTracker = spy(remoteTranslogTransferTracker); + doAnswer((count) -> { throw new NullPointerException("Error while updating stats"); }).when(localRemoteTranslogTransferTracker) + .addUploadBytesSucceeded(anyLong()); + + FileTransferTracker localFileTransferTracker = new FileTransferTracker(shardId, localRemoteTranslogTransferTracker); + + Path testFile = createTempFile(); + int fileSize = 128; + Files.write(testFile, randomByteArrayOfLength(fileSize), StandardOpenOption.APPEND); + try ( + FileSnapshot.TransferFileSnapshot transferFileSnapshot = new FileSnapshot.TransferFileSnapshot( + testFile, + randomNonNegativeLong(), + null + ); + ) { + Set toUpload = new HashSet<>(2); + toUpload.add(transferFileSnapshot); + localFileTransferTracker.recordBytesForFiles(toUpload); + localRemoteTranslogTransferTracker.addUploadBytesStarted(fileSize); + localFileTransferTracker.onSuccess(transferFileSnapshot); + assertEquals(localFileTransferTracker.allUploaded().size(), 1); + } + } + public void testUploaded() throws IOException { Path testFile = createTempFile(); int fileSize = 128; From 87ede1b8cbe1d9ff95e0418f513bb4affe1184cd Mon Sep 17 00:00:00 2001 From: Varun Bansal Date: Thu, 14 Mar 2024 16:42:45 +0530 Subject: [PATCH 4/5] Empty Commit Signed-off-by: Varun Bansal From f5114751cf905b7332eda5b652ac11c1c5bd2b5e Mon Sep 17 00:00:00 2001 From: Varun Bansal Date: Thu, 14 Mar 2024 18:55:40 +0530 Subject: [PATCH 5/5] Empty Commit Signed-off-by: Varun Bansal