diff --git a/pinot-plugins/pinot-file-system/pinot-s3/src/main/java/org/apache/pinot/plugin/filesystem/S3PinotFS.java b/pinot-plugins/pinot-file-system/pinot-s3/src/main/java/org/apache/pinot/plugin/filesystem/S3PinotFS.java index d1129156a92..7ec6294b49b 100644 --- a/pinot-plugins/pinot-file-system/pinot-s3/src/main/java/org/apache/pinot/plugin/filesystem/S3PinotFS.java +++ b/pinot-plugins/pinot-file-system/pinot-s3/src/main/java/org/apache/pinot/plugin/filesystem/S3PinotFS.java @@ -55,6 +55,7 @@ import software.amazon.awssdk.services.s3.S3Client; import software.amazon.awssdk.services.s3.S3ClientBuilder; import software.amazon.awssdk.services.s3.model.AbortMultipartUploadRequest; +import software.amazon.awssdk.services.s3.model.CommonPrefix; import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest; import software.amazon.awssdk.services.s3.model.CompletedMultipartUpload; import software.amazon.awssdk.services.s3.model.CompletedPart; @@ -501,11 +502,11 @@ public String[] listFiles(URI fileUri, boolean recursive) throws IOException { ImmutableList.Builder builder = ImmutableList.builder(); visitFiles(fileUri, recursive, s3Object -> { - // TODO: Looks like S3PinotFS filters out directories, inconsistent with the other implementations. - // Only add files and not directories if (!s3Object.key().equals(fileUri.getPath()) && !s3Object.key().endsWith(DELIMITER)) { builder.add(S3_SCHEME + fileUri.getHost() + DELIMITER + getNormalizedFileKey(s3Object)); } + }, commonPrefix -> { + builder.add(S3_SCHEME + fileUri.getHost() + DELIMITER + getNormalizedFileKey(commonPrefix)); }); String[] listedFiles = builder.build().toArray(new String[0]); LOGGER.info("Listed {} files from URI: {}, is recursive: {}", listedFiles.length, fileUri, recursive); @@ -524,7 +525,7 @@ public List listFilesWithMetadata(URI fileUri, boolean recursive) .setIsDirectory(s3Object.key().endsWith(DELIMITER)); listBuilder.add(fileBuilder.build()); } - }); + }, null); ImmutableList listedFiles = listBuilder.build(); LOGGER.info("Listed {} files from URI: {}, is recursive: {}", listedFiles.size(), fileUri, recursive); return listedFiles; @@ -538,8 +539,13 @@ private static String getNormalizedFileKey(S3Object s3Object) { return fileKey; } - private void visitFiles(URI fileUri, boolean recursive, Consumer visitor) - throws IOException { + private static String getNormalizedFileKey(CommonPrefix commonPrefix) { + String prefix = commonPrefix.prefix(); + return prefix.substring(0, prefix.length() - 1); + } + + private void visitFiles(URI fileUri, boolean recursive, Consumer objectVisitor, + Consumer commonPrefixVisitor) throws IOException { try { String continuationToken = null; boolean isDone = false; @@ -561,7 +567,11 @@ private void visitFiles(URI fileUri, boolean recursive, Consumer visit ListObjectsV2Response listObjectsV2Response = _s3Client.listObjectsV2(listObjectsV2Request); LOGGER.debug("Getting ListObjectsV2Response: {}", listObjectsV2Response); List filesReturned = listObjectsV2Response.contents(); - filesReturned.forEach(visitor); + filesReturned.forEach(objectVisitor); + if (!recursive && listObjectsV2Response.hasCommonPrefixes() && commonPrefixVisitor != null) { + List dirsReturned = listObjectsV2Response.commonPrefixes(); + dirsReturned.forEach(commonPrefixVisitor); + } isDone = !listObjectsV2Response.isTruncated(); continuationToken = listObjectsV2Response.nextContinuationToken(); } diff --git a/pinot-plugins/pinot-file-system/pinot-s3/src/test/java/org/apache/pinot/plugin/filesystem/S3PinotFSTest.java b/pinot-plugins/pinot-file-system/pinot-s3/src/test/java/org/apache/pinot/plugin/filesystem/S3PinotFSTest.java index 18ca80f0046..88770c99e57 100644 --- a/pinot-plugins/pinot-file-system/pinot-s3/src/test/java/org/apache/pinot/plugin/filesystem/S3PinotFSTest.java +++ b/pinot-plugins/pinot-file-system/pinot-s3/src/test/java/org/apache/pinot/plugin/filesystem/S3PinotFSTest.java @@ -165,17 +165,16 @@ public void testListFilesInFolderNonRecursive() for (String fileName : originalFiles) { createEmptyFile(folder, fileName); } - // Files in sub folders should be skipped. + createEmptyFile(folder + DELIMITER + "subfolder1", "a-sub-file.txt"); createEmptyFile(folder + DELIMITER + "subfolder2", "a-sub-file.txt"); - String[] actualFiles = _s3PinotFS.listFiles(URI.create(String.format(FILE_FORMAT, SCHEME, BUCKET, folder)), false); - Assert.assertEquals(actualFiles.length, originalFiles.length); + String[] expectedFiles = new String[]{"a-list-2.txt", "b-list-2.txt", "c-list-2.txt", "subfolder1", "subfolder2"}; - actualFiles = Arrays.stream(actualFiles).filter(x -> x.contains("list-2")).toArray(String[]::new); - Assert.assertEquals(actualFiles.length, originalFiles.length); + String[] actualFiles = _s3PinotFS.listFiles(URI.create(String.format(FILE_FORMAT, SCHEME, BUCKET, folder)), false); + Assert.assertEquals(actualFiles.length, expectedFiles.length); - Assert.assertTrue(Arrays.equals(Arrays.stream(originalFiles) + Assert.assertTrue(Arrays.equals(Arrays.stream(expectedFiles) .map(fileName -> String.format(FILE_FORMAT, SCHEME, BUCKET, folder + DELIMITER + fileName)).toArray(), actualFiles)); }