diff --git a/.github/workflows/build.yaml b/.github/workflows/build.yaml index b24147d..c3f2cf0 100644 --- a/.github/workflows/build.yaml +++ b/.github/workflows/build.yaml @@ -76,6 +76,11 @@ jobs: - name: Compile tests run: dotnet build --no-incremental --configuration Release /p:Platform="Any CPU" test\KEFCore.Test.sln + - name: Prepare configuration files + run: | + Copy-Item .github\workflows\zookeeper.properties -Destination bin\zookeeper.properties -Force + Copy-Item .github\workflows\server.properties -Destination bin\server.properties -Force + - name: Save KEFCore bin in cache uses: actions/cache/save@v4 with: @@ -83,37 +88,64 @@ jobs: path: ./bin/ key: KEFCore_bin_${{ github.sha }} - execute_tests: + execute_tests_linux: needs: build_windows services: kafka: # Private registry image - image: ghcr.io/masesgroup/knet:master # to be changed with official version when willbe available + image: ghcr.io/masesgroup/knet:latest # to be changed with official version when willbe available credentials: username: ${{ github.actor }} password: ${{ secrets.GITHUB_TOKEN }} ports: - 9092:9092 env: - KNET_RUNNING_MODE: server + KNET_DOCKER_RUNNING_MODE: server JCOBRIDGE_LicensePath: ${{ secrets.JCOBRIDGE_ONLINE }} strategy: fail-fast: false matrix: - os: [ 'ubuntu-latest' ] #, 'macos-latest', 'macos-13' , 'windows-latest' ] - framework: [ 'net462', 'net6.0', 'net8.0' ] + framework: [ 'net6.0', 'net8.0' ] + jdk_vendor: [ 'temurin', 'zulu', 'microsoft', 'corretto', 'oracle'] + jdk_version: [ '11', '17', '21' ] # only LTS versions + exclude: + - jdk_vendor: oracle + jdk_version: 11 + + runs-on: 'ubuntu-latest' + steps: + - name: Restore KEFCore bin from cache + uses: actions/cache/restore@v4 + with: + fail-on-cache-miss: true + enableCrossOsArchive: true + path: ./bin/ + key: KEFCore_bin_${{ github.sha }} + + - name: Set up JDK distribution + uses: actions/setup-java@v4 + with: # running setup-java again overwrites the settings.xml + distribution: ${{ matrix.jdk_vendor }} + java-version: ${{ matrix.jdk_version }} + + - name: Executing MASES.EntityFrameworkCore.KNet.Test.Benchmark on Ubuntu with ${{ matrix.jdk_vendor }} ${{ matrix.jdk_version }} + run: dotnet ./bin/${{ matrix.framework }}/MASES.EntityFrameworkCore.KNet.Test.Benchmark.dll ./bin/${{ matrix.framework }}/Benchmark.KNetStreams.json localhost:9092 + env: + JCOBRIDGE_LicensePath: ${{ secrets.JCOBRIDGE_ONLINE }} + + execute_tests_other: + needs: build_windows + strategy: + fail-fast: false + matrix: + os: [ 'macos-latest', 'macos-13' , 'windows-latest' ] + framework: [ 'net6.0', 'net8.0' ] jdk_vendor: [ 'temurin', 'zulu', 'microsoft', 'corretto', 'oracle'] jdk_version: [ '11', '17', '21' ] # only LTS versions exclude: - - os: ubuntu-latest - framework: net462 - - os: macos-latest - framework: net462 - os: macos-latest framework: net6.0 - - os: macos-13 - framework: net462 - os: macos-13 framework: net6.0 - jdk_vendor: oracle @@ -134,21 +166,43 @@ jobs: with: # running setup-java again overwrites the settings.xml distribution: ${{ matrix.jdk_vendor }} java-version: ${{ matrix.jdk_version }} - - - name: Executing MASES.EntityFrameworkCore.KNet.Test.Benchmark on Ubuntu with ${{ matrix.jdk_vendor }} ${{ matrix.jdk_version }} - if: ${{ matrix.os == 'ubuntu-latest' || matrix.os == 'macos-latest' || matrix.os == 'macos-13' }} - run: dotnet ./bin/${{ matrix.framework }}/MASES.EntityFrameworkCore.KNet.Test.Benchmark.dll ./bin/${{ matrix.framework }}/Benchmark.KNetStreams.json localhost:9092 + + - name: Authenticate to GitHub + run: dotnet nuget add source --username ${{ github.actor }} --password ${{ secrets.GITHUB_TOKEN }} --store-password-in-clear-text --name github "https://nuget.pkg.github.com/masesgroup/index.json" + + - name: Download latest published version of KNetCLI + run: dotnet tool update -g MASES.KNetCLI + + - name: Start Kafka on ${{ matrix.os }} with ${{ matrix.jdk_vendor }} ${{ matrix.jdk_version }} + shell: pwsh + run: | + Start-Process -FilePath knet -ArgumentList ( 'zookeeperstart', '${{ github.workspace }}/bin/zookeeper.properties' ) + Start-Process -FilePath knet -ArgumentList ( 'kafkastart', '${{ github.workspace }}/bin/server.properties' ) env: JCOBRIDGE_LicensePath: ${{ secrets.JCOBRIDGE_ONLINE }} - - - name: Executing MASES.EntityFrameworkCore.KNet.Test.Benchmark on Windows with ${{ matrix.jdk_vendor }} ${{ matrix.jdk_version }} - if: ${{ matrix.os == 'windows-latest' && matrix.framework != 'net462' }} - run: dotnet .\bin\${{ matrix.framework }}\MASES.EntityFrameworkCore.KNet.Test.Benchmark.dll .\bin\${{ matrix.framework }}\Benchmark.KNetStreams.json localhost:9092 + + - name: Executing MASES.EntityFrameworkCore.KNet.Test.Benchmark on ${{ matrix.os }} with ${{ matrix.jdk_vendor }} ${{ matrix.jdk_version }} + shell: pwsh + run: dotnet ${{ github.workspace }}/bin/${{ matrix.framework }}/MASES.EntityFrameworkCore.KNet.Test.Benchmark.dll ${{ github.workspace }}/bin/${{ matrix.framework }}/Benchmark.KNetStreams.json localhost:9092 env: JCOBRIDGE_LicensePath: ${{ secrets.JCOBRIDGE_ONLINE }} + + #- name: Start ZooKeeper and Apache Kafka + # shell: pwsh + # run: | + # knet zookeeperstart ${{ github.workspace }}/.github/wotkflows/zookeeper.properties & + # knet kafkastart ${{ github.workspace }}/.github/wotkflows/server.properties & + # env: + # JCOBRIDGE_LicensePath: ${{ secrets.JCOBRIDGE_ONLINE }} - - name: Executing MASES.EntityFrameworkCore.KNet.Test.Benchmark on Windows with ${{ matrix.jdk_vendor }} ${{ matrix.jdk_version }} - if: ${{ matrix.os == 'windows-latest' && matrix.framework == 'net462' }} - run: .\bin\${{ matrix.framework }}\MASES.EntityFrameworkCore.KNet.Test.Benchmark.exe .\bin\${{ matrix.framework }}\Benchmark.KNetStreams.json localhost:9092 - env: - JCOBRIDGE_LicensePath: ${{ secrets.JCOBRIDGE_ONLINE }} \ No newline at end of file + #- name: Executing MASES.EntityFrameworkCore.KNet.Test.Benchmark on ${{ matrix.os }} with ${{ matrix.jdk_vendor }} ${{ matrix.jdk_version }} + # if: ${{ matrix.os == 'macos-latest' || matrix.os == 'macos-13' }} + # run: dotnet ${{ github.workspace }}/bin/${{ matrix.framework }}/MASES.EntityFrameworkCore.KNet.Test.Benchmark.dll ${{ github.workspace }}/bin/${{ matrix.framework }}/Benchmark.KNetStreams.json localhost:9092 + # env: + # JCOBRIDGE_LicensePath: ${{ secrets.JCOBRIDGE_ONLINE }} + # + #- name: Executing MASES.EntityFrameworkCore.KNet.Test.Benchmark on Windows with ${{ matrix.jdk_vendor }} ${{ matrix.jdk_version }} + # if: ${{ matrix.os == 'windows-latest' }} + # run: dotnet ${{ github.workspace }}\bin\${{ matrix.framework }}\MASES.EntityFrameworkCore.KNet.Test.Benchmark.dll ${{ github.workspace }}\bin\${{ matrix.framework }}\Benchmark.KNetStreams.json localhost:9092 + # env: + # JCOBRIDGE_LicensePath: ${{ secrets.JCOBRIDGE_ONLINE }} \ No newline at end of file diff --git a/.github/workflows/server.properties b/.github/workflows/server.properties new file mode 100644 index 0000000..d194846 --- /dev/null +++ b/.github/workflows/server.properties @@ -0,0 +1,139 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# +# This configuration file is intended for use in ZK-based mode, where Apache ZooKeeper is required. +# See kafka.server.KafkaConfig for additional details and defaults +# + +############################# Server Basics ############################# + +# The id of the broker. This must be set to a unique integer for each broker. +broker.id=0 + +############################# Socket Server Settings ############################# + +# The address the socket server listens on. If not configured, the host name will be equal to the value of +# java.net.InetAddress.getCanonicalHostName(), with PLAINTEXT listener name, and port 9092. +# FORMAT: +# listeners = listener_name://host_name:port +# EXAMPLE: +# listeners = PLAINTEXT://your.host.name:9092 +listeners=PLAINTEXT://0.0.0.0:9092 + +# Listener name, hostname and port the broker will advertise to clients. +# If not set, it uses the value for "listeners". +advertised.listeners=PLAINTEXT://localhost:9092 + +# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details +#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL +listener.security.protocol.map=PLAINTEXT:PLAINTEXT + +# The number of threads that the server uses for receiving requests from the network and sending responses to the network +num.network.threads=3 + +# The number of threads that the server uses for processing requests, which may include disk I/O +num.io.threads=8 + +# The send buffer (SO_SNDBUF) used by the socket server +socket.send.buffer.bytes=102400 + +# The receive buffer (SO_RCVBUF) used by the socket server +socket.receive.buffer.bytes=102400 + +# The maximum size of a request that the socket server will accept (protection against OOM) +socket.request.max.bytes=104857600 + + +############################# Log Basics ############################# + +# A comma separated list of directories under which to store log files +log.dirs=/tmp/kafka-logs + +# The default number of log partitions per topic. More partitions allow greater +# parallelism for consumption, but this will also result in more files across +# the brokers. +num.partitions=1 + +# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown. +# This value is recommended to be increased for installations with data dirs located in RAID array. +num.recovery.threads.per.data.dir=1 + +############################# Internal Topic Settings ############################# +# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state" +# For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3. +offsets.topic.replication.factor=1 +transaction.state.log.replication.factor=1 +transaction.state.log.min.isr=1 + +############################# Log Flush Policy ############################# + +# Messages are immediately written to the filesystem but by default we only fsync() to sync +# the OS cache lazily. The following configurations control the flush of data to disk. +# There are a few important trade-offs here: +# 1. Durability: Unflushed data may be lost if you are not using replication. +# 2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush. +# 3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks. +# The settings below allow one to configure the flush policy to flush data after a period of time or +# every N messages (or both). This can be done globally and overridden on a per-topic basis. + +# The number of messages to accept before forcing a flush of data to disk +#log.flush.interval.messages=10000 + +# The maximum amount of time a message can sit in a log before we force a flush +#log.flush.interval.ms=1000 + +############################# Log Retention Policy ############################# + +# The following configurations control the disposal of log segments. The policy can +# be set to delete segments after a period of time, or after a given size has accumulated. +# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens +# from the end of the log. + +# The minimum age of a log file to be eligible for deletion due to age +log.retention.hours=168 + +# A size-based retention policy for logs. Segments are pruned from the log unless the remaining +# segments drop below log.retention.bytes. Functions independently of log.retention.hours. +#log.retention.bytes=1073741824 + +# The maximum size of a log segment file. When this size is reached a new log segment will be created. +#log.segment.bytes=1073741824 + +# The interval at which log segments are checked to see if they can be deleted according +# to the retention policies +log.retention.check.interval.ms=300000 + +############################# Zookeeper ############################# + +# Zookeeper connection string (see zookeeper docs for details). +# This is a comma separated host:port pairs, each corresponding to a zk +# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002". +# You can also append an optional chroot string to the urls to specify the +# root directory for all kafka znodes. +zookeeper.connect=127.0.0.1:2181 + +# Timeout in ms for connecting to zookeeper +zookeeper.connection.timeout.ms=18000 + + +############################# Group Coordinator Settings ############################# + +# The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance. +# The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms. +# The default value for this is 3 seconds. +# We override this to 0 here as it makes for a better out-of-the-box experience for development and testing. +# However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup. +group.initial.rebalance.delay.ms=0 diff --git a/.github/workflows/zookeeper.properties b/.github/workflows/zookeeper.properties new file mode 100644 index 0000000..90f4332 --- /dev/null +++ b/.github/workflows/zookeeper.properties @@ -0,0 +1,24 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# the directory where the snapshot is stored. +dataDir=/tmp/zookeeper +# the port at which the clients will connect +clientPort=2181 +# disable the per-ip limit on the number of connections since this is a non-production config +maxClientCnxns=0 +# Disable the adminserver by default to avoid port conflicts. +# Set the port to something non-conflicting if choosing to enable this +admin.enableServer=false +# admin.serverPort=8080 diff --git a/src/net/KEFCore/Storage/Internal/KafkaCluster.cs b/src/net/KEFCore/Storage/Internal/KafkaCluster.cs index 071c952..9a75541 100644 --- a/src/net/KEFCore/Storage/Internal/KafkaCluster.cs +++ b/src/net/KEFCore/Storage/Internal/KafkaCluster.cs @@ -28,6 +28,7 @@ using Org.Apache.Kafka.Clients.Admin; using Org.Apache.Kafka.Common.Errors; using Org.Apache.Kafka.Tools; +using MASES.JCOBridge.C2JBridge; namespace MASES.EntityFrameworkCore.KNet.Storage.Internal; /// @@ -227,7 +228,14 @@ private string CreateTable(IEntityType entityType, int cycle) } catch (TopicExistsException ex) { - if (ex.Message.Contains("deletion")) + string message; + if (ex.BridgeInstance == null && ex is JVMBridgeException ex2) + { + message = ex2.Message; + } + else message = ex.Message; + + if (message.Contains("deletion")) { Thread.Sleep(1000); // wait a while to complete topic deletion and try again return CreateTable(entityType, cycle++); diff --git a/test/KEFCore.Benchmark.Test/Program.cs b/test/KEFCore.Benchmark.Test/Program.cs index 0b5ba4d..9cb657d 100644 --- a/test/KEFCore.Benchmark.Test/Program.cs +++ b/test/KEFCore.Benchmark.Test/Program.cs @@ -205,7 +205,7 @@ join pg in context.Posts on op.BlogId equals pg.BlogId for (int i = 0; i < min.Length; i++) { min[i] = TimeSpan.MaxValue; } TimeSpan[] total = new TimeSpan[maxTests]; for (int i = 0; i < total.Length; i++) { total[i] = TimeSpan.Zero; } - for (int i = 0; i < ProgramConfig.Config.NumberOfExecutions; i++) + for (int i = 0; i < _tests.Count; i++) { var item = _tests[i].QueryTimes;