Add hive session property to enable writing staging files #12386
Maybe we shouldn't reference the issue in the commit message. If you look at prestodb/presto#12124, many commits (pushed during code review) now refer to that issue :)
I thought cross-linking would be helpful for tracking purposes, but I didn't realize the commits are also shown in the issue. This seems like an undesirable GitHub feature :( I'm not sure removing the references now would help. I think we should probably link to the issue only when the PR is ready to merge. Either way, GitHub should have a kill switch for this #featurerequest
@shixuan-fan: The problem is that every time you rebase, it is considered a different commit :). Dropping it now still helps :)
b60f7f2 to fa7fcb9
Sadly, I don't think dropping it helps, because my latest push was still reflected on the issue.
fa7fcb9 to a879b69
Addressed offline. The comment was about the commit message, and I misunderstood it as the reference in the pull request 😂
192612d to 11a4085
presto-hive/src/main/java/com/facebook/presto/hive/HiveWriteUtils.java
"Extract partitionUpdates deserialization logic"
Looks good.
"Support staging file commit"
Generally looks good. Per our offline discussion, we don't want to name it "staging file commit": a staging file is very specific to Hive. The real purpose is that this connector supports retryable lifespan updates, and using staging files is the way it does so. I will think about it :)
@Override
public boolean isWritingStagingFilesEnabled(Session session)
{
    CatalogMetadata catalogMetadata = getCatalogMetadataForWrite(session, session.getCatalog().orElseThrow(() -> new IllegalStateException("No catalog specified")));
nit: I prefer line breaks here to make it easier to read:
    CatalogMetadata catalogMetadata = getCatalogMetadataForWrite(
            session,
            session.getCatalog()
                    .orElseThrow(() -> new IllegalStateException("No catalog specified")));
What if the catalog was specified explicitly? Say:
    INSERT INTO prism.table
    SELECT FROM ...
I would expect this method to accept a catalogName.
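One way to read this suggestion, as a minimal sketch only: the caller, which knows the write target, passes the catalog name in, and the method no longer reads the session's default catalog. The class name and the per-catalog flag map below are hypothetical stand-ins for illustration; they are not Presto's Metadata or MetadataManager API.

```java
import java.util.HashMap;
import java.util.Map;

final class StagingFlagByCatalogSketch
{
    // Hypothetical per-catalog flag store; stands in for the real connector metadata lookup.
    private final Map<String, Boolean> stagingEnabledByCatalog = new HashMap<>();

    void setStagingEnabled(String catalogName, boolean enabled)
    {
        stagingEnabledByCatalog.put(catalogName, enabled);
    }

    // The catalog is an explicit argument, so "INSERT INTO prism.table ..." is answered
    // for "prism" even when the session default is a different catalog.
    boolean isWritingStagingFilesEnabled(String catalogName)
    {
        Boolean enabled = stagingEnabledByCatalog.get(catalogName);
        if (enabled == null) {
            throw new IllegalStateException("Unknown catalog: " + catalogName);
        }
        return enabled;
    }

    public static void main(String[] args)
    {
        StagingFlagByCatalogSketch metadata = new StagingFlagByCatalogSketch();
        metadata.setStagingEnabled("prism", true);
        System.out.println(metadata.isWritingStagingFilesEnabled("prism"));
    }
}
```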
presto-main/src/main/java/com/facebook/presto/metadata/Metadata.java
"Add hive session property to enable writing staging files"
Two quick comments. I haven't looked into HiveStagingFileCommitter and the tests yet.
My general impression is that SemiTransactionalHiveMetastore doesn't need to deal with StagingFileInfo, right?
...to-hive/src/main/java/com/facebook/presto/hive/metastore/SemiTransactionalHiveMetastore.java
missingFileNamesBuilder.add(fileName);
String targetFileName = HiveWriterFactory.computeBucketedFileName(filePrefix, i) + fileExtension;
if (!targetFileNames.contains(targetFileName)) {
    missingFileNamesBuilder.add(new StagingFileInfo(targetFileName, targetFileName));
What's the purpose of this (instead of returning a List<String>)? :) It looks like the results get passed into createEmptyFile anyway, which can take a List<String>.
The problem here is that we need to merge the empty partitions' PartitionUpdate with the non-empty partitions' PartitionUpdate for later processing. See computePartitionUpdatesForMissingBuckets and its callers in finishCreateTable and finishInsert.
Of course, we could leave createEmptyFile and computeFileNamesForMissingBuckets untouched.
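For reference, the List<String> variant discussed above could look roughly like the sketch below. It is an illustration only: the bucketedFileName helper and its naming scheme are hypothetical and do not reproduce HiveWriterFactory.computeBucketedFileName.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class MissingBucketFilesSketch
{
    // Hypothetical naming scheme, used only to make the sketch runnable.
    static String bucketedFileName(String filePrefix, int bucket)
    {
        return String.format("%s_bucket-%05d", filePrefix, bucket);
    }

    // Returns the file names of buckets that have no file yet, in bucket order.
    static List<String> computeFileNamesForMissingBuckets(
            String filePrefix,
            String fileExtension,
            int bucketCount,
            Set<String> existingFileNames)
    {
        List<String> missing = new ArrayList<>();
        for (int i = 0; i < bucketCount; i++) {
            String targetFileName = bucketedFileName(filePrefix, i) + fileExtension;
            if (!existingFileNames.contains(targetFileName)) {
                missing.add(targetFileName);
            }
        }
        return missing;
    }

    public static void main(String[] args)
    {
        Set<String> existing = new HashSet<>(Arrays.asList("q_bucket-00001.orc"));
        System.out.println(computeFileNamesForMissingBuckets("q", ".orc", 3, existing));
    }
}
```

Returning plain names keeps createEmptyFile unchanged; the trade-off raised in the reply is that the names then still have to be turned into PartitionUpdate entries somewhere else.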
11a4085 to 917a680
Extract partitionUpdates deserialization logic
LGTM
@wenleix Yes, I think you are right. Let me see if I can remove this dependency.
Some comments
presto-main/src/main/java/com/facebook/presto/metadata/MetadataManager.java
presto-main/src/main/java/com/facebook/presto/sql/planner/LocalExecutionPlanner.java
presto-hive/src/main/java/com/facebook/presto/hive/HiveStagingFileCommitter.java
...to-hive/src/main/java/com/facebook/presto/hive/metastore/SemiTransactionalHiveMetastore.java
cc @elonazoulay for some context on renaming staging files.
Thanks for reviewing, @arhimondr and @wenleix. I addressed most of the comments and left some threads unresolved so people can revisit them.
320e286 to 78466f0
"Move part of asyncRename logic into MetastoreUtil"
Looks good except for a minor comment.
...to-hive/src/main/java/com/facebook/presto/hive/metastore/SemiTransactionalHiveMetastore.java
"Add hive session property to enable writing staging files"
Looks good modulo minor comments.
presto-hive/src/main/java/com/facebook/presto/hive/HiveWriterFactory.java
presto-hive/src/main/java/com/facebook/presto/hive/HiveWriter.java
presto-hive/src/main/java/com/facebook/presto/hive/metastore/MetastoreUtil.java
229ffed to be09eef
aea5c54 to dc37747
MoreFutures.getFutureValue(fileRenameFuture, PrestoException.class);
ListenableFuture<?> fileRenameFutureAggregate = allAsList(fileRenameFutures);
try {
    MoreFutures.getFutureValue(fileRenameFutureAggregate);
nit: static import?
presto-hive/src/main/java/com/facebook/presto/hive/HiveMetadataFactory.java
presto-hive/src/main/java/com/facebook/presto/hive/HiveClientConfig.java
presto-hive/src/main/java/com/facebook/presto/hive/HiveStagingFileCommitter.java
Moved logic:
- getFileSystem
- renameFile
- waitForAsyncRename
Comments addressed.
Some questions / suggestions. No action needed.
@@ -127,6 +126,7 @@ public HiveMetadataFactory(
        this.locationService = requireNonNull(locationService, "locationService is null");
        this.tableParameterCodec = requireNonNull(tableParameterCodec, "tableParameterCodec is null");
        this.partitionUpdateCodec = requireNonNull(partitionUpdateCodec, "partitionUpdateCodec is null");
        this.fileRenameExecutor = listeningDecorator(requireNonNull(executorService, "executorService is null"));
Why don't we just bind this executor as ListeningExecutorService?
@Provides
public ExecutorService createFileRanemeExecutor(HiveConnectorId hiveClientId, HiveClientConfig hiveClientConfig)
{
    return newFixedThreadPool(
The disadvantage of the fixedThreadPool is that it keeps the entire thread pool alive all the time. That increases the memory footprint.
I think listeningDecorator(new ExecutorServiceAdapter(new BoundedExecutor(newCachedThreadPool(daemonThreadsNamed("hive-" + hiveClientId + "-%s")), threadLimit))) would be a better option.
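To illustrate the concern with plain JDK classes only, here is a minimal sketch of a pool that is still capped at threadLimit but lets idle threads exit. This is a swapped-in technique, not the BoundedExecutor expression proposed above: it uses ThreadPoolExecutor with allowCoreThreadTimeOut, and the class and method names are hypothetical.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class IdleTimeoutPoolSketch
{
    // At most threadLimit threads run tasks; extra tasks wait in the queue.
    // Unlike Executors.newFixedThreadPool, threads that stay idle longer than
    // keepAliveMillis are allowed to terminate, so an unused pool holds no threads.
    static ThreadPoolExecutor newBoundedIdleTimeoutPool(int threadLimit, long keepAliveMillis)
    {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                threadLimit,
                threadLimit,
                keepAliveMillis, // must be > 0 for allowCoreThreadTimeOut(true)
                TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>());
        executor.allowCoreThreadTimeOut(true);
        return executor;
    }

    public static void main(String[] args)
    {
        ThreadPoolExecutor executor = newBoundedIdleTimeoutPool(4, 100);
        executor.execute(() -> System.out.println("renaming a file (placeholder task)"));
        executor.shutdown();
    }
}
```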
    }
}

waitForListenableFutures(commitFutures);
Just some thoughts.
I can see how waitForListenableFutures saves lines of code and follows the DRY principle. However, the semantics of this method are very unclear. It is very hard to know what this method does. Without looking into its guts, it is impossible to answer these questions:
- If a single future fails, does the method fail? Does it fail fast?
- Does it cancel the futures in case of a failure?
- How are the failures propagated?
I don't have a strong opinion here. But it seems to me that in such tricky cases it is sometimes better to write a few more lines and use standard library methods (whose semantics engineers are more likely to know).
But again, no action is required here. Those are just thoughts.
I agree with @arhimondr. Especially given that it's only used twice, inlining it seems reasonable.
While the DRY principle is in general good practice, there are cases where it doesn't apply. It really depends on:
- Is the refactoring/abstraction natural? Does it make things difficult to follow?
- How evil is it to repeat yourself in this case?
- Is the repetition a real semantic repetition, or does the code just look very similar?
Now that I think about it, I understand there is no Guava method that does what waitForListenableFutures does. I was very confused before, since it looks like a very common use case :). Thanks @arhimondr for noting this is a "simple method in disguise".
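To make the three questions concrete, here is a minimal sketch of an explicit wait loop whose semantics are visible at a glance. It uses only java.util.concurrent rather than Guava or MoreFutures, and it is not the PR's waitForListenableFutures; the class and method names are hypothetical.

```java
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

final class WaitForAllSketch
{
    // Semantics, spelled out:
    // - Fails if any future fails. Futures are awaited in list order, so a failure is
    //   only noticed once all earlier futures have completed (not strictly fail-fast).
    // - On failure or interruption, every future in the list is cancelled.
    // - An unchecked cause is rethrown as-is; any other cause is wrapped in RuntimeException.
    static void waitForAll(List<? extends Future<?>> futures)
    {
        try {
            for (Future<?> future : futures) {
                future.get();
            }
        }
        catch (ExecutionException e) {
            cancelAll(futures);
            Throwable cause = e.getCause();
            if (cause instanceof RuntimeException) {
                throw (RuntimeException) cause;
            }
            throw new RuntimeException(cause);
        }
        catch (InterruptedException e) {
            cancelAll(futures);
            Thread.currentThread().interrupt();
            throw new RuntimeException("Interrupted while waiting for futures", e);
        }
    }

    private static void cancelAll(List<? extends Future<?>> futures)
    {
        for (Future<?> future : futures) {
            // No effect on futures that have already completed.
            future.cancel(true);
        }
    }
}
```

Written this way, each question has an answer in the method comment, which is the readability point made above; whether a helper or inlined code is preferable remains a judgment call.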
{
    ListenableFuture<?> listenableFutureAggregate = whenAllSucceed(listenableFutures).call(() -> null, directExecutor());
    try {
        getFutureValue(listenableFutureAggregate);
I think you want to do getFutureValue(listenableFutureAggregate, PrestoException.class)
I'll create a separate PR to address these remaining comments :). Thanks for raising them, @arhimondr @wenleix!
This is part of the effort on recoverable grouped execution (#12124).