Your cart is currently empty!
Building Bulletproof Stripe Subscriptions: From CLI Testing to Production-Ready Code
A comprehensive guide to implementing robust Stripe subscription handling with automatic customer recovery, webhook processing, and comprehensive error handling.
This implementation is in production; check it out at jobsight.tekonline.com.au, and let me know if we can assist with your implementation
Introduction
Building a subscription-based SaaS application requires more than just integrating Stripe’s API. You need bulletproof error handling, seamless customer management, and robust testing workflows. In this deep-dive, we’ll explore how to build a production-ready Stripe subscription system that handles edge cases, recovers from failures, and provides excellent developer experience.
The Challenge: Real-World Subscription Complexity
When building WorkSite Diary, a construction site management SaaS, we encountered several real-world challenges:
- Customer ID Management: Users expect seamless billing even when backend data gets corrupted
- Environment Separation: Test vs production customer isolation
- Webhook Reliability: Handling webhook failures and retries
- Customer Recovery: What happens when stored customer IDs become invalid?
- Developer Testing: Efficient workflows for testing subscription flows
Architecture Overview
Our solution uses a layered architecture:
/markd
┌─────────────────┐ ┌──────────────────┐ ┌─────────────────┐ │ Frontend │ │ Backend API │ │ Stripe API │ │ (Angular) │◄──►│ (FastAPI) │◄──►│ (Webhooks) │ └─────────────────┘ └──────────────────┘ └─────────────────┘ │ ▼ ┌──────────────────┐ │ Database │ │ (SQLite/PG) │ └──────────────────┘
Key Components:
- CustomerService: Handles all customer lifecycle management
- Billing Routes: API endpoints for subscription operations
- Webhook Handlers: Process Stripe events reliably
- Email Notifications: Admin alerts for system events
- CLI Testing: Stripe CLI integration for development
Part 1: Setting Up Stripe CLI for Development
Installation and Authentication
# Install Stripe CLI (Linux/Mac)
curl -s https://packages.stripe.com/api/security/keypair/stripe-cli-gpg/public | gpg --dearmor | sudo tee /usr/share/keyrings/stripe.gpg
echo "deb [signed-by=/usr/share/keyrings/stripe.gpg] https://packages.stripe.com/stripe-cli-debian-local stable main" | sudo tee -a /etc/apt/sources.list.d/stripe.list
sudo apt update && sudo apt install stripe
# Authenticate with your Stripe account
stripe login
Local Webhook Testing
The game-changer for subscription development is real-time webhook testing:
# Forward webhooks to your local development server
stripe listen --forward-to localhost:5010/api/billing/webhook
# Output shows your webhook signing secret
# > Your webhook signing secret is whsec_1234567890abcdef...
Pro Tip: The webhook signing secret changes each time you restart the CLI. Always update your environment variables!
Environment Configuration
# .env.dev
STRIPE_SECRET_KEY=sk_test_...
STRIPE_WEBHOOK_SECRET=whsec_... # From CLI output
STRIPE_PUBLISHABLE_KEY=pk_test_...
Part 2: Database Schema Design
User-Centric Customer Management
We store customer IDs at the user level, not company level, for better flexibility:
-- User table with environment-specific customer IDs
CREATE TABLE users (
id INTEGER PRIMARY KEY,
email TEXT UNIQUE NOT NULL,
name TEXT,
password_hash TEXT,
is_verified BOOLEAN DEFAULT FALSE,
-- Environment-specific Stripe customer IDs
stripe_customer_id_test TEXT NULL,
stripe_customer_id_live TEXT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
-- Company table for subscription tracking
CREATE TABLE companies (
id INTEGER PRIMARY KEY,
name TEXT NOT NULL,
subscription_status TEXT DEFAULT 'free',
subscription_plan TEXT DEFAULT 'free',
stripe_subscription_id TEXT,
current_period_end TIMESTAMP,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
Why User-Level Customer IDs?
- Portability: Users can switch companies without losing billing history
- Simplicity: One customer per user, regardless of company affiliations
- Recovery: Easier to track and recover lost customer relationships
Part 3: The CustomerService – Heart of the System
Core Service Implementation
import stripe
import logging
from sqlalchemy.ext.asyncio import AsyncSession
from models.user import User
from config import get_settings
logger = logging.getLogger(__name__)
settings = get_settings()
stripe.api_key = settings.STRIPE_SECRET_KEY
class CustomerService:
@staticmethod
async def get_customer_object(user: User, db: AsyncSession) -> stripe.Customer:
"""
Get Stripe customer with automatic recovery.
This is the bulletproof method that handles all edge cases.
"""
is_live_mode = not settings.STRIPE_SECRET_KEY.startswith('sk_test_')
customer_id = user.get_stripe_customer_id(is_live_mode)
if customer_id:
try:
# Try to retrieve stored customer
customer = stripe.Customer.retrieve(customer_id)
logger.info(f"Found existing customer: {customer.id}")
return customer
except stripe.error.InvalidRequestError as e:
if "No such customer" in str(e):
# Customer was deleted from Stripe - recover automatically
logger.warning(f"Customer {customer_id} not found, recovering...")
# Notify admin about the issue
await CustomerService._notify_missing_customer(
customer_id, user, db
)
# Clear invalid ID and create new customer
user.set_stripe_customer_id(None, is_live_mode)
await db.commit()
else:
raise
# Create new customer (first time or recovery)
return await CustomerService._create_new_customer(user, db)
@staticmethod
async def _create_new_customer(user: User, db: AsyncSession) -> stripe.Customer:
"""Create new customer with deduplication."""
is_live_mode = not settings.STRIPE_SECRET_KEY.startswith('sk_test_')
# First, search for existing customers by email
existing_customers = stripe.Customer.list(email=user.email, limit=10)
# Look for customer with matching user metadata
for customer in existing_customers.data:
metadata = customer.get('metadata', {})
if metadata.get('user_id') == str(user.id):
# Found existing customer, relink it
user.set_stripe_customer_id(customer.id, is_live_mode)
await db.commit()
return customer
# Create brand new customer
customer = stripe.Customer.create(
email=user.email,
name=user.name,
metadata={
"user_id": str(user.id),
"environment": "live" if is_live_mode else "test",
"created_via": "auto_recovery"
}
)
# Store new customer ID
user.set_stripe_customer_id(customer.id, is_live_mode)
await db.commit()
# Notify admin of successful recovery
await CustomerService._notify_recovery_success(customer.id, user)
return customer
Key Features:
- Automatic Recovery: Handles deleted customers transparently
- Environment Awareness: Separate customers for test/live
- Deduplication: Prevents duplicate customers
- Admin Notifications: Alerts for manual intervention needs
- Comprehensive Logging: Full audit trail
Part 4: Robust Billing API Endpoints
Checkout Session Creation
from fastapi import APIRouter, Depends, HTTPException
from services.customer_service import CustomerService
@router.post("/create-checkout-session")
async def create_checkout_session(
request: CheckoutSessionRequest,
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db)
) -> CheckoutSessionResponse:
"""Create checkout session with bulletproof customer handling."""
try:
# Get or create customer (handles all edge cases)
customer = await CustomerService.get_customer_object(current_user, db)
# Create checkout session
session = stripe.checkout.Session.create(
customer=customer.id, # Always valid customer ID
payment_method_types=['card'],
line_items=[{
'price': get_price_id(request.plan_id, request.billing_cycle),
'quantity': 1,
}],
mode='subscription',
success_url=f"{settings.FRONTEND_URL}/account?success=true&session_id={{CHECKOUT_SESSION_ID}}",
cancel_url=f"{settings.FRONTEND_URL}/account?canceled=true",
metadata={
"user_id": str(current_user.id),
"plan_id": request.plan_id,
"billing_cycle": request.billing_cycle
}
)
return CheckoutSessionResponse(
checkout_url=session.url,
session_id=session.id
)
except Exception as e:
logger.error(f"Checkout session creation failed: {e}")
raise HTTPException(
status_code=500,
detail=f"Failed to create checkout session: {str(e)}"
)
Customer Portal Access
@router.get("/customer-portal")
async def create_customer_portal_session(
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db)
):
"""Create customer portal session with automatic customer recovery."""
try:
# Get customer (creates if missing)
customer = await CustomerService.get_customer_object(current_user, db)
# Create portal session
portal_session = stripe.billing_portal.Session.create(
customer=customer.id,
return_url=f"{settings.FRONTEND_URL}/account"
)
return {"portal_url": portal_session.url}
except Exception as e:
logger.error(f"Portal session creation failed: {e}")
raise HTTPException(
status_code=500,
detail=f"Failed to create portal session: {str(e)}"
)
Part 5: Webhook Event Processing
Secure Webhook Handling
import stripe
from fastapi import Request, HTTPException
@router.post("/webhook")
async def handle_stripe_webhook(
request: Request,
db: AsyncSession = Depends(get_db)
):
"""Handle Stripe webhook events securely."""
payload = await request.body()
sig_header = request.headers.get('stripe-signature')
try:
# Verify webhook signature
event = stripe.Webhook.construct_event(
payload, sig_header, settings.STRIPE_WEBHOOK_SECRET
)
except ValueError:
raise HTTPException(status_code=400, detail="Invalid payload")
except stripe.error.SignatureVerificationError:
raise HTTPException(status_code=400, detail="Invalid signature")
# Route events to appropriate handlers
event_handlers = {
'checkout.session.completed': handle_checkout_completed,
'invoice.payment_succeeded': handle_payment_succeeded,
'invoice.payment_failed': handle_payment_failed,
'customer.subscription.updated': handle_subscription_updated,
'customer.subscription.deleted': handle_subscription_canceled,
}
handler = event_handlers.get(event['type'])
if handler:
await handler(event['data']['object'], db)
else:
logger.info(f"Unhandled event type: {event['type']}")
return {"status": "success"}
Subscription Lifecycle Handlers
async def handle_checkout_completed(session, db: AsyncSession):
"""Handle successful checkout completion."""
# Extract metadata
user_id = session['metadata'].get('user_id')
plan_id = session['metadata'].get('plan_id')
if not user_id or not plan_id:
logger.error("Missing metadata in checkout session")
return
# Get user and update subscription
user = await db.get(User, int(user_id))
if not user:
logger.error(f"User {user_id} not found")
return
# Get company (assuming user has current company)
company = user.current_company
if not company:
logger.error(f"No company found for user {user_id}")
return
# Update subscription details
company.subscription_plan = plan_id
company.subscription_status = "active"
company.stripe_subscription_id = session.get('subscription')
# Handle subscription period
if session.get('subscription'):
subscription = stripe.Subscription.retrieve(session['subscription'])
if hasattr(subscription, 'current_period_end'):
company.current_period_end = datetime.fromtimestamp(
subscription.current_period_end
)
await db.commit()
logger.info(f"Subscription activated for user {user_id}")
async def handle_payment_failed(invoice, db: AsyncSession):
"""Handle failed payment."""
subscription_id = invoice.get('subscription')
if not subscription_id:
return
# Find company by subscription ID
result = await db.execute(
select(Company).where(Company.stripe_subscription_id == subscription_id)
)
company = result.scalars().first()
if company:
company.subscription_status = "past_due"
await db.commit()
logger.warning(f"Payment failed for company {company.id}")
# Could trigger email notifications here
Part 6: Admin Notification System
Email Alerts for System Events
class EmailService:
@staticmethod
async def send_missing_customer_notification(
customer_id: str, user_email: str, user_name: str, user_id: int
) -> bool:
"""Alert admin when stored customer ID is invalid."""
html_content = f"""
<h2>⚠️ Missing Stripe Customer Alert!</h2>
<p>A stored customer ID was not found in Stripe:</p>
<div style="background-color: #fff3cd; padding: 15px; border-radius: 5px;">
<p><strong>Missing Customer ID:</strong> <code>{customer_id}</code></p>
<p><strong>User:</strong> {user_name} ({user_email})</p>
<p><strong>User ID:</strong> {user_id}</p>
<p><strong>Time:</strong> {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}</p>
</div>
<p><strong>Automatic Recovery Actions:</strong></p>
<ul>
<li>✅ Cleared invalid customer ID from database</li>
<li>🔍 System will search for existing customers</li>
<li>🆕 Will create new customer if needed</li>
</ul>
<p>No action required - system will recover automatically.</p>
"""
return await EmailService.send_custom_email(
to="admin@yourcompany.com",
subject="⚠️ Missing Stripe Customer - Auto Recovery",
html_content=html_content
)
@staticmethod
async def send_customer_recovery_success_notification(
customer_id: str, user_email: str, user_name: str, user_id: int
) -> bool:
"""Confirm successful customer recovery."""
html_content = f"""
<h2>✅ Customer Recovery Successful!</h2>
<p>New customer created during automatic recovery:</p>
<div style="background-color: #d4edda; padding: 15px; border-radius: 5px;">
<p><strong>New Customer ID:</strong> <code>{customer_id}</code></p>
<p><strong>User:</strong> {user_name} ({user_email})</p>
<p><strong>Recovery completed:</strong> {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}</p>
</div>
<p>User can now proceed with billing operations normally.</p>
"""
return await EmailService.send_custom_email(
to="admin@yourcompany.com",
subject="✅ Customer Recovery Success",
html_content=html_content
)
Part 7: Testing Workflows
Local Development Testing
# Terminal 1: Start your development server
npm run dev # or python -m uvicorn main:app --reload
# Terminal 2: Start Stripe CLI webhook forwarding
stripe listen --forward-to localhost:5010/api/billing/webhook
# Terminal 3: Trigger test events
stripe trigger checkout.session.completed
stripe trigger invoice.payment_succeeded
stripe trigger customer.subscription.updated
Integration Testing
# test_subscriptions.py
import pytest
from services.customer_service import CustomerService
@pytest.mark.asyncio
async def test_customer_recovery(db_session, test_user):
"""Test automatic customer recovery when stored ID is invalid."""
# Set invalid customer ID
test_user.stripe_customer_id_test = "cus_invalid123"
await db_session.commit()
# Should recover automatically
customer = await CustomerService.get_customer_object(test_user, db_session)
# Verify recovery
assert customer is not None
assert customer.id != "cus_invalid123"
assert customer.email == test_user.email
# Verify database updated
await db_session.refresh(test_user)
assert test_user.stripe_customer_id_test == customer.id
@pytest.mark.asyncio
async def test_webhook_processing(client, db_session):
"""Test webhook event processing."""
webhook_payload = {
"type": "checkout.session.completed",
"data": {
"object": {
"id": "cs_test_123",
"customer": "cus_test_123",
"subscription": "sub_test_123",
"metadata": {
"user_id": "1",
"plan_id": "startup"
}
}
}
}
response = await client.post("/api/billing/webhook", json=webhook_payload)
assert response.status_code == 200
Part 8: Production Deployment Considerations
Environment Variables
# Production .env
STRIPE_SECRET_KEY=sk_live_... # Live secret key
STRIPE_WEBHOOK_SECRET=whsec_... # Production webhook secret
STRIPE_PUBLISHABLE_KEY=pk_live_...
# Database
DATABASE_URL=postgresql://...
# Email notifications
SMTP_HOST=smtp.gmail.com
SMTP_USERNAME=noreply@yourcompany.com
SMTP_PASSWORD=...
Webhook Endpoint Security
# Webhook endpoint configuration in Stripe Dashboard:
# URL: https://yourapp.com/api/billing/webhook
# Events to send:
# - checkout.session.completed
# - invoice.payment_succeeded
# - invoice.payment_failed
# - customer.subscription.updated
# - customer.subscription.deleted
# Security headers
@router.post("/webhook")
async def handle_stripe_webhook(request: Request):
# Always verify signature first
signature = request.headers.get('stripe-signature')
if not signature:
raise HTTPException(status_code=400, detail="Missing signature")
# ... rest of webhook handling
Database Migrations
-- Migration: Add customer ID fields to users
ALTER TABLE users ADD COLUMN stripe_customer_id_test VARCHAR NULL;
ALTER TABLE users ADD COLUMN stripe_customer_id_live VARCHAR NULL;
-- Add indexes for performance
CREATE INDEX idx_users_customer_test ON users(stripe_customer_id_test);
CREATE INDEX idx_users_customer_live ON users(stripe_customer_id_live);
-- Migrate existing data if needed
UPDATE users
SET stripe_customer_id_test = stripe_customer_id_legacy
WHERE stripe_customer_id_legacy IS NOT NULL;
Part 9: Monitoring and Observability
Key Metrics to Track
# Custom metrics for monitoring
import logging
from datetime import datetime
class SubscriptionMetrics:
@staticmethod
def log_customer_recovery(user_id: int, old_customer_id: str, new_customer_id: str):
logger.info(
"customer_recovery",
extra={
"user_id": user_id,
"old_customer_id": old_customer_id,
"new_customer_id": new_customer_id,
"timestamp": datetime.now().isoformat()
}
)
@staticmethod
def log_subscription_event(event_type: str, user_id: int, subscription_id: str):
logger.info(
"subscription_event",
extra={
"event_type": event_type,
"user_id": user_id,
"subscription_id": subscription_id,
"timestamp": datetime.now().isoformat()
}
)
Health Checks
@router.get("/health/billing")
async def billing_health_check():
"""Health check for billing system."""
checks = {
"stripe_api": False,
"webhook_secret": bool(settings.STRIPE_WEBHOOK_SECRET),
"database": False
}
# Test Stripe API
try:
stripe.Account.retrieve()
checks["stripe_api"] = True
except:
pass
# Test database
try:
# Simple query to test DB connection
await db.execute("SELECT 1")
checks["database"] = True
except:
pass
healthy = all(checks.values())
status_code = 200 if healthy else 503
return JSONResponse(
status_code=status_code,
content={
"status": "healthy" if healthy else "unhealthy",
"checks": checks,
"timestamp": datetime.now().isoformat()
}
)
Part 10: Advanced Features
Subscription Plan Changes
@router.post("/change-plan")
async def change_subscription_plan(
request: ChangePlanRequest,
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db)
):
"""Handle subscription plan changes with proration."""
company = current_user.current_company
if not company or not company.stripe_subscription_id:
raise HTTPException(status_code=400, detail="No active subscription")
try:
# Get current subscription
subscription = stripe.Subscription.retrieve(company.stripe_subscription_id)
# Update subscription with new price
updated_subscription = stripe.Subscription.modify(
company.stripe_subscription_id,
items=[{
'id': subscription['items']['data'][0]['id'],
'price': get_price_id(request.new_plan_id, request.billing_cycle),
}],
proration_behavior='create_prorations' # Handle prorations
)
# Update local database
company.subscription_plan = request.new_plan_id
await db.commit()
return {
"success": True,
"message": f"Plan changed to {request.new_plan_id}",
"proration_amount": calculate_proration(updated_subscription)
}
except stripe.error.StripeError as e:
raise HTTPException(status_code=400, detail=f"Stripe error: {str(e)}")
Usage-Based Billing
@router.post("/report-usage")
async def report_usage_to_stripe(
usage_data: UsageReportRequest,
current_user: User = Depends(get_current_user)
):
"""Report usage for metered billing."""
company = current_user.current_company
if not company.stripe_subscription_id:
raise HTTPException(status_code=400, detail="No active subscription")
try:
# Report usage to Stripe
stripe.UsageRecord.create(
subscription_item=usage_data.subscription_item_id,
quantity=usage_data.quantity,
timestamp=int(usage_data.timestamp.timestamp()),
action='increment' # or 'set' for absolute values
)
return {"success": True, "usage_reported": usage_data.quantity}
except stripe.error.StripeError as e:
raise HTTPException(status_code=400, detail=f"Usage reporting failed: {str(e)}")
Best Practices and Lessons Learned
1. Always Handle Customer Recovery
Never assume stored customer IDs are valid. Customers can be deleted manually from Stripe dashboard, during testing, or through bulk operations.
2. Environment Separation is Critical
Use separate customer IDs for test and live environments. This prevents accidental charges and data corruption.
3. Webhook Idempotency
Always handle duplicate webhook events gracefully:
@router.post("/webhook")
async def handle_webhook(request: Request, db: AsyncSession = Depends(get_db)):
event_id = event['id']
# Check if we've already processed this event
existing = await db.execute(
select(ProcessedWebhook).where(ProcessedWebhook.stripe_event_id == event_id)
)
if existing.scalar():
return {"status": "already_processed"}
# Process event...
# Mark as processed
processed = ProcessedWebhook(stripe_event_id=event_id, processed_at=datetime.now())
db.add(processed)
await db.commit()
4. Comprehensive Logging
Log everything with context. Future debugging will thank you:
logger.info(
"subscription_created",
extra={
"user_id": user.id,
"customer_id": customer.id,
"subscription_id": subscription.id,
"plan_id": plan_id,
"environment": "live" if is_live_mode else "test"
}
)
5. Admin Notifications
Set up alerts for critical events. Silent failures in billing can be expensive:
- Customer recovery events
- Failed payments
- Webhook processing errors
- Subscription cancellations
6. Test Everything
Use Stripe’s test mode extensively:
# Test different scenarios
stripe trigger checkout.session.completed
stripe trigger invoice.payment_failed
stripe trigger customer.subscription.deleted
# Test with different amounts
stripe trigger checkout.session.completed --add checkout_session:amount_total=2000
Conclusion
Building a robust subscription system requires thinking beyond the happy path. By implementing automatic customer recovery, comprehensive error handling, and thorough testing workflows, you can build a system that handles real-world complexity gracefully.
Key takeaways:
- Customer Recovery: Always handle invalid customer IDs automatically
- Environment Separation: Use separate customer IDs for test/live
- Webhook Reliability: Handle failures, duplicates, and edge cases
- Admin Visibility: Alert on critical events and recovery actions
- Testing Workflows: Use Stripe CLI for efficient development
- Comprehensive Logging: Log everything with context
- Graceful Degradation: Never fail user operations due to billing issues
The investment in building these robust systems pays dividends in reduced support tickets, improved user experience, and developer confidence when making changes to critical billing code.
This implementation powers WorkSite Diary’s subscription system, handling thousands of billing operations with zero manual intervention required for customer recovery scenarios.
Code Repository
The complete implementation is available with:
- Full CustomerService with recovery logic
- Comprehensive webhook handlers
- Email notification system
- Test suites and integration tests
- Production deployment configurations
Ready to implement bulletproof subscriptions in your SaaS? Start with the CustomerService pattern and build from there. Your future self (and your users) will thank you.
by
Tags:
Leave a Reply