From d5672578d6eb400d88082a66ef7266b745f181b9 Mon Sep 17 00:00:00 2001 From: williamji Date: Tue, 10 Jan 2023 17:28:37 +0800 Subject: [PATCH] =?UTF-8?q?<=E4=BF=AE=E6=94=B9=E5=A4=8D=E7=94=A8=E4=BA=91?= =?UTF-8?q?=E5=87=BD=E6=95=B0=E5=AE=9E=E4=BE=8B=E5=8F=AF=E8=83=BD=E6=8A=A5?= =?UTF-8?q?=E9=94=99"RecordAccumulator=20is=20closed">?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Python3.6-COSSCFCkafka/src/cos_to_kafka.py | 12 +++++------- Python3.6-COSSCFCkafka/src/index.py | 14 +++++--------- 2 files changed, 10 insertions(+), 16 deletions(-) diff --git a/Python3.6-COSSCFCkafka/src/cos_to_kafka.py b/Python3.6-COSSCFCkafka/src/cos_to_kafka.py index 7b69054923..72a2894a1a 100644 --- a/Python3.6-COSSCFCkafka/src/cos_to_kafka.py +++ b/Python3.6-COSSCFCkafka/src/cos_to_kafka.py @@ -45,15 +45,13 @@ def on_send_error(excp): key = None value = json.dumps(data).encode('utf-8') self.producer.send(topic, key = key, value = value).add_callback(on_send_success).add_errback(on_send_error) - # block until all async messages are sent self.producer.flush() except KafkaError as e: return e - finally: - if self.producer is not None: - self.producer.close() - e_time = time.time() - - return "{} messages delivered in {}s".format(count, e_time - s_time) \ No newline at end of file + return "{} messages delivered in {}s".format(count, e_time - s_time) + def close(self): + if self.producer is not None: + self.producer.flush() + self.producer.close() \ No newline at end of file diff --git a/Python3.6-COSSCFCkafka/src/index.py b/Python3.6-COSSCFCkafka/src/index.py index 60039571de..872f5282c0 100755 --- a/Python3.6-COSSCFCkafka/src/index.py +++ b/Python3.6-COSSCFCkafka/src/index.py @@ -8,20 +8,16 @@ logger = logging.getLogger('Index') logger.setLevel(logging.INFO) -def main_handler(event, context): - logger.info("start main handler") - kafka_address = os.getenv("kafka_address") - kafka_topic_name = os.getenv("kafka_topic_name") +kafka_address = os.getenv("kafka_address") +kafka_topic_name = os.getenv("kafka_topic_name") +def main_handler(event, context): cos_to_kafka = CosToKafka( kafka_address, - # security_protocol = "PLAINTEXT", - #sasl_mechanism = "PLAIN", - #sasl_plain_username = "ckafka-80o10xxx#lkoxx", - #sasl_plain_password = "kongllxxxx", api_version=(1, 1, 1) ) - + logger.info("start main handler") ret = cos_to_kafka.send(kafka_topic_name, event) logger.info(ret) + cos_to_kafka.close() return ret \ No newline at end of file