Added new JdbcIO read/write params to Scio #4820

Merged
merged 20 commits into from
Jun 6, 2023

Conversation

shnapz
Contributor

@shnapz shnapz commented May 24, 2023

Implementing #4751

@shnapz shnapz requested a review from RustedBones May 24, 2023 18:04
@shnapz shnapz marked this pull request as draft May 24, 2023 18:05
codecov bot commented May 24, 2023

Codecov Report

Merging #4820 (be06a30) into main (924282d) will increase coverage by 0.10%.
The diff coverage is 75.78%.

❗ Current head be06a30 differs from pull request most recent head e583a85. Consider uploading reports for the commit e583a85 to get more accurate results

@@            Coverage Diff             @@
##             main    #4820      +/-   ##
==========================================
+ Coverage   62.42%   62.53%   +0.10%     
==========================================
  Files         280      281       +1     
  Lines       10406    10431      +25     
  Branches      773      781       +8     
==========================================
+ Hits         6496     6523      +27     
+ Misses       3910     3908       -2     
Impacted Files Coverage Δ
...fy/scio/coders/instances/kryo/AvroSerializer.scala 100.00% <ø> (ø)
...c/main/scala/com/spotify/scio/io/FileStorage.scala 97.72% <ø> (-0.06%) ⬇️
...main/scala/com/spotify/scio/bigquery/package.scala 100.00% <ø> (ø)
...scala/com/spotify/scio/datastore/DatastoreIO.scala 9.09% <0.00%> (-7.58%) ⬇️
...m/spotify/scio/jdbc/syntax/SCollectionSyntax.scala 33.33% <25.00%> (-66.67%) ⬇️
...m/spotify/scio/jdbc/syntax/ScioContextSyntax.scala 80.00% <66.66%> (-20.00%) ⬇️
.../src/main/scala/com/spotify/scio/jdbc/JdbcIO.scala 72.72% <75.00%> (+45.22%) ⬆️
...cala/com/spotify/scio/coders/KryoAtomicCoder.scala 69.60% <100.00%> (-0.72%) ⬇️
...otify/scio/coders/LowPriorityCoderDerivation.scala 97.43% <100.00%> (ø)
...com/spotify/scio/coders/instances/AvroCoders.scala 84.61% <100.00%> (+1.75%) ⬆️
... and 9 more

... and 2 files with indirect coverage changes

Comment on lines 36 to 39
case readOpts: JdbcReadOptions[_] =>
  jdbcIoId(readOpts.connectionOptions, readOpts.query)
case writeOpts: JdbcWriteOptions[_] =>
  jdbcIoId(writeOpts.connectionOptions, writeOpts.statement)
Contributor

nicer like this!

- outputParallelization: Boolean = JdbcIoOptions.DefaultOutputParallelization
+ outputParallelization: Boolean = JdbcIoOptions.DefaultOutputParallelization,
+ dataSourceProviderFn: () => DataSource = null,
+ configOverride: Read[T] => Read[T] = identity
Contributor

Do we agree to apply this convention to all IOs for 0.13?

Contributor Author

@RustedBones it would be good! We need to be consistent across all APIs

@shnapz shnapz marked this pull request as ready for review May 30, 2023 01:28
Comment on lines 53 to 57
def getWriteOptions(opts: CloudSqlOptions): JdbcWriteOptions[String] =
  JdbcWriteOptions[String](
    connectionOptions = getConnectionOptions(opts),
    statement = "INSERT INTO <this> VALUES( ?, ? ..?)"
  )
Contributor

Looks like this is unused so far.

var expectedTransform: BJdbcIO.Read[String] = null
sc.jdbcSelect[String](
  getDefaultReadOptions(opts).copy(configOverride = r => {
    expectedTransform = r.withQuery("overridden query")
Contributor

Can we get the query back instead of memorizing the transform in a var?

Contributor Author

Yeah, we can. This was the simplest code; otherwise we would need to match the transform by type, which is not difficult either :)

@shnapz shnapz added this to the 0.13.0 milestone May 30, 2023
@shnapz shnapz self-assigned this May 30, 2023
Comment on lines 60 to 61
namespace: String = null,
configOverride: beam.DatastoreV1.Read => beam.DatastoreV1.Read = null
Contributor

We usually define the default parameters in the companion object:

namespace: String = ReadParam.DefaultNamespace

Contributor Author

Do you think we need null as a constant? I think nulls are different from other optional parameters. It's just simpler without constants when the default is null.

def datastore(
  projectId: String,
  query: Query,
  namespace: String = null,
Contributor

and reuse it there

namespace: String = DatastoreIO.ReadParam.DefaultNamespace
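
The suggestion in this thread, defining the default once in the companion object and reusing it at the call site, could look roughly like the sketch below. It is only an illustration: `DatastoreDefaultsSketch`, the trimmed-down `ReadParam`, and the simplified `datastore` signature are stand-ins invented here, not the actual Scio definitions.

```scala
object DatastoreDefaultsSketch {
  // Stand-in for an IO's read parameters; not the real Scio class.
  object ReadParam {
    // Named default, even though the value is null, so every call site shares it.
    val DefaultNamespace: String = null
  }
  final case class ReadParam(namespace: String = ReadParam.DefaultNamespace)

  // Hypothetical syntax method reusing the same constant as its default.
  def datastore(projectId: String, namespace: String = ReadParam.DefaultNamespace): ReadParam =
    ReadParam(namespace)
}
```

With this shape, changing the default later only touches the companion object, which is the consistency argument made above; the author's counterpoint that a named constant for null adds little still stands.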

@@ -66,7 +64,7 @@ object JdbcIO {
final case class JdbcSelect[T: Coder](readOptions: JdbcReadOptions[T]) extends JdbcIO[T] {
Contributor

IMHO we have not defined the constructors of the JdbcIOs properly. Here we should only have the data that identifies the target destination (basically what is required to distinguish the mocked IO), so the connection options and the query.

All the other parameters should be passed as ReadP or WriteP.

Contributor Author

Yes, I also noted that it stands out from other IOs. Will change it

Contributor

Let's do it in another PR. I think we have to check some other IOs too (I recall CsvIO has the same issue).
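
A minimal sketch of the split discussed in this thread, assuming simplified stand-in types: the IO constructor carries only what identifies the source, and everything else travels in a separate parameter object. `ConnectionOptions`, `ReadParam`, the `testId` format, and the `read` method are all hypothetical and only mirror the idea, not Scio's real classes.

```scala
object JdbcIoShapeSketch {
  // Stand-in for JdbcConnectionOptions; fields are illustrative only.
  final case class ConnectionOptions(username: String, connectionUrl: String)

  // Tuning knobs live in the param object, not in the IO constructor.
  // The default values here are arbitrary placeholders.
  final case class ReadParam(fetchSize: Int = 1000, outputParallelization: Boolean = true)

  // The IO itself carries only what identifies the source.
  final case class JdbcSelect(opts: ConnectionOptions, query: String) {
    type ReadP = ReadParam

    // Identity used to match a mocked IO in tests; depends on no tuning knob.
    def testId: String = s"JdbcIO(${opts.username}@${opts.connectionUrl}, $query)"

    // Parameters are supplied at read time rather than at construction time.
    def read(params: ReadP): String = s"$query (fetchSize=${params.fetchSize})"
  }
}
```

Two reads of the same query with different fetch sizes then resolve to the same IO, which is what makes them interchangeable when mocking.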

- outputParallelization: Boolean = JdbcIoOptions.DefaultOutputParallelization
+ outputParallelization: Boolean = JdbcIoOptions.DefaultOutputParallelization,
+ dataSourceProviderFn: () => DataSource = null,
+ configOverride: Read[T] => Read[T] = null
Contributor

Shouldn't we pass identity instead?
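
To illustrate why `identity` is a convenient default here, the following sketch uses a made-up `Read` case class in place of Beam's `JdbcIO.Read[T]`; `ReadOptions` and `buildTransform` are hypothetical names as well. With `identity` as the default, the override can always be applied without a null check.

```scala
object ConfigOverrideSketch {
  // Stand-in for Beam's JdbcIO.Read[T]; only the query is modelled.
  final case class Read(query: String) {
    def withQuery(q: String): Read = copy(query = q)
  }

  final case class ReadOptions(
    query: String,
    // identity means "no override", so callers never see a null function
    configOverride: Read => Read = identity
  )

  // The override is applied unconditionally; no null check is needed.
  def buildTransform(opts: ReadOptions): Read =
    opts.configOverride(Read(opts.query))
}
```

Passing `configOverride = _.withQuery("overridden query")` replaces the query, while omitting the argument leaves the transform untouched.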

@shnapz shnapz requested a review from RustedBones June 1, 2023 15:50
@@ -104,38 +144,42 @@ final case class JdbcSelect[T: Coder](readOptions: JdbcReadOptions[T]) extends J
EmptyTap
}

- final case class JdbcWrite[T](writeOptions: JdbcWriteOptions[T]) extends JdbcIO[T] {
+ final case class JdbcWrite[T](opts: JdbcConnectionOptions, statement: String) extends JdbcIO[T] {
Contributor

We can think about merging the read and write into a single IO now that they have the same signature. Probably as a next step.

Contributor Author

That would be a good thing


/** Enhanced version of [[com.spotify.scio.values.SCollection SCollection]] with JDBC methods. */
final class JdbcSCollectionOps[T](private val self: SCollection[T]) extends AnyVal {

  /** Save this SCollection as a JDBC database. */
  @deprecated("Use another overload with multiple parameters")
Contributor

Add the since = "0.13.0" parameter.
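
For reference, Scala's `@deprecated` annotation takes a message and a `since` version. The sketch below shows the requested form on a simplified overload pair; `WriteOptions`, the method bodies, and the returned strings are hypothetical placeholders rather than the real `saveAsJdbc` implementation.

```scala
object DeprecationSketch {
  // Stand-in for the options case class taken by the old overload.
  final case class WriteOptions(statement: String, batchSize: Long = 1000L)

  // Old overload: kept for compatibility, flagged with the version it was deprecated in.
  @deprecated("Use another overload with multiple parameters", since = "0.13.0")
  def saveAsJdbc(opts: WriteOptions): String =
    saveAsJdbc(opts.statement, opts.batchSize)

  // New overload with explicit parameters.
  def saveAsJdbc(statement: String, batchSize: Long): String =
    s"$statement [batch=$batchSize]"
}
```

Callers of the old overload then get a compiler warning that includes both the message and the version.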

def saveAsJdbc(
  connectionOptions: JdbcConnectionOptions,
  statement: String,
  preparedStatementSetter: (T, PreparedStatement) => Unit,
Contributor

I'd curry this one as this is the element operation (like foreach)

def jdbcSelect[T: ClassTag: Coder](
  connectionOptions: JdbcConnectionOptions,
  query: String,
  rowMapper: ResultSet => T,
Contributor

I'd curry this one as this is a map element operation
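
The currying proposed in these two comments can be sketched as follows, under simplifying assumptions: `Row` is a plain map standing in for `java.sql.ResultSet`, the rows are hard-coded, and this `jdbcSelect` is a hypothetical function, not the Scio method. The point is only the shape: configuration in the first parameter list, the per-element function in its own list.

```scala
object CurriedSelectSketch {
  // Stand-in for java.sql.ResultSet: one row as column name -> value.
  type Row = Map[String, Any]

  // Configuration goes in the first list; the per-row function gets its own list.
  def jdbcSelect[T](query: String, fetchSize: Int = 1000)(rowMapper: Row => T): Seq[T] = {
    // Hypothetical fixed rows standing in for a real query result.
    val rows: Seq[Row] = Seq(Map("id" -> 1), Map("id" -> 2))
    rows.map(rowMapper)
  }

  // The mapper reads like the body of a map call.
  val ids: Seq[Int] = jdbcSelect("SELECT id FROM t") { row =>
    row("id").asInstanceOf[Int]
  }
}
```

A side benefit of the second parameter list is that the mapper can be written as a trailing block and the result type is inferred from it.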

Contributor

@RustedBones RustedBones left a comment

Nit for the example code

shnapz and others added 3 commits June 5, 2023 12:07
…loudSqlExample.scala

Co-authored-by: Michel Davit <micheld@spotify.com>
…loudSqlExample.scala

Co-authored-by: Michel Davit <micheld@spotify.com>
@RustedBones RustedBones merged commit eee089e into main Jun 6, 2023
@RustedBones RustedBones deleted the akabas/jdbcio-extension branch June 6, 2023 14:54
2 participants