Cloud Functions with Pub/Sub & Events
Build event-driven Google Cloud Functions with Pub/Sub, Cloud Storage triggers, Firestore, and Cloud Scheduler.
Cloud Functions shine when reacting to events. This chapter covers Pub/Sub messaging and common GCP event sources.
Pub/Sub Background Function
Process messages from a Pub/Sub topic:
# main.py
import functions_framework
import json
import base64


@functions_framework.cloud_event
def process_message(cloud_event):
    message = cloud_event.data.get("message", {})
    data = base64.b64decode(message.get("data", "")).decode("utf-8")
    attributes = message.get("attributes", {})

    print(f"Received: {data}")
    print(f"Attributes: {attributes}")

    payload = json.loads(data)
    handle_order(payload)


def handle_order(order):
    order_id = order.get("order_id")
    print(f"Processing order {order_id}")
    # Business logic here...
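To see what the handler is unpacking, here is a minimal sketch that decodes a hand-built event payload without deploying anything. The helper name `decode_pubsub_data`, the sample attribute, and the subscription name are assumptions made for illustration; only the `message.data` and `message.attributes` keys come from the handler above.

```python
import base64
import json


def decode_pubsub_data(event_data: dict) -> tuple[dict, dict]:
    """Return (payload, attributes) from a Pub/Sub CloudEvent data dict."""
    message = event_data.get("message", {})
    # The message body arrives base64-encoded, so decode it before parsing JSON.
    raw = base64.b64decode(message.get("data", "")).decode("utf-8")
    payload = json.loads(raw) if raw else {}
    return payload, message.get("attributes", {})


# Hand-written sample shaped like the envelope the handler reads (illustrative values).
sample_event_data = {
    "message": {
        "data": base64.b64encode(b'{"order_id": "12345", "amount": 99.99}').decode("ascii"),
        "attributes": {"source": "checkout"},
    },
    "subscription": "projects/my-project/subscriptions/example-sub",
}

payload, attributes = decode_pubsub_data(sample_event_data)
print(payload["order_id"], attributes)
```

Keeping the decoding in a plain function like this makes it easy to unit test separately from the decorated entry point.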
Deploy with Pub/Sub trigger:
gcloud pubsub topics create order-events
gcloud functions deploy process-order \
  --gen2 \
  --runtime=python312 \
  --region=us-central1 \
  --source=. \
  --entry-point=process_message \
  --trigger-topic=order-events
Publish a test message:
gcloud pubsub topics publish order-events \
  --message='{"order_id": "12345", "amount": 99.99}'
Publishing to Pub/Sub from Another Function
from google.cloud import pubsub_v1
import json

publisher = pubsub_v1.PublisherClient()
TOPIC = "projects/my-project/topics/notifications"


def publish_notification(user_id, message):
    data = json.dumps({"user_id": user_id, "message": message}).encode("utf-8")
    future = publisher.publish(TOPIC, data, event_type="notification")
    return future.result()
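One way to exercise the publishing code without a real topic is to pass the publisher in as an argument and substitute a fake in tests. The sketch below assumes that design; `FakePublisher` and `FakeFuture` are hypothetical stand-ins that mimic only the `publish(topic, data, **attributes)` and `result()` calls used above, not the real client.

```python
import json


class FakeFuture:
    """Hypothetical stand-in for the future returned by publish()."""

    def __init__(self, message_id):
        self._message_id = message_id

    def result(self, timeout=None):
        return self._message_id


class FakePublisher:
    """Hypothetical stand-in that records what would have been published."""

    def __init__(self):
        self.sent = []

    def publish(self, topic, data, **attributes):
        self.sent.append((topic, data, attributes))
        return FakeFuture(str(len(self.sent)))


def publish_notification(publisher, topic, user_id, message, timeout=10):
    # Pub/Sub message bodies are bytes; extra keyword arguments become attributes.
    data = json.dumps({"user_id": user_id, "message": message}).encode("utf-8")
    future = publisher.publish(topic, data, event_type="notification")
    # Block until the publish is acknowledged (or the timeout expires).
    return future.result(timeout=timeout)


fake = FakePublisher()
message_id = publish_notification(
    fake, "projects/my-project/topics/notifications", "user-1", "Order shipped"
)
print(message_id, fake.sent[0][2])
```

In production the same function would receive a `pubsub_v1.PublisherClient()` instead of the fake.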
Cloud Storage Trigger (Review + Advanced)
Process files with metadata extraction:
import os
import tempfile

import functions_framework
from google.cloud import storage


@functions_framework.cloud_event
def process_upload(cloud_event):
    data = cloud_event.data
    bucket_name = data["bucket"]
    file_name = data["name"]

    # Ignore objects under thumbnails/ (for example, files this pipeline wrote itself).
    if file_name.startswith("thumbnails/"):
        print("Skipping thumbnail")
        return

    client = storage.Client()
    bucket = client.bucket(bucket_name)
    blob = bucket.blob(file_name)

    # Reserve a temp path, close the handle, then download into it.
    with tempfile.NamedTemporaryFile(delete=False) as tmp:
        tmp_path = tmp.name
    try:
        blob.download_to_filename(tmp_path)
        size = os.path.getsize(tmp_path)
        print(f"File {file_name}: {size} bytes")
    finally:
        # Always remove the temp file, even if the download fails.
        os.unlink(tmp_path)
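When only basic metadata is needed, the event payload itself may be enough and the download can be skipped. The sketch below assumes the payload carries `size` (as a string) and `contentType` alongside `bucket` and `name`, as the Cloud Storage object resource does; the helper name and the sample values are made up for illustration.

```python
def summarize_storage_event(data: dict) -> dict:
    """Extract basic metadata from a Cloud Storage event payload."""
    name = data["name"]
    return {
        "bucket": data["bucket"],
        "name": name,
        # Assumed: size is serialized as a string, so convert it explicitly.
        "size_bytes": int(data.get("size", 0)),
        "content_type": data.get("contentType", "application/octet-stream"),
        # Same prefix check the handler uses to ignore thumbnails.
        "skip": name.startswith("thumbnails/"),
    }


# Hand-written sample payload (illustrative values).
sample = {
    "bucket": "my-uploads",
    "name": "photos/cat.jpg",
    "size": "2048",
    "contentType": "image/jpeg",
}
summary = summarize_storage_event(sample)
print(summary)
```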
Firestore Trigger
React to database changes:
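import functions_framework
from google.events.cloud import firestore  # pip install google-events


@functions_framework.cloud_event
def on_user_created(cloud_event):
    # 2nd gen Firestore triggers deliver the payload as protobuf bytes, not a dict.
    payload = firestore.DocumentEventData()
    payload._pb.ParseFromString(cloud_event.data)

    fields = payload.value.fields
    name = fields["name"].string_value if "name" in fields else "Unknown"
    email = fields["email"].string_value if "email" in fields else ""

    print(f"New user: {name} ({email})")
    send_welcome_email(email, name)  # placeholder for your own mailer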
Deploy:
gcloud functions deploy on-user-created \
  --gen2 \
  --runtime=python312 \
  --entry-point=on_user_created \
  --trigger-event-filters="type=google.cloud.firestore.document.v1.created" \
  --trigger-event-filters="database=(default)" \
  --trigger-event-filters-path-pattern="document=users/{userId}"
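The path pattern `users/{userId}` matches documents directly under `users` and captures the document ID. The sketch below is a simplified local model of that single-segment capture, useful for reasoning about which paths would fire the trigger. The real matching is done by Eventarc and supports more syntax than this; `match_path_pattern` is a hypothetical helper, not part of any Google library.

```python
import re


def match_path_pattern(pattern: str, path: str):
    """Return captured parameters if path matches pattern, else None.

    Only handles {name} placeholders that capture exactly one path segment.
    """
    parts = []
    for segment in pattern.split("/"):
        if segment.startswith("{") and segment.endswith("}"):
            # A placeholder captures one segment (no slashes).
            parts.append(f"(?P<{segment[1:-1]}>[^/]+)")
        else:
            parts.append(re.escape(segment))
    match = re.fullmatch("/".join(parts), path)
    return match.groupdict() if match else None


print(match_path_pattern("users/{userId}", "users/alice"))
print(match_path_pattern("users/{userId}", "users/alice/orders/1"))
```

Under this model a write to `users/alice` matches with `userId` set to `alice`, while a write to a nested subcollection document does not.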
Scheduled Functions (Cloud Scheduler)
Run on a cron schedule:
gcloud functions deploy daily-report \
  --gen2 \
  --runtime=python312 \
  --entry-point=generate_report \
  --trigger-http \
  --no-allow-unauthenticated

# Create scheduler job (the service account must be allowed to invoke the function)
gcloud scheduler jobs create http daily-report-job \
  --schedule="0 8 * * *" \
  --uri="https://REGION-PROJECT.cloudfunctions.net/daily-report" \
  --oidc-service-account-email=SERVICE_ACCOUNT_EMAIL

The HTTP handler that the scheduler job calls:

@functions_framework.http
def generate_report(request):
    report = build_daily_report()
    send_report_email(report)
    return "Report generated", 200
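The schedule string uses the standard five-field cron layout. As a quick reading aid, the sketch below labels each field of `0 8 * * *`; `describe_cron` is a hypothetical helper written for this page, and it does not validate ranges or special syntax.

```python
CRON_FIELDS = ("minute", "hour", "day_of_month", "month", "day_of_week")


def describe_cron(expression: str) -> dict:
    """Label the five whitespace-separated fields of a cron expression."""
    parts = expression.split()
    if len(parts) != len(CRON_FIELDS):
        raise ValueError(f"expected 5 fields, got {len(parts)}")
    return dict(zip(CRON_FIELDS, parts))


print(describe_cron("0 8 * * *"))
# {'minute': '0', 'hour': '8', 'day_of_month': '*', 'month': '*', 'day_of_week': '*'}
```

So the job above fires at minute 0 of hour 8 every day. The time zone is whatever the scheduler job is configured with, which is worth setting explicitly if the report is tied to local business hours.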
Error Handling & Retries
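Pub/Sub delivers messages at least once, and an event-driven function deployed with retries enabled (the --retry flag) is invoked again when it fails, so the same message can reach your handler more than once. Design for idempotency: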
processed_ids = set()  # Use Firestore/Redis in production


def handle_order(order):
    order_id = order["order_id"]
    if order_id in processed_ids:
        print(f"Duplicate order {order_id}, skipping")
        return
    process(order)
    processed_ids.add(order_id)
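An in-memory set is lost whenever an instance restarts and is not shared between instances, which is why the comment points to Firestore or Redis. The sketch below shows the same idea against a durable store, using the standard library's `sqlite3` purely as a local stand-in for such a store. `ProcessedStore` and `handle_order_once` are hypothetical names, and a real deployment would swap in the database client of your choice.

```python
import sqlite3


class ProcessedStore:
    """Records processed order IDs; sqlite3 stands in for Firestore/Redis here."""

    def __init__(self, path=":memory:"):
        self._conn = sqlite3.connect(path)
        self._conn.execute(
            "CREATE TABLE IF NOT EXISTS processed (order_id TEXT PRIMARY KEY)"
        )

    def claim(self, order_id: str) -> bool:
        """Atomically record order_id; return False if it was already recorded."""
        with self._conn:
            cursor = self._conn.execute(
                "INSERT OR IGNORE INTO processed (order_id) VALUES (?)", (order_id,)
            )
        return cursor.rowcount == 1

    def release(self, order_id: str) -> None:
        """Forget order_id so a later retry can claim it again."""
        with self._conn:
            self._conn.execute("DELETE FROM processed WHERE order_id = ?", (order_id,))


def handle_order_once(store, order, process):
    """Run process(order) unless this order ID has already been claimed."""
    order_id = order["order_id"]
    if not store.claim(order_id):
        return False  # duplicate delivery, skip
    try:
        process(order)
    except Exception:
        store.release(order_id)  # allow the retry to process it
        raise
    return True


store = ProcessedStore()
handled = []
first = handle_order_once(store, {"order_id": "12345"}, handled.append)
second = handle_order_once(store, {"order_id": "12345"}, handled.append)
print(first, second, len(handled))  # prints: True False 1
```

Claiming the ID before processing, rather than adding it afterwards, closes the window in which two concurrent deliveries could both pass the duplicate check.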
Configure dead-letter topics for messages that fail repeatedly:
gcloud pubsub subscriptions update SUBSCRIPTION \
  --dead-letter-topic=failed-orders \
  --max-delivery-attempts=5
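To make the effect of `--max-delivery-attempts` concrete, here is a toy simulation of the retry-then-dead-letter flow. It is only a mental model written for this page, not how Pub/Sub is implemented, and the real service treats the attempt limit as approximate. The `deliver` function and `flaky_handler` are hypothetical.

```python
def deliver(message, handler, max_delivery_attempts=5):
    """Toy model: call handler until it succeeds or the attempts run out."""
    for attempt in range(1, max_delivery_attempts + 1):
        try:
            handler(message)
            return {"status": "acked", "attempts": attempt}
        except Exception:
            # A failed invocation means the message is redelivered.
            continue
    # After the final failed attempt the message goes to the dead-letter topic.
    return {"status": "dead-lettered", "attempts": max_delivery_attempts}


attempts_seen = []


def flaky_handler(message):
    """Fails twice, then succeeds, to mimic a transient error."""
    attempts_seen.append(message)
    if len(attempts_seen) < 3:
        raise RuntimeError("transient failure")


print(deliver({"order_id": "12345"}, flaky_handler))
# {'status': 'acked', 'attempts': 3}
```

A handler that never succeeds would exhaust all five attempts and end up in `failed-orders`, where it can be inspected and replayed.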
Local Testing
pip install functions-framework cloudevents
# Test HTTP function
functions-framework --target=generate_report --debug
# Test with sample CloudEvent
functions-framework --target=process_message --signature-type=cloudevent
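With the CloudEvent server running, you still need to send it an event. The sketch below builds a structured-mode CloudEvent request that wraps a Pub/Sub style payload. It assumes the framework is listening on its default `http://localhost:8080`, and the event `id`, `source`, and helper name are illustrative values chosen here. The actual send is left commented out so the snippet runs without a server.

```python
import base64
import json
import urllib.request


def build_pubsub_cloudevent_request(payload: dict, url: str = "http://localhost:8080"):
    """Build a POST request carrying a Pub/Sub-shaped CloudEvent (structured mode)."""
    encoded = base64.b64encode(json.dumps(payload).encode("utf-8")).decode("ascii")
    event = {
        "specversion": "1.0",
        "type": "google.cloud.pubsub.topic.v1.messagePublished",
        "source": "//pubsub.googleapis.com/projects/my-project/topics/order-events",
        "id": "local-test-1",  # illustrative ID
        "datacontenttype": "application/json",
        "data": {"message": {"data": encoded, "attributes": {}}},
    }
    return urllib.request.Request(
        url,
        data=json.dumps(event).encode("utf-8"),
        headers={"Content-Type": "application/cloudevents+json"},
        method="POST",
    )


request = build_pubsub_cloudevent_request({"order_id": "12345", "amount": 99.99})
print(request.get_method(), request.full_url)

# Uncomment once the local server is running:
# with urllib.request.urlopen(request) as response:
#     print(response.status)
```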
Related Chapters
- Getting Started
- Best Practices
- AWS Lambda API Gateway — similar patterns on AWS
Event-driven architecture on GCP connects Cloud Functions to your entire cloud infrastructure.