From 7b301428d1a109c2226365c10de860939d24f438 Mon Sep 17 00:00:00 2001 From: Clement Escoffier Date: Wed, 3 Apr 2019 15:37:03 +0100 Subject: [PATCH 1/5] Update Reactive Messaging and Axle client versions Rework the emitter support --- bom/pom.xml | 4 +- build-parent/pom.xml | 2 +- .../deployment/EmitterBuildItem.java | 50 +++++++++++++++++++ .../SmallRyeReactiveMessagingProcessor.java | 23 +++++++-- .../SmallRyeReactiveMessagingTemplate.java | 4 +- 5 files changed, 76 insertions(+), 7 deletions(-) create mode 100644 extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/EmitterBuildItem.java diff --git a/bom/pom.xml b/bom/pom.xml index badd99eb1adf6..218ebe6be2785 100644 --- a/bom/pom.xml +++ b/bom/pom.xml @@ -53,7 +53,7 @@ 1.1.1 1.0.3 1.0.3 - 0.0.5 + 0.0.7 1.2.2 3.20.9 1.1.1 @@ -129,7 +129,7 @@ 1.0.2 1.10.7 3.3.2.Final - 0.0.3 + 0.0.4 1.1.0 1.1.0 0.9.2.Final diff --git a/build-parent/pom.xml b/build-parent/pom.xml index a408724ea022a..505d8349ed35d 100644 --- a/build-parent/pom.xml +++ b/build-parent/pom.xml @@ -41,7 +41,7 @@ what we work with by self downloading it: --> 1.0.0-rc14 3.3.0 - 0.0.2 + 0.0.4 3.6.3 diff --git a/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/EmitterBuildItem.java b/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/EmitterBuildItem.java new file mode 100644 index 0000000000000..3a9a264917d9b --- /dev/null +++ b/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/EmitterBuildItem.java @@ -0,0 +1,50 @@ +/* + * Copyright 2019 Red Hat, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.quarkus.smallrye.reactivemessaging.deployment; + +import org.jboss.builder.item.MultiBuildItem; +import org.jboss.jandex.FieldInfo; +import org.jboss.jandex.MethodInfo; + +import io.quarkus.arc.processor.BeanInfo; + +public final class EmitterBuildItem extends MultiBuildItem { + + private final BeanInfo bean; + + private final FieldInfo field; + + private final String name; + + public EmitterBuildItem(BeanInfo bean, FieldInfo field, String name) { + this.bean = bean; + this.field = field; + this.name = name; + } + + public BeanInfo getBean() { + return bean; + } + + public FieldInfo getField() { + return field; + } + + public String getName() { + return name; + } + +} diff --git a/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/SmallRyeReactiveMessagingProcessor.java b/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/SmallRyeReactiveMessagingProcessor.java index b72a2dc328ee1..536ea9a7b7824 100644 --- a/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/SmallRyeReactiveMessagingProcessor.java +++ b/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/SmallRyeReactiveMessagingProcessor.java @@ -22,11 +22,11 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; import org.eclipse.microprofile.reactive.messaging.Incoming; import org.eclipse.microprofile.reactive.messaging.Outgoing; -import org.jboss.jandex.DotName; -import org.jboss.jandex.MethodInfo; +import org.jboss.jandex.*; import org.jboss.logging.Logger; import io.quarkus.arc.deployment.AdditionalBeanBuildItem; @@ -44,6 +44,8 @@ import io.quarkus.deployment.builditem.substrate.ReflectiveClassBuildItem; import io.quarkus.smallrye.reactivemessaging.runtime.SmallRyeReactiveMessagingLifecycle; import io.quarkus.smallrye.reactivemessaging.runtime.SmallRyeReactiveMessagingTemplate; +import io.smallrye.reactive.messaging.annotations.Emitter; +import io.smallrye.reactive.messaging.annotations.Stream; /** * @@ -55,6 +57,7 @@ public class SmallRyeReactiveMessagingProcessor { static final DotName NAME_INCOMING = DotName.createSimple(Incoming.class.getName()); static final DotName NAME_OUTGOING = DotName.createSimple(Outgoing.class.getName()); + static final DotName NAME_STREAM = DotName.createSimple(Stream.class.getName()); @BuildStep AdditionalBeanBuildItem beans() { @@ -63,6 +66,7 @@ AdditionalBeanBuildItem beans() { @BuildStep BeanDeploymentValidatorBuildItem beanDeploymentValidator(BuildProducer mediatorMethods, + BuildProducer emitterFields, BuildProducer feature) { feature.produce(new FeatureBuildItem(FeatureBuildItem.SMALLRYE_REACTIVE_MESSAGING)); @@ -89,6 +93,17 @@ public void validate(ValidationContext validationContext) { LOGGER.debugf("Found mediator business method %s declared on %s", method, bean); } } + + for (FieldInfo field : bean.getTarget().get().asClass().fields()) { + if (annotationStore.hasAnnotation(field, NAME_STREAM)) { + if (field.type().name().equals( + DotName.createSimple(Emitter.class.getName()))) { + String name = annotationStore.getAnnotation(field, NAME_STREAM).value().asString(); + LOGGER.debugf("Emitter field '%s' detected, stream name: '%s'", field.name(), name); + emitterFields.produce(new EmitterBuildItem(bean, field, name)); + } + } + } } } } @@ -105,6 +120,7 @@ public List removalExclusions() { @Record(STATIC_INIT) public void build(SmallRyeReactiveMessagingTemplate template, BeanContainerBuildItem beanContainer, List mediatorMethods, + List emitterFields, BuildProducer reflectiveClass) { /* * IMPLEMENTATION NOTE/FUTURE IMPROVEMENTS: It would be possible to replace the reflection completely and use Jandex and @@ -124,7 +140,8 @@ public void build(SmallRyeReactiveMessagingTemplate template, BeanContainerBuild .getIdentifier()); } } - template.registerMediators(beanClassToBeanId, beanContainer.getValue()); + template.registerMediators(beanClassToBeanId, beanContainer.getValue(), + emitterFields.stream().map(EmitterBuildItem::getName).collect(Collectors.toList())); } } diff --git a/extensions/smallrye-reactive-messaging/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/runtime/SmallRyeReactiveMessagingTemplate.java b/extensions/smallrye-reactive-messaging/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/runtime/SmallRyeReactiveMessagingTemplate.java index b1c14a8e115b4..da07fb712945b 100644 --- a/extensions/smallrye-reactive-messaging/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/runtime/SmallRyeReactiveMessagingTemplate.java +++ b/extensions/smallrye-reactive-messaging/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/runtime/SmallRyeReactiveMessagingTemplate.java @@ -1,5 +1,6 @@ package io.quarkus.smallrye.reactivemessaging.runtime; +import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -15,9 +16,10 @@ @Template public class SmallRyeReactiveMessagingTemplate { - public void registerMediators(Map beanClassToBeanId, BeanContainer container) { + public void registerMediators(Map beanClassToBeanId, BeanContainer container, List emitters) { // Extract the configuration and register mediators MediatorManager mediatorManager = container.instance(MediatorManager.class); + mediatorManager.initializeEmitters(emitters); for (Entry entry : beanClassToBeanId.entrySet()) { try { Class beanClass = Thread.currentThread() From 355bf7e97445785780d3079a5e9f3856d6d41cc0 Mon Sep 17 00:00:00 2001 From: Clement Escoffier Date: Thu, 4 Apr 2019 07:59:11 +0100 Subject: [PATCH 2/5] Add an admonition to mention the new `blocking` attribute of `@ConsumeEvent` --- docs/src/main/asciidoc/async-message-passing.adoc | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/docs/src/main/asciidoc/async-message-passing.adoc b/docs/src/main/asciidoc/async-message-passing.adoc index 5f6d858ad5b17..85602cae0bd57 100644 --- a/docs/src/main/asciidoc/async-message-passing.adoc +++ b/docs/src/main/asciidoc/async-message-passing.adoc @@ -69,8 +69,19 @@ public class GreetingService { <1> If not set, the address is the fully qualified name of the bean, for instance, in this snippet it's `org.acme.vertx.GreetingService`. <2> The method parameter is the message body. If the method returns _something_ it's the message response. -// TODO Revisit this once the blocking PR is integrated - https://github.com/quarkusio/quarkus/pull/1646 -IMPORTANT: The code consuming the event must be _non-blocking_, as it's called on the Vert.x event loop. +[IMPORTANT] +==== +By default, the code consuming the event must be _non-blocking_, as it's called on the Vert.x event loop. +If your processing is blocking, use the `blocking` arttribute: + +[source, java] +---- +@ConsumeEvent(value = "blocking-consumer", blocking = true) +void consumeBlocking(String message) { + // Something blocking +} +---- +==== === Configuring the address From ef2a84afecb9d102fead871be7ff742e0351b298 Mon Sep 17 00:00:00 2001 From: Clement Escoffier Date: Thu, 4 Apr 2019 07:59:35 +0100 Subject: [PATCH 3/5] Use the new Axle method to retrieve a PublisherBuilder directly --- docs/src/main/asciidoc/using-vertx.adoc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/src/main/asciidoc/using-vertx.adoc b/docs/src/main/asciidoc/using-vertx.adoc index ccb68c83129bb..ddf13a049f878 100644 --- a/docs/src/main/asciidoc/using-vertx.adoc +++ b/docs/src/main/asciidoc/using-vertx.adoc @@ -217,7 +217,7 @@ The first approach can be implemented as follows: @Produces(MediaType.SERVER_SENT_EVENTS) @Path("{name}/streaming") public Publisher greeting(@PathParam("name") String name) { - return ReactiveStreams.fromPublisher(vertx.periodicStream(2000).toPublisher()) + return vertx.periodicStream(2000).toPublisherBuilder() .map(l -> String.format("Hello %s! (%s)%n", name, new Date())) .buildRs(); } From d79e5d236efd25acd38cfc82b1822d99f1e3308d Mon Sep 17 00:00:00 2001 From: Clement Escoffier Date: Thu, 4 Apr 2019 10:28:28 +0100 Subject: [PATCH 4/5] Remove useless field in the EmitterBuildItem Use constants for DOTNAME --- .../deployment/EmitterBuildItem.java | 19 ++++--------------- .../SmallRyeReactiveMessagingProcessor.java | 15 +++++++-------- 2 files changed, 11 insertions(+), 23 deletions(-) diff --git a/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/EmitterBuildItem.java b/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/EmitterBuildItem.java index 3a9a264917d9b..15ab4d8a439b6 100644 --- a/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/EmitterBuildItem.java +++ b/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/EmitterBuildItem.java @@ -23,26 +23,15 @@ public final class EmitterBuildItem extends MultiBuildItem { - private final BeanInfo bean; - - private final FieldInfo field; - + /** + * The name of the stream the emitter is connected to. + */ private final String name; - public EmitterBuildItem(BeanInfo bean, FieldInfo field, String name) { - this.bean = bean; - this.field = field; + public EmitterBuildItem(String name) { this.name = name; } - public BeanInfo getBean() { - return bean; - } - - public FieldInfo getField() { - return field; - } - public String getName() { return name; } diff --git a/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/SmallRyeReactiveMessagingProcessor.java b/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/SmallRyeReactiveMessagingProcessor.java index 536ea9a7b7824..1e247e3abdd77 100644 --- a/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/SmallRyeReactiveMessagingProcessor.java +++ b/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/SmallRyeReactiveMessagingProcessor.java @@ -58,6 +58,7 @@ public class SmallRyeReactiveMessagingProcessor { static final DotName NAME_INCOMING = DotName.createSimple(Incoming.class.getName()); static final DotName NAME_OUTGOING = DotName.createSimple(Outgoing.class.getName()); static final DotName NAME_STREAM = DotName.createSimple(Stream.class.getName()); + static final DotName NAME_EMITTER = DotName.createSimple(Emitter.class.getName()); @BuildStep AdditionalBeanBuildItem beans() { @@ -82,10 +83,9 @@ public void validate(ValidationContext validationContext) { for (BeanInfo bean : validationContext.get(Key.BEANS)) { if (bean.isClassBean()) { // TODO: add support for inherited business methods - for (MethodInfo method : bean.getTarget() - .get() - .asClass() - .methods()) { + ClassInfo ci = bean.getTarget() + .orElseThrow(() -> new IllegalStateException("Target expected")).asClass(); + for (MethodInfo method : ci.methods()) { if (annotationStore.hasAnnotation(method, NAME_INCOMING) || annotationStore.hasAnnotation(method, NAME_OUTGOING)) { // TODO: validate method params and return type? @@ -94,13 +94,12 @@ public void validate(ValidationContext validationContext) { } } - for (FieldInfo field : bean.getTarget().get().asClass().fields()) { + for (FieldInfo field : ci.fields()) { if (annotationStore.hasAnnotation(field, NAME_STREAM)) { - if (field.type().name().equals( - DotName.createSimple(Emitter.class.getName()))) { + if (field.type().name().equals(NAME_EMITTER)) { String name = annotationStore.getAnnotation(field, NAME_STREAM).value().asString(); LOGGER.debugf("Emitter field '%s' detected, stream name: '%s'", field.name(), name); - emitterFields.produce(new EmitterBuildItem(bean, field, name)); + emitterFields.produce(new EmitterBuildItem(name)); } } } From bc21c157b8fadb19541e0e5beba7fbd6f054ae91 Mon Sep 17 00:00:00 2001 From: Martin Kouba Date: Thu, 4 Apr 2019 13:15:22 +0200 Subject: [PATCH 5/5] Emitter support - support all types of injection --- .../SmallRyeReactiveMessagingProcessor.java | 31 ++++++++++--------- .../reactivemessaging/SimpleBean.java | 1 - .../arc/processor/InjectionPointInfo.java | 10 ++++++ 3 files changed, 27 insertions(+), 15 deletions(-) diff --git a/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/SmallRyeReactiveMessagingProcessor.java b/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/SmallRyeReactiveMessagingProcessor.java index 1e247e3abdd77..329d61c2f51d6 100644 --- a/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/SmallRyeReactiveMessagingProcessor.java +++ b/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/SmallRyeReactiveMessagingProcessor.java @@ -17,7 +17,6 @@ import static io.quarkus.deployment.annotations.ExecutionTime.STATIC_INIT; -import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; import java.util.List; @@ -26,7 +25,9 @@ import org.eclipse.microprofile.reactive.messaging.Incoming; import org.eclipse.microprofile.reactive.messaging.Outgoing; -import org.jboss.jandex.*; +import org.jboss.jandex.AnnotationInstance; +import org.jboss.jandex.DotName; +import org.jboss.jandex.MethodInfo; import org.jboss.logging.Logger; import io.quarkus.arc.deployment.AdditionalBeanBuildItem; @@ -37,6 +38,7 @@ import io.quarkus.arc.processor.AnnotationStore; import io.quarkus.arc.processor.BeanDeploymentValidator; import io.quarkus.arc.processor.BeanInfo; +import io.quarkus.arc.processor.InjectionPointInfo; import io.quarkus.deployment.annotations.BuildProducer; import io.quarkus.deployment.annotations.BuildStep; import io.quarkus.deployment.annotations.Record; @@ -67,7 +69,7 @@ AdditionalBeanBuildItem beans() { @BuildStep BeanDeploymentValidatorBuildItem beanDeploymentValidator(BuildProducer mediatorMethods, - BuildProducer emitterFields, + BuildProducer emitters, BuildProducer feature) { feature.produce(new FeatureBuildItem(FeatureBuildItem.SMALLRYE_REACTIVE_MESSAGING)); @@ -83,9 +85,7 @@ public void validate(ValidationContext validationContext) { for (BeanInfo bean : validationContext.get(Key.BEANS)) { if (bean.isClassBean()) { // TODO: add support for inherited business methods - ClassInfo ci = bean.getTarget() - .orElseThrow(() -> new IllegalStateException("Target expected")).asClass(); - for (MethodInfo method : ci.methods()) { + for (MethodInfo method : bean.getTarget().get().asClass().methods()) { if (annotationStore.hasAnnotation(method, NAME_INCOMING) || annotationStore.hasAnnotation(method, NAME_OUTGOING)) { // TODO: validate method params and return type? @@ -93,15 +93,18 @@ public void validate(ValidationContext validationContext) { LOGGER.debugf("Found mediator business method %s declared on %s", method, bean); } } + } + } - for (FieldInfo field : ci.fields()) { - if (annotationStore.hasAnnotation(field, NAME_STREAM)) { - if (field.type().name().equals(NAME_EMITTER)) { - String name = annotationStore.getAnnotation(field, NAME_STREAM).value().asString(); - LOGGER.debugf("Emitter field '%s' detected, stream name: '%s'", field.name(), name); - emitterFields.produce(new EmitterBuildItem(name)); - } - } + for (InjectionPointInfo injectionPoint : validationContext.get(Key.INJECTION_POINTS)) { + if (injectionPoint.getRequiredType().name().equals(NAME_EMITTER)) { + AnnotationInstance stream = injectionPoint.getRequiredQualifier(NAME_STREAM); + if (stream != null) { + // Stream.value() is mandatory + String name = stream.value().asString(); + LOGGER.debugf("Emitter injection point '%s' detected, stream name: '%s'", + injectionPoint.getTargetInfo(), name); + emitters.produce(new EmitterBuildItem(name)); } } } diff --git a/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/SimpleBean.java b/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/SimpleBean.java index 05cd2f7635540..451aea7209709 100644 --- a/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/SimpleBean.java +++ b/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/SimpleBean.java @@ -3,7 +3,6 @@ import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; -import javax.annotation.PostConstruct; import javax.enterprise.context.ApplicationScoped; import org.eclipse.microprofile.reactive.messaging.Incoming; diff --git a/independent-projects/arc/processor/src/main/java/io/quarkus/arc/processor/InjectionPointInfo.java b/independent-projects/arc/processor/src/main/java/io/quarkus/arc/processor/InjectionPointInfo.java index f8c0849959c11..7abdd8d98fb6a 100644 --- a/independent-projects/arc/processor/src/main/java/io/quarkus/arc/processor/InjectionPointInfo.java +++ b/independent-projects/arc/processor/src/main/java/io/quarkus/arc/processor/InjectionPointInfo.java @@ -27,6 +27,7 @@ import org.jboss.jandex.AnnotationInstance; import org.jboss.jandex.AnnotationTarget; import org.jboss.jandex.AnnotationTarget.Kind; +import org.jboss.jandex.DotName; import org.jboss.jandex.FieldInfo; import org.jboss.jandex.MethodInfo; import org.jboss.jandex.Type; @@ -138,6 +139,15 @@ public Set getRequiredQualifiers() { return typeAndQualifiers.qualifiers; } + public AnnotationInstance getRequiredQualifier(DotName name) { + for (AnnotationInstance qualifier : typeAndQualifiers.qualifiers) { + if (qualifier.name().equals(name)) { + return qualifier; + } + } + return null; + } + public boolean hasDefaultedQualifier() { return hasDefaultedQualifier; }