Azure Functions integrates deeply with Azure data services. This chapter covers Queue Storage, Cosmos DB, and Service Bus triggers, workflow orchestration with Durable Functions, and local testing.

Queue Storage Trigger

Process messages from Azure Queue Storage:

  # function_app.py
import azure.functions as func
import json
import logging

# Single FunctionApp instance; the trigger decorators below register their functions on it.
app = func.FunctionApp()

@app.queue_trigger(arg_name="msg", queue_name="order-queue", connection="AzureWebJobsStorage")
def process_order(msg: func.QueueMessage) -> None:
    """Handle one message from ``order-queue``.

    The message body is decoded as UTF-8 and parsed as JSON; the order is
    logged by its ``order_id`` and then passed to ``fulfill_order``.
    """
    raw_body = msg.get_body()
    order = json.loads(raw_body.decode("utf-8"))
    order_id = order["order_id"]
    logging.info("Processing order %s", order_id)
    fulfill_order(order)
  

Send messages to the queue from another function or app:

  from azure.storage.queue import QueueClient
import json
import os

def enqueue_order(order, *, base64_encode=True):
    """Serialize ``order`` to JSON and add it to ``order-queue``.

    Args:
        order: JSON-serializable order payload.
        base64_encode: Base64-encode the JSON text before sending. The Azure
            Functions queue trigger expects base64-encoded messages by default
            (``messageEncoding`` in host.json), while ``QueueClient`` sends
            text as-is, so un-encoded messages fail to decode in the trigger.
            Pass ``False`` only when the consumer is configured with
            ``messageEncoding: "none"``.

    Raises:
        KeyError: if the ``AzureWebJobsStorage`` environment variable is unset.
    """
    # Function-scope import keeps this snippet self-contained.
    import base64

    payload = json.dumps(order)
    if base64_encode:
        payload = base64.b64encode(payload.encode("utf-8")).decode("ascii")

    # Using the client as a context manager closes its underlying connection
    # once the message has been sent.
    with QueueClient.from_connection_string(
        os.environ["AzureWebJobsStorage"],
        "order-queue",
    ) as client:
        client.send_message(payload)
  

Cosmos DB Trigger (Change Feed)

React to database changes automatically:

  @app.cosmos_db_trigger(
    arg_name="documents",
    database_name="mydb",
    container_name="orders",
    connection="CosmosDBConnection",
    lease_container_name="leases",
    create_lease_container_if_not_exists=True,
)
def on_order_change(documents: func.DocumentList) -> None:
    for doc in documents:
        logging.info("Order changed: %s", doc["id"])
        if doc.get("status") == "shipped":
            send_shipping_notification(doc)
  

Configure the connection string in local.settings.json, under the setting name passed as `connection` above:

  {
  "Values": {
    "CosmosDBConnection": "AccountEndpoint=https://...;AccountKey=...;"
  }
}
  

Service Bus Trigger

For enterprise messaging with ordering, sessions, and dead-letter queues:

  @app.service_bus_queue_trigger(
    arg_name="msg",
    queue_name="payments",
    connection="ServiceBusConnection",
)
def process_payment(msg: func.ServiceBusMessage) -> None:
    payment = json.loads(msg.get_body().decode("utf-8"))
    logging.info("Payment received: $%.2f", payment["amount"])
    charge_customer(payment)
  

Service Bus vs Queue Storage:

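| Feature     | Queue Storage    | Service Bus         |
|-------------|------------------|---------------------|
| Cost        | Lower            | Higher              |
| Ordering    | No               | Yes (sessions)      |
| Dead-letter | Manual           | Built-in            |
| Max message | 64 KB            | 256 KB–1 MB         |
| Use case    | Simple workloads | Enterprise messaging |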

Durable Functions — Workflow Orchestration

Coordinate multiple functions into workflows:

  pip install azure-functions-durable
  
  import azure.functions as func
import azure.durable_functions as df

# Durable Functions app object; the route, orchestrator and activity decorators below attach to it.
# NOTE(review): AuthLevel.ANONYMOUS means the HTTP starter can be called without a function key —
# presumably acceptable for a demo; confirm before deploying.
app = df.DFApp(http_auth_level=func.AuthLevel.ANONYMOUS)

@app.route(route="orchestrators/order")
@app.durable_client_input(client_name="client")
async def start_order(req: func.HttpRequest, client):
    """HTTP starter: launch a new ``order_orchestrator`` instance.

    The request body is parsed as JSON and passed to the orchestrator as its
    input.

    Returns:
        The check-status response built by the durable client for the new
        instance, or a 400 response when the body is not valid JSON.
    """
    try:
        order = req.get_json()
    except ValueError:
        # get_json() raises ValueError for a missing or malformed JSON body;
        # answer with 400 instead of letting it surface as a 500.
        return func.HttpResponse(
            "Request body must be valid JSON.",
            status_code=400,
        )

    # Passing None as the instance ID leaves ID generation to the runtime.
    instance_id = await client.start_new("order_orchestrator", None, order)
    return client.create_check_status_response(req, instance_id)

@app.orchestration_trigger(context_name="context")
def order_orchestrator(context: df.DurableOrchestrationContext):
    """Run the order workflow: inventory check, then payment, then shipping.

    The orchestration input is forwarded unchanged to each activity. When the
    inventory check result is falsy, the workflow stops early with a
    ``failed`` / ``out_of_stock`` result; otherwise it returns a ``completed``
    result carrying the payment and shipment activity outputs.
    """
    order = context.get_input()

    # Step 1: inventory must be available before anything is charged.
    in_stock = yield context.call_activity("check_inventory", order)
    if in_stock:
        # Step 2: take payment, then Step 3: ship.
        payment_result = yield context.call_activity("process_payment", order)
        shipment_result = yield context.call_activity("ship_order", order)
        return {
            "status": "completed",
            "payment": payment_result,
            "shipment": shipment_result,
        }

    return {"status": "failed", "reason": "out_of_stock"}

@app.activity_trigger(input_name="order")
def check_inventory(order: dict) -> bool:
    """Return True when the order's ``quantity`` is at most 100.

    A missing ``quantity`` key is treated as 0.
    """
    requested = order.get("quantity", 0)
    return requested <= 100

@app.activity_trigger(input_name="order")
def process_payment(order: dict) -> dict:
    """Return a stub payment record echoing the order's ``amount``."""
    receipt = {"transaction_id": "txn_123"}
    receipt["amount"] = order["amount"]
    return receipt

@app.activity_trigger(input_name="order")
def ship_order(order: dict) -> dict:
    """Return a stub shipment record with a fixed tracking number."""
    shipment = dict(tracking_number="TRK-456")
    return shipment
  

Durable Functions checkpoint orchestration state automatically and provide built-in primitives for retries (`call_activity_with_retry`) and timeouts (durable timers) — ideal for multi-step business processes.

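Returning Results from Queue Triggers

A queue-triggered function has no HTTP request to answer, so it cannot return an HTTP response (the HTTP output binding requires an HTTP trigger). Pass the result on through an output binding instead — for example, another queue: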

  @app.queue_trigger(arg_name="msg", queue_name="tasks", connection="AzureWebJobsStorage")
@app.http_output(arg_name="res", route="task-result", methods=["POST"])
def task_processor(msg: func.QueueMessage, res: func.Out[func.HttpResponse]):
    result = process(msg.get_body().decode())
    res.set(func.HttpResponse(json.dumps(result), mimetype="application/json"))
  

Local Development

  func start
# Queue trigger: use Azure Storage Explorer to add messages
# Or use az CLI:
# NOTE(review): the Functions queue trigger expects base64-encoded messages unless host.json sets
# messageEncoding to "none" — confirm whether --content must be base64-encoded here.
az storage message put --queue-name order-queue \
    --content '{"order_id": "123", "amount": 49.99}' \
    --connection-string "..."
  

Azure Functions + Cosmos DB + Service Bus form the backbone of many enterprise Azure architectures.