Skip to content

RocketMQ org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException: invokeAsync call timeout 解决 #68

Open
@Shellbye

Description

@Shellbye

最近在调研和压测 RocketMQ,在压测时,异步地连续发送2w条消息,生产者就开始大量抛错,错误信息就是标题里的

org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException: 
invokeAsync call timeout

在网上搜了不少资料,虽然大家很多都是RemotingTooMuchRequestException这个异常,但是具体的错误信息其实是不一样的,网络上的异常具体信息大多都是sendDefaultImpl call timeout,但是我这里是invokeAsync call timeout,资料比较少,后来在看了源码之后,发现了如下代码:

   public void invokeAsync(String addr, RemotingCommand request, long timeoutMillis, InvokeCallback invokeCallback)
        throws InterruptedException, RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException,
        RemotingSendRequestException {
        long beginStartTime = System.currentTimeMillis();
        final Channel channel = this.getAndCreateChannel(addr);
        if (channel != null && channel.isActive()) {
            try {
                doBeforeRpcHooks(addr, request);
                long costTime = System.currentTimeMillis() - beginStartTime;
                if (timeoutMillis < costTime) {
                    throw new RemotingTooMuchRequestException("invokeAsync call timeout");
                }
                this.invokeAsyncImpl(channel, request, timeoutMillis - costTime, invokeCallback);
            } catch (RemotingSendRequestException e) {
                log.warn("invokeAsync: send request exception, so close the channel[{}]", addr);
                this.closeChannel(addr, channel);
                throw e;
            }
        } else {
            this.closeChannel(addr, channel);
            throw new RemotingConnectException(addr);
        }
    }

之前看过一个大神讲RocketMQ,里面说到这里的这个costTime,大意就是client在发送的消息之前,需要一些准备工作(比如上面的this.getAndCreateChannel(addr)doBeforeRpcHooks(addr, request)),这个costTime就是这些准备工作的用时,那么如果这些准备工作都超过了你给的超时时长timeoutMillis(系统默认3s),那么这个消息就还没发呢,就单单了,所以错误信息就是invokeAsync call timeout,还没发呢,单单是唤起(invoke)就超市了,那么解决办法是什么呢?两种,一种就是一次性少发点消息,或者加大超时时长,如下:

    producer.send(msg, new SendCallback() {
        @Override
        public void onSuccess(SendResult sendResult) {
            //
        }
    
        @Override
        public void onException(Throwable e) {
            e.printStackTrace();
        }
    }, 20 * 1000); // <=== 这里是设置超时时长的地方

Metadata

Metadata

Assignees

No one assigned

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions