Skip to content

Commit

Permalink
Add tests for temp directory in local runner
Browse files Browse the repository at this point in the history
  • Loading branch information
Rafal Wojdyla committed Jul 1, 2016
1 parent 53e58bc commit 9734f50
Showing 1 changed file with 42 additions and 0 deletions.
42 changes: 42 additions & 0 deletions scio-test/src/test/scala/com/spotify/scio/ScioContextTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,14 @@

package com.spotify.scio

import com.google.cloud.dataflow.sdk.options.{DataflowPipelineOptions, PipelineOptionsFactory}
import com.google.cloud.dataflow.sdk.runners.{DataflowPipelineRunner, DirectPipelineRunner}
import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner
import com.google.cloud.dataflow.sdk.testing.DataflowAssert
import com.google.cloud.dataflow.sdk.transforms.Create
import com.google.common.collect.Lists
import com.spotify.scio.testing.PipelineSpec
import org.apache.commons.lang.exception.ExceptionUtils

import scala.collection.JavaConverters._

Expand All @@ -39,4 +43,42 @@ class ScioContextTest extends PipelineSpec {
pipeline.run()
}

it should "have temp directory for default runner" in {
val pipeline = ScioContext().pipeline
pipeline.getOptions.getTempLocation should not be null
}

it should "have temp directory for default InProcessPipelineRunner" in {
val opts = PipelineOptionsFactory.create()
opts.setRunner(classOf[InProcessPipelineRunner])
val pipeline = ScioContext(opts).pipeline
pipeline.getOptions.getTempLocation should not be null
}

it should "have temp directory for default DirectPipelineRunner" in {
val opts = PipelineOptionsFactory.create()
opts.setRunner(classOf[DirectPipelineRunner])
val pipeline = ScioContext(opts).pipeline
pipeline.getOptions.getTempLocation should not be null
}

it should "use user specified temp directory" in {
val expected = "/expected"
val opts = PipelineOptionsFactory.create()
opts.setTempLocation(expected)
val pipeline = ScioContext(opts).pipeline
pipeline.getOptions.getTempLocation shouldEqual expected
}

it should "fail without temp/staging dir for DataflowPipelineRunner " in {
val opts = PipelineOptionsFactory.create()
opts.setRunner(classOf[DataflowPipelineRunner])
val dfOpts = opts.as(classOf[DataflowPipelineOptions])
dfOpts.setProject("foobar")
val sc = ScioContext(dfOpts)
val e = intercept[RuntimeException] { sc.pipeline }
ExceptionUtils.getFullStackTrace(e) should
include ("at least one of tempLocation or stagingLocation must be set.")
}

}

0 comments on commit 9734f50

Please sign in to comment.