From c8542e379595aa8952b06f92b2664695a416ab1a Mon Sep 17 00:00:00 2001 From: Branden Smith Date: Thu, 23 Feb 2023 20:29:50 +0000 Subject: [PATCH 1/5] Neo4j parallel-query from existing SCollection --- .../com/spotify/scio/neo4j/Neo4jIOIT.scala | 43 ++++- .../com/spotify/scio/neo4j/Neo4jIO.scala | 17 +- .../spotify/scio/neo4j/ops/Neo4jCommon.scala | 36 +++++ .../scio/neo4j/ops/Neo4jCommonImplicits.scala | 28 ++++ .../neo4j/ops/Neo4jSCollectionReadOps.scala | 149 ++++++++++++++++++ .../neo4j/ops/Neo4jSCollectionWriteOps.scala | 51 ++++++ .../scio/neo4j/syntax/SCollectionSyntax.scala | 36 +---- 7 files changed, 310 insertions(+), 50 deletions(-) create mode 100644 scio-neo4j/src/main/scala/com/spotify/scio/neo4j/ops/Neo4jCommon.scala create mode 100644 scio-neo4j/src/main/scala/com/spotify/scio/neo4j/ops/Neo4jCommonImplicits.scala create mode 100644 scio-neo4j/src/main/scala/com/spotify/scio/neo4j/ops/Neo4jSCollectionReadOps.scala create mode 100644 scio-neo4j/src/main/scala/com/spotify/scio/neo4j/ops/Neo4jSCollectionWriteOps.scala diff --git a/scio-neo4j/src/it/scala/com/spotify/scio/neo4j/Neo4jIOIT.scala b/scio-neo4j/src/it/scala/com/spotify/scio/neo4j/Neo4jIOIT.scala index 6b2aab04af..10b3c2bb13 100644 --- a/scio-neo4j/src/it/scala/com/spotify/scio/neo4j/Neo4jIOIT.scala +++ b/scio-neo4j/src/it/scala/com/spotify/scio/neo4j/Neo4jIOIT.scala @@ -68,20 +68,38 @@ class Neo4jIOIT extends PipelineSpec with Eventually with ForAllTestContainer { options.setRunner(classOf[DirectRunner]) val martin = Person("Martin Sheen") - val expectedRoles = Seq( + val morgan = Person("Morgan Freeman") + val michael = Person("Michael Douglas") + + val americanPresident = Movie("American President", 1995) + + val queryMovieYears = Seq(1994, 0, 1995) + val expectedRolesMartin = Seq( Role(martin, Movie("Wall Street", 1987), "Carl Fox"), Role(martin, Movie("American President", 1995), "A.J. MacInerney") ) + val expectedRolesMovieYears = Seq( + Role(martin, americanPresident, "A.J. MacInerney"), + Role(michael, americanPresident, "President Andrew Shepherd"), + Role(morgan, Movie("The Shawshank Redemption", 1994), "Ellis Boyd 'Red' Redding") + ) + + val queryRoles = + s"""MATCH (p)-[r: ACTED_IN]->(m) + |WHERE p.name='${martin.name}' + |RETURN p as person, m as movie, r.role as role + |""".stripMargin + + val queryMovieYear = + s"""MATCH (p)-[r: ACTED_IN]->(m) + |WHERE m.year = $$movieYear + |RETURN p as person, m as movie, r.role as role + |""".stripMargin val neo4jOptions = Neo4jOptions( Neo4jConnectionOptions(container.boltUrl, container.username, container.password) ) - val queryRoles = s"""MATCH (p)-[r: ACTED_IN]->(m) - |WHERE p.name='${martin.name}' - |RETURN p as person, m as movie, r.role as role - |""".stripMargin - implicit val rowMapper = (record: Record) => { val p = Person(record.get("p").get("name").asString()) val m = Movie(record.get("m").get("name").asString(), record.get("m").get("year").asInt()) @@ -89,8 +107,17 @@ class Neo4jIOIT extends PipelineSpec with Eventually with ForAllTestContainer { } runWithRealContext(options) { sc => - val result = sc.neo4jCypher[Role](neo4jOptions, queryRoles) - result should containInAnyOrder(expectedRoles) + val resultQueryRoles = sc.neo4jCypher[Role](neo4jOptions, queryRoles) + resultQueryRoles should containInAnyOrder(expectedRolesMartin) + + val resultQueryMovieYear = sc + .parallelize(queryMovieYears) + .neo4jCypherWithParams[Role]( + neo4jOptions, + queryMovieYear, + (my: Int) => Map("movieYear" -> java.lang.Integer.valueOf(my)) + ) + resultQueryMovieYear should containInAnyOrder(expectedRolesMovieYears) } } diff --git a/scio-neo4j/src/main/scala/com/spotify/scio/neo4j/Neo4jIO.scala b/scio-neo4j/src/main/scala/com/spotify/scio/neo4j/Neo4jIO.scala index 7417c3c313..bd3aad666f 100644 --- a/scio-neo4j/src/main/scala/com/spotify/scio/neo4j/Neo4jIO.scala +++ b/scio-neo4j/src/main/scala/com/spotify/scio/neo4j/Neo4jIO.scala @@ -20,10 +20,11 @@ package com.spotify.scio.neo4j import com.spotify.scio.ScioContext import com.spotify.scio.coders.{Coder, CoderMaterializer} import com.spotify.scio.io._ +import com.spotify.scio.neo4j.ops.Neo4jCommon.dataSourceConfiguration +import com.spotify.scio.neo4j.ops.Neo4jCommonImplicits import com.spotify.scio.values.SCollection import magnolify.neo4j.ValueType import org.apache.beam.sdk.io.{neo4j => beam} -import org.neo4j.driver.{Record, Value, Values} import scala.util.matching.Regex @@ -34,9 +35,6 @@ object Neo4jIO { } final case class WriteParam(batchSize: Long = WriteParam.BeamDefaultBatchSize) - implicit private def recordConverter(record: Record): Value = - Values.value(record.asMap(identity[Value])) - private[neo4j] val UnwindParameterRegex: Regex = """UNWIND \$(\w+)""".r.unanchored private[neo4j] def neo4jIoId(opts: Neo4jOptions, cypher: String): String = @@ -45,14 +43,6 @@ object Neo4jIO { private[neo4j] def neo4jIoId(opts: Neo4jConnectionOptions, cypher: String): String = s"${opts.username}:${opts.password}@${opts.url}:$cypher" - private[neo4j] def dataSourceConfiguration( - connectionOptions: Neo4jConnectionOptions - ): beam.Neo4jIO.DriverConfiguration = - beam.Neo4jIO.DriverConfiguration.create( - connectionOptions.url, - connectionOptions.username, - connectionOptions.password - ) } final case class Neo4jIO[T](neo4jOptions: Neo4jOptions, cypher: String)(implicit @@ -60,6 +50,7 @@ final case class Neo4jIO[T](neo4jOptions: Neo4jOptions, cypher: String)(implicit coder: Coder[T] ) extends ScioIO[T] { + import Neo4jCommonImplicits._ import Neo4jIO._ override type ReadP = Unit @@ -97,7 +88,7 @@ final case class Neo4jIO[T](neo4jOptions: Neo4jOptions, cypher: String)(implicit beam.Neo4jIO .writeUnwind() .withUnwindMapName(unwindMapName) - .withDriverConfiguration(Neo4jIO.dataSourceConfiguration(neo4jOptions.connectionOptions)) + .withDriverConfiguration(dataSourceConfiguration(neo4jOptions.connectionOptions)) .withSessionConfig(neo4jOptions.sessionConfig) .withTransactionConfig(neo4jOptions.transactionConfig) .withBatchSize(params.batchSize) diff --git a/scio-neo4j/src/main/scala/com/spotify/scio/neo4j/ops/Neo4jCommon.scala b/scio-neo4j/src/main/scala/com/spotify/scio/neo4j/ops/Neo4jCommon.scala new file mode 100644 index 0000000000..51220536cb --- /dev/null +++ b/scio-neo4j/src/main/scala/com/spotify/scio/neo4j/ops/Neo4jCommon.scala @@ -0,0 +1,36 @@ +/* + * Copyright 2023 Spotify AB. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package com.spotify.scio.neo4j.ops + +import com.spotify.scio.neo4j.Neo4jConnectionOptions +import org.apache.beam.sdk.io.{neo4j => beam} + +/** Shared tooling for Scio-Neo4j components */ +object Neo4jCommon { + + def dataSourceConfiguration( + connectionOptions: Neo4jConnectionOptions + ): beam.Neo4jIO.DriverConfiguration = { + beam.Neo4jIO.DriverConfiguration.create( + connectionOptions.url, + connectionOptions.username, + connectionOptions.password + ) + } + +} diff --git a/scio-neo4j/src/main/scala/com/spotify/scio/neo4j/ops/Neo4jCommonImplicits.scala b/scio-neo4j/src/main/scala/com/spotify/scio/neo4j/ops/Neo4jCommonImplicits.scala new file mode 100644 index 0000000000..e02b8eb307 --- /dev/null +++ b/scio-neo4j/src/main/scala/com/spotify/scio/neo4j/ops/Neo4jCommonImplicits.scala @@ -0,0 +1,28 @@ +/* + * Copyright 2023 Spotify AB. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package com.spotify.scio.neo4j.ops + +import org.neo4j.driver.{Record, Value, Values} + +/** Implicit conversions for Scio-Neo4j components */ +object Neo4jCommonImplicits { + + implicit def recordConverter(record: Record): Value = + Values.value(record.asMap(identity[Value])) + +} diff --git a/scio-neo4j/src/main/scala/com/spotify/scio/neo4j/ops/Neo4jSCollectionReadOps.scala b/scio-neo4j/src/main/scala/com/spotify/scio/neo4j/ops/Neo4jSCollectionReadOps.scala new file mode 100644 index 0000000000..9aaa58ce31 --- /dev/null +++ b/scio-neo4j/src/main/scala/com/spotify/scio/neo4j/ops/Neo4jSCollectionReadOps.scala @@ -0,0 +1,149 @@ +/* + * Copyright 2023 Spotify AB. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package com.spotify.scio.neo4j.ops + +import com.spotify.scio.coders.{Coder, CoderMaterializer} +import com.spotify.scio.neo4j.Neo4jOptions +import com.spotify.scio.neo4j.ops.Neo4jCommon.dataSourceConfiguration +import com.spotify.scio.values.SCollection +import magnolify.neo4j.ValueType +import org.apache.beam.sdk.io.{neo4j => beam} +import org.apache.beam.sdk.transforms.SerializableFunction + +import java.util.{Map => JMap} +import scala.jdk.CollectionConverters._ + +/** + * Operations for transforming an existing [[SCollection]] via a Neo4j query (read) + * @param self + * existing [[SCollection]] (prior to Neo4j read) + * @tparam X + * type parameter for existing [[SCollection]] + */ +class Neo4jSCollectionReadOps[X](self: SCollection[X]) { + + /** + * Execute parallel instances of the provided Cypher query to the specified Neo4j database; one + * instance of the query will be executed for each element in this [[SCollection]]. Results from + * each query invocation will be added to the resulting [[SCollection]] as if by a `flatMap` + * transformation (where the Neo4j-query-execution returns an `Iterable`). + * + * This operation parameterizes each query invocation by applying a supplied function to each + * [[SCollection]] element before executing the Cypher query. The function must produce a [[Map]] + * of [[String]] to [[AnyRef]], where the keys correspond to named parameters in the provided + * Cypher query and the corresponding values correspond to the intended value of that parameter + * for a given query invocation. Named parameters must consist of letters and numbers, prepended + * with a `$`, as described in the Neo4j + * [[https://neo4j.com/docs/cypher-manual/current/syntax/parameters Cypher Manual (Syntax / Parameters)]]. + * + * @see + * ''Reading from Neo4j'' in the + * [[https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/neo4j/Neo4jIO.html Beam `Neo4jIO` documentation]] + * @param neo4jConf + * [[Neo4jOptions]] indicating the Neo4j instance on which to run the query + * @param cypher + * [[String]] parameterized Cypher query + * @param elementToParamFunction + * function (`X => Map[String, AnyRef]`) which converts an element of the [[SCollection]] to a + * [[Map]] of parameter values for the given Cypher query + * @param neo4jType + * (implicit) [[ValueType]] converting Neo4j results to the expected [[SCollection]] output type + * @param coder + * (implicit) [[Coder]] for serialization of the result [[SCollection]] type + * @tparam Y + * result [[SCollection]] type + * @return + * [[SCollection]] containing the union of query results from a parameterized query invocation + * for each original [[SCollection]] element + */ + def neo4jCypherWithParams[Y]( + neo4jConf: Neo4jOptions, + cypher: String, + elementToParamFunction: X => Map[String, AnyRef] + )(implicit + neo4jType: ValueType[Y], + coder: Coder[Y] + ): SCollection[Y] = { + Neo4jSCollectionReadOps.neo4jCypherWithParamsImpl( + self, + neo4jConf, + cypher, + elementToParamFunction + ) + } + +} + +/** + * Implementations for operations for transforming an existing [[SCollection]] via a Neo4j query; + * typically used via implicitly-defined [[SCollection]] syntax. + * @see + * [[com.spotify.scio.neo4j.syntax.SCollectionSyntax]] + */ +object Neo4jSCollectionReadOps { + + import Neo4jCommonImplicits._ + + /** + * Convert a provided Scala function to a Beam [[SerializableFunction]] for use in + * [[https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/neo4j/Neo4jIO.ReadAll.html#withParametersFunction-org.apache.beam.sdk.transforms.SerializableFunction- `Neo4jIO.ReadAll`]] + * @param elementToParamFunction + * `X => Map[String, AnyRef]` Scala function defining how elements of an existing + * [[SCollection]] (`X`) should map to parameters ([[String]] keys in the output [[Map]]) in a + * Cypher query + * @tparam X + * type parameter for input [[SCollection]] + * @return + * [[SerializableFunction]] equivalent of input Scala function + */ + private def beamParamFunction[X]( + elementToParamFunction: X => Map[String, AnyRef] + ): SerializableFunction[X, JMap[String, AnyRef]] = { + new SerializableFunction[X, JMap[String, AnyRef]] { + override def apply(input: X): JMap[String, AnyRef] = { + val queryParamStringToArgValue: Map[String, AnyRef] = elementToParamFunction.apply(input) + queryParamStringToArgValue.asJava + } + } + } + + /** @see [[com.spotify.scio.neo4j.ops.Neo4jSCollectionReadOps.neo4jCypherWithParams]] */ + def neo4jCypherWithParamsImpl[X, Y]( + sColl: SCollection[X], + neo4jConf: Neo4jOptions, + cypher: String, + elementToParamFunction: X => Map[String, AnyRef] + )(implicit + neo4jType: ValueType[Y], + coder: Coder[Y] + ): SCollection[Y] = { + sColl + .applyTransform( + beam.Neo4jIO + .readAll[X, Y]() + .withDriverConfiguration(dataSourceConfiguration(neo4jConf.connectionOptions)) + .withSessionConfig(neo4jConf.sessionConfig) + .withTransactionConfig(neo4jConf.transactionConfig) + .withCypher(cypher) + .withParametersFunction(beamParamFunction(elementToParamFunction)) + .withRowMapper(neo4jType.from(_)) + .withCoder(CoderMaterializer.beam(sColl.context, coder)) + ) + } + +} diff --git a/scio-neo4j/src/main/scala/com/spotify/scio/neo4j/ops/Neo4jSCollectionWriteOps.scala b/scio-neo4j/src/main/scala/com/spotify/scio/neo4j/ops/Neo4jSCollectionWriteOps.scala new file mode 100644 index 0000000000..d33ea7778b --- /dev/null +++ b/scio-neo4j/src/main/scala/com/spotify/scio/neo4j/ops/Neo4jSCollectionWriteOps.scala @@ -0,0 +1,51 @@ +/* + * Copyright 2023 Spotify AB. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package com.spotify.scio.neo4j.ops + +import com.spotify.scio.coders.Coder +import com.spotify.scio.io.ClosedTap +import com.spotify.scio.neo4j.Neo4jIO.WriteParam +import com.spotify.scio.neo4j.{Neo4jIO, Neo4jOptions} +import com.spotify.scio.values.SCollection +import magnolify.neo4j.ValueType + +/** + * Enhanced version of [[com.spotify.scio.values.SCollection SCollection]] with Neo4J write methods. + */ +class Neo4jSCollectionWriteOps[T](private val self: SCollection[T]) extends AnyVal { + + /** + * Save this SCollection as a Neo4J database. + * + * @param neo4jOptions + * options for configuring a Neo4J driver + * @param unwindCypher + * Neo4J cypher query representing an + * [[https://neo4j.com/docs/cypher-manual/current/clauses/unwind/#unwind-creating-nodes-from-a-list-parameter UNWIND parameter]] + * cypher statement + * @param batchSize + * batch size when executing the unwind cypher query. Default batch size of 5000 + */ + def saveAsNeo4j( + neo4jOptions: Neo4jOptions, + unwindCypher: String, + batchSize: Long = WriteParam.BeamDefaultBatchSize + )(implicit neo4jType: ValueType[T], coder: Coder[T]): ClosedTap[Nothing] = + self.write(Neo4jIO[T](neo4jOptions, unwindCypher))(WriteParam(batchSize)) + +} diff --git a/scio-neo4j/src/main/scala/com/spotify/scio/neo4j/syntax/SCollectionSyntax.scala b/scio-neo4j/src/main/scala/com/spotify/scio/neo4j/syntax/SCollectionSyntax.scala index 430c6b44f2..aac64fa39b 100644 --- a/scio-neo4j/src/main/scala/com/spotify/scio/neo4j/syntax/SCollectionSyntax.scala +++ b/scio-neo4j/src/main/scala/com/spotify/scio/neo4j/syntax/SCollectionSyntax.scala @@ -17,37 +17,15 @@ package com.spotify.scio.neo4j.syntax -import com.spotify.scio.coders.Coder -import com.spotify.scio.io.ClosedTap -import com.spotify.scio.neo4j.Neo4jIO.WriteParam -import com.spotify.scio.neo4j.{Neo4jIO, Neo4jOptions} +import com.spotify.scio.neo4j.ops.{Neo4jSCollectionReadOps, Neo4jSCollectionWriteOps} import com.spotify.scio.values.SCollection -import magnolify.neo4j.ValueType -/** Enhanced version of [[com.spotify.scio.values.SCollection SCollection]] with Neo4J methods. */ -final class Neo4jSCollectionOps[T](private val self: SCollection[T]) extends AnyVal { +trait SCollectionSyntax { - /** - * Save this SCollection as a Neo4J database. - * - * @param neo4jOptions - * options for configuring a Neo4J driver - * @param unwindCypher - * Neo4J cypher query representing an - * [[https://neo4j.com/docs/cypher-manual/current/clauses/unwind/#unwind-creating-nodes-from-a-list-parameter UNWIND parameter]] - * cypher statement - * @param batchSize - * batch size when executing the unwind cypher query. Default batch size of 5000 - */ - def saveAsNeo4j( - neo4jOptions: Neo4jOptions, - unwindCypher: String, - batchSize: Long = WriteParam.BeamDefaultBatchSize - )(implicit neo4jType: ValueType[T], coder: Coder[T]): ClosedTap[Nothing] = - self.write(Neo4jIO[T](neo4jOptions, unwindCypher))(WriteParam(batchSize)) -} + implicit def neo4jSCollectionWriteOps[T](sc: SCollection[T]): Neo4jSCollectionWriteOps[T] = + new Neo4jSCollectionWriteOps[T](sc) + + implicit def neo4jSCollectionReadOps[T](sColl: SCollection[T]): Neo4jSCollectionReadOps[T] = + new Neo4jSCollectionReadOps[T](sColl) -trait SCollectionSyntax { - implicit def neo4jSCollectionOps[T](sc: SCollection[T]): Neo4jSCollectionOps[T] = - new Neo4jSCollectionOps(sc) } From 9fdcac3c7feb0bfff3aa0be1a2cb963635d2cf34 Mon Sep 17 00:00:00 2001 From: Michel Davit Date: Fri, 24 Feb 2023 14:30:50 +0100 Subject: [PATCH 2/5] Apply review comments --- .../com/spotify/scio/neo4j/Neo4jIOIT.scala | 93 +++++------ .../com/spotify/scio/neo4j/Neo4jIO.scala | 15 +- .../spotify/scio/neo4j/ops/Neo4jCommon.scala | 36 ----- .../scio/neo4j/ops/Neo4jCommonImplicits.scala | 28 ---- .../neo4j/ops/Neo4jSCollectionReadOps.scala | 149 ------------------ .../neo4j/ops/Neo4jSCollectionWriteOps.scala | 51 ------ .../scio/neo4j/syntax/SCollectionSyntax.scala | 81 +++++++++- 7 files changed, 135 insertions(+), 318 deletions(-) delete mode 100644 scio-neo4j/src/main/scala/com/spotify/scio/neo4j/ops/Neo4jCommon.scala delete mode 100644 scio-neo4j/src/main/scala/com/spotify/scio/neo4j/ops/Neo4jCommonImplicits.scala delete mode 100644 scio-neo4j/src/main/scala/com/spotify/scio/neo4j/ops/Neo4jSCollectionReadOps.scala delete mode 100644 scio-neo4j/src/main/scala/com/spotify/scio/neo4j/ops/Neo4jSCollectionWriteOps.scala diff --git a/scio-neo4j/src/it/scala/com/spotify/scio/neo4j/Neo4jIOIT.scala b/scio-neo4j/src/it/scala/com/spotify/scio/neo4j/Neo4jIOIT.scala index 10b3c2bb13..eb72afdbf9 100644 --- a/scio-neo4j/src/it/scala/com/spotify/scio/neo4j/Neo4jIOIT.scala +++ b/scio-neo4j/src/it/scala/com/spotify/scio/neo4j/Neo4jIOIT.scala @@ -4,7 +4,7 @@ import com.dimafeng.testcontainers.{ForAllTestContainer, Neo4jContainer} import com.spotify.scio.testing.PipelineSpec import org.apache.beam.runners.direct.DirectRunner import org.apache.beam.sdk.options.PipelineOptionsFactory -import org.neo4j.driver.{AuthTokens, Driver, GraphDatabase, Record} +import org.neo4j.driver.{AuthTokens, Driver, GraphDatabase} import org.scalatest.concurrent.Eventually import org.testcontainers.utility.DockerImageName @@ -21,6 +21,9 @@ object Neo4jIOIT { final case class Movie(title: String, year: Int) final case class Role(person: Person, movie: Movie, role: String) final case class Origin(movie: String, country: String) + + final case class MovieParam(year: Int) + } class Neo4jIOIT extends PipelineSpec with Eventually with ForAllTestContainer { @@ -37,7 +40,8 @@ class Neo4jIOIT extends PipelineSpec with Eventually with ForAllTestContainer { // creating data from // https://neo4j.com/docs/getting-started/current/cypher-intro/load-csv/#_the_graph_model // (m)-[:ORIGIN]->(c) relation is added in the write test - "Neo4jIO" should "read cypher query from the graph database" in { + override def afterStart(): Unit = { + super.afterStart() val session = client.session() try { session.writeTransaction { tx => @@ -63,78 +67,77 @@ class Neo4jIOIT extends PipelineSpec with Eventually with ForAllTestContainer { () } } finally session.close() + } - val options = PipelineOptionsFactory.create() - options.setRunner(classOf[DirectRunner]) + val martin = Person("Martin Sheen") + val morgan = Person("Morgan Freeman") + val michael = Person("Michael Douglas") - val martin = Person("Martin Sheen") - val morgan = Person("Morgan Freeman") - val michael = Person("Michael Douglas") + val americanPresident = Movie("American President", 1995) - val americanPresident = Movie("American President", 1995) + val options = PipelineOptionsFactory.create() + options.setRunner(classOf[DirectRunner]) - val queryMovieYears = Seq(1994, 0, 1995) - val expectedRolesMartin = Seq( - Role(martin, Movie("Wall Street", 1987), "Carl Fox"), - Role(martin, Movie("American President", 1995), "A.J. MacInerney") - ) - val expectedRolesMovieYears = Seq( - Role(martin, americanPresident, "A.J. MacInerney"), - Role(michael, americanPresident, "President Andrew Shepherd"), - Role(morgan, Movie("The Shawshank Redemption", 1994), "Ellis Boyd 'Red' Redding") - ) + lazy val neo4jOptions = Neo4jOptions( + Neo4jConnectionOptions(container.boltUrl, container.username, container.password) + ) + "Neo4jIO" should "read cypher query from the graph database" in { val queryRoles = s"""MATCH (p)-[r: ACTED_IN]->(m) |WHERE p.name='${martin.name}' |RETURN p as person, m as movie, r.role as role |""".stripMargin - val queryMovieYear = + val expectedRoles = Seq( + Role(martin, Movie("Wall Street", 1987), "Carl Fox"), + Role(martin, Movie("American President", 1995), "A.J. MacInerney") + ) + + runWithRealContext(options) { sc => + val resultQueryRoles = sc.neo4jCypher[Role](neo4jOptions, queryRoles) + resultQueryRoles should containInAnyOrder(expectedRoles) + } + } + + it should "read cypher query from the graph database with parameter" in { + val options = PipelineOptionsFactory.create() + options.setRunner(classOf[DirectRunner]) + + val queryParams = Seq( + MovieParam(1994), + MovieParam(0), + MovieParam(1995) + ) + + val queryRoles = s"""MATCH (p)-[r: ACTED_IN]->(m) - |WHERE m.year = $$movieYear + |WHERE m.year = $$year |RETURN p as person, m as movie, r.role as role |""".stripMargin - val neo4jOptions = Neo4jOptions( - Neo4jConnectionOptions(container.boltUrl, container.username, container.password) + val expectedRoles = Seq( + Role(martin, americanPresident, "A.J. MacInerney"), + Role(michael, americanPresident, "President Andrew Shepherd"), + Role(morgan, Movie("The Shawshank Redemption", 1994), "Ellis Boyd 'Red' Redding") ) - implicit val rowMapper = (record: Record) => { - val p = Person(record.get("p").get("name").asString()) - val m = Movie(record.get("m").get("name").asString(), record.get("m").get("year").asInt()) - Role(p, m, record.get("r").get("role").asString()) - } - runWithRealContext(options) { sc => - val resultQueryRoles = sc.neo4jCypher[Role](neo4jOptions, queryRoles) - resultQueryRoles should containInAnyOrder(expectedRolesMartin) - val resultQueryMovieYear = sc - .parallelize(queryMovieYears) - .neo4jCypherWithParams[Role]( - neo4jOptions, - queryMovieYear, - (my: Int) => Map("movieYear" -> java.lang.Integer.valueOf(my)) - ) - resultQueryMovieYear should containInAnyOrder(expectedRolesMovieYears) + .parallelize(queryParams) + .neo4jCypher[Role](neo4jOptions, queryRoles) + + resultQueryMovieYear should containInAnyOrder(expectedRoles) } } it should "write to the graph database" in { - val options = PipelineOptionsFactory.create() - options.setRunner(classOf[DirectRunner]) - val movieOrigins = Seq( Origin("Wall Street", "USA"), Origin("American President", "USA"), Origin("The Shawshank Redemption", "USA") ) - val neo4jOptions = Neo4jOptions( - Neo4jConnectionOptions(container.boltUrl, container.username, container.password) - ) - val insertOrigins = """UNWIND $origin AS origin |MATCH | (m:Movie {title: origin.movie}), diff --git a/scio-neo4j/src/main/scala/com/spotify/scio/neo4j/Neo4jIO.scala b/scio-neo4j/src/main/scala/com/spotify/scio/neo4j/Neo4jIO.scala index bd3aad666f..b232d5990b 100644 --- a/scio-neo4j/src/main/scala/com/spotify/scio/neo4j/Neo4jIO.scala +++ b/scio-neo4j/src/main/scala/com/spotify/scio/neo4j/Neo4jIO.scala @@ -20,11 +20,10 @@ package com.spotify.scio.neo4j import com.spotify.scio.ScioContext import com.spotify.scio.coders.{Coder, CoderMaterializer} import com.spotify.scio.io._ -import com.spotify.scio.neo4j.ops.Neo4jCommon.dataSourceConfiguration -import com.spotify.scio.neo4j.ops.Neo4jCommonImplicits import com.spotify.scio.values.SCollection import magnolify.neo4j.ValueType import org.apache.beam.sdk.io.{neo4j => beam} +import org.neo4j.driver.{Record, Value, Values} import scala.util.matching.Regex @@ -35,6 +34,9 @@ object Neo4jIO { } final case class WriteParam(batchSize: Long = WriteParam.BeamDefaultBatchSize) + implicit private[neo4j] def recordConverter(record: Record): Value = + Values.value(record.asMap(identity[Value])) + private[neo4j] val UnwindParameterRegex: Regex = """UNWIND \$(\w+)""".r.unanchored private[neo4j] def neo4jIoId(opts: Neo4jOptions, cypher: String): String = @@ -43,6 +45,14 @@ object Neo4jIO { private[neo4j] def neo4jIoId(opts: Neo4jConnectionOptions, cypher: String): String = s"${opts.username}:${opts.password}@${opts.url}:$cypher" + private[neo4j] def dataSourceConfiguration( + connectionOptions: Neo4jConnectionOptions + ): beam.Neo4jIO.DriverConfiguration = + beam.Neo4jIO.DriverConfiguration.create( + connectionOptions.url, + connectionOptions.username, + connectionOptions.password + ) } final case class Neo4jIO[T](neo4jOptions: Neo4jOptions, cypher: String)(implicit @@ -50,7 +60,6 @@ final case class Neo4jIO[T](neo4jOptions: Neo4jOptions, cypher: String)(implicit coder: Coder[T] ) extends ScioIO[T] { - import Neo4jCommonImplicits._ import Neo4jIO._ override type ReadP = Unit diff --git a/scio-neo4j/src/main/scala/com/spotify/scio/neo4j/ops/Neo4jCommon.scala b/scio-neo4j/src/main/scala/com/spotify/scio/neo4j/ops/Neo4jCommon.scala deleted file mode 100644 index 51220536cb..0000000000 --- a/scio-neo4j/src/main/scala/com/spotify/scio/neo4j/ops/Neo4jCommon.scala +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Copyright 2023 Spotify AB. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package com.spotify.scio.neo4j.ops - -import com.spotify.scio.neo4j.Neo4jConnectionOptions -import org.apache.beam.sdk.io.{neo4j => beam} - -/** Shared tooling for Scio-Neo4j components */ -object Neo4jCommon { - - def dataSourceConfiguration( - connectionOptions: Neo4jConnectionOptions - ): beam.Neo4jIO.DriverConfiguration = { - beam.Neo4jIO.DriverConfiguration.create( - connectionOptions.url, - connectionOptions.username, - connectionOptions.password - ) - } - -} diff --git a/scio-neo4j/src/main/scala/com/spotify/scio/neo4j/ops/Neo4jCommonImplicits.scala b/scio-neo4j/src/main/scala/com/spotify/scio/neo4j/ops/Neo4jCommonImplicits.scala deleted file mode 100644 index e02b8eb307..0000000000 --- a/scio-neo4j/src/main/scala/com/spotify/scio/neo4j/ops/Neo4jCommonImplicits.scala +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Copyright 2023 Spotify AB. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package com.spotify.scio.neo4j.ops - -import org.neo4j.driver.{Record, Value, Values} - -/** Implicit conversions for Scio-Neo4j components */ -object Neo4jCommonImplicits { - - implicit def recordConverter(record: Record): Value = - Values.value(record.asMap(identity[Value])) - -} diff --git a/scio-neo4j/src/main/scala/com/spotify/scio/neo4j/ops/Neo4jSCollectionReadOps.scala b/scio-neo4j/src/main/scala/com/spotify/scio/neo4j/ops/Neo4jSCollectionReadOps.scala deleted file mode 100644 index 9aaa58ce31..0000000000 --- a/scio-neo4j/src/main/scala/com/spotify/scio/neo4j/ops/Neo4jSCollectionReadOps.scala +++ /dev/null @@ -1,149 +0,0 @@ -/* - * Copyright 2023 Spotify AB. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package com.spotify.scio.neo4j.ops - -import com.spotify.scio.coders.{Coder, CoderMaterializer} -import com.spotify.scio.neo4j.Neo4jOptions -import com.spotify.scio.neo4j.ops.Neo4jCommon.dataSourceConfiguration -import com.spotify.scio.values.SCollection -import magnolify.neo4j.ValueType -import org.apache.beam.sdk.io.{neo4j => beam} -import org.apache.beam.sdk.transforms.SerializableFunction - -import java.util.{Map => JMap} -import scala.jdk.CollectionConverters._ - -/** - * Operations for transforming an existing [[SCollection]] via a Neo4j query (read) - * @param self - * existing [[SCollection]] (prior to Neo4j read) - * @tparam X - * type parameter for existing [[SCollection]] - */ -class Neo4jSCollectionReadOps[X](self: SCollection[X]) { - - /** - * Execute parallel instances of the provided Cypher query to the specified Neo4j database; one - * instance of the query will be executed for each element in this [[SCollection]]. Results from - * each query invocation will be added to the resulting [[SCollection]] as if by a `flatMap` - * transformation (where the Neo4j-query-execution returns an `Iterable`). - * - * This operation parameterizes each query invocation by applying a supplied function to each - * [[SCollection]] element before executing the Cypher query. The function must produce a [[Map]] - * of [[String]] to [[AnyRef]], where the keys correspond to named parameters in the provided - * Cypher query and the corresponding values correspond to the intended value of that parameter - * for a given query invocation. Named parameters must consist of letters and numbers, prepended - * with a `$`, as described in the Neo4j - * [[https://neo4j.com/docs/cypher-manual/current/syntax/parameters Cypher Manual (Syntax / Parameters)]]. - * - * @see - * ''Reading from Neo4j'' in the - * [[https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/neo4j/Neo4jIO.html Beam `Neo4jIO` documentation]] - * @param neo4jConf - * [[Neo4jOptions]] indicating the Neo4j instance on which to run the query - * @param cypher - * [[String]] parameterized Cypher query - * @param elementToParamFunction - * function (`X => Map[String, AnyRef]`) which converts an element of the [[SCollection]] to a - * [[Map]] of parameter values for the given Cypher query - * @param neo4jType - * (implicit) [[ValueType]] converting Neo4j results to the expected [[SCollection]] output type - * @param coder - * (implicit) [[Coder]] for serialization of the result [[SCollection]] type - * @tparam Y - * result [[SCollection]] type - * @return - * [[SCollection]] containing the union of query results from a parameterized query invocation - * for each original [[SCollection]] element - */ - def neo4jCypherWithParams[Y]( - neo4jConf: Neo4jOptions, - cypher: String, - elementToParamFunction: X => Map[String, AnyRef] - )(implicit - neo4jType: ValueType[Y], - coder: Coder[Y] - ): SCollection[Y] = { - Neo4jSCollectionReadOps.neo4jCypherWithParamsImpl( - self, - neo4jConf, - cypher, - elementToParamFunction - ) - } - -} - -/** - * Implementations for operations for transforming an existing [[SCollection]] via a Neo4j query; - * typically used via implicitly-defined [[SCollection]] syntax. - * @see - * [[com.spotify.scio.neo4j.syntax.SCollectionSyntax]] - */ -object Neo4jSCollectionReadOps { - - import Neo4jCommonImplicits._ - - /** - * Convert a provided Scala function to a Beam [[SerializableFunction]] for use in - * [[https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/neo4j/Neo4jIO.ReadAll.html#withParametersFunction-org.apache.beam.sdk.transforms.SerializableFunction- `Neo4jIO.ReadAll`]] - * @param elementToParamFunction - * `X => Map[String, AnyRef]` Scala function defining how elements of an existing - * [[SCollection]] (`X`) should map to parameters ([[String]] keys in the output [[Map]]) in a - * Cypher query - * @tparam X - * type parameter for input [[SCollection]] - * @return - * [[SerializableFunction]] equivalent of input Scala function - */ - private def beamParamFunction[X]( - elementToParamFunction: X => Map[String, AnyRef] - ): SerializableFunction[X, JMap[String, AnyRef]] = { - new SerializableFunction[X, JMap[String, AnyRef]] { - override def apply(input: X): JMap[String, AnyRef] = { - val queryParamStringToArgValue: Map[String, AnyRef] = elementToParamFunction.apply(input) - queryParamStringToArgValue.asJava - } - } - } - - /** @see [[com.spotify.scio.neo4j.ops.Neo4jSCollectionReadOps.neo4jCypherWithParams]] */ - def neo4jCypherWithParamsImpl[X, Y]( - sColl: SCollection[X], - neo4jConf: Neo4jOptions, - cypher: String, - elementToParamFunction: X => Map[String, AnyRef] - )(implicit - neo4jType: ValueType[Y], - coder: Coder[Y] - ): SCollection[Y] = { - sColl - .applyTransform( - beam.Neo4jIO - .readAll[X, Y]() - .withDriverConfiguration(dataSourceConfiguration(neo4jConf.connectionOptions)) - .withSessionConfig(neo4jConf.sessionConfig) - .withTransactionConfig(neo4jConf.transactionConfig) - .withCypher(cypher) - .withParametersFunction(beamParamFunction(elementToParamFunction)) - .withRowMapper(neo4jType.from(_)) - .withCoder(CoderMaterializer.beam(sColl.context, coder)) - ) - } - -} diff --git a/scio-neo4j/src/main/scala/com/spotify/scio/neo4j/ops/Neo4jSCollectionWriteOps.scala b/scio-neo4j/src/main/scala/com/spotify/scio/neo4j/ops/Neo4jSCollectionWriteOps.scala deleted file mode 100644 index d33ea7778b..0000000000 --- a/scio-neo4j/src/main/scala/com/spotify/scio/neo4j/ops/Neo4jSCollectionWriteOps.scala +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Copyright 2023 Spotify AB. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package com.spotify.scio.neo4j.ops - -import com.spotify.scio.coders.Coder -import com.spotify.scio.io.ClosedTap -import com.spotify.scio.neo4j.Neo4jIO.WriteParam -import com.spotify.scio.neo4j.{Neo4jIO, Neo4jOptions} -import com.spotify.scio.values.SCollection -import magnolify.neo4j.ValueType - -/** - * Enhanced version of [[com.spotify.scio.values.SCollection SCollection]] with Neo4J write methods. - */ -class Neo4jSCollectionWriteOps[T](private val self: SCollection[T]) extends AnyVal { - - /** - * Save this SCollection as a Neo4J database. - * - * @param neo4jOptions - * options for configuring a Neo4J driver - * @param unwindCypher - * Neo4J cypher query representing an - * [[https://neo4j.com/docs/cypher-manual/current/clauses/unwind/#unwind-creating-nodes-from-a-list-parameter UNWIND parameter]] - * cypher statement - * @param batchSize - * batch size when executing the unwind cypher query. Default batch size of 5000 - */ - def saveAsNeo4j( - neo4jOptions: Neo4jOptions, - unwindCypher: String, - batchSize: Long = WriteParam.BeamDefaultBatchSize - )(implicit neo4jType: ValueType[T], coder: Coder[T]): ClosedTap[Nothing] = - self.write(Neo4jIO[T](neo4jOptions, unwindCypher))(WriteParam(batchSize)) - -} diff --git a/scio-neo4j/src/main/scala/com/spotify/scio/neo4j/syntax/SCollectionSyntax.scala b/scio-neo4j/src/main/scala/com/spotify/scio/neo4j/syntax/SCollectionSyntax.scala index aac64fa39b..af61237b6f 100644 --- a/scio-neo4j/src/main/scala/com/spotify/scio/neo4j/syntax/SCollectionSyntax.scala +++ b/scio-neo4j/src/main/scala/com/spotify/scio/neo4j/syntax/SCollectionSyntax.scala @@ -17,15 +17,84 @@ package com.spotify.scio.neo4j.syntax -import com.spotify.scio.neo4j.ops.{Neo4jSCollectionReadOps, Neo4jSCollectionWriteOps} +import com.spotify.scio.coders.{Coder, CoderMaterializer} +import com.spotify.scio.io.ClosedTap +import com.spotify.scio.neo4j.{Neo4jIO, Neo4jOptions} import com.spotify.scio.values.SCollection +import magnolify.neo4j.ValueType +import org.apache.beam.sdk.io.{neo4j => beam} -trait SCollectionSyntax { +/** Enhanced version of [[com.spotify.scio.values.SCollection SCollection]] with Neo4J methods. */ +final class Neo4jSCollectionOps[T](private val self: SCollection[T]) extends AnyVal { + + import Neo4jIO._ - implicit def neo4jSCollectionWriteOps[T](sc: SCollection[T]): Neo4jSCollectionWriteOps[T] = - new Neo4jSCollectionWriteOps[T](sc) + /** + * Execute parallel instances of the provided Cypher query to the specified Neo4j database; one + * instance of the query will be executed for each element in this [[SCollection]]. Results from + * each query invocation will be added to the resulting [[SCollection]] as if by a `flatMap` + * transformation (where the Neo4j-query-execution returns an `Iterable`). + * + * This operation parameterizes each query invocation by applying a supplied function to each + * [[SCollection]] element before executing the Cypher query. The function must produce a [[Map]] + * of [[String]] to [[AnyRef]], where the keys correspond to named parameters in the provided + * Cypher query and the corresponding values correspond to the intended value of that parameter + * for a given query invocation. Named parameters must consist of letters and numbers, prepended + * with a `$`, as described in the Neo4j + * [[https://neo4j.com/docs/cypher-manual/current/syntax/parameters Cypher Manual (Syntax / Parameters)]]. + * + * @see + * ''Reading from Neo4j'' in the + * [[https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/neo4j/Neo4jIO.html Beam `Neo4jIO` documentation]] + * @param neo4jConf + * options for configuring a Neo4J driver + * @param cypher + * parameterized Cypher query + * @return + * [[SCollection]] containing the union of query results from a parameterized query invocation + * for each original [[SCollection]] element + */ + def neo4jCypher[U]( + neo4jConf: Neo4jOptions, + cypher: String + )(implicit + neo4jInType: ValueType[T], + neo4jOutType: ValueType[U], + coder: Coder[U] + ): SCollection[U] = + self.applyTransform( + beam.Neo4jIO + .readAll[T, U]() + .withDriverConfiguration(dataSourceConfiguration(neo4jConf.connectionOptions)) + .withSessionConfig(neo4jConf.sessionConfig) + .withTransactionConfig(neo4jConf.transactionConfig) + .withCypher(cypher) + .withParametersFunction(neo4jInType.to(_).asMap()) + .withRowMapper(neo4jOutType.from(_)) + .withCoder(CoderMaterializer.beam(self.context, coder)) + ) - implicit def neo4jSCollectionReadOps[T](sColl: SCollection[T]): Neo4jSCollectionReadOps[T] = - new Neo4jSCollectionReadOps[T](sColl) + /** + * Save this SCollection as a Neo4J database. + * + * @param neo4jOptions + * options for configuring a Neo4J driver + * @param unwindCypher + * Neo4J cypher query representing an + * [[https://neo4j.com/docs/cypher-manual/current/clauses/unwind/#unwind-creating-nodes-from-a-list-parameter UNWIND parameter]] + * cypher statement + * @param batchSize + * batch size when executing the unwind cypher query. Default batch size of 5000 + */ + def saveAsNeo4j( + neo4jOptions: Neo4jOptions, + unwindCypher: String, + batchSize: Long = WriteParam.BeamDefaultBatchSize + )(implicit neo4jType: ValueType[T], coder: Coder[T]): ClosedTap[Nothing] = + self.write(Neo4jIO[T](neo4jOptions, unwindCypher))(WriteParam(batchSize)) +} +trait SCollectionSyntax { + implicit def neo4jSCollectionOps[T](sc: SCollection[T]): Neo4jSCollectionOps[T] = + new Neo4jSCollectionOps(sc) } From 628472369463d37f8caf7cc7fd57d1d4468756eb Mon Sep 17 00:00:00 2001 From: Michel Davit Date: Fri, 24 Feb 2023 15:15:55 +0100 Subject: [PATCH 3/5] Simplify tests --- .../src/it/scala/com/spotify/scio/neo4j/Neo4jIOIT.scala | 5 ----- 1 file changed, 5 deletions(-) diff --git a/scio-neo4j/src/it/scala/com/spotify/scio/neo4j/Neo4jIOIT.scala b/scio-neo4j/src/it/scala/com/spotify/scio/neo4j/Neo4jIOIT.scala index eb72afdbf9..efbfe8c773 100644 --- a/scio-neo4j/src/it/scala/com/spotify/scio/neo4j/Neo4jIOIT.scala +++ b/scio-neo4j/src/it/scala/com/spotify/scio/neo4j/Neo4jIOIT.scala @@ -76,8 +76,6 @@ class Neo4jIOIT extends PipelineSpec with Eventually with ForAllTestContainer { val americanPresident = Movie("American President", 1995) val options = PipelineOptionsFactory.create() - options.setRunner(classOf[DirectRunner]) - lazy val neo4jOptions = Neo4jOptions( Neo4jConnectionOptions(container.boltUrl, container.username, container.password) ) @@ -101,9 +99,6 @@ class Neo4jIOIT extends PipelineSpec with Eventually with ForAllTestContainer { } it should "read cypher query from the graph database with parameter" in { - val options = PipelineOptionsFactory.create() - options.setRunner(classOf[DirectRunner]) - val queryParams = Seq( MovieParam(1994), MovieParam(0), From 8900688394530d173864416136dca238cf56a2ac Mon Sep 17 00:00:00 2001 From: Branden Smith Date: Fri, 24 Feb 2023 14:39:31 +0000 Subject: [PATCH 4/5] revise scaladoc to be consistent with refactor --- .../scio/neo4j/syntax/SCollectionSyntax.scala | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/scio-neo4j/src/main/scala/com/spotify/scio/neo4j/syntax/SCollectionSyntax.scala b/scio-neo4j/src/main/scala/com/spotify/scio/neo4j/syntax/SCollectionSyntax.scala index af61237b6f..6153f80833 100644 --- a/scio-neo4j/src/main/scala/com/spotify/scio/neo4j/syntax/SCollectionSyntax.scala +++ b/scio-neo4j/src/main/scala/com/spotify/scio/neo4j/syntax/SCollectionSyntax.scala @@ -35,17 +35,19 @@ final class Neo4jSCollectionOps[T](private val self: SCollection[T]) extends Any * each query invocation will be added to the resulting [[SCollection]] as if by a `flatMap` * transformation (where the Neo4j-query-execution returns an `Iterable`). * - * This operation parameterizes each query invocation by applying a supplied function to each - * [[SCollection]] element before executing the Cypher query. The function must produce a [[Map]] - * of [[String]] to [[AnyRef]], where the keys correspond to named parameters in the provided - * Cypher query and the corresponding values correspond to the intended value of that parameter - * for a given query invocation. Named parameters must consist of letters and numbers, prepended - * with a `$`, as described in the Neo4j - * [[https://neo4j.com/docs/cypher-manual/current/syntax/parameters Cypher Manual (Syntax / Parameters)]]. + * This operation parameterizes each query invocation by first transforming each [[SCollection]] + * element via [[magnolify.neo4j.ValueType.to]]. Parameter names in the provided Cypher query + * [[String]] must match the names of keys in the [[org.neo4j.driver.Value]] generated by the + * (implicit) [[ValueType]] for the input [[SCollection]] type. (If the input type is a case + * class, that means that parameter names in the query should correspond to the fields defined in + * the case class.) * * @see * ''Reading from Neo4j'' in the * [[https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/neo4j/Neo4jIO.html Beam `Neo4jIO` documentation]] + * @see + * syntax for Cypher query parameters defined in + * [[https://neo4j.com/docs/cypher-manual/current/syntax/parameters Cypher Manual (Syntax / Parameters)]] * @param neo4jConf * options for configuring a Neo4J driver * @param cypher From 0f64847de966334bb2f6542a6db22caa5ee61448 Mon Sep 17 00:00:00 2001 From: Michel Davit Date: Fri, 24 Feb 2023 17:44:17 +0100 Subject: [PATCH 5/5] Remove useless interpolation --- .../src/it/scala/com/spotify/scio/neo4j/Neo4jIOIT.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/scio-neo4j/src/it/scala/com/spotify/scio/neo4j/Neo4jIOIT.scala b/scio-neo4j/src/it/scala/com/spotify/scio/neo4j/Neo4jIOIT.scala index efbfe8c773..196e1dfbe5 100644 --- a/scio-neo4j/src/it/scala/com/spotify/scio/neo4j/Neo4jIOIT.scala +++ b/scio-neo4j/src/it/scala/com/spotify/scio/neo4j/Neo4jIOIT.scala @@ -106,10 +106,10 @@ class Neo4jIOIT extends PipelineSpec with Eventually with ForAllTestContainer { ) val queryRoles = - s"""MATCH (p)-[r: ACTED_IN]->(m) - |WHERE m.year = $$year - |RETURN p as person, m as movie, r.role as role - |""".stripMargin + """MATCH (p)-[r: ACTED_IN]->(m) + |WHERE m.year = $year + |RETURN p as person, m as movie, r.role as role + |""".stripMargin val expectedRoles = Seq( Role(martin, americanPresident, "A.J. MacInerney"),