diff --git a/bom/pom.xml b/bom/pom.xml
index 529fa71fa9b07..0039a4b03f18e 100644
--- a/bom/pom.xml
+++ b/bom/pom.xml
@@ -53,7 +53,7 @@
1.1.1
1.0.3
1.0.3
- 0.0.5
+ 0.0.7
1.2.2
3.20.9
1.1.1
@@ -129,7 +129,7 @@
1.0.2
1.10.7
3.3.2.Final
- 0.0.3
+ 0.0.4
1.1.0
1.1.0
0.9.2.Final
diff --git a/build-parent/pom.xml b/build-parent/pom.xml
index 6560f2221e851..2eb730c16885a 100644
--- a/build-parent/pom.xml
+++ b/build-parent/pom.xml
@@ -41,7 +41,7 @@
what we work with by self downloading it: -->
1.0.0-rc14
3.3.0
- 0.0.2
+ 0.0.4
3.6.3
diff --git a/docs/src/main/asciidoc/async-message-passing.adoc b/docs/src/main/asciidoc/async-message-passing.adoc
index 5f6d858ad5b17..85602cae0bd57 100644
--- a/docs/src/main/asciidoc/async-message-passing.adoc
+++ b/docs/src/main/asciidoc/async-message-passing.adoc
@@ -69,8 +69,19 @@ public class GreetingService {
<1> If not set, the address is the fully qualified name of the bean, for instance, in this snippet it's `org.acme.vertx.GreetingService`.
<2> The method parameter is the message body. If the method returns _something_ it's the message response.
-// TODO Revisit this once the blocking PR is integrated - https://github.com/quarkusio/quarkus/pull/1646
-IMPORTANT: The code consuming the event must be _non-blocking_, as it's called on the Vert.x event loop.
+[IMPORTANT]
+====
+By default, the code consuming the event must be _non-blocking_, as it's called on the Vert.x event loop.
+If your processing is blocking, use the `blocking` arttribute:
+
+[source, java]
+----
+@ConsumeEvent(value = "blocking-consumer", blocking = true)
+void consumeBlocking(String message) {
+ // Something blocking
+}
+----
+====
=== Configuring the address
diff --git a/docs/src/main/asciidoc/using-vertx.adoc b/docs/src/main/asciidoc/using-vertx.adoc
index 55983e0ec4e41..9068259aee00c 100644
--- a/docs/src/main/asciidoc/using-vertx.adoc
+++ b/docs/src/main/asciidoc/using-vertx.adoc
@@ -217,7 +217,7 @@ The first approach can be implemented as follows:
@Produces(MediaType.SERVER_SENT_EVENTS)
@Path("{name}/streaming")
public Publisher greeting(@PathParam("name") String name) {
- return ReactiveStreams.fromPublisher(vertx.periodicStream(2000).toPublisher())
+ return vertx.periodicStream(2000).toPublisherBuilder()
.map(l -> String.format("Hello %s! (%s)%n", name, new Date()))
.buildRs();
}
diff --git a/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/EmitterBuildItem.java b/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/EmitterBuildItem.java
new file mode 100644
index 0000000000000..15ab4d8a439b6
--- /dev/null
+++ b/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/EmitterBuildItem.java
@@ -0,0 +1,39 @@
+/*
+ * Copyright 2019 Red Hat, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.quarkus.smallrye.reactivemessaging.deployment;
+
+import org.jboss.builder.item.MultiBuildItem;
+import org.jboss.jandex.FieldInfo;
+import org.jboss.jandex.MethodInfo;
+
+import io.quarkus.arc.processor.BeanInfo;
+
+public final class EmitterBuildItem extends MultiBuildItem {
+
+ /**
+ * The name of the stream the emitter is connected to.
+ */
+ private final String name;
+
+ public EmitterBuildItem(String name) {
+ this.name = name;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+}
diff --git a/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/SmallRyeReactiveMessagingProcessor.java b/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/SmallRyeReactiveMessagingProcessor.java
index b72a2dc328ee1..329d61c2f51d6 100644
--- a/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/SmallRyeReactiveMessagingProcessor.java
+++ b/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/SmallRyeReactiveMessagingProcessor.java
@@ -17,14 +17,15 @@
import static io.quarkus.deployment.annotations.ExecutionTime.STATIC_INIT;
-import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.stream.Collectors;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
+import org.jboss.jandex.AnnotationInstance;
import org.jboss.jandex.DotName;
import org.jboss.jandex.MethodInfo;
import org.jboss.logging.Logger;
@@ -37,6 +38,7 @@
import io.quarkus.arc.processor.AnnotationStore;
import io.quarkus.arc.processor.BeanDeploymentValidator;
import io.quarkus.arc.processor.BeanInfo;
+import io.quarkus.arc.processor.InjectionPointInfo;
import io.quarkus.deployment.annotations.BuildProducer;
import io.quarkus.deployment.annotations.BuildStep;
import io.quarkus.deployment.annotations.Record;
@@ -44,6 +46,8 @@
import io.quarkus.deployment.builditem.substrate.ReflectiveClassBuildItem;
import io.quarkus.smallrye.reactivemessaging.runtime.SmallRyeReactiveMessagingLifecycle;
import io.quarkus.smallrye.reactivemessaging.runtime.SmallRyeReactiveMessagingTemplate;
+import io.smallrye.reactive.messaging.annotations.Emitter;
+import io.smallrye.reactive.messaging.annotations.Stream;
/**
*
@@ -55,6 +59,8 @@ public class SmallRyeReactiveMessagingProcessor {
static final DotName NAME_INCOMING = DotName.createSimple(Incoming.class.getName());
static final DotName NAME_OUTGOING = DotName.createSimple(Outgoing.class.getName());
+ static final DotName NAME_STREAM = DotName.createSimple(Stream.class.getName());
+ static final DotName NAME_EMITTER = DotName.createSimple(Emitter.class.getName());
@BuildStep
AdditionalBeanBuildItem beans() {
@@ -63,6 +69,7 @@ AdditionalBeanBuildItem beans() {
@BuildStep
BeanDeploymentValidatorBuildItem beanDeploymentValidator(BuildProducer mediatorMethods,
+ BuildProducer emitters,
BuildProducer feature) {
feature.produce(new FeatureBuildItem(FeatureBuildItem.SMALLRYE_REACTIVE_MESSAGING));
@@ -78,10 +85,7 @@ public void validate(ValidationContext validationContext) {
for (BeanInfo bean : validationContext.get(Key.BEANS)) {
if (bean.isClassBean()) {
// TODO: add support for inherited business methods
- for (MethodInfo method : bean.getTarget()
- .get()
- .asClass()
- .methods()) {
+ for (MethodInfo method : bean.getTarget().get().asClass().methods()) {
if (annotationStore.hasAnnotation(method, NAME_INCOMING)
|| annotationStore.hasAnnotation(method, NAME_OUTGOING)) {
// TODO: validate method params and return type?
@@ -91,6 +95,19 @@ public void validate(ValidationContext validationContext) {
}
}
}
+
+ for (InjectionPointInfo injectionPoint : validationContext.get(Key.INJECTION_POINTS)) {
+ if (injectionPoint.getRequiredType().name().equals(NAME_EMITTER)) {
+ AnnotationInstance stream = injectionPoint.getRequiredQualifier(NAME_STREAM);
+ if (stream != null) {
+ // Stream.value() is mandatory
+ String name = stream.value().asString();
+ LOGGER.debugf("Emitter injection point '%s' detected, stream name: '%s'",
+ injectionPoint.getTargetInfo(), name);
+ emitters.produce(new EmitterBuildItem(name));
+ }
+ }
+ }
}
});
}
@@ -105,6 +122,7 @@ public List removalExclusions() {
@Record(STATIC_INIT)
public void build(SmallRyeReactiveMessagingTemplate template, BeanContainerBuildItem beanContainer,
List mediatorMethods,
+ List emitterFields,
BuildProducer reflectiveClass) {
/*
* IMPLEMENTATION NOTE/FUTURE IMPROVEMENTS: It would be possible to replace the reflection completely and use Jandex and
@@ -124,7 +142,8 @@ public void build(SmallRyeReactiveMessagingTemplate template, BeanContainerBuild
.getIdentifier());
}
}
- template.registerMediators(beanClassToBeanId, beanContainer.getValue());
+ template.registerMediators(beanClassToBeanId, beanContainer.getValue(),
+ emitterFields.stream().map(EmitterBuildItem::getName).collect(Collectors.toList()));
}
}
diff --git a/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/SimpleBean.java b/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/SimpleBean.java
index 05cd2f7635540..451aea7209709 100644
--- a/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/SimpleBean.java
+++ b/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/SimpleBean.java
@@ -3,7 +3,6 @@
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
-import javax.annotation.PostConstruct;
import javax.enterprise.context.ApplicationScoped;
import org.eclipse.microprofile.reactive.messaging.Incoming;
diff --git a/extensions/smallrye-reactive-messaging/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/runtime/SmallRyeReactiveMessagingTemplate.java b/extensions/smallrye-reactive-messaging/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/runtime/SmallRyeReactiveMessagingTemplate.java
index b1c14a8e115b4..da07fb712945b 100644
--- a/extensions/smallrye-reactive-messaging/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/runtime/SmallRyeReactiveMessagingTemplate.java
+++ b/extensions/smallrye-reactive-messaging/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/runtime/SmallRyeReactiveMessagingTemplate.java
@@ -1,5 +1,6 @@
package io.quarkus.smallrye.reactivemessaging.runtime;
+import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@@ -15,9 +16,10 @@
@Template
public class SmallRyeReactiveMessagingTemplate {
- public void registerMediators(Map beanClassToBeanId, BeanContainer container) {
+ public void registerMediators(Map beanClassToBeanId, BeanContainer container, List emitters) {
// Extract the configuration and register mediators
MediatorManager mediatorManager = container.instance(MediatorManager.class);
+ mediatorManager.initializeEmitters(emitters);
for (Entry entry : beanClassToBeanId.entrySet()) {
try {
Class> beanClass = Thread.currentThread()
diff --git a/independent-projects/arc/processor/src/main/java/io/quarkus/arc/processor/InjectionPointInfo.java b/independent-projects/arc/processor/src/main/java/io/quarkus/arc/processor/InjectionPointInfo.java
index 09387cbb9c1be..f1778e8a6476c 100644
--- a/independent-projects/arc/processor/src/main/java/io/quarkus/arc/processor/InjectionPointInfo.java
+++ b/independent-projects/arc/processor/src/main/java/io/quarkus/arc/processor/InjectionPointInfo.java
@@ -26,6 +26,8 @@
import java.util.function.Predicate;
import org.jboss.jandex.AnnotationInstance;
import org.jboss.jandex.AnnotationTarget;
+import org.jboss.jandex.AnnotationTarget.Kind;
+import org.jboss.jandex.DotName;
import org.jboss.jandex.FieldInfo;
import org.jboss.jandex.MethodInfo;
import org.jboss.jandex.Type;
@@ -126,6 +128,15 @@ public Set getRequiredQualifiers() {
return typeAndQualifiers.qualifiers;
}
+ public AnnotationInstance getRequiredQualifier(DotName name) {
+ for (AnnotationInstance qualifier : typeAndQualifiers.qualifiers) {
+ if (qualifier.name().equals(name)) {
+ return qualifier;
+ }
+ }
+ return null;
+ }
+
public boolean hasDefaultedQualifier() {
return hasDefaultedQualifier;
}