Skip to content

Commit

Permalink
Migrate to Java 17
Browse files Browse the repository at this point in the history
  • Loading branch information
pbernet committed May 22, 2024
1 parent 7d1a6de commit ea7e678
Show file tree
Hide file tree
Showing 9 changed files with 50 additions and 164 deletions.
6 changes: 3 additions & 3 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@ jobs:
steps:
- name: Checkout
uses: actions/checkout@v4
# https://github.com/coursier/setup-action
- name: Set up JDK 11
# https://github.com/coursier/setup-action
- name: Set up JDK 17
uses: coursier/setup-action@v1
with:
jvm: adopt:11
jvm: adopt:17
apps: sbtn
- name: Build and Test
run: sbt -v +test
4 changes: 1 addition & 3 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ val artemisVersion = "2.33.0"
val testContainersVersion = "1.19.8"
val keycloakVersion = "24.0.4"
val sttpVersion = "3.9.0"
val influxdbVersion = "6.10.0"
val influxdbVersion = "7.1.0"
val awsClientVersion = "2.25.32"

libraryDependencies ++= Seq(
Expand Down Expand Up @@ -86,8 +86,6 @@ libraryDependencies ++= Seq(
"software.amazon.awssdk" % "sqs" % awsClientVersion,


// Migrated to pekko, but new client 7.0.0 only supports Java 17 (not Java 11)
// https://github.com/influxdata/influxdb-client-java/blob/master/CHANGELOG.md
"com.influxdb" %% "influxdb-client-scala" % influxdbVersion,
"com.influxdb" % "flux-dsl" % influxdbVersion,
"org.influxdb" % "influxdb-java" % "2.23",
Expand Down
7 changes: 4 additions & 3 deletions src/main/java/actor/DemoMessagesActor.java
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
package actor;

import akka.Done;
import akka.actor.*;
import akka.pattern.Patterns;

import org.apache.pekko.Done;
import org.apache.pekko.actor.*;
import org.apache.pekko.pattern.Patterns;

import java.time.Duration;
import java.util.concurrent.CompletionStage;
Expand Down
4 changes: 2 additions & 2 deletions src/main/scala/akkahttp/ReverseProxy.scala
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import org.apache.pekko.http.scaladsl.server.Directives.*
import org.apache.pekko.http.scaladsl.server.Route
import org.apache.pekko.http.scaladsl.settings.ServerSettings
import org.apache.pekko.http.scaladsl.{Http, HttpExt}
import org.apache.pekko.pattern.CircuitBreaker
import org.apache.pekko.pattern.{CircuitBreaker, CircuitBreakerOpenException}
import org.apache.pekko.stream.ThrottleMode
import org.apache.pekko.stream.scaladsl.{Sink, Source}
import org.slf4j.{Logger, LoggerFactory}
Expand Down Expand Up @@ -168,7 +168,7 @@ object ReverseProxy extends App {
val proxyReq = request.withUri(uri(target)).withHeaders(headers(target))
circuitBreaker.withCircuitBreaker(http.singleRequest(proxyReq))
}.recover {
case _: akka.pattern.CircuitBreakerOpenException => BadGateway(id, "Circuit breaker opened")
case _: CircuitBreakerOpenException => BadGateway(id, "Circuit breaker opened")
case _: TimeoutException => GatewayTimeout(id)
case e => BadGateway(id, e.getMessage)
}
Expand Down
37 changes: 18 additions & 19 deletions src/main/scala/alpakka/influxdb/InfluxdbReader.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,14 @@ import com.influxdb.query.FluxTable
import com.influxdb.query.dsl.Flux
import com.influxdb.query.dsl.functions.restriction.Restrictions
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.Supervision
import org.apache.pekko.stream.scaladsl.Sink
import org.apache.pekko.stream.{ActorAttributes, Supervision}
import org.slf4j.{Logger, LoggerFactory}

import java.time.temporal.ChronoUnit
import java.util
import scala.concurrent.ExecutionContextExecutor
import scala.concurrent.duration.DurationInt
import scala.concurrent.{Await, ExecutionContextExecutor}
import scala.util.control.NonFatal

/**
Expand Down Expand Up @@ -54,25 +55,23 @@ class InfluxdbReader(baseURL: String, token: String, org: String = "testorg", bu
|> range(start: -interval)
"""

// TODO Activate, when "com.influxdb" %% "influxdb-client-scala" is available for pekko
// def source() = influxdbClientScala
// .getQueryScalaApi()
// .query(query)
def source() = influxdbClientScala
.getQueryScalaApi()
.query(query)

// TODO Activate, when "com.influxdb" %% "influxdb-client-scala" is available for pekko
def getQuerySync(mem: String) = {
// logger.info(s"Query raw for measurements of type: $mem")
// val result = source()
// .filter(fluxRecord => fluxRecord.getMeasurement().equals(mem) )
// .wireTap(fluxRecord => {
// val measurement = fluxRecord.getMeasurement()
// val value = fluxRecord.getValue()
// logger.debug(s"About to process measurement: $measurement with value: $value")
// })
// .withAttributes(ActorAttributes.supervisionStrategy(deciderFlow))
// .runWith(Sink.seq)
//
// Await.result(result, 10.seconds)
logger.info(s"Query raw for measurements of type: $mem")
val result = source()
.filter(fluxRecord => fluxRecord.getMeasurement().equals(mem))
.wireTap(fluxRecord => {
val measurement = fluxRecord.getMeasurement()
val value = fluxRecord.getValue()
logger.debug(s"About to process measurement: $measurement with value: $value")
})
.withAttributes(ActorAttributes.supervisionStrategy(deciderFlow))
.runWith(Sink.seq)

Await.result(result, 10.seconds)
}

def fluxQueryCount(mem: String): Long = {
Expand Down
27 changes: 13 additions & 14 deletions src/main/scala/sample/stream/TcpEcho.scala
Original file line number Diff line number Diff line change
Expand Up @@ -14,42 +14,39 @@ import scala.sys.process.*
import scala.util.{Failure, Success}

/**
* Inspired by:
* https://doc.akka.io/docs/akka/current/stream/stream-io.html?language=scala
*
* Use without parameters to start server and 100 parallel clients.
* TCP echo client server round trip
* Use without parameters to start server and 100 parallel clients
*
* Use parameters `server 127.0.0.1 6000` to start server listening on port 6000
*
* Use parameters `client 127.0.0.1 6000` to start one client connecting to
* server on 127.0.0.1:6000
* Use parameters `client 127.0.0.1 6000` to start one client
*
* Run cmd line client:
* echo -n "Hello World" | nc 127.0.0.1 6000
*
* Doc:
* https://pekko.apache.org/docs/pekko/current/stream/stream-io.html?language=scala
*/
object TcpEcho extends App {
val logger: Logger = LoggerFactory.getLogger(this.getClass)
val systemServer = ActorSystem("TcpEchoServer")
val systemClient = ActorSystem("TcpEchoClient")

var serverBinding: Future[Tcp.ServerBinding] = _

if (args.isEmpty) {
val (host, port) = ("127.0.0.1", 6000)
serverBinding = server(systemServer, host, port)
server(systemServer, host, port)

// Issue: https://github.com/akka/akka/issues/29842
checkResources()
// Issue:
// https://github.com/akka/akka/issues/29842

val maxClients = 100
(1 to maxClients).par.foreach(each => client(each, systemClient, host, port))
} else {
val (host, port) =
if (args.length == 3) (args(1), args(2).toInt)
else ("127.0.0.1", 6000)
if (args(0) == "server") {
serverBinding = server(systemServer, host, port)
server(systemServer, host, port)
} else if (args(0) == "client") {
client(1, systemClient, host, port)
}
Expand Down Expand Up @@ -105,12 +102,14 @@ object TcpEcho extends App {

// We want "halfClose behavior" on the client side. Doc:
// https://github.com/akka/akka/issues/22163
val connection: Flow[ByteString, ByteString, Future[Tcp.OutgoingConnection]] = Tcp().outgoingConnection(remoteAddress = InetSocketAddress.createUnresolved(host, port), halfClose = true)
val connection: Flow[ByteString, ByteString, Future[Tcp.OutgoingConnection]] =
Tcp().outgoingConnection(remoteAddress = InetSocketAddress.createUnresolved(host, port), halfClose = true)
val testInput = ('a' to 'z').map(ByteString(_)) ++ Seq(ByteString("BYE"))
logger.info(s"Client: $id sending: ${testInput.length} bytes")

val restartSettings = RestartSettings(1.second, 10.seconds, 0.2).withMaxRestarts(10, 1.minute)
val restartSource = RestartSource.onFailuresWithBackoff(restartSettings) { () => Source(testInput).via(connection) }
val closed = restartSource.runForeach(each => logger.info(s"Client: $id received echo: ${each.utf8String}"))
val closed = restartSource.runForeach(each => logger.info(s"Client: $id received: ${each.utf8String}"))
closed.onComplete(each => logger.info(s"Client: $id closed: $each"))
}

Expand Down
104 changes: 0 additions & 104 deletions src/main/scala/sample/stream/TcpEchoJava.java

This file was deleted.

22 changes: 8 additions & 14 deletions src/test/scala/alpakka/clickhousedb/ClickhousedbIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -171,21 +171,15 @@ protected DataSource getDataSource(JdbcDatabaseContainer<?> container) {
}

protected void createTable() throws SQLException {
// Since we want to be Java 11 source compatible
// Doc: https://clickhouse.com/docs/en/engines/table-engines
String newLine = System.lineSeparator();
String createStatementTextBlock =
"CREATE TABLE test.my_table"
+ newLine
+ "("
+ newLine
+ "`myfloat_nullable` Nullable(Float32),"
+ newLine
+ "`mystr` String,"
+ newLine
+ "`myint_id` Int32"
+ newLine
+ ") ENGINE = Log";
String createStatementTextBlock = """
CREATE TABLE test.my_table
(
`myfloat_nullable` Nullable(Float32),
`mystr` String,
`myint_id` Int32
) ENGINE = Log
""";

LOGGER.info(createStatementTextBlock);

Expand Down
3 changes: 1 addition & 2 deletions src/test/scala/alpakka/influxdb/InfluxdbIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,7 @@ void testWriteAndRead() {
.collect(Collectors.toList());
assertThat(CompletableFuture.allOf(futList.toArray(new CompletableFuture[futList.size()]))).succeedsWithin(5 * maxClients, TimeUnit.SECONDS);

// TODO Activate, when "com.influxdb" %% "influxdb-client-scala" is available for pekko
//assertThat(influxDBReader.getQuerySync("testMem").length()).isEqualTo(nPoints * maxClients);
assertThat(influxDBReader.getQuerySync("testMem").length()).isEqualTo(nPoints * maxClients);
assertThat(influxDBReader.fluxQueryCount("testMem")).isEqualTo(nPoints * maxClients);
assertThat(new LogFileScanner("logs/application.log").run(1, 2, searchAfterPattern, "ERROR").length()).isZero();
}
Expand Down

0 comments on commit ea7e678

Please sign in to comment.