Skip to content

Commit

Permalink
decouple Dataflow runner in SCollection #139
Browse files (browse the repository at this point in the history)
  • Loading branch information
nevillelyh committed May 19, 2016
1 parent ce88094 commit 8dd39b4
Showing 1 changed file with 11 additions and 6 deletions.
17 changes: 11 additions & 6 deletions scio-core/src/main/scala/com/spotify/scio/values/SCollection.scala
Original file line number | Diff line number | Diff line change
Expand Up @@ -32,7 +32,7 @@ import com.google.cloud.dataflow.sdk.io.BigQueryIO.Write.{CreateDisposition, Wri
import com.google.cloud.dataflow.sdk.io.bigtable.BigtableIO
import com.google.cloud.dataflow.sdk.{io => gio}
import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions
import com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner
import com.google.cloud.dataflow.sdk.runners
import com.google.cloud.dataflow.sdk.transforms._
import com.google.cloud.dataflow.sdk.transforms.windowing._
import com.google.cloud.dataflow.sdk.util.CoderUtils
Expand Down Expand Up @@ -793,10 +793,15 @@ sealed trait SCollection[T] extends PCollectionWrapper[T] {
* @group output
*/
def materialize: Future[Tap[T]] = {
val tmpDir = if (context.pipeline.getRunner.isInstanceOf[DirectPipelineRunner]) {
sys.props("java.io.tmpdir")
} else {
context.pipeline.getOptions.asInstanceOf[DataflowPipelineOptions].getTempLocation
val runner = context.pipeline.getRunner
// TODO: BEAM how to handle runner dependencies here?
val tmpDir = runner match {
case _: runners.DirectPipelineRunner =>
sys.props("java.io.tmpdir")
case _: runners.DataflowPipelineRunner | _: runners.BlockingDataflowPipelineRunner =>
context.pipeline.getOptions.asInstanceOf[DataflowPipelineOptions].getTempLocation
case _ =>
throw new RuntimeException(s"Unsupported runner $runner")
}
val filename = "scio-materialize-" + UUID.randomUUID().toString
val path = tmpDir + (if (tmpDir.endsWith("/")) "" else "/") + filename
Expand Down Expand Up @@ -824,7 +829,7 @@ sealed trait SCollection[T] extends PCollectionWrapper[T] {
}

private def pathWithShards(path: String) = {
if (this.context.pipeline.getRunner.isInstanceOf[DirectPipelineRunner]) {
if (this.context.pipeline.getRunner.isInstanceOf[runners.DirectPipelineRunner]) {
val f = new File(path)
if (f.exists()) {
throw new RuntimeException(s"Output directory $path already exists")
Expand Down

0 comments on commit 8dd39b4

Please sign in to comment.