Skip to content

Commit

Permalink
remove testId from ScioContext #114
Browse files Browse the repository at this point in the history
  • Loading branch information
nevillelyh committed Jun 21, 2016
1 parent 49770cd commit a5b9c67
Show file tree
Hide file tree
Showing 6 changed files with 28 additions and 18 deletions.
27 changes: 19 additions & 8 deletions scio-core/src/main/scala/com/spotify/scio/ScioContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ object ContextAndArgs {
/** Create [[ScioContext]] and [[Args]] for command line arguments. */
def apply(args: Array[String]): (ScioContext, Args) = {
val (_opts, _args) = ScioContext.parseArguments[DataflowPipelineOptions](args)
(new ScioContext(_opts, Nil, _args.optional("testId")), _args)
(new ScioContext(_opts, Nil), _args)
}
}

Expand All @@ -71,18 +71,22 @@ object ScioContext {
def apply(): ScioContext = ScioContext(defaultOptions)

/** Create a new [[ScioContext]] instance. */
def apply(options: PipelineOptions): ScioContext = new ScioContext(options, Nil, None)
def apply(options: PipelineOptions): ScioContext = new ScioContext(options, Nil)

/** Create a new [[ScioContext]] instance. */
def apply(artifacts: List[String]): ScioContext = new ScioContext(defaultOptions, artifacts, None)
def apply(artifacts: List[String]): ScioContext = new ScioContext(defaultOptions, artifacts)

/** Create a new [[ScioContext]] instance. */
def apply(options: PipelineOptions, artifacts: List[String]): ScioContext =
new ScioContext(options, artifacts, None)
new ScioContext(options, artifacts)

/** Create a new [[ScioContext]] instance for testing. */
def forTest(testId: String): ScioContext =
new ScioContext(defaultOptions, List[String](), Some(testId))
def forTest(): ScioContext = {
val opts = PipelineOptionsFactory
.fromArgs(Array("--appName=JobTest-" + System.currentTimeMillis()))
.as(classOf[DataflowPipelineOptions])
new ScioContext(opts, List[String]())
}

/** Parse PipelineOptions and application arguments from command line arguments. */
def parseArguments[T <: PipelineOptions : ClassTag](cmdlineArgs: Array[String]): (T, Args) = {
Expand Down Expand Up @@ -119,8 +123,7 @@ object ScioContext {
*/
// scalastyle:off number.of.methods
class ScioContext private[scio] (val options: PipelineOptions,
private var artifacts: List[String],
private[scio] val testId: Option[String]) {
private var artifacts: List[String]) {

private implicit val context: ScioContext = this

Expand All @@ -143,6 +146,14 @@ class ScioContext private[scio] (val options: PipelineOptions,
}
}

private[scio] val testId: Option[String] = dfOptions.toOption.flatMap { o =>
if ("JobTest-[0-9]+".r.pattern.matcher(o.getAppName).matches()) {
Some(o.getAppName)
} else {
None
}
}

/** Dataflow pipeline. */
def pipeline: Pipeline = {
if (_pipeline == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ object JobTest {
this.copy(distCaches = this.distCaches + (key -> value))

def run(): Unit = {
val testId = className + "-" + System.currentTimeMillis()
val testId = "JobTest-" + System.currentTimeMillis()
TestDataManager.setInput(testId, new TestInput(inputs))
TestDataManager.setOutput(testId, new TestOutput(outputs))
TestDataManager.setDistCache(testId, new TestDistCache(distCaches))
Expand All @@ -92,7 +92,7 @@ object JobTest {
Class
.forName(className)
.getMethod("main", classOf[Array[String]])
.invoke(null, cmdlineArgs :+ s"--testId=$testId")
.invoke(null, cmdlineArgs :+ s"--appName=$testId")
} catch {
// InvocationTargetException stacktrace is noisy and useless
case e: InvocationTargetException => throw e.getCause
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,8 @@ import com.google.cloud.dataflow.sdk.options.PipelineOptions
import com.spotify.scio.{ScioContext, ScioResult}

class ReplScioContext(options: PipelineOptions,
artifacts: List[String],
testId: Option[String])
extends ScioContext(options, artifacts, testId) {
artifacts: List[String])
extends ScioContext(options, artifacts) {

private lazy val nullout = new PrintStream(new OutputStream() {
override def write(b: Int) = {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ trait PipelineSpec extends FlatSpec with Matchers with SCollectionMatcher {
* }}}
*/
def runWithContext[T](fn: ScioContext => T): ScioResult = {
val sc = ScioContext.forTest("PipelineTest-" + System.currentTimeMillis())
val sc = ScioContext.forTest()
fn(sc)
sc.close()
}
Expand Down
4 changes: 2 additions & 2 deletions scio-test/src/test/scala/com/spotify/scio/io/TapTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ class TapTest extends PipelineSpec {
}

it should "update isCompleted with testId" in {
val sc = ScioContext.forTest("FutureTest-" + System.currentTimeMillis())
val sc = ScioContext.forTest()
val f = sc.parallelize(Seq(1, 2, 3))
.map(newSpecificRecord)
.saveAsInMemoryTap
Expand Down Expand Up @@ -144,7 +144,7 @@ class TapTest extends PipelineSpec {
}

def runWithInMemoryFuture[T](fn: ScioContext => Future[Tap[T]]): Tap[T] =
runWithFuture(ScioContext.forTest("FutureTest-" + System.currentTimeMillis()))(fn)
runWithFuture(ScioContext.forTest())(fn)

def runWithFileFuture[T](fn: ScioContext => Future[Tap[T]]): Tap[T] =
runWithFuture(ScioContext())(fn)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import com.spotify.scio.testing.PipelineSpec
class AccumulatorTest extends PipelineSpec {

"Accumulator" should "support accumulatorTotalValue" in {
val sc = ScioContext.forTest("PipelineTest-" + System.currentTimeMillis())
val sc = ScioContext.forTest()

val maxI = sc.maxAccumulator[Int]("maxI")
val minI = sc.minAccumulator[Int]("minI")
Expand Down Expand Up @@ -57,7 +57,7 @@ class AccumulatorTest extends PipelineSpec {
}

it should "support accumulatorValuesAtSteps" in {
val sc = ScioContext.forTest("PipelineTest-" + System.currentTimeMillis())
val sc = ScioContext.forTest()

val count = sc.sumAccumulator[Int]("count")
sc.parallelize(1 to 100)
Expand Down

0 comments on commit a5b9c67

Please sign in to comment.