Safe coder unwrap #4887
case c: RecordCoder[(K, V)] =>
  val ac = coderElement[K](c)(0)
  val bc = coderElement[V](c)(1)
  (ac, bc)
We won't support users who force non-tuple coders for `(K, V)`.
Changed to accept any `StructuredCoder` of size 2. Even if wrapped in `Ref` or `LazyCoder`, this returns the element coders.
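The behaviour described in this reply can be sketched as follows. This is only a toy model: `SimpleCoder`, `Leaf`, `Structured`, `RefWrapper`, `LazyWrapper` and `KvCoders` are hypothetical stand-ins invented for illustration, not the actual Scio or Beam classes, and the real change may differ in detail.

```scala
// Hypothetical, simplified stand-ins for illustration only;
// these are NOT the real Scio or Beam coder classes.
sealed trait SimpleCoder
final case class Leaf(name: String) extends SimpleCoder
final case class Structured(components: List[SimpleCoder]) extends SimpleCoder
final case class RefWrapper(underlying: SimpleCoder) extends SimpleCoder
final class LazyWrapper(thunk: => SimpleCoder) extends SimpleCoder {
  // Evaluated at most once, on first access.
  lazy val underlying: SimpleCoder = thunk
}

object KvCoders {
  // Peel off any number of Ref/Lazy wrappers until another kind of coder is reached.
  @annotation.tailrec
  def strip(coder: SimpleCoder): SimpleCoder = coder match {
    case RefWrapper(inner) => strip(inner)
    case l: LazyWrapper    => strip(l.underlying)
    case other             => other
  }

  // Accept any structured coder with exactly two components; fail otherwise.
  def keyValue(coder: SimpleCoder): (SimpleCoder, SimpleCoder) =
    strip(coder) match {
      case Structured(List(k, v)) => (k, v)
      case other =>
        throw new IllegalArgumentException(
          s"Expected a structured coder with 2 components, got: $other"
        )
    }
}
```

In this sketch, a two-component coder wrapped in both a lazy and a ref wrapper still yields its key and value components, while a coder with any other shape is rejected with an exception.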
// Needed because scalac is an idiot
implicit def btCollCoder: Coder[BTRow] =
  Coder.gen[(ByteString, Iterable[Mutation])]
Looks like not. And `Coder.tuple2Coder` should have been used.
Codecov Report
@@            Coverage Diff             @@
##             main    #4887      +/-   ##
==========================================
- Coverage   62.84%   62.83%   -0.02%
==========================================
  Files         282      282
  Lines       10573    10595      +22
  Branches      795      775      -20
==========================================
+ Hits         6645     6657      +12
- Misses       3928     3938      +10
case c: beam.NullableCoder[T] => c.getValueCoder
case _ => coder
case c: MaterializedCoder[T] => unwrap(options, c.bcoder)
case c: beam.NullableCoder[T] if options.nullableCoders => c.getValueCoder
What will happen if a user doesn't have the `--nullableCoders` option set but still uses `NullableCoder` in their pipeline (i.e. they have a step like `.map(x => ...)(Coder.beam(NullableCoder.of(...)))`)?
The null coder will be propagated. This is required by `countByValue`, for example; otherwise we would fail on `null` values.
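A minimal sketch of the unwrapping rule discussed here, assuming the match ends with a catch-all case that returns the coder unchanged. `ToyBCoder`, `Plain`, `ToyNullable`, `ToyMaterialized`, `ToyOptions` and `ToyUnwrap` are hypothetical names modelling the shape of the diff above, not the real Scio or Beam types.

```scala
// Hypothetical toy model for illustration; not the real Beam/Scio coder hierarchy.
sealed trait ToyBCoder
final case class Plain(name: String) extends ToyBCoder
final case class ToyNullable(valueCoder: ToyBCoder) extends ToyBCoder
final case class ToyMaterialized(bcoder: ToyBCoder) extends ToyBCoder

// Stand-in for the pipeline options carrying the nullableCoders flag.
final case class ToyOptions(nullableCoders: Boolean)

object ToyUnwrap {
  @annotation.tailrec
  def unwrap(options: ToyOptions, coder: ToyBCoder): ToyBCoder = coder match {
    // Materialized wrappers are always peeled off.
    case ToyMaterialized(inner) => unwrap(options, inner)
    // The nullable wrapper is removed only when the option is enabled,
    // i.e. when it may have been added automatically.
    case ToyNullable(inner) if options.nullableCoders => inner
    // Assumed fallback: anything else, including a user-supplied nullable
    // coder when the option is off, is returned untouched (propagated).
    case other => other
  }
}
```

With the option disabled, a nullable coder supplied explicitly by the user survives unwrapping in this model, which corresponds to the propagation described in the reply.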
/** Get key-value coders from a `SideInput[Map[K, Iterable[V]]]`. */
def getMultiMapKV[K, V](si: SideInput[Map[K, Iterable[V]]]): (Coder[K], Coder[V]) = {
  val coder = si.view.getPCollection.getCoder
This was unsafe. The `PCollection` behind a view has no guarantee to be of the same type (it contains a transform function). Drop this helper and expect a coder when we pass a `SideInput`.
.asMultiMapSideInput
.map(_.map { case (k, vs) => k -> vs.map(_.toInt) })
This transformation was failing because we tried to use a `Coder[String]` to encode the produced `Int` value.
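The mismatch can be illustrated with a toy model of a mapped view. `ToyCoder`, `ToyView` and `ToyViewExample` are hypothetical names made up for this sketch; they are not Scio's `SideInput` or Beam's view classes, and the sketch only shows why the coder of the underlying collection cannot be assumed to match the type the view produces.

```scala
// Hypothetical toy types for illustration; not Scio's SideInput or Beam's PCollectionView.
final case class ToyCoder[A](describe: String)

// A view over a collection of S that exposes values of type T through a transform.
final case class ToyView[S, T](sourceCoder: ToyCoder[S], transform: S => T) {
  // Mapping changes the exposed type but leaves the source coder untouched.
  def map[U](f: T => U): ToyView[S, U] = ToyView(sourceCoder, transform.andThen(f))
}

object ToyViewExample {
  val strings: ToyView[String, String] =
    ToyView(ToyCoder[String]("string"), (s: String) => s)

  // After map, the view yields Int, yet sourceCoder is still a ToyCoder[String].
  val ints: ToyView[String, Int] = strings.map(_.toInt)

  // Safer shape: the caller passes a coder for the produced type T explicitly,
  // instead of recovering one from the collection behind the view.
  def read[S, T](view: ToyView[S, T], raw: S)(implicit coder: ToyCoder[T]): (T, String) =
    (view.transform(raw), coder.describe)
}
```

Here `ints.sourceCoder` only knows about `String`, so any helper that derives the element coder from it would pick the wrong type; requiring a `ToyCoder[Int]` at the call site avoids that.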
Unwrapping was done on `Ref` and `LazyCoder`. This was required to extract values from tuple coders when `RecordCoder` was used (materialization wraps into a `Ref`/`LazyCoder`).

IMHO, we should expect `TupleCoder` in those cases and fail if a `RecordCoder` was given.

We could then only unwrap `MaterializedCoder` and `NullableCoder` (double check the option is enabled). Implicit scio coders will become stable.

Probably a better fix than #4886