diff --git a/.github/dependabot.yml b/.github/dependabot.yml index 66ae07e78b..660970b64a 100644 --- a/.github/dependabot.yml +++ b/.github/dependabot.yml @@ -8,8 +8,6 @@ updates: open-pull-requests-limit: 10 target-branch: dev reviewers: - - ckittl - - johanneshiry - t-ober - sensarmad - sebastian-peter diff --git a/CHANGELOG.md b/CHANGELOG.md index 2025fc6e65..1410ac9535 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,22 +8,51 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added - Implement SQL source for primary data [#34](https://github.com/ie3-institute/simona/issues/34), [#101](https://github.com/ie3-institute/simona/issues/101) +- Relevant scientific papers have been added to the documentation [#139](https://github.com/ie3-institute/simona/issues/139) +- Add troubleshooting section to Users guide [#160](https://github.com/ie3-institute/simona/issues/160) +- Added Kafka sink for results [#24](https://github.com/ie3-institute/simona/issues/24) ### Changed - Re-organizing test resources into their respective packages [#105](https://github.com/ie3-institute/simona/issues/105) -- BREAKING: Using snapshot version of PSDM +- BREAKING: Using snapshot version of PSDM and PSU - Simplified PrimaryServiceProxy due to changes in PSDM [#120](https://github.com/ie3-institute/simona/issues/120) - Improved handling of weights and their sum in determination of weather data [#173](https://github.com/ie3-institute/simona/issues/173) - Improving code readability in EvcsAgent by moving FreeLotsRequest to separate methods [#19](https://github.com/ie3-institute/simona/issues/19) +- Ignore dependabot snapshot dependencies [#27](https://github.com/ie3-institute/simona/issues/27) - Sending termination message to external simulation on expected and unexpected shutdowns of SIMONA [#35](https://github.com/ie3-institute/simona/issues/35) +- Change transformer calculation due to changes in PSDM [#99](https://github.com/ie3-institute/simona/issues/99) +- Adapt to changed PvInputModel of PSDM (elevationAngle) [#100](https://github.com/ie3-institute/simona/issues/100) +- Consolidate csv parameterization in config [#149](https://github.com/ie3-institute/simona/issues/149) +- Change weather scheme to COSMO [PR#154](https://github.com/ie3-institute/simona/pull/154) +- Adapt documentation to changed simonaAPI [#191](https://github.com/ie3-institute/simona/issues/191) +- Implementing a new plugin framework for external simulations [#195](https://github.com/ie3-institute/simona/issues/195) - Improved implementation of `RefSystemParser` [#212](https://github.com/ie3-institute/simona/issues/212) -- Removed Gradle task puml2png (Converting Plantuml to png / svg files) since it is no longer needed [#230](https://github.com/ie3-institute/simona/issues/230) -- Harmonized configuration of csv parameters [#149](https://github.com/ie3-institute/simona/issues/149) +- Include missing images into Documentation [#151](https://github.com/ie3-institute/simona/issues/151) +- Changing the export method for diagrams [#156](https://github.com/ie3-institute/simona/issues/156) +- Change references implementation in Documentation to BibTeX [#174](https://github.com/ie3-institute/simona/issues/174) +- Update Model descriptions (Documentation) [#122](https://github.com/ie3-institute/simona/issues/122) +- Changes of Getting Started Section (Documentation) [#124](https://github.com/ie3-institute/simona/issues/124) +- Update gradle [#176](https://github.com/ie3-institute/simona/issues/176) +- Setting Java 
version to 17 [#58](https://github.com/ie3-institute/simona/issues/58) +- Made SimonaConfig.BaseRuntimeConfig serializable [#36](https://github.com/ie3-institute/simona/issues/36) +- Adapt to new simonaAPI snapshot [#95](https://github.com/ie3-institute/simona/issues/95) +- Update Sphinx to 4.5.0 as well as extensions [#214](https://github.com/ie3-institute/simona/issues/214) ### Fixed -- Location of `vn_simona` test grid (was partially in Berlin and Dortmund) -- Let `ParticipantAgent` die after failed registration with secondary services (prevents stuck simulation) +- Location of `vn_simona` test grid (was partially in Berlin and Dortmund) [#72](https://github.com/ie3-institute/simona/issues/72) +- Let `ParticipantAgent` die after failed registration with secondary services (prevents stuck simulation) [#76](https://github.com/ie3-institute/simona/issues/76) - Fix default resolution of weather source wrapper [#78](https://github.com/ie3-institute/simona/issues/78) +- Fix invalid thread allocation in GridAgent [#111](https://github.com/ie3-institute/simona/issues/111) +- Fixed config auto-generation [#130](https://github.com/ie3-institute/simona/issues/130) +- Fixed genConfigSample gradle task [#148](https://github.com/ie3-institute/simona/issues/148) +- Fixed some unreachable code [#167](https://github.com/ie3-institute/simona/issues/167) +- Fix treatment of non-InitializeTrigger triggers in initialization within SimScheduler [#237](https://github.com/ie3-institute/simona/issues/237) +- Fix breaking SIMONA caused by introducing temperature dependent load profiles in PSDM [#255](https://github.com/ie3-institute/simona/issues/255) +- Respect for voltage angle in DBFS slack voltage exchange protocol [#69](https://github.com/ie3-institute/simona/issues/69) + +### Removed +- Remove workaround for tscfg tmp directory [#178](https://github.com/ie3-institute/simona/issues/178) +- Removed Gradle task puml2png (Converting Plantuml to png / svg files) since it is no longer needed [#228](https://github.com/ie3-institute/simona/issues/228) +- Remove RocketChat notification from Jenkinsfile [#234](https://github.com/ie3-institute/simona/issues/234) [Unreleased]: https://github.com/ie3-institute/simona/compare/a14a093239f58fca9b2b974712686b33e5e5f939...HEAD diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000000..0de30978b8 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,25 @@ +FROM openjdk:17-slim + +# USAGE: +# build with ARG version and if applicable with ARG snapshot suffix # e.g.: docker build --build-arg version=2.1.0 --build-arg snapshotSuffix=-SNAPSHOT -t simona . +# run by mounting directory # e.g. docker run -v `realpath input`:/input --rm simona # note: this does not work on Windows, so you have to enter the absolute path manually and escape the backslashes + +ARG version +# snapshot suffix for jar files is "-SNAPSHOT" +ARG snapshotSuffix="" + +ENV jarFile="simona-${version}${snapshotSuffix}-all.jar" +ENV config="" + +RUN mkdir exec \ && mkdir input + +# copy simona fat jar into container +COPY build/libs/$jarFile /exec/ +# inputData is mounted upon running +VOLUME /input + +ENTRYPOINT ["sh", "-c", "java -jar exec/${jarFile} --config=${config}"] \ No newline at end of file diff --git a/README.md b/README.md index cb2233bbdb..a61d0d3e29 100644 --- a/README.md +++ b/README.md @@ -34,6 +34,7 @@ CONTRIBUTING.md file in the root directory of this repository.
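(To make the usage notes in the Dockerfile above concrete, a hedged example session in the same shell idiom as its comments; the version, mount paths, and config path are illustrative assumptions, not taken from this diff:)

# build a snapshot image of an assumed version 2.1.0
docker build --build-arg version=2.1.0 --build-arg snapshotSuffix=-SNAPSHOT -t simona .
# run on Linux/macOS: mount the input directory and override the config env var
docker run -v "$(pwd)/input:/input" -e config=/input/samples/vn_simona/vn_simona.conf --rm simona
# run on Windows: spell out the absolute path with escaped backslashes
docker run -v C:\\Users\\me\\simona\\input:/input -e config=/input/samples/vn_simona/vn_simona.conf --rm simona
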
For all SIMONA related questions please feel free to contact people involved in the development and maintenance of SIMONA. For the moment, these people are: +- Feismann, Daniel - [daniel.feismann@tu-dortmund.de](mailto:daniel.feismann@tu-dortmund.de) - Peter, Sebastian - [sebastian.peter@tu-dortmund.de](mailto:sebastian.peter@tu-dortmund.de) - Oberließen, Thomas - [thomas.oberliessen@tu-dortmund.de](mailto:thomas.oberliessen@tu-dortmund.de) - Sen Sarma, Debopama - [debopama-sen.sarma@tu-dortmund.de](mailto:debopama-sen.sarma@tu-dortmund.de) diff --git a/build.gradle b/build.gradle index 35d0895633..f4795529e8 100644 --- a/build.gradle +++ b/build.gradle @@ -7,13 +7,13 @@ plugins { id 'signing' id 'maven-publish' // publish to a maven repo (local or mvn central, has to be defined) id 'pmd' // code check, working on source code - id 'com.diffplug.spotless' version '6.6.1'// code format + id 'com.diffplug.spotless' version '6.7.2'// code format id 'com.github.onslip.gradle-one-jar' version '1.0.6' // pack a self contained jar id "com.github.ben-manes.versions" version '0.42.0' id "de.undercouch.download" version "5.1.0" // downloads plugin id "kr.motd.sphinx" version "2.10.1" // documentation generation id "com.github.johnrengelman.shadow" version "7.1.2" // fat jar - id "org.sonarqube" version "3.3" // sonarqube + id "org.sonarqube" version "3.4.0.2513" // sonarqube id "org.scoverage" version "7.0.0" // scala code coverage scoverage id "com.github.maiflai.scalatest" version "0.32" // run scalatest without specific spec task id 'org.hidetake.ssh' version '2.10.1' @@ -30,7 +30,7 @@ ext { tscfgVersion = '0.9.998' scapegoatVersion = '1.4.14' - testContainerVersion = '0.40.7' + testContainerVersion = '0.40.8' scriptsLocation = 'gradle' + File.separator + 'scripts' + File.separator // location of script plugins } @@ -60,6 +60,7 @@ repositories { mavenCentral() // searches in Sonatype's central repository maven { url 'https://www.jitpack.io' } // allows github repos as dependencies maven { url 'https://oss.sonatype.org/content/repositories/snapshots' } // sonatype snapshot repo + maven { url "https://packages.confluent.io/maven" } // confluent repo (kafka) } dependencies { @@ -70,7 +71,7 @@ dependencies { } // ie³ internal repository - implementation('com.github.ie3-institute:PowerSystemUtils:1.6') { + implementation('com.github.ie3-institute:PowerSystemUtils:2.0-SNAPSHOT') { exclude group: 'org.apache.logging.log4j' exclude group: 'org.slf4j' /* Exclude our own nested dependencies */ @@ -97,13 +98,13 @@ dependencies { } /* logging */ - implementation "com.typesafe.scala-logging:scala-logging_${scalaVersion}:3.9.4" // akka scala logging + implementation "com.typesafe.scala-logging:scala-logging_${scalaVersion}:3.9.5" // akka scala logging implementation "ch.qos.logback:logback-classic:1.2.11" /* testing */ testImplementation 'org.spockframework:spock-core:2.1-groovy-3.0' testImplementation 'org.scalatestplus:mockito-3-4_2.13:3.2.10.0' - implementation 'org.mockito:mockito-core:4.5.1' // mocking framework + implementation 'org.mockito:mockito-core:4.6.1' // mocking framework testImplementation "org.scalatest:scalatest_${scalaVersion}:3.2.12" testRuntimeOnly 'com.vladsch.flexmark:flexmark-all:0.64.0' //scalatest html output testImplementation group: 'org.pegdown', name: 'pegdown', version: '1.6.0' @@ -112,6 +113,7 @@ dependencies { // testcontainers testImplementation "com.dimafeng:testcontainers-scala-scalatest_${scalaVersion}:${testContainerVersion}" testImplementation 
"com.dimafeng:testcontainers-scala-postgresql_${scalaVersion}:${testContainerVersion}" + testImplementation "com.dimafeng:testcontainers-scala-kafka_${scalaVersion}:${testContainerVersion}" /* --- Scala libs --- */ /* CORE Scala */ @@ -141,6 +143,11 @@ dependencies { implementation "com.sksamuel.scapegoat:scalac-scapegoat-plugin_${scalaBinaryVersion}:${scapegoatVersion}" scalaCompilerPlugin "com.sksamuel.scapegoat:scalac-scapegoat-plugin_${scalaBinaryVersion}:${scapegoatVersion}" + /* Kafka */ + implementation group: 'org.apache.kafka', name: 'kafka-clients', version: '3.2.0' + implementation 'io.confluent:kafka-streams-avro-serde:7.1.1' + implementation "com.sksamuel.avro4s:avro4s-core_${scalaVersion}:4.1.0" + implementation 'org.apache.commons:commons-math3:3.6.1' // apache commons math3 implementation 'org.apache.poi:poi-ooxml:5.2.2' // used for FilenameUtils implementation 'javax.measure:unit-api:2.1.3' @@ -156,6 +163,14 @@ tasks.withType(JavaCompile) { options.encoding = 'UTF-8' } +jar { + manifest { + attributes( + 'Main-Class': 'edu.ie3.simona.main.RunSimonaStandalone' + ) + } +} + ////////////////////////////////////////////////////////////////////// // Build akka'able fat jar using the gradle shadow plugin // see http://www.sureshpw.com/2015/10/building-akka-bundle-with-all.html diff --git a/input/samples/vn_simona/fullGrid/load_input.csv b/input/samples/vn_simona/fullGrid/load_input.csv index 8e0e670c24..1b513a9556 100644 --- a/input/samples/vn_simona/fullGrid/load_input.csv +++ b/input/samples/vn_simona/fullGrid/load_input.csv @@ -1,4 +1,4 @@ -"uuid","cos_phi_rated","dsm","e_cons_annual","id","node","operates_from","operates_until","operator","q_characteristics","s_rated","standard_load_profile" +"uuid","cos_phi_rated","dsm","e_cons_annual","id","node","operates_from","operates_until","operator","q_characteristics","s_rated","load_profile" c2402412-97fa-4ca4-aa66-e6e04d010001,0.9700000286102295,false,4000.0,NS_NET126_L_F1_(36),ca3391eb-ca94-4945-ac72-e116f396f82c,,,,cosPhiFixed:{(0.00,1.00)},2.0618600845336914,h0 fa8ef266-5b15-4fdd-a145-71ba95e3463d,0.949999988079071,false,4000.0,NS_NET146_L_F3_(17),0f3ba59d-a9ce-4669-aa12-bebec42238b7,,,,cosPhiFixed:{(0.00,1.00)},2.3157899379730225,h0 4dd0785a-482c-47e3-bb82-e315083684d1,0.9700000286102295,false,4000.0,NS_NET116_L_S3_2(6),550ebca7-1455-44eb-9431-ffbf08e58bd4,,,,cosPhiFixed:{(0.00,1.00)},4.1237101554870605,h0 diff --git a/src/main/resources/config/config-template.conf b/src/main/resources/config/config-template.conf index 1b8b4c2284..5ec7c44844 100644 --- a/src/main/resources/config/config-template.conf +++ b/src/main/resources/config/config-template.conf @@ -70,6 +70,20 @@ PrimaryDataCsvParams { timePattern: string | "yyyy-MM-dd'T'HH:mm:ss[.S[S][S]]'Z'" # default pattern from PSDM:TimeBasedSimpleValueFactory } +#@define abstract +KafkaParams { + runId: string + bootstrapServers: string + schemaRegistryUrl: string + linger: int // in ms +} + +#@define extends KafkaParams +ResultKafkaParams { + base: KafkaParams + topicNodeRes = string +} + #@define BaseOutputConfig { notifier: string # Result event notifier @@ -201,6 +215,9 @@ simona.output.sink.influxDb1x { database = string } +#@optional +simona.output.sink.kafka = ResultKafkaParams + simona.output.grid = GridOutputConfig simona.output.participant = { defaultConfig = BaseOutputConfig diff --git a/src/main/scala/edu/ie3/simona/config/ConfigFailFast.scala b/src/main/scala/edu/ie3/simona/config/ConfigFailFast.scala index 5e132701f5..4eb817b5f5 100644 --- 
a/src/main/scala/edu/ie3/simona/config/ConfigFailFast.scala +++ b/src/main/scala/edu/ie3/simona/config/ConfigFailFast.scala @@ -9,14 +9,21 @@ package edu.ie3.simona.config import com.typesafe.config.{Config, ConfigException} import com.typesafe.scalalogging.LazyLogging import edu.ie3.simona.config.SimonaConfig.Simona.Output.Sink.InfluxDb1x -import edu.ie3.simona.config.SimonaConfig.{BaseOutputConfig, RefSystemConfig} +import edu.ie3.simona.config.SimonaConfig.{ + BaseOutputConfig, + RefSystemConfig, + ResultKafkaParams +} import edu.ie3.simona.exceptions.InvalidConfigParameterException import edu.ie3.simona.io.result.ResultSinkType import edu.ie3.simona.model.participant.load.{LoadModelBehaviour, LoadReference} import edu.ie3.simona.service.primary.PrimaryServiceProxy import edu.ie3.simona.service.weather.WeatherSource import edu.ie3.simona.util.CollectionUtils -import edu.ie3.simona.util.ConfigUtil.DatabaseConfigUtil.checkInfluxDb1xParams +import edu.ie3.simona.util.ConfigUtil.DatabaseConfigUtil.{ + checkInfluxDb1xParams, + checkKafkaParams +} import edu.ie3.simona.util.ConfigUtil.{CsvConfigUtil, NotifierIdentifier} import edu.ie3.util.scala.ReflectionTools import edu.ie3.util.{StringUtils, TimeUtil} @@ -138,7 +145,7 @@ case object ConfigFailFast extends LazyLogging { */ private def checkDataSink(sink: SimonaConfig.Simona.Output.Sink): Unit = { // ensures failure if new output sinks are added to enforce adaptions of the check sink method as well - val supportedSinks = Set("influxdb1x", "csv") + val supportedSinks = Set("influxdb1x", "csv", "kafka") if ( !sink.productElementNames .map(_.trim.toLowerCase) @@ -147,7 +154,7 @@ case object ConfigFailFast extends LazyLogging { ) throw new InvalidConfigParameterException( s"Newly added sink(s) " + - s"'${sink.productElementNames.toSet.removedAll(supportedSinks)}' detected! " + + s"'${sink.productElementNames.map(_.toLowerCase).toSet.removedAll(supportedSinks)}' detected! " + s"Please adapt 'ConfigFailFast' accordingly! Currently supported sinks: ${supportedSinks.mkString(", ")}." ) @@ -172,7 +179,6 @@ case object ConfigFailFast extends LazyLogging { "one sink is configured!" 
) - // if this is db sink, check the connection sinkConfigs.find(_.isDefined) match { case Some(Some(influxDb1x: InfluxDb1x)) => checkInfluxDb1xParams( @@ -180,7 +186,9 @@ case object ConfigFailFast extends LazyLogging { ResultSinkType.buildInfluxDb1xUrl(influxDb1x), influxDb1x.database ) - case _ => // no db connection, do nothing + case Some(Some(kafka: ResultKafkaParams)) => + checkKafkaParams(kafka, Seq(kafka.topicNodeRes)) + case _ => // do nothing } } diff --git a/src/main/scala/edu/ie3/simona/config/SimonaConfig.scala b/src/main/scala/edu/ie3/simona/config/SimonaConfig.scala index bf0f5a11db..c4ba623a6a 100644 --- a/src/main/scala/edu/ie3/simona/config/SimonaConfig.scala +++ b/src/main/scala/edu/ie3/simona/config/SimonaConfig.scala @@ -290,6 +290,13 @@ object SimonaConfig { } + sealed abstract class KafkaParams( + val bootstrapServers: java.lang.String, + val linger: scala.Int, + val runId: java.lang.String, + val schemaRegistryUrl: java.lang.String + ) + final case class LoadRuntimeConfig( override val calculateMissingReactivePowerWithModel: scala.Boolean, override val scaling: scala.Double, @@ -554,6 +561,63 @@ object SimonaConfig { } + final case class ResultKafkaParams( + override val bootstrapServers: java.lang.String, + override val linger: scala.Int, + override val runId: java.lang.String, + override val schemaRegistryUrl: java.lang.String, + topicNodeRes: java.lang.String + ) extends KafkaParams(bootstrapServers, linger, runId, schemaRegistryUrl) + object ResultKafkaParams { + def apply( + c: com.typesafe.config.Config, + parentPath: java.lang.String, + $tsCfgValidator: $TsCfgValidator + ): SimonaConfig.ResultKafkaParams = { + SimonaConfig.ResultKafkaParams( + topicNodeRes = $_reqStr(parentPath, c, "topicNodeRes", $tsCfgValidator), + bootstrapServers = + $_reqStr(parentPath, c, "bootstrapServers", $tsCfgValidator), + linger = $_reqInt(parentPath, c, "linger", $tsCfgValidator), + runId = $_reqStr(parentPath, c, "runId", $tsCfgValidator), + schemaRegistryUrl = + $_reqStr(parentPath, c, "schemaRegistryUrl", $tsCfgValidator) + ) + } + private def $_reqInt( + parentPath: java.lang.String, + c: com.typesafe.config.Config, + path: java.lang.String, + $tsCfgValidator: $TsCfgValidator + ): scala.Int = { + if (c == null) 0 + else + try c.getInt(path) + catch { + case e: com.typesafe.config.ConfigException => + $tsCfgValidator.addBadPath(parentPath + path, e) + 0 + } + } + + private def $_reqStr( + parentPath: java.lang.String, + c: com.typesafe.config.Config, + path: java.lang.String, + $tsCfgValidator: $TsCfgValidator + ): java.lang.String = { + if (c == null) null + else + try c.getString(path) + catch { + case e: com.typesafe.config.ConfigException => + $tsCfgValidator.addBadPath(parentPath + path, e) + null + } + } + + } + final case class VoltLvlConfig( id: java.lang.String, vNom: java.lang.String @@ -1511,7 +1575,8 @@ object SimonaConfig { final case class Sink( csv: scala.Option[SimonaConfig.Simona.Output.Sink.Csv], - influxDb1x: scala.Option[SimonaConfig.Simona.Output.Sink.InfluxDb1x] + influxDb1x: scala.Option[SimonaConfig.Simona.Output.Sink.InfluxDb1x], + kafka: scala.Option[SimonaConfig.ResultKafkaParams] ) object Sink { final case class Csv( @@ -1618,6 +1683,16 @@ object SimonaConfig { $tsCfgValidator ) ) + else None, + kafka = + if (c.hasPathOrNull("kafka")) + scala.Some( + SimonaConfig.ResultKafkaParams( + c.getConfig("kafka"), + parentPath + "kafka.", + $tsCfgValidator + ) + ) else None ) } diff --git 
a/src/main/scala/edu/ie3/simona/event/listener/ResultEventListener.scala b/src/main/scala/edu/ie3/simona/event/listener/ResultEventListener.scala index 84feebd2e2..5f908d31c3 100644 --- a/src/main/scala/edu/ie3/simona/event/listener/ResultEventListener.scala +++ b/src/main/scala/edu/ie3/simona/event/listener/ResultEventListener.scala @@ -10,7 +10,7 @@ import akka.actor.{ActorRef, FSM, Props, Stash, Status} import akka.pattern.pipe import akka.stream.Materializer import edu.ie3.datamodel.io.processor.result.ResultEntityProcessor -import edu.ie3.datamodel.models.result.ResultEntity +import edu.ie3.datamodel.models.result.{NodeResult, ResultEntity} import edu.ie3.simona.agent.grid.GridResultsSupport.PartialTransformer3wResult import edu.ie3.simona.agent.state.AgentState import edu.ie3.simona.agent.state.AgentState.{Idle, Uninitialized} @@ -33,12 +33,7 @@ import edu.ie3.simona.exceptions.{ InitializationException, ProcessResultEventException } -import edu.ie3.simona.io.result.{ - ResultEntityCsvSink, - ResultEntityInfluxDbSink, - ResultEntitySink, - ResultSinkType -} +import edu.ie3.simona.io.result._ import edu.ie3.simona.logging.SimonaFSMActorLogging import edu.ie3.simona.sim.SimonaSim.ServiceInitComplete import edu.ie3.simona.util.ResultFileHierarchy @@ -81,13 +76,11 @@ object ResultEventListener extends Transformer3wResultSupport { ) extends ResultEventListenerData def props( - eventClassesToConsider: Set[Class[_ <: ResultEntity]], resultFileHierarchy: ResultFileHierarchy, supervisor: ActorRef ): Props = Props( new ResultEventListener( - eventClassesToConsider, resultFileHierarchy, supervisor ) @@ -97,20 +90,19 @@ object ResultEventListener extends Transformer3wResultSupport { * with the model names as strings. It generates one sink for each model * class. 
* - * @param eventClassesToConsider - * Incoming event classes that should be considered + * @param resultFileHierarchy + * The result file hierarchy * @return * mapping of the model class to the sink for this model class */ private def initializeSinks( - eventClassesToConsider: Set[Class[_ <: ResultEntity]], resultFileHierarchy: ResultFileHierarchy )(implicit materializer: Materializer ): Iterable[Future[(Class[_], ResultEntitySink)]] = { resultFileHierarchy.resultSinkType match { case _: ResultSinkType.Csv => - eventClassesToConsider + resultFileHierarchy.resultEntitiesToConsider .map(resultClass => { resultFileHierarchy.rawOutputDataFilePaths .get(resultClass) @@ -141,18 +133,42 @@ object ResultEventListener extends Transformer3wResultSupport { }) case ResultSinkType.InfluxDb1x(url, database, scenario) => // creates one connection per result entity that should be processed - eventClassesToConsider + resultFileHierarchy.resultEntitiesToConsider .map(resultClass => ResultEntityInfluxDbSink(url, database, scenario).map( (resultClass, _) ) ) + + case ResultSinkType.Kafka( + topicNodeRes, + runId, + bootstrapServers, + schemaRegistryUrl, + linger + ) => + val clzs: Iterable[Class[_ <: ResultEntity]] = Set( + classOf[NodeResult] // currently, only NodeResults are sent out + ) + clzs.map(clz => + Future.successful( + ( + clz, + ResultEntityKafkaSink[NodeResult]( + topicNodeRes, + runId, + bootstrapServers, + schemaRegistryUrl, + linger + ) + ) + ) + ) } } } class ResultEventListener( - eventClassesToConsider: Set[Class[_ <: ResultEntity]], resultFileHierarchy: ResultFileHierarchy, supervisor: ActorRef ) extends SimonaListener @@ -164,10 +180,17 @@ class ResultEventListener( override def preStart(): Unit = { log.debug("Starting initialization!") - log.debug( - s"Events that will be processed: {}", - eventClassesToConsider.map(_.getSimpleName).mkString(",") - ) + resultFileHierarchy.resultSinkType match { + case _: ResultSinkType.Kafka => + log.debug("NodeResults will be processed by a Kafka sink.") + case _ => + log.debug( + s"Events that will be processed: {}", + resultFileHierarchy.resultEntitiesToConsider + .map(_.getSimpleName) + .mkString(",") + ) + } self ! Init } @@ -301,7 +324,6 @@ class ResultEventListener( Future .sequence( ResultEventListener.initializeSinks( - eventClassesToConsider, resultFileHierarchy ) ) diff --git a/src/main/scala/edu/ie3/simona/io/result/ResultEntityCsvSink.scala b/src/main/scala/edu/ie3/simona/io/result/ResultEntityCsvSink.scala index 187c33c650..1d92f67bd0 100644 --- a/src/main/scala/edu/ie3/simona/io/result/ResultEntityCsvSink.scala +++ b/src/main/scala/edu/ie3/simona/io/result/ResultEntityCsvSink.scala @@ -187,7 +187,7 @@ final case class ResultEntityCsvSink private ( object ResultEntityCsvSink { - /** Default constructor to get an instance of [[ResultEntityCsvSource]] incl. + /** Default constructor to get an instance of [[ResultEntityCsvSink]] incl. 
* creation of the output file with the headers written * * @param outfileName @@ -202,7 +202,7 @@ object ResultEntityCsvSink { * @param materializer * the materializer to be used by the stream that writes the output file * @return - * instance of [[ResultEntityCsvSource]] to be used to write results + * instance of [[ResultEntityCsvSink]] to be used to write results */ def apply( outfileName: String, diff --git a/src/main/scala/edu/ie3/simona/io/result/ResultEntityKafkaSink.scala b/src/main/scala/edu/ie3/simona/io/result/ResultEntityKafkaSink.scala new file mode 100644 index 0000000000..b9d8f1f272 --- /dev/null +++ b/src/main/scala/edu/ie3/simona/io/result/ResultEntityKafkaSink.scala @@ -0,0 +1,102 @@ +/* + * © 2021. TU Dortmund University, + * Institute of Energy Systems, Energy Efficiency and Energy Economics, + * Research group Distribution grid planning and operation + */ + +package edu.ie3.simona.io.result + +import com.sksamuel.avro4s.RecordFormat +import edu.ie3.datamodel.models.result.{NodeResult, ResultEntity} +import edu.ie3.simona.io.result.plain.PlainResult.PlainNodeResult +import edu.ie3.simona.io.result.plain.PlainWriter.NodeResultWriter +import edu.ie3.simona.io.result.plain.{PlainResult, PlainWriter} +import edu.ie3.util.scala.io.ScalaReflectionSerde.reflectionSerializer4S +import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG +import org.apache.kafka.clients.producer.{ + KafkaProducer, + ProducerConfig, + ProducerRecord +} +import org.apache.kafka.common.serialization.{Serdes, Serializer} + +import java.util.{Properties, UUID} +import scala.jdk.CollectionConverters._ +import scala.reflect.ClassTag + +final case class ResultEntityKafkaSink[ + V <: ResultEntity, + P <: PlainResult +] private ( + producer: KafkaProducer[String, P], + plainWriter: PlainWriter[V, P], + topic: String +) extends ResultEntitySink { + + override def handleResultEntity(resultEntity: ResultEntity): Unit = { + val plainEntity = plainWriter.writePlain(resultEntity.asInstanceOf[V]) + producer.send( + new ProducerRecord[String, P](topic, plainEntity) + ) + } + + override def close(): Unit = { + producer.flush() + producer.close() + } +} + +object ResultEntityKafkaSink { + + def apply[R]( + topic: String, + simRunId: UUID, + bootstrapServers: String, + schemaRegistryUrl: String, + linger: Int + )(implicit + tag: ClassTag[R] + ): ResultEntityKafkaSink[_ <: ResultEntity, _ <: PlainResult] = { + val props = new Properties() + props.put(ProducerConfig.LINGER_MS_CONFIG, linger) + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers) + props.put( + ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, + true + ) // exactly once delivery + + val NodeResClass = classOf[NodeResult] + + tag.runtimeClass match { + case NodeResClass => + implicit val recordFormat: RecordFormat[PlainNodeResult] = + RecordFormat[PlainNodeResult] + createSink(schemaRegistryUrl, props, topic, NodeResultWriter(simRunId)) + } + } + + private def createSink[F <: ResultEntity, P <: PlainResult: RecordFormat]( + schemaRegistryUrl: String, + props: Properties, + topic: String, + writer: PlainWriter[F, P] + ): ResultEntityKafkaSink[F, P] = { + val keySerializer = Serdes.String().serializer() + val valueSerializer: Serializer[P] = reflectionSerializer4S[P] + + valueSerializer.configure( + Map(SCHEMA_REGISTRY_URL_CONFIG -> schemaRegistryUrl).asJava, + false + ) + + ResultEntityKafkaSink( + new KafkaProducer[String, P]( + props, + keySerializer, + valueSerializer + ), + writer, + topic + ) + } +} diff 
--git a/src/main/scala/edu/ie3/simona/io/result/ResultSinkType.scala b/src/main/scala/edu/ie3/simona/io/result/ResultSinkType.scala index 47c6e953ba..d4c444fdd7 100644 --- a/src/main/scala/edu/ie3/simona/io/result/ResultSinkType.scala +++ b/src/main/scala/edu/ie3/simona/io/result/ResultSinkType.scala @@ -8,6 +8,8 @@ package edu.ie3.simona.io.result import edu.ie3.simona.config.SimonaConfig +import java.util.UUID + /** Enumeration to describe all eligible types of * [[edu.ie3.datamodel.models.result.ResultEntity]] sink */ @@ -24,11 +26,20 @@ object ResultSinkType { final case class InfluxDb1x(url: String, database: String, scenario: String) extends ResultSinkType + final case class Kafka( + topicNodeRes: String, + runId: UUID, + bootstrapServers: String, + schemaRegistryUrl: String, + linger: Int + ) extends ResultSinkType + def apply( sinkConfig: SimonaConfig.Simona.Output.Sink, runName: String ): ResultSinkType = { - val sink: Seq[Any] = Seq(sinkConfig.csv, sinkConfig.influxDb1x).flatten + val sink: Seq[Any] = + Seq(sinkConfig.csv, sinkConfig.influxDb1x, sinkConfig.kafka).flatten if (sink.size > 1) throw new IllegalArgumentException( @@ -40,6 +51,14 @@ object ResultSinkType { Csv(params.fileFormat, params.filePrefix, params.fileSuffix) case Some(params: SimonaConfig.Simona.Output.Sink.InfluxDb1x) => InfluxDb1x(buildInfluxDb1xUrl(params), params.database, runName) + case Some(params: SimonaConfig.ResultKafkaParams) => + Kafka( + params.topicNodeRes, + UUID.fromString(params.runId), + params.bootstrapServers, + params.schemaRegistryUrl, + params.linger + ) case None => throw new IllegalArgumentException( s"No sinks defined! Cannot determine the sink type!" diff --git a/src/main/scala/edu/ie3/simona/io/result/plain/PlainResult.scala b/src/main/scala/edu/ie3/simona/io/result/plain/PlainResult.scala new file mode 100644 index 0000000000..a590441007 --- /dev/null +++ b/src/main/scala/edu/ie3/simona/io/result/plain/PlainResult.scala @@ -0,0 +1,45 @@ +/* + * © 2021. TU Dortmund University, + * Institute of Energy Systems, Energy Efficiency and Energy Economics, + * Research group Distribution grid planning and operation + */ + +package edu.ie3.simona.io.result.plain + +import java.util.UUID + +/** Results that are sent out with Kafka and Avro should use this trait and + * corresponding implementing classes, since these give more control over + * attribute types and naming, and they include the sim run id. Plain result + * objects can be created by [[PlainWriter]]. + */ +sealed trait PlainResult + +object PlainResult { + + /** Plain result class for [[edu.ie3.datamodel.models.result.NodeResult]]. 
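(A minimal round-trip sketch of the plain result types added in this diff, using only classes that appear here; the sample values are assumptions:)

import edu.ie3.datamodel.models.result.NodeResult
import edu.ie3.simona.io.result.plain.PlainWriter.NodeResultWriter
import edu.ie3.util.quantities.PowerSystemUnits
import tech.units.indriya.quantity.Quantities
import java.time.ZonedDateTime
import java.util.UUID

val writer = NodeResultWriter(simRunId = UUID.randomUUID())
val full = new NodeResult(
  ZonedDateTime.parse("2021-01-01T00:00:00Z"),
  UUID.randomUUID(), // uuid of the input model that produced this result
  Quantities.getQuantity(1d, PowerSystemUnits.PU),
  Quantities.getQuantity(0d, PowerSystemUnits.DEGREE_GEOM)
)
val plain = writer.writePlain(full) // flat, Avro-friendly PlainNodeResult incl. run id
val restored = writer.createFull(plain) // back to a PSDM NodeResult
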
+ * + * @param simRunId + * the simulation run id + * @param time + * the current time, formatted by [[PlainWriter.createSimpleTimeStamp]] + * @param uuid + * the uuid identifying this result event + * @param inputModel + * the uuid of the model that created this event + * @param vMag + * the voltage magnitude as a [[Double]] in + * [[edu.ie3.util.quantities.PowerSystemUnits#PU]] + * @param vAng + * the voltage angle as a [[Double]] in + * [[edu.ie3.util.quantities.PowerSystemUnits#DEGREE_GEOM]] + */ + final case class PlainNodeResult( + simRunId: UUID, + time: String, + uuid: UUID, + inputModel: UUID, + vMag: Double, + vAng: Double + ) extends PlainResult +} diff --git a/src/main/scala/edu/ie3/simona/io/result/plain/PlainWriter.scala b/src/main/scala/edu/ie3/simona/io/result/plain/PlainWriter.scala new file mode 100644 index 0000000000..993f43d0c9 --- /dev/null +++ b/src/main/scala/edu/ie3/simona/io/result/plain/PlainWriter.scala @@ -0,0 +1,82 @@ +/* + * © 2021. TU Dortmund University, + * Institute of Energy Systems, Energy Efficiency and Energy Economics, + * Research group Distribution grid planning and operation + */ + +package edu.ie3.simona.io.result.plain + +import edu.ie3.datamodel.models.result.{NodeResult, ResultEntity} +import edu.ie3.simona.io.result.plain.PlainResult.PlainNodeResult +import edu.ie3.util.quantities.PowerSystemUnits +import tech.units.indriya.quantity.Quantities + +import java.time.format.DateTimeFormatter +import java.time.{ZoneId, ZonedDateTime} +import java.util.UUID + +/** Converts a [[ResultEntity]] into a [[PlainResult]] and vice versa. + * @tparam F + * the type of [[ResultEntity]] + * @tparam P + * the type of [[PlainResult]] + */ +sealed trait PlainWriter[F <: ResultEntity, P <: PlainResult] { + + /** Converts a regular [[ResultEntity]] of type [[F]] into a [[PlainResult]] + * of type [[P]] + * @param full + * the [[ResultEntity]] to convert + * @return + * the resulting [[PlainResult]] + */ + def writePlain(full: F): P + + /** Converts a [[PlainResult]] of type [[P]] into a regular [[ResultEntity]] + * of type [[F]] + * @param plain + * the [[PlainResult]] to convert + * @return + * the resulting [[ResultEntity]] + */ + def createFull(plain: P): F +} + +object PlainWriter { + private lazy val timeFormatter = + DateTimeFormatter + .ofPattern("yyyy-MM-dd HH:mm:ss") + .withZone(ZoneId.of("UTC")) + + /** Converts [[NodeResult]]s into [[PlainNodeResult]]s and vice versa + * @param simRunId + * the simulation run id to use for plain results + */ + final case class NodeResultWriter(simRunId: UUID) + extends PlainWriter[NodeResult, PlainNodeResult] { + + override def writePlain(full: NodeResult): PlainNodeResult = { + PlainNodeResult( + simRunId, + createSimpleTimeStamp(full.getTime), + full.getUuid, + full.getInputModel, + full.getvMag.getValue.doubleValue(), + full.getvAng.getValue.doubleValue() + ) + } + + override def createFull(plain: PlainNodeResult): NodeResult = { + new NodeResult( + plain.uuid, + ZonedDateTime.parse(plain.time, timeFormatter), + plain.inputModel, + Quantities.getQuantity(plain.vMag, PowerSystemUnits.PU), + Quantities.getQuantity(plain.vAng, PowerSystemUnits.DEGREE_GEOM) + ) + } + } + + def createSimpleTimeStamp(dateTime: ZonedDateTime): String = + dateTime.format(timeFormatter) +} diff --git a/src/main/scala/edu/ie3/simona/model/participant/load/profile/LoadProfileKey.scala b/src/main/scala/edu/ie3/simona/model/participant/load/profile/LoadProfileKey.scala index 8f59e10e9b..cb3cb60e90 100644 --- 
a/src/main/scala/edu/ie3/simona/model/participant/load/profile/LoadProfileKey.scala +++ b/src/main/scala/edu/ie3/simona/model/participant/load/profile/LoadProfileKey.scala @@ -7,9 +7,8 @@ package edu.ie3.simona.model.participant.load.profile import java.time.ZonedDateTime - import edu.ie3.datamodel.exceptions.ParsingException -import edu.ie3.datamodel.models.StandardLoadProfile +import edu.ie3.datamodel.models.profile.StandardLoadProfile import edu.ie3.simona.model.participant.load import edu.ie3.simona.model.participant.load.{DayType, profile} diff --git a/src/main/scala/edu/ie3/simona/model/participant/load/profile/LoadProfileStore.scala b/src/main/scala/edu/ie3/simona/model/participant/load/profile/LoadProfileStore.scala index 7757a0a302..8a2ffd6029 100644 --- a/src/main/scala/edu/ie3/simona/model/participant/load/profile/LoadProfileStore.scala +++ b/src/main/scala/edu/ie3/simona/model/participant/load/profile/LoadProfileStore.scala @@ -12,7 +12,10 @@ import java.util import breeze.numerics.round import com.typesafe.scalalogging.LazyLogging -import edu.ie3.datamodel.models.{BdewLoadProfile, StandardLoadProfile} +import edu.ie3.datamodel.models.profile.{ + BdewStandardLoadProfile, + StandardLoadProfile +} import edu.ie3.simona.model.participant.load.profile.LoadProfileStore.{ initializeMaxConsumptionPerProfile, initializeTypeDayValues @@ -65,7 +68,7 @@ class LoadProfileStore private (val reader: Reader) { case Some(typeDayValues) => val quarterHourEnergy = typeDayValues.getQuarterHourEnergy(time) val load = loadProfile match { - case BdewLoadProfile.H0 => + case BdewStandardLoadProfile.H0 => /* For the residential average profile, a dynamization has to be taken into account */ val t = time.getDayOfYear // leap years are ignored LoadProfileStore.dynamization(quarterHourEnergy, t) @@ -205,7 +208,7 @@ object LoadProfileStore extends LazyLogging { knownLoadProfiles .flatMap(loadProfile => { (loadProfile match { - case BdewLoadProfile.H0 => + case BdewStandardLoadProfile.H0 => // max load for h0 is expected to be exclusively found in winter, // thus we only search there. 
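// For context, a hedged sketch of the dynamization referenced above: the BDEW method
// scales each H0 quarter-hour value with a fourth-degree polynomial in the day of the
// year t (coefficients per the published BDEW approach; they are an assumption here,
// as this diff does not show them):
//   F(t) = -3.92e-10*t^4 + 3.2e-7*t^3 - 7.02e-5*t^2 + 2.1e-3*t + 1.24
// so dynamization(value, t) amounts to roughly:
//   value * (-3.92e-10 * math.pow(t, 4) + 3.2e-7 * math.pow(t, 3)
//     - 7.02e-5 * math.pow(t, 2) + 2.1e-3 * t + 1.24)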
DayType.values diff --git a/src/main/scala/edu/ie3/simona/model/participant/load/profile/ProfileLoadModel.scala b/src/main/scala/edu/ie3/simona/model/participant/load/profile/ProfileLoadModel.scala index 6cb73ff089..b9b207fef4 100644 --- a/src/main/scala/edu/ie3/simona/model/participant/load/profile/ProfileLoadModel.scala +++ b/src/main/scala/edu/ie3/simona/model/participant/load/profile/ProfileLoadModel.scala @@ -6,7 +6,7 @@ package edu.ie3.simona.model.participant.load.profile -import edu.ie3.datamodel.models.StandardLoadProfile +import edu.ie3.datamodel.models.profile.StandardLoadProfile import edu.ie3.datamodel.models.input.system.LoadInput import edu.ie3.simona.model.participant.CalcRelevantData.LoadRelevantData import edu.ie3.simona.model.participant.control.QControl @@ -136,13 +136,15 @@ case object ProfileLoadModel { QControl.apply(input.getqCharacteristics()), sRatedPowerScaled, input.getCosPhiRated, - input.getStandardLoadProfile, + input.getLoadProfile.asInstanceOf[StandardLoadProfile], reference ) case LoadReference.EnergyConsumption(energyConsumption) => val loadProfileMax = - LoadProfileStore().maxPower(input.getStandardLoadProfile) + LoadProfileStore().maxPower( + input.getLoadProfile.asInstanceOf[StandardLoadProfile] + ) val sRatedEnergy = LoadModel.scaleSRatedEnergy( input, energyConsumption, @@ -157,7 +159,7 @@ case object ProfileLoadModel { QControl.apply(input.getqCharacteristics()), sRatedEnergy, input.getCosPhiRated, - input.getStandardLoadProfile, + input.getLoadProfile.asInstanceOf[StandardLoadProfile], reference ) } diff --git a/src/main/scala/edu/ie3/simona/sim/setup/SimonaStandaloneSetup.scala b/src/main/scala/edu/ie3/simona/sim/setup/SimonaStandaloneSetup.scala index 688513d307..a7222211fd 100644 --- a/src/main/scala/edu/ie3/simona/sim/setup/SimonaStandaloneSetup.scala +++ b/src/main/scala/edu/ie3/simona/sim/setup/SimonaStandaloneSetup.scala @@ -248,7 +248,6 @@ class SimonaStandaloneSetup( .toVector :+ context.simonaActorOf( ResultEventListener.props( - SetupHelper.allResultEntitiesToWrite(simonaConfig.simona.output), resultFileHierarchy, simonaSim ) diff --git a/src/main/scala/edu/ie3/simona/util/ConfigUtil.scala b/src/main/scala/edu/ie3/simona/util/ConfigUtil.scala index e2a98e0e70..ceef9abbdd 100644 --- a/src/main/scala/edu/ie3/simona/util/ConfigUtil.scala +++ b/src/main/scala/edu/ie3/simona/util/ConfigUtil.scala @@ -13,7 +13,7 @@ import edu.ie3.datamodel.io.connectors.{ SqlConnector } -import java.util.UUID +import java.util.{Properties, UUID} import edu.ie3.datamodel.models.result.connector.{ LineResult, SwitchResult, @@ -25,10 +25,14 @@ import edu.ie3.simona.config.SimonaConfig import edu.ie3.simona.config.SimonaConfig._ import edu.ie3.simona.event.notifier.{Notifier, ParticipantNotifierConfig} import edu.ie3.simona.exceptions.InvalidConfigParameterException +import org.apache.kafka.clients.admin.AdminClient +import org.apache.kafka.common.KafkaException import java.io.File +import java.util.concurrent.ExecutionException import scala.collection.mutable -import scala.util.{Failure, Success, Try} +import scala.jdk.CollectionConverters._ +import scala.util.{Failure, Success, Try, Using} object ConfigUtil { @@ -457,6 +461,54 @@ object ConfigUtil { ) } } + + def checkKafkaParams( + kafkaParams: KafkaParams, + topics: Seq[String] + ): Unit = { + try { + UUID.fromString(kafkaParams.runId) + } catch { + case e: IllegalArgumentException => + throw new InvalidConfigParameterException( + s"The UUID '${kafkaParams.runId}' cannot be parsed as it is invalid.", + e + ) + 
} + + val properties = new Properties() + properties.put("bootstrap.servers", kafkaParams.bootstrapServers) + properties.put("default.api.timeout.ms", 2000) + properties.put("request.timeout.ms", 1000) + try { + Using(AdminClient.create(properties)) { client => + val existingTopics = client.listTopics.names().get().asScala + topics.filterNot(existingTopics.contains) + } match { + case Failure(ke: KafkaException) => + throw new InvalidConfigParameterException( + s"Exception creating kafka client for broker ${kafkaParams.bootstrapServers}.", + ke + ) + case Failure(ee: ExecutionException) => + throw new InvalidConfigParameterException( + s"Connection with kafka broker ${kafkaParams.bootstrapServers} failed.", + ee + ) + case Failure(other) => + throw new InvalidConfigParameterException( + s"Checking kafka config failed with unexpected exception.", + other + ) + case Success(missingTopics) if missingTopics.nonEmpty => + throw new InvalidConfigParameterException( + s"Required kafka topics {${missingTopics.mkString}} do not exist." + ) + case Success(_) => + // testing connection succeeded, do nothing + } + } + } } } diff --git a/src/main/scala/edu/ie3/simona/util/ResultFileHierarchy.scala b/src/main/scala/edu/ie3/simona/util/ResultFileHierarchy.scala index 8e717c229a..61276196db 100644 --- a/src/main/scala/edu/ie3/simona/util/ResultFileHierarchy.scala +++ b/src/main/scala/edu/ie3/simona/util/ResultFileHierarchy.scala @@ -62,6 +62,9 @@ final case class ResultFileHierarchy( val resultSinkType: ResultSinkType = resultEntityPathConfig.resultSinkType + val resultEntitiesToConsider: Set[Class[_ <: ResultEntity]] = + resultEntityPathConfig.resultEntitiesToConsider + val rawOutputDataFilePaths: Map[Class[_ <: ResultEntity], String] = { resultSinkType match { case csv: Csv => diff --git a/src/main/scala/edu/ie3/util/scala/io/ScalaReflectionSerde.scala b/src/main/scala/edu/ie3/util/scala/io/ScalaReflectionSerde.scala new file mode 100644 index 0000000000..e129c8dff6 --- /dev/null +++ b/src/main/scala/edu/ie3/util/scala/io/ScalaReflectionSerde.scala @@ -0,0 +1,59 @@ +/* + * © 2021. 
TU Dortmund University, + * Institute of Energy Systems, Energy Efficiency and Energy Economics, + * Research group Distribution grid planning and operation + */ + +package edu.ie3.util.scala.io + +import com.sksamuel.avro4s.RecordFormat +import io.confluent.kafka.streams.serdes.avro.{ + GenericAvroDeserializer, + GenericAvroSerializer +} +import org.apache.kafka.common.serialization.{Deserializer, Serializer} + +/** As seen at + * https://kafka-tutorials.confluent.io/produce-consume-lang/scala.html + */ +object ScalaReflectionSerde { + + def reflectionSerializer4S[T: RecordFormat]: Serializer[T] = + new Serializer[T] { + val inner = new GenericAvroSerializer() + + override def configure( + configs: java.util.Map[String, _], + isKey: Boolean + ): Unit = inner.configure(configs, isKey) + + override def serialize(topic: String, maybeData: T): Array[Byte] = + Option(maybeData) + .map(data => + inner.serialize(topic, implicitly[RecordFormat[T]].to(data)) + ) + .getOrElse(Array.emptyByteArray) + + override def close(): Unit = inner.close() + } + + def reflectionDeserializer4S[T: RecordFormat]: Deserializer[T] = + new Deserializer[T] { + val inner = new GenericAvroDeserializer() + + override def configure( + configs: java.util.Map[String, _], + isKey: Boolean + ): Unit = inner.configure(configs, isKey) + + override def deserialize(topic: String, maybeData: Array[Byte]): T = + Option(maybeData) + .filter(_.nonEmpty) + .map(data => + implicitly[RecordFormat[T]].from(inner.deserialize(topic, data)) + ) + .getOrElse(null.asInstanceOf[T]) + + override def close(): Unit = inner.close() + } +} diff --git a/src/test/groovy/edu/ie3/simona/model/participant/load/FixedLoadModelTest.groovy b/src/test/groovy/edu/ie3/simona/model/participant/load/FixedLoadModelTest.groovy index 925470e52a..26492f333f 100644 --- a/src/test/groovy/edu/ie3/simona/model/participant/load/FixedLoadModelTest.groovy +++ b/src/test/groovy/edu/ie3/simona/model/participant/load/FixedLoadModelTest.groovy @@ -6,7 +6,7 @@ package edu.ie3.simona.model.participant.load -import edu.ie3.datamodel.models.BdewLoadProfile +import edu.ie3.datamodel.models.profile.BdewStandardLoadProfile import edu.ie3.datamodel.models.OperationTime import edu.ie3.datamodel.models.input.NodeInput import edu.ie3.datamodel.models.input.OperatorInput @@ -14,7 +14,6 @@ import edu.ie3.datamodel.models.input.system.LoadInput import edu.ie3.datamodel.models.input.system.characteristic.CosPhiFixed import edu.ie3.datamodel.models.voltagelevels.GermanVoltageLevelUtils import edu.ie3.simona.model.SystemComponent -import edu.ie3.simona.model.participant.CalcRelevantData import edu.ie3.simona.model.participant.control.QControl import edu.ie3.util.TimeUtil import spock.lang.Specification @@ -45,7 +44,7 @@ class FixedLoadModelTest extends Specification { -1 ), new CosPhiFixed("cosPhiFixed:{(0.0,0.95)}"), - BdewLoadProfile.H0, + BdewStandardLoadProfile.H0, false, Quantities.getQuantity(3000d, KILOWATTHOUR), Quantities.getQuantity(282.74d, VOLTAMPERE), diff --git a/src/test/groovy/edu/ie3/simona/model/participant/load/ProfileLoadModelTest.groovy b/src/test/groovy/edu/ie3/simona/model/participant/load/ProfileLoadModelTest.groovy index a645ef9f6d..0f4de025b3 100644 --- a/src/test/groovy/edu/ie3/simona/model/participant/load/ProfileLoadModelTest.groovy +++ b/src/test/groovy/edu/ie3/simona/model/participant/load/ProfileLoadModelTest.groovy @@ -24,7 +24,7 @@ import javax.measure.quantity.Energy import java.time.temporal.ChronoUnit import java.util.stream.Collectors -import static 
edu.ie3.datamodel.models.BdewLoadProfile.* +import static edu.ie3.datamodel.models.profile.BdewStandardLoadProfile.* import static edu.ie3.simona.model.participant.load.LoadReference.ActivePower import static edu.ie3.simona.model.participant.load.LoadReference.EnergyConsumption import static edu.ie3.util.quantities.PowerSystemUnits.* @@ -71,7 +71,7 @@ class ProfileLoadModelTest extends Specification { def "A profile load model should be instantiated from valid input correctly"() { when: def actual = ProfileLoadModel.apply( - loadInput.copy().standardLoadProfile(profile).build(), + loadInput.copy().loadprofile(profile).build(), foreSeenOperationInterval, 1.0, reference) diff --git a/src/test/groovy/edu/ie3/simona/model/participant/load/RandomLoadModelTest.groovy b/src/test/groovy/edu/ie3/simona/model/participant/load/RandomLoadModelTest.groovy index 5089d7da95..171bd8ad22 100644 --- a/src/test/groovy/edu/ie3/simona/model/participant/load/RandomLoadModelTest.groovy +++ b/src/test/groovy/edu/ie3/simona/model/participant/load/RandomLoadModelTest.groovy @@ -24,7 +24,7 @@ import javax.measure.quantity.Energy import java.time.temporal.ChronoUnit import java.util.stream.Collectors -import static edu.ie3.datamodel.models.BdewLoadProfile.H0 +import static edu.ie3.datamodel.models.profile.BdewStandardLoadProfile.H0 import static edu.ie3.simona.model.participant.load.LoadReference.ActivePower import static edu.ie3.simona.model.participant.load.LoadReference.EnergyConsumption import static edu.ie3.util.quantities.PowerSystemUnits.* diff --git a/src/test/scala/edu/ie3/simona/config/ConfigFailFastSpec.scala b/src/test/scala/edu/ie3/simona/config/ConfigFailFastSpec.scala index d37b55e92b..27817b65e0 100644 --- a/src/test/scala/edu/ie3/simona/config/ConfigFailFastSpec.scala +++ b/src/test/scala/edu/ie3/simona/config/ConfigFailFastSpec.scala @@ -7,7 +7,7 @@ package edu.ie3.simona.config import com.typesafe.config.ConfigFactory -import edu.ie3.simona.config.SimonaConfig.BaseCsvParams +import edu.ie3.simona.config.SimonaConfig.{BaseCsvParams, ResultKafkaParams} import edu.ie3.simona.config.SimonaConfig.Simona.Input.Weather.Datasource.CoordinateSource import edu.ie3.simona.config.SimonaConfig.Simona.Output.Sink import edu.ie3.simona.config.SimonaConfig.Simona.Output.Sink.{Csv, InfluxDb1x} @@ -689,9 +689,9 @@ class ConfigFailFastSpec extends UnitSpec with ConfigTestData { "throw an exception if no sink is provided" in { intercept[InvalidConfigParameterException] { - ConfigFailFast invokePrivate checkDataSinks(Sink(None, None)) + ConfigFailFast invokePrivate checkDataSinks(Sink(None, None, None)) }.getLocalizedMessage shouldBe "No sink configuration found! Please ensure that at least " + - "one sink is configured! You can choose from: influxdb1x, csv." + "one sink is configured! You can choose from: influxdb1x, csv, kafka." } "throw an exception if more than one sink is provided" in { @@ -699,7 +699,8 @@ class ConfigFailFastSpec extends UnitSpec with ConfigTestData { ConfigFailFast invokePrivate checkDataSinks( Sink( Some(Csv("", "", "", isHierarchic = false)), - Some(InfluxDb1x("", 0, "")) + Some(InfluxDb1x("", 0, "")), + None ) ) }.getLocalizedMessage shouldBe "Multiple sink configurations are not supported! 
Please ensure that only " + @@ -709,12 +710,71 @@ class ConfigFailFastSpec extends UnitSpec with ConfigTestData { "throw an exception if an influxDb1x is configured, but not accessible" ignore { intercept[java.lang.IllegalArgumentException] { ConfigFailFast invokePrivate checkDataSinks( - Sink(None, Some(InfluxDb1x("", 0, ""))) + Sink(None, Some(InfluxDb1x("", 0, "")), None) ) }.getLocalizedMessage shouldBe "Unable to reach configured influxDb1x with url ':0' for 'Sink' configuration and database ''. " + "Exception: java.lang.IllegalArgumentException: Unable to parse url: :0" } + "throw an exception if kafka is configured with a malformed UUID" in { + intercept[InvalidConfigParameterException] { + ConfigFailFast invokePrivate checkDataSinks( + Sink( + None, + None, + Some( + ResultKafkaParams( + "server:1234", + 0, + "-not-a-uuid-", + "https://reg:123", + "topic" + ) + ) + ) + ) + }.getMessage shouldBe "The UUID '-not-a-uuid-' cannot be parsed as it is invalid." + } + + "throw an exception if kafka is configured, but creating kafka client fails" in { + intercept[InvalidConfigParameterException] { + ConfigFailFast invokePrivate checkDataSinks( + Sink( + None, + None, + Some( + ResultKafkaParams( + "not§a§server", + 0, + "00000000-0000-0000-0000-000000000000", + "https://reg:123", + "topic" + ) + ) + ) + ) + }.getMessage shouldBe "Exception creating kafka client for broker not§a§server." + } + + "throw an exception if kafka is configured, but connection to broker fails" in { + intercept[InvalidConfigParameterException] { + ConfigFailFast invokePrivate checkDataSinks( + Sink( + None, + None, + Some( + ResultKafkaParams( + "localhost:12345", + 0, + "00000000-0000-0000-0000-000000000000", + "https://reg:123", + "topic" + ) + ) + ) + ) + }.getMessage shouldBe "Connection with kafka broker localhost:12345 failed." 
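(For orientation, a hedged sketch of a kafka sink configuration that these checks would accept, assuming a reachable broker and an existing topic; the host names, run id, and topic name are illustrative, mirroring the keys from config-template.conf above:)

import com.typesafe.config.ConfigFactory

val kafkaSinkConf = ConfigFactory.parseString(
  """simona.output.sink.kafka {
    |  runId = "00000000-0000-0000-0000-000000000000"
    |  bootstrapServers = "localhost:9092"
    |  schemaRegistryUrl = "http://localhost:8081"
    |  linger = 5
    |  topicNodeRes = "node_res"
    |}""".stripMargin
)
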
+ } } "Checking grid data sources" should { diff --git a/src/test/scala/edu/ie3/simona/event/listener/ResultEventListenerSpec.scala b/src/test/scala/edu/ie3/simona/event/listener/ResultEventListenerSpec.scala index 8a95b4fa64..efef169a2e 100644 --- a/src/test/scala/edu/ie3/simona/event/listener/ResultEventListenerSpec.scala +++ b/src/test/scala/edu/ie3/simona/event/listener/ResultEventListenerSpec.scala @@ -71,32 +71,32 @@ class ResultEventListenerSpec ) // the OutputFileHierarchy - val resultFileHierarchy: (Int, String) => ResultFileHierarchy = - (runId: Int, fileFormat: String) => - ResultFileHierarchy( - outputDir = testTmpDir + File.separator + runId, - simulationName, - ResultEntityPathConfig( - resultEntitiesToBeWritten, - ResultSinkType.Csv(fileFormat = fileFormat) - ), - createDirs = true - ) + private def resultFileHierarchy( + runId: Int, + fileFormat: String, + classes: Set[Class[_ <: ResultEntity]] = resultEntitiesToBeWritten + ): ResultFileHierarchy = + ResultFileHierarchy( + outputDir = testTmpDir + File.separator + runId, + simulationName, + ResultEntityPathConfig( + classes, + ResultSinkType.Csv(fileFormat = fileFormat) + ), + createDirs = true + ) def createDir( - resultFileHierarchy: ResultFileHierarchy, - resultEntitiesToBeWritten: Set[Class[_ <: ResultEntity]] - ): Iterable[Future[(Class[_], ResultEntitySink)]] = { + resultFileHierarchy: ResultFileHierarchy + ): Iterable[Future[ResultEntitySink]] = { val materializer: Materializer = Materializer(system) - val initializeSinks - : PrivateMethod[Iterable[Future[(Class[_], ResultEntitySink)]]] = - PrivateMethod[Iterable[Future[(Class[_], ResultEntitySink)]]]( + val initializeSinks: PrivateMethod[Iterable[Future[ResultEntitySink]]] = + PrivateMethod[Iterable[Future[ResultEntitySink]]]( Symbol("initializeSinks") ) ResultEventListener invokePrivate initializeSinks( - resultEntitiesToBeWritten, resultFileHierarchy, materializer ) @@ -120,7 +120,7 @@ class ResultEventListenerSpec "initialize its sinks correctly" in { val fileHierarchy = resultFileHierarchy(1, ".csv") Await.ready( - Future.sequence(createDir(fileHierarchy, resultEntitiesToBeWritten)), + Future.sequence(createDir(fileHierarchy)), 60 seconds ) @@ -139,11 +139,11 @@ class ResultEventListenerSpec } "check if actor dies when it should die" in { - val fileHierarchy = resultFileHierarchy(2, ".ttt") + val fileHierarchy = + resultFileHierarchy(2, ".ttt", Set(classOf[Transformer3WResult])) val testProbe = TestProbe() val listener = testProbe.childActorOf( ResultEventListener.props( - Set(classOf[Transformer3WResult]), fileHierarchy, testProbe.ref ) @@ -162,7 +162,6 @@ class ResultEventListenerSpec val listenerRef = system.actorOf( ResultEventListener .props( - resultEntitiesToBeWritten, specificOutputFileHierarchy, testActor ) @@ -208,7 +207,6 @@ class ResultEventListenerSpec val listenerRef = system.actorOf( ResultEventListener .props( - resultEntitiesToBeWritten, specificOutputFileHierarchy, testActor ) @@ -292,10 +290,10 @@ class ResultEventListenerSpec PrivateMethod[Map[Transformer3wKey, AggregatedTransformer3wResult]]( Symbol("registerPartialTransformer3wResult") ) - val fileHierarchy = resultFileHierarchy(5, ".csv") + val fileHierarchy = + resultFileHierarchy(5, ".csv", Set(classOf[Transformer3WResult])) val listener = TestFSMRef( new ResultEventListener( - Set(classOf[Transformer3WResult]), fileHierarchy, testActor ) @@ -526,7 +524,6 @@ class ResultEventListenerSpec val listenerRef = system.actorOf( ResultEventListener .props( - resultEntitiesToBeWritten, 
specificOutputFileHierarchy, testActor ) diff --git a/src/test/scala/edu/ie3/simona/integration/common/IntegrationSpecCommon.scala b/src/test/scala/edu/ie3/simona/integration/common/IntegrationSpecCommon.scala index 8b1fe009e9..eeb0804f77 100644 --- a/src/test/scala/edu/ie3/simona/integration/common/IntegrationSpecCommon.scala +++ b/src/test/scala/edu/ie3/simona/integration/common/IntegrationSpecCommon.scala @@ -12,7 +12,7 @@ trait IntegrationSpecCommon { * or some of your tests are failing you very likely have altered the vn_simona.conf. This config although * is NOT meant to be altered. Instead you should always use a delta config and only override the values and * files of vn_simona/vn_simona.conf. Delta configs can be created by including the config you want to change - * parameters from via include (e.g. include "input/vn_simona/vn_simona.conf") at the + * parameters from via include (e.g. include "input/samples/vn_simona/vn_simona.conf") at the * beginning of your config file and then just override the parameters you want to change! */ val configFile: String = "input/samples/vn_simona/vn_simona.conf" diff --git a/src/test/scala/edu/ie3/simona/io/result/ResultEntityKafkaSpec.scala b/src/test/scala/edu/ie3/simona/io/result/ResultEntityKafkaSpec.scala new file mode 100644 index 0000000000..3f22986085 --- /dev/null +++ b/src/test/scala/edu/ie3/simona/io/result/ResultEntityKafkaSpec.scala @@ -0,0 +1,197 @@ +/* + * © 2021. TU Dortmund University, + * Institute of Energy Systems, Energy Efficiency and Energy Economics, + * Research group Distribution grid planning and operation + */ + +package edu.ie3.simona.io.result + +import akka.actor.ActorSystem +import akka.testkit.TestActorRef +import com.sksamuel.avro4s.RecordFormat +import com.typesafe.config.ConfigFactory +import edu.ie3.datamodel.models.result.NodeResult +import edu.ie3.simona.event.ResultEvent.PowerFlowResultEvent +import edu.ie3.simona.event.listener.ResultEventListener +import edu.ie3.simona.io.result.plain.PlainResult.PlainNodeResult +import edu.ie3.simona.io.result.plain.PlainWriter +import edu.ie3.simona.test.KafkaSpecLike +import edu.ie3.simona.test.KafkaSpecLike.Topic +import edu.ie3.simona.test.common.TestKitWithShutdown +import edu.ie3.simona.util.ResultFileHierarchy +import edu.ie3.util.quantities.PowerSystemUnits +import edu.ie3.util.scala.io.ScalaReflectionSerde +import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG +import org.apache.kafka.clients.consumer.KafkaConsumer +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.serialization.{Deserializer, Serdes} +import org.apache.kafka.common.utils.Bytes +import org.scalatest.GivenWhenThen +import org.scalatest.concurrent.Eventually +import tech.units.indriya.quantity.Quantities + +import java.time.ZonedDateTime +import java.util.UUID +import scala.concurrent.duration._ +import scala.jdk.CollectionConverters._ +import scala.jdk.DurationConverters._ +import scala.language.postfixOps + +/** Adapted from + * https://kafka-tutorials.confluent.io/produce-consume-lang/scala.html + */ +class ResultEntityKafkaSpec + extends TestKitWithShutdown( + ActorSystem( + "ResultEntityKafkaSpec", + ConfigFactory + .parseString( + """akka.loggers = ["edu.ie3.simona.test.common.SilentTestEventListener"] + |akka.loglevel="info" + """.stripMargin + ) + ) + ) + with KafkaSpecLike + with GivenWhenThen + with Eventually { + + var testConsumer: KafkaConsumer[Bytes, PlainNodeResult] = _ + + private implicit lazy val resultFormat: 
diff --git a/src/test/scala/edu/ie3/simona/io/result/ResultEntityKafkaSpec.scala b/src/test/scala/edu/ie3/simona/io/result/ResultEntityKafkaSpec.scala
new file mode 100644
index 0000000000..3f22986085
--- /dev/null
+++ b/src/test/scala/edu/ie3/simona/io/result/ResultEntityKafkaSpec.scala
@@ -0,0 +1,197 @@
+/*
+ * © 2021. TU Dortmund University,
+ * Institute of Energy Systems, Energy Efficiency and Energy Economics,
+ * Research group Distribution grid planning and operation
+ */
+
+package edu.ie3.simona.io.result
+
+import akka.actor.ActorSystem
+import akka.testkit.TestActorRef
+import com.sksamuel.avro4s.RecordFormat
+import com.typesafe.config.ConfigFactory
+import edu.ie3.datamodel.models.result.NodeResult
+import edu.ie3.simona.event.ResultEvent.PowerFlowResultEvent
+import edu.ie3.simona.event.listener.ResultEventListener
+import edu.ie3.simona.io.result.plain.PlainResult.PlainNodeResult
+import edu.ie3.simona.io.result.plain.PlainWriter
+import edu.ie3.simona.test.KafkaSpecLike
+import edu.ie3.simona.test.KafkaSpecLike.Topic
+import edu.ie3.simona.test.common.TestKitWithShutdown
+import edu.ie3.simona.util.ResultFileHierarchy
+import edu.ie3.util.quantities.PowerSystemUnits
+import edu.ie3.util.scala.io.ScalaReflectionSerde
+import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG
+import org.apache.kafka.clients.consumer.KafkaConsumer
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.serialization.{Deserializer, Serdes}
+import org.apache.kafka.common.utils.Bytes
+import org.scalatest.GivenWhenThen
+import org.scalatest.concurrent.Eventually
+import tech.units.indriya.quantity.Quantities
+
+import java.time.ZonedDateTime
+import java.util.UUID
+import scala.concurrent.duration._
+import scala.jdk.CollectionConverters._
+import scala.jdk.DurationConverters._
+import scala.language.postfixOps
+
+/** Adapted from
+ * https://kafka-tutorials.confluent.io/produce-consume-lang/scala.html
+ */
+class ResultEntityKafkaSpec
+    extends TestKitWithShutdown(
+      ActorSystem(
+        "ResultEntityKafkaSpec",
+        ConfigFactory
+          .parseString(
+            """akka.loggers = ["edu.ie3.simona.test.common.SilentTestEventListener"]
+              |akka.loglevel="info"
+          """.stripMargin
+          )
+      )
+    )
+    with KafkaSpecLike
+    with GivenWhenThen
+    with Eventually {
+
+  var testConsumer: KafkaConsumer[Bytes, PlainNodeResult] = _
+
+  private implicit lazy val resultFormat: RecordFormat[PlainNodeResult] =
+    RecordFormat[PlainNodeResult]
+  val deserializer: Deserializer[PlainNodeResult] =
+    ScalaReflectionSerde.reflectionDeserializer4S[PlainNodeResult]
+
+  private val topic = "testtopic"
+
+  override val testTopics: Vector[Topic] = Vector(
+    Topic(topic, 1, 1)
+  )
+
+  deserializer.configure(
+    Map(SCHEMA_REGISTRY_URL_CONFIG -> "mock://unused:8081").asJava,
+    false
+  )
+
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+    val config = Map[String, AnyRef](
+      "group.id" -> "test",
+      "bootstrap.servers" -> kafka.bootstrapServers
+    )
+    testConsumer = new KafkaConsumer[Bytes, PlainNodeResult](
+      config.asJava,
+      Serdes.Bytes().deserializer(),
+      deserializer
+    )
+  }
+
+  "ResultEventListener with Kafka configuration" should {
+    "write a series of new NodeResults to Kafka" in {
+
+      Given("a ResultEventListener with Kafka config")
+
+      val mockSchemaRegistryUrl = "mock://unused:8081"
+
+      val runId = UUID.randomUUID()
+
+      // build the listener
+      val listenerRef = TestActorRef(
+        ResultEventListener.props(
+          ResultFileHierarchy(
+            "out",
+            "simName",
+            ResultFileHierarchy.ResultEntityPathConfig(
+              Set(classOf[NodeResult]),
+              ResultSinkType.Kafka(
+                topic,
+                runId,
+                kafka.bootstrapServers,
+                mockSchemaRegistryUrl,
+                0
+              )
+            )
+          ),
+          testActor
+        )
+      )
+
+      And("a collection of NodeResults")
+      val nodeRes1 = new NodeResult(
+        ZonedDateTime.parse("2021-01-01T00:00:00+01:00[Europe/Berlin]"),
+        UUID.randomUUID(),
+        Quantities.getQuantity(1d, PowerSystemUnits.PU),
+        Quantities.getQuantity(0d, PowerSystemUnits.DEGREE_GEOM)
+      )
+      val nodeRes2 = new NodeResult(
+        ZonedDateTime.parse("2021-01-01T00:00:00+01:00[Europe/Berlin]"),
+        UUID.randomUUID(),
+        Quantities.getQuantity(0.8d, PowerSystemUnits.PU),
+        Quantities.getQuantity(15d, PowerSystemUnits.DEGREE_GEOM)
+      )
+      val nodeRes3 = new NodeResult(
+        ZonedDateTime.parse("2021-01-10T00:00:00+01:00[Europe/Berlin]"),
+        UUID.randomUUID(),
+        Quantities.getQuantity(0.75d, PowerSystemUnits.PU),
+        Quantities.getQuantity(90d, PowerSystemUnits.DEGREE_GEOM)
+      )
+
+      When("receiving the NodeResults")
+      listenerRef ! PowerFlowResultEvent(
+        Iterable(nodeRes1, nodeRes2, nodeRes3),
+        Iterable.empty,
+        Iterable.empty,
+        Iterable.empty,
+        Iterable.empty
+      )
+
+      val testTopic = testTopics.headOption.value
+      val topicPartitions: Seq[TopicPartition] =
+        (0 until testTopic.partitions).map(
+          new TopicPartition(testTopic.name, _)
+        )
+
+      testConsumer.assign(topicPartitions.asJava)
+
+      Then("records can be fetched from Kafka")
+      eventually(timeout(5 second), interval(1 second)) {
+        testConsumer.seekToBeginning(topicPartitions.asJava)
+        val records: List[PlainNodeResult] =
+          testConsumer.poll((1 second) toJava).asScala.map(_.value()).toList
+
+        records should have length 3
+        records should contain(
+          PlainNodeResult(
+            runId,
+            PlainWriter.createSimpleTimeStamp(nodeRes1.getTime),
+            nodeRes1.getUuid,
+            nodeRes1.getInputModel,
+            nodeRes1.getvMag().getValue.doubleValue(),
+            nodeRes1.getvAng().getValue.doubleValue()
+          )
+        )
+        records should contain(
+          PlainNodeResult(
+            runId,
+            PlainWriter.createSimpleTimeStamp(nodeRes2.getTime),
+            nodeRes2.getUuid,
+            nodeRes2.getInputModel,
+            nodeRes2.getvMag().getValue.doubleValue(),
+            nodeRes2.getvAng().getValue.doubleValue()
+          )
+        )
+        records should contain(
+          PlainNodeResult(
+            runId,
+            PlainWriter.createSimpleTimeStamp(nodeRes3.getTime),
+            nodeRes3.getUuid,
+            nodeRes3.getInputModel,
+            nodeRes3.getvMag().getValue.doubleValue(),
+            nodeRes3.getvAng().getValue.doubleValue()
+          )
+        )
+      }
+    }
+  }
+}
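Editor's note: the spec above only wires up the consuming side. For orientation, a hedged sketch of the producing counterpart, assuming `ScalaReflectionSerde` offers a `reflectionSerializer4S` factory symmetric to the `reflectionDeserializer4S` used above (unverified assumption):

```scala
import com.sksamuel.avro4s.RecordFormat
import edu.ie3.simona.io.result.plain.PlainResult.PlainNodeResult
import edu.ie3.util.scala.io.ScalaReflectionSerde
import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG
import org.apache.kafka.common.serialization.Serializer

import scala.jdk.CollectionConverters._

// Serializer counterpart to the spec's deserializer setup. The mock://
// URL resolves to an in-memory schema registry inside the Confluent
// serde, so serialization round trips need no external service.
implicit val format: RecordFormat[PlainNodeResult] =
  RecordFormat[PlainNodeResult]
val serializer: Serializer[PlainNodeResult] =
  ScalaReflectionSerde.reflectionSerializer4S[PlainNodeResult] // assumed API
serializer.configure(
  Map(SCHEMA_REGISTRY_URL_CONFIG -> "mock://unused:8081").asJava,
  false // isKey = false: record values are serialized, keys stay Bytes
)
```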
diff --git a/src/test/scala/edu/ie3/simona/io/result/ResultSinkTypeSpec.scala b/src/test/scala/edu/ie3/simona/io/result/ResultSinkTypeSpec.scala
index c968206933..66b6986c47 100644
--- a/src/test/scala/edu/ie3/simona/io/result/ResultSinkTypeSpec.scala
+++ b/src/test/scala/edu/ie3/simona/io/result/ResultSinkTypeSpec.scala
@@ -7,9 +7,12 @@
 package edu.ie3.simona.io.result
 
 import edu.ie3.simona.config.SimonaConfig
-import edu.ie3.simona.io.result.ResultSinkType.{Csv, InfluxDb1x}
+import edu.ie3.simona.config.SimonaConfig.ResultKafkaParams
+import edu.ie3.simona.io.result.ResultSinkType.{Csv, InfluxDb1x, Kafka}
 import edu.ie3.simona.test.common.UnitSpec
 
+import java.util.UUID
+
 class ResultSinkTypeSpec extends UnitSpec {
   "A ResultSinkType" should {
     "be instantiated correctly when supplying a csv sink" in {
@@ -22,7 +25,8 @@ class ResultSinkTypeSpec extends UnitSpec {
             isHierarchic = false
           )
         ),
-        influxDb1x = None
+        influxDb1x = None,
+        kafka = None
       )
 
       inside(ResultSinkType(conf, "testRun")) {
@@ -44,7 +48,8 @@ class ResultSinkTypeSpec extends UnitSpec {
             port = 1,
             url = "localhost/"
           )
-        )
+        ),
+        kafka = None
       )
 
       val runName = "testRun"
@@ -58,6 +63,40 @@ class ResultSinkTypeSpec extends UnitSpec {
       }
     }
 
+    "be instantiated correctly when supplying a kafka sink" in {
+      val conf = SimonaConfig.Simona.Output.Sink(
+        csv = None,
+        influxDb1x = None,
+        kafka = Some(
+          ResultKafkaParams(
+            "localhost:9092",
+            12,
+            "00000000-0000-0000-0000-000000000000",
+            "https://reg:123",
+            "topic"
+          )
+        )
+      )
+      val runName = "testRun"
+
+      inside(ResultSinkType(conf, runName)) {
+        case Kafka(
+              topicNodeRes,
+              runId,
+              bootstrapServers,
+              schemaRegistryUrl,
+              linger
+            ) =>
+          topicNodeRes shouldBe "topic"
+          runId shouldBe UUID.fromString("00000000-0000-0000-0000-000000000000")
+          bootstrapServers shouldBe "localhost:9092"
+          schemaRegistryUrl shouldBe "https://reg:123"
+          linger shouldBe 12
+        case _ =>
+          fail("Wrong ResultSinkType was instantiated.")
+      }
+    }
+
     "fail when more than one sink is supplied" in {
       val conf = SimonaConfig.Simona.Output.Sink(
         csv = Some(
@@ -74,7 +113,8 @@ class ResultSinkTypeSpec extends UnitSpec {
             port = 1,
             url = "localhost"
           )
-        )
+        ),
+        kafka = None
      )
 
      assertThrows[IllegalArgumentException](ResultSinkType(conf, "testRun"))
@@ -83,7 +123,8 @@ class ResultSinkTypeSpec extends UnitSpec {
    "fail when no sink is supplied" in {
      val conf = SimonaConfig.Simona.Output.Sink(
        csv = None,
-        influxDb1x = None
+        influxDb1x = None,
+        kafka = None
      )
 
      assertThrows[IllegalArgumentException](ResultSinkType(conf, "testRun"))
diff --git a/src/test/scala/edu/ie3/simona/io/result/plain/PlainWriterSpec.scala b/src/test/scala/edu/ie3/simona/io/result/plain/PlainWriterSpec.scala
new file mode 100644
index 0000000000..ab3c7a8390
--- /dev/null
+++ b/src/test/scala/edu/ie3/simona/io/result/plain/PlainWriterSpec.scala
@@ -0,0 +1,98 @@
+/*
+ * © 2022. TU Dortmund University,
+ * Institute of Energy Systems, Energy Efficiency and Energy Economics,
+ * Research group Distribution grid planning and operation
+ */
+
+package edu.ie3.simona.io.result.plain
+
+import edu.ie3.datamodel.models.result.NodeResult
+import edu.ie3.simona.io.result.plain.PlainResult.PlainNodeResult
+import edu.ie3.simona.io.result.plain.PlainWriter.NodeResultWriter
+import edu.ie3.simona.test.common.UnitSpec
+import edu.ie3.util.TimeUtil
+import edu.ie3.util.quantities.PowerSystemUnits
+import org.scalatest.GivenWhenThen
+import tech.units.indriya.quantity.Quantities
+
+import java.time.ZoneId
+import java.time.format.DateTimeFormatter
+import java.util.UUID
+
+class PlainWriterSpec extends UnitSpec with GivenWhenThen {
+
+  "A NodeResultWriter" should {
+    val simRunId = UUID.randomUUID()
+    val plainWriter = NodeResultWriter(simRunId)
+
+    val timeFormatter =
+      DateTimeFormatter
+        .ofPattern("yyyy-MM-dd HH:mm:ss")
+        .withZone(ZoneId.of("UTC"))
+
+    "write a plain result correctly" in {
+      Given("a full NodeResult")
+      val eventId = UUID.randomUUID()
+      val time = TimeUtil.withDefaults.toZonedDateTime("2020-01-01 00:00:00")
+      val inputModelId = UUID.randomUUID()
+      val vMag = Quantities.getQuantity(0.85d, PowerSystemUnits.PU)
+      val vAng = Quantities.getQuantity(90d, PowerSystemUnits.DEGREE_GEOM)
+
+      val nodeResultFull = new NodeResult(
+        eventId,
+        time,
+        inputModelId,
+        vMag,
+        vAng
+      )
+
+      When("converting to a plain result")
+      val plainResult = plainWriter.writePlain(nodeResultFull)
+
+      Then("the plain result is correct")
+      plainResult.uuid shouldBe eventId
+      plainResult.time shouldBe time.format(timeFormatter)
+      plainResult.inputModel shouldBe inputModelId
+      plainResult.vMag shouldBe vMag
+        .to(PowerSystemUnits.PU)
+        .getValue
+        .doubleValue()
+      plainResult.vAng shouldBe vAng
+        .to(PowerSystemUnits.DEGREE_GEOM)
+        .getValue
+        .doubleValue()
+    }
+
+    "create a full result correctly" in {
+      Given("a plain NodeResult")
+      val eventId = UUID.randomUUID()
+      val time = "2020-01-01 00:00:00"
+      val inputModelId = UUID.randomUUID()
+      val vMag = 0.85d
+      val vAng = 90d
+
+      val nodeResultPlain = PlainNodeResult(
+        simRunId,
+        time,
+        eventId,
+        inputModelId,
+        vMag,
+        vAng
+      )
+
+      When("converting to a full NodeResult")
+      val fullResult = plainWriter.createFull(nodeResultPlain)
+
+      Then("the full result is correct")
+      fullResult.getUuid shouldBe eventId
+      fullResult.getTime shouldBe TimeUtil.withDefaults.toZonedDateTime(time)
+      fullResult.getInputModel shouldBe inputModelId
+      fullResult
+        .getvMag() shouldBe Quantities.getQuantity(vMag, PowerSystemUnits.PU)
+      fullResult.getvAng() shouldBe Quantities.getQuantity(
+        vAng,
+        PowerSystemUnits.DEGREE_GEOM
+      )
+    }
+  }
+}
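Editor's note: the two tests above exercise each direction separately. For intuition, a minimal round-trip sketch (assuming a UTC time stamp, so formatting to "yyyy-MM-dd HH:mm:ss" and parsing back is lossless; constructor and method names as in the specs above):

```scala
import edu.ie3.datamodel.models.result.NodeResult
import edu.ie3.simona.io.result.plain.PlainWriter.NodeResultWriter
import edu.ie3.util.TimeUtil
import edu.ie3.util.quantities.PowerSystemUnits
import tech.units.indriya.quantity.Quantities

import java.util.UUID

// writePlain and createFull should be mutually inverse for NodeResult,
// up to the simRunId that exists only on the plain side.
val writer = NodeResultWriter(UUID.randomUUID())
val original = new NodeResult(
  TimeUtil.withDefaults.toZonedDateTime("2020-01-01 00:00:00"), // UTC
  UUID.randomUUID(),
  Quantities.getQuantity(1d, PowerSystemUnits.PU),
  Quantities.getQuantity(0d, PowerSystemUnits.DEGREE_GEOM)
)
val roundTripped = writer.createFull(writer.writePlain(original))
assert(roundTripped.getUuid == original.getUuid)
assert(roundTripped.getTime == original.getTime)
```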
diff --git a/src/test/scala/edu/ie3/simona/model/participant/load/LoadModelSpec.scala b/src/test/scala/edu/ie3/simona/model/participant/load/LoadModelSpec.scala
index 467214b051..b5e30470c5 100644
--- a/src/test/scala/edu/ie3/simona/model/participant/load/LoadModelSpec.scala
+++ b/src/test/scala/edu/ie3/simona/model/participant/load/LoadModelSpec.scala
@@ -7,24 +7,20 @@
 package edu.ie3.simona.model.participant.load
 
 import edu.ie3.simona.model.participant.control.QControl
-import edu.ie3.simona.model.participant.load.LoadReference.{
-  ActivePower,
-  EnergyConsumption
-}
 import edu.ie3.simona.model.participant.load.profile.ProfileLoadModel
 import edu.ie3.simona.model.participant.load.random.RandomLoadModel
 import edu.ie3.simona.test.common.UnitSpec
 import edu.ie3.simona.test.common.input.LoadInputTestData
-import edu.ie3.simona.util.ConfigUtil
-import edu.ie3.util.quantities.PowerSystemUnits.{KILOWATTHOUR, MEGAVOLTAMPERE}
+import edu.ie3.util.quantities.PowerSystemUnits.KILOWATTHOUR
 import edu.ie3.util.quantities.{PowerSystemUnits, QuantityUtil}
-import javax.measure.Quantity
-import javax.measure.quantity.Power
 import org.scalatest.PrivateMethodTester
 import org.scalatest.prop.TableDrivenPropertyChecks
 import tech.units.indriya.quantity.Quantities
 import tech.units.indriya.unit.Units.WATT
 
+import javax.measure.Quantity
+import javax.measure.quantity.Power
+
 class LoadModelSpec
     extends UnitSpec
     with LoadInputTestData
@@ -85,7 +81,7 @@ class LoadModelSpec
             quantityTolerance
           ) shouldBe true
           cosPhiRated shouldBe loadInput.getCosPhiRated
-          loadProfile shouldBe loadInput.getStandardLoadProfile
+          loadProfile shouldBe loadInput.getLoadProfile
           reference shouldBe foreSeenReference
         }
       }
diff --git a/src/test/scala/edu/ie3/simona/model/participant/load/LoadProfileStoreSpec.scala b/src/test/scala/edu/ie3/simona/model/participant/load/LoadProfileStoreSpec.scala
index a3c8d34676..2702c1b0b4 100644
--- a/src/test/scala/edu/ie3/simona/model/participant/load/LoadProfileStoreSpec.scala
+++ b/src/test/scala/edu/ie3/simona/model/participant/load/LoadProfileStoreSpec.scala
@@ -12,8 +12,8 @@ import java.time.temporal.ChronoUnit
 
 import breeze.numerics.abs
 import com.typesafe.scalalogging.LazyLogging
-import edu.ie3.datamodel.models.BdewLoadProfile._
-import edu.ie3.datamodel.models.{BdewLoadProfile, StandardLoadProfile}
+import edu.ie3.datamodel.models.profile.BdewStandardLoadProfile._
+import edu.ie3.datamodel.models.profile.StandardLoadProfile
 import edu.ie3.simona.model.participant.load.profile.{
   LoadProfileKey,
   LoadProfileStore,
@@ -113,10 +113,10 @@ class LoadProfileStoreSpec
     /* List the expected annual energy consumption */
     val expectedEnergyConsumption
         : Map[StandardLoadProfile, ComparableQuantity[Energy]] = Map(
-      BdewLoadProfile.H0 -> Quantities.getQuantity(1000d, KILOWATTHOUR),
-      BdewLoadProfile.L0 -> Quantities.getQuantity(1002d, KILOWATTHOUR),
+      H0 -> Quantities.getQuantity(1000d, KILOWATTHOUR),
+      L0 -> Quantities.getQuantity(1002d, KILOWATTHOUR),
       /* TODO: Check, if this is correct */
-      BdewLoadProfile.G0 -> Quantities.getQuantity(1022d, KILOWATTHOUR)
+      G0 -> Quantities.getQuantity(1022d, KILOWATTHOUR)
     )
 
     /* Collect all available time steps in 2020 */
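Editor's note: both load specs track the same PSDM refactoring — the BDEW profiles moved from `edu.ie3.datamodel.models.BdewLoadProfile` to `edu.ie3.datamodel.models.profile.BdewStandardLoadProfile`. A minimal sketch of the new usage (types exactly as in the hunks above):

```scala
import edu.ie3.datamodel.models.profile.{BdewStandardLoadProfile, StandardLoadProfile}

// Only the package and enum name changed; the profile keys keep their
// meaning (H0 households, L0 agriculture, G0 trade/commerce).
val household: StandardLoadProfile = BdewStandardLoadProfile.H0
```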
diff --git a/src/test/scala/edu/ie3/simona/service/weather/WeatherSourceSpec.scala b/src/test/scala/edu/ie3/simona/service/weather/WeatherSourceSpec.scala
index 8988ddf682..987d42283b 100644
--- a/src/test/scala/edu/ie3/simona/service/weather/WeatherSourceSpec.scala
+++ b/src/test/scala/edu/ie3/simona/service/weather/WeatherSourceSpec.scala
@@ -27,7 +27,7 @@ import scala.jdk.OptionConverters._
 import scala.util.{Failure, Success}
 
 class WeatherSourceSpec extends UnitSpec {
-  private val coordinate0 = GeoUtils.xyToPoint(7.41, 51.47)
+  private val coordinate0 = GeoUtils.buildPoint(51.47, 7.41)
 
   "A weather source" should {
     "issue a ServiceException, if there are not enough coordinates available" in {
@@ -300,14 +300,14 @@ class WeatherSourceSpec extends UnitSpec {
 }
 
 case object WeatherSourceSpec {
-  private val coordinate67775 = GeoUtils.xyToPoint(7.438, 51.5)
-  private val coordinate531137 = GeoUtils.xyToPoint(7.375, 51.5)
-  private val coordinate551525 = GeoUtils.xyToPoint(7.438, 51.438)
-  private val coordinate278150 = GeoUtils.xyToPoint(7.375, 51.438)
-  private val coordinate477295 = GeoUtils.xyToPoint(12.812, 52.312)
-  private val coordinate537947 = GeoUtils.xyToPoint(12.812, 52.25)
-  private val coordinate144112 = GeoUtils.xyToPoint(12.875, 52.312)
-  private val coordinate165125 = GeoUtils.xyToPoint(12.875, 52.25)
+  private val coordinate67775 = GeoUtils.buildPoint(51.5, 7.438)
+  private val coordinate531137 = GeoUtils.buildPoint(51.5, 7.375)
+  private val coordinate551525 = GeoUtils.buildPoint(51.438, 7.438)
+  private val coordinate278150 = GeoUtils.buildPoint(51.438, 7.375)
+  private val coordinate477295 = GeoUtils.buildPoint(52.312, 12.812)
+  private val coordinate537947 = GeoUtils.buildPoint(52.25, 12.812)
+  private val coordinate144112 = GeoUtils.buildPoint(52.312, 12.875)
+  private val coordinate165125 = GeoUtils.buildPoint(52.25, 12.875)
 
   case object DummyWeatherSource extends WeatherSource {
     override protected val idCoordinateSource: IdCoordinateSource =
diff --git a/src/test/scala/edu/ie3/simona/service/weather/WeatherSourceWrapperSpec.scala b/src/test/scala/edu/ie3/simona/service/weather/WeatherSourceWrapperSpec.scala
index 8b0502c6cf..96f83968bc 100644
--- a/src/test/scala/edu/ie3/simona/service/weather/WeatherSourceWrapperSpec.scala
+++ b/src/test/scala/edu/ie3/simona/service/weather/WeatherSourceWrapperSpec.scala
@@ -290,13 +290,13 @@ class WeatherSourceWrapperSpec extends UnitSpec {
 
 object WeatherSourceWrapperSpec {
   // lat/lon are irrelevant, we will manually create weights later on
-  private val coordinate1a = GeoUtils.xyToPoint(6, 51)
-  private val coordinate1b = GeoUtils.xyToPoint(7, 51)
-  private val coordinate1c = GeoUtils.xyToPoint(8, 51)
-  private val coordinate1d = GeoUtils.xyToPoint(9, 51)
-  private val coordinate13 = GeoUtils.xyToPoint(10, 51)
-  private val coordinate13NoTemp = GeoUtils.xyToPoint(10, 52)
-  private val coordinateEmpty = GeoUtils.xyToPoint(10, 53)
+  private val coordinate1a = GeoUtils.buildPoint(51, 6)
+  private val coordinate1b = GeoUtils.buildPoint(51, 7)
+  private val coordinate1c = GeoUtils.buildPoint(51, 8)
+  private val coordinate1d = GeoUtils.buildPoint(51, 9)
+  private val coordinate13 = GeoUtils.buildPoint(51, 10)
+  private val coordinate13NoTemp = GeoUtils.buildPoint(52, 10)
+  private val coordinateEmpty = GeoUtils.buildPoint(53, 10)
 
   case object DummyPsdmWeatherSource extends PsdmWeatherSource {
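Editor's note: worth calling out in these two weather specs — the replacement is not a pure rename. The removed `xyToPoint(x, y)` took longitude first, while `buildPoint` takes latitude first, which is why every coordinate pair in the hunks above is flipped. A minimal sketch:

```scala
import edu.ie3.util.geo.GeoUtils

// Same point, flipped argument order:
// old: GeoUtils.xyToPoint(7.41, 51.47)    // (x = longitude, y = latitude)
val dortmund = GeoUtils.buildPoint(51.47, 7.41) // (latitude, longitude)
```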
diff --git a/src/test/scala/edu/ie3/simona/test/KafkaSpecLike.scala b/src/test/scala/edu/ie3/simona/test/KafkaSpecLike.scala
new file mode 100644
index 0000000000..c801aafcd6
--- /dev/null
+++ b/src/test/scala/edu/ie3/simona/test/KafkaSpecLike.scala
@@ -0,0 +1,61 @@
+/*
+ * © 2021. TU Dortmund University,
+ * Institute of Energy Systems, Energy Efficiency and Energy Economics,
+ * Research group Distribution grid planning and operation
+ */
+
+package edu.ie3.simona.test
+
+import com.dimafeng.testcontainers.KafkaContainer
+import edu.ie3.simona.test.KafkaSpecLike.Topic
+import org.apache.kafka.clients.admin.{Admin, NewTopic}
+import org.junit.Rule
+import org.scalatest.{BeforeAndAfterAll, TestSuite}
+import org.testcontainers.utility.DockerImageName
+
+import scala.jdk.CollectionConverters._
+
+/** Adapted from
+ * https://kafka-tutorials.confluent.io/produce-consume-lang/scala.html
+ */
+trait KafkaSpecLike extends BeforeAndAfterAll {
+  this: TestSuite =>
+
+  protected val testTopics: Vector[Topic]
+
+  @Rule
+  protected val kafka: KafkaContainer = KafkaContainer(
+    DockerImageName.parse("confluentinc/cp-kafka:6.1.0")
+  )
+  protected lazy val admin: Admin = Admin.create(
+    Map[String, AnyRef]("bootstrap.servers" -> kafka.bootstrapServers).asJava
+  )
+
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+    kafka.start()
+    admin.createTopics(
+      testTopics.map { topic =>
+        new NewTopic(
+          topic.name,
+          topic.partitions,
+          topic.replicationFactor
+        )
+      }.asJava
+    )
+  }
+
+  override def afterAll(): Unit = {
+    admin.close()
+    kafka.stop()
+    super.afterAll()
+  }
+}
+
+object KafkaSpecLike {
+  final case class Topic(
+      name: String,
+      partitions: Int,
+      replicationFactor: Short
+  )
+}
diff --git a/src/test/scala/edu/ie3/simona/test/common/input/LoadInputTestData.scala b/src/test/scala/edu/ie3/simona/test/common/input/LoadInputTestData.scala
index 139e1a577f..b99c09fc8f 100644
--- a/src/test/scala/edu/ie3/simona/test/common/input/LoadInputTestData.scala
+++ b/src/test/scala/edu/ie3/simona/test/common/input/LoadInputTestData.scala
@@ -7,11 +7,11 @@
 package edu.ie3.simona.test.common.input
 
 import java.util.UUID
-
 import edu.ie3.datamodel.models.input.OperatorInput
 import edu.ie3.datamodel.models.input.system.LoadInput
 import edu.ie3.datamodel.models.input.system.characteristic.CosPhiFixed
-import edu.ie3.datamodel.models.{BdewLoadProfile, OperationTime}
+import edu.ie3.datamodel.models.OperationTime
+import edu.ie3.datamodel.models.profile.BdewStandardLoadProfile
 import edu.ie3.util.quantities.PowerSystemUnits.{KILOWATTHOUR, VOLTAMPERE}
 import tech.units.indriya.quantity.Quantities
 
@@ -29,7 +29,7 @@ trait LoadInputTestData extends NodeInputTestData {
     OperationTime.notLimited(),
     nodeInputNoSlackNs04KvA,
     new CosPhiFixed("cosPhiFixed:{(0.0,0.95)}"),
-    BdewLoadProfile.H0,
+    BdewStandardLoadProfile.H0,
     false,
     Quantities.getQuantity(3000d, KILOWATTHOUR),
     Quantities.getQuantity(282.74d, VOLTAMPERE),