diff --git a/src/mqtt/protocol/mqtt/mqtt_quic.c b/src/mqtt/protocol/mqtt/mqtt_quic.c index e9297a08..1417089d 100644 --- a/src/mqtt/protocol/mqtt/mqtt_quic.c +++ b/src/mqtt/protocol/mqtt/mqtt_quic.c @@ -298,7 +298,7 @@ mqtt_sub_stream(mqtt_pipe_t *p, nni_msg *msg, uint16_t packet_id, nni_aio *aio) "packetID duplicated!", packet_id); nni_aio *m_aio = nni_mqtt_msg_get_aio(tmsg); - if (m_aio) { + if (m_aio && nni_msg_get_type(tmsg) != CMD_PUBLISH) { nni_aio_finish_error(m_aio, UNSPECIFIED_ERROR); } nni_msg_free(tmsg); @@ -403,7 +403,7 @@ mqtt_send_msg(nni_aio *aio, nni_msg *msg, mqtt_sock_t *s) "packetID duplicated!", packet_id); nni_aio *m_aio = nni_mqtt_msg_get_aio(tmsg); - if (m_aio) { + if (m_aio && ptype != NNG_MQTT_PUBLISH) { nni_aio_finish_error(m_aio, UNSPECIFIED_ERROR); } nni_msg_free(tmsg); @@ -475,8 +475,6 @@ mqtt_pipe_send_msg(nni_aio *aio, nni_msg *msg, mqtt_pipe_t *p, uint16_t packet_i if (qos == 0) { break; // QoS 0 need no packet id } - // case NNG_MQTT_SUBSCRIBE: - // case NNG_MQTT_UNSUBSCRIBE: nni_mqtt_msg_set_packet_id(msg, packet_id); nni_mqtt_msg_set_aio(msg, aio); tmsg = nni_id_get(&p->sent_unack, packet_id); @@ -485,7 +483,7 @@ mqtt_pipe_send_msg(nni_aio *aio, nni_msg *msg, mqtt_pipe_t *p, uint16_t packet_i "packetID duplicated!", packet_id); nni_aio *m_aio = nni_mqtt_msg_get_aio(tmsg); - if (m_aio) { + if (m_aio && (nni_msg_get_type(tmsg) == CMD_SUBSCRIBE || nni_msg_get_type(tmsg) == CMD_UNSUBSCRIBE)) { nni_aio_finish_error(m_aio, UNSPECIFIED_ERROR); } nni_msg_free(tmsg); @@ -718,8 +716,7 @@ mqtt_quic_data_strm_recv_cb(void *arg) } if (!nni_aio_busy(s->ack_aio)) { nni_aio_set_msg(s->ack_aio, msg); - user_aio = s->ack_aio; - // nni_aio_finish(s->ack_aio, 0, nni_msg_len(msg)); + nni_aio_finish(s->ack_aio, 0, nni_msg_len(msg)); } else { nni_lmq_put(s->ack_lmq, msg); log_debug("ack msg cached!"); @@ -965,8 +962,7 @@ mqtt_quic_recv_cb(void *arg) } if (!nni_aio_busy(s->ack_aio)) { nni_aio_set_msg(s->ack_aio, msg); - user_aio = s->ack_aio; - // nni_aio_finish(s->ack_aio, 0, nni_msg_len(msg)); + nni_aio_finish(s->ack_aio, 0, nni_msg_len(msg)); } else { nni_lmq_put(s->ack_lmq, msg); log_debug("ack msg cached!"); diff --git a/src/mqtt/protocol/mqtt/mqttv5_quic.c b/src/mqtt/protocol/mqtt/mqttv5_quic.c index 9ba54be9..f90cd7c4 100644 --- a/src/mqtt/protocol/mqtt/mqttv5_quic.c +++ b/src/mqtt/protocol/mqtt/mqttv5_quic.c @@ -308,7 +308,7 @@ mqtt_sub_stream(mqtt_pipe_t *p, nni_msg *msg, uint16_t packet_id, nni_aio *aio) "packetID duplicated!", packet_id); nni_aio *m_aio = nni_mqtt_msg_get_aio(tmsg); - if (m_aio) { + if (m_aio && nni_msg_get_type(tmsg) != CMD_PUBLISH) { nni_aio_finish_error(m_aio, UNSPECIFIED_ERROR); } nni_msg_free(tmsg); @@ -413,7 +413,7 @@ mqtt_send_msg(nni_aio *aio, nni_msg *msg, mqtt_sock_t *s) "packetID duplicated!", packet_id); nni_aio *m_aio = nni_mqtt_msg_get_aio(tmsg); - if (m_aio) { + if (m_aio && ptype != NNG_MQTT_PUBLISH) { nni_aio_finish_error(m_aio, UNSPECIFIED_ERROR); } nni_msg_free(tmsg); @@ -485,8 +485,6 @@ mqtt_pipe_send_msg(nni_aio *aio, nni_msg *msg, mqtt_pipe_t *p, uint16_t packet_i if (qos == 0) { break; // QoS 0 need no packet id } - // case NNG_MQTT_SUBSCRIBE: - // case NNG_MQTT_UNSUBSCRIBE: nni_mqtt_msg_set_packet_id(msg, packet_id); nni_mqtt_msg_set_aio(msg, aio); tmsg = nni_id_get(&p->sent_unack, packet_id); @@ -495,7 +493,7 @@ mqtt_pipe_send_msg(nni_aio *aio, nni_msg *msg, mqtt_pipe_t *p, uint16_t packet_i "packetID duplicated!", packet_id); nni_aio *m_aio = nni_mqtt_msg_get_aio(tmsg); - if (m_aio) { + if (m_aio && ptype != NNG_MQTT_PUBLISH) { nni_aio_finish_error(m_aio, UNSPECIFIED_ERROR); } nni_msg_free(tmsg); @@ -728,8 +726,7 @@ mqtt_quic_data_strm_recv_cb(void *arg) } if (!nni_aio_busy(s->ack_aio)) { nni_aio_set_msg(s->ack_aio, msg); - user_aio = s->ack_aio; - // nni_aio_finish(s->ack_aio, 0, nni_msg_len(msg)); + nni_aio_finish(s->ack_aio, 0, nni_msg_len(msg)); } else { nni_lmq_put(s->ack_lmq, msg); log_debug("ack msg cached!"); @@ -977,8 +974,7 @@ mqtt_quic_recv_cb(void *arg) } if (!nni_aio_busy(s->ack_aio)) { nni_aio_set_msg(s->ack_aio, msg); - user_aio = s->ack_aio; - // nni_aio_finish(s->ack_aio, 0, nni_msg_len(msg)); + nni_aio_finish(s->ack_aio, 0, nni_msg_len(msg)); } else { nni_lmq_put(s->ack_lmq, msg); log_debug("ack msg cached!");