
Simplify BucketedInput serialization #5270

Merged · 3 commits into main · Feb 23, 2024

Conversation

clairemcginty (Contributor)

Removes the overrides of readObject/writeObject by simply making all class members serializable.


// Map distinct FileOperations/FileSuffixes to indices in a map, for efficient encoding of
// large BucketedInputs
final Map<KV<String, String>, Integer> fileOperationsMetadata = new HashMap<>();
clairemcginty (Contributor, Author)

This is just the logic from writeObject for efficiently encoding large numbers of inputs. (The logic from readObject is now in getInputs().)
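
To illustrate, here is a minimal sketch of that interning scheme, with plain String paths standing in for ResourceId and a String label standing in for the FileOperations instance (a hypothetical simplification, not the actual BucketedInput code):

import java.io.Serializable;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical simplification: String paths stand in for ResourceId, and a String
// label stands in for the FileOperations instance.
class BucketedInputSketch implements Serializable {
  // Rebuilt lazily on first access; never written by Java serialization.
  private transient Map<String, Map.Entry<String, String>> inputs;

  // Distinct (suffix, fileOperations) pairs, keyed by a small integer index.
  private final Map<Integer, Map.Entry<String, String>> fileOperationsEncoding = new HashMap<>();
  // Each input directory points at the index of its (suffix, fileOperations) pair.
  private final Map<String, Integer> directoriesEncoding = new HashMap<>();

  BucketedInputSketch(Map<String, Map.Entry<String, String>> inputs) {
    // Intern the distinct pairs so many directories sharing one format serialize
    // as a single map entry plus one int per directory.
    Map<Map.Entry<String, String>, Integer> fileOperationsMetadata = new HashMap<>();
    inputs.forEach(
        (dir, suffixAndOps) -> {
          int index =
              fileOperationsMetadata.computeIfAbsent(
                  suffixAndOps, k -> fileOperationsMetadata.size());
          fileOperationsEncoding.putIfAbsent(index, suffixAndOps);
          directoriesEncoding.put(dir, index);
        });
  }

  // Roughly what the old readObject did, now done lazily here.
  Map<String, Map.Entry<String, String>> getInputs() {
    if (inputs == null) {
      inputs = new LinkedHashMap<>();
      directoriesEncoding.forEach(
          (dir, index) -> inputs.put(dir, fileOperationsEncoding.get(index)));
    }
    return inputs;
  }
}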

codecov bot commented Feb 22, 2024

Codecov Report

All modified and coverable lines are covered by tests ✅

Project coverage is 62.50%. Comparing base (5510e07) to head (8a6cd12).

Additional details and impacted files
@@           Coverage Diff           @@
##             main    #5270   +/-   ##
=======================================
  Coverage   62.50%   62.50%           
=======================================
  Files         301      301           
  Lines       10860    10852    -8     
  Branches      740      736    -4     
=======================================
- Hits         6788     6783    -5     
+ Misses       4072     4069    -3     


@RustedBones (Contributor) left a comment:

There is still something I don't get: how does replacing the

protected Map<ResourceId, KV<String, FileOperations<V>>> inputs;

with

private transient Map<ResourceId, KV<String, FileOperations<V>>> inputs;
private final Map<Integer, KV<String, FileOperations>> fileOperationsEncoding;
private final Map<ResourceId, Integer> directoriesEncoding;

now make the class serializable?

@clairemcginty (Contributor, Author) replied:

There is still something I don't get: how does replacing the

protected Map<ResourceId, KV<String, FileOperations<V>>> inputs;

with

private transient Map<ResourceId, KV<String, FileOperations<V>>> inputs;
private final Map<Integer, KV<String, FileOperations>> fileOperationsEncoding;
private final Map<ResourceId, Integer> directoriesEncoding;

now make the class serializable?

Sorry for the confusion. To clarify, BucketedInput was Serializable, checks with SerializableUtils passed, and Dataflow had no issue parsing it. However, on FlinkRunner, we were seeing this error with Flink's custom serializer:

Caused by: java.lang.RuntimeException: Deserializing the input/output formats failed: unread block data
	at org.apache.flink.runtime.jobgraph.InputOutputFormatContainer.<init>(InputOutputFormatContainer.java:69)
	at org.apache.flink.runtime.jobgraph.InputOutputFormatVertex.initInputOutputformatContainer(InputOutputFormatVertex.java:185)
	... 17 more
Caused by: java.lang.IllegalStateException: unread block data
	at java.base/java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:3033)
	at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1722)
	at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2496)
	at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2390)
	at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)
	at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
	at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:489)
	at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:447)
	at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:539)
	at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:527)
...

My suspicion is that we weren't properly flushing something in our complex readObject/writeObject overrides. Sure enough, after removing them and delegating to the default Java object serialization, SMB reads now work on FlinkRunner.

It's a bit frustrating because I haven't been able to repro this with a unit test (even using the Flink serializer class...), so it's all been manually tested via deploys to a locally running Flink cluster.
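
For reference, the kind of round-trip check that does pass looks roughly like this (similar in spirit to Beam's SerializableUtils, but a simplified sketch, not its actual implementation); it exercises plain JDK serialization inside a single classloader, which is likely why it never reproduced the Flink-side failure:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

final class SerializationRoundTrip {
  // Serialize and deserialize a value with plain java.io object streams.
  static <T extends Serializable> T roundTrip(T value) throws Exception {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(value);
    }
    try (ObjectInputStream in =
        new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
      @SuppressWarnings("unchecked")
      T copy = (T) in.readObject();
      return copy;
    }
  }
}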

clairemcginty merged commit 3323eda into main on Feb 23, 2024
11 checks passed
clairemcginty deleted the simplify-bucketedinput-serialization branch on February 23, 2024 at 14:51
kellen pushed a commit that referenced this pull request Mar 13, 2024