Integrating message consumption with imperative code #107

Open
nakumgaurav opened this issue Jul 25, 2020 · 2 comments

nakumgaurav commented Jul 25, 2020

Hi,

I saw that publishing messages can be integrated with imperative code using Emitters and Channels. I would like to know if there is a way to integrate message consumption with imperative code.

Basically, I have something like:

@ApplicationScoped
class MyApp{
    private String currRecord;

    @Acknowledgment(Acknowledgment.Strategy.MANUAL)
    @Incoming(DEFAULT_STREAM_NAME_INCOMING)
    public CompletionStage<Void> consumeMessage(Message<String> message){
        this.currRecord = message.getPayload();
        return message.ack();
    }

    public void func1() {
        // This method needs the current value of currRecord, but it is
        // always null because messages are only read after func1() has
        // been called.
    }
}

I have an external API that calls func1, but I can never get hold of the messages. I need help understanding how I can read messages using consumeMessage() and make them available to the external code.

Any help would be appreciated!
Thanks!

kabir (Contributor) commented Jul 27, 2020

Yes, again you use @Channel, this time on a field with a reactive type (Flowable, Publisher, PublisherBuilder, whatever works in your environment). Here is an RxJava 2 example:

    @Inject
    @Channel(CHANNEL_NAME)
    Flowable<Message<String>> sourceStream;

    public List<String> consume() {
        return Flowable.fromPublisher(sourceStream)
                .map(Message::getPayload)
                .toList()
                .blockingGet();
    }
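
One thing to keep in mind with the snippet above: `toList()` only emits once the stream completes, so `blockingGet()` returns only for a finite stream. For a channel that never completes, one alternative is to have the consuming side hand each payload over to the imperative side through a queue. The following is a minimal plain-Java sketch of that idea, not something taken from SmallRye itself: `MessageHandoff`, `onMessage` and `nextRecord` are hypothetical names, and the capacity of 1024 is an arbitrary assumption.

```java
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not part of SmallRye or MicroProfile Reactive Messaging.
final class MessageHandoff {

    // Bounded so that a slow reader cannot make memory grow without limit.
    private final BlockingQueue<String> records = new LinkedBlockingQueue<>(1024);

    // Intended to be called from the @Incoming method with message.getPayload().
    // Returns false if the queue is full and the payload was not stored.
    boolean onMessage(String payload) {
        return records.offer(payload);
    }

    // Intended to be called from imperative code such as func1().
    // Waits up to timeoutMillis for a payload; empty if none arrived in time.
    Optional<String> nextRecord(long timeoutMillis) {
        try {
            return Optional.ofNullable(records.poll(timeoutMillis, TimeUnit.MILLISECONDS));
        } catch (InterruptedException e) {
            // Preserve the interrupt for the caller and report "nothing received".
            Thread.currentThread().interrupt();
            return Optional.empty();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        MessageHandoff handoff = new MessageHandoff();
        // Simulate the reactive side delivering a message on another thread.
        Thread producer = new Thread(() -> handoff.onMessage("hello"));
        producer.start();
        System.out.println(handoff.nextRecord(1000).orElse("<no message yet>"));
        producer.join();
    }
}
```

In the original MyApp class, consumeMessage() would call something like `onMessage(message.getPayload())` before acknowledging, and func1() would call `nextRecord(...)` instead of reading a field that may not have been set yet.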

@nakumgaurav (Author)

Hi @kabir

Thanks a lot for your answer!

I am trying to construct a minimal end-to-end example of what you suggested for consuming messages via channels.

Main.java

public class Main
{
    public static void main( String[] args )
    {
        SeContainer container = SeContainerInitializer.newInstance().initialize();
        List<String> msgs = container.getBeanManager().createInstance().select(App.class).get().consume();
    }
}

App.java

@ApplicationScoped
public class App 
{

    @Inject
    @Channel("prices")
    Flowable<Message<String>> sourceStream;

    public List<String> consume() {
        return Flowable.fromPublisher(sourceStream)
                .map(Message::getPayload)
                .toList()
                .blockingGet();
    }

    @Outgoing("prices")
    public PublisherBuilder<String> produceMsgs(){
        System.out.println("#################### Producing messages ");
        return ReactiveStreams.of("hello", "with", "SmallRye", "reactive", "message");
    }
}

However, I am getting the exception:

org.jboss.weld.exceptions.DeploymentException: SRMSG00070: No channel found for name: prices, injection point: [BackedAnnotatedField] @Inject @Channel test.smallrye.App.sourceStream
    at org.jboss.weld.bootstrap.events.AbstractDeploymentContainerEvent.fire (AbstractDeploymentContainerEvent.java:38)
    at org.jboss.weld.bootstrap.events.AfterDeploymentValidationImpl.fire (AfterDeploymentValidationImpl.java:28)
    at org.jboss.weld.bootstrap.WeldStartup.validateBeans (WeldStartup.java:505)
    at org.jboss.weld.bootstrap.WeldBootstrap.validateBeans (WeldBootstrap.java:93)
    at org.jboss.weld.environment.se.Weld.initialize (Weld.java:804)
    at org.jboss.weld.environment.se.Weld.initialize (Weld.java:176)
    at test.smallrye.Main.main (Main.java:14)
    at org.codehaus.mojo.exec.ExecJavaMojo$1.run (ExecJavaMojo.java:254)
    at java.lang.Thread.run (Thread.java:748)
Caused by: javax.enterprise.inject.spi.DeploymentException: SRMSG00070: No channel found for name: prices, injection point: [BackedAnnotatedField] @Inject @Channel test.smallrye.App.sourceStream

Why is the container unable to find the method annotated with @Outgoing("prices") (produceMsgs)? I followed these instructions from the SmallRye docs:

You must have a @Outgoing("my-channel") somewhere in your application (meaning a method generating messages transiting on the channel my-channel), or an inbound connector configured to manage the prices channel (mp.messaging.incoming.prices…​)

Is there something else that I need to do?

Thanks!
