Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix redis examples #5482

Merged
merged 2 commits into from
Sep 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,12 @@ import com.spotify.scio.redis.types._
import com.spotify.scio.redis.coders._
import org.apache.beam.examples.common.ExampleUtils
import org.apache.beam.sdk.options.{PipelineOptions, StreamingOptions}
import org.apache.beam.sdk.transforms.ParDo

import scala.concurrent.{ExecutionContext, Future}

// Example: Redis Examples

// ## Redis Read Strings example
// Read strings from Redis by a key pattern

Expand Down Expand Up @@ -135,7 +139,7 @@ object RedisWriteStreamingExample {
// `sbt "runMain com.spotify.scio.examples.extra.RedisLookUpStringsExample
// --project=[PROJECT] --runner=DataflowRunner --region=[REGION NAME]
// --redisHost=[REDIS_HOST]
// --redisPort=[REDIS_PORT]
// --redisPort=[REDIS_PORT]`
object RedisLookUpStringsExample {

def main(cmdlineArgs: Array[String]): Unit = {
Expand All @@ -146,18 +150,18 @@ object RedisLookUpStringsExample {
val connectionOptions = RedisConnectionOptions(redisHost, redisPort)

sc.parallelize(Seq("key1", "key2", "unknownKey"))
// #RedisLookup_example
.parDo(
new RedisDoFn[String, (String, Option[String])](connectionOptions, 1000) {
override def request(value: String, client: Client)(implicit
ec: ExecutionContext
): Future[(String, Option[String])] =
client
.request(p => p.get(value) :: Nil)
.map { case r: List[String @unchecked] => (value, r.headOption) }
}
.applyTransform(
ParDo.of(
new RedisDoFn[String, (String, Option[String])](connectionOptions, 1000) {
override def request(value: String, client: Client)(implicit
ec: ExecutionContext
): Future[(String, Option[String])] =
client
.request(p => p.get(value) :: Nil)
.map { case r: List[String @unchecked] => (value, r.headOption) }
}
)
)
// #RedisLookup_example
.debug()

sc.run()
Expand Down
29 changes: 28 additions & 1 deletion site/src/main/paradox/io/Redis.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,34 @@ val elements: SCollection[(String, String)] = sc.redis(connectionOptions, keyPat

Looking up specific keys from redis can be done with @scaladoc[RedisDoFn](com.spotify.scio.redis.RedisDoFn):

@@snip [RedisExamples.scala](/scio-examples/src/main/scala/com/spotify/scio/examples/extra/RedisExamples.scala) { #RedisLookup_example }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we go away from integrated snippets in the future?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, but they can't be in the Examples files. I think this was the only one and I added it only last year when doing doc cleanup.

```scala mdoc:compile-only
import com.spotify.scio._
import com.spotify.scio.redis._
import com.spotify.scio.values.SCollection
import org.apache.beam.sdk.transforms.ParDo
import scala.concurrent.{ExecutionContext, Future}

val redisHost: String = ???
val redisPort: Int = ???
val batchSize: Int = ???
val connectionOptions = RedisConnectionOptions(redisHost, redisPort)

val keys: SCollection[String] = ???

keys
.applyTransform(
ParDo.of(
new RedisDoFn[String, (String, Option[String])](connectionOptions, batchSize) {
override def request(value: String, client: Client)(
implicit ec: ExecutionContext
): Future[(String, Option[String])] =
client
.request(p => p.get(value) :: Nil)
.map { case r: List[String @unchecked] => (value, r.headOption) }
}
)
)
```

# Write

Expand Down