diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 9dba48c9..5a4d4f68 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -15,5 +15,5 @@ jobs: name: Tests (CI) uses: ./.github/workflows/tests.yml with: - image: ${{ fromJSON(vars.KURRENTDB_DOCKER_IMAGES).ci.fullname }} + runtime: ci secrets: inherit diff --git a/.github/workflows/load-configuration.yml b/.github/workflows/load-configuration.yml new file mode 100644 index 00000000..a2995585 --- /dev/null +++ b/.github/workflows/load-configuration.yml @@ -0,0 +1,46 @@ +name: Load KurrentDB Runtime Configuration +on: + workflow_call: + inputs: + runtime: + description: "The runtime's name. Current options are: `ci`, `previous-lts`, `latest`" + type: string + + outputs: + runtime: + description: The runtime's name + value: ${{ inputs.runtime }} + + registry: + description: The Docker registry + value: ${{ jobs.load.outputs.registry }} + + image: + description: The Docker image + value: ${{ jobs.load.outputs.image }} + + tag: + description: The Docker image tag + value: ${{ jobs.load.outputs.tag }} + + full_image_name: + description: The full Docker image name (including registry, image, and tag) + value: ${{ jobs.load.outputs.full_image_name }} + +jobs: + load: + runs-on: ubuntu-latest + outputs: + registry: ${{ steps.set.outputs.registry }} + image: ${{ steps.set.outputs.image }} + tag: ${{ steps.set.outputs.tag }} + full_image_name: ${{ steps.set.outputs.full_image_name }} + + steps: + - name: Set KurrentDB Runtime Configuration Properties + id: set + run: | + echo "registry=${{ fromJSON(vars.KURRENTDB_DOCKER_IMAGES)[inputs.runtime].registry }}" >> $GITHUB_OUTPUT + echo "tag=${{ fromJSON(vars.KURRENTDB_DOCKER_IMAGES)[inputs.runtime].tag }}" >> $GITHUB_OUTPUT + echo "image=${{ fromJSON(vars.KURRENTDB_DOCKER_IMAGES)[inputs.runtime].image }}" >> $GITHUB_OUTPUT + echo "full_image_name=${{ fromJSON(vars.KURRENTDB_DOCKER_IMAGES)[inputs.runtime].fullname }}" >> $GITHUB_OUTPUT diff --git a/.github/workflows/lts.yml b/.github/workflows/lts.yml index c0936f87..4b1fc04b 100644 --- a/.github/workflows/lts.yml +++ b/.github/workflows/lts.yml @@ -15,7 +15,7 @@ jobs: name: Tests (LTS) uses: ./.github/workflows/tests.yml with: - image: ${{ fromJSON(vars.KURRENTDB_DOCKER_IMAGES).lts.fullname }} + runtime: lts secrets: inherit # Will be removed in the future @@ -24,5 +24,7 @@ jobs: uses: ./.github/workflows/plugins-tests.yml with: - image: "docker.eventstore.com/eventstore-ee/eventstoredb-commercial:24.2.0-jammy" + registry: docker.eventstore.com/eventstore-ee + image: eventstoredb-commercial + tag: 24.2.0-jammy secrets: inherit diff --git a/.github/workflows/plugins-tests.yml b/.github/workflows/plugins-tests.yml index 08aed4c8..fed0338a 100644 --- a/.github/workflows/plugins-tests.yml +++ b/.github/workflows/plugins-tests.yml @@ -3,9 +3,15 @@ name: enterprise plugins tests workflow on: workflow_call: inputs: + registry: + required: true + type: string image: required: true type: string + tag: + required: true + type: string jobs: single_node: @@ -46,7 +52,7 @@ jobs: - name: Execute Gradle build run: ./gradlew ci --tests ${{ matrix.test }}Tests env: - KURRENTDB_IMAGE: ${{ inputs.image }} + KURRENTDB_IMAGE: ${{inputs.registry}}/${{ inputs.image }}:${{ inputs.tag }} SECURE: true - uses: actions/upload-artifact@v4 @@ -81,7 +87,9 @@ jobs: - name: Set up cluster with Docker Compose run: docker compose up -d env: - KURRENTDB_IMAGE: ${{ inputs.image }} + KURRENTDB_DOCKER_REGISTRY: ${{ inputs.registry }} + KURRENTDB_DOCKER_IMAGE: ${{ inputs.image }} + KURRENTDB_DOCKER_TAG: ${{ inputs.tag }} - name: Generate user certificates run: docker compose --file configure-user-certs-for-tests.yml up diff --git a/.github/workflows/previous-lts.yml b/.github/workflows/previous-lts.yml index 0fd060c2..f1941397 100644 --- a/.github/workflows/previous-lts.yml +++ b/.github/workflows/previous-lts.yml @@ -15,5 +15,5 @@ jobs: name: Tests (Previous LTS) uses: ./.github/workflows/tests.yml with: - image: ${{ fromJSON(vars.KURRENTDB_DOCKER_IMAGES)['previous-lts'].fullname }} + runtime: previous-lts secrets: inherit diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 0495aefc..92ce26e8 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -3,18 +3,24 @@ name: tests workflow on: workflow_call: inputs: - image: + runtime: required: true type: string jobs: + load_configuration: + uses: ./.github/workflows/load-configuration.yml + with: + runtime: ${{ inputs.runtime }} + single_node: + needs: load_configuration name: Single node strategy: fail-fast: false matrix: - test: [Streams, PersistentSubscriptions, Telemetry] + test: [Streams, PersistentSubscriptions, Telemetry, MultiStreamAppend] runs-on: ubuntu-latest steps: @@ -41,16 +47,10 @@ jobs: - name: Execute Gradle build run: ./gradlew ci --tests ${{ matrix.test }}Tests env: - KURRENTDB_IMAGE: ${{ inputs.image }} - - - uses: actions/upload-artifact@v4 - if: failure() - with: - name: esdb_logs.tar.gz - path: /tmp/esdb_logs.tar.gz - if-no-files-found: error + KURRENTDB_IMAGE: ${{ needs.load_configuration.outputs.full_image_name }} secure: + needs: load_configuration name: Secure strategy: @@ -86,7 +86,7 @@ jobs: - name: Execute Gradle build run: ./gradlew ci --tests ${{ matrix.test }}Tests env: - KURRENTDB_IMAGE: ${{ inputs.image }} + KURRENTDB_IMAGE: ${{ needs.load_configuration.outputs.full_image_name }} SECURE: true - uses: actions/upload-artifact@v4 @@ -96,12 +96,13 @@ jobs: path: /tmp/esdb_logs.tar.gz cluster: + needs: load_configuration name: Cluster strategy: fail-fast: false matrix: - test: [Streams, PersistentSubscriptions] + test: [Streams, PersistentSubscriptions, MultiStreamAppend] runs-on: ubuntu-latest steps: @@ -117,7 +118,9 @@ jobs: - name: Set up cluster with Docker Compose run: docker compose up -d env: - KURRENTDB_IMAGE: ${{ inputs.image }} + KURRENTDB_DOCKER_REGISTRY: ${{ needs.load_configuration.outputs.registry }} + KURRENTDB_DOCKER_IMAGE: ${{ needs.load_configuration.outputs.image }} + KURRENTDB_DOCKER_TAG: ${{ needs.load_configuration.outputs.tag }} - name: Set up JDK 8 uses: actions/setup-java@v3 diff --git a/docker-compose.yml b/docker-compose.yml index e64e7e83..344f19f3 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,5 +1,3 @@ -version: "3.5" - services: volumes-provisioner: image: hasnat/volumes-provisioner @@ -26,71 +24,57 @@ services: - volumes-provisioner esdb-node1: &template - image: ${KURRENTDB_IMAGE:-docker.kurrent.io/eventstore/eventstoredb-ee:lts} + image: ${KURRENTDB_DOCKER_REGISTRY:-docker.kurrent.io/eventstore}/${KURRENTDB_DOCKER_IMAGE:-eventstoredb-ee}:${KURRENTDB_DOCKER_TAG:-lts} env_file: - vars.env environment: - EVENTSTORE_GOSSIP_SEED=172.30.240.12:2113,172.30.240.13:2113 - - EVENTSTORE_INT_IP=172.30.240.11 - - EVENTSTORE_CERTIFICATE_FILE=/etc/eventstore/certs/node1/node.crt - - EVENTSTORE_CERTIFICATE_PRIVATE_KEY_FILE=/etc/eventstore/certs/node1/node.key - - EVENTSTORE_ADVERTISE_HTTP_PORT_TO_CLIENT_AS=2111 + - EVENTSTORE_REPLICATION_IP=172.30.240.11 + - EVENTSTORE_CERTIFICATE_FILE=/etc/kurrentdb/certs/node1/node.crt + - EVENTSTORE_CERTIFICATE_PRIVATE_KEY_FILE=/etc/kurrentdb/certs/node1/node.key + - EVENTSTORE_ADVERTISE_NODE_PORT_TO_CLIENT_AS=2111 ports: - 2111:2113 networks: clusternetwork: ipv4_address: 172.30.240.11 volumes: - - ./certs:/etc/eventstore/certs + - ./certs:/etc/kurrentdb/certs restart: unless-stopped depends_on: - cert-gen esdb-node2: <<: *template - env_file: - - vars.env environment: - EVENTSTORE_GOSSIP_SEED=172.30.240.11:2113,172.30.240.13:2113 - - EVENTSTORE_INT_IP=172.30.240.12 - - EVENTSTORE_CERTIFICATE_FILE=/etc/eventstore/certs/node2/node.crt - - EVENTSTORE_CERTIFICATE_PRIVATE_KEY_FILE=/etc/eventstore/certs/node2/node.key - - EVENTSTORE_ADVERTISE_HTTP_PORT_TO_CLIENT_AS=2112 + - EVENTSTORE_REPLICATION_IP=172.30.240.12 + - EVENTSTORE_CERTIFICATE_FILE=/etc/kurrentdb/certs/node2/node.crt + - EVENTSTORE_CERTIFICATE_PRIVATE_KEY_FILE=/etc/kurrentdb/certs/node2/node.key + - EVENTSTORE_ADVERTISE_NODE_PORT_TO_CLIENT_AS=2112 ports: - 2112:2113 networks: clusternetwork: ipv4_address: 172.30.240.12 - volumes: - - ./certs:/etc/eventstore/certs - restart: unless-stopped - depends_on: - - cert-gen esdb-node3: <<: *template - env_file: - - vars.env environment: - EVENTSTORE_GOSSIP_SEED=172.30.240.11:2113,172.30.240.12:2113 - - EVENTSTORE_INT_IP=172.30.240.13 - - EVENTSTORE_CERTIFICATE_FILE=/etc/eventstore/certs/node3/node.crt - - EVENTSTORE_CERTIFICATE_PRIVATE_KEY_FILE=/etc/eventstore/certs/node3/node.key - - EVENTSTORE_ADVERTISE_HTTP_PORT_TO_CLIENT_AS=2113 + - EVENTSTORE_REPLICATION_IP=172.30.240.13 + - EVENTSTORE_CERTIFICATE_FILE=/etc/kurrentdb/certs/node3/node.crt + - EVENTSTORE_CERTIFICATE_PRIVATE_KEY_FILE=/etc/kurrentdb/certs/node3/node.key + - EVENTSTORE_ADVERTISE_NODE_PORT_TO_CLIENT_AS=2113 ports: - 2113:2113 networks: clusternetwork: ipv4_address: 172.30.240.13 - volumes: - - ./certs:/etc/eventstore/certs - restart: unless-stopped - depends_on: - - cert-gen networks: clusternetwork: - name: eventstoredb.local + name: kurrentdb.local driver: bridge ipam: driver: default diff --git a/src/main/java/io/kurrent/dbclient/AppendStreamFailure.java b/src/main/java/io/kurrent/dbclient/AppendStreamFailure.java new file mode 100644 index 00000000..0d9d5728 --- /dev/null +++ b/src/main/java/io/kurrent/dbclient/AppendStreamFailure.java @@ -0,0 +1,36 @@ +package io.kurrent.dbclient; + +public class AppendStreamFailure { + private final io.kurrentdb.v2.AppendStreamFailure inner; + + AppendStreamFailure(io.kurrentdb.v2.AppendStreamFailure inner) { + this.inner = inner; + } + + public String getStreamName() { + return this.inner.getStream(); + } + + public void visit(MultiAppendStreamErrorVisitor visitor) { + if (this.inner.getErrorCase() == io.kurrentdb.v2.AppendStreamFailure.ErrorCase.WRONG_EXPECTED_REVISION) { + visitor.onWrongExpectedRevision(this.inner.getWrongExpectedRevision().getStreamRevision()); + return; + } + + if (this.inner.getErrorCase() == io.kurrentdb.v2.AppendStreamFailure.ErrorCase.ACCESS_DENIED) { + visitor.onAccessDenied(this.inner.getAccessDenied().getReason()); + } + + if (this.inner.getErrorCase() == io.kurrentdb.v2.AppendStreamFailure.ErrorCase.STREAM_DELETED) { + visitor.onStreamDeleted(); + return; + } + + if (this.inner.getErrorCase() == io.kurrentdb.v2.AppendStreamFailure.ErrorCase.TRANSACTION_MAX_SIZE_EXCEEDED) { + visitor.onTransactionMaxSizeExceeded(this.inner.getTransactionMaxSizeExceeded().getMaxSize()); + return; + } + + throw new IllegalArgumentException("Append failure does not match any known error type"); + } +} diff --git a/src/main/java/io/kurrent/dbclient/AppendStreamRequest.java b/src/main/java/io/kurrent/dbclient/AppendStreamRequest.java new file mode 100644 index 00000000..d2c666b3 --- /dev/null +++ b/src/main/java/io/kurrent/dbclient/AppendStreamRequest.java @@ -0,0 +1,27 @@ +package io.kurrent.dbclient; + +import java.util.Iterator; + +public class AppendStreamRequest { + private final String streamName; + private final Iterator events; + private final StreamState expectedState; + + public AppendStreamRequest(String streamName, Iterator events, StreamState expectedState) { + this.streamName = streamName; + this.events = events; + this.expectedState = expectedState; + } + + public String getStreamName() { + return streamName; + } + + public Iterator getEvents() { + return events; + } + + public StreamState getExpectedState() { + return expectedState; + } +} diff --git a/src/main/java/io/kurrent/dbclient/AppendStreamSuccess.java b/src/main/java/io/kurrent/dbclient/AppendStreamSuccess.java new file mode 100644 index 00000000..cff5be7e --- /dev/null +++ b/src/main/java/io/kurrent/dbclient/AppendStreamSuccess.java @@ -0,0 +1,21 @@ +package io.kurrent.dbclient; + +public class AppendStreamSuccess { + private final io.kurrentdb.v2.AppendStreamSuccess inner; + + AppendStreamSuccess(io.kurrentdb.v2.AppendStreamSuccess inner) { + this.inner = inner; + } + + public String getStreamName() { + return this.inner.getStream(); + } + + public long getStreamRevision() { + return this.inner.getStreamRevision(); + } + + public long getPosition() { + return this.inner.getPosition(); + } +} diff --git a/src/main/java/io/kurrent/dbclient/FeatureFlags.java b/src/main/java/io/kurrent/dbclient/FeatureFlags.java index 83e755a5..11b66517 100644 --- a/src/main/java/io/kurrent/dbclient/FeatureFlags.java +++ b/src/main/java/io/kurrent/dbclient/FeatureFlags.java @@ -7,5 +7,6 @@ class FeatureFlags { public final static int PERSISTENT_SUBSCRIPTION_RESTART_SUBSYSTEM = 8; public final static int PERSISTENT_SUBSCRIPTION_GET_INFO = 16; public final static int PERSISTENT_SUBSCRIPTION_TO_ALL = 32; + public final static int MULTI_STREAM_APPEND = 64; public final static int PERSISTENT_SUBSCRIPTION_MANAGEMENT = PERSISTENT_SUBSCRIPTION_LIST | PERSISTENT_SUBSCRIPTION_REPLAY | PERSISTENT_SUBSCRIPTION_GET_INFO | PERSISTENT_SUBSCRIPTION_RESTART_SUBSYSTEM; } diff --git a/src/main/java/io/kurrent/dbclient/GossipClient.java b/src/main/java/io/kurrent/dbclient/GossipClient.java index 3ba89b7c..cd84e5a7 100644 --- a/src/main/java/io/kurrent/dbclient/GossipClient.java +++ b/src/main/java/io/kurrent/dbclient/GossipClient.java @@ -21,7 +21,7 @@ class GossipClient { public GossipClient(KurrentDBClientSettings settings, ManagedChannel channel) { _channel = channel; - _stub = GrpcUtils.configureStub(GossipGrpc.newStub(_channel), settings, new GossipOption(), (long)settings.getGossipTimeout()); + _stub = GrpcUtils.configureStub(GossipGrpc.newStub(_channel), settings, new GossipOption(), settings.getGossipTimeout()); } public void shutdown() { diff --git a/src/main/java/io/kurrent/dbclient/GrpcUtils.java b/src/main/java/io/kurrent/dbclient/GrpcUtils.java index 9dce2240..2779f9b2 100644 --- a/src/main/java/io/kurrent/dbclient/GrpcUtils.java +++ b/src/main/java/io/kurrent/dbclient/GrpcUtils.java @@ -113,10 +113,14 @@ static public StreamsOuterClass.ReadReq.Options.StreamOptions toStreamOptions(St } static public , O> S configureStub(S stub, KurrentDBClientSettings settings, OptionsBase options) { - return configureStub(stub, settings, options, null); + return configureStub(stub, settings, options, null, true); } - static public , O> S configureStub(S stub, KurrentDBClientSettings settings, OptionsBase options, Long forceDeadlineInMs) { + static public , O> S configureStub(S stub, KurrentDBClientSettings settings, OptionsBase options, long forceDeadlineInMs) { + return configureStub(stub, settings, options, forceDeadlineInMs, true); + } + + static public , O> S configureStub(S stub, KurrentDBClientSettings settings, OptionsBase options, Long forceDeadlineInMs, boolean forwardRequiresLeader) { S finalStub = stub; ConnectionMetadata metadata = new ConnectionMetadata(); @@ -146,7 +150,7 @@ static public , O> S configureStub(S stub, Kurren metadata.authenticated(credentials); } - if (options.isLeaderRequired() || settings.getNodePreference() == NodePreference.LEADER) { + if (forwardRequiresLeader && (options.isLeaderRequired() || settings.getNodePreference() == NodePreference.LEADER)) { metadata.requiresLeader(); } diff --git a/src/main/java/io/kurrent/dbclient/KurrentDBClient.java b/src/main/java/io/kurrent/dbclient/KurrentDBClient.java index 22cbe548..3cbea889 100644 --- a/src/main/java/io/kurrent/dbclient/KurrentDBClient.java +++ b/src/main/java/io/kurrent/dbclient/KurrentDBClient.java @@ -75,6 +75,10 @@ public CompletableFuture appendToStream(String streamName, AppendTo return new AppendToStream(this.getGrpcClient(), streamName, events, options).execute(); } + public CompletableFuture multiAppend(AppendToStreamOptions options, Iterator requests) { + return new MultiStreamAppend(this.getGrpcClient(), requests).execute(); + } + /** * Sets a stream's metadata. * @param streamName stream's name. diff --git a/src/main/java/io/kurrent/dbclient/MultiAppendStreamErrorVisitor.java b/src/main/java/io/kurrent/dbclient/MultiAppendStreamErrorVisitor.java new file mode 100644 index 00000000..02b4d70a --- /dev/null +++ b/src/main/java/io/kurrent/dbclient/MultiAppendStreamErrorVisitor.java @@ -0,0 +1,8 @@ +package io.kurrent.dbclient; + +public interface MultiAppendStreamErrorVisitor { + default void onWrongExpectedRevision(long streamRevision) {} + default void onAccessDenied(String reason) {} + default void onStreamDeleted() {} + default void onTransactionMaxSizeExceeded(int maxSize) {} +} diff --git a/src/main/java/io/kurrent/dbclient/MultiAppendWriteResult.java b/src/main/java/io/kurrent/dbclient/MultiAppendWriteResult.java new file mode 100644 index 00000000..ad9f1dad --- /dev/null +++ b/src/main/java/io/kurrent/dbclient/MultiAppendWriteResult.java @@ -0,0 +1,22 @@ +package io.kurrent.dbclient; + +import java.util.List; +import java.util.Optional; + +public class MultiAppendWriteResult { + private final List successes; + private final List failures; + + public MultiAppendWriteResult(List successes, List failures) { + this.successes = successes; + this.failures = failures; + } + + public Optional> getSuccesses() { + return Optional.ofNullable(successes); + } + + public Optional> getFailures() { + return Optional.ofNullable(failures); + } +} diff --git a/src/main/java/io/kurrent/dbclient/MultiStreamAppend.java b/src/main/java/io/kurrent/dbclient/MultiStreamAppend.java new file mode 100644 index 00000000..17967a52 --- /dev/null +++ b/src/main/java/io/kurrent/dbclient/MultiStreamAppend.java @@ -0,0 +1,109 @@ +package io.kurrent.dbclient; + +import com.google.protobuf.ByteString; +import io.grpc.Metadata; +import io.grpc.StatusRuntimeException; +import io.grpc.stub.StreamObserver; +import io.kurrentdb.v2.AppendRecord; +import io.kurrentdb.v2.MultiStreamAppendResponse; +import io.kurrentdb.v2.StreamsServiceGrpc; +import kurrentdb.protobuf.DynamicValueOuterClass; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.CompletableFuture; + +class MultiStreamAppend { + private final GrpcClient client; + private final Iterator requests; + + public MultiStreamAppend(GrpcClient client, Iterator requests) { + this.client = client; + this.requests = requests; + } + + public CompletableFuture execute() { + return this.client.runWithArgs(this::append); + } + + private CompletableFuture append(WorkItemArgs args) { + CompletableFuture result = new CompletableFuture<>(); + + if (!args.supportFeature(FeatureFlags.MULTI_STREAM_APPEND)) { + result.completeExceptionally(new UnsupportedOperationException("Multi-stream append is not supported by the server")); + return result; + } + + StreamsServiceGrpc.StreamsServiceStub client = GrpcUtils.configureStub(StreamsServiceGrpc.newStub(args.getChannel()), this.client.getSettings(), new OptionsBase<>(), null, false); + StreamObserver requestStream = client.multiStreamAppendSession(GrpcUtils.convertSingleResponse(result, this::onResponse)); + + try { + while (this.requests.hasNext()) { + AppendStreamRequest request = this.requests.next(); + io.kurrentdb.v2.AppendStreamRequest.Builder builder = io.kurrentdb.v2.AppendStreamRequest.newBuilder() + .setStream(request.getStreamName()); + + while (request.getEvents().hasNext()) { + EventData event = request.getEvents().next(); + builder.addRecords(AppendRecord.newBuilder() + .setData(ByteString.copyFrom(event.getEventData())) + .setRecordId(event.getEventId().toString()) + .putProperties(SystemMetadataKeys.DATA_FORMAT, DynamicValueOuterClass + .DynamicValue + .newBuilder() + .setBytesValue(ByteString.copyFromUtf8(event.getContentType())) + .build()) + .putProperties(SystemMetadataKeys.SCHEMA_NAME, DynamicValueOuterClass + .DynamicValue + .newBuilder() + .setBytesValue(ByteString.copyFromUtf8(event.getEventType())) + .build()) + .build()); + } + + requestStream.onNext(builder.build()); + } + + requestStream.onCompleted(); + } catch (StatusRuntimeException e) { + String leaderHost = e.getTrailers().get(Metadata.Key.of("leader-endpoint-host", Metadata.ASCII_STRING_MARSHALLER)); + String leaderPort = e.getTrailers().get(Metadata.Key.of("leader-endpoint-port", Metadata.ASCII_STRING_MARSHALLER)); + + if (leaderHost != null && leaderPort != null) { + NotLeaderException reason = new NotLeaderException(leaderHost, Integer.valueOf(leaderPort)); + requestStream.onError(reason); + result.completeExceptionally(reason); + } else { + requestStream.onError(e); + result.completeExceptionally(e); + } + } catch (RuntimeException e) { + requestStream.onError(e); + result.completeExceptionally(e); + } + + return result; + } + + public MultiAppendWriteResult onResponse(MultiStreamAppendResponse response) { + List failures = null; + List successes = null; + + if (response.hasFailure()) { + failures = new ArrayList<>(response.getFailure().getOutputCount()); + + for (io.kurrentdb.v2.AppendStreamFailure failure : response.getFailure().getOutputList()) { + failures.add(new AppendStreamFailure(failure)); + } + } else { + successes = new ArrayList<>(response.getSuccess().getOutputCount()); + + for (io.kurrentdb.v2.AppendStreamSuccess success : response.getSuccess().getOutputList()) { + successes.add(new AppendStreamSuccess(success)); + } + } + + return new MultiAppendWriteResult(successes, failures); + } +} diff --git a/src/main/java/io/kurrent/dbclient/ServerFeatures.java b/src/main/java/io/kurrent/dbclient/ServerFeatures.java index 12d377c4..4943c187 100644 --- a/src/main/java/io/kurrent/dbclient/ServerFeatures.java +++ b/src/main/java/io/kurrent/dbclient/ServerFeatures.java @@ -95,6 +95,8 @@ private static CompletableFuture getSupportedFeaturesInternal(Server default: break; } + } else if (method.getMethodName().equals("multistreamappendsession")) { + features |= FeatureFlags.MULTI_STREAM_APPEND; } } diff --git a/src/main/java/io/kurrent/dbclient/ServerVersion.java b/src/main/java/io/kurrent/dbclient/ServerVersion.java index f61f9a1d..64c16d69 100644 --- a/src/main/java/io/kurrent/dbclient/ServerVersion.java +++ b/src/main/java/io/kurrent/dbclient/ServerVersion.java @@ -58,6 +58,17 @@ public boolean isLessOrEqualThan(int major, int minor) { return true; } + public boolean isGreaterOrEqualThan(int major, int minor) { + int cmp; + if ((cmp = Integer.compare(this.major, major)) != 0) + return cmp > 0; + + if ((cmp = Integer.compare(this.minor, minor)) != 0) + return cmp > 0; + + return true; + } + @Override public String toString() { return "ServerVersion{" + diff --git a/src/main/java/io/kurrent/dbclient/SystemMetadataKeys.java b/src/main/java/io/kurrent/dbclient/SystemMetadataKeys.java index d5ae2cc7..d46a02dc 100644 --- a/src/main/java/io/kurrent/dbclient/SystemMetadataKeys.java +++ b/src/main/java/io/kurrent/dbclient/SystemMetadataKeys.java @@ -5,4 +5,6 @@ class SystemMetadataKeys { static final String CREATED = "created"; static final String IS_JSON = "is-json"; static final String TYPE = "type"; + static final String SCHEMA_NAME = "$schema.name"; + static final String DATA_FORMAT = "$schema.data-format"; } diff --git a/src/main/proto/dynamic-value.proto b/src/main/proto/dynamic-value.proto new file mode 100644 index 00000000..04ce17e5 --- /dev/null +++ b/src/main/proto/dynamic-value.proto @@ -0,0 +1,42 @@ +syntax = "proto3"; + +import "google/protobuf/duration.proto"; +import "google/protobuf/timestamp.proto"; +import "google/protobuf/struct.proto"; + +package kurrentdb.protobuf; +option csharp_namespace = "KurrentDB.Protobuf"; + +message DynamicValue { + oneof kind { + // Represents a null value. + google.protobuf.NullValue null_value = 1; + + // Represents a 32-bit signed integer value. + sint32 int32_value = 2; + + // Represents a 64-bit signed integer value. + sint64 int64_value = 3; + + // Represents a byte array value. + bytes bytes_value = 4; + + // Represents a 64-bit double-precision floating-point value. + double double_value = 5; + + // Represents a 32-bit single-precision floating-point value + float float_value = 6; + + // Represents a string value. + string string_value = 7; + + // Represents a boolean value. + bool boolean_value = 8; + + // Represents a timestamp value. + google.protobuf.Timestamp timestamp_value = 9; + + // Represents a duration value. + google.protobuf.Duration duration_value = 10; + } +} \ No newline at end of file diff --git a/src/main/proto/streams.v2.proto b/src/main/proto/streams.v2.proto new file mode 100644 index 00000000..6aac3647 --- /dev/null +++ b/src/main/proto/streams.v2.proto @@ -0,0 +1,170 @@ +syntax = "proto3"; + +// +// This protocol is UNSTABLE in the sense of being subject to change. +// + +package kurrentdb.protocol.v2; + +option csharp_namespace = "KurrentDB.Protocol.V2"; +option java_package = "io.kurrentdb.v2"; +option java_multiple_files = true; + +import "dynamic-value.proto"; + +service StreamsService { + // Executes an atomic operation to append records to multiple streams. + // This transactional method ensures that all appends either succeed + // completely, or are entirely rolled back, thereby maintaining strict data + // consistency across all involved streams. + rpc MultiStreamAppend(MultiStreamAppendRequest) returns (MultiStreamAppendResponse); + + // Streaming version of MultiStreamAppend that allows clients to send multiple + // append requests over a single connection. When the stream completes, all + // records are appended transactionally (all succeed or fail together). + // Provides improved efficiency for high-throughput scenarios while + // maintaining the same transactional guarantees. + rpc MultiStreamAppendSession(stream AppendStreamRequest) returns (MultiStreamAppendResponse); +} + +// Record to be appended to a stream. +message AppendRecord { + // Universally Unique identifier for the record. Must be a guid. + // If not provided, the server will generate a new one. + optional string record_id = 1; + + // A collection of properties providing additional system information about the + // record. + map properties = 2; + + // The actual data payload of the record, stored as bytes. + bytes data = 3; +} + +// Constants that match the expected state of a stream during an +// append operation. It can be used to specify whether the stream should exist, +// not exist, or can be in any state. +enum ExpectedRevisionConstants { + // The stream should exist and have a single event. + EXPECTED_REVISION_CONSTANTS_SINGLE_EVENT = 0; + + // It is not important whether the stream exists or not. + EXPECTED_REVISION_CONSTANTS_ANY = -2; + + // The stream should not exist. If it does, the append will fail. + EXPECTED_REVISION_CONSTANTS_NO_STREAM = -1; + + // The stream should exist + EXPECTED_REVISION_CONSTANTS_EXISTS = -4; +} + +// Represents the input for appending records to a specific stream. +message AppendStreamRequest { + // The name of the stream to append records to. + string stream = 1; + + // The records to append to the stream. + repeated AppendRecord records = 2; + + // The expected revision of the stream. If the stream's current revision does + // not match, the append will fail. + // The expected revision can also be one of the special values + // from ExpectedRevisionConstants. + // missing value means no expectation: same as EXPECTED_REVISION_CONSTANTS_ANY + optional sint64 expected_revision = 3; +} + +// Success represents the successful outcome of an append operation. +message AppendStreamSuccess { + // The name of the stream to which records were appended. + string stream = 1; + + // The position of the last appended record in the transaction. + int64 position = 2; + + // The revision of the stream after the append operation. + int64 stream_revision = 3; +} + +// Failure represents the detailed error information when an append operation fails. +message AppendStreamFailure { + // The name of the stream to which records failed to append. + string stream = 1; + + // The error details + oneof error { + // Failed because the actual stream revision didn't match the expected revision. + ErrorDetails.WrongExpectedRevision wrong_expected_revision = 2; + + // Failed because the client lacks sufficient permissions. + ErrorDetails.AccessDenied access_denied = 3; + + // Failed because the target stream has been deleted. + ErrorDetails.StreamDeleted stream_deleted = 4; + + ErrorDetails.TransactionMaxSizeExceeded transaction_max_size_exceeded = 5; + } +} + +// Represents the output of appending records to a specific stream. +message AppendStreamResponse { + // The result of the append operation. + oneof result { + // Success represents the successful outcome of an append operation. + AppendStreamSuccess success = 1; + + // Failure represents the details of a failed append operation. + AppendStreamFailure failure = 2; + } +} + +// MultiStreamAppendRequest represents a request to append records to multiple streams. +message MultiStreamAppendRequest { + // A list of AppendStreamInput messages, each representing a stream to which records should be appended. + repeated AppendStreamRequest input = 1; +} + +// Response from the MultiStreamAppend operation. +message MultiStreamAppendResponse { + oneof result { + // Success represents the successful outcome of a multi-stream append operation. + Success success = 1; + + // Failure represents the details of a failed multi-stream append operation. + Failure failure = 2; + } + + message Success { + repeated AppendStreamSuccess output = 1; + } + + message Failure { + repeated AppendStreamFailure output = 1; + } +} + +// ErrorDetails provides detailed information about specific error conditions. +message ErrorDetails { + // When the user does not have sufficient permissions to perform the operation. + message AccessDenied { + // The reason for access denial. + string reason = 1; + } + + // When the stream has been deleted. + message StreamDeleted { + } + + // When the expected revision of the stream does not match the actual revision. + message WrongExpectedRevision { + // The actual revision of the stream. + int64 stream_revision = 1; + } + + // When the transaction exceeds the maximum size allowed + // (it's bigger than the configured chunk size). + message TransactionMaxSizeExceeded { + // The maximum allowed size of the transaction. + int32 max_size = 1; + } +} \ No newline at end of file diff --git a/src/test/java/io/kurrent/dbclient/MultiStreamAppendTests.java b/src/test/java/io/kurrent/dbclient/MultiStreamAppendTests.java new file mode 100644 index 00000000..122ce7ca --- /dev/null +++ b/src/test/java/io/kurrent/dbclient/MultiStreamAppendTests.java @@ -0,0 +1,88 @@ +package io.kurrent.dbclient; + +import org.junit.jupiter.api.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.ExecutionException; + +public class MultiStreamAppendTests implements ConnectionAware { + static private Database database; + static private Logger logger; + + @BeforeAll + public static void setup() { + database = DatabaseFactory.spawn(); + logger = LoggerFactory.getLogger(MultiStreamAppendTests.class); + } + + @Override + public Database getDatabase() { + return database; + } + + @Override + public Logger getLogger() { + return logger; + } + + @AfterAll + public static void cleanup() { + database.dispose(); + } + + @Test + public void testMultiStreamAppend() throws ExecutionException, InterruptedException { + KurrentDBClient client = getDefaultClient(); + + Optional version = client.getServerVersion().get(); + + Assumptions.assumeTrue( + version.isPresent() && version.get().isGreaterOrEqualThan(25, 0), + "Multi-stream append is not supported server versions below 25.0.0" + ); + + List requests = new ArrayList<>(); + + List events = new ArrayList<>(); + for (int i = 0; i < 10; i++) + events.add(EventData.builderAsBinary("created", new byte[0]).build()); + + requests.add(new AppendStreamRequest("foobar", events.iterator(), StreamState.any())); + requests.add(new AppendStreamRequest("baz", events.iterator(), StreamState.any())); + + MultiAppendWriteResult result = client.multiAppend(AppendToStreamOptions.get(), requests.iterator()).get(); + + Assertions.assertTrue(result.getSuccesses().isPresent()); + } + + @Test + public void testMultiStreamAppendWhenUnsupported() throws ExecutionException, InterruptedException { + KurrentDBClient client = getDefaultClient(); + + Optional version = client.getServerVersion().get(); + Assumptions.assumeFalse( + version.isPresent() && version.get().isGreaterOrEqualThan(25, 0), + "Multi-stream is supported server versions greater or equal to 25.0.0" + ); + + List requests = new ArrayList<>(); + + List events = new ArrayList<>(); + for (int i = 0; i < 10; i++) + events.add(EventData.builderAsBinary("created", new byte[0]).build()); + + requests.add(new AppendStreamRequest("foobar", events.iterator(), StreamState.any())); + requests.add(new AppendStreamRequest("baz", events.iterator(), StreamState.any())); + + ExecutionException e = Assertions.assertThrows( + ExecutionException.class, + () -> client.multiAppend(AppendToStreamOptions.get(), requests.iterator()).get()); + + Assertions.assertInstanceOf(UnsupportedOperationException.class, e.getCause()); + } +} + diff --git a/vars.env b/vars.env index 7883b8c5..a3ecdad9 100644 --- a/vars.env +++ b/vars.env @@ -1,8 +1,8 @@ EVENTSTORE_CLUSTER_SIZE=3 EVENTSTORE_RUN_PROJECTIONS=All -EVENTSTORE_INT_TCP_PORT=1112 -EVENTSTORE_HTTP_PORT=2113 -EVENTSTORE_TRUSTED_ROOT_CERTIFICATES_PATH=/etc/eventstore/certs/ca +EVENTSTORE_REPLICATION_PORT=1112 +EVENTSTORE_NODE_PORT=2113 +EVENTSTORE_TRUSTED_ROOT_CERTIFICATES_PATH=/etc/kurrentdb/certs/ca EVENTSTORE_DISCOVER_VIA_DNS=false EVENTSTORE_ENABLE_ATOM_PUB_OVER_HTTP=true EVENTSTORE_ADVERTISE_HOST_TO_CLIENT_AS=localhost