Skip to content

Commit

Permalink
Neo4j parametrized from existing SCollection (#4719)
Browse files Browse the repository at this point in the history
* Neo4j parallel-query from existing SCollection

* Apply review comments

* Simplify tests

* revise scaladoc to be consistent with refactor

* Remove useless interpolation

---------

Co-authored-by: Michel Davit <micheld@spotify.com>
  • Loading branch information
sumitsu and RustedBones authored Mar 1, 2023
1 parent 269b86b commit e2b92c7
Show file tree
Hide file tree
Showing 3 changed files with 103 additions and 29 deletions.
75 changes: 50 additions & 25 deletions scio-neo4j/src/it/scala/com/spotify/scio/neo4j/Neo4jIOIT.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import com.dimafeng.testcontainers.{ForAllTestContainer, Neo4jContainer}
import com.spotify.scio.testing.PipelineSpec
import org.apache.beam.runners.direct.DirectRunner
import org.apache.beam.sdk.options.PipelineOptionsFactory
import org.neo4j.driver.{AuthTokens, Driver, GraphDatabase, Record}
import org.neo4j.driver.{AuthTokens, Driver, GraphDatabase}
import org.scalatest.concurrent.Eventually
import org.testcontainers.utility.DockerImageName

Expand All @@ -21,6 +21,9 @@ object Neo4jIOIT {
final case class Movie(title: String, year: Int)
final case class Role(person: Person, movie: Movie, role: String)
final case class Origin(movie: String, country: String)

final case class MovieParam(year: Int)

}

class Neo4jIOIT extends PipelineSpec with Eventually with ForAllTestContainer {
Expand All @@ -37,7 +40,8 @@ class Neo4jIOIT extends PipelineSpec with Eventually with ForAllTestContainer {
// creating data from
// https://neo4j.com/docs/getting-started/current/cypher-intro/load-csv/#_the_graph_model
// (m)-[:ORIGIN]->(c) relation is added in the write test
"Neo4jIO" should "read cypher query from the graph database" in {
override def afterStart(): Unit = {
super.afterStart()
val session = client.session()
try {
session.writeTransaction { tx =>
Expand All @@ -63,51 +67,72 @@ class Neo4jIOIT extends PipelineSpec with Eventually with ForAllTestContainer {
()
}
} finally session.close()
}

val martin = Person("Martin Sheen")
val morgan = Person("Morgan Freeman")
val michael = Person("Michael Douglas")

val options = PipelineOptionsFactory.create()
options.setRunner(classOf[DirectRunner])
val americanPresident = Movie("American President", 1995)

val options = PipelineOptionsFactory.create()
lazy val neo4jOptions = Neo4jOptions(
Neo4jConnectionOptions(container.boltUrl, container.username, container.password)
)

"Neo4jIO" should "read cypher query from the graph database" in {
val queryRoles =
s"""MATCH (p)-[r: ACTED_IN]->(m)
|WHERE p.name='${martin.name}'
|RETURN p as person, m as movie, r.role as role
|""".stripMargin

val martin = Person("Martin Sheen")
val expectedRoles = Seq(
Role(martin, Movie("Wall Street", 1987), "Carl Fox"),
Role(martin, Movie("American President", 1995), "A.J. MacInerney")
)

val neo4jOptions = Neo4jOptions(
Neo4jConnectionOptions(container.boltUrl, container.username, container.password)
runWithRealContext(options) { sc =>
val resultQueryRoles = sc.neo4jCypher[Role](neo4jOptions, queryRoles)
resultQueryRoles should containInAnyOrder(expectedRoles)
}
}

it should "read cypher query from the graph database with parameter" in {
val queryParams = Seq(
MovieParam(1994),
MovieParam(0),
MovieParam(1995)
)

val queryRoles = s"""MATCH (p)-[r: ACTED_IN]->(m)
|WHERE p.name='${martin.name}'
|RETURN p as person, m as movie, r.role as role
|""".stripMargin
val queryRoles =
"""MATCH (p)-[r: ACTED_IN]->(m)
|WHERE m.year = $year
|RETURN p as person, m as movie, r.role as role
|""".stripMargin

implicit val rowMapper = (record: Record) => {
val p = Person(record.get("p").get("name").asString())
val m = Movie(record.get("m").get("name").asString(), record.get("m").get("year").asInt())
Role(p, m, record.get("r").get("role").asString())
}
val expectedRoles = Seq(
Role(martin, americanPresident, "A.J. MacInerney"),
Role(michael, americanPresident, "President Andrew Shepherd"),
Role(morgan, Movie("The Shawshank Redemption", 1994), "Ellis Boyd 'Red' Redding")
)

runWithRealContext(options) { sc =>
val result = sc.neo4jCypher[Role](neo4jOptions, queryRoles)
result should containInAnyOrder(expectedRoles)
val resultQueryMovieYear = sc
.parallelize(queryParams)
.neo4jCypher[Role](neo4jOptions, queryRoles)

resultQueryMovieYear should containInAnyOrder(expectedRoles)
}
}

it should "write to the graph database" in {
val options = PipelineOptionsFactory.create()
options.setRunner(classOf[DirectRunner])

val movieOrigins = Seq(
Origin("Wall Street", "USA"),
Origin("American President", "USA"),
Origin("The Shawshank Redemption", "USA")
)

val neo4jOptions = Neo4jOptions(
Neo4jConnectionOptions(container.boltUrl, container.username, container.password)
)

val insertOrigins = """UNWIND $origin AS origin
|MATCH
| (m:Movie {title: origin.movie}),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ object Neo4jIO {
}
final case class WriteParam(batchSize: Long = WriteParam.BeamDefaultBatchSize)

implicit private def recordConverter(record: Record): Value =
implicit private[neo4j] def recordConverter(record: Record): Value =
Values.value(record.asMap(identity[Value]))

private[neo4j] val UnwindParameterRegex: Regex = """UNWIND \$(\w+)""".r.unanchored
Expand Down Expand Up @@ -97,7 +97,7 @@ final case class Neo4jIO[T](neo4jOptions: Neo4jOptions, cypher: String)(implicit
beam.Neo4jIO
.writeUnwind()
.withUnwindMapName(unwindMapName)
.withDriverConfiguration(Neo4jIO.dataSourceConfiguration(neo4jOptions.connectionOptions))
.withDriverConfiguration(dataSourceConfiguration(neo4jOptions.connectionOptions))
.withSessionConfig(neo4jOptions.sessionConfig)
.withTransactionConfig(neo4jOptions.transactionConfig)
.withBatchSize(params.batchSize)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,65 @@

package com.spotify.scio.neo4j.syntax

import com.spotify.scio.coders.Coder
import com.spotify.scio.coders.{Coder, CoderMaterializer}
import com.spotify.scio.io.ClosedTap
import com.spotify.scio.neo4j.Neo4jIO.WriteParam
import com.spotify.scio.neo4j.{Neo4jIO, Neo4jOptions}
import com.spotify.scio.values.SCollection
import magnolify.neo4j.ValueType
import org.apache.beam.sdk.io.{neo4j => beam}

/** Enhanced version of [[com.spotify.scio.values.SCollection SCollection]] with Neo4J methods. */
final class Neo4jSCollectionOps[T](private val self: SCollection[T]) extends AnyVal {

import Neo4jIO._

/**
* Execute parallel instances of the provided Cypher query to the specified Neo4j database; one
* instance of the query will be executed for each element in this [[SCollection]]. Results from
* each query invocation will be added to the resulting [[SCollection]] as if by a `flatMap`
* transformation (where the Neo4j-query-execution returns an `Iterable`).
*
* This operation parameterizes each query invocation by first transforming each [[SCollection]]
* element via [[magnolify.neo4j.ValueType.to]]. Parameter names in the provided Cypher query
* [[String]] must match the names of keys in the [[org.neo4j.driver.Value]] generated by the
* (implicit) [[ValueType]] for the input [[SCollection]] type. (If the input type is a case
* class, that means that parameter names in the query should correspond to the fields defined in
* the case class.)
*
* @see
* ''Reading from Neo4j'' in the
* [[https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/neo4j/Neo4jIO.html Beam `Neo4jIO` documentation]]
* @see
* syntax for Cypher query parameters defined in
* [[https://neo4j.com/docs/cypher-manual/current/syntax/parameters Cypher Manual (Syntax / Parameters)]]
* @param neo4jConf
* options for configuring a Neo4J driver
* @param cypher
* parameterized Cypher query
* @return
* [[SCollection]] containing the union of query results from a parameterized query invocation
* for each original [[SCollection]] element
*/
def neo4jCypher[U](
neo4jConf: Neo4jOptions,
cypher: String
)(implicit
neo4jInType: ValueType[T],
neo4jOutType: ValueType[U],
coder: Coder[U]
): SCollection[U] =
self.applyTransform(
beam.Neo4jIO
.readAll[T, U]()
.withDriverConfiguration(dataSourceConfiguration(neo4jConf.connectionOptions))
.withSessionConfig(neo4jConf.sessionConfig)
.withTransactionConfig(neo4jConf.transactionConfig)
.withCypher(cypher)
.withParametersFunction(neo4jInType.to(_).asMap())
.withRowMapper(neo4jOutType.from(_))
.withCoder(CoderMaterializer.beam(self.context, coder))
)

/**
* Save this SCollection as a Neo4J database.
*
Expand Down

0 comments on commit e2b92c7

Please sign in to comment.