diff --git a/Python3.6-COSSCFCkafka/src/cos_to_kafka.py b/Python3.6-COSSCFCkafka/src/cos_to_kafka.py index 7b69054923..72a2894a1a 100644 --- a/Python3.6-COSSCFCkafka/src/cos_to_kafka.py +++ b/Python3.6-COSSCFCkafka/src/cos_to_kafka.py @@ -45,15 +45,13 @@ def on_send_error(excp): key = None value = json.dumps(data).encode('utf-8') self.producer.send(topic, key = key, value = value).add_callback(on_send_success).add_errback(on_send_error) - # block until all async messages are sent self.producer.flush() except KafkaError as e: return e - finally: - if self.producer is not None: - self.producer.close() - e_time = time.time() - - return "{} messages delivered in {}s".format(count, e_time - s_time) \ No newline at end of file + return "{} messages delivered in {}s".format(count, e_time - s_time) + def close(self): + if self.producer is not None: + self.producer.flush() + self.producer.close() \ No newline at end of file diff --git a/Python3.6-COSSCFCkafka/src/index.py b/Python3.6-COSSCFCkafka/src/index.py index 60039571de..872f5282c0 100755 --- a/Python3.6-COSSCFCkafka/src/index.py +++ b/Python3.6-COSSCFCkafka/src/index.py @@ -8,20 +8,16 @@ logger = logging.getLogger('Index') logger.setLevel(logging.INFO) -def main_handler(event, context): - logger.info("start main handler") - kafka_address = os.getenv("kafka_address") - kafka_topic_name = os.getenv("kafka_topic_name") +kafka_address = os.getenv("kafka_address") +kafka_topic_name = os.getenv("kafka_topic_name") +def main_handler(event, context): cos_to_kafka = CosToKafka( kafka_address, - # security_protocol = "PLAINTEXT", - #sasl_mechanism = "PLAIN", - #sasl_plain_username = "ckafka-80o10xxx#lkoxx", - #sasl_plain_password = "kongllxxxx", api_version=(1, 1, 1) ) - + logger.info("start main handler") ret = cos_to_kafka.send(kafka_topic_name, event) logger.info(ret) + cos_to_kafka.close() return ret \ No newline at end of file