Cloud Functions shine when reacting to events. This chapter covers Pub/Sub messaging and common GCP event sources.

Pub/Sub-Triggered Function

Process messages from a Pub/Sub topic:

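# main.py
import base64
import json

import functions_framework


@functions_framework.cloud_event
def process_message(cloud_event):
    # Pub/Sub wraps the published payload in a "message" envelope.
    message = cloud_event.data.get("message", {})
    # The payload arrives base64-encoded; attributes are plain strings.
    data = base64.b64decode(message.get("data", "")).decode("utf-8")
    attributes = message.get("attributes", {})

    print(f"Received: {data}")
    print(f"Attributes: {attributes}")

    # Assumes the publisher sent JSON; json.loads raises on anything else.
    payload = json.loads(data)
    handle_order(payload)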

def handle_order(order):
    order_id = order.get("order_id")
    print(f"Processing order {order_id}")
    # Business logic here...
  

Deploy with a Pub/Sub trigger:

gcloud pubsub topics create order-events

gcloud functions deploy process-order \
    --gen2 \
    --runtime=python312 \
    --region=us-central1 \
    --source=. \
    --entry-point=process_message \
    --trigger-topic=order-events
  

Publish a test message:

gcloud pubsub topics publish order-events \
    --message='{"order_id": "12345", "amount": 99.99}'
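
To see what process_message receives for the test message above, the sketch below builds the envelope by hand and decodes it the same way. The make_envelope and decode_envelope helpers are hypothetical names used for illustration, and the message ID is made up; real events carry further fields such as the publish time.

```python
import base64
import json


def make_envelope(payload, attributes=None):
    """Build a dict shaped like the Pub/Sub data a CloudEvent function receives."""
    raw = json.dumps(payload).encode("utf-8")
    return {
        "message": {
            "data": base64.b64encode(raw).decode("ascii"),
            "attributes": attributes or {},
            "messageId": "test-message-1",  # made-up ID for local use
        }
    }


def decode_envelope(event_data):
    """Return (payload, attributes) from a Pub/Sub envelope."""
    message = event_data.get("message", {})
    text = base64.b64decode(message.get("data", "")).decode("utf-8")
    return json.loads(text), message.get("attributes", {})


envelope = make_envelope({"order_id": "12345", "amount": 99.99})
payload, attributes = decode_envelope(envelope)
print(payload["order_id"])
```

Keeping the decoding in a plain function like this makes it easy to unit test without deploying anything.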
  

Publishing to Pub/Sub from Another Function

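import json

from google.cloud import pubsub_v1

# Create the client once at module load so warm instances reuse it.
publisher = pubsub_v1.PublisherClient()
TOPIC = "projects/my-project/topics/notifications"


def publish_notification(user_id, message):
    data = json.dumps({"user_id": user_id, "message": message}).encode("utf-8")
    # Extra keyword arguments become message attributes (string values only).
    future = publisher.publish(TOPIC, data, event_type="notification")
    # Block until Pub/Sub accepts the message; the result is the message ID.
    return future.result()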
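
Pub/Sub message bodies are bytes and attribute values must be strings, so any non-string metadata has to be converted before it is passed to publish. A minimal sketch of that preparation step follows; build_message is a hypothetical helper, not part of the Pub/Sub client library, and dropping None values is a design assumption.

```python
import json


def build_message(payload, **attributes):
    """Return (data, attrs) ready to pass as publisher.publish(topic, data, **attrs)."""
    data = json.dumps(payload).encode("utf-8")
    # Drop None values and convert everything else to a string.
    attrs = {key: str(value) for key, value in attributes.items() if value is not None}
    return data, attrs


data, attrs = build_message(
    {"user_id": 42, "message": "Your order shipped"},
    event_type="notification",
    priority=2,
    trace_id=None,
)
print(attrs)
```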
  

Cloud Storage Trigger (Review + Advanced)

Process files with metadata extraction:

import os
import tempfile

import functions_framework
from google.cloud import storage

client = storage.Client()


@functions_framework.cloud_event
def process_upload(cloud_event):
    data = cloud_event.data
    bucket_name = data["bucket"]
    file_name = data["name"]

    # Skip derived files (for example, thumbnails written back to this bucket).
    if file_name.startswith("thumbnails/"):
        print("Skipping thumbnail")
        return

    blob = client.bucket(bucket_name).blob(file_name)

    # Close the temp file before downloading into it, and always clean it up.
    with tempfile.NamedTemporaryFile(delete=False) as tmp:
        tmp_path = tmp.name
    try:
        blob.download_to_filename(tmp_path)
        size = os.path.getsize(tmp_path)
        print(f"File {file_name}: {size} bytes")
    finally:
        os.unlink(tmp_path)
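
The prefix check matters because a function that writes results back to the bucket it watches would otherwise be triggered by its own output. One way to keep that decision testable is to pull it into a small predicate. The SKIP_PREFIXES tuple and the should_process name below are illustrative assumptions, not part of any GCP API.

```python
SKIP_PREFIXES = ("thumbnails/", "tmp/")  # hypothetical output folders


def should_process(object_name):
    """Return True for uploads the function should handle."""
    if object_name.endswith("/"):
        # Zero-byte "folder" placeholder objects have names ending in a slash.
        return False
    return not object_name.startswith(SKIP_PREFIXES)


for name in ["photos/cat.jpg", "thumbnails/cat.jpg", "photos/"]:
    print(name, should_process(name))
```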
  

Firestore Trigger

React to database changes:

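import functions_framework
from google.events.cloud import firestore  # from the google-events package


@functions_framework.cloud_event
def on_user_created(cloud_event):
    # 2nd gen Firestore events carry a protobuf payload, not a JSON dict.
    payload = firestore.DocumentEventData()
    payload._pb.ParseFromString(cloud_event.data)
    fields = payload.value.fields

    name = fields["name"].string_value if "name" in fields else "Unknown"
    email = fields["email"].string_value if "email" in fields else ""

    print(f"New user: {name} ({email})")
    send_welcome_email(email, name)  # your own helper, defined elsewhere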
  

Deploy:

gcloud functions deploy on-user-created \
    --gen2 \
    --runtime=python312 \
    --region=us-central1 \
    --source=. \
    --entry-point=on_user_created \
    --trigger-event-filters="type=google.cloud.firestore.document.v1.created" \
    --trigger-event-filters="database=(default)" \
    --trigger-event-filters-path-pattern="document=users/{userId}"
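
The {userId} segment in the path pattern is a wildcard that matches any single document ID under users. The sketch below imitates that matching in plain Python to show which document paths would fire the trigger. match_document is a hypothetical helper, and the real Eventarc matcher supports more syntax than this single-segment case.

```python
def match_document(pattern, path):
    """Match a document path against a pattern with {name} single-segment wildcards.

    Returns a dict of captured values, or None when the path does not match.
    """
    pattern_parts = pattern.split("/")
    path_parts = path.split("/")
    if len(pattern_parts) != len(path_parts):
        return None
    captured = {}
    for expected, actual in zip(pattern_parts, path_parts):
        if expected.startswith("{") and expected.endswith("}"):
            captured[expected[1:-1]] = actual
        elif expected != actual:
            return None
    return captured


print(match_document("users/{userId}", "users/alice"))
print(match_document("users/{userId}", "users/alice/orders/1"))
```

A document in a subcollection such as users/alice/orders/1 has more segments than the pattern, so it does not match.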
  

Scheduled Functions (Cloud Scheduler)

Run on a cron schedule:

gcloud functions deploy daily-report \
    --gen2 \
    --runtime=python312 \
    --entry-point=generate_report \
    --trigger-http \
    --no-allow-unauthenticated

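# Create a scheduler job that calls the function daily at 08:00 (job time zone).
# SERVICE_ACCOUNT_EMAIL is the service account Cloud Scheduler authenticates as.
gcloud scheduler jobs create http daily-report-job \
    --schedule="0 8 * * *" \
    --uri="https://REGION-PROJECT.cloudfunctions.net/daily-report" \
    --oidc-service-account-email=SERVICE_ACCOUNT_EMAIL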
  
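# main.py
import functions_framework

@functions_framework.http
def generate_report(request):
    # build_daily_report and send_report_email are your own helpers.
    report = build_daily_report()
    send_report_email(report)
    return "Report generated", 200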
  

Error Handling & Retries

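Pub/Sub delivers messages at least once, and an event-driven function deployed with retries enabled (the --retry flag) is invoked again after a failure, so the same message can reach your code more than once. Design for idempotency: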

processed_ids = set()  # In-memory only; use Firestore/Redis in production


def handle_order(order):
    order_id = order["order_id"]
    if order_id in processed_ids:
        print(f"Duplicate order {order_id}, skipping")
        return
    process(order)
    # Record the ID only after success so a failed attempt can be retried.
    processed_ids.add(order_id)
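
The in-memory set above is lost whenever an instance is recycled and is not shared between instances. A sketch of the same check against a pluggable store follows. DedupStore is a hypothetical interface backed by a dict here, standing in for Firestore or Redis; a real implementation would need an atomic create-if-absent operation for claim.

```python
class DedupStore:
    """Dict-backed stand-in for a shared store such as Firestore or Redis."""

    def __init__(self):
        self._seen = {}

    def claim(self, key):
        """Return True if this call is the first to claim the key."""
        if key in self._seen:
            return False
        self._seen[key] = True
        return True

    def release(self, key):
        """Forget a key so a failed attempt can be retried."""
        self._seen.pop(key, None)


def handle_once(store, order, process):
    """Run process(order) unless the order_id was already claimed; return True if it ran."""
    order_id = order["order_id"]
    if not store.claim(order_id):
        return False
    try:
        process(order)
    except Exception:
        # Give the ID back so a redelivery can try again.
        store.release(order_id)
        raise
    return True


store = DedupStore()
handled = []
order = {"order_id": "12345"}
print(handle_once(store, order, handled.append))
print(handle_once(store, order, handled.append))
```

Claiming the ID before processing, and releasing it on failure, keeps duplicates out while still letting a failed delivery be retried.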
  

Configure dead-letter topics for messages that fail repeatedly:

gcloud pubsub subscriptions update SUBSCRIPTION \
    --dead-letter-topic=failed-orders \
    --max-delivery-attempts=5
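
Retries only help with transient failures. A message that can never succeed, such as malformed JSON, fails on every attempt until it lands in the dead-letter topic, so it can be cheaper to log and drop it right away. The sketch below separates the two cases. parse_order and safe_handle are hypothetical names, and treating ValueError as a permanent failure is an assumption you may want to adjust.

```python
import json


def parse_order(text):
    """Parse and validate an order payload; raises ValueError if unusable."""
    order = json.loads(text)  # json.JSONDecodeError is a ValueError subclass
    if not isinstance(order, dict) or "order_id" not in order:
        raise ValueError("missing order_id")
    return order


def safe_handle(text, handler):
    """Return 'ok' or 'dropped'; other exceptions propagate as a failed invocation."""
    try:
        order = parse_order(text)
    except ValueError as exc:
        # Permanent failure: returning normally acknowledges the message.
        print(f"Dropping malformed message: {exc}")
        return "dropped"
    handler(order)  # errors raised here surface as a failure and can be retried
    return "ok"


print(safe_handle('{"order_id": "12345"}', lambda order: None))
print(safe_handle("not json", lambda order: None))
```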
  

Local Testing

pip install functions-framework cloudevents

# Test HTTP function
functions-framework --target=generate_report --debug

# Test with sample CloudEvent
functions-framework --target=process_message --signature-type=cloudevent
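
With the CloudEvent server running (it listens on port 8080 by default), a test event can be posted to it over HTTP. The sketch below builds a binary-mode CloudEvent request for the Pub/Sub function using only the standard library. The build_test_request helper, the source value, and the event ID are made up for local testing.

```python
import base64
import json


def build_test_request(payload):
    """Return (headers, body) for a binary-mode CloudEvent wrapping a Pub/Sub message."""
    encoded = base64.b64encode(json.dumps(payload).encode("utf-8")).decode("ascii")
    headers = {
        "Content-Type": "application/json",
        "ce-specversion": "1.0",
        "ce-id": "local-test-1",  # any unique string works locally
        "ce-source": "//pubsub.googleapis.com/projects/my-project/topics/order-events",
        "ce-type": "google.cloud.pubsub.topic.v1.messagePublished",
    }
    body = json.dumps({"message": {"data": encoded, "attributes": {}}}).encode("utf-8")
    return headers, body


headers, body = build_test_request({"order_id": "12345", "amount": 99.99})
print(json.loads(body)["message"]["data"])

# To send it to the local server:
# import urllib.request
# req = urllib.request.Request("http://localhost:8080", data=body, headers=headers)
# urllib.request.urlopen(req)
```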
  

Event triggers let Cloud Functions react to Pub/Sub messages, Cloud Storage uploads, Firestore writes, and Cloud Scheduler jobs, tying your functions into the rest of your GCP infrastructure.