diff --git a/scio-examples/src/main/scala/com/spotify/scio/examples/extra/RedisExamples.scala b/scio-examples/src/main/scala/com/spotify/scio/examples/extra/RedisExamples.scala index 68fced35b6..bbc712b7ae 100644 --- a/scio-examples/src/main/scala/com/spotify/scio/examples/extra/RedisExamples.scala +++ b/scio-examples/src/main/scala/com/spotify/scio/examples/extra/RedisExamples.scala @@ -25,8 +25,12 @@ import com.spotify.scio.redis.types._ import com.spotify.scio.redis.coders._ import org.apache.beam.examples.common.ExampleUtils import org.apache.beam.sdk.options.{PipelineOptions, StreamingOptions} +import org.apache.beam.sdk.transforms.ParDo + import scala.concurrent.{ExecutionContext, Future} +// Example: Redis Examples + // ## Redis Read Strings example // Read strings from Redis by a key pattern @@ -135,7 +139,7 @@ object RedisWriteStreamingExample { // `sbt "runMain com.spotify.scio.examples.extra.RedisLookUpStringsExample // --project=[PROJECT] --runner=DataflowRunner --region=[REGION NAME] // --redisHost=[REDIS_HOST] -// --redisPort=[REDIS_PORT] +// --redisPort=[REDIS_PORT]` object RedisLookUpStringsExample { def main(cmdlineArgs: Array[String]): Unit = { @@ -146,18 +150,18 @@ object RedisLookUpStringsExample { val connectionOptions = RedisConnectionOptions(redisHost, redisPort) sc.parallelize(Seq("key1", "key2", "unknownKey")) - // #RedisLookup_example - .parDo( - new RedisDoFn[String, (String, Option[String])](connectionOptions, 1000) { - override def request(value: String, client: Client)(implicit - ec: ExecutionContext - ): Future[(String, Option[String])] = - client - .request(p => p.get(value) :: Nil) - .map { case r: List[String @unchecked] => (value, r.headOption) } - } + .applyTransform( + ParDo.of( + new RedisDoFn[String, (String, Option[String])](connectionOptions, 1000) { + override def request(value: String, client: Client)(implicit + ec: ExecutionContext + ): Future[(String, Option[String])] = + client + .request(p => p.get(value) :: Nil) + .map { case r: List[String @unchecked] => (value, r.headOption) } + } + ) ) - // #RedisLookup_example .debug() sc.run() diff --git a/site/src/main/paradox/io/Redis.md b/site/src/main/paradox/io/Redis.md index 75d8430453..037ebe30c4 100644 --- a/site/src/main/paradox/io/Redis.md +++ b/site/src/main/paradox/io/Redis.md @@ -22,7 +22,34 @@ val elements: SCollection[(String, String)] = sc.redis(connectionOptions, keyPat Looking up specific keys from redis can be done with @scaladoc[RedisDoFn](com.spotify.scio.redis.RedisDoFn): -@@snip [RedisExamples.scala](/scio-examples/src/main/scala/com/spotify/scio/examples/extra/RedisExamples.scala) { #RedisLookup_example } +```scala mdoc:compile-only +import com.spotify.scio._ +import com.spotify.scio.redis._ +import com.spotify.scio.values.SCollection +import org.apache.beam.sdk.transforms.ParDo +import scala.concurrent.{ExecutionContext, Future} + +val redisHost: String = ??? +val redisPort: Int = ??? +val batchSize: Int = ??? +val connectionOptions = RedisConnectionOptions(redisHost, redisPort) + +val keys: SCollection[String] = ??? + +keys + .applyTransform( + ParDo.of( + new RedisDoFn[String, (String, Option[String])](connectionOptions, batchSize) { + override def request(value: String, client: Client)( + implicit ec: ExecutionContext + ): Future[(String, Option[String])] = + client + .request(p => p.get(value) :: Nil) + .map { case r: List[String @unchecked] => (value, r.headOption) } + } + ) + ) +``` # Write