Azure Functions: Cosmos DB & Queue Triggers
Integrate Azure Functions with Cosmos DB change feed, Queue Storage, and Service Bus for event-driven architectures.
Azure Functions integrates closely with Azure data and messaging services. This chapter covers the Queue Storage, Cosmos DB, and Service Bus triggers, then Durable Functions orchestration and local development.
Queue Storage Trigger
Process messages from Azure Queue Storage:
# function_app.py
import azure.functions as func
import json
import logging

app = func.FunctionApp()

@app.queue_trigger(
    arg_name="msg",
    queue_name="order-queue",
    connection="AzureWebJobsStorage",
)
def process_order(msg: func.QueueMessage) -> None:
    body = msg.get_body().decode("utf-8")
    order = json.loads(body)
    logging.info("Processing order %s", order["order_id"])
    fulfill_order(order)  # application code, defined elsewhere
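If the handler raises, the runtime makes the message visible again and retries it; after the maximum dequeue count (five by default) the message is moved to a queue named `order-queue-poison`. The following is a minimal sketch of validating the body before doing any work, so that malformed messages fail with a clear error. `parse_order` and `REQUIRED_FIELDS` are hypothetical names, not part of the Functions API.

```python
import json

REQUIRED_FIELDS = ("order_id",)  # assumption: the only field the handler needs


def parse_order(raw: bytes) -> dict:
    """Decode a queue message body and check that it has the expected shape."""
    try:
        order = json.loads(raw.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError) as exc:
        raise ValueError(f"message is not valid UTF-8 JSON: {exc}") from exc
    if not isinstance(order, dict):
        raise ValueError("message body must be a JSON object")
    missing = [field for field in REQUIRED_FIELDS if field not in order]
    if missing:
        raise ValueError(f"message is missing fields: {missing}")
    return order
```

Inside `process_order`, a call such as `order = parse_order(msg.get_body())` could replace the two parsing lines.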
Send messages to the queue from another function or app:
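from azure.storage.queue import QueueClient, TextBase64EncodePolicy
import json
import os

def enqueue_order(order):
    # The Functions queue trigger expects Base64-encoded text by default,
    # so encode on send (or set messageEncoding to "none" in host.json).
    client = QueueClient.from_connection_string(
        os.environ["AzureWebJobsStorage"],
        "order-queue",
        message_encode_policy=TextBase64EncodePolicy(),
    )
    client.send_message(json.dumps(order))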
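With default host settings, the queue trigger expects message text to be Base64-encoded, while `QueueClient.send_message` sends text as given unless an encode policy is configured. The sketch below shows what that encoding looks like, using only the standard library. `encode_for_queue` and `decode_from_queue` are hypothetical helper names for illustration.

```python
import base64
import json


def encode_for_queue(order: dict) -> str:
    """Serialize to JSON, then Base64-encode the UTF-8 bytes."""
    payload = json.dumps(order).encode("utf-8")
    return base64.b64encode(payload).decode("ascii")


def decode_from_queue(text: str) -> dict:
    """Reverse of encode_for_queue: Base64-decode, then parse the JSON."""
    return json.loads(base64.b64decode(text).decode("utf-8"))


if __name__ == "__main__":
    print(encode_for_queue({"order_id": "123", "amount": 49.99}))
```

The printed string is the form a Base64-expecting trigger can decode, which also makes it usable as test content when adding messages by hand.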
Cosmos DB Trigger (Change Feed)
React to database changes automatically:
@app.cosmos_db_trigger(
    arg_name="documents",
    database_name="mydb",
    container_name="orders",
    connection="CosmosDBConnection",
    lease_container_name="leases",
    create_lease_container_if_not_exists=True,
)
def on_order_change(documents: func.DocumentList) -> None:
    for doc in documents:
        logging.info("Order changed: %s", doc["id"])
        if doc.get("status") == "shipped":
            send_shipping_notification(doc)  # application code, defined elsewhere
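The change feed delivers inserts and updates at least once, so the same document version can arrive more than once, for example when the function fails partway through a batch. Below is a minimal sketch of filtering a batch down to shipped orders that have not been handled yet, keyed on the document `id` and its `_etag` system property. The in-memory `seen` set is an assumption for illustration only; a deployed function would need a durable store, because instances do not share memory.

```python
from typing import Iterable, Iterator, Mapping, Set, Tuple


def new_shipped_orders(
    documents: Iterable[Mapping],
    seen: Set[Tuple[str, str]],
) -> Iterator[Mapping]:
    """Yield shipped orders whose (id, _etag) version has not been seen yet."""
    for doc in documents:
        if doc.get("status") != "shipped":
            continue
        version = (doc["id"], doc.get("_etag", ""))
        if version in seen:
            continue  # same version redelivered; skip the duplicate
        seen.add(version)
        yield doc
```

In `on_order_change`, the loop could iterate over `new_shipped_orders(documents, seen)` and send one notification per yielded document.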
Configure connection in local.settings.json:
{
  "Values": {
    "CosmosDBConnection": "AccountEndpoint=https://...;AccountKey=...;"
  }
}
Service Bus Trigger
For enterprise messaging with ordering, sessions, and dead-letter queues:
@app.service_bus_queue_trigger(
    arg_name="msg",
    queue_name="payments",
    connection="ServiceBusConnection",
)
def process_payment(msg: func.ServiceBusMessage) -> None:
    payment = json.loads(msg.get_body().decode("utf-8"))
    logging.info("Payment received: $%.2f", payment["amount"])
    charge_customer(payment)  # application code, defined elsewhere
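`json.loads` turns a value such as `49.99` into a binary float, which is fine for logging but risky for arithmetic on money. Here is a small sketch, using only the standard library, that parses amounts as `Decimal` and rejects non-positive values before any charge is attempted. `parse_payment` is a hypothetical helper; the `amount` field name is taken from the handler above.

```python
import json
from decimal import Decimal


def parse_payment(raw: bytes) -> dict:
    """Parse a payment message, keeping fractional amounts exact."""
    payment = json.loads(raw.decode("utf-8"), parse_float=Decimal)
    if not isinstance(payment, dict):
        raise ValueError("payment message must be a JSON object")
    amount = payment.get("amount")
    # parse_float only applies to numbers written with a fraction or exponent,
    # so integer amounts such as 50 are converted explicitly.
    if isinstance(amount, int) and not isinstance(amount, bool):
        amount = Decimal(amount)
    if not isinstance(amount, Decimal) or amount <= 0:
        raise ValueError(f"invalid payment amount: {amount!r}")
    payment["amount"] = amount
    return payment
```

Raising from the handler leaves the message on the queue for redelivery, and Service Bus dead-letters it once the queue's maximum delivery count is reached.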
Service Bus vs Queue Storage:
| Feature | Queue Storage | Service Bus |
|---|---|---|
| Cost | Lower | Higher |
| Ordering | No | Yes (sessions) |
| Dead-letter | Manual (the Functions runtime moves poison messages to `<queue>-poison`) | Built-in |
| Max message | 64 KB | 256 KB (Standard), up to 100 MB (Premium) |
| Use case | Simple workloads | Enterprise messaging |
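The 64 KB Queue Storage limit applies to the message as stored, so Base64 encoding, which inflates data by about a third, leaves roughly 48 KB for the raw payload. The following short sketch is a size check that could inform the choice between the two services. `QUEUE_STORAGE_LIMIT` and `fits_queue_storage` are illustrative names, and the limit is treated as 64 KiB here.

```python
import base64
import json

QUEUE_STORAGE_LIMIT = 64 * 1024  # bytes; assumes the 64 KB limit means 64 KiB


def fits_queue_storage(message: dict, base64_encoded: bool = True) -> bool:
    """Return True if the serialized message fits in one Queue Storage message."""
    payload = json.dumps(message).encode("utf-8")
    if base64_encoded:
        payload = base64.b64encode(payload)
    return len(payload) <= QUEUE_STORAGE_LIMIT
```

Payloads that fail the check are candidates for Service Bus, or for storing the body in a blob and enqueueing only a reference to it.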
Durable Functions — Workflow Orchestration
Coordinate multiple functions into workflows:
pip install azure-functions-durable
import azure.functions as func
import azure.durable_functions as df

app = df.DFApp(http_auth_level=func.AuthLevel.ANONYMOUS)

@app.route(route="orchestrators/order")
@app.durable_client_input(client_name="client")
async def start_order(req: func.HttpRequest, client):
    order = req.get_json()
    instance_id = await client.start_new("order_orchestrator", None, order)
    return client.create_check_status_response(req, instance_id)

@app.orchestration_trigger(context_name="context")
def order_orchestrator(context: df.DurableOrchestrationContext):
    order = context.get_input()
    # Step 1: Validate inventory
    inventory_ok = yield context.call_activity("check_inventory", order)
    if not inventory_ok:
        return {"status": "failed", "reason": "out_of_stock"}
    # Step 2: Process payment
    payment = yield context.call_activity("process_payment", order)
    # Step 3: Ship order
    shipment = yield context.call_activity("ship_order", order)
    return {"status": "completed", "payment": payment, "shipment": shipment}

@app.activity_trigger(input_name="order")
def check_inventory(order: dict) -> bool:
    return order.get("quantity", 0) <= 100

@app.activity_trigger(input_name="order")
def process_payment(order: dict) -> dict:
    return {"transaction_id": "txn_123", "amount": order["amount"]}

@app.activity_trigger(input_name="order")
def ship_order(order: dict) -> dict:
    return {"tracking_number": "TRK-456"}
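Durable Functions checkpoints orchestration state automatically and provides opt-in retry policies (`call_activity_with_retry`) and durable timers for timeouts, which makes it a good fit for multi-step business processes.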
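An orchestrator is a Python generator: each `yield` hands a task to the runtime, which later resumes the function with that task's result. The toy driver below imitates that hand-off with plain generators to show the control flow. It is not the Durable Functions runtime, which also checkpoints history and replays the orchestrator; `run_orchestration` and the tuple-based task format are assumptions made for this sketch.

```python
from typing import Any, Callable, Dict, Generator, Tuple

Task = Tuple[str, Any]  # (activity name, input) stands in for call_activity


def order_orchestrator(order: dict) -> Generator[Task, Any, dict]:
    inventory_ok = yield ("check_inventory", order)
    if not inventory_ok:
        return {"status": "failed", "reason": "out_of_stock"}
    payment = yield ("process_payment", order)
    shipment = yield ("ship_order", order)
    return {"status": "completed", "payment": payment, "shipment": shipment}


# Plain functions standing in for the activity functions.
ACTIVITIES: Dict[str, Callable[[dict], Any]] = {
    "check_inventory": lambda order: order.get("quantity", 0) <= 100,
    "process_payment": lambda order: {"transaction_id": "txn_123", "amount": order["amount"]},
    "ship_order": lambda order: {"tracking_number": "TRK-456"},
}


def run_orchestration(orchestrator, activities, order):
    """Drive the generator: run each yielded task and send its result back in."""
    gen = orchestrator(order)
    try:
        name, arg = next(gen)  # run to the first yield
        while True:
            result = activities[name](arg)
            name, arg = gen.send(result)  # resume with the activity result
    except StopIteration as done:
        return done.value  # the orchestrator's return value
```

Calling `run_orchestration(order_orchestrator, ACTIVITIES, {"quantity": 2, "amount": 49.99})` walks through all three steps, while a quantity above 100 stops after the inventory check.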
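Output Bindings from a Queue Trigger
An HTTP response can only be returned by an HTTP-triggered function. A queue-triggered function has no HTTP client waiting on it, and the Python v2 programming model has no http_output decorator. To pass a result onward, write it to an output binding instead, for example a second queue (the task-results queue name is illustrative):
@app.queue_trigger(arg_name="msg", queue_name="tasks", connection="AzureWebJobsStorage")
@app.queue_output(arg_name="res", queue_name="task-results", connection="AzureWebJobsStorage")
def task_processor(msg: func.QueueMessage, res: func.Out[str]) -> None:
    result = process(msg.get_body().decode("utf-8"))  # process() is application code
    res.set(json.dumps(result))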
Local Development
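func start
# Queue trigger: use Azure Storage Explorer to add messages
# Or use az CLI. Note: with default host.json settings the trigger expects
# Base64-encoded content, so either Base64-encode the JSON below or set
# extensions.queues.messageEncoding to "none" in host.json.
az storage message put --queue-name order-queue \
    --content '{"order_id": "123", "amount": 49.99}' \
    --connection-string "..."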
Related Chapters
Azure Functions + Cosmos DB + Service Bus form the backbone of many enterprise Azure architectures.