From 170fdfa0306d571dc6ff328a94a44f5c519c0109 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C7=B3=C3=83=C3=8E?= <1101766085@qq.com> Date: Tue, 22 Aug 2023 15:38:07 +0800 Subject: [PATCH] =?UTF-8?q?:sparkles:=20=E5=AE=8C=E5=96=84=20mica-nats=20?= =?UTF-8?q?=E6=A8=A1=E5=9D=97?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- mica-nats/build.gradle | 1 - .../nats/annotation/NatsStreamListener.java | 2 +- .../mica/nats/config/NatsProperties.java | 8 +- .../nats/config/NatsStreamConfiguration.java | 76 ++++------- .../nats/config/NatsStreamProperties.java | 10 +- .../nats/core/NatsStreamListenerDetector.java | 129 ++++++++++-------- .../nats/utils/StreamConfigurationUtil.java | 89 ++++++++++++ 7 files changed, 204 insertions(+), 111 deletions(-) create mode 100644 mica-nats/src/main/java/net/dreamlu/mica/nats/utils/StreamConfigurationUtil.java diff --git a/mica-nats/build.gradle b/mica-nats/build.gradle index e81318b76..4ff7e6345 100644 --- a/mica-nats/build.gradle +++ b/mica-nats/build.gradle @@ -1,7 +1,6 @@ dependencies { api project(":mica-core") api "io.nats:jnats:2.16.13" - implementation "jakarta.validation:jakarta.validation-api" implementation "org.springframework.boot:spring-boot-starter" compileOnly "org.springframework.cloud:spring-cloud-context" compileOnly "net.dreamlu:mica-auto:${micaAutoVersion}" diff --git a/mica-nats/src/main/java/net/dreamlu/mica/nats/annotation/NatsStreamListener.java b/mica-nats/src/main/java/net/dreamlu/mica/nats/annotation/NatsStreamListener.java index 7f42214da..7ff72d7ae 100644 --- a/mica-nats/src/main/java/net/dreamlu/mica/nats/annotation/NatsStreamListener.java +++ b/mica-nats/src/main/java/net/dreamlu/mica/nats/annotation/NatsStreamListener.java @@ -85,7 +85,7 @@ * * @return 最大消息数量 */ - long pendingMessageLimit() default Consumer.DEFAULT_MAX_BYTES; + long pendingMessageLimit() default Consumer.DEFAULT_MAX_MESSAGES; /** * 设置非调度推送订阅在内部(挂起)消息队列中所能容纳的最大字节数。 diff --git a/mica-nats/src/main/java/net/dreamlu/mica/nats/config/NatsProperties.java b/mica-nats/src/main/java/net/dreamlu/mica/nats/config/NatsProperties.java index b6757be81..ad5ac91fb 100644 --- a/mica-nats/src/main/java/net/dreamlu/mica/nats/config/NatsProperties.java +++ b/mica-nats/src/main/java/net/dreamlu/mica/nats/config/NatsProperties.java @@ -21,9 +21,7 @@ import lombok.Setter; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.cloud.context.config.annotation.RefreshScope; -import org.springframework.validation.annotation.Validated; -import javax.validation.constraints.NotBlank; import java.time.Duration; /** @@ -37,17 +35,15 @@ */ @Getter @Setter -@Validated @RefreshScope @ConfigurationProperties(NatsProperties.PREFIX) public class NatsProperties { public static final String PREFIX = "nats"; /** - * nats服务器的URL,可以是 , 逗号分隔的列表。 + * nats服务器的URL,可以是 , 逗号分隔的列表。默认:nats://localhost:4222 */ - @NotBlank - private String server; + private String server = Options.DEFAULT_URL; /** * 连接名称,显示在线程名称中。 diff --git a/mica-nats/src/main/java/net/dreamlu/mica/nats/config/NatsStreamConfiguration.java b/mica-nats/src/main/java/net/dreamlu/mica/nats/config/NatsStreamConfiguration.java index dd36134fc..eeedc6396 100644 --- a/mica-nats/src/main/java/net/dreamlu/mica/nats/config/NatsStreamConfiguration.java +++ b/mica-nats/src/main/java/net/dreamlu/mica/nats/config/NatsStreamConfiguration.java @@ -22,9 +22,11 @@ import io.nats.client.support.JsonUtils; import lombok.extern.slf4j.Slf4j; import net.dreamlu.mica.nats.core.NatsStreamListenerDetector; +import net.dreamlu.mica.nats.utils.StreamConfigurationUtil; import org.springframework.beans.factory.ObjectProvider; import org.springframework.boot.autoconfigure.AutoConfiguration; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.context.annotation.Bean; import java.io.IOException; @@ -36,57 +38,35 @@ */ @Slf4j @AutoConfiguration(after = NatsConfiguration.class) +@ConditionalOnProperty( + prefix = NatsStreamProperties.PREFIX, + name = "enable", + havingValue = "true" +) @ConditionalOnClass(Options.class) public class NatsStreamConfiguration { - @Bean - public JetStream natsJetStream(Connection natsConnection, - NatsStreamProperties properties, - ObjectProvider natsStreamCustomizerObjectProvider) - throws IOException, JetStreamApiException { - StreamConfiguration.Builder streamConfigurationBuilder = StreamConfiguration.builder() - .name(properties.getName()) - .description(properties.getDescription()) - .subjects(properties.getSubjects()) - .retentionPolicy(properties.getRetentionPolicy()) - .maxConsumers(properties.getMaxConsumers()) - .maxMessages(properties.getMaxMsgs()) - .maxMessagesPerSubject(properties.getMaxMsgsPerSubject()) - .maxBytes(properties.getMaxBytes()) - .maxAge(properties.getMaxAge()) - .maxMsgSize(properties.getMaxMsgSize()) - .storageType(properties.getStorageType()) - .replicas(properties.getReplicas()) - .noAck(properties.isNoAck()) - .templateOwner(properties.getTemplateOwner()) - .discardPolicy(properties.getDiscardPolicy()) - .discardNewPerSubject(properties.isDiscardNewPerSubject()) - .duplicateWindow(properties.getDuplicateWindow()) - .allowRollup(properties.isAllowRollup()) - .allowDirect(properties.isAllowDirect()) - .denyDelete(properties.isDenyDelete()) - .denyPurge(properties.isDenyPurge()) - .metadata(properties.getMetadata()); - // 是否已封存 - if (properties.isSealed()) { - streamConfigurationBuilder.seal(); - } - // 用户自定义配置 - natsStreamCustomizerObjectProvider.orderedStream().forEach(natsOptionsCustomizer -> natsOptionsCustomizer.customize(streamConfigurationBuilder)); - // stream 配置 - StreamConfiguration streamConfiguration = streamConfigurationBuilder.build(); - // stream 流管理器 - JetStreamManagement jsm = natsConnection.jetStreamManagement(); - StreamInfo streamInfo = jsm.addStream(streamConfiguration); - // 打印 stream 信息 - log.info(JsonUtils.getFormatted(streamInfo)); - return natsConnection.jetStream(); - } + @Bean + public JetStream natsJetStream(Connection natsConnection, + NatsStreamProperties properties, + ObjectProvider natsStreamCustomizerObjectProvider) + throws IOException, JetStreamApiException { + // stream 配置 + StreamConfiguration streamConfiguration = StreamConfigurationUtil.from(properties, natsStreamCustomizerObjectProvider); + // stream 流管理器 + JetStreamManagement jsm = natsConnection.jetStreamManagement(); + StreamInfo streamInfo = jsm.addStream(streamConfiguration); + // 打印 stream 信息 + log.info(JsonUtils.getFormatted(streamInfo)); + return natsConnection.jetStream(); + } - @Bean - public NatsStreamListenerDetector natsStreamListenerDetector(Connection natsConnection, - JetStream natsJetStream) { - return new NatsStreamListenerDetector(natsConnection, natsJetStream); - } + @Bean + public NatsStreamListenerDetector natsStreamListenerDetector(NatsStreamProperties properties, + ObjectProvider natsStreamCustomizerObjectProvider, + Connection natsConnection, + JetStream natsJetStream) { + return new NatsStreamListenerDetector(properties, natsStreamCustomizerObjectProvider, natsConnection, natsJetStream); + } } diff --git a/mica-nats/src/main/java/net/dreamlu/mica/nats/config/NatsStreamProperties.java b/mica-nats/src/main/java/net/dreamlu/mica/nats/config/NatsStreamProperties.java index 007d57209..ae90b88b2 100644 --- a/mica-nats/src/main/java/net/dreamlu/mica/nats/config/NatsStreamProperties.java +++ b/mica-nats/src/main/java/net/dreamlu/mica/nats/config/NatsStreamProperties.java @@ -39,7 +39,7 @@ @RefreshScope @ConfigurationProperties(NatsStreamProperties.PREFIX) public class NatsStreamProperties { - public static final String PREFIX = "nats.stream"; + public static final String PREFIX = NatsProperties.PREFIX + ".stream"; /** * 是否开启 nats JetStream,默认为:false @@ -53,6 +53,14 @@ public class NatsStreamProperties { * 描述 */ private String description; + /** + * 消费者名称 + */ + private String consumerName; + /** + * 消费者分组 + */ + private String consumerGroup; /** * 默认订阅列表 */ diff --git a/mica-nats/src/main/java/net/dreamlu/mica/nats/core/NatsStreamListenerDetector.java b/mica-nats/src/main/java/net/dreamlu/mica/nats/core/NatsStreamListenerDetector.java index 562e709b7..a221810d0 100644 --- a/mica-nats/src/main/java/net/dreamlu/mica/nats/core/NatsStreamListenerDetector.java +++ b/mica-nats/src/main/java/net/dreamlu/mica/nats/core/NatsStreamListenerDetector.java @@ -20,7 +20,11 @@ import lombok.RequiredArgsConstructor; import net.dreamlu.mica.core.utils.Exceptions; import net.dreamlu.mica.nats.annotation.NatsStreamListener; +import net.dreamlu.mica.nats.config.NatsStreamCustomizer; +import net.dreamlu.mica.nats.config.NatsStreamProperties; +import net.dreamlu.mica.nats.utils.StreamConfigurationUtil; import org.springframework.beans.BeansException; +import org.springframework.beans.factory.ObjectProvider; import org.springframework.beans.factory.config.BeanPostProcessor; import org.springframework.core.annotation.AnnotationUtils; import org.springframework.util.Assert; @@ -37,61 +41,78 @@ */ @RequiredArgsConstructor public class NatsStreamListenerDetector implements BeanPostProcessor { - private final Connection natsConnection; - private final JetStream jetStream; + private final NatsStreamProperties properties; + private final ObjectProvider natsStreamCustomizerObjectProvider; + private final Connection natsConnection; + private final JetStream jetStream; - @Override - public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException { - Class userClass = ClassUtils.getUserClass(bean); - ReflectionUtils.doWithMethods(userClass, method -> { - NatsStreamListener listener = AnnotationUtils.findAnnotation(method, NatsStreamListener.class); - if (listener != null) { - String subject = listener.value(); - Assert.hasText(subject, "@NatsStreamListener value(subject) must not be empty."); - // 消息处理器 - MessageHandler messageHandler = new DefaultMessageHandler(bean, method); - try { - jetStreamSubscribe(listener, messageHandler); - } catch (JetStreamApiException | IOException e) { - throw Exceptions.unchecked(e); - } - } - }, ReflectionUtils.USER_DECLARED_METHODS); - return bean; - } + @Override + public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException { + Class userClass = ClassUtils.getUserClass(bean); + ReflectionUtils.doWithMethods(userClass, method -> { + NatsStreamListener listener = AnnotationUtils.findAnnotation(method, NatsStreamListener.class); + if (listener != null) { + String subject = listener.value(); + Assert.hasText(subject, "@NatsStreamListener value(subject) must not be empty."); + // 消息处理器 + MessageHandler messageHandler = new DefaultMessageHandler(bean, method); + try { + jetStreamSubscribe(listener, messageHandler); + } catch (JetStreamApiException | IOException e) { + throw Exceptions.unchecked(e); + } + } + }, ReflectionUtils.USER_DECLARED_METHODS); + return bean; + } + + /** + * JetStream 订阅 + * + * @param listener NatsStreamListener + * @param messageHandler MessageHandler + */ + private void jetStreamSubscribe(NatsStreamListener listener, MessageHandler messageHandler) + throws JetStreamApiException, IOException { + String subject = listener.value(); + // 调度器 + Dispatcher dispatcher = natsConnection.createDispatcher(messageHandler); + String deliverSubject = listener.deliverSubject(); + String deliverGroup = listener.deliverGroup(); + long pendingMessageLimit = listener.pendingMessageLimit(); + long pendingByteLimit = listener.pendingByteLimit(); + dispatcher.setPendingLimits(pendingMessageLimit, pendingByteLimit); + // 订阅策略 + PushSubscribeOptions.Builder optionsBuilder = PushSubscribeOptions.builder() + .pendingMessageLimit(pendingMessageLimit) + .pendingByteLimit(pendingByteLimit) + .ordered(listener.ordered()); + if (StringUtils.hasText(deliverSubject)) { + optionsBuilder.deliverSubject(deliverSubject); + } + if (StringUtils.hasText(deliverGroup)) { + optionsBuilder.deliverGroup(deliverGroup); + } + // stream 流名称 + String listenerStream = listener.stream(); + String streamName = properties.getName(); + // 判断监听器上的 Stream Name + if (StringUtils.hasText(listenerStream) && !streamName.equals(listenerStream)) { + optionsBuilder.stream(listenerStream); + JetStreamManagement jsm = natsConnection.jetStreamManagement(); + // 不是默认的流,则添加流 + jsm.addStream(StreamConfigurationUtil.from(listenerStream, properties, natsStreamCustomizerObjectProvider)); + } + // 队列 + String queue = listener.queue(); + // 是否自动 ack + boolean autoAck = listener.autoAck(); + if (StringUtils.hasText(queue)) { + jetStream.subscribe(subject, queue, dispatcher, messageHandler, autoAck, optionsBuilder.build()); + } else { + jetStream.subscribe(subject, dispatcher, messageHandler, autoAck, optionsBuilder.build()); + } + } - /** - * JetStream 订阅 - * - * @param listener NatsStreamListener - * @param messageHandler MessageHandler - */ - private void jetStreamSubscribe(NatsStreamListener listener, MessageHandler messageHandler) - throws JetStreamApiException, IOException { - String subject = listener.value(); - // 调度器 - Dispatcher dispatcher = natsConnection.createDispatcher(messageHandler); - // 订阅策略 - PushSubscribeOptions.Builder optionsBuilder = PushSubscribeOptions.builder() - .deliverSubject(listener.deliverSubject()) - .deliverGroup(listener.deliverGroup()) - .pendingByteLimit(listener.pendingByteLimit()) - .pendingMessageLimit(listener.pendingMessageLimit()) - .ordered(listener.ordered()); - // stream 流名称 - String stream = listener.stream(); - if (StringUtils.hasText(stream)) { - optionsBuilder.stream(stream); - } - // 队列 - String queue = listener.queue(); - // 是否自动 ack - boolean autoAck = listener.autoAck(); - if (StringUtils.hasText(queue)) { - jetStream.subscribe(subject, queue, dispatcher, messageHandler, autoAck, optionsBuilder.build()); - } else { - jetStream.subscribe(subject, dispatcher, messageHandler, autoAck, optionsBuilder.build()); - } - } } diff --git a/mica-nats/src/main/java/net/dreamlu/mica/nats/utils/StreamConfigurationUtil.java b/mica-nats/src/main/java/net/dreamlu/mica/nats/utils/StreamConfigurationUtil.java new file mode 100644 index 000000000..a4b3fe8d0 --- /dev/null +++ b/mica-nats/src/main/java/net/dreamlu/mica/nats/utils/StreamConfigurationUtil.java @@ -0,0 +1,89 @@ +/* + * Copyright (c) 2019-2029, Dreamlu 卢春梦 (596392912@qq.com & www.dreamlu.net). + *

+ * Licensed under the GNU LESSER GENERAL PUBLIC LICENSE 3.0; + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.gnu.org/licenses/lgpl.html + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package net.dreamlu.mica.nats.utils; + +import io.nats.client.api.StreamConfiguration; +import lombok.experimental.UtilityClass; +import net.dreamlu.mica.nats.config.NatsStreamCustomizer; +import net.dreamlu.mica.nats.config.NatsStreamProperties; +import org.springframework.beans.factory.ObjectProvider; +import org.springframework.util.StringUtils; + +/** + * nats Stream 配置 工具 + * + * @author L.cm + */ +@UtilityClass +public class StreamConfigurationUtil { + + /** + * 构造 StreamConfiguration + * + * @param properties NatsStreamProperties + * @param natsStreamCustomizerObjectProvider ObjectProvider + * @return StreamConfiguration + */ + public static StreamConfiguration from(NatsStreamProperties properties, + ObjectProvider natsStreamCustomizerObjectProvider) { + return from(null, properties, natsStreamCustomizerObjectProvider); + } + + /** + * 构造 StreamConfiguration + * + * @param streamName stream name + * @param properties NatsStreamProperties + * @param natsStreamCustomizerObjectProvider ObjectProvider + * @return StreamConfiguration + */ + public static StreamConfiguration from(String streamName, + NatsStreamProperties properties, + ObjectProvider natsStreamCustomizerObjectProvider) { + StreamConfiguration.Builder builder = StreamConfiguration.builder() + .name(StringUtils.hasText(streamName) ? streamName : properties.getName()) + .description(properties.getDescription()) + .subjects(properties.getSubjects()) + .retentionPolicy(properties.getRetentionPolicy()) + .maxConsumers(properties.getMaxConsumers()) + .maxMessages(properties.getMaxMsgs()) + .maxMessagesPerSubject(properties.getMaxMsgsPerSubject()) + .maxBytes(properties.getMaxBytes()) + .maxAge(properties.getMaxAge()) + .maxMsgSize(properties.getMaxMsgSize()) + .storageType(properties.getStorageType()) + .replicas(properties.getReplicas()) + .noAck(properties.isNoAck()) + .templateOwner(properties.getTemplateOwner()) + .discardPolicy(properties.getDiscardPolicy()) + .discardNewPerSubject(properties.isDiscardNewPerSubject()) + .duplicateWindow(properties.getDuplicateWindow()) + .allowRollup(properties.isAllowRollup()) + .allowDirect(properties.isAllowDirect()) + .denyDelete(properties.isDenyDelete()) + .denyPurge(properties.isDenyPurge()) + .metadata(properties.getMetadata()); + // 是否已封存 + if (properties.isSealed()) { + builder.seal(); + } + // 用户自定义配置 + natsStreamCustomizerObjectProvider.orderedStream().forEach(natsOptionsCustomizer -> natsOptionsCustomizer.customize(builder)); + return builder.build(); + } + +}