Skip to content

Commit

Permalink
Inline MetastoreUtil#waitForListenableFutures
Browse files Browse the repository at this point in the history
This method does not have a clear semantic regarding to loose ends from
method name, so it is better to inline it to make it easier to
understand.
  • Loading branch information
shixuan-fan committed Apr 15, 2019
1 parent 1595ef0 commit 27dfa76
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import com.facebook.presto.hive.HdfsEnvironment.HdfsContext;
import com.facebook.presto.hive.PartitionUpdate.FileWriteInfo;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.PrestoException;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import org.apache.hadoop.fs.FileSystem;
Expand All @@ -28,8 +29,10 @@

import static com.facebook.presto.hive.metastore.MetastoreUtil.getFileSystem;
import static com.facebook.presto.hive.metastore.MetastoreUtil.renameFile;
import static com.facebook.presto.hive.metastore.MetastoreUtil.waitForListenableFutures;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.util.concurrent.Futures.whenAllSucceed;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static io.airlift.concurrent.MoreFutures.getFutureValue;
import static java.util.Objects.requireNonNull;

public class HiveStagingFileCommitter
Expand Down Expand Up @@ -64,6 +67,13 @@ public void commitFiles(ConnectorSession session, String schemaName, String tabl
}
}

waitForListenableFutures(commitFutures);
ListenableFuture<?> listenableFutureAggregate = whenAllSucceed(commitFutures).call(() -> null, directExecutor());
try {
getFutureValue(listenableFutureAggregate, PrestoException.class);
}
catch (RuntimeException e) {
listenableFutureAggregate.cancel(true);
throw e;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.SchemaTableName;
import com.facebook.presto.spi.TableNotFoundException;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.FileUtils;
Expand All @@ -38,9 +37,6 @@
import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Strings.isNullOrEmpty;
import static com.google.common.util.concurrent.Futures.whenAllSucceed;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static io.airlift.concurrent.MoreFutures.getFutureValue;
import static java.lang.String.format;
import static java.util.stream.Collectors.toList;
import static org.apache.hadoop.hive.metastore.MetaStoreUtils.typeToThriftType;
Expand Down Expand Up @@ -298,16 +294,4 @@ private static String getRenameErrorMessage(Path source, Path target)
{
return format("Error moving data files from %s to final location %s", source, target);
}

public static void waitForListenableFutures(List<ListenableFuture<?>> listenableFutures)
{
ListenableFuture<?> listenableFutureAggregate = whenAllSucceed(listenableFutures).call(() -> null, directExecutor());
try {
getFutureValue(listenableFutureAggregate);
}
catch (RuntimeException e) {
listenableFutureAggregate.cancel(true);
throw e;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@
import static com.facebook.presto.hive.metastore.HivePrivilegeInfo.HivePrivilege.OWNERSHIP;
import static com.facebook.presto.hive.metastore.MetastoreUtil.getFileSystem;
import static com.facebook.presto.hive.metastore.MetastoreUtil.renameFile;
import static com.facebook.presto.hive.metastore.MetastoreUtil.waitForListenableFutures;
import static com.facebook.presto.hive.metastore.PrestoTableType.MANAGED_TABLE;
import static com.facebook.presto.hive.metastore.PrestoTableType.TEMPORARY_TABLE;
import static com.facebook.presto.hive.metastore.PrestoTableType.VIRTUAL_VIEW;
Expand All @@ -91,6 +90,9 @@
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Verify.verify;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.util.concurrent.Futures.whenAllSucceed;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static io.airlift.concurrent.MoreFutures.getFutureValue;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;
import static org.apache.hadoop.hive.common.FileUtils.makePartName;
Expand Down Expand Up @@ -979,7 +981,14 @@ private void commitShared()
}

// Wait for all renames submitted for "INSERT_EXISTING" action to finish
waitForListenableFutures(committer.getFileRenameFutures());
ListenableFuture<?> listenableFutureAggregate = whenAllSucceed(committer.getFileRenameFutures()).call(() -> null, directExecutor());
try {
getFutureValue(listenableFutureAggregate, PrestoException.class);
}
catch (RuntimeException e) {
listenableFutureAggregate.cancel(true);
throw e;
}

// At this point, all file system operations, whether asynchronously issued or not, have completed successfully.
// We are moving on to metastore operations now.
Expand Down

0 comments on commit 27dfa76

Please sign in to comment.