Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #4174]Support to subscribe message by tags when using RocketMQ storage plugin. #4175

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ public class Constants {

public static final String PROPERTY_MESSAGE_STORE_TIMESTAMP = "storetimestamp";

public static final String MESSAGE_PROP_SEPARATOR = "99";
public static final String MESSAGE_PROP_SEPARATOR = "95";

public static final String EVENTMESH_CONF_HOME = System.getProperty("confPath", System.getenv("confPath"));

Expand Down Expand Up @@ -194,6 +194,8 @@ public class Constants {
*/
public static final String CONTENT_TYPE_CLOUDEVENTS_JSON = "application/cloudevents+json";

public static final String MSG_TAG = "tags";

public static final String HTTP = "HTTP";

public static final String TCP = "TCP";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@

private String topic;

private String subExpression;

@JsonDeserialize(converter = SubscriptionModeConverter.class)
private SubscriptionMode mode;

Expand All @@ -43,6 +45,13 @@
this.type = type;
}

public SubscriptionItem(String topic, SubscriptionMode mode, SubscriptionType type, String subExpression) {
this.topic = topic;
this.mode = mode;
this.type = type;
this.subExpression = subExpression;
}

Check warning on line 53 in eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/SubscriptionItem.java

View check run for this annotation

Codecov / codecov/patch

eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/SubscriptionItem.java#L48-L53

Added lines #L48 - L53 were not covered by tests

public SubscriptionType getType() {
return type;
}
Expand All @@ -67,12 +76,21 @@
this.mode = mode;
}

public String getSubExpression() {
return subExpression;

Check warning on line 80 in eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/SubscriptionItem.java

View check run for this annotation

Codecov / codecov/patch

eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/SubscriptionItem.java#L80

Added line #L80 was not covered by tests
}

public void setSubExpression(String subExpression) {
this.subExpression = subExpression;
}

Check warning on line 85 in eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/SubscriptionItem.java

View check run for this annotation

Codecov / codecov/patch

eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/SubscriptionItem.java#L84-L85

Added lines #L84 - L85 were not covered by tests

@Override
public String toString() {
return "SubscriptionItem{"
+ "topic=" + topic
+ ", mode=" + mode
+ ", type=" + type
+ ", subExpression=" + subExpression
+ '}';
}

Expand All @@ -85,11 +103,11 @@
return false;
}
SubscriptionItem that = (SubscriptionItem) o;
return Objects.equal(topic, that.topic) && mode == that.mode && type == that.type;
return Objects.equal(topic, that.topic) && mode == that.mode && type == that.type && Objects.equal(subExpression, that.subExpression);
}

@Override
public int hashCode() {
return Objects.hashCode(topic, mode, type);
return Objects.hashCode(topic, mode, type, subExpression);

Check warning on line 111 in eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/SubscriptionItem.java

View check run for this annotation

Codecov / codecov/patch

eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/SubscriptionItem.java#L111

Added line #L111 was not covered by tests
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,10 @@ public class ExampleConstants {
public static final String EVENTMESH_HTTP_ASYNC_TEST_TOPIC = "TEST-TOPIC-HTTP-ASYNC";
public static final String EVENTMESH_HTTP_SYNC_TEST_TOPIC = "TEST-TOPIC-HTTP-SYNC";
public static final String EVENTMESH_TCP_ASYNC_TEST_TOPIC = "TEST-TOPIC-TCP-ASYNC";
public static final String EVENTMESH_TCP_ASYNC_TEST_TOPIC_TAG = "TEST-TOPIC-TCP-ASYNC-TAG";
public static final String EVENTMESH_TCP_SYNC_TEST_TOPIC = "TEST-TOPIC-TCP-SYNC";
public static final String EVENTMESH_TCP_BROADCAST_TEST_TOPIC = "TEST-TOPIC-TCP-BROADCAST";
public static final String EVENTMESH_TCP_BROADCAST_TEST_TOPIC_TAG = "TEST-TOPIC-TCP-BROADCAST-TAG";

public static final String DEFAULT_EVENTMESH_TEST_PRODUCER_GROUP = "EventMeshTest-producerGroup";
public static final String DEFAULT_EVENTMESH_TEST_CONSUMER_GROUP = "EventMeshTest-consumerGroup";
Expand All @@ -56,4 +58,6 @@ public class ExampleConstants {
public static final String IDC = "FT";
public static final String SUB_SYS = "1234";
public static final String SERVER_PORT = "server.port";

public static final String TAG_PREFIX = "TEST_MSG_TAG_";
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
import org.apache.eventmesh.common.protocol.tcp.UserAgent;
import org.apache.eventmesh.common.utils.JsonUtils;

import org.apache.commons.lang3.StringUtils;

import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
Expand Down Expand Up @@ -105,38 +107,47 @@ public static Package rrResponse(final EventMeshMessage request) {
}

public static EventMeshMessage getEventMeshMessage(String eventMeshTcpSyncTestTopic, String msgType, String msg,
String keys, String keyMsg, String testMessage) {
String keys, String keyMsg, String testMessage, String tag) {
final EventMeshMessage mqmsg = new EventMeshMessage();
mqmsg.setTopic(eventMeshTcpSyncTestTopic);
mqmsg.getProperties().put(msgType, msg);
mqmsg.getProperties().put(UtilsConstants.TTL, DEFAULT_TTL_MS);
mqmsg.getProperties().put(keys, keyMsg);
if (StringUtils.isNotBlank(tag)) {
mqmsg.getProperties().put(Constants.MSG_TAG, tag);
}
mqmsg.getHeaders().put(Constants.DATA_CONTENT_TYPE, "text/plain");
mqmsg.setBody(testMessage);
return mqmsg;
}

public static EventMeshMessage generateSyncRRMqMsg() {
return getEventMeshMessage(ExampleConstants.EVENTMESH_TCP_SYNC_TEST_TOPIC, UtilsConstants.MSG_TYPE,
"persistent", UtilsConstants.KEYS, generateRandomString(16), "testSyncRR");
"persistent", UtilsConstants.KEYS, generateRandomString(16), "testSyncRR", null);
}

private static EventMeshMessage generateAsyncRRMqMsg() {
return getEventMeshMessage(ExampleConstants.EVENTMESH_TCP_SYNC_TEST_TOPIC, UtilsConstants.REPLY_TO,
"localhost@ProducerGroup-producerPool-9-access#V1_4_0#CI", UtilsConstants.PROPERTY_MESSAGE_REPLY_TO,
"notnull", "testAsyncRR");
"notnull", "testAsyncRR", null);
}

public static EventMeshMessage generateAsyncEventMqMsg() {
return getEventMeshMessage(ExampleConstants.EVENTMESH_TCP_ASYNC_TEST_TOPIC, UtilsConstants.REPLY_TO,
"localhost@ProducerGroup-producerPool-9-access#V1_4_0#CI", UtilsConstants.PROPERTY_MESSAGE_REPLY_TO,
"notnull", ASYNC_MSG_BODY);
"notnull", ASYNC_MSG_BODY, null);
}

public static EventMeshMessage generateBroadcastMqMsg() {
return getEventMeshMessage(ExampleConstants.EVENTMESH_TCP_ASYNC_TEST_TOPIC, UtilsConstants.REPLY_TO,
public static EventMeshMessage generateAsyncEventMqMsgWithTag(int i) {
return getEventMeshMessage(ExampleConstants.EVENTMESH_TCP_ASYNC_TEST_TOPIC_TAG, UtilsConstants.REPLY_TO,
"localhost@ProducerGroup-producerPool-9-access#V1_4_0#CI", UtilsConstants.PROPERTY_MESSAGE_REPLY_TO,
"notnull", ASYNC_MSG_BODY);
"notnull", ASYNC_MSG_BODY, ExampleConstants.TAG_PREFIX + i);
}

public static EventMeshMessage generateBroadcastMqMsg() {
return getEventMeshMessage(ExampleConstants.EVENTMESH_TCP_BROADCAST_TEST_TOPIC, UtilsConstants.REPLY_TO,
"localhost@ProducerGroup-producerPool-9-access#V1_4_0#CI", UtilsConstants.PROPERTY_MESSAGE_REPLY_TO,
"notnull", ASYNC_MSG_BODY, null);
}

private static String generateRandomString(final int length) {
Expand All @@ -162,6 +173,22 @@ public static CloudEvent generateCloudEventV1Async() {
.build();
}

public static CloudEvent generateCloudEventV1AsyncWithTag(int i) {
final Map<String, String> content = new HashMap<>();
content.put(UtilsConstants.CONTENT, ASYNC_MSG_BODY);

return CloudEventBuilder.v1()
.withId(UUID.randomUUID().toString())
.withSubject(ExampleConstants.EVENTMESH_TCP_ASYNC_TEST_TOPIC_TAG)
.withSource(URI.create("/"))
.withDataContentType(ExampleConstants.CLOUDEVENT_CONTENT_TYPE)
.withType(CLOUD_EVENTS_PROTOCOL_NAME)
.withData(Objects.requireNonNull(JsonUtils.toJSONString(content)).getBytes(Constants.DEFAULT_CHARSET))
.withExtension(UtilsConstants.TTL, DEFAULT_TTL_MS)
.withExtension(Constants.MSG_TAG, ExampleConstants.TAG_PREFIX + i)
.build();
}

public static CloudEvent generateCloudEventV1SyncRR() {
final Map<String, String> content = new HashMap<>();
content.put(UtilsConstants.CONTENT, "testSyncRR");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,16 @@ public static void main(String[] args) throws Exception {

ThreadUtils.sleep(1, TimeUnit.SECONDS);
}

for (int i = 0; i < 4; i++) {
CloudEvent event = EventMeshTestUtils.generateCloudEventV1AsyncWithTag(i);
if (log.isInfoEnabled()) {
log.info("begin send async msg[{}] with tag: {}", i, event);
}
client.publish(event, EventMeshCommon.DEFAULT_TIME_OUT_MILLS);

ThreadUtils.sleep(1, TimeUnit.SECONDS);
}
ThreadUtils.sleep(2, TimeUnit.SECONDS);
} catch (Exception e) {
log.error("AsyncPublish failed", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.eventmesh.tcp.common.EventMeshTestUtils;
import org.apache.eventmesh.util.Utils;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Properties;

Expand All @@ -42,6 +43,15 @@
public class SyncRequest {

public static void main(String[] args) throws Exception {
EventMeshTCPClient<CloudEvent> client = createClient();
try {
publishMsg(client);
} catch (Exception e) {
log.error("SyncRequest failed", e);
}
}

private static EventMeshTCPClient<CloudEvent> createClient() throws IOException {
final Properties properties = Utils.readPropertiesFile(ExampleConstants.CONFIG_FILE_NAME);
final String eventMeshIp = properties.getProperty(ExampleConstants.EVENTMESH_IP);
final int eventMeshTcpPort = Integer.parseInt(properties.getProperty(ExampleConstants.EVENTMESH_TCP_PORT));
Expand All @@ -51,40 +61,39 @@ public static void main(String[] args) throws Exception {
.port(eventMeshTcpPort)
.userAgent(userAgent)
.build();
final EventMeshTCPClient<CloudEvent> client = EventMeshTCPClientFactory.createEventMeshTCPClient(
eventMeshTcpClientConfig, CloudEvent.class);
client.init();
return client;
}

try {

final EventMeshTCPClient<CloudEvent> client = EventMeshTCPClientFactory.createEventMeshTCPClient(
eventMeshTcpClientConfig, CloudEvent.class);
client.init();

final CloudEvent event = EventMeshTestUtils.generateCloudEventV1SyncRR();

log.info("begin send rr msg: {}", event);

final Package response = client.rr(event, EventMeshCommon.DEFAULT_TIME_OUT_MILLS);
// check-NPE EventFormat
final EventFormat eventFormat = EventFormatProvider.getInstance().resolveFormat(JsonFormat.CONTENT_TYPE);
if (null == eventFormat) {
log.error("eventFormat is null. end the process");
return;
}
private static void publishMsg(EventMeshTCPClient<CloudEvent> client) {
final CloudEvent event = EventMeshTestUtils.generateCloudEventV1SyncRR();
log.info("begin send req-resp msg: {}", event);

final CloudEvent replyEvent = eventFormat
.deserialize(response.getBody().toString().getBytes(StandardCharsets.UTF_8));
final Package response = client.rr(event, EventMeshCommon.DEFAULT_TIME_OUT_MILLS);
logResponse(response);
}

// check-NPE CloudEventData
final CloudEventData cloudEventData = replyEvent.getData();
if (null == cloudEventData) {
log.error("replyEvent.data is null. end the process");
return;
}
private static void logResponse(Package response) {
// check-NPE EventFormat
final EventFormat eventFormat = EventFormatProvider.getInstance().resolveFormat(JsonFormat.CONTENT_TYPE);
if (null == eventFormat) {
log.error("eventFormat is null. end the process");
return;
}

final String content = new String(cloudEventData.toBytes(), StandardCharsets.UTF_8);
log.info("receive rr reply: {}|{}", response, content);
final CloudEvent replyEvent = eventFormat
.deserialize(response.getBody().toString().getBytes(StandardCharsets.UTF_8));

} catch (Exception e) {
log.error("SyncRequest failed", e);
// check-NPE CloudEventData
final CloudEventData cloudEventData = replyEvent.getData();
if (null == cloudEventData) {
log.error("replyEvent.data is null. end the process");
return;
}

final String content = new String(cloudEventData.toBytes(), StandardCharsets.UTF_8);
log.info("receive rr reply, response: {}, response's content: {}", response, content);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,17 @@ public static void main(String[] args) throws Exception {

ThreadUtils.sleep(1, TimeUnit.SECONDS);
}

for (int i = 0; i < 4; i++) {
final EventMeshMessage eventMeshMessage = EventMeshTestUtils.generateAsyncEventMqMsgWithTag(i);

if (log.isInfoEnabled()) {
log.info("begin send async msg[{}] with tag, msg: {}", i, eventMeshMessage);
}
client.publish(eventMeshMessage, EventMeshCommon.DEFAULT_TIME_OUT_MILLS);

ThreadUtils.sleep(1, TimeUnit.SECONDS);
}
ThreadUtils.sleep(2, TimeUnit.SECONDS);
} catch (Exception e) {
log.error("AsyncPublish failed", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.eventmesh.client.tcp.EventMeshTCPClientFactory;
import org.apache.eventmesh.client.tcp.common.ReceiveMsgHook;
import org.apache.eventmesh.client.tcp.conf.EventMeshTCPClientConfig;
import org.apache.eventmesh.common.Constants;
import org.apache.eventmesh.common.ExampleConstants;
import org.apache.eventmesh.common.protocol.SubscriptionMode;
import org.apache.eventmesh.common.protocol.SubscriptionType;
Expand All @@ -29,6 +30,7 @@
import org.apache.eventmesh.util.Utils;

import java.nio.charset.StandardCharsets;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;

Expand Down Expand Up @@ -56,6 +58,9 @@ public static void main(String[] args) throws Exception {

client.subscribe(ExampleConstants.EVENTMESH_TCP_ASYNC_TEST_TOPIC, SubscriptionMode.CLUSTERING,
SubscriptionType.ASYNC);
client.subscribe(ExampleConstants.EVENTMESH_TCP_ASYNC_TEST_TOPIC_TAG,
ExampleConstants.TAG_PREFIX + 1 + "||" + ExampleConstants.TAG_PREFIX + 3,
SubscriptionMode.CLUSTERING, SubscriptionType.ASYNC);
client.registerSubBusiHandler(new AsyncSubscribe());

client.listen();
Expand All @@ -72,7 +77,11 @@ public Optional<CloudEvent> handle(final CloudEvent msg) {
}

final String content = new String(msg.getData().toBytes(), StandardCharsets.UTF_8);
log.info("receive async msg: {}|{}", msg, content);
if (Objects.nonNull(msg.getExtension(Constants.MSG_TAG))) {
log.info("receive async msg, msg:{}, msg's data:{}, tag:{}", msg, content, msg.getExtension(Constants.MSG_TAG));
} else {
log.info("receive async msg, msg:{}, msg's data:{}", msg, content);
}
return Optional.empty();
}
}
Loading
Loading