Merge pull request #88 from spotify/neville/bq-flatten
allow flatten results and dest table in BQ #84
nevillelyh committed Apr 19, 2016
commit b60d5d1 (2 parents: 1d9b44d + d3bdfb8)
Showing 3 changed files with 72 additions and 45 deletions.
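Taken together, the three files add a `flattenResults` flag to the query entry points (defaulting to `false`, which matches the previously hardcoded behavior) and a new `BigQueryClient.query` method that can write results into a caller-supplied destination table. A rough usage sketch follows; the project, dataset, and table names are placeholders, `sc` is assumed to be an existing ScioContext, and the import path is assumed from the module layout:

// Sketch only: table specs are placeholders, `sc` is an existing ScioContext.
import com.spotify.scio.bigquery.BigQueryClient

val bq = BigQueryClient.defaultInstance()

// Run a query into an explicit destination table, leaving nested records unflattened.
val table = bq.query(
  "SELECT user.id, events FROM [my-project:logs.daily]",
  destinationTable = "my-project:staging.daily_snapshot",
  flattenResults = false)

// Read query results straight into an SCollection, flattening repeated fields this time.
val rows = sc.bigQuerySelect(
  "SELECT user.id, events FROM [my-project:logs.daily]",
  flattenResults = true)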
@@ -77,7 +77,7 @@ object BigQueryUtil {
}

/** A query job that may delay execution. */
trait QueryJob {
private[scio] trait QueryJob {
def waitForResult(): Unit
val jobReference: Option[JobReference]
val query: String
@@ -151,8 +151,8 @@ class BigQueryClient private (private val projectId: String,
}

/** Get rows from a query. */
def getQueryRows(sqlQuery: String): Iterator[TableRow] = {
val queryJob = queryIntoTable(sqlQuery)
def getQueryRows(sqlQuery: String, flattenResults: Boolean = false): Iterator[TableRow] = {
val queryJob = newQueryJob(sqlQuery, flattenResults)
queryJob.waitForResult()
getTableRows(queryJob.table)
}
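For client-side iteration outside a pipeline, the same flag can be passed to `getQueryRows`. A brief sketch, with a placeholder query and an assumed import path:

import com.spotify.scio.bigquery.BigQueryClient

val bq = BigQueryClient.defaultInstance()
val rows = bq.getQueryRows(
  "SELECT name, value FROM [my-project:my_dataset.my_table]",
  flattenResults = true)
rows.take(10).foreach(println)  // print the first few TableRows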
@@ -197,38 +197,25 @@ class BigQueryClient private (private val projectId: String,
getTable(table).getSchema
}

/** Execute a query and save results into a temporary table. */
def queryIntoTable(sqlQuery: String): QueryJob = {
try {
val sourceTimes =
BigQueryUtil.extractTables(sqlQuery).map(t => BigInt(getTable(t).getLastModifiedTime))
val temp = getCacheDestinationTable(sqlQuery).get
val time = BigInt(getTable(temp).getLastModifiedTime)
if (sourceTimes.forall(_ < time)) {
logger.info(s"Cache hit for query: $sqlQuery")
logger.info(s"Existing destination table: ${BigQueryIO.toTableSpec(temp)}")
new QueryJob {
override def waitForResult(): Unit = {}
override val jobReference: Option[JobReference] = None
override val query: String = sqlQuery
override val table: TableReference = temp
}
} else {
val temp = temporaryTable(TABLE_PREFIX)
logger.info(s"Cache invalid for query: $sqlQuery")
logger.info(s"New destination table: ${BigQueryIO.toTableSpec(temp)}")
setCacheDestinationTable(sqlQuery, temp)
makeQueryJob(sqlQuery, temp)
}
} catch {
case NonFatal(_) =>
val temp = temporaryTable(TABLE_PREFIX)
logger.info(s"Cache miss for query: $sqlQuery")
logger.info(s"New destination table: ${BigQueryIO.toTableSpec(temp)}")
setCacheDestinationTable(sqlQuery, temp)
makeQueryJob(sqlQuery, temp)
/**
* Run a query and save the results to a destination table.
*
* If `destinationTable` is `null`, the results are written to a temporary table, and a previously
* cached table is reused instead if it is still up to date.
*/
def query(sqlQuery: String,
destinationTable: String = null,
flattenResults: Boolean = false): TableReference =
if (destinationTable != null) {
val tableRef = BigQueryIO.parseTableSpec(destinationTable)
val queryJob = delayedQueryJob(sqlQuery, tableRef, flattenResults)
queryJob.waitForResult()
tableRef
} else {
val queryJob = newQueryJob(sqlQuery, flattenResults)
queryJob.waitForResult()
queryJob.table
}
}

/** Write rows to a table. */
def writeTableRows(table: TableReference, rows: List[TableRow], schema: TableSchema,
@@ -276,6 +263,42 @@ class BigQueryClient private (private val projectId: String,
}
}

// =======================================================================
// Job execution
// =======================================================================

private[scio] def newQueryJob(sqlQuery: String, flattenResults: Boolean): QueryJob = {
try {
val sourceTimes =
BigQueryUtil.extractTables(sqlQuery).map(t => BigInt(getTable(t).getLastModifiedTime))
val temp = getCacheDestinationTable(sqlQuery).get
val time = BigInt(getTable(temp).getLastModifiedTime)
if (sourceTimes.forall(_ < time)) {
logger.info(s"Cache hit for query: $sqlQuery")
logger.info(s"Existing destination table: ${BigQueryIO.toTableSpec(temp)}")
new QueryJob {
override def waitForResult(): Unit = {}
override val jobReference: Option[JobReference] = None
override val query: String = sqlQuery
override val table: TableReference = temp
}
} else {
val temp = temporaryTable(TABLE_PREFIX)
logger.info(s"Cache invalid for query: $sqlQuery")
logger.info(s"New destination table: ${BigQueryIO.toTableSpec(temp)}")
setCacheDestinationTable(sqlQuery, temp)
delayedQueryJob(sqlQuery, temp, flattenResults)
}
} catch {
case NonFatal(_) =>
val temp = temporaryTable(TABLE_PREFIX)
logger.info(s"Cache miss for query: $sqlQuery")
logger.info(s"New destination table: ${BigQueryIO.toTableSpec(temp)}")
setCacheDestinationTable(sqlQuery, temp)
delayedQueryJob(sqlQuery, temp, flattenResults)
}
}

private def prepareStagingDataset(): Unit = {
// Create staging dataset if it does not already exist
val datasetId = BigQueryClient.stagingDataset
Expand Down Expand Up @@ -313,16 +336,17 @@ class BigQueryClient private (private val projectId: String,
new JobReference().setProjectId(projectId).setJobId(fullJobId)
}

private def makeQueryJob(sqlQuery: String,
destinationTable: TableReference): QueryJob = new QueryJob {
private def delayedQueryJob(sqlQuery: String,
destinationTable: TableReference,
flattenResults: Boolean): QueryJob = new QueryJob {
override def waitForResult(): Unit = self.waitForJobs(this)
override lazy val jobReference: Option[JobReference] = {
prepareStagingDataset()
logger.info(s"Executing query: $sqlQuery")
val queryConfig: JobConfigurationQuery = new JobConfigurationQuery()
.setQuery(sqlQuery)
.setAllowLargeResults(true)
.setFlattenResults(false)
.setFlattenResults(flattenResults)
.setPriority(PRIORITY)
.setCreateDisposition("CREATE_IF_NEEDED")
.setWriteDisposition("WRITE_EMPTY")
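The caching logic in `newQueryJob` above reduces to one timestamp comparison: a cached destination table is reused only when every source table referenced by the query was last modified before the cached table was written. A standalone restatement of that rule, for illustration only (the helper name is hypothetical):

// Hypothetical helper mirroring the check `sourceTimes.forall(_ < time)` above.
def cachedTableIsValid(sourceLastModified: Seq[BigInt], cachedLastModified: BigInt): Boolean =
  sourceLastModified.forall(_ < cachedLastModified)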
scio-core/src/main/scala/com/spotify/scio/ScioContext.scala (5 changes: 3 additions & 2 deletions)
@@ -354,11 +354,12 @@ class ScioContext private[scio] (val options: DataflowPipelineOptions,
* Get an SCollection for a BigQuery SELECT query.
* @group input
*/
def bigQuerySelect(sqlQuery: String): SCollection[TableRow] = pipelineOp {
def bigQuerySelect(sqlQuery: String,
flattenResults: Boolean = false): SCollection[TableRow] = pipelineOp {
if (this.isTest) {
this.getTestInput(BigQueryIO(sqlQuery))
} else {
val queryJob = this.bigQueryClient.queryIntoTable(sqlQuery)
val queryJob = this.bigQueryClient.newQueryJob(sqlQuery, flattenResults)
_queryJobs.append(queryJob)
wrap(this.applyInternal(GBigQueryIO.Read.from(queryJob.table).withoutValidation()))
.setName(sqlQuery)
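A sketch of the updated `bigQuerySelect` entry point in a complete pipeline; the query, table, and output path are placeholders, and the surrounding boilerplate assumes the ScioContext lifecycle of this release (`ContextAndArgs` to create the context, `close()` to run it):

import com.spotify.scio._

object FlattenedSelectExample {
  def main(cmdlineArgs: Array[String]): Unit = {
    val (sc, _) = ContextAndArgs(cmdlineArgs)
    sc.bigQuerySelect(
        "SELECT word, word_count FROM [my-project:samples.shakespeare]",
        flattenResults = true)
      .map(row => s"${row.get("word")}\t${row.get("word_count")}")
      .saveAsTextFile("gs://my-bucket/word_counts")  // placeholder output path
    sc.close()
  }
}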
scio-core/src/main/scala/com/spotify/scio/io/Taps.scala (14 changes: 8 additions & 6 deletions)
@@ -43,8 +43,11 @@ trait Taps {
mkTap(s"Avro: $path", () => isPathDone(path), () => AvroTap[T](path, schema))

/** Get a `Future[Tap[T]]` for BigQuery SELECT query. */
def bigQuerySelect(sqlQuery: String): Future[Tap[TableRow]] =
mkTap(s"BigQuery SELECT: $sqlQuery", () => isQueryDone(sqlQuery), () => bigQueryTap(sqlQuery))
def bigQuerySelect(sqlQuery: String, flattenResults: Boolean = false): Future[Tap[TableRow]] =
mkTap(
s"BigQuery SELECT: $sqlQuery",
() => isQueryDone(sqlQuery),
() => bigQueryTap(sqlQuery, flattenResults))

/** Get a `Future[Tap[T]]` for BigQuery table. */
def bigQueryTable(table: TableReference): Future[Tap[TableRow]] =
@@ -70,11 +73,10 @@ trait Taps {
private def tableExists(table: TableReference): Boolean =
Try(BigQueryClient.defaultInstance().getTableSchema(table)).isSuccess

private def bigQueryTap(sqlQuery: String): BigQueryTap = {
private def bigQueryTap(sqlQuery: String, flattenResults: Boolean): BigQueryTap = {
val bq = BigQueryClient.defaultInstance()
val queryJob = bq.queryIntoTable(sqlQuery)
queryJob.waitForResult()
BigQueryTap(queryJob.table)
val table = bq.query(sqlQuery, flattenResults = flattenResults)
BigQueryTap(table)
}

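Finally, a sketch of the asynchronous Taps entry point; `taps` is assumed to be an existing Taps instance, the query is a placeholder, and `Tap.value` is assumed to expose the materialized rows as an iterator. The returned future completes once the query results exist:

import scala.concurrent.ExecutionContext.Implicits.global

val futureTap = taps.bigQuerySelect(
  "SELECT id, payload FROM [my-project:logs.events]",
  flattenResults = false)

futureTap.foreach { tap =>
  // Read the rows directly once the query has finished.
  tap.value.take(5).foreach(println)
}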
