Reduce memory usage of HiveSplitSource #9232

Merged
merged 6 commits on Nov 7, 2017
Changes from all commits
Large diffs are not rendered by default, so some of the files changed in this PR do not appear below.

GenericHiveRecordCursorProvider.java:
@@ -47,7 +47,6 @@ public GenericHiveRecordCursorProvider(HdfsEnvironment hdfsEnvironment)

@Override
public Optional<RecordCursor> createRecordCursor(
- String clientId,
Configuration configuration,
ConnectorSession session,
Path path,
HiveBucketing.java:
@@ -188,14 +188,14 @@ private static int hashBytes(int initialValue, Slice bytes)
return result;
}

- public static Optional<HiveBucketHandle> getHiveBucketHandle(String connectorId, Table table)
+ public static Optional<HiveBucketHandle> getHiveBucketHandle(Table table)
{
Optional<HiveBucketProperty> hiveBucketProperty = table.getStorage().getBucketProperty();
if (!hiveBucketProperty.isPresent()) {
return Optional.empty();
}

- Map<String, HiveColumnHandle> map = getRegularColumnHandles(connectorId, table).stream()
+ Map<String, HiveColumnHandle> map = getRegularColumnHandles(table).stream()
.collect(Collectors.toMap(HiveColumnHandle::getName, identity()));

ImmutableList.Builder<HiveColumnHandle> bucketColumns = ImmutableList.builder();
HiveClientConfig.java:
@@ -52,6 +52,7 @@ public class HiveClientConfig
private int minPartitionBatchSize = 10;
private int maxPartitionBatchSize = 100;
private int maxInitialSplits = 200;
+ private int splitLoaderConcurrency = 4;
private DataSize maxInitialSplitSize;
private int domainCompactionThreshold = 100;
private boolean forceLocalScheduling;
@@ -147,6 +148,19 @@ public HiveClientConfig setMaxInitialSplitSize(DataSize maxInitialSplitSize)
return this;
}

+ @Min(1)
+ public int getSplitLoaderConcurrency()
+ {
+ return splitLoaderConcurrency;
+ }
+
+ @Config("hive.split-loader-concurrency")
+ public HiveClientConfig setSplitLoaderConcurrency(int splitLoaderConcurrency)
+ {
+ this.splitLoaderConcurrency = splitLoaderConcurrency;
+ return this;
+ }
+
@Min(1)
public int getDomainCompactionThreshold()
{
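The hunk above adds a hive.split-loader-concurrency setting with a default of 4, bounded below by @Min(1). The following is a minimal sketch, not part of this PR, showing how the new property round-trips through HiveClientConfig; the class name SplitLoaderConcurrencyExample and the value 16 are illustrative, and the import path is assumed from the presto-hive module. In a deployment the same knob would normally be set as hive.split-loader-concurrency in the Hive catalog properties file.

import com.facebook.presto.hive.HiveClientConfig;

public class SplitLoaderConcurrencyExample
{
    public static void main(String[] args)
    {
        HiveClientConfig config = new HiveClientConfig();
        // Default introduced by this change
        System.out.println(config.getSplitLoaderConcurrency()); // prints 4
        // Programmatic equivalent of hive.split-loader-concurrency=16
        config.setSplitLoaderConcurrency(16);
        System.out.println(config.getSplitLoaderConcurrency()); // prints 16
    }
}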
HiveColumnHandle.java:
@@ -55,7 +55,6 @@ public enum ColumnType
HIDDEN
}

- private final String clientId;
private final String name;
private final HiveType hiveType;
private final TypeSignature typeName;
@@ -65,15 +64,13 @@ public enum ColumnType

@JsonCreator
public HiveColumnHandle(
- @JsonProperty("clientId") String clientId,
@JsonProperty("name") String name,
@JsonProperty("hiveType") HiveType hiveType,
@JsonProperty("typeSignature") TypeSignature typeSignature,
@JsonProperty("hiveColumnIndex") int hiveColumnIndex,
@JsonProperty("columnType") ColumnType columnType,
@JsonProperty("comment") Optional<String> comment)
{
- this.clientId = requireNonNull(clientId, "clientId is null");
this.name = requireNonNull(name, "name is null");
checkArgument(hiveColumnIndex >= 0 || columnType == PARTITION_KEY || columnType == HIDDEN, "hiveColumnIndex is negative");
this.hiveColumnIndex = hiveColumnIndex;
@@ -83,12 +80,6 @@ public HiveColumnHandle(
this.comment = requireNonNull(comment, "comment is null");
}

- @JsonProperty
- public String getClientId()
- {
- return clientId;
- }
-
@JsonProperty
public String getName()
{
@@ -143,7 +134,7 @@ public ColumnType getColumnType()
@Override
public int hashCode()
{
- return Objects.hash(clientId, name, hiveColumnIndex, hiveType, columnType, comment);
+ return Objects.hash(name, hiveColumnIndex, hiveType, columnType, comment);
}

@Override
@@ -156,8 +147,7 @@ public boolean equals(Object obj)
return false;
}
HiveColumnHandle other = (HiveColumnHandle) obj;
- return Objects.equals(this.clientId, other.clientId) &&
- Objects.equals(this.name, other.name) &&
+ return Objects.equals(this.name, other.name) &&
Objects.equals(this.hiveColumnIndex, other.hiveColumnIndex) &&
Objects.equals(this.hiveType, other.hiveType) &&
Objects.equals(this.columnType, other.columnType) &&
@@ -168,7 +158,6 @@ public boolean equals(Object obj)
public String toString()
{
return toStringHelper(this)
- .add("clientId", clientId)
.add("name", name)
.add("hiveType", hiveType)
.add("hiveColumnIndex", hiveColumnIndex)
@@ -178,25 +167,25 @@ public String toString()
.toString();
}

- public static HiveColumnHandle updateRowIdHandle(String connectorId)
+ public static HiveColumnHandle updateRowIdHandle()
{
// Hive connector only supports metadata delete. It does not support generic row-by-row deletion.
// Metadata delete is implemented in Presto by generating a plan for row-by-row delete first,
// and then optimize it into metadata delete. As a result, Hive connector must provide partial
// plan-time support for row-by-row delete so that planning doesn't fail. This is why we need
// rowid handle. Note that in Hive connector, rowid handle is not implemented beyond plan-time.

- return new HiveColumnHandle(connectorId, UPDATE_ROW_ID_COLUMN_NAME, HIVE_LONG, BIGINT.getTypeSignature(), -1, HIDDEN, Optional.empty());
+ return new HiveColumnHandle(UPDATE_ROW_ID_COLUMN_NAME, HIVE_LONG, BIGINT.getTypeSignature(), -1, HIDDEN, Optional.empty());
}

- public static HiveColumnHandle pathColumnHandle(String connectorId)
+ public static HiveColumnHandle pathColumnHandle()
{
- return new HiveColumnHandle(connectorId, PATH_COLUMN_NAME, PATH_HIVE_TYPE, PATH_TYPE_SIGNATURE, PATH_COLUMN_INDEX, HIDDEN, Optional.empty());
+ return new HiveColumnHandle(PATH_COLUMN_NAME, PATH_HIVE_TYPE, PATH_TYPE_SIGNATURE, PATH_COLUMN_INDEX, HIDDEN, Optional.empty());
}

- public static HiveColumnHandle bucketColumnHandle(String connectorId)
+ public static HiveColumnHandle bucketColumnHandle()
{
- return new HiveColumnHandle(connectorId, BUCKET_COLUMN_NAME, BUCKET_HIVE_TYPE, BUCKET_TYPE_SIGNATURE, BUCKET_COLUMN_INDEX, HIDDEN, Optional.empty());
+ return new HiveColumnHandle(BUCKET_COLUMN_NAME, BUCKET_HIVE_TYPE, BUCKET_TYPE_SIGNATURE, BUCKET_COLUMN_INDEX, HIDDEN, Optional.empty());
}

public static boolean isPathColumnHandle(HiveColumnHandle column)
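With the clientId field gone, a HiveColumnHandle is built from the column metadata alone, and equals, hashCode and toString no longer depend on which catalog created the handle. Below is a rough sketch, not taken from this PR, of calling the new six-argument constructor; the column name example_col and index 0 are hypothetical, and the import paths for HiveType, BigintType and the nested ColumnType enum are assumed from the presto-hive and presto-spi modules.

import java.util.Optional;

import com.facebook.presto.hive.HiveColumnHandle;

import static com.facebook.presto.hive.HiveColumnHandle.ColumnType.REGULAR;
import static com.facebook.presto.hive.HiveType.HIVE_LONG;
import static com.facebook.presto.spi.type.BigintType.BIGINT;

public class HiveColumnHandleExample
{
    public static void main(String[] args)
    {
        // A regular BIGINT column at index 0; no connector id argument anymore
        HiveColumnHandle handle = new HiveColumnHandle(
                "example_col",
                HIVE_LONG,
                BIGINT.getTypeSignature(),
                0,
                REGULAR,
                Optional.empty());
        System.out.println(handle);
    }
}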
HiveInsertTableHandle.java:
@@ -27,7 +27,6 @@ public class HiveInsertTableHandle
{
@JsonCreator
public HiveInsertTableHandle(
- @JsonProperty("clientId") String clientId,
@JsonProperty("schemaName") String schemaName,
@JsonProperty("tableName") String tableName,
@JsonProperty("inputColumns") List<HiveColumnHandle> inputColumns,
@@ -39,7 +38,6 @@ public HiveInsertTableHandle(
@JsonProperty("partitionStorageFormat") HiveStorageFormat partitionStorageFormat)
{
super(
- clientId,
schemaName,
tableName,
inputColumns,
HiveMetadata.java:
@@ -148,7 +148,6 @@
import static com.facebook.presto.spi.StandardErrorCode.SCHEMA_NOT_EMPTY;
import static com.facebook.presto.spi.predicate.TupleDomain.withColumnDomains;
import static com.facebook.presto.spi.statistics.TableStatistics.EMPTY_STATISTICS;
- import static com.google.common.base.MoreObjects.toStringHelper;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Verify.verify;
import static com.google.common.collect.ImmutableMap.toImmutableMap;
@@ -173,7 +172,6 @@ public class HiveMetadata
private static final String ORC_BLOOM_FILTER_COLUMNS_KEY = "orc.bloom.filter.columns";
private static final String ORC_BLOOM_FILTER_FPP_KEY = "orc.bloom.filter.fpp";

- private final String connectorId;
private final boolean allowCorruptWritesForTesting;
private final SemiTransactionalHiveMetastore metastore;
private final HdfsEnvironment hdfsEnvironment;
@@ -192,7 +190,6 @@ public class HiveMetadata
private final HiveStatisticsProvider hiveStatisticsProvider;

public HiveMetadata(
- String connectorId,
SemiTransactionalHiveMetastore metastore,
HdfsEnvironment hdfsEnvironment,
HivePartitionManager partitionManager,
@@ -210,8 +207,6 @@ public HiveMetadata(
String prestoVersion,
HiveStatisticsProvider hiveStatisticsProvider)
{
- this.connectorId = requireNonNull(connectorId, "connectorId is null");
-
this.allowCorruptWritesForTesting = allowCorruptWritesForTesting;

this.metastore = requireNonNull(metastore, "metastore is null");
@@ -251,7 +246,7 @@ public HiveTableHandle getTableHandle(ConnectorSession session, SchemaTableName
return null;
}
verifyOnline(tableName, Optional.empty(), getProtectMode(table.get()), table.get().getParameters());
- return new HiveTableHandle(connectorId, tableName.getSchemaName(), tableName.getTableName());
+ return new HiveTableHandle(tableName.getSchemaName(), tableName.getTableName());
}

@Override
@@ -271,7 +266,7 @@ private ConnectorTableMetadata getTableMetadata(SchemaTableName tableName)

Function<HiveColumnHandle, ColumnMetadata> metadataGetter = columnMetadataGetter(table.get(), typeManager);
ImmutableList.Builder<ColumnMetadata> columns = ImmutableList.builder();
- for (HiveColumnHandle columnHandle : hiveColumnHandles(connectorId, table.get())) {
+ for (HiveColumnHandle columnHandle : hiveColumnHandles(table.get())) {
columns.add(metadataGetter.apply(columnHandle));
}

@@ -365,7 +360,7 @@ public Map<String, ColumnHandle> getColumnHandles(ConnectorSession session, Conn
throw new TableNotFoundException(tableName);
}
ImmutableMap.Builder<String, ColumnHandle> columnHandles = ImmutableMap.builder();
- for (HiveColumnHandle columnHandle : hiveColumnHandles(connectorId, table.get())) {
+ for (HiveColumnHandle columnHandle : hiveColumnHandles(table.get())) {
columnHandles.put(columnHandle.getName(), columnHandle);
}
return columnHandles.build();
@@ -478,7 +473,7 @@ private void createHiveTable(ConnectorSession session, ConnectorTableMetadata ta
if (bucketProperty.isPresent() && !bucketWritingEnabled) {
throw new PrestoException(NOT_SUPPORTED, "Writing to bucketed Hive table has been temporarily disabled");
}
- List<HiveColumnHandle> columnHandles = getColumnHandles(connectorId, tableMetadata, ImmutableSet.copyOf(partitionedBy), typeTranslator);
+ List<HiveColumnHandle> columnHandles = getColumnHandles(tableMetadata, ImmutableSet.copyOf(partitionedBy), typeTranslator);
HiveStorageFormat hiveStorageFormat = getHiveStorageFormat(tableMetadata.getProperties());
Map<String, String> tableProperties = getTableProperties(tableMetadata);

@@ -687,7 +682,7 @@ public HiveOutputTableHandle beginCreateTable(ConnectorSession session, Connecto
String schemaName = schemaTableName.getSchemaName();
String tableName = schemaTableName.getTableName();

- List<HiveColumnHandle> columnHandles = getColumnHandles(connectorId, tableMetadata, ImmutableSet.copyOf(partitionedBy), typeTranslator);
+ List<HiveColumnHandle> columnHandles = getColumnHandles(tableMetadata, ImmutableSet.copyOf(partitionedBy), typeTranslator);
HiveStorageFormat partitionStorageFormat = respectTableFormat ? tableStorageFormat : defaultStorageFormat;

// unpartitioned tables ignore the partition storage format
@@ -696,7 +691,6 @@ public HiveOutputTableHandle beginCreateTable(ConnectorSession session, Connecto

LocationHandle locationHandle = locationService.forNewTable(metastore, session, schemaName, tableName);
HiveOutputTableHandle result = new HiveOutputTableHandle(
- connectorId,
schemaName,
tableName,
columnHandles,
@@ -888,14 +882,13 @@ public HiveInsertTableHandle beginInsert(ConnectorSession session, ConnectorTabl
}
}

- List<HiveColumnHandle> handles = hiveColumnHandles(connectorId, table.get()).stream()
+ List<HiveColumnHandle> handles = hiveColumnHandles(table.get()).stream()
.filter(columnHandle -> !columnHandle.isHidden())
.collect(toList());

HiveStorageFormat tableStorageFormat = extractHiveStorageFormat(table.get());
LocationHandle locationHandle = locationService.forExistingTable(metastore, session, table.get());
HiveInsertTableHandle result = new HiveInsertTableHandle(
- connectorId,
tableName.getSchemaName(),
tableName.getTableName(),
handles,
@@ -1115,7 +1108,7 @@ public ConnectorTableHandle beginDelete(ConnectorSession session, ConnectorTable
@Override
public ColumnHandle getUpdateRowIdColumnHandle(ConnectorSession session, ConnectorTableHandle tableHandle)
{
- return updateRowIdHandle(connectorId);
+ return updateRowIdHandle();
}

@Override
@@ -1177,7 +1170,6 @@ public List<ConnectorTableLayoutResult> getTableLayouts(ConnectorSession session
getTableLayout(
session,
new HiveTableLayoutHandle(
- handle.getClientId(),
ImmutableList.copyOf(hivePartitionResult.getPartitionColumns()),
hivePartitionResult.getPartitions(),
hivePartitionResult.getCompactEffectivePredicate(),
@@ -1208,7 +1200,6 @@ public ConnectorTableLayout getTableLayout(ConnectorSession session, ConnectorTa
if (isBucketExecutionEnabled(session) && hiveLayoutHandle.getBucketHandle().isPresent()) {
nodePartitioning = hiveLayoutHandle.getBucketHandle().map(hiveBucketHandle -> new ConnectorTablePartitioning(
new HivePartitioningHandle(
- connectorId,
hiveBucketHandle.getBucketCount(),
hiveBucketHandle.getColumns().stream()
.map(HiveColumnHandle::getHiveType)
@@ -1288,15 +1279,14 @@ public Optional<ConnectorNewTableLayout> getInsertLayout(ConnectorSession sessio
Table table = metastore.getTable(tableName.getSchemaName(), tableName.getTableName())
.orElseThrow(() -> new TableNotFoundException(tableName));

- Optional<HiveBucketHandle> hiveBucketHandle = getHiveBucketHandle(connectorId, table);
+ Optional<HiveBucketHandle> hiveBucketHandle = getHiveBucketHandle(table);
if (!hiveBucketHandle.isPresent()) {
return Optional.empty();
}
if (!bucketWritingEnabled) {
throw new PrestoException(NOT_SUPPORTED, "Writing to bucketed Hive table has been temporarily disabled");
}
HivePartitioningHandle partitioningHandle = new HivePartitioningHandle(
- connectorId,
hiveBucketHandle.get().getBucketCount(),
hiveBucketHandle.get().getColumns().stream()
.map(HiveColumnHandle::getHiveType)
@@ -1322,7 +1312,6 @@ public Optional<ConnectorNewTableLayout> getNewTableLayout(ConnectorSession sess
.collect(toMap(ColumnMetadata::getName, column -> toHiveType(typeTranslator, column.getType())));
return Optional.of(new ConnectorNewTableLayout(
new HivePartitioningHandle(
- connectorId,
bucketProperty.get().getBucketCount(),
bucketedBy.stream()
.map(hiveTypeMap::get)
@@ -1377,14 +1366,6 @@ public List<GrantInfo> listTablePrivileges(ConnectorSession session, SchemaTable
return grantInfos.build();
}

- @Override
- public String toString()
- {
- return toStringHelper(this)
- .add("clientId", connectorId)
- .toString();
- }
-
private void verifyJvmTimeZone()
{
if (!allowCorruptWritesForTesting && !timeZone.equals(DateTimeZone.getDefault())) {
@@ -1444,7 +1425,7 @@ private static void validatePartitionColumns(ConnectorTableMetadata tableMetadat
}
}

- private static List<HiveColumnHandle> getColumnHandles(String connectorId, ConnectorTableMetadata tableMetadata, Set<String> partitionColumnNames, TypeTranslator typeTranslator)
+ private static List<HiveColumnHandle> getColumnHandles(ConnectorTableMetadata tableMetadata, Set<String> partitionColumnNames, TypeTranslator typeTranslator)
{
validatePartitionColumns(tableMetadata);
validateBucketColumns(tableMetadata);
@@ -1463,7 +1444,6 @@ else if (column.isHidden()) {
columnType = REGULAR;
}
columnHandles.add(new HiveColumnHandle(
- connectorId,
column.getName(),
toHiveType(typeTranslator, column.getType()),
column.getType().getTypeSignature(),
HiveMetadataFactory.java:
@@ -33,7 +33,6 @@ public class HiveMetadataFactory
{
private static final Logger log = Logger.get(HiveMetadataFactory.class);

- private final String connectorId;
private final boolean allowCorruptWritesForTesting;
private final boolean respectTableFormat;
private final boolean bucketWritingEnabled;
@@ -56,7 +55,6 @@ public class HiveMetadataFactory
@Inject
@SuppressWarnings("deprecation")
public HiveMetadataFactory(
- HiveConnectorId connectorId,
HiveClientConfig hiveClientConfig,
ExtendedHiveMetastore metastore,
HdfsEnvironment hdfsEnvironment,
@@ -69,7 +67,7 @@ public HiveMetadataFactory(
TypeTranslator typeTranslator,
NodeVersion nodeVersion)
{
- this(connectorId,
+ this(
metastore,
hdfsEnvironment,
partitionManager,
@@ -92,7 +90,6 @@ public HiveMetadataFactory(
}

public HiveMetadataFactory(
- HiveConnectorId connectorId,
ExtendedHiveMetastore metastore,
HdfsEnvironment hdfsEnvironment,
HivePartitionManager partitionManager,
@@ -113,8 +110,6 @@ public HiveMetadataFactory(
TypeTranslator typeTranslator,
String prestoVersion)
{
- this.connectorId = requireNonNull(connectorId, "connectorId is null").toString();
-
this.allowCorruptWritesForTesting = allowCorruptWritesForTesting;
this.respectTableFormat = respectTableFormat;
this.skipDeletionForAlter = skipDeletionForAlter;
Expand Down Expand Up @@ -153,7 +148,6 @@ public HiveMetadata create()
skipDeletionForAlter);

return new HiveMetadata(
- connectorId,
metastore,
hdfsEnvironment,
partitionManager,