Support Tap for SMB writes (addresses #5080) #5144
@@ -600,7 +600,7 @@ public abstract static class Write<K1, K2, V> extends PTransform<PCollection<V>,

   abstract int getSorterMemoryMb();

-  abstract FileOperations<V> getFileOperations();
+  public abstract FileOperations<V> getFileOperations();
We could get around the signature change by adding a new method to SortedBucketIO.Write like:
public Iterator<T> tapBucketFile(ResourceId resourceId) {
return getFileOperations().open(resourceId);
}
but this seems overly hacky to me. Plus, we re-create a FileOperations instance for every bucket/shard combo.
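The trade-off between the two options can be sketched in isolation. This is a minimal sketch using stand-in types: `FileOps`, `WriteSketch`, and `TapSketch` are hypothetical names, not Beam or Scio classes, and file names are plain strings instead of `ResourceId`. It assumes, as the comment above suggests, that each call to the getter produces a fresh instance.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for FileOperations<V>: maps a file name to its records.
interface FileOps<V> {
  Iterator<V> iterator(String file);
}

// Hypothetical stand-in for SortedBucketIO.Write, reduced to the part under discussion.
abstract class WriteSketch<V> {
  // Option B: make the getter itself public.
  public abstract FileOps<V> getFileOperations();

  // Option A: add a narrow delegating method instead.
  // Every call goes through getFileOperations() again.
  public Iterator<V> tapBucketFile(String file) {
    return getFileOperations().iterator(file);
  }
}

class TapSketch {
  // Counts getter invocations so the two options can be compared.
  static int getterCalls = 0;

  // Builds a WriteSketch backed by an in-memory map of file name -> records.
  static WriteSketch<String> newWrite(Map<String, List<String>> files) {
    return new WriteSketch<String>() {
      @Override
      public FileOps<String> getFileOperations() {
        getterCalls++; // assumption: a new instance is created per call
        return file -> files.get(file).iterator();
      }
    };
  }

  // Option A in use: one getter call per bucket file.
  static List<String> readViaDelegate(WriteSketch<String> write, List<String> names) {
    List<String> out = new ArrayList<>();
    for (String name : names) {
      write.tapBucketFile(name).forEachRemaining(out::add);
    }
    return out;
  }

  // Option B in use: the caller fetches the instance once and reuses it.
  static List<String> readViaGetter(WriteSketch<String> write, List<String> names) {
    FileOps<String> ops = write.getFileOperations();
    List<String> out = new ArrayList<>();
    for (String name : names) {
      ops.iterator(name).forEachRemaining(out::add);
    }
    return out;
  }
}
```

Both paths return the same records; the difference is only in how often the getter runs, which is the cost the comment is pointing at.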
tbh it might be simplest to just make all these getters public 🤷♀️
val bucketFiles = self.context
  .wrap(
    writeResult
      .get(new TupleTag("WrittenFiles"))
ClosedTap(
  bucketFiles.underlying
    .flatMap(kv => fileOps.iterator(kv.getValue).asScala)
We could keep the BucketShardId so the tap returns a mapping from bucket ID to the records in that bucket, although we'd also have to find a way to implement this in the test context (probably by making another getter, SortedBucketIO.Write#getBucketMetadata, public too and mapping the bucketing function over each element).
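For the test-context half of this idea, the grouping step could look roughly like the following. It is only a sketch under stated assumptions: `BucketTapSketch`, `bucketOf`, and `groupByBucket` are hypothetical names, and the bucketing function here is a plain `hashCode` modulo, standing in for whatever the real bucket metadata computes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;

class BucketTapSketch {
  // Hypothetical bucketing function: NOT the hash used by SMB bucket metadata,
  // just a deterministic stand-in for illustration.
  static int bucketOf(String key, int numBuckets) {
    return Math.floorMod(key.hashCode(), numBuckets);
  }

  // Maps the bucketing function over each element and groups records by bucket ID.
  static <V> Map<Integer, List<V>> groupByBucket(
      List<V> records, Function<V, String> keyFn, int numBuckets) {
    Map<Integer, List<V>> byBucket = new TreeMap<>();
    for (V record : records) {
      int bucketId = bucketOf(keyFn.apply(record), numBuckets);
      byBucket.computeIfAbsent(bucketId, id -> new ArrayList<>()).add(record);
    }
    return byBucket;
  }
}
```

A test-side tap built this way would return the same shape (bucket ID to records) as one that reads the written bucket files, provided both use the same bucketing function.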
Codecov Report
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #5144      +/-   ##
==========================================
+ Coverage   63.42%   63.55%   +0.13%
==========================================
  Files         291      291
  Lines       10832    10845      +13
  Branches      755      758       +3
==========================================
+ Hits         6870     6893      +23
+ Misses       3962     3952      -10
fc33ef3 to c87d41a
eaf197c to 95e0005
scio-smb/src/main/scala/com/spotify/scio/smb/syntax/SortMergeBucketSCollectionSyntax.scala
sc.applyInternal(t)
ClosedTap[Nothing](EmptyTap)

val writeResult = sc.applyInternal(t)
Introduction of duplicated transform
addresses #5080
Simple implementation of Tap for SMB writes
todo: