Event Streaming with Kafka
Build event-driven Python applications with Apache Kafka — producers, consumers, topics, consumer groups, and async processing with aiokafka.
Apache Kafka is a distributed event streaming platform. Python services use it for decoupled microservices, real-time pipelines, log aggregation, and event sourcing.
Core Concepts
| Term | Meaning |
|---|---|
| Topic | Named stream of events (like a category) |
| Partition | Ordered, immutable log within a topic — enables parallelism |
| Producer | Publishes messages to a topic |
| Consumer | Reads messages from a topic |
| Consumer Group | Consumers that share work — each partition read by one member |
| Offset | Position of a record within a partition; a consumer group commits the offset it has read up to |
| Broker | Kafka server that stores and serves data |
Producer → Topic (partitions 0, 1, 2) → Consumer Group
                                          ├── Consumer A (partition 0)
                                          └── Consumer B (partitions 1, 2)
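To make the partition and offset terms concrete, here is a minimal in-memory sketch. `PartitionLog` is a hypothetical toy class and not part of any Kafka client; it only illustrates that appending assigns the next offset and that reading leaves the log unchanged.

```python
from dataclasses import dataclass, field


@dataclass
class PartitionLog:
    """Toy stand-in for one partition: an append-only list of records."""

    records: list[bytes] = field(default_factory=list)

    def append(self, value: bytes) -> int:
        """Append a record and return the offset it was stored at."""
        self.records.append(value)
        return len(self.records) - 1

    def read_from(self, offset: int) -> list[tuple[int, bytes]]:
        """Return (offset, value) pairs starting at the given offset."""
        return list(enumerate(self.records[offset:], start=offset))


log = PartitionLog()
first = log.append(b"order-created")
second = log.append(b"order-paid")

# Reading does not remove anything, so a consumer can re-read from any offset.
replayed = log.read_from(0)
resumed = log.read_from(second)
```

A real broker adds replication, retention, and on-disk storage on top of this idea, none of which the sketch models.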
Local Development
Run Kafka with Docker Compose:
# docker-compose.yml
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
  kafka:
    image: confluentinc/cp-kafka:7.5.0
    depends_on: [zookeeper]
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
docker compose up -d
Producer with confluent-kafka
confluent-kafka is Confluent's Python client, built on the C library librdkafka:
pip install confluent-kafka
# producer.py
import json

from confluent_kafka import Producer

conf = {
    "bootstrap.servers": "localhost:9092",
    "client.id": "order-producer",
}
producer = Producer(conf)


def delivery_report(err, msg):
    # Called once per message when delivery succeeds or finally fails
    if err:
        print(f"Delivery failed: {err}")
    else:
        print(f"Delivered to {msg.topic()} [{msg.partition()}] @ {msg.offset()}")


def send_order(order_id: str, amount: float):
    payload = json.dumps({"order_id": order_id, "amount": amount})
    producer.produce(
        topic="orders",
        key=order_id.encode(),
        value=payload.encode(),
        callback=delivery_report,
    )
    producer.poll(0)  # trigger callbacks


send_order("ORD-001", 99.99)
send_order("ORD-002", 149.50)
producer.flush()  # wait for all messages to be delivered
Keys determine partition assignment — messages with the same key go to the same partition (useful for ordering per entity).
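The key-to-partition mapping can be sketched as a hash of the key modulo the partition count. The helper `pick_partition` below is hypothetical, and CRC32 is used only as an illustrative hash; the partitioner a real client applies depends on its configuration, so the actual partition numbers may differ.

```python
import zlib


def pick_partition(key: bytes, num_partitions: int) -> int:
    """Map a key to a partition by hashing it (illustrative only)."""
    return zlib.crc32(key) % num_partitions


keys = [b"ORD-001", b"ORD-002", b"ORD-001"]
assignments = [pick_partition(k, 3) for k in keys]

# The first and third entries share a key, so they share a partition.
same_partition = assignments[0] == assignments[2]
```

Because the result depends on `num_partitions`, adding partitions to a topic can move a key to a different partition, which is why per-key ordering is only guaranteed while the partition count stays fixed.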
Consumer
# consumer.py
import json

from confluent_kafka import Consumer

conf = {
    "bootstrap.servers": "localhost:9092",
    "group.id": "order-processors",
    "auto.offset.reset": "earliest",  # start from the beginning if no committed offset exists
    "enable.auto.commit": False,  # offsets are committed manually below
}
consumer = Consumer(conf)
consumer.subscribe(["orders"])

try:
    while True:
        msg = consumer.poll(timeout=1.0)
        if msg is None:
            continue  # no message arrived within the timeout
        if msg.error():
            print(f"Error: {msg.error()}")
            continue
        order = json.loads(msg.value().decode())
        print(f"Processing order {order['order_id']}: ${order['amount']}")
        # Process order...
        consumer.commit(message=msg)  # manual commit after successful processing
finally:
    consumer.close()
Consumer Group Behavior
- Multiple consumers in the same group share partitions
- Adding consumers scales throughput (up to partition count)
- Each message is delivered to one consumer in the group
- Different groups each receive all messages (pub/sub pattern)
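The scaling limit in the list above can be illustrated with a toy assignment function. `assign_partitions` is a hypothetical helper that spreads partitions round-robin; Kafka's real assignors are configurable and may produce a different split, but the limit is the same: a consumer beyond the partition count receives nothing.

```python
def assign_partitions(partitions: list[int], consumers: list[str]) -> dict[str, list[int]]:
    """Spread partitions over consumers round-robin (toy model)."""
    assignment: dict[str, list[int]] = {c: [] for c in consumers}
    for i, partition in enumerate(sorted(partitions)):
        assignment[consumers[i % len(consumers)]].append(partition)
    return assignment


# Two consumers share three partitions.
two = assign_partitions([0, 1, 2], ["A", "B"])

# Four consumers, three partitions: the fourth consumer stays idle.
four = assign_partitions([0, 1, 2], ["A", "B", "C", "D"])
```

In this model each partition has exactly one owner within a group, which is what gives the "one consumer per message" behavior.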
Async Consumer with aiokafka
For asyncio applications:
pip install aiokafka
# async_consumer.py
import asyncio
import json

from aiokafka import AIOKafkaConsumer


async def consume_orders():
    consumer = AIOKafkaConsumer(
        "orders",
        bootstrap_servers="localhost:9092",
        group_id="async-processors",
        auto_offset_reset="earliest",
    )
    await consumer.start()
    try:
        async for msg in consumer:
            order = json.loads(msg.value.decode())
            await process_order(order)
    finally:
        await consumer.stop()


async def process_order(order: dict):
    print(f"Async processing: {order['order_id']}")


asyncio.run(consume_orders())
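One way to exercise the async loop without a broker is to write it against any async iterator of raw values. The sketch below is an assumption about how such a loop could be structured, not aiokafka's API: `consume`, `fake_messages`, and `record_order` are hypothetical names, and with a real consumer the raw bytes would come from each message's `value`. It also skips malformed JSON rather than letting one bad payload stop the loop.

```python
import asyncio
import json
from typing import AsyncIterator, Awaitable, Callable


async def consume(
    messages: AsyncIterator[bytes],
    handler: Callable[[dict], Awaitable[None]],
) -> int:
    """Decode each raw value, call the handler, and return how many payloads were skipped."""
    skipped = 0
    async for raw in messages:
        try:
            order = json.loads(raw.decode())
        except (UnicodeDecodeError, json.JSONDecodeError):
            skipped += 1  # malformed payload: skip it instead of crashing the loop
            continue
        await handler(order)
    return skipped


async def fake_messages() -> AsyncIterator[bytes]:
    """Hypothetical stand-in for a consumer; yields raw message values."""
    for raw in (b'{"order_id": "ORD-001"}', b"not json", b'{"order_id": "ORD-002"}'):
        yield raw


seen: list[str] = []


async def record_order(order: dict) -> None:
    seen.append(order["order_id"])


skipped_count = asyncio.run(consume(fake_messages(), record_order))
```

Keeping the loop independent of the client makes it straightforward to unit test; in a real service the skipped payloads would more likely be routed to a dead letter topic than silently counted.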
Message Schema with JSON / Avro
Unstructured JSON works for prototypes. Production systems use schemas:
# Structured event envelope
event = {
    "event_id": "evt-123",
    "event_type": "order.created",
    "timestamp": "2026-06-13T10:00:00Z",
    "payload": {"order_id": "ORD-001", "amount": 99.99},
}
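A lightweight step between raw dictionaries and a schema registry is to build the envelope from a typed class. The following is a minimal sketch using only the standard library; the `Event` class and its methods are hypothetical, and the generated IDs and timestamps are one possible convention rather than a required format.

```python
import json
import uuid
from dataclasses import asdict, dataclass, field
from datetime import datetime, timezone


def _utc_now() -> str:
    # ISO 8601 in UTC; isoformat() renders the zone as "+00:00" rather than "Z"
    return datetime.now(timezone.utc).isoformat()


@dataclass(frozen=True)
class Event:
    event_type: str
    payload: dict
    event_id: str = field(default_factory=lambda: f"evt-{uuid.uuid4()}")
    timestamp: str = field(default_factory=_utc_now)

    def to_bytes(self) -> bytes:
        """Serialize the envelope to UTF-8 JSON, ready to use as a message value."""
        return json.dumps(asdict(self)).encode()

    @classmethod
    def from_bytes(cls, raw: bytes) -> "Event":
        """Rebuild the envelope; a missing required field raises TypeError."""
        return cls(**json.loads(raw.decode()))


event = Event("order.created", {"order_id": "ORD-001", "amount": 99.99})
decoded = Event.from_bytes(event.to_bytes())
```

This catches missing or unexpected top-level fields at the consumer, but it does not validate field types or manage schema versions, which is where a registry comes in.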
For schema evolution, use Confluent Schema Registry with Avro or Protobuf:
pip install "confluent-kafka[avro]"
Error Handling and Retries
Failed messages should not block the pipeline:
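import json


def process_message(msg):
    try:
        order = json.loads(msg.value().decode())
        handle_order(order)
    except Exception as e:
        # Any failure routes the message to the DLQ so the loop keeps moving
        send_to_dead_letter_queue(msg, error=str(e))


def send_to_dead_letter_queue(msg, error: str):
    # Reuses the Producer instance created in producer.py
    producer.produce(
        topic="orders-dlq",
        key=msg.key(),  # keep the original key for later replay
        value=msg.value(),
        headers=[("error", error.encode())],
    )
    producer.poll(0)  # serve delivery callbacks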
Dead Letter Queue (DLQ) — a separate topic for messages that failed processing, enabling inspection and replay.
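Transient failures are often worth retrying before a message is dead-lettered. The sketch below shows one possible shape for that: a bounded number of attempts with exponential backoff, then a hand-off to a DLQ callback. `process_with_retries`, its parameters, and the sample handlers are hypothetical, and the DLQ sender is passed in as a plain callable so the logic can run without a broker.

```python
import time
from typing import Callable


def process_with_retries(
    value: bytes,
    handler: Callable[[bytes], None],
    send_to_dlq: Callable[[bytes, str], None],
    max_attempts: int = 3,
    base_delay: float = 0.1,
) -> bool:
    """Try the handler up to max_attempts times, then dead-letter the message."""
    last_error = ""
    for attempt in range(1, max_attempts + 1):
        try:
            handler(value)
            return True
        except Exception as exc:  # broad on purpose: any failure counts as an attempt
            last_error = str(exc)
            if attempt < max_attempts:
                time.sleep(base_delay * 2 ** (attempt - 1))  # exponential backoff
    send_to_dlq(value, last_error)
    return False


calls = {"count": 0}


def flaky(value: bytes) -> None:
    """Fails twice, then succeeds, to mimic a transient error."""
    calls["count"] += 1
    if calls["count"] < 3:
        raise RuntimeError("temporary failure")


def always_fails(value: bytes) -> None:
    raise ValueError("bad payload")


dead_letters: list[tuple[bytes, str]] = []


def collect_dead_letter(value: bytes, error: str) -> None:
    dead_letters.append((value, error))


recovered = process_with_retries(b"a", flaky, collect_dead_letter, base_delay=0)
failed = process_with_retries(b"b", always_fails, collect_dead_letter, base_delay=0)
```

Retrying inline blocks the partition while it sleeps, so the attempt count and delay should stay small; longer waits are usually handled by a separate retry topic.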
Idempotent Processing
Consumers may receive duplicate messages (at-least-once delivery). Make handlers idempotent:
processed_ids: set[str] = set()  # use Redis/DB in production


def handle_order(order: dict):
    if order["order_id"] in processed_ids:
        return  # already handled
    # ... process ...
    processed_ids.add(order["order_id"])
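An in-memory set is lost on restart and is not shared between workers. As a rough sketch of the database-backed variant, the example below uses SQLite from the standard library with a primary key as the deduplication guard. The table name, the `handle_order_once` function, and the in-memory database are assumptions for illustration; a real service would point at a shared, persistent store.

```python
import sqlite3

db = sqlite3.connect(":memory:")  # assumption: a real service would use a shared database
db.execute("CREATE TABLE processed_orders (order_id TEXT PRIMARY KEY)")

handled: list[str] = []


def handle_order_once(order: dict) -> bool:
    """Process the order only if its ID has not been recorded yet."""
    with db:  # the ID insert is committed only if this block finishes without raising
        cursor = db.execute(
            "INSERT OR IGNORE INTO processed_orders (order_id) VALUES (?)",
            (order["order_id"],),
        )
        if cursor.rowcount == 0:
            return False  # duplicate delivery: the ID was already recorded
        handled.append(order["order_id"])  # stand-in for the real side effect
    return True


results = [
    handle_order_once({"order_id": "ORD-001"}),
    handle_order_once({"order_id": "ORD-001"}),  # redelivered duplicate
    handle_order_once({"order_id": "ORD-002"}),
]
```

The guard is strongest when the side effect writes to the same database inside the same transaction; side effects outside it, such as sending an email, can still repeat if the process crashes between the effect and the commit.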
Kafka in a Python Microservice Architecture
FastAPI API → Kafka (orders topic) → Worker Service → PostgreSQL
                     ↓
              Analytics Service → Data Warehouse
                     ↓
              Notification Service → Email/SMS
Each service is independently deployable and scales based on its consumer lag.
Monitoring Consumer Lag
Consumer lag — how far behind a consumer is from the latest message. High lag indicates the consumer cannot keep up.
# CLI tool shipped with Confluent Platform (kafka-consumer-groups.sh in Apache Kafka)
kafka-consumer-groups --bootstrap-server localhost:9092 \
  --describe --group order-processors
Alert when lag stays above a threshold for a sustained period, since a short spike during a traffic burst can clear on its own.
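Lag per partition is the log end offset minus the group's committed offset, which is what the LAG column of the command above reports. A minimal sketch of that arithmetic and a threshold check follows; the function names and sample offsets are hypothetical, and in a real monitor the two dictionaries would be filled from the broker rather than hard-coded.

```python
def compute_lag(end_offsets: dict[int, int], committed: dict[int, int]) -> dict[int, int]:
    """Per-partition lag: log end offset minus the group's committed offset."""
    return {
        # Assumption: a partition with no committed offset counts from 0
        partition: max(end - committed.get(partition, 0), 0)
        for partition, end in end_offsets.items()
    }


def lagging_partitions(lag: dict[int, int], threshold: int) -> list[int]:
    """Return the partitions whose lag exceeds the alert threshold."""
    return sorted(p for p, value in lag.items() if value > threshold)


end_offsets = {0: 1200, 1: 950, 2: 400}  # hypothetical sample numbers
committed = {0: 1195, 1: 200, 2: 400}

lag = compute_lag(end_offsets, committed)
alerts = lagging_partitions(lag, threshold=100)
```

Here partition 1 is 750 messages behind and would trigger the alert, while partitions 0 and 2 are effectively caught up.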
Kafka vs Other Message Queues
| | Kafka | RabbitMQ | Redis Pub/Sub |
|---|---|---|---|
| Model | Log / stream | Queue / routing | Pub/sub |
| Persistence | Durable by default | Optional | None (fire-and-forget) |
| Replay | Yes (by offset) | No (ack removes) | No |
| Throughput | Very high | Moderate | High |
| Use case | Event streaming, logs | Task queues | Real-time notifications |
Production Best Practices
- Partition count: plan upfront; partitions can be added later but not removed, and adding them changes which partition a key maps to
- Replication factor: at least 3 in production for fault tolerance
- Manual commits: commit after successful processing, not before
- Idempotent producers: set enable.idempotence=True to prevent duplicates caused by producer retries
- Monitor lag: primary health metric for consumers
- Schema registry: enforce contracts between producers and consumers
- Don't use Kafka as a database: it's a log, not a query engine
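Several of these practices come down to client configuration. The dictionaries below are a sketch of how they might look for confluent-kafka, reusing the broker address and group name from the earlier examples; the compression setting is an illustrative choice, not a recommendation from this guide.

```python
# Producer settings: passed to confluent_kafka.Producer(producer_conf)
producer_conf = {
    "bootstrap.servers": "localhost:9092",
    "enable.idempotence": True,  # broker discards duplicates caused by producer retries
    "acks": "all",  # wait for all in-sync replicas, required by idempotence
    "compression.type": "lz4",  # hypothetical choice; tune for your payloads
}

# Consumer settings: passed to confluent_kafka.Consumer(consumer_conf)
consumer_conf = {
    "bootstrap.servers": "localhost:9092",
    "group.id": "order-processors",
    "enable.auto.commit": False,  # commit manually after successful processing
    "auto.offset.reset": "earliest",
}
```

Replication factor and partition count are topic-level settings, so they are chosen when the topic is created rather than in these client dictionaries.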
Related
- Async Programming — asyncio fundamentals for aiokafka
- Concurrency — threading vs async for workers
- DevOps & CI/CD — deploy consumers in Docker/Kubernetes
- System Design — event-driven architecture patterns
Kafka powers the event backbone of modern Python microservice architectures.