Apache Kafka is a distributed event streaming platform. Python services use it for decoupled microservices, real-time pipelines, log aggregation, and event sourcing.

Core Concepts

Term            Meaning
Topic           Named stream of events (like a category)
Partition       Ordered, immutable log within a topic; the unit of parallelism
Producer        Publishes messages to a topic
Consumer        Reads messages from a topic
Consumer Group  Consumers that share work; each partition is read by exactly one member
Offset          Position of a record within a partition; consumers track progress by offset
Broker          Kafka server that stores and serves data
  Producer → Topic (partitions 0, 1, 2) → Consumer Group
                                              ├── Consumer A (partition 0)
                                              └── Consumer B (partitions 1, 2)
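
To make the terms above concrete, here is a minimal in-memory sketch of a partition as an append-only log. The `PartitionLog` class and its methods are hypothetical names for illustration, not part of any Kafka client; a real broker adds durability, replication, and retention on top of this idea.

```python
from dataclasses import dataclass, field


@dataclass
class PartitionLog:
    """Toy model of one partition: an append-only list of records."""
    records: list[bytes] = field(default_factory=list)

    def append(self, value: bytes) -> int:
        """Store a record and return its offset (its index in the log)."""
        self.records.append(value)
        return len(self.records) - 1

    def read_from(self, offset: int) -> list[bytes]:
        """Return every record at or after the given offset."""
        return self.records[offset:]


log = PartitionLog()
first = log.append(b"order-created")
second = log.append(b"order-paid")

# A consumer whose committed offset is 1 resumes from there;
# a new consumer can replay from offset 0 because reading deletes nothing.
resumed = log.read_from(1)
replayed = log.read_from(0)
```

Reading never removes records, which is what lets several consumer groups read the same data independently.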
  

Local Development

Run Kafka with Docker Compose:

  # docker-compose.yml
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181

  kafka:
    image: confluentinc/cp-kafka:7.5.0
    depends_on: [zookeeper]
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
  
  docker compose up -d
  

Producer with confluent-kafka

confluent-kafka is Confluent's Python client, built on the librdkafka C library:

  pip install confluent-kafka
  
  # producer.py
import json
from confluent_kafka import Producer

conf = {
    "bootstrap.servers": "localhost:9092",
    "client.id": "order-producer",
}
producer = Producer(conf)

def delivery_report(err, msg):
    if err:
        print(f"Delivery failed: {err}")
    else:
        print(f"Delivered to {msg.topic()} [{msg.partition()}] @ {msg.offset()}")

def send_order(order_id: str, amount: float):
    payload = json.dumps({"order_id": order_id, "amount": amount})
    producer.produce(
        topic="orders",
        key=order_id.encode(),
        value=payload.encode(),
        callback=delivery_report,
    )
    producer.poll(0)  # trigger callbacks

send_order("ORD-001", 99.99)
send_order("ORD-002", 149.50)
producer.flush()  # wait for all messages to be delivered
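
`produce()` is asynchronous: it only places the message in a local queue, and in confluent-kafka it raises `BufferError` when that queue is full. A minimal sketch of one way to handle this follows. `produce_with_backpressure` and its `max_attempts` parameter are hypothetical, and the function accepts any object with `produce()` and `poll()` methods, such as the `Producer` above.

```python
def produce_with_backpressure(producer, topic: str, key: bytes, value: bytes,
                              max_attempts: int = 5) -> None:
    """Enqueue a message, draining delivery callbacks if the local queue is full."""
    for _ in range(max_attempts):
        try:
            producer.produce(topic=topic, key=key, value=value)
            return
        except BufferError:
            # Local queue is full: serve delivery callbacks for up to
            # one second to free space, then try again.
            producer.poll(1.0)
    raise RuntimeError(f"local queue still full after {max_attempts} attempts")
```

Blocking briefly in `poll()` slows the caller down to the rate the broker can accept, which is usually preferable to dropping messages.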
  

Keys determine partition assignment: with the default partitioner, messages with the same key go to the same partition as long as the partition count stays fixed, which preserves ordering per entity.
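
The idea can be illustrated with a simplified model: hash the key and take the result modulo the partition count. This sketch uses CRC32 from the standard library purely for illustration. Real clients use their own hash functions, so the partition numbers it produces will not necessarily match what a connected producer chooses. `partition_for` is a hypothetical helper.

```python
import zlib


def partition_for(key: bytes, num_partitions: int) -> int:
    """Map a key to a partition deterministically (simplified model)."""
    return zlib.crc32(key) % num_partitions


# The same key always maps to the same partition, so all events
# for one order stay in order relative to each other.
p1 = partition_for(b"ORD-001", 3)
p2 = partition_for(b"ORD-001", 3)
assert p1 == p2

# Changing the partition count can move a key to a different partition,
# which is why per-key ordering only holds while the count is fixed.
before = {k: partition_for(k, 3) for k in (b"ORD-001", b"ORD-002", b"ORD-003")}
after = {k: partition_for(k, 4) for k in before}
```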

Consumer

  # consumer.py
import json
from confluent_kafka import Consumer

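conf = {
    "bootstrap.servers": "localhost:9092",
    "group.id": "order-processors",
    "auto.offset.reset": "earliest",  # start from beginning if no offset
    "enable.auto.commit": False,  # auto-commit is on by default; disable it so only the manual commit below applies
}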
consumer = Consumer(conf)
consumer.subscribe(["orders"])

try:
    while True:
        msg = consumer.poll(timeout=1.0)
        if msg is None:
            continue
        if msg.error():
            print(f"Error: {msg.error()}")
            continue

        order = json.loads(msg.value().decode())
        print(f"Processing order {order['order_id']}: ${order['amount']}")
        # Process order...
        consumer.commit(msg)  # manual commit after successful processing
finally:
    consumer.close()
  

Consumer Group Behavior

  • Multiple consumers in the same group share partitions
  • Adding consumers scales throughput (up to partition count)
  • Each message is delivered to one consumer in the group
  • Different groups each receive all messages (pub/sub pattern)
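
The scaling limit in the second bullet can be seen in a small simulation. The sketch below deals partitions out round-robin, which is a simplification of the assignment strategies real clients use. `assign_partitions` is a hypothetical helper.

```python
def assign_partitions(partitions: list[int], consumers: list[str]) -> dict[str, list[int]]:
    """Deal partitions out to consumers one at a time, round-robin."""
    assignment: dict[str, list[int]] = {name: [] for name in consumers}
    for i, partition in enumerate(partitions):
        owner = consumers[i % len(consumers)]
        assignment[owner].append(partition)
    return assignment


# Two consumers share three partitions.
two = assign_partitions([0, 1, 2], ["A", "B"])
# → {"A": [0, 2], "B": [1]}

# With four consumers and three partitions, one consumer gets nothing.
four = assign_partitions([0, 1, 2], ["A", "B", "C", "D"])
# → {"A": [0], "B": [1], "C": [2], "D": []}
```

Consumer D sits idle, so adding consumers beyond the partition count adds no throughput.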

Async Consumer with aiokafka

For asyncio applications:

  pip install aiokafka
  
  # async_consumer.py
import asyncio
import json
from aiokafka import AIOKafkaConsumer

async def consume_orders():
    consumer = AIOKafkaConsumer(
        "orders",
        bootstrap_servers="localhost:9092",
        group_id="async-processors",
        auto_offset_reset="earliest",
    )
    await consumer.start()
    try:
        async for msg in consumer:
            order = json.loads(msg.value.decode())
            await process_order(order)
    finally:
        await consumer.stop()

async def process_order(order: dict):
    print(f"Async processing: {order['order_id']}")

asyncio.run(consume_orders())
  

Message Schema with JSON / Avro

Unstructured JSON works for prototypes. Production systems benefit from a consistent event envelope and, eventually, enforced schemas:

  # Structured event envelope
event = {
    "event_id": "evt-123",
    "event_type": "order.created",
    "timestamp": "2026-06-13T10:00:00Z",
    "payload": {"order_id": "ORD-001", "amount": 99.99},
}
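
One lightweight way to keep such envelopes consistent before adopting a schema registry is to build and parse them in one place. The sketch below is an assumption about how that might look, using only the standard library. `make_event` and `parse_event` are hypothetical helpers, and the required-field check is far weaker than real schema validation.

```python
import json
import uuid
from datetime import datetime, timezone

REQUIRED_FIELDS = {"event_id", "event_type", "timestamp", "payload"}


def make_event(event_type: str, payload: dict) -> bytes:
    """Wrap a payload in the envelope and serialize it for Kafka."""
    event = {
        "event_id": str(uuid.uuid4()),
        "event_type": event_type,
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "payload": payload,
    }
    return json.dumps(event).encode()


def parse_event(raw: bytes) -> dict:
    """Decode an envelope and reject it if a required field is missing."""
    event = json.loads(raw.decode())
    missing = REQUIRED_FIELDS - set(event)
    if missing:
        raise ValueError(f"missing fields: {sorted(missing)}")
    return event


raw = make_event("order.created", {"order_id": "ORD-001", "amount": 99.99})
event = parse_event(raw)
```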
  

For schema evolution, use Confluent Schema Registry with Avro or Protobuf:

  pip install "confluent-kafka[avro]"
  

Error Handling and Retries

Failed messages should not block the pipeline:

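import json

# `producer` is the Producer from producer.py; `handle_order` is the
# application's handler (see Idempotent Processing).

def process_message(msg):
    try:
        order = json.loads(msg.value().decode())
        handle_order(order)
    except Exception as e:
        # Route the failure aside so the consumer can move on to the next message.
        send_to_dead_letter_queue(msg, error=str(e))

def send_to_dead_letter_queue(msg, error: str):
    producer.produce(
        topic="orders-dlq",
        key=msg.key(),  # keep the original key so related failures stay together
        value=msg.value(),
        headers=[("error", error.encode())],
    )
    producer.poll(0)  # serve delivery callbacks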
  

Dead Letter Queue (DLQ) — a separate topic for messages that failed processing, enabling inspection and replay.
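
Transient failures, such as a database timeout, are often worth retrying before a message is sent to the DLQ. Here is a minimal sketch of that pattern, independent of any Kafka client. `process_with_retries`, `max_attempts`, and `backoff_seconds` are hypothetical names, and `on_dead_letter` stands in for a function like `send_to_dead_letter_queue`.

```python
import time
from typing import Callable


def process_with_retries(
    payload: bytes,
    handler: Callable[[bytes], None],
    on_dead_letter: Callable[[bytes, str], None],
    max_attempts: int = 3,
    backoff_seconds: float = 0.5,
) -> bool:
    """Try the handler up to max_attempts times, then dead-letter the payload.

    Returns True if the handler succeeded, False if the payload was dead-lettered.
    """
    last_error = ""
    for attempt in range(1, max_attempts + 1):
        try:
            handler(payload)
            return True
        except Exception as exc:  # in real code, catch only retryable errors
            last_error = str(exc)
            if attempt < max_attempts:
                # Exponential backoff: 0.5s, 1s, 2s, ... with the default setting
                time.sleep(backoff_seconds * 2 ** (attempt - 1))
    on_dead_letter(payload, last_error)
    return False
```

Retrying inline blocks the partition while it waits, so the attempt count and backoff should stay small; longer delays are better served by a separate retry topic.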

Idempotent Processing

Consumers may receive duplicate messages (at-least-once delivery). Make handlers idempotent:

processed_ids: set[str] = set()  # use Redis/DB in production

def handle_order(order: dict):
    if order["order_id"] in processed_ids:
        return  # already handled
    # ... process ...
    processed_ids.add(order["order_id"])
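
The in-memory set above is lost on restart. A minimal sketch of a durable alternative follows, using SQLite from the standard library as a stand-in for whatever database the service already uses. The `processed_orders` table and the `handle_order_once` function are hypothetical.

```python
import sqlite3

conn = sqlite3.connect(":memory:")  # use a file path or a real database in practice
conn.execute("CREATE TABLE IF NOT EXISTS processed_orders (order_id TEXT PRIMARY KEY)")


def handle_order_once(order: dict) -> bool:
    """Process an order only if its ID has not been recorded yet.

    Returns True if the order was processed, False if it was a duplicate.
    """
    with conn:  # one transaction: commits on success, rolls back on error
        cursor = conn.execute(
            "INSERT OR IGNORE INTO processed_orders (order_id) VALUES (?)",
            (order["order_id"],),
        )
        if cursor.rowcount == 0:
            return False  # primary key already present: duplicate delivery
        # ... process the order here; an exception rolls back the insert ...
        return True


first = handle_order_once({"order_id": "ORD-001", "amount": 99.99})
second = handle_order_once({"order_id": "ORD-001", "amount": 99.99})
```

Recording the ID and doing the work in the same transaction means a crash mid-processing leaves no marker behind, so the redelivered message is handled rather than skipped.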
  

Kafka in a Python Microservice Architecture

  FastAPI API → Kafka (orders topic) → Worker Service → PostgreSQL
                  ↓
            Analytics Service → Data Warehouse
                  ↓
            Notification Service → Email/SMS
  

Each service is independently deployable and scales based on its consumer lag.

Monitoring Consumer Lag

Consumer lag is the number of messages between a consumer group's committed offset and the latest message in a partition. High lag indicates the consumer cannot keep up.

  # Kafka command-line tool shipped with Confluent Platform (kafka-consumer-groups.sh in Apache Kafka)
kafka-consumer-groups --bootstrap-server localhost:9092 \
  --describe --group order-processors
  

Alert when lag stays above a threshold or keeps growing; either means processing is falling behind.
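
Lag for a partition is the log end offset minus the group's committed offset. The sketch below computes it from two plain dictionaries. How those offsets are fetched (from the CLI output above or from a client library) is left out, and `compute_lag` and `LAG_THRESHOLD` are hypothetical names.

```python
LAG_THRESHOLD = 1000  # assumed alert threshold; tune per workload


def compute_lag(end_offsets: dict[int, int], committed: dict[int, int]) -> dict[int, int]:
    """Return per-partition lag: log end offset minus committed offset.

    A partition with no committed offset is treated as committed at 0,
    which assumes the log still starts at offset 0.
    """
    return {
        partition: end - committed.get(partition, 0)
        for partition, end in end_offsets.items()
    }


lag = compute_lag(
    end_offsets={0: 1500, 1: 900, 2: 4200},
    committed={0: 1500, 1: 850, 2: 1200},
)
# → {0: 0, 1: 50, 2: 3000}

lagging = [p for p, n in lag.items() if n > LAG_THRESHOLD]
# → [2]
```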

Kafka vs Other Message Queues

Feature      Kafka                  RabbitMQ          Redis Pub/Sub
Model        Log / stream           Queue / routing   Pub/sub
Persistence  Durable by default     Optional          In-memory
Replay       Yes (by offset)        No (ack removes)  No
Throughput   Very high              Moderate          High
Use case     Event streaming, logs  Task queues       Real-time notifications

Production Best Practices

  1. Partition count: plan upfront; partitions can be added later but not removed, and adding them changes which partition a key maps to
  2. Replication factor: at least 3 in production for fault tolerance
  3. Manual commits: commit after successful processing, not before
  4. Idempotent producers: set enable.idempotence=True to prevent duplicates caused by producer retries
  5. Monitor lag: primary health metric for consumers
  6. Schema registry: enforce contracts between producers and consumers
  7. Don’t use Kafka as a database: it’s a log, not a query engine
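
Items 3 and 4 translate directly into client configuration. Below is a sketch of what the dictionaries might look like for confluent-kafka. The property names are standard librdkafka settings, while the two helper functions are hypothetical and the values are starting points, not tuned recommendations.

```python
def reliable_producer_config(bootstrap_servers: str) -> dict:
    """Producer settings that avoid duplicates caused by internal retries."""
    return {
        "bootstrap.servers": bootstrap_servers,
        "enable.idempotence": True,  # broker deduplicates retried sends
        "acks": "all",               # required by idempotence; wait for in-sync replicas
    }


def manual_commit_consumer_config(bootstrap_servers: str, group_id: str) -> dict:
    """Consumer settings for committing only after successful processing."""
    return {
        "bootstrap.servers": bootstrap_servers,
        "group.id": group_id,
        "enable.auto.commit": False,  # the application calls commit() itself
        "auto.offset.reset": "earliest",
    }


producer_conf = reliable_producer_config("localhost:9092")
consumer_conf = manual_commit_consumer_config("localhost:9092", "order-processors")
```

Each dictionary can be passed to `Producer(...)` or `Consumer(...)` in place of the `conf` dictionaries used earlier.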

Kafka powers the event backbone of modern Python microservice architectures.