Skip to content

Commit 9cbcf84

Browse files
evans-yetzulitai
authored andcommitted
[FLINK-23672] Add smoke-e2e-java for Java SDK smoke test
This closes #248.
1 parent 99ada68 commit 9cbcf84

File tree

57 files changed

+1494
-7712
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

57 files changed

+1494
-7712
lines changed

statefun-e2e-tests/pom.xml

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,11 @@ under the License.
3636
<module>statefun-e2e-tests-common</module>
3737
<module>statefun-sanity-e2e</module>
3838
<module>statefun-exactly-once-remote-e2e</module>
39-
<module>statefun-smoke-e2e-common</module>
39+
<module>statefun-smoke-e2e-driver</module>
4040
<module>statefun-smoke-e2e-embedded</module>
41+
<module>statefun-smoke-e2e-multilang-base</module>
42+
<module>statefun-smoke-e2e-multilang-harness</module>
43+
<module>statefun-smoke-e2e-java</module>
4144
</modules>
4245

4346
<build>

statefun-e2e-tests/statefun-smoke-e2e-common/pom.xml renamed to statefun-e2e-tests/statefun-smoke-e2e-driver/pom.xml

Lines changed: 2 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ under the License.
2525
</parent>
2626
<modelVersion>4.0.0</modelVersion>
2727

28-
<artifactId>statefun-smoke-e2e-common</artifactId>
28+
<artifactId>statefun-smoke-e2e-driver</artifactId>
2929

3030
<properties>
3131
<commons-math3.version>3.5</commons-math3.version>
@@ -49,11 +49,6 @@ under the License.
4949
<artifactId>statefun-flink-io</artifactId>
5050
<version>${project.version}</version>
5151
</dependency>
52-
<dependency>
53-
<groupId>org.apache.flink</groupId>
54-
<artifactId>statefun-flink-common</artifactId>
55-
<version>${project.version}</version>
56-
</dependency>
5752

5853
<!-- smoke logic -->
5954
<dependency>
@@ -127,19 +122,6 @@ under the License.
127122
</exclusion>
128123
</exclusions>
129124
</dependency>
130-
<dependency>
131-
<groupId>org.apache.flink</groupId>
132-
<artifactId>statefun-flink-harness</artifactId>
133-
<version>${project.version}</version>
134-
<scope>test</scope>
135-
<exclusions>
136-
<!-- conflicts with testcontainers -->
137-
<exclusion>
138-
<groupId>com.fasterxml.jackson.core</groupId>
139-
<artifactId>jackson-annotations</artifactId>
140-
</exclusion>
141-
</exclusions>
142-
</dependency>
143125
</dependencies>
144126

145127
<build>
@@ -174,6 +156,7 @@ under the License.
174156
</execution>
175157
</executions>
176158
</plugin>
159+
177160
<!--
178161
The following plugin invokes protoc to generate Java classes out of the *.proto
179162
definitions located at: (1) src/main/protobuf (2) ${additional-sources.dir}.
Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
* See the License for the specific language governing permissions and
1616
* limitations under the License.
1717
*/
18-
package org.apache.flink.statefun.e2e.smoke;
18+
package org.apache.flink.statefun.e2e.smoke.common;
1919

2020
import org.apache.flink.statefun.sdk.FunctionType;
2121
import org.apache.flink.statefun.sdk.io.EgressIdentifier;
@@ -24,15 +24,21 @@
2424

2525
public class Constants {
2626

27+
public static final String NAMESPACE = "statefun.smoke.e2e";
28+
public static final String INGRESS_NAME = "command-generator-source";
29+
public static final String EGRESS_NAME = "discard-sink";
30+
public static final String VERIFICATION_EGRESS_NAME = "verification-sink";
31+
public static final String FUNCTION_NAME = "command-interpreter-fn";
32+
2733
public static final IngressIdentifier<TypedValue> IN =
28-
new IngressIdentifier<>(TypedValue.class, "statefun.smoke.e2e", "command-generator-source");
34+
new IngressIdentifier<>(TypedValue.class, NAMESPACE, INGRESS_NAME);
2935

3036
public static final EgressIdentifier<TypedValue> OUT =
31-
new EgressIdentifier<>("statefun.smoke.e2e", "discard-sink", TypedValue.class);
37+
new EgressIdentifier<>(NAMESPACE, EGRESS_NAME, TypedValue.class);
3238

3339
public static final EgressIdentifier<TypedValue> VERIFICATION_RESULT =
34-
new EgressIdentifier<>("statefun.smoke.e2e", "verification-sink", TypedValue.class);
40+
new EgressIdentifier<>(NAMESPACE, VERIFICATION_EGRESS_NAME, TypedValue.class);
3541

3642
// For embedded/remote functions to bind with the smoke-e2e-common testing framework
37-
public static final FunctionType FN_TYPE = new FunctionType("statefun.smoke.e2e", "f1");
43+
public static final FunctionType FN_TYPE = new FunctionType(NAMESPACE, FUNCTION_NAME);
3844
}
Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -15,20 +15,24 @@
1515
* See the License for the specific language governing permissions and
1616
* limitations under the License.
1717
*/
18+
package org.apache.flink.statefun.e2e.smoke.common;
1819

19-
package org.apache.flink.statefun.e2e.smoke;
20+
public final class Ids {
21+
private final String[] cache;
2022

21-
import org.junit.Test;
22-
23-
public class SmokeVerificationEmbeddedE2E {
23+
public Ids(int maxIds) {
24+
this.cache = createIds(maxIds);
25+
}
2426

25-
@Test(timeout = 1_000 * 60 * 10)
26-
public void runWith() throws Throwable {
27-
ModuleParameters parameters = new ModuleParameters();
28-
parameters.setNumberOfFunctionInstances(128);
29-
parameters.setMessageCount(100_000);
30-
parameters.setMaxFailures(1);
27+
public String idOf(int address) {
28+
return cache[address];
29+
}
3130

32-
SmokeRunner.run(parameters);
31+
private static String[] createIds(int maxIds) {
32+
String[] ids = new String[maxIds];
33+
for (int i = 0; i < maxIds; i++) {
34+
ids[i] = Integer.toString(i);
35+
}
36+
return ids;
3337
}
3438
}
Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
* See the License for the specific language governing permissions and
1616
* limitations under the License.
1717
*/
18-
package org.apache.flink.statefun.e2e.smoke;
18+
package org.apache.flink.statefun.e2e.smoke.common;
1919

2020
import java.io.Serializable;
2121
import java.util.Map;
@@ -41,6 +41,7 @@ public final class ModuleParameters implements Serializable {
4141
private int maxFailures = 1;
4242
private String verificationServerHost = "localhost";
4343
private int verificationServerPort = 5050;
44+
private boolean isAsyncOpSupported = false;
4445

4546
/** Creates an instance of ModuleParameters from a key-value map. */
4647
public static ModuleParameters from(Map<String, String> globalConfiguration) {
@@ -158,6 +159,14 @@ public void setVerificationServerPort(int verificationServerPort) {
158159
this.verificationServerPort = verificationServerPort;
159160
}
160161

162+
public boolean isAsyncOpSupported() {
163+
return isAsyncOpSupported;
164+
}
165+
166+
public void setAsyncOpSupported(boolean asyncOpSupported) {
167+
isAsyncOpSupported = asyncOpSupported;
168+
}
169+
161170
@Override
162171
public String toString() {
163172
return "ModuleParameters{"
@@ -188,6 +197,8 @@ public String toString() {
188197
+ '\''
189198
+ ", verificationServerPort="
190199
+ verificationServerPort
200+
+ ", isAsyncOpSupported="
201+
+ isAsyncOpSupported
191202
+ '}';
192203
}
193204
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.statefun.e2e.smoke.common;
20+
21+
import org.apache.flink.statefun.e2e.smoke.generated.Commands;
22+
import org.apache.flink.statefun.e2e.smoke.generated.SourceCommand;
23+
import org.apache.flink.statefun.e2e.smoke.generated.VerificationResult;
24+
import org.apache.flink.statefun.sdk.TypeName;
25+
import org.apache.flink.statefun.sdk.reqreply.generated.TypedValue;
26+
27+
public final class Types {
28+
private Types() {}
29+
30+
public static final TypeName SOURCE_COMMANDS_TYPE =
31+
TypeName.parseFrom(Constants.NAMESPACE + "/source-command");
32+
public static final TypeName VERIFICATION_RESULT_TYPE =
33+
TypeName.parseFrom(Constants.NAMESPACE + "/verification-result");
34+
public static final TypeName COMMANDS_TYPE =
35+
TypeName.parseFrom(Constants.NAMESPACE + "/commands");
36+
37+
public static boolean isTypeOf(TypedValue value, TypeName type) {
38+
return value.getTypename().equals(type.canonicalTypenameString());
39+
}
40+
41+
public static TypedValue packSourceCommand(SourceCommand sourceCommand) {
42+
return TypedValue.newBuilder()
43+
.setTypename(SOURCE_COMMANDS_TYPE.canonicalTypenameString())
44+
.setHasValue(true)
45+
.setValue(sourceCommand.toByteString())
46+
.build();
47+
}
48+
49+
public static SourceCommand unpackSourceCommand(TypedValue typedValue) {
50+
if (!isTypeOf(typedValue, SOURCE_COMMANDS_TYPE)) {
51+
throw new IllegalStateException("Unexpected TypedValue: " + typedValue);
52+
}
53+
try {
54+
return SourceCommand.parseFrom(typedValue.getValue());
55+
} catch (Exception e) {
56+
throw new RuntimeException("Unable to parse SourceCommand from TypedValue.", e);
57+
}
58+
}
59+
60+
public static TypedValue packCommands(Commands commands) {
61+
return TypedValue.newBuilder()
62+
.setTypename(COMMANDS_TYPE.canonicalTypenameString())
63+
.setHasValue(true)
64+
.setValue(commands.toByteString())
65+
.build();
66+
}
67+
68+
public static Commands unpackCommands(TypedValue typedValue) {
69+
if (!isTypeOf(typedValue, COMMANDS_TYPE)) {
70+
throw new IllegalStateException("Unexpected TypedValue: " + typedValue);
71+
}
72+
try {
73+
return Commands.parseFrom(typedValue.getValue());
74+
} catch (Exception e) {
75+
throw new RuntimeException("Unable to parse Commands from TypedValue.", e);
76+
}
77+
}
78+
79+
public static TypedValue packVerificationResult(VerificationResult verificationResult) {
80+
return TypedValue.newBuilder()
81+
.setTypename(VERIFICATION_RESULT_TYPE.canonicalTypenameString())
82+
.setHasValue(true)
83+
.setValue(verificationResult.toByteString())
84+
.build();
85+
}
86+
87+
public static VerificationResult unpackVerificationResult(TypedValue typedValue) {
88+
if (!isTypeOf(typedValue, VERIFICATION_RESULT_TYPE)) {
89+
throw new IllegalStateException("Unexpected TypedValue: " + typedValue);
90+
}
91+
try {
92+
return VerificationResult.parseFrom(typedValue.getValue());
93+
} catch (Exception e) {
94+
throw new RuntimeException("Unable to parse SourceCommand from TypedValue.", e);
95+
}
96+
}
97+
}
Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,9 @@
1515
* See the License for the specific language governing permissions and
1616
* limitations under the License.
1717
*/
18-
package org.apache.flink.statefun.e2e.smoke;
18+
package org.apache.flink.statefun.e2e.smoke.driver;
1919

20+
import static org.apache.flink.statefun.e2e.smoke.common.Types.packSourceCommand;
2021
import static org.apache.flink.statefun.e2e.smoke.generated.Command.Verify;
2122
import static org.apache.flink.statefun.e2e.smoke.generated.Command.newBuilder;
2223

@@ -33,11 +34,11 @@
3334
import org.apache.flink.runtime.state.CheckpointListener;
3435
import org.apache.flink.runtime.state.FunctionInitializationContext;
3536
import org.apache.flink.runtime.state.FunctionSnapshotContext;
37+
import org.apache.flink.statefun.e2e.smoke.common.ModuleParameters;
3638
import org.apache.flink.statefun.e2e.smoke.generated.Command;
3739
import org.apache.flink.statefun.e2e.smoke.generated.Commands;
3840
import org.apache.flink.statefun.e2e.smoke.generated.SourceCommand;
3941
import org.apache.flink.statefun.e2e.smoke.generated.SourceSnapshot;
40-
import org.apache.flink.statefun.flink.common.types.TypedValueUtil;
4142
import org.apache.flink.statefun.sdk.reqreply.generated.TypedValue;
4243
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
4344
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
@@ -171,7 +172,7 @@ private void generate(SourceContext<TypedValue> ctx) {
171172
return;
172173
}
173174
functionStateTracker.apply(command);
174-
ctx.collect(TypedValueUtil.packProtobufMessage(command));
175+
ctx.collect(packSourceCommand(command));
175176
this.commandsSentSoFar = i;
176177
}
177178
}
@@ -191,7 +192,7 @@ private void verify(SourceContext<TypedValue> ctx) {
191192
.setCommands(Commands.newBuilder().addCommand(verify))
192193
.build();
193194
synchronized (ctx.getCheckpointLock()) {
194-
ctx.collect(TypedValueUtil.packProtobufMessage(command));
195+
ctx.collect(packSourceCommand(command));
195196
}
196197
}
197198
}
Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -15,17 +15,19 @@
1515
* See the License for the specific language governing permissions and
1616
* limitations under the License.
1717
*/
18-
package org.apache.flink.statefun.e2e.smoke;
18+
package org.apache.flink.statefun.e2e.smoke.driver;
1919

2020
import static java.util.Arrays.asList;
2121
import static org.apache.commons.math3.util.Pair.create;
2222

23+
import java.util.ArrayList;
2324
import java.util.List;
2425
import java.util.Objects;
2526
import java.util.function.Supplier;
2627
import org.apache.commons.math3.distribution.EnumeratedDistribution;
2728
import org.apache.commons.math3.random.RandomGenerator;
2829
import org.apache.commons.math3.util.Pair;
30+
import org.apache.flink.statefun.e2e.smoke.common.ModuleParameters;
2931
import org.apache.flink.statefun.e2e.smoke.generated.Command;
3032
import org.apache.flink.statefun.e2e.smoke.generated.Commands;
3133
import org.apache.flink.statefun.e2e.smoke.generated.SourceCommand;
@@ -75,13 +77,18 @@ private int address() {
7577
}
7678

7779
private List<Pair<Gen, Double>> randomCommandGenerators() {
78-
return asList(
79-
create(new StateModifyGen(), moduleParameters.getStateModificationsPr()),
80-
create(new SendGen(), moduleParameters.getSendPr()),
81-
create(new SendAfterGen(), moduleParameters.getSendAfterPr()),
82-
create(new SendAsyncOp(), moduleParameters.getAsyncSendPr()),
83-
create(new Noop(), moduleParameters.getNoopPr()),
84-
create(new SendEgress(), moduleParameters.getSendEgressPr()));
80+
List<Pair<Gen, Double>> list =
81+
new ArrayList<>(
82+
asList(
83+
create(new StateModifyGen(), moduleParameters.getStateModificationsPr()),
84+
create(new SendGen(), moduleParameters.getSendPr()),
85+
create(new SendAfterGen(), moduleParameters.getSendAfterPr()),
86+
create(new Noop(), moduleParameters.getNoopPr()),
87+
create(new SendEgress(), moduleParameters.getSendEgressPr())));
88+
if (moduleParameters.isAsyncOpSupported()) {
89+
list.add(create(new SendAsyncOp(), moduleParameters.getAsyncSendPr()));
90+
}
91+
return list;
8592
}
8693

8794
interface Gen {
Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,14 @@
1515
* See the License for the specific language governing permissions and
1616
* limitations under the License.
1717
*/
18-
package org.apache.flink.statefun.e2e.smoke;
18+
package org.apache.flink.statefun.e2e.smoke.driver;
19+
20+
import static org.apache.flink.statefun.e2e.smoke.common.Types.unpackSourceCommand;
1921

2022
import java.util.Objects;
23+
import org.apache.flink.statefun.e2e.smoke.common.Constants;
24+
import org.apache.flink.statefun.e2e.smoke.common.Ids;
2125
import org.apache.flink.statefun.e2e.smoke.generated.SourceCommand;
22-
import org.apache.flink.statefun.flink.common.types.TypedValueUtil;
2326
import org.apache.flink.statefun.sdk.FunctionType;
2427
import org.apache.flink.statefun.sdk.io.Router;
2528
import org.apache.flink.statefun.sdk.reqreply.generated.TypedValue;
@@ -33,8 +36,7 @@ public CommandRouter(Ids ids) {
3336

3437
@Override
3538
public void route(TypedValue command, Downstream<TypedValue> downstream) {
36-
SourceCommand sourceCommand =
37-
TypedValueUtil.unpackProtobufMessage(command, SourceCommand.parser());
39+
SourceCommand sourceCommand = unpackSourceCommand(command);
3840
FunctionType type = Constants.FN_TYPE;
3941
String id = ids.idOf(sourceCommand.getTarget());
4042
downstream.forward(type, id, command);

0 commit comments

Comments
 (0)