Skip to content

Commit

Permalink
✨ 完善 mica-nats 模块
Browse files Browse the repository at this point in the history
  • Loading branch information
li-xunhuan committed Aug 22, 2023
1 parent 2fba81d commit 170fdfa
Show file tree
Hide file tree
Showing 7 changed files with 204 additions and 111 deletions.
1 change: 0 additions & 1 deletion mica-nats/build.gradle
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
dependencies {
api project(":mica-core")
api "io.nats:jnats:2.16.13"
implementation "jakarta.validation:jakarta.validation-api"
implementation "org.springframework.boot:spring-boot-starter"
compileOnly "org.springframework.cloud:spring-cloud-context"
compileOnly "net.dreamlu:mica-auto:${micaAutoVersion}"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@
*
* @return 最大消息数量
*/
long pendingMessageLimit() default Consumer.DEFAULT_MAX_BYTES;
long pendingMessageLimit() default Consumer.DEFAULT_MAX_MESSAGES;

/**
* 设置非调度推送订阅在内部(挂起)消息队列中所能容纳的最大字节数。
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,7 @@
import lombok.Setter;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.validation.annotation.Validated;

import javax.validation.constraints.NotBlank;
import java.time.Duration;

/**
Expand All @@ -37,17 +35,15 @@
*/
@Getter
@Setter
@Validated
@RefreshScope
@ConfigurationProperties(NatsProperties.PREFIX)
public class NatsProperties {
public static final String PREFIX = "nats";

/**
* nats服务器的URL,可以是 , 逗号分隔的列表。
* nats服务器的URL,可以是 , 逗号分隔的列表。默认:nats://localhost:4222
*/
@NotBlank
private String server;
private String server = Options.DEFAULT_URL;

/**
* 连接名称,显示在线程名称中。
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,11 @@
import io.nats.client.support.JsonUtils;
import lombok.extern.slf4j.Slf4j;
import net.dreamlu.mica.nats.core.NatsStreamListenerDetector;
import net.dreamlu.mica.nats.utils.StreamConfigurationUtil;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.boot.autoconfigure.AutoConfiguration;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;

import java.io.IOException;
Expand All @@ -36,57 +38,35 @@
*/
@Slf4j
@AutoConfiguration(after = NatsConfiguration.class)
@ConditionalOnProperty(
prefix = NatsStreamProperties.PREFIX,
name = "enable",
havingValue = "true"
)
@ConditionalOnClass(Options.class)
public class NatsStreamConfiguration {

@Bean
public JetStream natsJetStream(Connection natsConnection,
NatsStreamProperties properties,
ObjectProvider<NatsStreamCustomizer> natsStreamCustomizerObjectProvider)
throws IOException, JetStreamApiException {
StreamConfiguration.Builder streamConfigurationBuilder = StreamConfiguration.builder()
.name(properties.getName())
.description(properties.getDescription())
.subjects(properties.getSubjects())
.retentionPolicy(properties.getRetentionPolicy())
.maxConsumers(properties.getMaxConsumers())
.maxMessages(properties.getMaxMsgs())
.maxMessagesPerSubject(properties.getMaxMsgsPerSubject())
.maxBytes(properties.getMaxBytes())
.maxAge(properties.getMaxAge())
.maxMsgSize(properties.getMaxMsgSize())
.storageType(properties.getStorageType())
.replicas(properties.getReplicas())
.noAck(properties.isNoAck())
.templateOwner(properties.getTemplateOwner())
.discardPolicy(properties.getDiscardPolicy())
.discardNewPerSubject(properties.isDiscardNewPerSubject())
.duplicateWindow(properties.getDuplicateWindow())
.allowRollup(properties.isAllowRollup())
.allowDirect(properties.isAllowDirect())
.denyDelete(properties.isDenyDelete())
.denyPurge(properties.isDenyPurge())
.metadata(properties.getMetadata());
// 是否已封存
if (properties.isSealed()) {
streamConfigurationBuilder.seal();
}
// 用户自定义配置
natsStreamCustomizerObjectProvider.orderedStream().forEach(natsOptionsCustomizer -> natsOptionsCustomizer.customize(streamConfigurationBuilder));
// stream 配置
StreamConfiguration streamConfiguration = streamConfigurationBuilder.build();
// stream 流管理器
JetStreamManagement jsm = natsConnection.jetStreamManagement();
StreamInfo streamInfo = jsm.addStream(streamConfiguration);
// 打印 stream 信息
log.info(JsonUtils.getFormatted(streamInfo));
return natsConnection.jetStream();
}
@Bean
public JetStream natsJetStream(Connection natsConnection,
NatsStreamProperties properties,
ObjectProvider<NatsStreamCustomizer> natsStreamCustomizerObjectProvider)
throws IOException, JetStreamApiException {
// stream 配置
StreamConfiguration streamConfiguration = StreamConfigurationUtil.from(properties, natsStreamCustomizerObjectProvider);
// stream 流管理器
JetStreamManagement jsm = natsConnection.jetStreamManagement();
StreamInfo streamInfo = jsm.addStream(streamConfiguration);
// 打印 stream 信息
log.info(JsonUtils.getFormatted(streamInfo));
return natsConnection.jetStream();
}

@Bean
public NatsStreamListenerDetector natsStreamListenerDetector(Connection natsConnection,
JetStream natsJetStream) {
return new NatsStreamListenerDetector(natsConnection, natsJetStream);
}
@Bean
public NatsStreamListenerDetector natsStreamListenerDetector(NatsStreamProperties properties,
ObjectProvider<NatsStreamCustomizer> natsStreamCustomizerObjectProvider,
Connection natsConnection,
JetStream natsJetStream) {
return new NatsStreamListenerDetector(properties, natsStreamCustomizerObjectProvider, natsConnection, natsJetStream);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
@RefreshScope
@ConfigurationProperties(NatsStreamProperties.PREFIX)
public class NatsStreamProperties {
public static final String PREFIX = "nats.stream";
public static final String PREFIX = NatsProperties.PREFIX + ".stream";

/**
* 是否开启 nats JetStream,默认为:false
Expand All @@ -53,6 +53,14 @@ public class NatsStreamProperties {
* 描述
*/
private String description;
/**
* 消费者名称
*/
private String consumerName;
/**
* 消费者分组
*/
private String consumerGroup;
/**
* 默认订阅列表
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,11 @@
import lombok.RequiredArgsConstructor;
import net.dreamlu.mica.core.utils.Exceptions;
import net.dreamlu.mica.nats.annotation.NatsStreamListener;
import net.dreamlu.mica.nats.config.NatsStreamCustomizer;
import net.dreamlu.mica.nats.config.NatsStreamProperties;
import net.dreamlu.mica.nats.utils.StreamConfigurationUtil;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.core.annotation.AnnotationUtils;
import org.springframework.util.Assert;
Expand All @@ -37,61 +41,78 @@
*/
@RequiredArgsConstructor
public class NatsStreamListenerDetector implements BeanPostProcessor {
private final Connection natsConnection;
private final JetStream jetStream;
private final NatsStreamProperties properties;
private final ObjectProvider<NatsStreamCustomizer> natsStreamCustomizerObjectProvider;
private final Connection natsConnection;
private final JetStream jetStream;

@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
Class<?> userClass = ClassUtils.getUserClass(bean);
ReflectionUtils.doWithMethods(userClass, method -> {
NatsStreamListener listener = AnnotationUtils.findAnnotation(method, NatsStreamListener.class);
if (listener != null) {
String subject = listener.value();
Assert.hasText(subject, "@NatsStreamListener value(subject) must not be empty.");
// 消息处理器
MessageHandler messageHandler = new DefaultMessageHandler(bean, method);
try {
jetStreamSubscribe(listener, messageHandler);
} catch (JetStreamApiException | IOException e) {
throw Exceptions.unchecked(e);
}
}
}, ReflectionUtils.USER_DECLARED_METHODS);
return bean;
}
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
Class<?> userClass = ClassUtils.getUserClass(bean);
ReflectionUtils.doWithMethods(userClass, method -> {
NatsStreamListener listener = AnnotationUtils.findAnnotation(method, NatsStreamListener.class);
if (listener != null) {
String subject = listener.value();
Assert.hasText(subject, "@NatsStreamListener value(subject) must not be empty.");
// 消息处理器
MessageHandler messageHandler = new DefaultMessageHandler(bean, method);
try {
jetStreamSubscribe(listener, messageHandler);
} catch (JetStreamApiException | IOException e) {
throw Exceptions.unchecked(e);
}
}
}, ReflectionUtils.USER_DECLARED_METHODS);
return bean;
}

/**
* JetStream 订阅
*
* @param listener NatsStreamListener
* @param messageHandler MessageHandler
*/
private void jetStreamSubscribe(NatsStreamListener listener, MessageHandler messageHandler)
throws JetStreamApiException, IOException {
String subject = listener.value();
// 调度器
Dispatcher dispatcher = natsConnection.createDispatcher(messageHandler);
String deliverSubject = listener.deliverSubject();
String deliverGroup = listener.deliverGroup();
long pendingMessageLimit = listener.pendingMessageLimit();
long pendingByteLimit = listener.pendingByteLimit();
dispatcher.setPendingLimits(pendingMessageLimit, pendingByteLimit);
// 订阅策略
PushSubscribeOptions.Builder optionsBuilder = PushSubscribeOptions.builder()
.pendingMessageLimit(pendingMessageLimit)
.pendingByteLimit(pendingByteLimit)
.ordered(listener.ordered());
if (StringUtils.hasText(deliverSubject)) {
optionsBuilder.deliverSubject(deliverSubject);
}
if (StringUtils.hasText(deliverGroup)) {
optionsBuilder.deliverGroup(deliverGroup);
}
// stream 流名称
String listenerStream = listener.stream();
String streamName = properties.getName();
// 判断监听器上的 Stream Name
if (StringUtils.hasText(listenerStream) && !streamName.equals(listenerStream)) {
optionsBuilder.stream(listenerStream);
JetStreamManagement jsm = natsConnection.jetStreamManagement();
// 不是默认的流,则添加流
jsm.addStream(StreamConfigurationUtil.from(listenerStream, properties, natsStreamCustomizerObjectProvider));
}
// 队列
String queue = listener.queue();
// 是否自动 ack
boolean autoAck = listener.autoAck();
if (StringUtils.hasText(queue)) {
jetStream.subscribe(subject, queue, dispatcher, messageHandler, autoAck, optionsBuilder.build());
} else {
jetStream.subscribe(subject, dispatcher, messageHandler, autoAck, optionsBuilder.build());
}
}

/**
* JetStream 订阅
*
* @param listener NatsStreamListener
* @param messageHandler MessageHandler
*/
private void jetStreamSubscribe(NatsStreamListener listener, MessageHandler messageHandler)
throws JetStreamApiException, IOException {
String subject = listener.value();
// 调度器
Dispatcher dispatcher = natsConnection.createDispatcher(messageHandler);
// 订阅策略
PushSubscribeOptions.Builder optionsBuilder = PushSubscribeOptions.builder()
.deliverSubject(listener.deliverSubject())
.deliverGroup(listener.deliverGroup())
.pendingByteLimit(listener.pendingByteLimit())
.pendingMessageLimit(listener.pendingMessageLimit())
.ordered(listener.ordered());
// stream 流名称
String stream = listener.stream();
if (StringUtils.hasText(stream)) {
optionsBuilder.stream(stream);
}
// 队列
String queue = listener.queue();
// 是否自动 ack
boolean autoAck = listener.autoAck();
if (StringUtils.hasText(queue)) {
jetStream.subscribe(subject, queue, dispatcher, messageHandler, autoAck, optionsBuilder.build());
} else {
jetStream.subscribe(subject, dispatcher, messageHandler, autoAck, optionsBuilder.build());
}
}

}
Loading

0 comments on commit 170fdfa

Please sign in to comment.