Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Neo4j parametrized from existing SCollection #4719

Merged
merged 5 commits into from
Mar 1, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 55 additions & 25 deletions scio-neo4j/src/it/scala/com/spotify/scio/neo4j/Neo4jIOIT.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import com.dimafeng.testcontainers.{ForAllTestContainer, Neo4jContainer}
import com.spotify.scio.testing.PipelineSpec
import org.apache.beam.runners.direct.DirectRunner
import org.apache.beam.sdk.options.PipelineOptionsFactory
import org.neo4j.driver.{AuthTokens, Driver, GraphDatabase, Record}
import org.neo4j.driver.{AuthTokens, Driver, GraphDatabase}
import org.scalatest.concurrent.Eventually
import org.testcontainers.utility.DockerImageName

Expand All @@ -21,6 +21,9 @@ object Neo4jIOIT {
final case class Movie(title: String, year: Int)
final case class Role(person: Person, movie: Movie, role: String)
final case class Origin(movie: String, country: String)

final case class MovieParam(year: Int)

}

class Neo4jIOIT extends PipelineSpec with Eventually with ForAllTestContainer {
Expand All @@ -37,7 +40,8 @@ class Neo4jIOIT extends PipelineSpec with Eventually with ForAllTestContainer {
// creating data from
// https://neo4j.com/docs/getting-started/current/cypher-intro/load-csv/#_the_graph_model
// (m)-[:ORIGIN]->(c) relation is added in the write test
"Neo4jIO" should "read cypher query from the graph database" in {
override def afterStart(): Unit = {
super.afterStart()
val session = client.session()
try {
session.writeTransaction { tx =>
Expand All @@ -63,51 +67,77 @@ class Neo4jIOIT extends PipelineSpec with Eventually with ForAllTestContainer {
()
}
} finally session.close()
}

val options = PipelineOptionsFactory.create()
options.setRunner(classOf[DirectRunner])
val martin = Person("Martin Sheen")
val morgan = Person("Morgan Freeman")
val michael = Person("Michael Douglas")

val americanPresident = Movie("American President", 1995)

val options = PipelineOptionsFactory.create()
options.setRunner(classOf[DirectRunner])
RustedBones marked this conversation as resolved.
Show resolved Hide resolved

lazy val neo4jOptions = Neo4jOptions(
Neo4jConnectionOptions(container.boltUrl, container.username, container.password)
)

"Neo4jIO" should "read cypher query from the graph database" in {
val queryRoles =
s"""MATCH (p)-[r: ACTED_IN]->(m)
|WHERE p.name='${martin.name}'
|RETURN p as person, m as movie, r.role as role
|""".stripMargin

val martin = Person("Martin Sheen")
val expectedRoles = Seq(
Role(martin, Movie("Wall Street", 1987), "Carl Fox"),
Role(martin, Movie("American President", 1995), "A.J. MacInerney")
)

val neo4jOptions = Neo4jOptions(
Neo4jConnectionOptions(container.boltUrl, container.username, container.password)
runWithRealContext(options) { sc =>
val resultQueryRoles = sc.neo4jCypher[Role](neo4jOptions, queryRoles)
resultQueryRoles should containInAnyOrder(expectedRoles)
}
}

it should "read cypher query from the graph database with parameter" in {
val options = PipelineOptionsFactory.create()
options.setRunner(classOf[DirectRunner])

val queryParams = Seq(
MovieParam(1994),
MovieParam(0),
MovieParam(1995)
)

val queryRoles = s"""MATCH (p)-[r: ACTED_IN]->(m)
|WHERE p.name='${martin.name}'
|RETURN p as person, m as movie, r.role as role
|""".stripMargin
val queryRoles =
s"""MATCH (p)-[r: ACTED_IN]->(m)
|WHERE m.year = $$year
|RETURN p as person, m as movie, r.role as role
|""".stripMargin

implicit val rowMapper = (record: Record) => {
val p = Person(record.get("p").get("name").asString())
val m = Movie(record.get("m").get("name").asString(), record.get("m").get("year").asInt())
Role(p, m, record.get("r").get("role").asString())
}
val expectedRoles = Seq(
Role(martin, americanPresident, "A.J. MacInerney"),
Role(michael, americanPresident, "President Andrew Shepherd"),
Role(morgan, Movie("The Shawshank Redemption", 1994), "Ellis Boyd 'Red' Redding")
)

runWithRealContext(options) { sc =>
val result = sc.neo4jCypher[Role](neo4jOptions, queryRoles)
result should containInAnyOrder(expectedRoles)
val resultQueryMovieYear = sc
.parallelize(queryParams)
.neo4jCypher[Role](neo4jOptions, queryRoles)

resultQueryMovieYear should containInAnyOrder(expectedRoles)
}
}

it should "write to the graph database" in {
val options = PipelineOptionsFactory.create()
options.setRunner(classOf[DirectRunner])

val movieOrigins = Seq(
Origin("Wall Street", "USA"),
Origin("American President", "USA"),
Origin("The Shawshank Redemption", "USA")
)

val neo4jOptions = Neo4jOptions(
Neo4jConnectionOptions(container.boltUrl, container.username, container.password)
)

val insertOrigins = """UNWIND $origin AS origin
|MATCH
| (m:Movie {title: origin.movie}),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ object Neo4jIO {
}
final case class WriteParam(batchSize: Long = WriteParam.BeamDefaultBatchSize)

implicit private def recordConverter(record: Record): Value =
implicit private[neo4j] def recordConverter(record: Record): Value =
Values.value(record.asMap(identity[Value]))

private[neo4j] val UnwindParameterRegex: Regex = """UNWIND \$(\w+)""".r.unanchored
Expand Down Expand Up @@ -97,7 +97,7 @@ final case class Neo4jIO[T](neo4jOptions: Neo4jOptions, cypher: String)(implicit
beam.Neo4jIO
.writeUnwind()
.withUnwindMapName(unwindMapName)
.withDriverConfiguration(Neo4jIO.dataSourceConfiguration(neo4jOptions.connectionOptions))
.withDriverConfiguration(dataSourceConfiguration(neo4jOptions.connectionOptions))
.withSessionConfig(neo4jOptions.sessionConfig)
.withTransactionConfig(neo4jOptions.transactionConfig)
.withBatchSize(params.batchSize)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,63 @@

package com.spotify.scio.neo4j.syntax

import com.spotify.scio.coders.Coder
import com.spotify.scio.coders.{Coder, CoderMaterializer}
import com.spotify.scio.io.ClosedTap
import com.spotify.scio.neo4j.Neo4jIO.WriteParam
import com.spotify.scio.neo4j.{Neo4jIO, Neo4jOptions}
import com.spotify.scio.values.SCollection
import magnolify.neo4j.ValueType
import org.apache.beam.sdk.io.{neo4j => beam}

/** Enhanced version of [[com.spotify.scio.values.SCollection SCollection]] with Neo4J methods. */
final class Neo4jSCollectionOps[T](private val self: SCollection[T]) extends AnyVal {

import Neo4jIO._

/**
* Execute parallel instances of the provided Cypher query to the specified Neo4j database; one
* instance of the query will be executed for each element in this [[SCollection]]. Results from
* each query invocation will be added to the resulting [[SCollection]] as if by a `flatMap`
* transformation (where the Neo4j-query-execution returns an `Iterable`).
*
* This operation parameterizes each query invocation by converting each [[SCollection]] element
* into a map of parameters, using the implicit [[magnolify.neo4j.ValueType ValueType]] for the
* element type, before executing the Cypher query. The keys of that map (the field names
* produced by the `ValueType`) must correspond to named parameters in the provided Cypher query,
* and the corresponding values are used as the values of those parameters for a given query
* invocation. Named parameters must consist of letters and numbers, prefixed with a `$`, as
* described in the Neo4j
* [[https://neo4j.com/docs/cypher-manual/current/syntax/parameters Cypher Manual (Syntax / Parameters)]].
*
* @see
* ''Reading from Neo4j'' in the
* [[https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/neo4j/Neo4jIO.html Beam `Neo4jIO` documentation]]
* @param neo4jConf
* options for configuring a Neo4J driver
* @param cypher
* parameterized Cypher query
* @return
* [[SCollection]] containing the union of query results from a parameterized query invocation
* for each original [[SCollection]] element
*/
def neo4jCypher[U](
neo4jConf: Neo4jOptions,
cypher: String
)(implicit
neo4jInType: ValueType[T],
neo4jOutType: ValueType[U],
coder: Coder[U]
): SCollection[U] =
self.applyTransform(
beam.Neo4jIO
.readAll[T, U]()
.withDriverConfiguration(dataSourceConfiguration(neo4jConf.connectionOptions))
.withSessionConfig(neo4jConf.sessionConfig)
.withTransactionConfig(neo4jConf.transactionConfig)
.withCypher(cypher)
.withParametersFunction(neo4jInType.to(_).asMap())
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah nice! Yes, agree that this looks much better. 🙂

Just to confirm that my understanding is correct: it would be incumbent upon the user to write the Cypher query String such that the parameter names match the field names used by the (implicit) ValueType[T] implementation? (Of course, in most cases, it would probably be easiest to use a case class (with the existing CaseMapper-powered ValueType) and match the query to the case class field names.)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that's correct.
The write API works the same way: if the Cypher query refers to a field that is not in the model (after the case mapper transform), there will be a runtime exception.

.withRowMapper(neo4jOutType.from(_))
.withCoder(CoderMaterializer.beam(self.context, coder))
)

/**
* Save this SCollection as a Neo4J database.
*
Expand Down