Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

allow flatten results and dest table in BQ #84 #88

Merged
merged 3 commits into from
Apr 19, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ object BigQueryUtil {
}

/** A query job that may delay execution. */
trait QueryJob {
private[scio] trait QueryJob {
def waitForResult(): Unit
val jobReference: Option[JobReference]
val query: String
Expand Down Expand Up @@ -151,8 +151,8 @@ class BigQueryClient private (private val projectId: String,
}

/**
 * Get rows from a query.
 *
 * The `flattenResults` variant obtains a job through `newQueryJob(sqlQuery, flattenResults)`,
 * calls `waitForResult()` on it, and then reads the job's destination table with
 * `getTableRows`.
 *
 * @param sqlQuery query to run
 * @param flattenResults passed through unchanged to `newQueryJob`; defaults to `false`
 * @return iterator over the rows of the job's destination table
 */
def getQueryRows(sqlQuery: String): Iterator[TableRow] = {
val queryJob = queryIntoTable(sqlQuery)
def getQueryRows(sqlQuery: String, flattenResults: Boolean = false): Iterator[TableRow] = {
val queryJob = newQueryJob(sqlQuery, flattenResults)
queryJob.waitForResult()
getTableRows(queryJob.table)
}
Expand Down Expand Up @@ -197,38 +197,25 @@ class BigQueryClient private (private val projectId: String,
getTable(table).getSchema
}

/** Execute a query and save results into a temporary table. */
def queryIntoTable(sqlQuery: String): QueryJob = {
try {
val sourceTimes =
BigQueryUtil.extractTables(sqlQuery).map(t => BigInt(getTable(t).getLastModifiedTime))
val temp = getCacheDestinationTable(sqlQuery).get
val time = BigInt(getTable(temp).getLastModifiedTime)
if (sourceTimes.forall(_ < time)) {
logger.info(s"Cache hit for query: $sqlQuery")
logger.info(s"Existing destination table: ${BigQueryIO.toTableSpec(temp)}")
new QueryJob {
override def waitForResult(): Unit = {}
override val jobReference: Option[JobReference] = None
override val query: String = sqlQuery
override val table: TableReference = temp
}
} else {
val temp = temporaryTable(TABLE_PREFIX)
logger.info(s"Cache invalid for query: $sqlQuery")
logger.info(s"New destination table: ${BigQueryIO.toTableSpec(temp)}")
setCacheDestinationTable(sqlQuery, temp)
makeQueryJob(sqlQuery, temp)
}
} catch {
case NonFatal(_) =>
val temp = temporaryTable(TABLE_PREFIX)
logger.info(s"Cache miss for query: $sqlQuery")
logger.info(s"New destination table: ${BigQueryIO.toTableSpec(temp)}")
setCacheDestinationTable(sqlQuery, temp)
makeQueryJob(sqlQuery, temp)
/**
 * Make a query and save results to a destination table.
 *
 * When `destinationTable` is non-null it is parsed with `BigQueryIO.parseTableSpec` and a job
 * writing to that table is created directly through `delayedQueryJob`, so the cached-table
 * lookup in `newQueryJob` is not consulted. When it is `null`, the job comes from
 * `newQueryJob`, which either reuses a cached destination table or creates a temporary one.
 * In both cases `waitForResult()` is called on the job before returning.
 *
 * @param sqlQuery query to run
 * @param destinationTable table spec string for the results, or `null` to let `newQueryJob`
 *                         pick the table
 * @param flattenResults passed through to the underlying query job; defaults to `false`
 * @return reference to the table associated with the job
 */
def query(sqlQuery: String,
destinationTable: String = null,
flattenResults: Boolean = false): TableReference =
if (destinationTable != null) {
val tableRef = BigQueryIO.parseTableSpec(destinationTable)
val queryJob = delayedQueryJob(sqlQuery, tableRef, flattenResults)
queryJob.waitForResult()
tableRef
} else {
val queryJob = newQueryJob(sqlQuery, flattenResults)
queryJob.waitForResult()
queryJob.table
}
}

/** Write rows to a table. */
def writeTableRows(table: TableReference, rows: List[TableRow], schema: TableSchema,
Expand Down Expand Up @@ -276,6 +263,42 @@ class BigQueryClient private (private val projectId: String,
}
}

// =======================================================================
// Job execution
// =======================================================================

/**
 * Build a `QueryJob` for `sqlQuery`, reusing the cached destination table when it was modified
 * more recently than every table the query reads from.
 *
 * Three outcomes are possible:
 *  - cache hit: a no-op job (no `JobReference`, `waitForResult()` does nothing) pointing at the
 *    cached table;
 *  - cache invalid: at least one source table is as new as, or newer than, the cached table, so
 *    a fresh temporary table is recorded in the cache and a delayed job targeting it is returned;
 *  - cache miss: any non-fatal failure while inspecting the cache (including the `.get` on the
 *    cache lookup) falls back to the same fresh-table path.
 *
 * NOTE(review): the cache is looked up and stored by `sqlQuery` alone; `flattenResults` is not
 * part of the key here, so a cached table may have been produced with a different setting.
 * Confirm whether that is acceptable.
 *
 * @param sqlQuery query to run
 * @param flattenResults forwarded to `delayedQueryJob` when a new job has to be created
 */
private[scio] def newQueryJob(sqlQuery: String, flattenResults: Boolean): QueryJob = {
  // Shared path for "cache invalid" and "cache miss": allocate a temporary table, log the
  // reason, remember the table for this query, and hand back a delayed job writing to it.
  def freshJob(reason: String): QueryJob = {
    val destination = temporaryTable(TABLE_PREFIX)
    logger.info(reason)
    logger.info(s"New destination table: ${BigQueryIO.toTableSpec(destination)}")
    setCacheDestinationTable(sqlQuery, destination)
    delayedQueryJob(sqlQuery, destination, flattenResults)
  }

  try {
    val sourceModified = BigQueryUtil.extractTables(sqlQuery).map { source =>
      BigInt(getTable(source).getLastModifiedTime)
    }
    val cached = getCacheDestinationTable(sqlQuery).get
    val cachedModified = BigInt(getTable(cached).getLastModifiedTime)

    val isStale = sourceModified.exists(_ >= cachedModified)
    if (isStale) {
      // Still inside the try on purpose: a failure here is handled as a cache miss below.
      freshJob(s"Cache invalid for query: $sqlQuery")
    } else {
      logger.info(s"Cache hit for query: $sqlQuery")
      logger.info(s"Existing destination table: ${BigQueryIO.toTableSpec(cached)}")
      new QueryJob {
        override def waitForResult(): Unit = {}
        override val jobReference: Option[JobReference] = None
        override val query: String = sqlQuery
        override val table: TableReference = cached
      }
    }
  } catch {
    case NonFatal(_) => freshJob(s"Cache miss for query: $sqlQuery")
  }
}

private def prepareStagingDataset(): Unit = {
// Create staging dataset if it does not already exist
val datasetId = BigQueryClient.stagingDataset
Expand Down Expand Up @@ -313,16 +336,17 @@ class BigQueryClient private (private val projectId: String,
new JobReference().setProjectId(projectId).setJobId(fullJobId)
}

private def makeQueryJob(sqlQuery: String,
destinationTable: TableReference): QueryJob = new QueryJob {
private def delayedQueryJob(sqlQuery: String,
destinationTable: TableReference,
flattenResults: Boolean): QueryJob = new QueryJob {
override def waitForResult(): Unit = self.waitForJobs(this)
override lazy val jobReference: Option[JobReference] = {
prepareStagingDataset()
logger.info(s"Executing query: $sqlQuery")
val queryConfig: JobConfigurationQuery = new JobConfigurationQuery()
.setQuery(sqlQuery)
.setAllowLargeResults(true)
.setFlattenResults(false)
.setFlattenResults(flattenResults)
.setPriority(PRIORITY)
.setCreateDisposition("CREATE_IF_NEEDED")
.setWriteDisposition("WRITE_EMPTY")
Expand Down
5 changes: 3 additions & 2 deletions scio-core/src/main/scala/com/spotify/scio/ScioContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -354,11 +354,12 @@ class ScioContext private[scio] (val options: DataflowPipelineOptions,
* Get an SCollection for a BigQuery SELECT query.
* @group input
*/
def bigQuerySelect(sqlQuery: String): SCollection[TableRow] = pipelineOp {
def bigQuerySelect(sqlQuery: String,
flattenResults: Boolean = false): SCollection[TableRow] = pipelineOp {
if (this.isTest) {
this.getTestInput(BigQueryIO(sqlQuery))
} else {
val queryJob = this.bigQueryClient.queryIntoTable(sqlQuery)
val queryJob = this.bigQueryClient.newQueryJob(sqlQuery, flattenResults)
_queryJobs.append(queryJob)
wrap(this.applyInternal(GBigQueryIO.Read.from(queryJob.table).withoutValidation()))
.setName(sqlQuery)
Expand Down
14 changes: 8 additions & 6 deletions scio-core/src/main/scala/com/spotify/scio/io/Taps.scala
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,11 @@ trait Taps {
mkTap(s"Avro: $path", () => isPathDone(path), () => AvroTap[T](path, schema))

/**
 * Get a `Future[Tap[T]]` for BigQuery SELECT query.
 *
 * Delegates to `mkTap` with a label containing the query, a check built on
 * `isQueryDone(sqlQuery)`, and a factory that calls `bigQueryTap(sqlQuery, flattenResults)`.
 *
 * @param sqlQuery SELECT query backing the tap
 * @param flattenResults handed to `bigQueryTap` when the factory runs; defaults to `false`
 */
def bigQuerySelect(sqlQuery: String): Future[Tap[TableRow]] =
mkTap(s"BigQuery SELECT: $sqlQuery", () => isQueryDone(sqlQuery), () => bigQueryTap(sqlQuery))
def bigQuerySelect(sqlQuery: String, flattenResults: Boolean = false): Future[Tap[TableRow]] =
mkTap(
s"BigQuery SELECT: $sqlQuery",
() => isQueryDone(sqlQuery),
() => bigQueryTap(sqlQuery, flattenResults))

/** Get a `Future[Tap[T]]` for BigQuery table. */
def bigQueryTable(table: TableReference): Future[Tap[TableRow]] =
Expand All @@ -70,11 +73,10 @@ trait Taps {
/** Report whether `table` can be looked up: true when fetching its schema does not throw. */
private def tableExists(table: TableReference): Boolean = {
  val schemaLookup = Try(BigQueryClient.defaultInstance().getTableSchema(table))
  !schemaLookup.isFailure
}

/**
 * Build a `BigQueryTap` for a query using the default `BigQueryClient` instance.
 *
 * The `flattenResults` variant calls `bq.query(sqlQuery, flattenResults = flattenResults)` and
 * wraps the table reference it returns.
 */
private def bigQueryTap(sqlQuery: String): BigQueryTap = {
private def bigQueryTap(sqlQuery: String, flattenResults: Boolean): BigQueryTap = {
val bq = BigQueryClient.defaultInstance()
val queryJob = bq.queryIntoTable(sqlQuery)
queryJob.waitForResult()
BigQueryTap(queryJob.table)
val table = bq.query(sqlQuery, flattenResults = flattenResults)
BigQueryTap(table)
}

/**
Expand Down