From a430e733961453c030bfb60437e215242e4bc9e7 Mon Sep 17 00:00:00 2001
From: Marco Ziccardi
Date: Thu, 7 Jan 2016 18:05:47 +0100
Subject: [PATCH] Add support for resumable uploads

- Add LoadConfiguration class for data needed by load jobs and resumable uploads
- Update LoadJobInfo and related classes to use LoadConfiguration instead
- Add insertAll(LoadConfiguration, byte[]) method
- Add insertAll(LoadConfiguration, SeekableByteChannel) method
- Add Table.insert methods that use resumable upload
- Add unit and integration tests
---
 .../com/google/gcloud/bigquery/BigQuery.java  |  28 ++
 .../google/gcloud/bigquery/BigQueryImpl.java  |  87 +++-
 .../gcloud/bigquery/LoadConfiguration.java    | 382 ++++++++++++++++++
 .../google/gcloud/bigquery/LoadJobInfo.java   | 347 ++--------------
 .../com/google/gcloud/bigquery/Table.java     |  41 +-
 .../com/google/gcloud/spi/BigQueryRpc.java    |  33 ++
 .../google/gcloud/spi/DefaultBigQueryRpc.java |  78 ++++
 .../gcloud/bigquery/BigQueryImplTest.java     | 142 ++++++-
 .../gcloud/bigquery/ITBigQueryTest.java       |  56 ++-
 .../bigquery/LoadConfigurationTest.java       | 123 ++++++
 .../gcloud/bigquery/LoadJobInfoTest.java      |  87 ++--
 .../gcloud/bigquery/SerializationTest.java    |  14 +-
 .../com/google/gcloud/bigquery/TableTest.java |   6 +-
 .../gcloud/examples/BigQueryExample.java      |  42 +-
 14 files changed, 1071 insertions(+), 395 deletions(-)
 create mode 100644 gcloud-java-bigquery/src/main/java/com/google/gcloud/bigquery/LoadConfiguration.java
 create mode 100644 gcloud-java-bigquery/src/test/java/com/google/gcloud/bigquery/LoadConfigurationTest.java

diff --git a/gcloud-java-bigquery/src/main/java/com/google/gcloud/bigquery/BigQuery.java b/gcloud-java-bigquery/src/main/java/com/google/gcloud/bigquery/BigQuery.java
index af5ced9d4230..94727a747688 100644
--- a/gcloud-java-bigquery/src/main/java/com/google/gcloud/bigquery/BigQuery.java
+++ b/gcloud-java-bigquery/src/main/java/com/google/gcloud/bigquery/BigQuery.java
@@ -27,6 +27,7 @@
 import com.google.gcloud.Service;
 import com.google.gcloud.spi.BigQueryRpc;
 
+import java.nio.channels.SeekableByteChannel;
 import java.util.List;
 import java.util.Set;
 
@@ -590,6 +591,33 @@ Page<Table> listTables(DatasetId datasetId, TableListOption... options)
    */
   InsertAllResponse insertAll(InsertAllRequest request) throws BigQueryException;
 
+  /**
+   * Sends a resumable insert request given a seekable channel containing the rows to be inserted.
+   * This method does not close the channel; the caller is responsible for closing it.
+   *
+   * <p>Example usage of inserting data from a local file:
+   * <pre> {@code
+   * LoadConfiguration config = LoadConfiguration.of(TableId.of("my_dataset_id", "my_table_id"));
+   * try (FileChannel channel = FileChannel.open(Paths.get("/path/to/file"))) {
+   *   bigquery.insertAll(config, channel);
+   * }}</pre>
+   *
+   * @throws BigQueryException upon failure
+   * @see <a href="https://cloud.google.com/bigquery/loading-data-post-request#resumable">Resumable
+   *     Upload</a>
+   */
+  void insertAll(LoadConfiguration configuration, SeekableByteChannel channel)
+      throws BigQueryException;
+
+  /**
+   * Sends a resumable insert request given a byte array containing the rows to be inserted.
+   *
+   * @throws BigQueryException upon failure
+   * @see <a href="https://cloud.google.com/bigquery/loading-data-post-request#resumable">Resumable
+   *     Upload</a>
+   */
+  void insertAll(LoadConfiguration configuration, byte[] content) throws BigQueryException;
+
   /**
    * Lists the table's rows.
    *
diff --git a/gcloud-java-bigquery/src/main/java/com/google/gcloud/bigquery/BigQueryImpl.java b/gcloud-java-bigquery/src/main/java/com/google/gcloud/bigquery/BigQueryImpl.java
index 9bc89206889b..95b15799cc0c 100644
--- a/gcloud-java-bigquery/src/main/java/com/google/gcloud/bigquery/BigQueryImpl.java
+++ b/gcloud-java-bigquery/src/main/java/com/google/gcloud/bigquery/BigQueryImpl.java
@@ -18,6 +18,7 @@
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.gcloud.RetryHelper.runWithRetries;
+import static java.util.concurrent.Executors.callable;
 
 import com.google.api.services.bigquery.model.Dataset;
 import com.google.api.services.bigquery.model.GetQueryResultsResponse;
@@ -43,9 +44,18 @@
 import com.google.gcloud.bigquery.InsertAllRequest.RowToInsert;
 import com.google.gcloud.spi.BigQueryRpc;
 
+import java.io.ByteArrayInputStream;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.channels.Channels;
+import java.nio.channels.FileChannel;
+import java.nio.channels.SeekableByteChannel;
+import java.nio.file.StandardOpenOption;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicLong;
 
 final class BigQueryImpl extends BaseService<BigQueryOptions> implements BigQuery {
 
@@ -422,6 +432,74 @@ public Rows apply(RowToInsert rowToInsert) {
         bigQueryRpc.insertAll(tableId.dataset(), tableId.table(), requestPb));
   }
 
+  @Override
+  public void insertAll(LoadConfiguration configuration, final byte[] content) {
+    Function<Long, InputStream> nextStream = new Function<Long, InputStream>() {
+      @Override
+      public InputStream apply(Long startPos) {
+        // Supply only the bytes that remain from startPos onwards.
+        return new ByteArrayInputStream(content, startPos.intValue(),
+            content.length - startPos.intValue());
+      }
+    };
+    insertAll(configuration, nextStream);
+  }
+
+  @Override
+  public void insertAll(LoadConfiguration configuration, final SeekableByteChannel channel) {
+    Function<Long, InputStream> nextStream = new Function<Long, InputStream>() {
+      @Override
+      public InputStream apply(Long startPos) {
+        try {
+          channel.position(startPos);
+          return Channels.newInputStream(channel);
+        } catch (IOException e) {
+          BigQueryException exception = new BigQueryException(0, e.getMessage(), false);
+          exception.initCause(e);
+          throw exception;
+        }
+      }
+    };
+    insertAll(configuration, nextStream);
+  }
+
+  private void insertAll(LoadConfiguration configuration,
+      final Function<Long, InputStream> nextStream) throws BigQueryException {
+    try {
+      final String uploadId = open(configuration);
+      final AtomicLong startPos = new AtomicLong(0L);
+      runWithRetries(callable(new Runnable() {
+        @Override
+        public void run() {
+          try {
+            bigQueryRpc.write(uploadId, nextStream.apply(startPos.get()), startPos.get());
+          } catch (BigQueryException ex) {
+            BigQueryRpc.Tuple<Boolean, Long> uploadStatus = runWithRetries(
+                new Callable<BigQueryRpc.Tuple<Boolean, Long>>() {
+                  @Override
+                  public BigQueryRpc.Tuple<Boolean, Long> call() {
+                    return bigQueryRpc.status(uploadId);
+                  }
+                }, options().retryParams(), EXCEPTION_HANDLER);
+            if (!uploadStatus.x()) {
+              startPos.set(uploadStatus.y() != null ? uploadStatus.y() + 1 : 0);
+              throw new BigQueryException(0, "Resume Incomplete", true);
+            }
+          }
+        }
+      }), options().retryParams(), EXCEPTION_HANDLER);
+    } catch (RetryHelper.RetryHelperException e) {
+      throw BigQueryException.translateAndThrow(e);
+    }
+  }
+
+  private String open(final LoadConfiguration configuration) {
+    return runWithRetries(new Callable<String>() {
+      @Override
+      public String call() {
+        return bigQueryRpc.open(setProjectId(configuration).toPb());
+      }
+    }, options().retryParams(), EXCEPTION_HANDLER);
+  }
+
   @Override
   public Page<List<FieldValue>> listTableData(String datasetId, String tableId,
       TableDataListOption... options) throws BigQueryException {
@@ -698,8 +776,7 @@ public TableId apply(TableId tableId) {
     if (job instanceof LoadJobInfo) {
       LoadJobInfo loadJob = (LoadJobInfo) job;
       LoadJobInfo.Builder loadBuilder = loadJob.toBuilder();
-      loadBuilder.destinationTable(setProjectId(loadJob.destinationTable()));
-      return loadBuilder.build();
+      return loadBuilder.configuration(setProjectId(loadJob.configuration())).build();
     }
     return job;
   }
@@ -711,4 +788,10 @@ private QueryRequest setProjectId(QueryRequest request) {
     }
     return builder.build();
   }
+
+  private LoadConfiguration setProjectId(LoadConfiguration configuration) {
+    LoadConfiguration.Builder builder = configuration.toBuilder();
+    builder.destinationTable(setProjectId(configuration.destinationTable()));
+    return builder.build();
+  }
 }
diff --git a/gcloud-java-bigquery/src/main/java/com/google/gcloud/bigquery/LoadConfiguration.java b/gcloud-java-bigquery/src/main/java/com/google/gcloud/bigquery/LoadConfiguration.java
new file mode 100644
index 000000000000..aabe42c43c1a
--- /dev/null
+++ b/gcloud-java-bigquery/src/main/java/com/google/gcloud/bigquery/LoadConfiguration.java
@@ -0,0 +1,382 @@
+/*
+ * Copyright 2015 Google Inc. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.gcloud.bigquery;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.api.services.bigquery.model.JobConfigurationLoad;
+import com.google.common.base.MoreObjects;
+import com.google.common.collect.ImmutableList;
+import com.google.gcloud.bigquery.JobInfo.CreateDisposition;
+import com.google.gcloud.bigquery.JobInfo.WriteDisposition;
+
+import java.io.Serializable;
+import java.nio.channels.SeekableByteChannel;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Google BigQuery Configuration for a Load operation. A Load configuration can be used to build a
+ * {@link LoadJobInfo} or to load data into a table with a resumable upload by using either
+ * {@link BigQuery#insertAll(LoadConfiguration, SeekableByteChannel)} or
+ * {@link BigQuery#insertAll(LoadConfiguration, byte[])}.
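+ *
+ * <p>A minimal usage sketch for building a configuration (the dataset, table and format shown
+ * here are illustrative):
+ * <pre> {@code
+ * LoadConfiguration configuration =
+ *     LoadConfiguration.builder(TableId.of("my_dataset", "my_table"))
+ *         .formatOptions(FormatOptions.json())
+ *         .createDisposition(CreateDisposition.CREATE_IF_NEEDED)
+ *         .build();
+ * }</pre>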
+ */
+public class LoadConfiguration implements Serializable {
+
+  private static final long serialVersionUID = 470267591917413578L;
+
+  private final TableId destinationTable;
+  private final CreateDisposition createDisposition;
+  private final WriteDisposition writeDisposition;
+  private final FormatOptions formatOptions;
+  private final Integer maxBadRecords;
+  private final Schema schema;
+  private final Boolean ignoreUnknownValues;
+  private final List<String> projectionFields;
+
+  public static final class Builder {
+
+    private TableId destinationTable;
+    private CreateDisposition createDisposition;
+    private WriteDisposition writeDisposition;
+    private FormatOptions formatOptions;
+    private Integer maxBadRecords;
+    private Schema schema;
+    private Boolean ignoreUnknownValues;
+    private List<String> projectionFields;
+
+    private Builder() {}
+
+    private Builder(LoadConfiguration loadConfiguration) {
+      this.destinationTable = loadConfiguration.destinationTable;
+      this.createDisposition = loadConfiguration.createDisposition;
+      this.writeDisposition = loadConfiguration.writeDisposition;
+      this.formatOptions = loadConfiguration.formatOptions;
+      this.maxBadRecords = loadConfiguration.maxBadRecords;
+      this.schema = loadConfiguration.schema;
+      this.ignoreUnknownValues = loadConfiguration.ignoreUnknownValues;
+      this.projectionFields = loadConfiguration.projectionFields;
+    }
+
+    private Builder(JobConfigurationLoad loadConfigurationPb) {
+      this.destinationTable = TableId.fromPb(loadConfigurationPb.getDestinationTable());
+      if (loadConfigurationPb.getCreateDisposition() != null) {
+        this.createDisposition =
+            CreateDisposition.valueOf(loadConfigurationPb.getCreateDisposition());
+      }
+      if (loadConfigurationPb.getWriteDisposition() != null) {
+        this.writeDisposition = WriteDisposition.valueOf(loadConfigurationPb.getWriteDisposition());
+      }
+      if (loadConfigurationPb.getSourceFormat() != null) {
+        this.formatOptions = FormatOptions.of(loadConfigurationPb.getSourceFormat());
+      }
+      if (loadConfigurationPb.getAllowJaggedRows() != null
+          || loadConfigurationPb.getAllowQuotedNewlines() != null
+          || loadConfigurationPb.getEncoding() != null
+          || loadConfigurationPb.getFieldDelimiter() != null
+          || loadConfigurationPb.getQuote() != null
+          || loadConfigurationPb.getSkipLeadingRows() != null) {
+        CsvOptions.Builder builder = CsvOptions.builder()
+            .allowJaggedRows(loadConfigurationPb.getAllowJaggedRows())
+            .allowQuotedNewLines(loadConfigurationPb.getAllowQuotedNewlines())
+            .encoding(loadConfigurationPb.getEncoding())
+            .fieldDelimiter(loadConfigurationPb.getFieldDelimiter())
+            .quote(loadConfigurationPb.getQuote())
+            .skipLeadingRows(loadConfigurationPb.getSkipLeadingRows());
+        this.formatOptions = builder.build();
+      }
+      this.maxBadRecords = loadConfigurationPb.getMaxBadRecords();
+      if (loadConfigurationPb.getSchema() != null) {
+        this.schema = Schema.fromPb(loadConfigurationPb.getSchema());
+      }
+      this.ignoreUnknownValues = loadConfigurationPb.getIgnoreUnknownValues();
+      this.projectionFields = loadConfigurationPb.getProjectionFields();
+    }
+
+    /**
+     * Sets the destination table to load the data into.
+     */
+    public Builder destinationTable(TableId destinationTable) {
+      this.destinationTable = destinationTable;
+      return this;
+    }
+
+    /**
+     * Sets whether the job is allowed to create new tables.
+     *
+     * @see <a href="https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.load.createDisposition">
+     *     Jobs: Load Configuration</a>
+     */
+    public Builder createDisposition(CreateDisposition createDisposition) {
+      this.createDisposition = createDisposition;
+      return this;
+    }
+
+    /**
+     * Sets the action that should occur if the destination table already exists.
+     *
+     * @see <a href="https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.load.writeDisposition">
+     *     Jobs: Load Configuration</a>
+     */
+    public Builder writeDisposition(WriteDisposition writeDisposition) {
+      this.writeDisposition = writeDisposition;
+      return this;
+    }
+
+    /**
+     * Sets the source format, and possibly some parsing options, of the external data. Supported
+     * formats are {@code CSV}, {@code NEWLINE_DELIMITED_JSON} and {@code DATASTORE_BACKUP}. If not
+     * specified, {@code CSV} format is assumed.
+     *
+     * <a href="https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.load.sourceFormat">
+     *     Source Format</a>
+     */
+    public Builder formatOptions(FormatOptions formatOptions) {
+      this.formatOptions = formatOptions;
+      return this;
+    }
+
+    /**
+     * Sets the maximum number of bad records that BigQuery can ignore when running the job. If the
+     * number of bad records exceeds this value, an invalid error is returned in the job result.
+     * By default, no bad records are ignored.
+     */
+    public Builder maxBadRecords(Integer maxBadRecords) {
+      this.maxBadRecords = maxBadRecords;
+      return this;
+    }
+
+    /**
+     * Sets the schema for the destination table. The schema can be omitted if the destination
+     * table already exists, or if you're loading data from Google Cloud Datastore.
+     */
+    public Builder schema(Schema schema) {
+      this.schema = schema;
+      return this;
+    }
+
+    /**
+     * Sets whether BigQuery should allow extra values that are not represented in the table
+     * schema. If {@code true}, the extra values are ignored. If {@code false}, records with extra
+     * columns are treated as bad records, and if there are too many bad records, an invalid error
+     * is returned in the job result. By default, unknown values are not allowed.
+     */
+    public Builder ignoreUnknownValues(Boolean ignoreUnknownValues) {
+      this.ignoreUnknownValues = ignoreUnknownValues;
+      return this;
+    }
+
+    /**
+     * Sets which entity properties to load into BigQuery from a Cloud Datastore backup. This field
+     * is only used if the source format is set to {@code DATASTORE_BACKUP}. Property names are case
+     * sensitive and must be top-level properties. If no properties are specified, BigQuery loads
+     * all properties. If any named property isn't found in the Cloud Datastore backup, an invalid
+     * error is returned in the job result.
+     */
+    public Builder projectionFields(List<String> projectionFields) {
+      this.projectionFields =
+          projectionFields != null ? ImmutableList.copyOf(projectionFields) : null;
+      return this;
+    }
+
+    public LoadConfiguration build() {
+      return new LoadConfiguration(this);
+    }
+  }
+
+  private LoadConfiguration(Builder builder) {
+    this.destinationTable = checkNotNull(builder.destinationTable);
+    this.createDisposition = builder.createDisposition;
+    this.writeDisposition = builder.writeDisposition;
+    this.formatOptions = builder.formatOptions;
+    this.maxBadRecords = builder.maxBadRecords;
+    this.schema = builder.schema;
+    this.ignoreUnknownValues = builder.ignoreUnknownValues;
+    this.projectionFields = builder.projectionFields;
+  }
+
+  /**
+   * Returns the destination table to load the data into.
+   */
+  public TableId destinationTable() {
+    return destinationTable;
+  }
+
+  /**
+   * Returns whether the job is allowed to create new tables.
+   *
+   * @see <a href="https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.load.createDisposition">
+   *     Jobs: Load Configuration</a>
+   */
+  public CreateDisposition createDisposition() {
+    return this.createDisposition;
+  }
+
+  /**
+   * Returns the action that should occur if the destination table already exists.
+   *
+   * @see <a href="https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.load.writeDisposition">
+   *     Jobs: Load Configuration</a>
+   */
+  public WriteDisposition writeDisposition() {
+    return writeDisposition;
+  }
+
+  /**
+   * Returns additional properties used to parse CSV data (used when {@link #format()} is set
+   * to CSV). Returns {@code null} if not set.
+   */
+  public CsvOptions csvOptions() {
+    return formatOptions instanceof CsvOptions ? (CsvOptions) formatOptions : null;
+  }
+
+  /**
+   * Returns the maximum number of bad records that BigQuery can ignore when running the job. If
+   * the number of bad records exceeds this value, an invalid error is returned in the job result.
+   * By default, no bad records are ignored.
+   */
+  public Integer maxBadRecords() {
+    return maxBadRecords;
+  }
+
+  /**
+   * Returns the schema for the destination table, if set. Returns {@code null} otherwise.
+   */
+  public Schema schema() {
+    return schema;
+  }
+
+  /**
+   * Returns the format of the data files.
+   */
+  public String format() {
+    return formatOptions != null ? formatOptions.type() : null;
+  }
+
+  /**
+   * Returns whether BigQuery should allow extra values that are not represented in the table
+   * schema. If {@code true}, the extra values are ignored. If {@code false}, records with extra
+   * columns are treated as bad records, and if there are too many bad records, an invalid error is
+   * returned in the job result. By default, unknown values are not allowed.
+   */
+  public Boolean ignoreUnknownValues() {
+    return ignoreUnknownValues;
+  }
+
+  /**
+   * Returns which entity properties to load into BigQuery from a Cloud Datastore backup. This
+   * field is only used if the source format is set to {@code DATASTORE_BACKUP}. Property names are
+   * case sensitive and must be top-level properties. If no properties are specified, BigQuery
+   * loads all properties. If any named property isn't found in the Cloud Datastore backup, an
+   * invalid error is returned in the job result.
+   */
+  public List<String> projectionFields() {
+    return projectionFields;
+  }
+
+  public Builder toBuilder() {
+    return new Builder(this);
+  }
+
+  @Override
+  public String toString() {
+    return MoreObjects.toStringHelper(this)
+        .add("destinationTable", destinationTable)
+        .add("createDisposition", createDisposition)
+        .add("writeDisposition", writeDisposition)
+        .add("formatOptions", formatOptions)
+        .add("maxBadRecords", maxBadRecords)
+        .add("schema", schema)
+        .add("ignoreUnknownValues", ignoreUnknownValues)
+        .add("projectionFields", projectionFields)
+        .toString();
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    return obj instanceof LoadConfiguration
+        && Objects.equals(toPb(), ((LoadConfiguration) obj).toPb());
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(destinationTable, createDisposition, writeDisposition, formatOptions,
+        maxBadRecords, schema, ignoreUnknownValues, projectionFields);
+  }
+
+  JobConfigurationLoad toPb() {
+    JobConfigurationLoad loadConfigurationPb = new JobConfigurationLoad();
+    loadConfigurationPb.setDestinationTable(destinationTable.toPb());
+    if (createDisposition != null) {
+      loadConfigurationPb.setCreateDisposition(createDisposition.toString());
+    }
+    if (writeDisposition != null) {
+      loadConfigurationPb.setWriteDisposition(writeDisposition.toString());
+    }
+    if (csvOptions() != null) {
+      CsvOptions csvOptions = csvOptions();
+      loadConfigurationPb.setFieldDelimiter(csvOptions.fieldDelimiter())
+          .setAllowJaggedRows(csvOptions.allowJaggedRows())
+          .setAllowQuotedNewlines(csvOptions.allowQuotedNewLines())
+          .setEncoding(csvOptions.encoding())
+          .setQuote(csvOptions.quote())
+          .setSkipLeadingRows(csvOptions.skipLeadingRows());
+    }
+    if (schema != null) {
+      loadConfigurationPb.setSchema(schema.toPb());
+    }
+    if (formatOptions != null) {
+      loadConfigurationPb.setSourceFormat(formatOptions.type());
+    }
+    loadConfigurationPb.setMaxBadRecords(maxBadRecords);
+    loadConfigurationPb.setIgnoreUnknownValues(ignoreUnknownValues);
+    loadConfigurationPb.setProjectionFields(projectionFields);
+    return loadConfigurationPb;
+  }
+
+  static LoadConfiguration fromPb(JobConfigurationLoad configurationPb) {
+    return new Builder(configurationPb).build();
+  }
+
+  /**
+   * Creates a builder for a BigQuery Load Configuration given the destination table.
+   */
+  public static Builder builder(TableId destinationTable) {
+    return new Builder().destinationTable(destinationTable);
+  }
+
+  /**
+   * Creates a builder for a BigQuery Load Configuration given the destination table and format.
+   */
+  public static Builder builder(TableId destinationTable, FormatOptions format) {
+    return new Builder().destinationTable(destinationTable).formatOptions(format);
+  }
+
+  /**
+   * Returns a BigQuery Load Configuration for the given destination table.
+   */
+  public static LoadConfiguration of(TableId destinationTable) {
+    return builder(destinationTable).build();
+  }
+
+  /**
+   * Returns a BigQuery Load Configuration for the given destination table and format.
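+   *
+   * <p>A short sketch (dataset and table names are illustrative):
+   * <pre> {@code
+   * LoadConfiguration csvConfiguration =
+   *     LoadConfiguration.of(TableId.of("my_dataset", "my_table"), FormatOptions.csv());
+   * }</pre>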
+ */ + public static LoadConfiguration of(TableId destinationTable, FormatOptions format) { + return builder(destinationTable).formatOptions(format).build(); + } +} diff --git a/gcloud-java-bigquery/src/main/java/com/google/gcloud/bigquery/LoadJobInfo.java b/gcloud-java-bigquery/src/main/java/com/google/gcloud/bigquery/LoadJobInfo.java index 1120bbbacf3f..4f8d03cbc6a9 100644 --- a/gcloud-java-bigquery/src/main/java/com/google/gcloud/bigquery/LoadJobInfo.java +++ b/gcloud-java-bigquery/src/main/java/com/google/gcloud/bigquery/LoadJobInfo.java @@ -34,81 +34,29 @@ */ public class LoadJobInfo extends JobInfo { - private static final long serialVersionUID = 2515503817007974115L; + private static final long serialVersionUID = 6349304826867750535L; private final List sourceUris; - private final TableId destinationTable; - private final CreateDisposition createDisposition; - private final WriteDisposition writeDisposition; - private final FormatOptions formatOptions; - private final Integer maxBadRecords; - private final Schema schema; - private final Boolean ignoreUnknownValues; - private final List projectionFields; + private final LoadConfiguration configuration; public static final class Builder extends JobInfo.Builder { private List sourceUris; - private TableId destinationTable; - private CreateDisposition createDisposition; - private WriteDisposition writeDisposition; - private FormatOptions formatOptions; - private Integer maxBadRecords; - private Schema schema; - private Boolean ignoreUnknownValues; - private List projectionFields; + private LoadConfiguration configuration; private Builder() {} private Builder(LoadJobInfo jobInfo) { super(jobInfo); this.sourceUris = jobInfo.sourceUris; - this.destinationTable = jobInfo.destinationTable; - this.createDisposition = jobInfo.createDisposition; - this.writeDisposition = jobInfo.writeDisposition; - this.formatOptions = jobInfo.formatOptions; - this.maxBadRecords = jobInfo.maxBadRecords; - this.schema = jobInfo.schema; - this.ignoreUnknownValues = jobInfo.ignoreUnknownValues; - this.projectionFields = jobInfo.projectionFields; + this.configuration = jobInfo.configuration; } private Builder(Job jobPb) { super(jobPb); JobConfigurationLoad loadConfigurationPb = jobPb.getConfiguration().getLoad(); + this.configuration = LoadConfiguration.fromPb(loadConfigurationPb); this.sourceUris = loadConfigurationPb.getSourceUris(); - this.destinationTable = TableId.fromPb(loadConfigurationPb.getDestinationTable()); - if (loadConfigurationPb.getCreateDisposition() != null) { - this.createDisposition = - CreateDisposition.valueOf(loadConfigurationPb.getCreateDisposition()); - } - if (loadConfigurationPb.getWriteDisposition() != null) { - this.writeDisposition = WriteDisposition.valueOf(loadConfigurationPb.getWriteDisposition()); - } - if (loadConfigurationPb.getSourceFormat() != null) { - this.formatOptions = FormatOptions.of(loadConfigurationPb.getSourceFormat()); - } - if (loadConfigurationPb.getAllowJaggedRows() != null - || loadConfigurationPb.getAllowQuotedNewlines() != null - || loadConfigurationPb.getEncoding() != null - || loadConfigurationPb.getFieldDelimiter() != null - || loadConfigurationPb.getQuote() != null - || loadConfigurationPb.getSkipLeadingRows() != null) { - CsvOptions.Builder builder = CsvOptions.builder() - .allowJaggedRows(loadConfigurationPb.getAllowJaggedRows()) - .allowQuotedNewLines(loadConfigurationPb.getAllowQuotedNewlines()) - .encoding(loadConfigurationPb.getEncoding()) - .fieldDelimiter(loadConfigurationPb.getFieldDelimiter()) - 
.quote(loadConfigurationPb.getQuote()) - .skipLeadingRows(loadConfigurationPb.getSkipLeadingRows()); - this.formatOptions = builder.build(); - } - this.maxBadRecords = loadConfigurationPb.getMaxBadRecords(); - if (loadConfigurationPb.getSchema() != null) { - this.schema = Schema.fromPb(loadConfigurationPb.getSchema()); - } - this.ignoreUnknownValues = loadConfigurationPb.getIgnoreUnknownValues(); - this.projectionFields = loadConfigurationPb.getProjectionFields(); } /** @@ -122,88 +70,10 @@ public Builder sourceUris(List sourceUris) { } /** - * Sets the destination table to load the data into. + * Sets the configuration for the BigQuery Load Job. */ - public Builder destinationTable(TableId destinationTable) { - this.destinationTable = destinationTable; - return this; - } - - /** - * Sets whether the job is allowed to create new tables. - * - * @see - * Jobs: Load Configuration - */ - public Builder createDisposition(CreateDisposition createDisposition) { - this.createDisposition = createDisposition; - return this; - } - - /** - * Sets the action that should occur if the destination table already exists. - * - * @see - * Jobs: Load Configuration - */ - public Builder writeDisposition(WriteDisposition writeDisposition) { - this.writeDisposition = writeDisposition; - return this; - } - - /** - * Sets the source format, and possibly some parsing options, of the external data. Supported - * formats are {@code CSV}, {@code NEWLINE_DELIMITED_JSON} and {@code DATASTORE_BACKUP}. If not - * specified, {@code CSV} format is assumed. - * - * - * Source Format - */ - public Builder formatOptions(FormatOptions formatOptions) { - this.formatOptions = formatOptions; - return this; - } - - /** - * Sets the maximum number of bad records that BigQuery can ignore when running the job. If the - * number of bad records exceeds this value, an invalid error is returned in the job result. - * By default no bad record is ignored. - */ - public Builder maxBadRecords(Integer maxBadRecords) { - this.maxBadRecords = maxBadRecords; - return this; - } - - /** - * Sets the schema for the destination table. The schema can be omitted if the destination table - * already exists, or if you're loading data from Google Cloud Datastore. - */ - public Builder schema(Schema schema) { - this.schema = schema; - return this; - } - - /** - * Sets whether BigQuery should allow extra values that are not represented in the table schema. - * If {@code true}, the extra values are ignored. If {@code true}, records with extra columns - * are treated as bad records, and if there are too many bad records, an invalid error is - * returned in the job result. By default unknown values are not allowed. - */ - public Builder ignoreUnknownValues(Boolean ignoreUnknownValues) { - this.ignoreUnknownValues = ignoreUnknownValues; - return this; - } - - /** - * Sets which entity properties to load into BigQuery from a Cloud Datastore backup. This field - * is only used if the source format is set to {@code DATASTORE_BACKUP}. Property names are case - * sensitive and must be top-level properties. If no properties are specified, BigQuery loads - * all properties. If any named property isn't found in the Cloud Datastore backup, an invalid - * error is returned in the job result. - */ - public Builder projectionFields(List projectionFields) { - this.projectionFields = - projectionFields != null ? 
ImmutableList.copyOf(projectionFields) : null; + public Builder configuration(LoadConfiguration configuration) { + this.configuration = configuration; return this; } @@ -216,14 +86,7 @@ public LoadJobInfo build() { private LoadJobInfo(Builder builder) { super(builder); this.sourceUris = builder.sourceUris; - this.destinationTable = checkNotNull(builder.destinationTable); - this.createDisposition = builder.createDisposition; - this.writeDisposition = builder.writeDisposition; - this.formatOptions = builder.formatOptions; - this.maxBadRecords = builder.maxBadRecords; - this.schema = builder.schema; - this.ignoreUnknownValues = builder.ignoreUnknownValues; - this.projectionFields = builder.projectionFields; + this.configuration = builder.configuration; } /** @@ -236,82 +99,10 @@ public List sourceUris() { } /** - * Returns the destination table to load the data into. + * Returns the configuration for the BigQuery Load Job. */ - public TableId destinationTable() { - return destinationTable; - } - - /** - * Returns whether the job is allowed to create new tables. - * - * @see - * Jobs: Load Configuration - */ - public CreateDisposition createDisposition() { - return this.createDisposition; - } - - /** - * Returns the action that should occur if the destination table already exists. - * - * @see - * Jobs: Load Configuration - */ - public WriteDisposition writeDisposition() { - return writeDisposition; - } - - /** - * Returns additional properties used to parse CSV data (used when {@link #format()} is set - * to CSV). Returns {@code null} if not set. - */ - public CsvOptions csvOptions() { - return formatOptions instanceof CsvOptions ? (CsvOptions) formatOptions : null; - } - - /** - * Returns the maximum number of bad records that BigQuery can ignore when running the job. If the - * number of bad records exceeds this value, an invalid error is returned in the job result. - * By default no bad record is ignored. - */ - public Integer maxBadRecords() { - return maxBadRecords; - } - - /** - * Returns the schema for the destination table, if set. Returns {@code null} otherwise. - */ - public Schema schema() { - return schema; - } - - /** - * Returns the format of the data files. - */ - public String format() { - return formatOptions != null ? formatOptions.type() : null; - } - - /** - * Returns whether BigQuery should allow extra values that are not represented in the table - * schema. If {@code true}, the extra values are ignored. If {@code true}, records with extra - * columns are treated as bad records, and if there are too many bad records, an invalid error is - * returned in the job result. By default unknown values are not allowed. - */ - public Boolean ignoreUnknownValues() { - return ignoreUnknownValues; - } - - /** - * Returns which entity properties to load into BigQuery from a Cloud Datastore backup. This field - * is only used if the source format is set to {@code DATASTORE_BACKUP}. Property names are case - * sensitive and must be top-level properties. If no properties are specified, BigQuery loads - * all properties. If any named property isn't found in the Cloud Datastore backup, an invalid - * error is returned in the job result. 
- */ - public List projectionFields() { - return projectionFields; + public LoadConfiguration configuration() { + return configuration; } @Override @@ -321,16 +112,7 @@ public Builder toBuilder() { @Override ToStringHelper toStringHelper() { - return super.toStringHelper() - .add("destinationTable", destinationTable) - .add("sourceUris", sourceUris) - .add("createDisposition", createDisposition) - .add("writeDisposition", writeDisposition) - .add("formatOptions", formatOptions) - .add("maxBadRecords", maxBadRecords) - .add("schema", schema) - .add("ignoreUnknownValue", ignoreUnknownValues) - .add("projectionFields", projectionFields); + return super.toStringHelper().add("sourceUris", sourceUris).add("configuration", configuration); } @Override @@ -340,122 +122,61 @@ public boolean equals(Object obj) { @Override public int hashCode() { - return Objects.hash(super.hashCode(), sourceUris, destinationTable, createDisposition, - writeDisposition, formatOptions, maxBadRecords, schema, ignoreUnknownValues, - projectionFields); + return Objects.hash(super.hashCode(), sourceUris, configuration); } @Override Job toPb() { - JobConfigurationLoad loadConfigurationPb = new JobConfigurationLoad(); + JobConfigurationLoad loadConfigurationPb = configuration.toPb(); loadConfigurationPb.setSourceUris(sourceUris); - loadConfigurationPb.setDestinationTable(destinationTable.toPb()); - if (createDisposition != null) { - loadConfigurationPb.setCreateDisposition(createDisposition.toString()); - } - if (writeDisposition != null) { - loadConfigurationPb.setWriteDisposition(writeDisposition.toString()); - } - if (csvOptions() != null) { - CsvOptions csvOptions = csvOptions(); - loadConfigurationPb.setFieldDelimiter(csvOptions.fieldDelimiter()) - .setAllowJaggedRows(csvOptions.allowJaggedRows()) - .setAllowQuotedNewlines(csvOptions.allowQuotedNewLines()) - .setEncoding(csvOptions.encoding()) - .setQuote(csvOptions.quote()) - .setSkipLeadingRows(csvOptions.skipLeadingRows()); - } - if (schema != null) { - loadConfigurationPb.setSchema(schema.toPb()); - } - if (formatOptions != null) { - loadConfigurationPb.setSourceFormat(formatOptions.type()); - } - loadConfigurationPb.setMaxBadRecords(maxBadRecords); - loadConfigurationPb.setIgnoreUnknownValues(ignoreUnknownValues); - loadConfigurationPb.setProjectionFields(projectionFields); return super.toPb().setConfiguration(new JobConfiguration().setLoad(loadConfigurationPb)); } /** - * Creates a builder for a BigQuery Load Job given destination table and source URI. + * Creates a builder for a BigQuery Load Job given the load configuration and source URI. */ - public static Builder builder(TableId destinationTable, String sourceUri) { - return builder(destinationTable, ImmutableList.of(checkNotNull(sourceUri))); + public static Builder builder(LoadConfiguration configuration, String sourceUri) { + return builder(configuration, ImmutableList.of(checkNotNull(sourceUri))); } /** - * Creates a builder for a BigQuery Load Job given destination table and source URIs. + * Creates a builder for a BigQuery Load Job given the load configuration and source URIs. */ - public static Builder builder(TableId destinationTable, List sourceUris) { - return new Builder().destinationTable(destinationTable).sourceUris(sourceUris); + public static Builder builder(LoadConfiguration configuration, List sourceUris) { + return new Builder().configuration(configuration).sourceUris(sourceUris); } /** - * Returns a BigQuery Load Job for the given destination table and source URI. 
Job's id is chosen + * Returns a BigQuery Load Job for the given load configuration and source URI. Job's id is chosen * by the service. */ - public static LoadJobInfo of(TableId destinationTable, String sourceUri) { - return builder(destinationTable, sourceUri).build(); + public static LoadJobInfo of(LoadConfiguration configuration, String sourceUri) { + return builder(configuration, sourceUri).build(); } /** - * Returns a BigQuery Load Job for the given destination table and source URIs. Job's id is chosen - * by the service. - */ - public static LoadJobInfo of(TableId destinationTable, List sourceUris) { - return builder(destinationTable, sourceUris).build(); - } - - /** - * Returns a BigQuery Load Job for the given destination table, format and source URI. Job's id is + * Returns a BigQuery Load Job for the given load configuration and source URIs. Job's id is * chosen by the service. */ - public static LoadJobInfo of(TableId destinationTable, FormatOptions format, String sourceUri) { - return builder(destinationTable, sourceUri).formatOptions(format).build(); - } - - /** - * Returns a BigQuery Load Job for the given destination table, format and source URIs. Job's id - * is chosen by the service. - */ - public static LoadJobInfo of(TableId destinationTable, FormatOptions format, - List sourceUris) { - return builder(destinationTable, sourceUris).formatOptions(format).build(); + public static LoadJobInfo of(LoadConfiguration configuration, List sourceUris) { + return builder(configuration, sourceUris).build(); } /** - * Returns a BigQuery Load Job for the given destination table and source URI. Job's id is set to + * Returns a BigQuery Load Job for the given load configuration and source URI. Job's id is set to * the provided value. */ - public static LoadJobInfo of(JobId jobId, TableId destinationTable, String sourceUri) { - return builder(destinationTable, sourceUri).jobId(jobId).build(); - } - - /** - * Returns a BigQuery Load Job for the given destination table and source URIs. Job's id is set to - * the provided value. - */ - public static LoadJobInfo of(JobId jobId, TableId destinationTable, List sourceUris) { - return builder(destinationTable, sourceUris).jobId(jobId).build(); - } - - /** - * Returns a BigQuery Load Job for the given destination table, format, and source URI. Job's id - * is set to the provided value. - */ - public static LoadJobInfo of(JobId jobId, TableId destinationTable, FormatOptions format, - String sourceUri) { - return builder(destinationTable, sourceUri).formatOptions(format).jobId(jobId).build(); + public static LoadJobInfo of(JobId jobId, LoadConfiguration configuration, String sourceUri) { + return builder(configuration, sourceUri).jobId(jobId).build(); } /** - * Returns a BigQuery Load Job for the given destination table, format and source URIs. Job's id - * is set to the provided value. + * Returns a BigQuery Load Job for the given load configuration and source URIs. Job's id is set + * to the provided value. 
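+   *
+   * <p>A short sketch (the project, job, table and URIs here are illustrative):
+   * <pre> {@code
+   * LoadJobInfo loadJob = LoadJobInfo.of(JobId.of("my_project", "my_job"),
+   *     LoadConfiguration.of(TableId.of("my_dataset", "my_table")),
+   *     ImmutableList.of("gs://my_bucket/file1.csv", "gs://my_bucket/file2.csv"));
+   * }</pre>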
*/ - public static LoadJobInfo of(JobId jobId, TableId destinationTable, FormatOptions format, + public static LoadJobInfo of(JobId jobId, LoadConfiguration configuration, List sourceUris) { - return builder(destinationTable, sourceUris).formatOptions(format).jobId(jobId).build(); + return builder(configuration, sourceUris).jobId(jobId).build(); } @SuppressWarnings("unchecked") diff --git a/gcloud-java-bigquery/src/main/java/com/google/gcloud/bigquery/Table.java b/gcloud-java-bigquery/src/main/java/com/google/gcloud/bigquery/Table.java index fd9a96e132b6..d4aea9913b6a 100644 --- a/gcloud-java-bigquery/src/main/java/com/google/gcloud/bigquery/Table.java +++ b/gcloud-java-bigquery/src/main/java/com/google/gcloud/bigquery/Table.java @@ -22,6 +22,7 @@ import com.google.common.collect.ImmutableList; import com.google.gcloud.Page; +import java.nio.channels.SeekableByteChannel; import java.util.List; import java.util.Objects; @@ -167,6 +168,43 @@ InsertAllResponse insert(Iterable rows, boolean sk return bigquery.insertAll(request); } + /** + * Insert the rows in the provided byte array into the table using a resumable upload. + * + * @param format the format of the data to be inserted + * @param content data to be inserted as a byte array + * @throws BigQueryException upon failure + */ + void insert(FormatOptions format, byte[] content) throws BigQueryException { + LoadConfiguration configuration = LoadConfiguration.builder(info.tableId()) + .schema(info.schema()) + .formatOptions(format) + .build(); + bigquery.insertAll(configuration, content); + } + + /** + * Insert the rows in the provided seekable byte channel into the table using a resumable upload. + * This method does not close the channel. + * + *

+   * <p>Example usage of inserting data from a local file:
+   * <pre> {@code
+   * try (FileChannel channel = FileChannel.open(Paths.get("/path/to/file"))) {
+   *   table.insert(FormatOptions.csv(), channel);
+   * }}</pre>
+   *
+   * @param format the format of the data to be inserted
+   * @param channel data to be inserted as a seekable byte channel
+   * @throws BigQueryException upon failure
+   */
+  void insert(FormatOptions format, SeekableByteChannel channel) throws BigQueryException {
+    LoadConfiguration configuration = LoadConfiguration.builder(info.tableId())
+        .schema(info.schema())
+        .formatOptions(format)
+        .build();
+    bigquery.insertAll(configuration, channel);
+  }
+
   /**
    * Returns the paginated list rows in this table.
    *
@@ -262,7 +300,8 @@ Job load(FormatOptions format, String sourceUri, BigQuery.JobOption... options)
    */
   Job load(FormatOptions format, List<String> sourceUris, BigQuery.JobOption... options)
       throws BigQueryException {
-    return new Job(bigquery, bigquery.create(LoadJobInfo.of(info.tableId(), format, sourceUris),
+    return new Job(bigquery,
+        bigquery.create(LoadJobInfo.of(LoadConfiguration.of(info.tableId(), format), sourceUris),
         options));
   }
 
diff --git a/gcloud-java-bigquery/src/main/java/com/google/gcloud/spi/BigQueryRpc.java b/gcloud-java-bigquery/src/main/java/com/google/gcloud/spi/BigQueryRpc.java
index d53ad838b802..aebc6dde33b9 100644
--- a/gcloud-java-bigquery/src/main/java/com/google/gcloud/spi/BigQueryRpc.java
+++ b/gcloud-java-bigquery/src/main/java/com/google/gcloud/spi/BigQueryRpc.java
@@ -19,6 +19,7 @@
 import com.google.api.services.bigquery.model.Dataset;
 import com.google.api.services.bigquery.model.GetQueryResultsResponse;
 import com.google.api.services.bigquery.model.Job;
+import com.google.api.services.bigquery.model.JobConfigurationLoad;
 import com.google.api.services.bigquery.model.QueryRequest;
 import com.google.api.services.bigquery.model.QueryResponse;
 import com.google.api.services.bigquery.model.Table;
@@ -27,6 +28,7 @@
 import com.google.api.services.bigquery.model.TableRow;
 import com.google.gcloud.bigquery.BigQueryException;
 
+import java.io.InputStream;
 import java.util.Map;
 
 public interface BigQueryRpc {
@@ -185,4 +187,35 @@ GetQueryResultsResponse getQueryResults(String jobId, Map<Option, ?> options)
       throws BigQueryException;
 
   QueryResponse query(QueryRequest request) throws BigQueryException;
+
+  /**
+   * Opens a resumable upload session and returns an upload URI.
+   *
+   * @param configuration load configuration
+   * @throws BigQueryException upon failure
+   */
+  String open(JobConfigurationLoad configuration) throws BigQueryException;
+
+  /**
+   * Returns the status of a resumable upload session.
+   *
+   * @param uploadUri the resumable upload session URI
+   * @return a {@link Tuple} {@code t} such that {@code t.x()} is {@code true} if and only if the
+   *     upload completed. If {@code t.x()} is {@code false}, {@code t.y()} is the last byte
+   *     correctly uploaded to the resumable URI.
+   * @throws BigQueryException upon failure
+   */
+  Tuple<Boolean, Long> status(String uploadUri);
+
+  /**
+   * Uploads the provided data to the resumable upload session at the specified position.
+   *
+   * @param uploadUri the resumable upload session URI
+   * @param toWrite a stream of data to upload
+   * @param startPos the start position of the data being uploaded. Should be set to {@code 0} for
+   *     the first upload attempt.
+   *     For resume attempts, it should be set to the value returned by {@link #status(String)}.
+   * @throws BigQueryException upon failure
+   */
+  void write(String uploadUri, InputStream toWrite, long startPos) throws BigQueryException;
 }
diff --git a/gcloud-java-bigquery/src/main/java/com/google/gcloud/spi/DefaultBigQueryRpc.java b/gcloud-java-bigquery/src/main/java/com/google/gcloud/spi/DefaultBigQueryRpc.java
index 04e481b345c2..477b8b99f228 100644
--- a/gcloud-java-bigquery/src/main/java/com/google/gcloud/spi/DefaultBigQueryRpc.java
+++ b/gcloud-java-bigquery/src/main/java/com/google/gcloud/spi/DefaultBigQueryRpc.java
@@ -20,12 +20,22 @@
 import static com.google.gcloud.spi.BigQueryRpc.Option.PAGE_TOKEN;
 import static com.google.gcloud.spi.BigQueryRpc.Option.START_INDEX;
 import static com.google.gcloud.spi.BigQueryRpc.Option.TIMEOUT;
+import static java.net.HttpURLConnection.HTTP_CREATED;
 import static java.net.HttpURLConnection.HTTP_NOT_FOUND;
+import static java.net.HttpURLConnection.HTTP_OK;
 
 import com.google.api.client.googleapis.json.GoogleJsonError;
 import com.google.api.client.googleapis.json.GoogleJsonResponseException;
+import com.google.api.client.http.EmptyContent;
+import com.google.api.client.http.GenericUrl;
+import com.google.api.client.http.HttpRequest;
+import com.google.api.client.http.HttpRequestFactory;
 import com.google.api.client.http.HttpRequestInitializer;
+import com.google.api.client.http.HttpResponse;
 import com.google.api.client.http.HttpTransport;
+import com.google.api.client.http.InputStreamContent;
+import com.google.api.client.http.json.JsonHttpContent;
+import com.google.api.client.json.JsonFactory;
 import com.google.api.client.json.jackson.JacksonFactory;
 import com.google.api.services.bigquery.Bigquery;
 import com.google.api.services.bigquery.model.Dataset;
@@ -33,6 +43,8 @@
 import com.google.api.services.bigquery.model.DatasetReference;
 import com.google.api.services.bigquery.model.GetQueryResultsResponse;
 import com.google.api.services.bigquery.model.Job;
+import com.google.api.services.bigquery.model.JobConfiguration;
+import com.google.api.services.bigquery.model.JobConfigurationLoad;
 import com.google.api.services.bigquery.model.JobList;
 import com.google.api.services.bigquery.model.JobStatus;
 import com.google.api.services.bigquery.model.QueryRequest;
@@ -54,6 +66,7 @@
 import com.google.gcloud.bigquery.BigQueryOptions;
 
 import java.io.IOException;
+import java.io.InputStream;
 import java.math.BigInteger;
 import java.util.List;
 import java.util.Map;
@@ -64,6 +77,10 @@ public class DefaultBigQueryRpc implements BigQueryRpc {
 
   public static final String DEFAULT_PROJECTION = "full";
   // see: https://cloud.google.com/bigquery/troubleshooting-errors
   private static final Set<Integer> RETRYABLE_CODES = ImmutableSet.of(500, 502, 503, 504);
+  private static final String BASE_RESUMABLE_URI =
+      "https://www.googleapis.com/upload/bigquery/v2/projects/";
+  private static final int HTTP_RESUME_INCOMPLETE = 308;
+
   private final BigQueryOptions options;
   private final Bigquery bigquery;
 
@@ -417,4 +434,65 @@ public QueryResponse query(QueryRequest request) throws BigQueryException {
       throw translate(ex);
     }
   }
+
+  @Override
+  public String open(JobConfigurationLoad configuration) throws BigQueryException {
+    try {
+      Job loadJob = new Job().setConfiguration(new JobConfiguration().setLoad(configuration));
+      StringBuilder builder = new StringBuilder()
+          .append(BASE_RESUMABLE_URI)
+          .append(options.projectId())
+          .append("/jobs");
+      GenericUrl url = new GenericUrl(builder.toString());
+      url.set("uploadType", "resumable");
+      JsonFactory jsonFactory = bigquery.getJsonFactory();
+      HttpRequestFactory requestFactory = bigquery.getRequestFactory();
+      HttpRequest httpRequest =
+          requestFactory.buildPostRequest(url, new JsonHttpContent(jsonFactory, loadJob));
+      // Declares the content type of the data that will be uploaded to the session.
+      httpRequest.getHeaders().set("X-Upload-Content-Type", "application/octet-stream");
+      HttpResponse response = httpRequest.execute();
+      return response.getHeaders().getLocation();
+    } catch (IOException ex) {
+      throw translate(ex);
+    }
+  }
+
+  @Override
+  public Tuple<Boolean, Long> status(String uploadUri) {
+    try {
+      GenericUrl url = new GenericUrl(uploadUri);
+      HttpRequest httpRequest =
+          bigquery.getRequestFactory().buildPutRequest(url, new EmptyContent());
+      httpRequest.getHeaders().setContentRange("bytes */*");
+      httpRequest.setThrowExceptionOnExecuteError(false);
+      HttpResponse response = httpRequest.execute();
+      int code = response.getStatusCode();
+      String message = response.getStatusMessage();
+      String range = response.getHeaders().getRange();
+      Long maxPos = null;
+      if (code == HTTP_RESUME_INCOMPLETE && range != null && range.matches("^bytes=0-\\d+$")) {
+        maxPos = Long.valueOf(range.split("-")[1]);
+      }
+      if (!(code == HTTP_OK || code == HTTP_CREATED || code == HTTP_RESUME_INCOMPLETE)) {
+        throw new BigQueryException(code, message, RETRYABLE_CODES.contains(code));
+      }
+      return Tuple.of(code != HTTP_RESUME_INCOMPLETE, maxPos);
+    } catch (IOException ex) {
+      throw translate(ex);
+    }
+  }
+
+  @Override
+  public void write(String uploadUri, InputStream toWrite, long startPos)
+      throws BigQueryException {
+    try {
+      GenericUrl url = new GenericUrl(uploadUri);
+      HttpRequest httpRequest = bigquery.getRequestFactory().buildPutRequest(url,
+          new InputStreamContent(null, toWrite).setCloseInputStream(false));
+      httpRequest.getHeaders().setContentType("*/*");
+      httpRequest.getHeaders().setContentRange("bytes " + startPos + "-*/*");
+      httpRequest.execute();
+    } catch (IOException ex) {
+      throw translate(ex);
+    }
+  }
 }
diff --git a/gcloud-java-bigquery/src/test/java/com/google/gcloud/bigquery/BigQueryImplTest.java b/gcloud-java-bigquery/src/test/java/com/google/gcloud/bigquery/BigQueryImplTest.java
index 6a6a1c7cd6af..eced2b76e779 100644
--- a/gcloud-java-bigquery/src/test/java/com/google/gcloud/bigquery/BigQueryImplTest.java
+++ b/gcloud-java-bigquery/src/test/java/com/google/gcloud/bigquery/BigQueryImplTest.java
@@ -54,7 +54,12 @@
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
 
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
 import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.nio.channels.SeekableByteChannel;
 import java.util.List;
 import java.util.Map;
 
@@ -107,11 +112,11 @@ public class BigQueryImplTest {
   private static final TableInfo OTHER_TABLE_INFO = TableInfo.of(OTHER_TABLE_ID, TABLE_SCHEMA);
   private static final TableInfo TABLE_INFO_WITH_PROJECT =
       TableInfo.of(TABLE_ID_WITH_PROJECT, TABLE_SCHEMA);
-  private static final LoadJobInfo LOAD_JOB = LoadJobInfo.of(TABLE_ID, "URI");
+  private static final LoadJobInfo LOAD_JOB = LoadJobInfo.of(LoadConfiguration.of(TABLE_ID), "URI");
   private static final LoadJobInfo LOAD_JOB_WITH_PROJECT =
-      LoadJobInfo.of(TABLE_ID_WITH_PROJECT, "URI");
+      LoadJobInfo.of(LoadConfiguration.of(TABLE_ID_WITH_PROJECT), "URI");
   private static final LoadJobInfo COMPLETE_LOAD_JOB =
-      LoadJobInfo.builder(TABLE_ID_WITH_PROJECT, "URI")
+      LoadJobInfo.builder(LoadConfiguration.of(TABLE_ID_WITH_PROJECT), "URI")
          .jobId(JobId.of(PROJECT, JOB))
.build(); private static final CopyJobInfo COPY_JOB = @@ -157,6 +162,57 @@ public class BigQueryImplTest { .useQueryCache(false) .defaultDataset(DatasetId.of(PROJECT, DATASET)) .build(); + private static final byte[] CONTENT = {0xD, 0xE, 0xA, 0xD}; + private static final SeekableByteChannel SEEKABLE_BYTE_CHANNEL = new SeekableByteChannel() { + + private int position = 0; + private byte[] content = CONTENT; + + @Override + public int read(ByteBuffer dst) throws IOException { + if (position >= content.length) { + return 0; + } + int toRead = dst.remaining() > (CONTENT.length - position) ? (CONTENT.length - position) + : dst.remaining(); + dst.put(content, position, toRead); + return toRead; + } + + @Override + public int write(ByteBuffer src) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public long position() throws IOException { + return position; + } + + @Override + public SeekableByteChannel position(long newPosition) throws IOException { + this.position = (int) newPosition; + return this; + } + + @Override + public long size() throws IOException { + return content.length; + } + + @Override + public SeekableByteChannel truncate(long size) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public boolean isOpen() { + return true; + } + + @Override + public void close() throws IOException { } + }; // Empty BigQueryRpc options private static final Map EMPTY_RPC_OPTIONS = ImmutableMap.of(); @@ -639,6 +695,86 @@ public TableDataInsertAllRequest.Rows apply(RowToInsert rowToInsert) { assertEquals("ErrorMessage", response.errorsFor(0L).get(0).message()); } + @Test + public void testInsertAllBytesResumable() throws IOException { + String uploadUri = "uploadURI"; + Capture capturedInputStream = Capture.newInstance(); + LoadConfiguration loadConfiguration = LoadConfiguration.of(TABLE_ID); + EasyMock.expect(bigqueryRpcMock.open(LoadConfiguration.of(TABLE_ID_WITH_PROJECT).toPb())) + .andReturn(uploadUri); + bigqueryRpcMock.write(eq(uploadUri), capture(capturedInputStream), eq(0L)); + EasyMock.expectLastCall().once(); + EasyMock.replay(bigqueryRpcMock); + byte[] readContent = new byte[4]; + bigquery = options.service(); + bigquery.insertAll(loadConfiguration, CONTENT); + int read = capturedInputStream.getValue().read(readContent); + assertEquals(4, read); + assertArrayEquals(CONTENT, readContent); + } + + @Test + public void testInsertAllBytesResumableWithException() throws IOException { + String uploadUri = "uploadURI"; + Capture capturedInputStream = Capture.newInstance(); + LoadConfiguration loadConfiguration = LoadConfiguration.of(TABLE_ID); + EasyMock.expect(bigqueryRpcMock.open(LoadConfiguration.of(TABLE_ID_WITH_PROJECT).toPb())) + .andReturn(uploadUri); + bigqueryRpcMock.write(eq(uploadUri), capture(capturedInputStream), eq(0L)); + EasyMock.expectLastCall().andThrow(new BigQueryException(0, "Error", true)); + EasyMock.expect(bigqueryRpcMock.status(uploadUri)).andReturn(Tuple.of(false, 1L)); + bigqueryRpcMock.write(eq(uploadUri), capture(capturedInputStream), eq(2L)); + EasyMock.expectLastCall(); + EasyMock.replay(bigqueryRpcMock); + byte[] readContent = new byte[2]; + bigquery = options.toBuilder().retryParams(RetryParams.defaultInstance()).build().service(); + bigquery.insertAll(loadConfiguration, CONTENT); + int read = capturedInputStream.getValue().read(readContent); + assertEquals(2, read); + byte[] expectedContent = {0xA, 0xD}; + assertArrayEquals(expectedContent, readContent); + } + + @Test + public void 
testInsertAllChannelResumable() throws IOException { + String uploadUri = "uploadURI"; + Capture capturedInputStream = Capture.newInstance(); + LoadConfiguration loadConfiguration = LoadConfiguration.of(TABLE_ID); + EasyMock.expect(bigqueryRpcMock.open(LoadConfiguration.of(TABLE_ID_WITH_PROJECT).toPb())) + .andReturn(uploadUri); + bigqueryRpcMock.write(eq(uploadUri), capture(capturedInputStream), eq(0L)); + EasyMock.expectLastCall().once(); + EasyMock.replay(bigqueryRpcMock); + byte[] readContent = new byte[4]; + bigquery = options.service(); + bigquery.insertAll(loadConfiguration, SEEKABLE_BYTE_CHANNEL); + int read = capturedInputStream.getValue().read(readContent); + assertEquals(4, read); + assertArrayEquals(CONTENT, readContent); + } + + @Test + public void testInsertAllChannelResumableWithException() throws IOException { + String uploadUri = "uploadURI"; + Capture capturedInputStream = Capture.newInstance(); + LoadConfiguration loadConfiguration = LoadConfiguration.of(TABLE_ID); + EasyMock.expect(bigqueryRpcMock.open(LoadConfiguration.of(TABLE_ID_WITH_PROJECT).toPb())) + .andReturn(uploadUri); + bigqueryRpcMock.write(eq(uploadUri), capture(capturedInputStream), eq(0L)); + EasyMock.expectLastCall().andThrow(new BigQueryException(0, "Error", true)); + EasyMock.expect(bigqueryRpcMock.status(uploadUri)).andReturn(Tuple.of(false, 1L)); + bigqueryRpcMock.write(eq(uploadUri), capture(capturedInputStream), eq(2L)); + EasyMock.expectLastCall(); + EasyMock.replay(bigqueryRpcMock); + byte[] readContent = new byte[2]; + bigquery = options.toBuilder().retryParams(RetryParams.defaultInstance()).build().service(); + bigquery.insertAll(loadConfiguration, SEEKABLE_BYTE_CHANNEL); + int read = capturedInputStream.getValue().read(readContent); + assertEquals(2, read); + byte[] expectedContent = {0xA, 0xD}; + assertArrayEquals(expectedContent, readContent); + } + @Test public void testListTableData() { String cursor = "cursor"; diff --git a/gcloud-java-bigquery/src/test/java/com/google/gcloud/bigquery/ITBigQueryTest.java b/gcloud-java-bigquery/src/test/java/com/google/gcloud/bigquery/ITBigQueryTest.java index 981bc142a9d5..f63c69aba43c 100644 --- a/gcloud-java-bigquery/src/test/java/com/google/gcloud/bigquery/ITBigQueryTest.java +++ b/gcloud-java-bigquery/src/test/java/com/google/gcloud/bigquery/ITBigQueryTest.java @@ -147,10 +147,11 @@ public static void beforeClass() throws IOException, InterruptedException { JSON_CONTENT.getBytes(StandardCharsets.UTF_8)); DatasetInfo info = DatasetInfo.builder(DATASET).description(DESCRIPTION).build(); bigquery.create(info); - LoadJobInfo job = LoadJobInfo.builder(TABLE_ID, "gs://" + BUCKET + "/" + JSON_LOAD_FILE) + LoadConfiguration configuration = LoadConfiguration.builder(TABLE_ID, FormatOptions.json()) .createDisposition(JobInfo.CreateDisposition.CREATE_IF_NEEDED) .schema(TABLE_SCHEMA) - .formatOptions(FormatOptions.json()) + .build(); + LoadJobInfo job = LoadJobInfo.builder(configuration, "gs://" + BUCKET + "/" + JSON_LOAD_FILE) .build(); job = bigquery.create(job); while (job.status().state() != JobStatus.State.DONE) { @@ -811,10 +812,11 @@ public void testQueryJob() throws InterruptedException { public void testExtractJob() throws InterruptedException { String tableName = "test_export_job_table"; TableId destinationTable = TableId.of(DATASET, tableName); + LoadConfiguration loadConfiguration = LoadConfiguration.builder(destinationTable) + .schema(SIMPLE_SCHEMA) + .build(); LoadJobInfo remoteLoadJob = bigquery.create( - LoadJobInfo.builder(destinationTable, 
"gs://" + BUCKET + "/" + LOAD_FILE) - .schema(SIMPLE_SCHEMA) - .build()); + LoadJobInfo.builder(loadConfiguration, "gs://" + BUCKET + "/" + LOAD_FILE).build()); while (remoteLoadJob.status().state() != JobStatus.State.DONE) { Thread.sleep(1000); remoteLoadJob = bigquery.getJob(remoteLoadJob.jobId()); @@ -858,4 +860,48 @@ public void testCancelJob() throws InterruptedException { public void testCancelNonExistingJob() throws InterruptedException { assertFalse(bigquery.cancel("test_cancel_non_existing_job")); } + + @Test + public void testInsertAllResumable() throws InterruptedException, IOException { + String destinationTableName = "test_insert_all_resumable_table"; + TableId tableId = TableId.of(DATASET, destinationTableName); + LoadConfiguration configuration = + LoadConfiguration.builder(tableId) + .formatOptions(FormatOptions.json()) + .createDisposition(JobInfo.CreateDisposition.CREATE_IF_NEEDED) + .schema(TABLE_SCHEMA) + .build(); + bigquery.insertAll(configuration, JSON_CONTENT.getBytes(StandardCharsets.UTF_8)); + // wait until the new table is created. If the table is never created the test will time-out + while (bigquery.getTable(tableId) == null) { + Thread.sleep(1000L); + } + Page> rows = bigquery.listTableData(tableId); + int rowCount = 0; + for (List row : rows.values()) { + FieldValue timestampCell = row.get(0); + FieldValue stringCell = row.get(1); + FieldValue integerCell = row.get(2); + FieldValue booleanCell = row.get(3); + FieldValue recordCell = row.get(4); + assertEquals(FieldValue.Attribute.PRIMITIVE, timestampCell.attribute()); + assertEquals(FieldValue.Attribute.PRIMITIVE, stringCell.attribute()); + assertEquals(FieldValue.Attribute.REPEATED, integerCell.attribute()); + assertEquals(FieldValue.Attribute.PRIMITIVE, booleanCell.attribute()); + assertEquals(FieldValue.Attribute.RECORD, recordCell.attribute()); + assertEquals(1408452095220000L, timestampCell.timestampValue()); + assertEquals("stringValue", stringCell.stringValue()); + assertEquals(0, integerCell.repeatedValue().get(0).longValue()); + assertEquals(1, integerCell.repeatedValue().get(1).longValue()); + assertEquals(false, booleanCell.booleanValue()); + assertEquals(-14182916000000L, recordCell.recordValue().get(0).timestampValue()); + assertTrue(recordCell.recordValue().get(1).isNull()); + assertEquals(1, recordCell.recordValue().get(2).repeatedValue().get(0).longValue()); + assertEquals(0, recordCell.recordValue().get(2).repeatedValue().get(1).longValue()); + assertEquals(true, recordCell.recordValue().get(3).booleanValue()); + rowCount++; + } + assertEquals(2, rowCount); + assertTrue(bigquery.delete(DATASET, destinationTableName)); + } } diff --git a/gcloud-java-bigquery/src/test/java/com/google/gcloud/bigquery/LoadConfigurationTest.java b/gcloud-java-bigquery/src/test/java/com/google/gcloud/bigquery/LoadConfigurationTest.java new file mode 100644 index 000000000000..57616210501f --- /dev/null +++ b/gcloud-java-bigquery/src/test/java/com/google/gcloud/bigquery/LoadConfigurationTest.java @@ -0,0 +1,123 @@ +/* + * Copyright 2015 Google Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
diff --git a/gcloud-java-bigquery/src/test/java/com/google/gcloud/bigquery/LoadConfigurationTest.java b/gcloud-java-bigquery/src/test/java/com/google/gcloud/bigquery/LoadConfigurationTest.java
new file mode 100644
index 000000000000..57616210501f
--- /dev/null
+++ b/gcloud-java-bigquery/src/test/java/com/google/gcloud/bigquery/LoadConfigurationTest.java
@@ -0,0 +1,123 @@
+/*
+ * Copyright 2015 Google Inc. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.gcloud.bigquery;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+import com.google.common.collect.ImmutableList;
+import com.google.gcloud.bigquery.JobInfo.CreateDisposition;
+import com.google.gcloud.bigquery.JobInfo.WriteDisposition;
+
+import org.junit.Test;
+
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+
+public class LoadConfigurationTest {
+
+  private static final CsvOptions CSV_OPTIONS = CsvOptions.builder()
+      .allowJaggedRows(true)
+      .allowQuotedNewLines(false)
+      .encoding(StandardCharsets.UTF_8)
+      .build();
+  private static final TableId TABLE_ID = TableId.of("dataset", "table");
+  private static final CreateDisposition CREATE_DISPOSITION = CreateDisposition.CREATE_IF_NEEDED;
+  private static final WriteDisposition WRITE_DISPOSITION = WriteDisposition.WRITE_APPEND;
+  private static final Integer MAX_BAD_RECORDS = 42;
+  private static final String FORMAT = "CSV";
+  private static final Boolean IGNORE_UNKNOWN_VALUES = true;
+  private static final List<String> PROJECTION_FIELDS = ImmutableList.of("field1", "field2");
+  private static final Field FIELD_SCHEMA = Field.builder("IntegerField", Field.Type.integer())
+      .mode(Field.Mode.REQUIRED)
+      .description("FieldDescription")
+      .build();
+  private static final Schema TABLE_SCHEMA = Schema.of(FIELD_SCHEMA);
+  private static final LoadConfiguration LOAD_CONFIGURATION = LoadConfiguration.builder(TABLE_ID)
+      .createDisposition(CREATE_DISPOSITION)
+      .writeDisposition(WRITE_DISPOSITION)
+      .formatOptions(CSV_OPTIONS)
+      .ignoreUnknownValues(IGNORE_UNKNOWN_VALUES)
+      .maxBadRecords(MAX_BAD_RECORDS)
+      .projectionFields(PROJECTION_FIELDS)
+      .schema(TABLE_SCHEMA)
+      .build();
+
+  @Test
+  public void testToBuilder() {
+    compareLoadConfiguration(LOAD_CONFIGURATION, LOAD_CONFIGURATION.toBuilder().build());
+    LoadConfiguration configuration = LOAD_CONFIGURATION.toBuilder()
+        .destinationTable(TableId.of("dataset", "newTable"))
+        .build();
+    assertEquals("newTable", configuration.destinationTable().table());
+    configuration = configuration.toBuilder().destinationTable(TABLE_ID).build();
+    compareLoadConfiguration(LOAD_CONFIGURATION, configuration);
+  }
+
+  @Test
+  public void testOf() {
+    LoadConfiguration configuration = LoadConfiguration.of(TABLE_ID);
+    assertEquals(TABLE_ID, configuration.destinationTable());
+    configuration = LoadConfiguration.of(TABLE_ID, CSV_OPTIONS);
+    assertEquals(TABLE_ID, configuration.destinationTable());
+    assertEquals(FORMAT, configuration.format());
+    assertEquals(CSV_OPTIONS, configuration.csvOptions());
+  }
+
+  @Test
+  public void testToBuilderIncomplete() {
+    LoadConfiguration configuration = LoadConfiguration.of(TABLE_ID);
+    compareLoadConfiguration(configuration, configuration.toBuilder().build());
+  }
+
+  @Test
+  public void testBuilder() {
+    assertEquals(TABLE_ID, LOAD_CONFIGURATION.destinationTable());
+    assertEquals(CREATE_DISPOSITION, LOAD_CONFIGURATION.createDisposition());
+    assertEquals(WRITE_DISPOSITION, LOAD_CONFIGURATION.writeDisposition());
+    assertEquals(CSV_OPTIONS, LOAD_CONFIGURATION.csvOptions());
+    assertEquals(FORMAT, LOAD_CONFIGURATION.format());
+    assertEquals(IGNORE_UNKNOWN_VALUES, LOAD_CONFIGURATION.ignoreUnknownValues());
+    assertEquals(MAX_BAD_RECORDS, LOAD_CONFIGURATION.maxBadRecords());
+    assertEquals(PROJECTION_FIELDS, LOAD_CONFIGURATION.projectionFields());
+    assertEquals(TABLE_SCHEMA, LOAD_CONFIGURATION.schema());
+  }
+
+  @Test
+  public void testToPbAndFromPb() {
+    assertNull(LOAD_CONFIGURATION.toPb().getSourceUris());
+    compareLoadConfiguration(LOAD_CONFIGURATION,
+        LoadConfiguration.fromPb(LOAD_CONFIGURATION.toPb()));
+    LoadConfiguration configuration = LoadConfiguration.of(TABLE_ID);
+    compareLoadConfiguration(configuration, LoadConfiguration.fromPb(configuration.toPb()));
+  }
+
+  private void compareLoadConfiguration(LoadConfiguration expected, LoadConfiguration value) {
+    assertEquals(expected, value);
+    assertEquals(expected.hashCode(), value.hashCode());
+    assertEquals(expected.toString(), value.toString());
+    assertEquals(expected.destinationTable(), value.destinationTable());
+    assertEquals(expected.createDisposition(), value.createDisposition());
+    assertEquals(expected.writeDisposition(), value.writeDisposition());
+    assertEquals(expected.csvOptions(), value.csvOptions());
+    assertEquals(expected.format(), value.format());
+    assertEquals(expected.ignoreUnknownValues(), value.ignoreUnknownValues());
+    assertEquals(expected.maxBadRecords(), value.maxBadRecords());
+    assertEquals(expected.projectionFields(), value.projectionFields());
+    assertEquals(expected.schema(), value.schema());
+  }
+}
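Since LoadJobInfo (next file) now wraps the same LoadConfiguration, a single configuration value
can drive both ingestion paths touched by this patch. A sketch, with hypothetical table and
bucket names and a byte[] localJsonBytes assumed to hold newline-delimited JSON:

    LoadConfiguration configuration =
        LoadConfiguration.of(TableId.of("my_dataset", "my_table"), FormatOptions.json());
    // 1. Server-side load job reading from Google Cloud Storage:
    bigquery.create(LoadJobInfo.of(configuration, "gs://my_bucket/data.json"));
    // 2. Resumable upload of locally held bytes:
    bigquery.insertAll(configuration, localJsonBytes);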
diff --git a/gcloud-java-bigquery/src/test/java/com/google/gcloud/bigquery/LoadJobInfoTest.java b/gcloud-java-bigquery/src/test/java/com/google/gcloud/bigquery/LoadJobInfoTest.java
index 06ce0b42ad4b..499d0d939698 100644
--- a/gcloud-java-bigquery/src/test/java/com/google/gcloud/bigquery/LoadJobInfoTest.java
+++ b/gcloud-java-bigquery/src/test/java/com/google/gcloud/bigquery/LoadJobInfoTest.java
@@ -47,7 +47,6 @@ public class LoadJobInfoTest {
   private static final CreateDisposition CREATE_DISPOSITION = CreateDisposition.CREATE_IF_NEEDED;
   private static final WriteDisposition WRITE_DISPOSITION = WriteDisposition.WRITE_APPEND;
   private static final Integer MAX_BAD_RECORDS = 42;
-  private static final String FORMAT = "CSV";
   private static final Boolean IGNORE_UNKNOWN_VALUES = true;
   private static final List<String> PROJECTION_FIELDS = ImmutableList.of("field1", "field2");
   private static final JobId JOB_ID = JobId.of("job");
@@ -66,13 +65,7 @@ public class LoadJobInfoTest {
       .inputBytes(2048L)
       .outputRows(24L)
       .build();
-  private static final LoadJobInfo LOAD_JOB = LoadJobInfo.builder(TABLE_ID, SOURCE_URIS)
-      .etag(ETAG)
-      .id(ID)
-      .selfLink(SELF_LINK)
-      .userEmail(EMAIL)
-      .jobId(JOB_ID)
-      .status(JOB_STATUS)
+  private static final LoadConfiguration LOAD_CONFIGURATION = LoadConfiguration.builder(TABLE_ID)
       .createDisposition(CREATE_DISPOSITION)
       .writeDisposition(WRITE_DISPOSITION)
       .formatOptions(CSV_OPTIONS)
@@ -80,63 +73,47 @@ public class LoadJobInfoTest {
       .maxBadRecords(MAX_BAD_RECORDS)
       .projectionFields(PROJECTION_FIELDS)
       .schema(TABLE_SCHEMA)
+      .build();
+  private static final LoadJobInfo LOAD_JOB = LoadJobInfo.builder(LOAD_CONFIGURATION, SOURCE_URIS)
+      .etag(ETAG)
+      .id(ID)
+      .selfLink(SELF_LINK)
+      .userEmail(EMAIL)
+      .jobId(JOB_ID)
+      .status(JOB_STATUS)
       .statistics(JOB_STATISTICS)
       .build();
 
   @Test
   public void testToBuilder() {
     compareLoadJobInfo(LOAD_JOB, LOAD_JOB.toBuilder().build());
-    LoadJobInfo job = LOAD_JOB.toBuilder()
-        .destinationTable(TableId.of("dataset", "newTable"))
-        .build();
-    assertEquals("newTable", job.destinationTable().table());
-    job = job.toBuilder().destinationTable(TABLE_ID).build();
+    LoadJobInfo job = LOAD_JOB.toBuilder().etag("newEtag").build();
+    assertEquals("newEtag", job.etag());
+    job = job.toBuilder().etag(ETAG).build();
     compareLoadJobInfo(LOAD_JOB, job);
   }
 
   @Test
   public void testOf() {
-    LoadJobInfo job = LoadJobInfo.of(TABLE_ID, SOURCE_URIS);
-    assertEquals(TABLE_ID, job.destinationTable());
-    assertEquals(SOURCE_URIS, job.sourceUris());
-    job = LoadJobInfo.of(TABLE_ID, SOURCE_URI);
-    assertEquals(TABLE_ID, job.destinationTable());
-    assertEquals(ImmutableList.of(SOURCE_URI), job.sourceUris());
-    job = LoadJobInfo.of(TABLE_ID, CSV_OPTIONS, SOURCE_URIS);
-    assertEquals(TABLE_ID, job.destinationTable());
+    LoadJobInfo job = LoadJobInfo.of(LOAD_CONFIGURATION, SOURCE_URIS);
+    assertEquals(LOAD_CONFIGURATION, job.configuration());
     assertEquals(SOURCE_URIS, job.sourceUris());
-    assertEquals(FORMAT, job.format());
-    assertEquals(CSV_OPTIONS, job.csvOptions());
-    job = LoadJobInfo.of(TABLE_ID, CSV_OPTIONS, SOURCE_URI);
-    assertEquals(TABLE_ID, job.destinationTable());
-    assertEquals(ImmutableList.of(SOURCE_URI), job.sourceUris());
-    assertEquals(FORMAT, job.format());
-    assertEquals(CSV_OPTIONS, job.csvOptions());
-    job = LoadJobInfo.of(JOB_ID, TABLE_ID, SOURCE_URIS);
-    assertEquals(JOB_ID, job.jobId());
-    assertEquals(TABLE_ID, job.destinationTable());
-    assertEquals(SOURCE_URIS, job.sourceUris());
-    job = LoadJobInfo.of(JOB_ID, TABLE_ID, SOURCE_URI);
-    assertEquals(JOB_ID, job.jobId());
-    assertEquals(TABLE_ID, job.destinationTable());
+    job = LoadJobInfo.of(LOAD_CONFIGURATION, SOURCE_URI);
+    assertEquals(LOAD_CONFIGURATION, job.configuration());
     assertEquals(ImmutableList.of(SOURCE_URI), job.sourceUris());
-    job = LoadJobInfo.of(JOB_ID, TABLE_ID, CSV_OPTIONS, SOURCE_URIS);
+    job = LoadJobInfo.of(JOB_ID, LOAD_CONFIGURATION, SOURCE_URIS);
     assertEquals(JOB_ID, job.jobId());
-    assertEquals(TABLE_ID, job.destinationTable());
+    assertEquals(LOAD_CONFIGURATION, job.configuration());
     assertEquals(SOURCE_URIS, job.sourceUris());
-    assertEquals(FORMAT, job.format());
-    assertEquals(CSV_OPTIONS, job.csvOptions());
-    job = LoadJobInfo.of(JOB_ID, TABLE_ID, CSV_OPTIONS, SOURCE_URI);
+    job = LoadJobInfo.of(JOB_ID, LOAD_CONFIGURATION, SOURCE_URI);
     assertEquals(JOB_ID, job.jobId());
-    assertEquals(TABLE_ID, job.destinationTable());
+    assertEquals(LOAD_CONFIGURATION, job.configuration());
     assertEquals(ImmutableList.of(SOURCE_URI), job.sourceUris());
-    assertEquals(FORMAT, job.format());
-    assertEquals(CSV_OPTIONS, job.csvOptions());
   }
 
   @Test
   public void testToBuilderIncomplete() {
-    LoadJobInfo job = LoadJobInfo.of(TABLE_ID, SOURCE_URIS);
+    LoadJobInfo job = LoadJobInfo.of(LOAD_CONFIGURATION, SOURCE_URIS);
     compareLoadJobInfo(job, job.toBuilder().build());
   }
 
@@ -148,16 +125,8 @@ public void testBuilder() {
     assertEquals(EMAIL, LOAD_JOB.userEmail());
     assertEquals(JOB_ID, LOAD_JOB.jobId());
     assertEquals(JOB_STATUS, LOAD_JOB.status());
-    assertEquals(TABLE_ID, LOAD_JOB.destinationTable());
+    assertEquals(LOAD_CONFIGURATION, LOAD_JOB.configuration());
     assertEquals(SOURCE_URIS, LOAD_JOB.sourceUris());
-    assertEquals(CREATE_DISPOSITION, LOAD_JOB.createDisposition());
-    assertEquals(WRITE_DISPOSITION, LOAD_JOB.writeDisposition());
-    assertEquals(CSV_OPTIONS, LOAD_JOB.csvOptions());
-    assertEquals(FORMAT, LOAD_JOB.format());
-    assertEquals(IGNORE_UNKNOWN_VALUES, LOAD_JOB.ignoreUnknownValues());
-    assertEquals(MAX_BAD_RECORDS, LOAD_JOB.maxBadRecords());
-    assertEquals(PROJECTION_FIELDS, LOAD_JOB.projectionFields());
-    assertEquals(TABLE_SCHEMA, LOAD_JOB.schema());
     assertEquals(JOB_STATISTICS, LOAD_JOB.statistics());
   }
 
@@ -170,7 +139,7 @@ public void testToPbAndFromPb() {
     assertEquals(JOB_STATISTICS, JobStatistics.fromPb(LOAD_JOB.toPb().getStatistics()));
     compareLoadJobInfo(LOAD_JOB, LoadJobInfo.fromPb(LOAD_JOB.toPb()));
     compareLoadJobInfo(LOAD_JOB, (LoadJobInfo) JobInfo.fromPb(LOAD_JOB.toPb()));
-    LoadJobInfo job = LoadJobInfo.of(TABLE_ID, SOURCE_URIS);
+    LoadJobInfo job = LoadJobInfo.of(LOAD_CONFIGURATION, SOURCE_URIS);
     compareLoadJobInfo(job, LoadJobInfo.fromPb(job.toPb()));
     compareLoadJobInfo(job, (LoadJobInfo) JobInfo.fromPb(job.toPb()));
   }
@@ -186,15 +155,7 @@ private void compareLoadJobInfo(LoadJobInfo expected, LoadJobInfo value) {
     assertEquals(expected.status(), value.status());
     assertEquals(expected.statistics(), value.statistics());
     assertEquals(expected.userEmail(), value.userEmail());
-    assertEquals(expected.destinationTable(), value.destinationTable());
+    assertEquals(expected.configuration(), value.configuration());
     assertEquals(expected.sourceUris(), value.sourceUris());
-    assertEquals(expected.createDisposition(), value.createDisposition());
-    assertEquals(expected.writeDisposition(), value.writeDisposition());
-    assertEquals(expected.csvOptions(), value.csvOptions());
-    assertEquals(expected.format(), value.format());
-    assertEquals(expected.ignoreUnknownValues(), value.ignoreUnknownValues());
-    assertEquals(expected.maxBadRecords(), value.maxBadRecords());
-    assertEquals(expected.projectionFields(), value.projectionFields());
-    assertEquals(expected.schema(), value.schema());
   }
 }
diff --git a/gcloud-java-bigquery/src/test/java/com/google/gcloud/bigquery/SerializationTest.java b/gcloud-java-bigquery/src/test/java/com/google/gcloud/bigquery/SerializationTest.java
index 8c80bddbfefb..31670d5b664d 100644
--- a/gcloud-java-bigquery/src/test/java/com/google/gcloud/bigquery/SerializationTest.java
+++ b/gcloud-java-bigquery/src/test/java/com/google/gcloud/bigquery/SerializationTest.java
@@ -168,7 +168,15 @@ public class SerializationTest {
   private static final JobId JOB_ID = JobId.of("project", "job");
   private static final CopyJobInfo COPY_JOB = CopyJobInfo.of(TABLE_ID, TABLE_ID);
   private static final ExtractJobInfo EXTRACT_JOB = ExtractJobInfo.of(TABLE_ID, SOURCE_URIS);
-  private static final LoadJobInfo LOAD_JOB = LoadJobInfo.of(TABLE_ID, SOURCE_URIS);
+  private static final LoadConfiguration LOAD_CONFIGURATION = LoadConfiguration.builder(TABLE_ID)
+      .createDisposition(JobInfo.CreateDisposition.CREATE_IF_NEEDED)
+      .writeDisposition(JobInfo.WriteDisposition.WRITE_APPEND)
+      .formatOptions(CSV_OPTIONS)
+      .ignoreUnknownValues(true)
+      .maxBadRecords(10)
+      .schema(TABLE_SCHEMA)
+      .build();
+  private static final LoadJobInfo LOAD_JOB = LoadJobInfo.of(LOAD_CONFIGURATION, SOURCE_URIS);
   private static final QueryJobInfo QUERY_JOB = QueryJobInfo.of("query");
 
   private static final Map<String, Object> CONTENT1 = ImmutableMap.of("key", "val1");
@@ -231,8 +239,8 @@ public void testModelAndRequests() throws Exception {
         DATASET_INFO, TABLE_ID, CSV_OPTIONS, STREAMING_BUFFER, EXTERNAL_DATA_CONFIGURATION,
         TABLE_SCHEMA, TABLE_INFO, VIEW_INFO, EXTERNAL_TABLE_INFO, INLINE_FUNCTION, URI_FUNCTION,
         JOB_STATISTICS, EXTRACT_STATISTICS, LOAD_STATISTICS, QUERY_STATISTICS, BIGQUERY_ERROR,
-        JOB_STATUS, JOB_ID, COPY_JOB, EXTRACT_JOB, LOAD_JOB, QUERY_JOB, INSERT_ALL_REQUEST,
-        INSERT_ALL_RESPONSE, FIELD_VALUE, QUERY_REQUEST, QUERY_RESPONSE,
+        JOB_STATUS, JOB_ID, COPY_JOB, EXTRACT_JOB, LOAD_CONFIGURATION, LOAD_JOB, QUERY_JOB,
+        INSERT_ALL_REQUEST, INSERT_ALL_RESPONSE, FIELD_VALUE, QUERY_REQUEST, QUERY_RESPONSE,
         BigQuery.DatasetOption.fields(), BigQuery.DatasetDeleteOption.deleteContents(),
         BigQuery.DatasetListOption.all(), BigQuery.TableOption.fields(),
         BigQuery.TableListOption.maxResults(42L), BigQuery.JobOption.fields(),
diff --git a/gcloud-java-bigquery/src/test/java/com/google/gcloud/bigquery/TableTest.java b/gcloud-java-bigquery/src/test/java/com/google/gcloud/bigquery/TableTest.java
index e5b152a91a5f..d60f182361f9 100644
--- a/gcloud-java-bigquery/src/test/java/com/google/gcloud/bigquery/TableTest.java
+++ b/gcloud-java-bigquery/src/test/java/com/google/gcloud/bigquery/TableTest.java
@@ -48,9 +48,9 @@ public class TableTest {
   private static final TableId TABLE_ID1 = TableId.of("dataset", "table1");
   private static final TableId TABLE_ID2 = TableId.of("dataset", "table2");
   private static final JobInfo COPY_JOB_INFO = CopyJobInfo.of(TABLE_ID2, TABLE_ID1);
-  private static final JobInfo LOAD_JOB_INFO =
-      LoadJobInfo.builder(TABLE_ID1, ImmutableList.of("URI"))
-          .formatOptions(FormatOptions.json())
+  private static final JobInfo LOAD_JOB_INFO = LoadJobInfo.builder(
+      LoadConfiguration.builder(TABLE_ID1).formatOptions(FormatOptions.json()).build(),
+      ImmutableList.of("URI"))
       .build();
   private static final JobInfo EXTRACT_JOB_INFO =
       ExtractJobInfo.builder(TABLE_ID1, ImmutableList.of("URI"))
diff --git a/gcloud-java-examples/src/main/java/com/google/gcloud/examples/BigQueryExample.java b/gcloud-java-examples/src/main/java/com/google/gcloud/examples/BigQueryExample.java
index 1754be4df7dc..d664485fc9aa 100644
--- a/gcloud-java-examples/src/main/java/com/google/gcloud/examples/BigQueryExample.java
+++ b/gcloud-java-examples/src/main/java/com/google/gcloud/examples/BigQueryExample.java
@@ -33,6 +33,7 @@
 import com.google.gcloud.bigquery.JobId;
 import com.google.gcloud.bigquery.JobInfo;
 import com.google.gcloud.bigquery.JobStatus;
+import com.google.gcloud.bigquery.LoadConfiguration;
 import com.google.gcloud.bigquery.LoadJobInfo;
 import com.google.gcloud.bigquery.QueryRequest;
 import com.google.gcloud.bigquery.QueryResponse;
@@ -42,6 +43,8 @@
 import com.google.gcloud.bigquery.ViewInfo;
 import com.google.gcloud.spi.BigQueryRpc.Tuple;
 
+import java.nio.channels.FileChannel;
+import java.nio.file.Paths;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -544,8 +547,8 @@ LoadJobInfo parse(String... args) throws Exception {
         String table = args[1];
         String format = args[2];
         TableId tableId = TableId.of(dataset, table);
-        return LoadJobInfo.builder(tableId, Arrays.asList(args).subList(3, args.length))
-            .formatOptions(FormatOptions.of(format))
+        LoadConfiguration configuration = LoadConfiguration.of(tableId, FormatOptions.of(format));
+        return LoadJobInfo.builder(configuration, Arrays.asList(args).subList(3, args.length))
             .build();
       }
       throw new IllegalArgumentException("Missing required arguments.");
@@ -659,6 +662,40 @@ protected String params() {
     }
   }
 
+  /**
+   * This class demonstrates how to load data into a BigQuery table from a local file.
+   *
+   * @see <a href="https://cloud.google.com/bigquery/loading-data-post-request#resumable">Resumable
+   *     Upload</a>
+   */
+  private static class LoadFileAction extends BigQueryAction<Tuple<LoadConfiguration, String>> {
+    @Override
+    void run(BigQuery bigquery, Tuple<LoadConfiguration, String> configuration) throws Exception {
+      System.out.println("Running insert");
+      try (FileChannel channel = FileChannel.open(Paths.get(configuration.y()))) {
+        bigquery.insertAll(configuration.x(), channel);
+      }
+    }
+
+    @Override
+    Tuple<LoadConfiguration, String> parse(String... args) throws Exception {
+      if (args.length == 4) {
+        String dataset = args[0];
+        String table = args[1];
+        String format = args[2];
+        TableId tableId = TableId.of(dataset, table);
+        LoadConfiguration configuration = LoadConfiguration.of(tableId, FormatOptions.of(format));
+        return Tuple.of(configuration, args[3]);
+      }
+      throw new IllegalArgumentException("Missing required arguments.");
+    }
+
+    @Override
+    protected String params() {
+      return "<dataset> <table> <format> <file>";
+    }
+  }
+
   static {
     CREATE_ACTIONS.put("dataset", new CreateDatasetAction());
     CREATE_ACTIONS.put("table", new CreateSimpleTableAction());
@@ -682,6 +719,7 @@ protected String params() {
     ACTIONS.put("extract", new ExtractAction());
     ACTIONS.put("copy", new CopyAction());
     ACTIONS.put("query", new QueryAction());
+    ACTIONS.put("load-file", new LoadFileAction());
   }
 
   private static void printUsage() {
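With the action registered, the example gains a load-file command whose arguments follow
params(). Illustratively, for "load-file my_dataset my_table JSON /path/to/rows.json" the
dispatcher ends up doing the equivalent of the following (argument values hypothetical, and
bigquery an already-built service handle):

    LoadFileAction action = new LoadFileAction();
    Tuple<LoadConfiguration, String> parsed =
        action.parse("my_dataset", "my_table", "JSON", "/path/to/rows.json");
    action.run(bigquery, parsed);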