A FastAPI-inspired framework to build production-grade Kafka applications effortlessly, with support for routing, DI, retries, DLQ, and custom serialization.
✅ FastAPI-like routing using decorators
✅ Built-in dependency injection system
✅ Priority-based message processing
✅ Retry and DLQ support
✅ Plug-and-play serialization (JSON, Avro, Protobuf)
✅ Built for async using aiokafka
```bash
pip install kafka-framework

# With Avro support
pip install "kafka-framework[avro]"

# With all optional dependencies
pip install "kafka-framework[all]"
```
```python
from kafka_framework import KafkaApp, TopicRouter, Depends
from kafka_framework.serialization import JSONSerializer

app = KafkaApp(
    bootstrap_servers=["localhost:9092"],
    group_id="my-consumer-group",
    serializer=JSONSerializer(),
)

router = TopicRouter()

# Dependencies
async def get_db():
    return {"connection": "db"}

def get_config():
    return {"env": "production"}

@router.topic_event(topic="orders", event_name="order_created", priority=1)
async def handle_order_created(message, db=Depends(get_db), config=Depends(get_config)):
    print(f"Processing order {message.value['id']}")

@router.topic_event(
    topic="orders",
    event_name="order_cancelled",
    priority=2,
    retry_attempts=3,
    dlq_postfix="cancelled",
)
async def handle_order_cancelled(message):
    print(f"Cancelling order {message.value['id']}")

app.include_router(router)

# Entry point
if __name__ == "__main__":
    import asyncio

    asyncio.run(app.start())
```
Handlers with a higher `priority` value run first:

```python
@router.topic_event("notifications", "vip", priority=10)
async def handle_vip(message): ...

@router.topic_event("notifications", "normal", priority=1)
async def handle_normal(message): ...
```
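Conceptually, priority dispatch amounts to ordering pending handlers by their priority value, highest first. A minimal self-contained sketch (the names and data shapes here are illustrative, not the framework's internals):

```python
# Illustrative sketch of priority ordering -- not the framework's actual internals.
handlers = [
    {"name": "handle_normal", "priority": 1},
    {"name": "handle_vip", "priority": 10},
]

# Higher priority value runs first.
ordered = sorted(handlers, key=lambda h: h["priority"], reverse=True)
print([h["name"] for h in ordered])  # handle_vip before handle_normal
```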
Messages that still fail after all retry attempts are pushed to a dead-letter queue (DLQ) topic:

```python
@router.topic_event("orders", "order_created", dlq_postfix="created")
async def handle_order(message): ...
```
Failed handlers are retried before falling back to the DLQ:

```python
@router.topic_event("orders", "fail", retry_attempts=5)
async def flaky_handler(message): ...
```
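The retry-then-DLQ flow can be sketched in plain asyncio. The helper names below (`run_with_retries`, `send_to_dlq`) are assumptions made for illustration, not the framework's real API:

```python
import asyncio

# Illustrative sketch: retry a handler, then fall back to the DLQ.
async def run_with_retries(handler, message, retry_attempts, send_to_dlq):
    for attempt in range(1, retry_attempts + 1):
        try:
            await handler(message)
            return True  # handled successfully
        except Exception:
            if attempt == retry_attempts:
                await send_to_dlq(message)  # retries exhausted -> DLQ
    return False

dlq = []

async def to_dlq(message):
    dlq.append(message)

async def always_fails(message):
    raise RuntimeError("boom")

handled = asyncio.run(
    run_with_retries(always_fails, {"id": "1"}, retry_attempts=3, send_to_dlq=to_dlq)
)
print(handled, dlq)  # the failed message ends up in the DLQ
```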
Supports JSON, Protobuf, and Avro.
```python
import json

from kafka_framework import KafkaApp
from kafka_framework.serialization import AvroSerializer

schema = {
    "type": "record",
    "name": "Order",
    "fields": [
        {"name": "id", "type": "string"},
        {"name": "amount", "type": "double"},
    ],
}

app = KafkaApp(
    bootstrap_servers=["localhost:9092"],
    serializer=AvroSerializer(
        schema_registry_url="http://localhost:8081",
        schema_str=json.dumps(schema),
    ),
)
```
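To see the general shape of a serializer, here is a self-contained JSON round trip. The `serialize`/`deserialize` method names are an assumption about the interface, not the framework's documented API:

```python
import json

class MyJSONSerializer:
    """Hypothetical serializer sketch; method names are assumed."""

    def serialize(self, value) -> bytes:
        # Python object -> bytes on the wire
        return json.dumps(value).encode("utf-8")

    def deserialize(self, data: bytes):
        # bytes off the wire -> Python object
        return json.loads(data.decode("utf-8"))

s = MyJSONSerializer()
payload = s.serialize({"id": "42", "amount": 9.99})
print(s.deserialize(payload))  # round-trips back to the original dict
```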
Underlying consumer and producer options can be passed through the `config` dict:

```python
app = KafkaApp(
    bootstrap_servers=["localhost:9092"],
    group_id="your-group",
    config={
        "consumer_config": {
            "auto_offset_reset": "earliest",
            "enable_auto_commit": True,
            "max_poll_records": 500,
        },
        "producer_config": {
            "acks": "all",
            "compression_type": "gzip",
            "max_request_size": 1048576,
        },
    },
)
```
We welcome contributions! Here’s how you can help:
- 🐛 Open issues
- ✍️ Submit PRs
- 🗨️ Discuss improvements
- 📚 Improve docs and examples
See CONTRIBUTING.md to get started.
- Documentation
- Admin/monitoring interface
- Kafka Streams integration
- Auto-schema registry
- CLI tooling
This project is licensed under the MIT License. See LICENSE for details.
- 📘 Documentation (coming soon)
- 🐍 PyPI package
- 🔧 GitHub Actions CI
- 💬 Discussions (coming soon)
Build Kafka consumers like web APIs — clean, testable, async-ready.