Skip to content

Commit

Permalink
Make S3PinotFS listFiles return directories when non-recursive
Browse files Browse the repository at this point in the history
  • Loading branch information
dd-willgan committed Sep 25, 2024
1 parent 49896eb commit 9682eef
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.S3ClientBuilder;
import software.amazon.awssdk.services.s3.model.AbortMultipartUploadRequest;
import software.amazon.awssdk.services.s3.model.CommonPrefix;
import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest;
import software.amazon.awssdk.services.s3.model.CompletedMultipartUpload;
import software.amazon.awssdk.services.s3.model.CompletedPart;
Expand Down Expand Up @@ -501,11 +502,11 @@ public String[] listFiles(URI fileUri, boolean recursive)
throws IOException {
ImmutableList.Builder<String> builder = ImmutableList.builder();
visitFiles(fileUri, recursive, s3Object -> {
// TODO: Looks like S3PinotFS filters out directories, inconsistent with the other implementations.
// Only add files and not directories
if (!s3Object.key().equals(fileUri.getPath()) && !s3Object.key().endsWith(DELIMITER)) {
builder.add(S3_SCHEME + fileUri.getHost() + DELIMITER + getNormalizedFileKey(s3Object));
}
}, commonPrefix -> {
builder.add(S3_SCHEME + fileUri.getHost() + DELIMITER + getNormalizedFileKey(commonPrefix));
});
String[] listedFiles = builder.build().toArray(new String[0]);
LOGGER.info("Listed {} files from URI: {}, is recursive: {}", listedFiles.length, fileUri, recursive);
Expand All @@ -524,7 +525,7 @@ public List<FileMetadata> listFilesWithMetadata(URI fileUri, boolean recursive)
.setIsDirectory(s3Object.key().endsWith(DELIMITER));
listBuilder.add(fileBuilder.build());
}
});
}, null);
ImmutableList<FileMetadata> listedFiles = listBuilder.build();
LOGGER.info("Listed {} files from URI: {}, is recursive: {}", listedFiles.size(), fileUri, recursive);
return listedFiles;
Expand All @@ -538,8 +539,13 @@ private static String getNormalizedFileKey(S3Object s3Object) {
return fileKey;
}

private void visitFiles(URI fileUri, boolean recursive, Consumer<S3Object> visitor)
throws IOException {
private static String getNormalizedFileKey(CommonPrefix commonPrefix) {
String prefix = commonPrefix.prefix();
return prefix.substring(0, prefix.length() - 1);
}

private void visitFiles(URI fileUri, boolean recursive, Consumer<S3Object> objectVisitor,
Consumer<CommonPrefix> commonPrefixVisitor) throws IOException {
try {
String continuationToken = null;
boolean isDone = false;
Expand All @@ -561,7 +567,11 @@ private void visitFiles(URI fileUri, boolean recursive, Consumer<S3Object> visit
ListObjectsV2Response listObjectsV2Response = _s3Client.listObjectsV2(listObjectsV2Request);
LOGGER.debug("Getting ListObjectsV2Response: {}", listObjectsV2Response);
List<S3Object> filesReturned = listObjectsV2Response.contents();
filesReturned.forEach(visitor);
filesReturned.forEach(objectVisitor);
if (!recursive && listObjectsV2Response.hasCommonPrefixes() && commonPrefixVisitor != null) {
List<CommonPrefix> dirsReturned = listObjectsV2Response.commonPrefixes();
dirsReturned.forEach(commonPrefixVisitor);
}
isDone = !listObjectsV2Response.isTruncated();
continuationToken = listObjectsV2Response.nextContinuationToken();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,17 +165,16 @@ public void testListFilesInFolderNonRecursive()
for (String fileName : originalFiles) {
createEmptyFile(folder, fileName);
}
// Files in sub folders should be skipped.

createEmptyFile(folder + DELIMITER + "subfolder1", "a-sub-file.txt");
createEmptyFile(folder + DELIMITER + "subfolder2", "a-sub-file.txt");

String[] actualFiles = _s3PinotFS.listFiles(URI.create(String.format(FILE_FORMAT, SCHEME, BUCKET, folder)), false);
Assert.assertEquals(actualFiles.length, originalFiles.length);
String[] expectedFiles = new String[]{"a-list-2.txt", "b-list-2.txt", "c-list-2.txt", "subfolder1", "subfolder2"};

actualFiles = Arrays.stream(actualFiles).filter(x -> x.contains("list-2")).toArray(String[]::new);
Assert.assertEquals(actualFiles.length, originalFiles.length);
String[] actualFiles = _s3PinotFS.listFiles(URI.create(String.format(FILE_FORMAT, SCHEME, BUCKET, folder)), false);
Assert.assertEquals(actualFiles.length, expectedFiles.length);

Assert.assertTrue(Arrays.equals(Arrays.stream(originalFiles)
Assert.assertTrue(Arrays.equals(Arrays.stream(expectedFiles)
.map(fileName -> String.format(FILE_FORMAT, SCHEME, BUCKET, folder + DELIMITER + fileName)).toArray(),
actualFiles));
}
Expand Down

0 comments on commit 9682eef

Please sign in to comment.