Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use JacksonJsonpMapper as default for Elasticsearch #5306

Merged
merged 4 commits into from
Mar 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -878,6 +878,9 @@ lazy val `scio-elasticsearch-common` = project
libraryDependencies ++= Seq(
// compile
"commons-io" % "commons-io" % commonsIoVersion,
"com.fasterxml.jackson.datatype" % "jackson-datatype-jsr310" % jacksonVersion,
"com.fasterxml.jackson.core" % "jackson-databind" % jacksonVersion,
"com.fasterxml.jackson.module" %% "jackson-module-scala" % jacksonVersion,
"jakarta.json" % "jakarta.json-api" % jakartaJsonVersion,
"joda-time" % "joda-time" % jodaTimeVersion,
"org.apache.beam" % "beam-sdks-java-core" % beamVersion,
Expand Down Expand Up @@ -1207,7 +1210,6 @@ lazy val `scio-examples` = project
unusedCompileDependenciesFilter -= moduleFilter("mysql", "mysql-connector-java"),
libraryDependencies ++= Seq(
// compile
"com.fasterxml.jackson.core" % "jackson-databind" % jacksonVersion,
"com.fasterxml.jackson.datatype" % "jackson-datatype-jsr310" % jacksonVersion,
"com.fasterxml.jackson.module" %% "jackson-module-scala" % jacksonVersion,
"com.google.api-client" % "google-api-client" % googleApiClientVersion,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,6 @@ trait ElasticsearchIOBehavior extends Eventually with ForAllTestContainer { this
}

def elasticsearchIO() = {

// from https://www.elastic.co/blog/a-practical-introduction-to-elasticsearch
it should "apply operations to elasticsearch cluster" in {
val options = PipelineOptionsFactory.create()
Expand All @@ -103,8 +102,7 @@ trait ElasticsearchIOBehavior extends Eventually with ForAllTestContainer { this
val host = new HttpHost(container.host, container.mappedPort(9200))
val esOptions = ElasticsearchOptions(
nodes = Seq(host),
usernameAndPassword = Some((Username, Password)),
mapperFactory = createScalaMapper
usernameAndPassword = Some((Username, Password))
RustedBones marked this conversation as resolved.
Show resolved Hide resolved
)

val persons = Seq(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@
package com.spotify.scio

import co.elastic.clients.elasticsearch.core.bulk.BulkOperation
import co.elastic.clients.json.{JsonpMapper, SimpleJsonpMapper}
import co.elastic.clients.json.jackson.JacksonJsonpMapper
import co.elastic.clients.json.JsonpMapper
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.spotify.scio.elasticsearch.ElasticsearchIO.{RetryConfig, WriteParam}
import com.spotify.scio.io.ClosedTap
import com.spotify.scio.values.SCollection
Expand All @@ -34,11 +37,17 @@ import org.joda.time.Duration
* }}}
*/
package object elasticsearch extends CoderInstances {
def defaultMapper(): JsonpMapper = {
// Use jackson for user json serialization, add scala and java.time support
val mapper = new JacksonJsonpMapper()
mapper.objectMapper().registerModule(DefaultScalaModule).registerModule(new JavaTimeModule())
mapper
}

final case class ElasticsearchOptions(
nodes: Seq[HttpHost],
usernameAndPassword: Option[(String, String)] = None,
mapperFactory: () => JsonpMapper = () => new SimpleJsonpMapper()
mapperFactory: () => JsonpMapper = defaultMapper
)

implicit class ElasticsearchSCollection[T](@transient private val self: SCollection[T])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,24 @@
package com.spotify.scio.elasticsearch

import co.elastic.clients.elasticsearch.core.bulk.{BulkOperation, IndexOperation}
import co.elastic.clients.json.jackson.{
JacksonJsonpGenerator,
JacksonJsonpMapper,
JacksonJsonpParser
}
import com.fasterxml.jackson.core.JsonFactory
import com.spotify.scio.testing._

class ElasticsearchIOTest extends ScioIOSpec {
import java.io.StringWriter
import java.time.LocalDate

object ElasticsearchIOTest {
type Document = Map[String, String]
case class Record(i: Int, jt: java.time.Instant, ldt: LocalDate)
}

class ElasticsearchIOTest extends ScioIOSpec {
import ElasticsearchIOTest._

"ElasticsearchIO" should "work with output" in {
val xs = 1 to 100
Expand All @@ -38,4 +51,22 @@ class ElasticsearchIOTest extends ScioIOSpec {
}
}

"ElasticsearchOptions" should "have a default mapper that supports scala and java.time" in {
val record = Record(10, java.time.Instant.ofEpochSecond(10), LocalDate.of(2021, 10, 22))

// the mapper internals require that the generator and parser types match,
// so we have to do some awkward java dancing here
val mapper = ElasticsearchOptions(Nil).mapperFactory()
val writer = new StringWriter()
val generator = new JacksonJsonpGenerator(new JsonFactory().createGenerator(writer))
mapper.serialize(record, generator)
generator.close()
val parser = new JacksonJsonpParser(
new JsonFactory().createParser(writer.toString),
mapper.asInstanceOf[JacksonJsonpMapper]
)
val roundTripped = mapper.deserialize(parser, classOf[Record])

roundTripped should equal(record)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,18 +60,7 @@ object ElasticsearchMinimalExample {
val primaryHost = new HttpHost(host, port)
val nodes = Seq(primaryHost)

val clusterOpts = ElasticsearchOptions(
nodes = nodes,
mapperFactory = () => {
// Use jackson for user json serialization
val mapper = new JacksonJsonpMapper()
// Add scala support
mapper.objectMapper().registerModule(DefaultScalaModule)
// Add java.time support
mapper.objectMapper().registerModule(new JavaTimeModule())
mapper
}
)
val clusterOpts = ElasticsearchOptions(nodes = nodes)

// Provide an elasticsearch indexer to transform collections to indexable ES documents
val indexRequestBuilder = indexer(index)
Expand Down