From 23f31a6e751d58714d218929fc9d77fcc24fe840 Mon Sep 17 00:00:00 2001 From: Neville Li Date: Mon, 18 Apr 2016 22:17:45 -0400 Subject: [PATCH 1/3] allow flattenResults override in BigQueryClient --- .../spotify/scio/bigquery/BigQueryClient.scala | 15 ++++++++------- .../main/scala/com/spotify/scio/ScioContext.scala | 5 +++-- .../src/main/scala/com/spotify/scio/io/Taps.scala | 11 +++++++---- 3 files changed, 18 insertions(+), 13 deletions(-) diff --git a/scio-bigquery/src/main/scala/com/spotify/scio/bigquery/BigQueryClient.scala b/scio-bigquery/src/main/scala/com/spotify/scio/bigquery/BigQueryClient.scala index 57107c0522..89b914e955 100644 --- a/scio-bigquery/src/main/scala/com/spotify/scio/bigquery/BigQueryClient.scala +++ b/scio-bigquery/src/main/scala/com/spotify/scio/bigquery/BigQueryClient.scala @@ -151,8 +151,8 @@ class BigQueryClient private (private val projectId: String, } /** Get rows from a query. */ - def getQueryRows(sqlQuery: String): Iterator[TableRow] = { - val queryJob = queryIntoTable(sqlQuery) + def getQueryRows(sqlQuery: String, flattenResults: Boolean = false): Iterator[TableRow] = { + val queryJob = queryIntoTable(sqlQuery, flattenResults) queryJob.waitForResult() getTableRows(queryJob.table) } @@ -198,7 +198,7 @@ class BigQueryClient private (private val projectId: String, } /** Execute a query and save results into a temporary table. */ - def queryIntoTable(sqlQuery: String): QueryJob = { + def queryIntoTable(sqlQuery: String, flattenResults: Boolean): QueryJob = { try { val sourceTimes = BigQueryUtil.extractTables(sqlQuery).map(t => BigInt(getTable(t).getLastModifiedTime)) @@ -218,7 +218,7 @@ class BigQueryClient private (private val projectId: String, logger.info(s"Cache invalid for query: $sqlQuery") logger.info(s"New destination table: ${BigQueryIO.toTableSpec(temp)}") setCacheDestinationTable(sqlQuery, temp) - makeQueryJob(sqlQuery, temp) + makeQueryJob(sqlQuery, temp, flattenResults) } } catch { case NonFatal(_) => @@ -226,7 +226,7 @@ class BigQueryClient private (private val projectId: String, logger.info(s"Cache miss for query: $sqlQuery") logger.info(s"New destination table: ${BigQueryIO.toTableSpec(temp)}") setCacheDestinationTable(sqlQuery, temp) - makeQueryJob(sqlQuery, temp) + makeQueryJob(sqlQuery, temp, flattenResults) } } @@ -314,7 +314,8 @@ class BigQueryClient private (private val projectId: String, } private def makeQueryJob(sqlQuery: String, - destinationTable: TableReference): QueryJob = new QueryJob { + destinationTable: TableReference, + flattenResults: Boolean): QueryJob = new QueryJob { override def waitForResult(): Unit = self.waitForJobs(this) override lazy val jobReference: Option[JobReference] = { prepareStagingDataset() @@ -322,7 +323,7 @@ class BigQueryClient private (private val projectId: String, val queryConfig: JobConfigurationQuery = new JobConfigurationQuery() .setQuery(sqlQuery) .setAllowLargeResults(true) - .setFlattenResults(false) + .setFlattenResults(flattenResults) .setPriority(PRIORITY) .setCreateDisposition("CREATE_IF_NEEDED") .setWriteDisposition("WRITE_EMPTY") diff --git a/scio-core/src/main/scala/com/spotify/scio/ScioContext.scala b/scio-core/src/main/scala/com/spotify/scio/ScioContext.scala index 2ee6489b88..cbc53cf224 100644 --- a/scio-core/src/main/scala/com/spotify/scio/ScioContext.scala +++ b/scio-core/src/main/scala/com/spotify/scio/ScioContext.scala @@ -354,11 +354,12 @@ class ScioContext private[scio] (val options: DataflowPipelineOptions, * Get an SCollection for a BigQuery SELECT query. * @group input */ - def bigQuerySelect(sqlQuery: String): SCollection[TableRow] = pipelineOp { + def bigQuerySelect(sqlQuery: String, + flattenResults: Boolean = false): SCollection[TableRow] = pipelineOp { if (this.isTest) { this.getTestInput(BigQueryIO(sqlQuery)) } else { - val queryJob = this.bigQueryClient.queryIntoTable(sqlQuery) + val queryJob = this.bigQueryClient.queryIntoTable(sqlQuery, flattenResults) _queryJobs.append(queryJob) wrap(this.applyInternal(GBigQueryIO.Read.from(queryJob.table).withoutValidation())) .setName(sqlQuery) diff --git a/scio-core/src/main/scala/com/spotify/scio/io/Taps.scala b/scio-core/src/main/scala/com/spotify/scio/io/Taps.scala index 673c272467..a6c23d9570 100644 --- a/scio-core/src/main/scala/com/spotify/scio/io/Taps.scala +++ b/scio-core/src/main/scala/com/spotify/scio/io/Taps.scala @@ -43,8 +43,11 @@ trait Taps { mkTap(s"Avro: $path", () => isPathDone(path), () => AvroTap[T](path, schema)) /** Get a `Future[Tap[T]]` for BigQuery SELECT query. */ - def bigQuerySelect(sqlQuery: String): Future[Tap[TableRow]] = - mkTap(s"BigQuery SELECT: $sqlQuery", () => isQueryDone(sqlQuery), () => bigQueryTap(sqlQuery)) + def bigQuerySelect(sqlQuery: String, flattenResults: Boolean = false): Future[Tap[TableRow]] = + mkTap( + s"BigQuery SELECT: $sqlQuery", + () => isQueryDone(sqlQuery), + () => bigQueryTap(sqlQuery, flattenResults)) /** Get a `Future[Tap[T]]` for BigQuery table. */ def bigQueryTable(table: TableReference): Future[Tap[TableRow]] = @@ -70,9 +73,9 @@ trait Taps { private def tableExists(table: TableReference): Boolean = Try(BigQueryClient.defaultInstance().getTableSchema(table)).isSuccess - private def bigQueryTap(sqlQuery: String): BigQueryTap = { + private def bigQueryTap(sqlQuery: String, flattenResults: Boolean): BigQueryTap = { val bq = BigQueryClient.defaultInstance() - val queryJob = bq.queryIntoTable(sqlQuery) + val queryJob = bq.queryIntoTable(sqlQuery, flattenResults) queryJob.waitForResult() BigQueryTap(queryJob.table) } From ccf8f83a2ba2882c58c4798768a8057cc0bbf318 Mon Sep 17 00:00:00 2001 From: Neville Li Date: Mon, 18 Apr 2016 22:50:47 -0400 Subject: [PATCH 2/3] refactor queryIntoTable --- .../scio/bigquery/BigQueryClient.scala | 27 ++++++++++++++++--- .../scala/com/spotify/scio/ScioContext.scala | 2 +- .../main/scala/com/spotify/scio/io/Taps.scala | 2 +- 3 files changed, 26 insertions(+), 5 deletions(-) diff --git a/scio-bigquery/src/main/scala/com/spotify/scio/bigquery/BigQueryClient.scala b/scio-bigquery/src/main/scala/com/spotify/scio/bigquery/BigQueryClient.scala index 89b914e955..10aef0bd89 100644 --- a/scio-bigquery/src/main/scala/com/spotify/scio/bigquery/BigQueryClient.scala +++ b/scio-bigquery/src/main/scala/com/spotify/scio/bigquery/BigQueryClient.scala @@ -152,7 +152,7 @@ class BigQueryClient private (private val projectId: String, /** Get rows from a query. */ def getQueryRows(sqlQuery: String, flattenResults: Boolean = false): Iterator[TableRow] = { - val queryJob = queryIntoTable(sqlQuery, flattenResults) + val queryJob = newQueryJob(sqlQuery, flattenResults) queryJob.waitForResult() getTableRows(queryJob.table) } @@ -197,8 +197,29 @@ class BigQueryClient private (private val projectId: String, getTable(table).getSchema } - /** Execute a query and save results into a temporary table. */ - def queryIntoTable(sqlQuery: String, flattenResults: Boolean): QueryJob = { + /** Make a query and save results to a destination table. */ + def queryIntoTable(sqlQuery: String, + destinationTable: TableReference, + flattenResults: Boolean): Job = { + val queryJob = makeQueryJob(sqlQuery, destinationTable, flattenResults) + queryJob.waitForResult() + val jobRef = queryJob.jobReference.get + bigquery.jobs().get(projectId, jobRef.getJobId).execute() + } + + /** Make a query and save results to a destination table. */ + def queryIntoTable(sqlQuery: String, + destinationTable: String, + flattenResults: Boolean = false): Job = + queryIntoTable(sqlQuery, BigQueryIO.parseTableSpec(destinationTable), flattenResults) + + /** + * Create a new query job and save results into a temporary table. + * + * No query will be executed if a cached table exists. Query execution is delayed until + * `waitForResult` is called on the result `QueryJob`. + */ + def newQueryJob(sqlQuery: String, flattenResults: Boolean): QueryJob = { try { val sourceTimes = BigQueryUtil.extractTables(sqlQuery).map(t => BigInt(getTable(t).getLastModifiedTime)) diff --git a/scio-core/src/main/scala/com/spotify/scio/ScioContext.scala b/scio-core/src/main/scala/com/spotify/scio/ScioContext.scala index cbc53cf224..ca018cedc2 100644 --- a/scio-core/src/main/scala/com/spotify/scio/ScioContext.scala +++ b/scio-core/src/main/scala/com/spotify/scio/ScioContext.scala @@ -359,7 +359,7 @@ class ScioContext private[scio] (val options: DataflowPipelineOptions, if (this.isTest) { this.getTestInput(BigQueryIO(sqlQuery)) } else { - val queryJob = this.bigQueryClient.queryIntoTable(sqlQuery, flattenResults) + val queryJob = this.bigQueryClient.newQueryJob(sqlQuery, flattenResults) _queryJobs.append(queryJob) wrap(this.applyInternal(GBigQueryIO.Read.from(queryJob.table).withoutValidation())) .setName(sqlQuery) diff --git a/scio-core/src/main/scala/com/spotify/scio/io/Taps.scala b/scio-core/src/main/scala/com/spotify/scio/io/Taps.scala index a6c23d9570..ec42ecac74 100644 --- a/scio-core/src/main/scala/com/spotify/scio/io/Taps.scala +++ b/scio-core/src/main/scala/com/spotify/scio/io/Taps.scala @@ -75,7 +75,7 @@ trait Taps { private def bigQueryTap(sqlQuery: String, flattenResults: Boolean): BigQueryTap = { val bq = BigQueryClient.defaultInstance() - val queryJob = bq.queryIntoTable(sqlQuery, flattenResults) + val queryJob = bq.newQueryJob(sqlQuery, flattenResults) queryJob.waitForResult() BigQueryTap(queryJob.table) } From d3bdfb85cdfc96f6688552e6ba8e242d649b1f3c Mon Sep 17 00:00:00 2001 From: Neville Li Date: Tue, 19 Apr 2016 15:59:15 -0400 Subject: [PATCH 3/3] simplify queryIntoTable --- .../scio/bigquery/BigQueryClient.scala | 108 +++++++++--------- .../main/scala/com/spotify/scio/io/Taps.scala | 5 +- 2 files changed, 57 insertions(+), 56 deletions(-) diff --git a/scio-bigquery/src/main/scala/com/spotify/scio/bigquery/BigQueryClient.scala b/scio-bigquery/src/main/scala/com/spotify/scio/bigquery/BigQueryClient.scala index 10aef0bd89..76655285f5 100644 --- a/scio-bigquery/src/main/scala/com/spotify/scio/bigquery/BigQueryClient.scala +++ b/scio-bigquery/src/main/scala/com/spotify/scio/bigquery/BigQueryClient.scala @@ -77,7 +77,7 @@ object BigQueryUtil { } /** A query job that may delay execution. */ -trait QueryJob { +private[scio] trait QueryJob { def waitForResult(): Unit val jobReference: Option[JobReference] val query: String @@ -197,59 +197,25 @@ class BigQueryClient private (private val projectId: String, getTable(table).getSchema } - /** Make a query and save results to a destination table. */ - def queryIntoTable(sqlQuery: String, - destinationTable: TableReference, - flattenResults: Boolean): Job = { - val queryJob = makeQueryJob(sqlQuery, destinationTable, flattenResults) - queryJob.waitForResult() - val jobRef = queryJob.jobReference.get - bigquery.jobs().get(projectId, jobRef.getJobId).execute() - } - - /** Make a query and save results to a destination table. */ - def queryIntoTable(sqlQuery: String, - destinationTable: String, - flattenResults: Boolean = false): Job = - queryIntoTable(sqlQuery, BigQueryIO.parseTableSpec(destinationTable), flattenResults) - /** - * Create a new query job and save results into a temporary table. + * Make a query and save results to a destination table. * - * No query will be executed if a cached table exists. Query execution is delayed until - * `waitForResult` is called on the result `QueryJob`. + * A temporary table will be created if `destinationTable` is `null` and a cached table will be + * returned instead if one exists. */ - def newQueryJob(sqlQuery: String, flattenResults: Boolean): QueryJob = { - try { - val sourceTimes = - BigQueryUtil.extractTables(sqlQuery).map(t => BigInt(getTable(t).getLastModifiedTime)) - val temp = getCacheDestinationTable(sqlQuery).get - val time = BigInt(getTable(temp).getLastModifiedTime) - if (sourceTimes.forall(_ < time)) { - logger.info(s"Cache hit for query: $sqlQuery") - logger.info(s"Existing destination table: ${BigQueryIO.toTableSpec(temp)}") - new QueryJob { - override def waitForResult(): Unit = {} - override val jobReference: Option[JobReference] = None - override val query: String = sqlQuery - override val table: TableReference = temp - } - } else { - val temp = temporaryTable(TABLE_PREFIX) - logger.info(s"Cache invalid for query: $sqlQuery") - logger.info(s"New destination table: ${BigQueryIO.toTableSpec(temp)}") - setCacheDestinationTable(sqlQuery, temp) - makeQueryJob(sqlQuery, temp, flattenResults) - } - } catch { - case NonFatal(_) => - val temp = temporaryTable(TABLE_PREFIX) - logger.info(s"Cache miss for query: $sqlQuery") - logger.info(s"New destination table: ${BigQueryIO.toTableSpec(temp)}") - setCacheDestinationTable(sqlQuery, temp) - makeQueryJob(sqlQuery, temp, flattenResults) + def query(sqlQuery: String, + destinationTable: String = null, + flattenResults: Boolean = false): TableReference = + if (destinationTable != null) { + val tableRef = BigQueryIO.parseTableSpec(destinationTable) + val queryJob = delayedQueryJob(sqlQuery, tableRef, flattenResults) + queryJob.waitForResult() + tableRef + } else { + val queryJob = newQueryJob(sqlQuery, flattenResults) + queryJob.waitForResult() + queryJob.table } - } /** Write rows to a table. */ def writeTableRows(table: TableReference, rows: List[TableRow], schema: TableSchema, @@ -297,6 +263,42 @@ class BigQueryClient private (private val projectId: String, } } + // ======================================================================= + // Job execution + // ======================================================================= + + private[scio] def newQueryJob(sqlQuery: String, flattenResults: Boolean): QueryJob = { + try { + val sourceTimes = + BigQueryUtil.extractTables(sqlQuery).map(t => BigInt(getTable(t).getLastModifiedTime)) + val temp = getCacheDestinationTable(sqlQuery).get + val time = BigInt(getTable(temp).getLastModifiedTime) + if (sourceTimes.forall(_ < time)) { + logger.info(s"Cache hit for query: $sqlQuery") + logger.info(s"Existing destination table: ${BigQueryIO.toTableSpec(temp)}") + new QueryJob { + override def waitForResult(): Unit = {} + override val jobReference: Option[JobReference] = None + override val query: String = sqlQuery + override val table: TableReference = temp + } + } else { + val temp = temporaryTable(TABLE_PREFIX) + logger.info(s"Cache invalid for query: $sqlQuery") + logger.info(s"New destination table: ${BigQueryIO.toTableSpec(temp)}") + setCacheDestinationTable(sqlQuery, temp) + delayedQueryJob(sqlQuery, temp, flattenResults) + } + } catch { + case NonFatal(_) => + val temp = temporaryTable(TABLE_PREFIX) + logger.info(s"Cache miss for query: $sqlQuery") + logger.info(s"New destination table: ${BigQueryIO.toTableSpec(temp)}") + setCacheDestinationTable(sqlQuery, temp) + delayedQueryJob(sqlQuery, temp, flattenResults) + } + } + private def prepareStagingDataset(): Unit = { // Create staging dataset if it does not already exist val datasetId = BigQueryClient.stagingDataset @@ -334,9 +336,9 @@ class BigQueryClient private (private val projectId: String, new JobReference().setProjectId(projectId).setJobId(fullJobId) } - private def makeQueryJob(sqlQuery: String, - destinationTable: TableReference, - flattenResults: Boolean): QueryJob = new QueryJob { + private def delayedQueryJob(sqlQuery: String, + destinationTable: TableReference, + flattenResults: Boolean): QueryJob = new QueryJob { override def waitForResult(): Unit = self.waitForJobs(this) override lazy val jobReference: Option[JobReference] = { prepareStagingDataset() diff --git a/scio-core/src/main/scala/com/spotify/scio/io/Taps.scala b/scio-core/src/main/scala/com/spotify/scio/io/Taps.scala index ec42ecac74..0867a0fcc6 100644 --- a/scio-core/src/main/scala/com/spotify/scio/io/Taps.scala +++ b/scio-core/src/main/scala/com/spotify/scio/io/Taps.scala @@ -75,9 +75,8 @@ trait Taps { private def bigQueryTap(sqlQuery: String, flattenResults: Boolean): BigQueryTap = { val bq = BigQueryClient.defaultInstance() - val queryJob = bq.newQueryJob(sqlQuery, flattenResults) - queryJob.waitForResult() - BigQueryTap(queryJob.table) + val table = bq.query(sqlQuery, flattenResults = flattenResults) + BigQueryTap(table) } /**