From a1f5cd0b5205212be4f8ab30231f60701d327c10 Mon Sep 17 00:00:00 2001 From: ZhangJian He Date: Tue, 30 May 2023 11:31:11 +0800 Subject: [PATCH] [consumer] Support parse broker metadata --- lib/ClientConnection.cc | 27 ++++++++++++- lib/ClientConnection.h | 2 + lib/Commands.h | 3 ++ run-unit-tests.sh | 6 +++ tests/CMakeLists.txt | 3 ++ tests/brokermetadata/BrokerMetadataTest.cc | 47 ++++++++++++++++++++++ tests/brokermetadata/docker-compose.yml | 43 ++++++++++++++++++++ tests/oauth2/Oauth2Test.cc | 1 - 8 files changed, 129 insertions(+), 3 deletions(-) create mode 100644 tests/brokermetadata/BrokerMetadataTest.cc create mode 100644 tests/brokermetadata/docker-compose.yml diff --git a/lib/ClientConnection.cc b/lib/ClientConnection.cc index 72b9c8e2..d955b32c 100644 --- a/lib/ClientConnection.cc +++ b/lib/ClientConnection.cc @@ -679,11 +679,32 @@ void ClientConnection::processIncomingBuffer() { if (incomingCmd.type() == BaseCommand::MESSAGE) { // Parse message metadata and extract payload proto::MessageMetadata msgMetadata; + proto::BrokerEntryMetadata brokerEntryMetadata; // read checksum uint32_t remainingBytes = frameSize - (cmdSize + 4); bool isChecksumValid = verifyChecksum(incomingBuffer_, remainingBytes, incomingCmd); + auto readerIndex = incomingBuffer_.readerIndex(); + if (incomingBuffer_.readUnsignedShort() == Commands::magicBrokerEntryMetadata) { + // broker entry metadata is present + uint32_t brokerEntryMetadataSize = incomingBuffer_.readUnsignedInt(); + if (!brokerEntryMetadata.ParseFromArray(incomingBuffer_.data(), brokerEntryMetadataSize)) { + LOG_ERROR(cnxString_ << "[consumer id " << incomingCmd.message().consumer_id() + << ", message ledger id " + << incomingCmd.message().message_id().ledgerid() << ", entry id " + << incomingCmd.message().message_id().entryid() + << "] Error parsing broker entry metadata"); + close(); + return; + } + + incomingBuffer_.consume(brokerEntryMetadataSize); + remainingBytes -= (2 + 4 + brokerEntryMetadataSize); + } else { + incomingBuffer_.setReaderIndex(readerIndex); + } + uint32_t metadataSize = incomingBuffer_.readUnsignedInt(); if (!msgMetadata.ParseFromArray(incomingBuffer_.data(), metadataSize)) { LOG_ERROR(cnxString_ << "[consumer id " << incomingCmd.message().consumer_id() // @@ -701,7 +722,8 @@ void ClientConnection::processIncomingBuffer() { uint32_t payloadSize = remainingBytes; SharedBuffer payload = SharedBuffer::copy(incomingBuffer_.data(), payloadSize); incomingBuffer_.consume(payloadSize); - handleIncomingMessage(incomingCmd.message(), isChecksumValid, msgMetadata, payload); + handleIncomingMessage(incomingCmd.message(), isChecksumValid, brokerEntryMetadata, msgMetadata, + payload); } else { handleIncomingCommand(incomingCmd); } @@ -710,7 +732,7 @@ void ClientConnection::processIncomingBuffer() { // We still have 1 to 3 bytes from the next frame assert(incomingBuffer_.readableBytes() < sizeof(uint32_t)); - // Restart with a new buffer and copy the the few bytes at the beginning + // Restart with a new buffer and copy the few bytes at the beginning incomingBuffer_ = SharedBuffer::copyFrom(incomingBuffer_, DefaultBufferSize); // At least we need to read 4 bytes to have the complete frame size @@ -782,6 +804,7 @@ void ClientConnection::handleActiveConsumerChange(const proto::CommandActiveCons } void ClientConnection::handleIncomingMessage(const proto::CommandMessage& msg, bool isChecksumValid, + proto::BrokerEntryMetadata& brokerEntryMetadata, proto::MessageMetadata& msgMetadata, SharedBuffer& payload) { LOG_DEBUG(cnxString_ << "Received a message from the server for consumer: " << msg.consumer_id()); diff --git a/lib/ClientConnection.h b/lib/ClientConnection.h index 24a43d5c..7de90895 100644 --- a/lib/ClientConnection.h +++ b/lib/ClientConnection.h @@ -74,6 +74,7 @@ struct OpSendMsg; namespace proto { class BaseCommand; +class BrokerEntryMetadata; class CommandActiveConsumerChange; class CommandAckResponse; class CommandMessage; @@ -225,6 +226,7 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this +#include + +using namespace pulsar; + +TEST(BrokerMetadataTest, testConsumeSuccess) { + Client client{"pulsar://localhost:6650"}; + Producer producer; + Result producerResult = client.createProducer("persistent://public/default/testConsumeSuccess", producer); + ASSERT_EQ(producerResult, ResultOk); + Consumer consumer; + Result consumerResult = + client.subscribe("persistent://public/default/testConsumeSuccess", "testConsumeSuccess", consumer); + ASSERT_EQ(consumerResult, ResultOk); + const auto msg = MessageBuilder().setContent("testConsumeSuccess").build(); + Result sendResult = producer.send(msg); + ASSERT_EQ(sendResult, ResultOk); + Message receivedMsg; + Result receiveResult = consumer.receive(receivedMsg); + ASSERT_EQ(receiveResult, ResultOk); + ASSERT_EQ(receivedMsg.getDataAsString(), "testConsumeSuccess"); + client.close(); +} + +int main(int argc, char* argv[]) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/tests/brokermetadata/docker-compose.yml b/tests/brokermetadata/docker-compose.yml new file mode 100644 index 00000000..bb719e07 --- /dev/null +++ b/tests/brokermetadata/docker-compose.yml @@ -0,0 +1,43 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +version: '3' +networks: + pulsar: + driver: bridge +services: + standalone: + image: apachepulsar/pulsar:latest + container_name: standalone + hostname: local + restart: "no" + networks: + - pulsar + environment: + - metadataStoreUrl=zk:localhost:2181 + - clusterName=standalone-broker-metadata + - advertisedAddress=localhost + - advertisedListeners=external:pulsar://localhost:6650 + - PULSAR_MEM=-Xms512m -Xmx512m -XX:MaxDirectMemorySize=256m + - PULSAR_PREFIX_BROKER_ENTRY_METADATA_INTERCEPTORS=org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor + - PULSAR_PREFIX_EXPOSING_BROKER_ENTRY_METADATA_TO_CLIENT_ENABLED=true + ports: + - "6650:6650" + - "8080:8080" + command: bash -c "bin/apply-config-from-env.py conf/standalone.conf && exec bin/pulsar standalone -nss -nfw" diff --git a/tests/oauth2/Oauth2Test.cc b/tests/oauth2/Oauth2Test.cc index 62646207..5f273d7e 100644 --- a/tests/oauth2/Oauth2Test.cc +++ b/tests/oauth2/Oauth2Test.cc @@ -75,7 +75,6 @@ int main(int argc, char* argv[]) { ::testing::InitGoogleTest(&argc, argv); return RUN_ALL_TESTS(); - return 0; } static Result testCreateProducer(const std::string& privateKey) {