Skip to content

Commit

Permalink
Add UTs
Browse files Browse the repository at this point in the history
Signed-off-by: Ashish Singh <ssashish@amazon.com>
  • Loading branch information
ashking94 committed Sep 25, 2024
1 parent 27b6828 commit d32dea3
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1071,9 +1071,6 @@ public void testCloseIndexWithNoOpSyncAndFlushForAsyncTranslog() throws Interrup
}).start();
// Wait for atleast one doc to be ingested.
latch.await();
// Sleep for some time for the next doc to be present in lucene buffer. If flush happens first before the doc #2
// gets indexed, then it goes into the happy case where the close index happens succefully.
Thread.sleep(1000);
// Flush so that the subsequent sync or flushes are no-op.
flush(INDEX_NAME);
// Closing the index involves translog.sync and shard.flush which are now no-op.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1801,6 +1801,83 @@ public void testDownloadWithEmptyTranslogOnlyInLocal() throws IOException {
assertArrayEquals(filesPostFirstDownload, filesPostSecondDownload);
}

public void testSyncWithGlobalCheckpointUpdate() throws IOException {
ArrayList<Translog.Operation> ops = new ArrayList<>();
addToTranslogAndListAndUpload(translog, ops, new Translog.Index("1", 0, primaryTerm.get(), new byte[] { 1 }));
addToTranslogAndListAndUpload(translog, ops, new Translog.Index("2", 1, primaryTerm.get(), new byte[] { 2 }));

// Set a global checkpoint
long initialGlobalCheckpoint = 1L;
globalCheckpoint.set(initialGlobalCheckpoint);

// Sync the translog
translog.sync();

// Verify that the globalCheckpointSynced is updated
assertEquals(initialGlobalCheckpoint, ((RemoteFsTranslog) translog).getLastSyncedCheckpoint().globalCheckpoint);

// Update global checkpoint
long newGlobalCheckpoint = 2L;
globalCheckpoint.set(newGlobalCheckpoint);

// Add a new operation and sync
addToTranslogAndListAndUpload(translog, ops, new Translog.Index("3", 2, primaryTerm.get(), new byte[] { 3 }));
translog.sync();

// Verify that the globalCheckpointSynced is updated to the new value
assertEquals(newGlobalCheckpoint, ((RemoteFsTranslog) translog).getLastSyncedCheckpoint().globalCheckpoint);
}

public void testSyncNeededWithGlobalCheckpointUpdate() throws IOException {
ArrayList<Translog.Operation> ops = new ArrayList<>();
addToTranslogAndListAndUpload(translog, ops, new Translog.Index("1", 0, primaryTerm.get(), new byte[] { 1 }));

// Set initial global checkpoint
long initialGlobalCheckpoint = 0L;
globalCheckpoint.set(initialGlobalCheckpoint);

// Sync the translog
translog.sync();

// Verify that sync is not needed
assertFalse(translog.syncNeeded());

// Update global checkpoint
long newGlobalCheckpoint = 1L;
globalCheckpoint.set(newGlobalCheckpoint);

// Verify that sync is now needed due to global checkpoint update
assertTrue(translog.syncNeeded());

// Sync again
translog.sync();

// Verify that sync is not needed after syncing
assertFalse(translog.syncNeeded());
}

public void testGlobalCheckpointUpdateDuringClose() throws IOException {
ArrayList<Translog.Operation> ops = new ArrayList<>();
addToTranslogAndListAndUpload(translog, ops, new Translog.Index("1", 0, primaryTerm.get(), new byte[] { 1 }));

// Set initial global checkpoint
long initialGlobalCheckpoint = 0L;
globalCheckpoint.set(initialGlobalCheckpoint);

// Sync the translog
translog.sync();

// Update global checkpoint
long newGlobalCheckpoint = 1L;
globalCheckpoint.set(newGlobalCheckpoint);

// Close the translog
translog.close();

// Verify that the last synced checkpoint includes the updated global checkpoint
assertEquals(newGlobalCheckpoint, ((RemoteFsTranslog) translog).getLastSyncedCheckpoint().globalCheckpoint);
}

public class ThrowingBlobRepository extends FsRepository {

private final Environment environment;
Expand Down

0 comments on commit d32dea3

Please sign in to comment.