Experiments with asyncio #27


Draft · wants to merge 5 commits into main
91 changes: 91 additions & 0 deletions bin/aioPrintStream.py
@@ -0,0 +1,91 @@
#!/usr/bin/env python
#
# This file is part of alert_stream.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (https://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

"""Consumes alert stream and prints all messages to the console.

Proof of concept using asyncio/aiokafka.
"""

import asyncio
import platform
import struct

from typing import Optional, Tuple

from aiokafka import AIOKafkaConsumer

from lsst.alert.packet import Schema
from lsst.alert.packet import SchemaRegistry

class Decoder(object):
"""Avro alert packet deserializer.

    Parameters
    ----------
schema
If supplied, always uses this schema to decode events. Otherwise, an
appropriate schema is retrieved from the registry.
"""
    def __init__(self, schema: Optional[Schema] = None) -> None:
self.schema = schema
self.schema_registry = SchemaRegistry.from_filesystem()

def __call__(self, raw_bytes: bytes) -> Tuple[Schema, dict]:
"""Decode Avro-serialized raw bytes.

Parameters
----------
raw_bytes
Data to be decoded. Assumed to be in Confluent wire format.

Returns
-------
schema
The schema used to decode the message.
message
The decoded message.
"""
schema_hash = struct.unpack("!I", raw_bytes[1:5])[0]
schema = (self.schema if self.schema is not None else
self.schema_registry.get_by_id(schema_hash))
return schema, schema.deserialize(raw_bytes[5:])

async def consume() -> None:
consumer = AIOKafkaConsumer(
'my-stream',
loop=loop,
bootstrap_servers='localhost:29092',
value_deserializer=Decoder(),
group_id=platform.node())
await consumer.start()
try:
# Consume messages
async for msg in consumer:
print("consumed: ", msg.topic, msg.partition, msg.offset,
msg.key, msg.value, msg.timestamp)
finally:
# Will leave consumer group; perform autocommit if enabled.
await consumer.stop()

loop = asyncio.get_event_loop()
loop.run_until_complete(consume())
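
For reference, a producer has to frame messages the way this Decoder expects. The sketch below is illustrative only, not part of this PR: it assumes lsst.alert.packet.Schema exposes a serialize() counterpart to the deserialize() call above, and it reuses the 'my-stream' topic and localhost:29092 broker from consume().

import struct

from aiokafka import AIOKafkaProducer

async def produce_one(schema, schema_id: int, record: dict) -> None:
    # Confluent framing: magic byte 0x00, 4-byte big-endian schema ID,
    # then the Avro-encoded record body.
    # NOTE: assumes schema.serialize() exists (counterpart of deserialize()).
    payload = b'\x00' + struct.pack("!I", schema_id) + schema.serialize(record)
    producer = AIOKafkaProducer(bootstrap_servers='localhost:29092')
    await producer.start()
    try:
        await producer.send_and_wait('my-stream', payload)
    finally:
        await producer.stop()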
12 changes: 4 additions & 8 deletions bin/filterStream.py
@@ -24,11 +24,10 @@
"""Alert stream filter deployer.
"""

from __future__ import print_function
import argparse
import sys
import os
import inspect
import platform
from lsst.alert.stream import alertConsumer, alertProducer
from lsst.alert.stream import filterBase
from lsst.alert.stream import filters
@@ -48,18 +47,15 @@ def main():
                         help='Globally unique name of the consumer group. '
                              'Consumers in the same group will share messages '
                              '(i.e., only one consumer will receive a message, '
-                             'as in a queue). Default is value of $HOSTNAME.')
+                             'as in a queue). Default is the current hostname.',
+                        default=platform.node())

     args = parser.parse_args()
     fnum = args.filterNum

     # Configure consumer connection to Kafka broker
-    cconf = {'bootstrap.servers': args.broker,
+    cconf = {'bootstrap.servers': args.broker, 'group.id': args.group,
              'default.topic.config': {'auto.offset.reset': 'smallest'}}
-    if args.group:
-        cconf['group.id'] = args.group
-    else:
-        cconf['group.id'] = os.environ['HOSTNAME']

     pconf = {'bootstrap.servers': args.broker}
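
A note on the new default: platform.node() returns the machine's network name directly, so it reproduces the old $HOSTNAME lookup without requiring that environment variable to be set. The same change is applied to monitorStream.py and printStream.py below. Quick check:

import platform
print(platform.node())  # this machine's hostname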

10 changes: 4 additions & 6 deletions bin/monitorStream.py
@@ -29,6 +29,7 @@

 import argparse
 import os
+import platform
 import sys
 from lsst.alert.stream import alertConsumer

@@ -43,16 +44,13 @@ def main():
                         help='Globally unique name of the consumer group. '
                              'Consumers in the same group will share messages '
                              '(i.e., only one consumer will receive a message, '
-                             'as in a queue). Default is value of $HOSTNAME.')
+                             'as in a queue). Default is the current hostname.',
+                        default=platform.node())
     args = parser.parse_args()

     # Configure consumer connection to Kafka broker
-    conf = {'bootstrap.servers': args.broker,
+    conf = {'bootstrap.servers': args.broker, 'group.id': args.group,
             'default.topic.config': {'auto.offset.reset': 'smallest'}}
-    if args.group:
-        conf['group.id'] = args.group
-    else:
-        conf['group.id'] = os.environ['HOSTNAME']

     # Start consumer and monitor alert stream
     with alertConsumer.AlertConsumer(args.topic, **conf) as streamWatcher:
13 changes: 6 additions & 7 deletions bin/printStream.py
@@ -31,6 +31,8 @@
 import argparse
 import sys
 import os
+import platform
+
 from lsst.alert.stream import alertConsumer


@@ -87,20 +89,17 @@ def main():
                         help='Globally unique name of the consumer group. '
                              'Consumers in the same group will share messages '
                              '(i.e., only one consumer will receive a message, '
-                             'as in a queue). Default is value of $HOSTNAME.')
+                             'as in a queue). Default is the current hostname.',
+                        default=platform.node())
     parser.add_argument('--stampDir', type=str,
-                        help='Output directory for writing postage stamp'
+                        help='Output directory for writing postage stamp '
                              'cutout files. **THERE ARE NO STAMPS RIGHT NOW.**')

     args = parser.parse_args()

     # Configure consumer connection to Kafka broker
-    conf = {'bootstrap.servers': args.broker,
+    conf = {'bootstrap.servers': args.broker, 'group.id': args.group,
             'default.topic.config': {'auto.offset.reset': 'smallest'}}
-    if args.group:
-        conf['group.id'] = args.group
-    else:
-        conf['group.id'] = os.environ['HOSTNAME']

     # Start consumer and print alert stream
     with alertConsumer.AlertConsumer(args.topic, **conf) as streamReader:
67 changes: 18 additions & 49 deletions bin/sendAlertStream.py
@@ -25,52 +25,14 @@
 content.
 """

-from __future__ import print_function
 import argparse
+import asyncio
 import glob
+import itertools
 import time
-import asyncio
 from lsst.alert.stream import alertProducer
 from lsst.alert.packet import retrieve_alerts


-@asyncio.coroutine
-def delay(wait_sec, function, *args):
-    """Sleep for a given time before calling a function.
-    Parameters
-    ----------
-    wait_sec
-        Time in seconds to sleep before calling `function`.
-    function
-        Function to return after sleeping.
-    """
-    yield from asyncio.sleep(wait_sec)
-    return function(*args)
-
-
-@asyncio.coroutine
-def schedule_delays(eventloop, function, argslist, interval=39):
-    """Schedule delayed calls of functions at a repeating interval.
-    Parameters
-    ----------
-    eventloop
-        Event loop returned by asyncio.get_event_loop().
-    function
-        Function to be scheduled.
-    argslist
-        List of inputs for function to loop over.
-    interval
-        Time in seconds between calls.
-    """
-    counter = 1
-    for arg in argslist:
-        wait_time = interval - (time.time() % interval)
-        yield from asyncio.ensure_future(delay(wait_time, function, arg))
-        print('visits finished: {} \t time: {}'.format(counter, time.time()))
-        counter += 1
-    eventloop.stop()


 def main():
     parser = argparse.ArgumentParser(description=__doc__)
     parser.add_argument('broker', type=str,
@@ -89,23 +51,30 @@ def main():
     files.sort()

     def send_visit(f):
-        print('visit:', f[15:20], '\ttime:', time.time())
+        start_time = time.time()
+        print('visit:', f[15:20], '\ttime:', start_time)
         # Load alert contents
         with open(f, mode='rb') as file_data:
             # TODO replace Avro files with visits having a better S/N cut;
             # for now, limit to the first 10,000 alerts (currently ~70,000)
             schema, alert_packets = retrieve_alerts(file_data)
-            alert_count = 0
-            for record in alert_packets:
-                if alert_count < 10000:
-                    streamProducer.send(schema, record)
-                    alert_count += 1
-                else:
-                    break
+            ALERTS_TO_SEND = 10000
+            alert_count = 0
+            for alert_count, record in enumerate(alert_packets, start=1):
+                streamProducer.send(schema, record)
+                if alert_count >= ALERTS_TO_SEND:
+                    break
         streamProducer.flush()
+        print(f"Sent {alert_count} alerts in {time.time() - start_time:.1f}s.")

+    # Schedule visits to be sent every `interval` seconds.
     loop = asyncio.get_event_loop()
-    asyncio.ensure_future(schedule_delays(loop, send_visit, files))
+    interval = 39  # Seconds between visits
+    for delay, filename in zip(itertools.count(0, interval), files):
+        loop.call_later(delay, send_visit, filename)
+
+    # Shut down the event loop after the last visit has been sent.
+    loop.call_later(delay, loop.stop)
     loop.run_forever()
     loop.close()
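
A note on the call_later() pattern above: each visit is registered as a plain callback at a fixed offset from now (t = 0, 39, 78, ... seconds), so the cadence does not drift with how long each send takes. loop.stop() only sets a flag that run_forever() checks after the current batch of ready callbacks has run, so sharing the last visit's delay is safe: the final send_visit() still completes before the loop exits. A standalone sketch of the idiom, with a hypothetical work() callback standing in for send_visit():

import asyncio
import itertools

def work(label):
    print('sending', label)

loop = asyncio.get_event_loop()
interval = 39  # seconds between calls
items = ['visit-1', 'visit-2', 'visit-3']
for delay, item in zip(itertools.count(0, interval), items):
    loop.call_later(delay, work, item)
# stop() only flags the loop; the last work() callback still runs first.
loop.call_later(delay, loop.stop)
loop.run_forever()
loop.close()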

11 changes: 9 additions & 2 deletions docker-compose.yml
@@ -16,8 +16,15 @@ services:
     environment:
       - KAFKA_BROKER_ID=1
       - KAFKA_ZOOKEEPER_CONNECT=zookeeper:32181
-      - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092
+
+      # Define separate listeners for inside & outside the Docker network.
+      # https://rmoff.net/2018/08/02/kafka-listeners-explained/
+      - KAFKA_LISTENERS=INTERNAL://kafka:9092,EXTERNAL://kafka:29092
+      - KAFKA_ADVERTISED_LISTENERS=INTERNAL://kafka:9092,EXTERNAL://localhost:29092
+      - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
+      - KAFKA_INTER_BROKER_LISTENER_NAME=INTERNAL
+
       - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 # remove for >= 3 brokers
       - KAFKA_DELETE_TOPIC_ENABLE=true
     ports:
       - "9092"
+      - "29092:29092"