diff --git a/bin/aioPrintStream.py b/bin/aioPrintStream.py
new file mode 100644
index 0000000..0d8b749
--- /dev/null
+++ b/bin/aioPrintStream.py
@@ -0,0 +1,91 @@
+#!/usr/bin/env python
+#
+# This file is part of alert_stream.
+#
+# Developed for the LSST Data Management System.
+# This product includes software developed by the LSST Project
+# (https://www.lsst.org).
+# See the COPYRIGHT file at the top-level directory of this distribution
+# for details of code ownership.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <https://www.gnu.org/licenses/>.
+
+"""Consumes the alert stream and prints all messages to the console.
+
+Proof of concept using asyncio/aiokafka.
+"""
+
+import asyncio
+import platform
+import struct
+
+from typing import Tuple
+
+from aiokafka import AIOKafkaConsumer
+
+from lsst.alert.packet import Schema
+from lsst.alert.packet import SchemaRegistry
+
+class Decoder(object):
+    """Avro alert packet deserializer.
+
+    Parameters
+    ----------
+    schema
+        If supplied, always uses this schema to decode events. Otherwise, an
+        appropriate schema is retrieved from the registry.
+    """
+    def __init__(self, schema: Schema=None) -> None:
+        self.schema = schema
+        self.schema_registry = SchemaRegistry.from_filesystem()
+
+    def __call__(self, raw_bytes: bytes) -> Tuple[Schema, dict]:
+        """Decode Avro-serialized raw bytes.
+
+        Parameters
+        ----------
+        raw_bytes
+            Data to be decoded. Assumed to be in Confluent wire format.
+
+        Returns
+        -------
+        schema
+            The schema used to decode the message.
+        message
+            The decoded message.
+        """
+        schema_hash = struct.unpack("!I", raw_bytes[1:5])[0]
+        schema = (self.schema if self.schema is not None else
+                  self.schema_registry.get_by_id(schema_hash))
+        return schema, schema.deserialize(raw_bytes[5:])
+
+async def consume() -> None:
+    consumer = AIOKafkaConsumer(
+        'my-stream',
+        loop=loop,
+        bootstrap_servers='localhost:29092',
+        value_deserializer=Decoder(),
+        group_id=platform.node())
+    await consumer.start()
+    try:
+        # Consume messages
+        async for msg in consumer:
+            print("consumed: ", msg.topic, msg.partition, msg.offset,
+                  msg.key, msg.value, msg.timestamp)
+    finally:
+        # Will leave consumer group; perform autocommit if enabled.
+        await consumer.stop()
+
+loop = asyncio.get_event_loop()
+loop.run_until_complete(consume())
diff --git a/bin/filterStream.py b/bin/filterStream.py
index 6f23c46..281bcdd 100644
--- a/bin/filterStream.py
+++ b/bin/filterStream.py
@@ -24,11 +24,10 @@
 """Alert stream filter deployer.
 """
 
-from __future__ import print_function
 import argparse
 import sys
-import os
 import inspect
+import platform
 from lsst.alert.stream import alertConsumer, alertProducer
 from lsst.alert.stream import filterBase
 from lsst.alert.stream import filters
@@ -48,18 +47,15 @@ def main():
                         help='Globally unique name of the consumer group. '
                              'Consumers in the same group will share messages '
                              '(i.e., only one consumer will receive a message, '
-                             'as in a queue). Default is value of $HOSTNAME.')
+                             'as in a queue). Default is the current hostname.',
+                        default=platform.node())
 
     args = parser.parse_args()
     fnum = args.filterNum
 
     # Configure consumer connection to Kafka broker
-    cconf = {'bootstrap.servers': args.broker,
+    cconf = {'bootstrap.servers': args.broker, 'group.id': args.group,
              'default.topic.config': {'auto.offset.reset': 'smallest'}}
-    if args.group:
-        cconf['group.id'] = args.group
-    else:
-        cconf['group.id'] = os.environ['HOSTNAME']
 
     pconf = {'bootstrap.servers': args.broker}
 
diff --git a/bin/monitorStream.py b/bin/monitorStream.py
index e0301b3..0ae7a74 100644
--- a/bin/monitorStream.py
+++ b/bin/monitorStream.py
@@ -29,6 +29,7 @@
 import argparse
 import os
+import platform
 import sys
 
 from lsst.alert.stream import alertConsumer
 
@@ -43,16 +44,13 @@ def main():
                         help='Globally unique name of the consumer group. '
                              'Consumers in the same group will share messages '
                              '(i.e., only one consumer will receive a message, '
-                             'as in a queue). Default is value of $HOSTNAME.')
+                             'as in a queue). Default is the current hostname.',
+                        default=platform.node())
     args = parser.parse_args()
 
     # Configure consumer connection to Kafka broker
-    conf = {'bootstrap.servers': args.broker,
+    conf = {'bootstrap.servers': args.broker, 'group.id': args.group,
             'default.topic.config': {'auto.offset.reset': 'smallest'}}
-    if args.group:
-        conf['group.id'] = args.group
-    else:
-        conf['group.id'] = os.environ['HOSTNAME']
 
     # Start consumer and monitor alert stream
     with alertConsumer.AlertConsumer(args.topic, **conf) as streamWatcher:
diff --git a/bin/printStream.py b/bin/printStream.py
index d48bdea..64851fd 100644
--- a/bin/printStream.py
+++ b/bin/printStream.py
@@ -31,6 +31,8 @@
 import argparse
 import sys
 import os
+import platform
+
 from lsst.alert.stream import alertConsumer
 
 
@@ -87,20 +89,17 @@ def main():
                         help='Globally unique name of the consumer group. '
                              'Consumers in the same group will share messages '
                              '(i.e., only one consumer will receive a message, '
-                             'as in a queue). Default is value of $HOSTNAME.')
+                             'as in a queue). Default is the current hostname.',
+                        default=platform.node())
     parser.add_argument('--stampDir', type=str,
-                        help='Output directory for writing postage stamp'
+                        help='Output directory for writing postage stamp '
                              'cutout files. **THERE ARE NO STAMPS RIGHT NOW.**')
 
     args = parser.parse_args()
 
     # Configure consumer connection to Kafka broker
-    conf = {'bootstrap.servers': args.broker,
+    conf = {'bootstrap.servers': args.broker, 'group.id': args.group,
             'default.topic.config': {'auto.offset.reset': 'smallest'}}
-    if args.group:
-        conf['group.id'] = args.group
-    else:
-        conf['group.id'] = os.environ['HOSTNAME']
 
     # Start consumer and print alert stream
     with alertConsumer.AlertConsumer(args.topic, **conf) as streamReader:
diff --git a/bin/sendAlertStream.py b/bin/sendAlertStream.py
index 781286a..46a05b1 100644
--- a/bin/sendAlertStream.py
+++ b/bin/sendAlertStream.py
@@ -25,52 +25,14 @@
 content.
 """
 
-from __future__ import print_function
 import argparse
+import asyncio
 import glob
+import itertools
 import time
-import asyncio
 
 from lsst.alert.stream import alertProducer
 from lsst.alert.packet import retrieve_alerts
-
-@asyncio.coroutine
-def delay(wait_sec, function, *args):
-    """Sleep for a given time before calling a function.
-    Parameters
-    ----------
-    wait_sec
-        Time in seconds to sleep before calling `function`.
-    function
-        Function to return after sleeping.
- """ - yield from asyncio.sleep(wait_sec) - return function(*args) - - -@asyncio.coroutine -def schedule_delays(eventloop, function, argslist, interval=39): - """Schedule delayed calls of functions at a repeating interval. - Parameters - ---------- - eventloop - Event loop returned by asyncio.get_event_loop(). - function - Function to be scheduled. - argslist - List of inputs for function to loop over. - interval - Time in seconds between calls. - """ - counter = 1 - for arg in argslist: - wait_time = interval - (time.time() % interval) - yield from asyncio.ensure_future(delay(wait_time, function, arg)) - print('visits finished: {} \t time: {}'.format(counter, time.time())) - counter += 1 - eventloop.stop() - - def main(): parser = argparse.ArgumentParser(description=__doc__) parser.add_argument('broker', type=str, @@ -89,23 +51,30 @@ def main(): files.sort() def send_visit(f): - print('visit:', f[15:20], '\ttime:', time.time()) + start_time = time.time() + print('visit:', f[15:20], '\ttime:', start_time) # Load alert contents with open(f, mode='rb') as file_data: # TODO replace Avro files with visits having better S/N cut # for now, limit to first 10,000 alerts (current have ~70,000) schema, alert_packets = retrieve_alerts(file_data) - alert_count = 0 - for record in alert_packets: - if alert_count < 10000: - streamProducer.send(schema, record) - alert_count += 1 - else: - break + ALERTS_TO_SEND = 10000 + for alert_count, record in enumerate(alert_packets): + if alert_count < ALERTS_TO_SEND: + streamProducer.send(schema, record) + else: + break streamProducer.flush() + print(f"Sent {alert_count} alerts in {time.time() - start_time}s.") + # Schedule visits to be send every `interval` seconds. loop = asyncio.get_event_loop() - asyncio.ensure_future(schedule_delays(loop, send_visit, files)) + interval = 39 # Seconds between visits + for delay, filename in zip(itertools.count(0, interval), files): + loop.call_later(delay, send_visit, filename) + + # Shut down the event loop after the last visit has been sent. + loop.call_later(delay, loop.stop) loop.run_forever() loop.close() diff --git a/docker-compose.yml b/docker-compose.yml index 4d91846..e62f418 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -16,8 +16,15 @@ services: environment: - KAFKA_BROKER_ID=1 - KAFKA_ZOOKEEPER_CONNECT=zookeeper:32181 - - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092 + + # Define separate listeners for inside & outside the Docker network. + # https://rmoff.net/2018/08/02/kafka-listeners-explained/ + - KAFKA_LISTENERS=INTERNAL://kafka:9092,EXTERNAL://kafka:29092 + - KAFKA_ADVERTISED_LISTENERS=INTERNAL://kafka:9092,EXTERNAL://localhost:29092 + - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT + - KAFKA_INTER_BROKER_LISTENER_NAME=INTERNAL + - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 # remove for >= 3 brokers - KAFKA_DELETE_TOPIC_ENABLE=true ports: - - "9092" + - "29092:29092"