TekOnline

Building Bulletproof Stripe Subscriptions: From CLI Testing to Production-Ready Code

A guide to implementing robust Stripe subscription handling with automatic customer recovery, webhook processing, and thorough error handling.

This implementation is in production; check it out at jobsight.tekonline.com.au, and let me know if we can assist with your implementation.

Introduction

Building a subscription-based SaaS application requires more than just integrating Stripe’s API. You need bulletproof error handling, seamless customer management, and robust testing workflows. In this deep-dive, we’ll explore how to build a production-ready Stripe subscription system that handles edge cases, recovers from failures, and provides a good developer experience.

The Challenge: Real-World Subscription Complexity

When building WorkSite Diary, a construction site management SaaS, we encountered several real-world challenges:

  • Customer ID Management: Users expect seamless billing even when backend data gets corrupted
  • Environment Separation: Test vs production customer isolation
  • Webhook Reliability: Handling webhook failures and retries
  • Customer Recovery: What happens when stored customer IDs become invalid?
  • Developer Testing: Efficient workflows for testing subscription flows

Architecture Overview

Our solution uses a layered architecture:


┌─────────────────┐    ┌──────────────────┐    ┌─────────────────┐
│   Frontend      │    │   Backend API    │    │   Stripe API    │
│   (Angular)     │◄──►│   (FastAPI)      │◄──►│   (Webhooks)    │
└─────────────────┘    └──────────────────┘    └─────────────────┘
                              │
                              ▼
                    ┌──────────────────┐
                    │   Database       │
                    │   (SQLite/PG)    │
                    └──────────────────┘

Key Components:

  1. CustomerService: Handles all customer lifecycle management
  2. Billing Routes: API endpoints for subscription operations
  3. Webhook Handlers: Process Stripe events reliably
  4. Email Notifications: Admin alerts for system events
  5. CLI Testing: Stripe CLI integration for development

Part 1: Setting Up Stripe CLI for Development

Installation and Authentication

# Install Stripe CLI (Debian/Ubuntu)
curl -s https://packages.stripe.dev/api/security/keypair/stripe-cli-gpg/public | gpg --dearmor | sudo tee /usr/share/keyrings/stripe.gpg
echo "deb [signed-by=/usr/share/keyrings/stripe.gpg] https://packages.stripe.dev/stripe-cli-debian-local stable main" | sudo tee -a /etc/apt/sources.list.d/stripe.list
sudo apt update && sudo apt install stripe

# Install Stripe CLI (macOS, via Homebrew)
brew install stripe/stripe-cli/stripe

# Authenticate with your Stripe account
stripe login

Local Webhook Testing

The game-changer for subscription development is real-time webhook testing:

# Forward webhooks to your local development server
stripe listen --forward-to localhost:5010/api/billing/webhook

# Output shows your webhook signing secret
# > Your webhook signing secret is whsec_1234567890abcdef...

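Pro Tip: The secret printed by stripe listen is specific to the CLI and is not the same as the secret of an endpoint configured in the Stripe Dashboard. If signature verification fails locally, first check that STRIPE_WEBHOOK_SECRET matches the value the CLI printed.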

Environment Configuration

# .env.dev
STRIPE_SECRET_KEY=sk_test_...
STRIPE_WEBHOOK_SECRET=whsec_... # From CLI output
STRIPE_PUBLISHABLE_KEY=pk_test_...
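
Mixing a test key with a live key is an easy mistake to make when juggling several .env files. A minimal sketch of a startup check, assuming the three variable names above; key_mode and check_stripe_env are hypothetical helpers, not part of the Stripe library:

```python
from typing import Mapping


def key_mode(key: str) -> str:
    """Return 'test' or 'live' from a key shaped like sk_test_... or pk_live_..."""
    parts = key.split("_", 2)
    if len(parts) < 3 or parts[1] not in ("test", "live"):
        raise ValueError("unrecognised Stripe key format")
    return parts[1]


def check_stripe_env(env: Mapping[str, str]) -> str:
    """Fail fast if the secret and publishable keys come from different modes."""
    secret_mode = key_mode(env["STRIPE_SECRET_KEY"])
    publishable_mode = key_mode(env["STRIPE_PUBLISHABLE_KEY"])
    if secret_mode != publishable_mode:
        raise ValueError(
            f"secret key is {secret_mode} but publishable key is {publishable_mode}"
        )
    if not env["STRIPE_WEBHOOK_SECRET"].startswith("whsec_"):
        raise ValueError("STRIPE_WEBHOOK_SECRET should start with whsec_")
    return secret_mode
```

Calling check_stripe_env(os.environ) once at application startup turns a confusing runtime failure into an immediate configuration error.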

Part 2: Database Schema Design

User-Centric Customer Management

We store customer IDs at the user level, not company level, for better flexibility:

-- User table with environment-specific customer IDs
CREATE TABLE users (
    id INTEGER PRIMARY KEY,
    email TEXT UNIQUE NOT NULL,
    name TEXT,
    password_hash TEXT,
    is_verified BOOLEAN DEFAULT FALSE,
    
    -- Environment-specific Stripe customer IDs
    stripe_customer_id_test TEXT NULL,
    stripe_customer_id_live TEXT NULL,
    
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- Company table for subscription tracking
CREATE TABLE companies (
    id INTEGER PRIMARY KEY,
    name TEXT NOT NULL,
    subscription_status TEXT DEFAULT 'free',
    subscription_plan TEXT DEFAULT 'free',
    stripe_subscription_id TEXT,
    current_period_end TIMESTAMP,
    
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

Why User-Level Customer IDs?

  1. Portability: Users can switch companies without losing billing history
  2. Simplicity: One customer per user, regardless of company affiliations
  3. Recovery: Easier to track and recover lost customer relationships
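
The service code in Part 3 calls user.get_stripe_customer_id(...) and user.set_stripe_customer_id(...), whose bodies are not shown in this article. One plausible shape for them, sketched here on a plain dataclass standing in for the real SQLAlchemy model (the field names follow the schema above; everything else is an assumption):

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class User:
    """Stand-in for the ORM model; only the billing-related fields are shown."""
    id: int
    email: str
    stripe_customer_id_test: Optional[str] = None
    stripe_customer_id_live: Optional[str] = None

    def get_stripe_customer_id(self, is_live_mode: bool) -> Optional[str]:
        """Return the customer ID for the requested Stripe mode, if any."""
        if is_live_mode:
            return self.stripe_customer_id_live
        return self.stripe_customer_id_test

    def set_stripe_customer_id(self, customer_id: Optional[str], is_live_mode: bool) -> None:
        """Store (or clear, with None) the customer ID for the requested mode."""
        if is_live_mode:
            self.stripe_customer_id_live = customer_id
        else:
            self.stripe_customer_id_test = customer_id
```

Keeping the mode switch inside the model means callers never touch the two columns directly, so a test-mode ID cannot leak into a live-mode request by accident.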

Part 3: The CustomerService – Heart of the System

Core Service Implementation

import stripe
import logging
from sqlalchemy.ext.asyncio import AsyncSession
from models.user import User
from config import get_settings

logger = logging.getLogger(__name__)
settings = get_settings()
stripe.api_key = settings.STRIPE_SECRET_KEY

class CustomerService:
    @staticmethod
    async def get_customer_object(user: User, db: AsyncSession) -> stripe.Customer:
        """
        Get Stripe customer with automatic recovery.
        This is the bulletproof method that handles all edge cases.
        """
        is_live_mode = not settings.STRIPE_SECRET_KEY.startswith('sk_test_')
        customer_id = user.get_stripe_customer_id(is_live_mode)
        
        if customer_id:
            try:
                # Try to retrieve stored customer
                customer = stripe.Customer.retrieve(customer_id)
                # Stripe still returns deleted customers, flagged with deleted=True
                if not getattr(customer, "deleted", False):
                    logger.info(f"Found existing customer: {customer.id}")
                    return customer
                logger.warning(f"Customer {customer_id} was deleted, recovering...")
            except stripe.error.InvalidRequestError as e:
                # Match on the error code rather than the message text
                if e.code != "resource_missing":
                    raise
                logger.warning(f"Customer {customer_id} not found, recovering...")

            # Notify admin about the issue
            await CustomerService._notify_missing_customer(
                customer_id, user, db
            )

            # Clear invalid ID so a new customer is created below
            user.set_stripe_customer_id(None, is_live_mode)
            await db.commit()
        
        # Create new customer (first time or recovery)
        return await CustomerService._create_new_customer(user, db)
    
    @staticmethod
    async def _create_new_customer(user: User, db: AsyncSession) -> stripe.Customer:
        """Create new customer with deduplication."""
        is_live_mode = not settings.STRIPE_SECRET_KEY.startswith('sk_test_')
        
        # First, search for existing customers by email
        existing_customers = stripe.Customer.list(email=user.email, limit=10)
        
        # Look for customer with matching user metadata
        for customer in existing_customers.data:
            metadata = customer.get('metadata', {})
            if metadata.get('user_id') == str(user.id):
                # Found existing customer, relink it
                user.set_stripe_customer_id(customer.id, is_live_mode)
                await db.commit()
                return customer
        
        # Create brand new customer
        customer = stripe.Customer.create(
            email=user.email,
            name=user.name,
            metadata={
                "user_id": str(user.id),
                "environment": "live" if is_live_mode else "test",
                "created_via": "auto_recovery"
            }
        )
        
        # Store new customer ID
        user.set_stripe_customer_id(customer.id, is_live_mode)
        await db.commit()
        
        # Notify admin that a customer was created
        # (this runs for first-time customers as well as recoveries)
        await CustomerService._notify_recovery_success(customer.id, user)
        
        return customer

Key Features:

  1. Automatic Recovery: Handles deleted customers transparently
  2. Environment Awareness: Separate customers for test/live
  3. Deduplication: Prevents duplicate customers
  4. Admin Notifications: Alerts for manual intervention needs
  5. Comprehensive Logging: Full audit trail
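
One caveat with the service above: calls such as stripe.Customer.retrieve are synchronous, so inside an async def they block the event loop while the HTTP request to Stripe is in flight. A minimal sketch of one way around this, using the standard library's asyncio.to_thread (Python 3.9+); call_blocking is a hypothetical helper and fake_retrieve is only a stand-in for a real SDK call:

```python
import asyncio
import time
from typing import Any, Callable


async def call_blocking(func: Callable[..., Any], *args: Any, **kwargs: Any) -> Any:
    """Run a blocking function in a worker thread so the event loop stays free."""
    return await asyncio.to_thread(func, *args, **kwargs)


def fake_retrieve(customer_id: str) -> dict:
    """Stand-in for a blocking SDK call such as stripe.Customer.retrieve."""
    time.sleep(0.05)  # simulate network latency
    return {"id": customer_id}


async def main() -> dict:
    # In the service this would read:
    #   customer = await call_blocking(stripe.Customer.retrieve, customer_id)
    return await call_blocking(fake_retrieve, "cus_123")
```

Under light load the difference is invisible, but with many concurrent requests a blocked event loop delays every other coroutine, including webhook handlers.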

Part 4: Robust Billing API Endpoints

Checkout Session Creation

import stripe
from fastapi import APIRouter, Depends, HTTPException
from services.customer_service import CustomerService

# Assumed to be mounted under /api/billing by the application
router = APIRouter()

@router.post("/create-checkout-session")
async def create_checkout_session(
    request: CheckoutSessionRequest,
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db)
) -> CheckoutSessionResponse:
    """Create checkout session with bulletproof customer handling."""
    
    try:
        # Get or create customer (handles all edge cases)
        customer = await CustomerService.get_customer_object(current_user, db)
        
        # Create checkout session
        session = stripe.checkout.Session.create(
            customer=customer.id,  # Always valid customer ID
            payment_method_types=['card'],
            line_items=[{
                'price': get_price_id(request.plan_id, request.billing_cycle),
                'quantity': 1,
            }],
            mode='subscription',
            success_url=f"{settings.FRONTEND_URL}/account?success=true&session_id={{CHECKOUT_SESSION_ID}}",
            cancel_url=f"{settings.FRONTEND_URL}/account?canceled=true",
            metadata={
                "user_id": str(current_user.id),
                "plan_id": request.plan_id,
                "billing_cycle": request.billing_cycle
            }
        )
        
        return CheckoutSessionResponse(
            checkout_url=session.url,
            session_id=session.id
        )
        
    except Exception:
        # Log the full traceback server-side; don't leak internals to the client
        logger.exception("Checkout session creation failed")
        raise HTTPException(
            status_code=500,
            detail="Failed to create checkout session"
        )
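
The endpoint relies on get_price_id, which is not shown. A minimal sketch of what such a lookup might look like; the plan names, billing cycles, and price IDs below are placeholders, and in practice the IDs would come from your Stripe Dashboard or configuration:

```python
# Placeholder mapping of (plan, billing cycle) to Stripe price IDs.
PRICE_IDS = {
    ("startup", "monthly"): "price_startup_monthly_placeholder",
    ("startup", "yearly"): "price_startup_yearly_placeholder",
}


def get_price_id(plan_id: str, billing_cycle: str) -> str:
    """Return the configured price ID or raise ValueError for unknown combinations."""
    try:
        return PRICE_IDS[(plan_id, billing_cycle)]
    except KeyError:
        raise ValueError(f"Unknown plan/cycle: {plan_id}/{billing_cycle}") from None
```

Raising a ValueError for unknown combinations lets the route translate bad input into a 400 response instead of sending an invalid price to Stripe.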

Customer Portal Access

@router.get("/customer-portal")
async def create_customer_portal_session(
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db)
):
    """Create customer portal session with automatic customer recovery."""
    
    try:
        # Get customer (creates if missing)
        customer = await CustomerService.get_customer_object(current_user, db)
        
        # Create portal session
        portal_session = stripe.billing_portal.Session.create(
            customer=customer.id,
            return_url=f"{settings.FRONTEND_URL}/account"
        )
        
        return {"portal_url": portal_session.url}
        
    except Exception:
        # Log the full traceback server-side; don't leak internals to the client
        logger.exception("Portal session creation failed")
        raise HTTPException(
            status_code=500,
            detail="Failed to create portal session"
        )

Part 5: Webhook Event Processing

Secure Webhook Handling

import stripe
from fastapi import Request, HTTPException

@router.post("/webhook")
async def handle_stripe_webhook(
    request: Request, 
    db: AsyncSession = Depends(get_db)
):
    """Handle Stripe webhook events securely."""
    
    payload = await request.body()
    sig_header = request.headers.get('stripe-signature')
    
    try:
        # Verify webhook signature
        event = stripe.Webhook.construct_event(
            payload, sig_header, settings.STRIPE_WEBHOOK_SECRET
        )
    except ValueError:
        raise HTTPException(status_code=400, detail="Invalid payload")
    except stripe.error.SignatureVerificationError:
        raise HTTPException(status_code=400, detail="Invalid signature")
    
    # Route events to appropriate handlers
    event_handlers = {
        'checkout.session.completed': handle_checkout_completed,
        'invoice.payment_succeeded': handle_payment_succeeded,
        'invoice.payment_failed': handle_payment_failed,
        'customer.subscription.updated': handle_subscription_updated,
        'customer.subscription.deleted': handle_subscription_canceled,
    }
    
    handler = event_handlers.get(event['type'])
    if handler:
        await handler(event['data']['object'], db)
    else:
        logger.info(f"Unhandled event type: {event['type']}")
    
    return {"status": "success"}
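
It helps to know what construct_event is checking. Stripe documents the Stripe-Signature header as a timestamp (t=) plus one or more v1= signatures, each an HMAC-SHA256 of "timestamp.raw_body" keyed with the endpoint secret. The sketch below re-implements that check with the standard library purely for illustration; the function names are hypothetical, and production code should keep using stripe.Webhook.construct_event:

```python
import hashlib
import hmac
import time
from typing import Optional


def compute_v1(payload: bytes, secret: str, timestamp: int) -> str:
    """HMAC-SHA256 over '<timestamp>.<raw body>', hex-encoded."""
    signed_payload = f"{timestamp}.".encode() + payload
    return hmac.new(secret.encode(), signed_payload, hashlib.sha256).hexdigest()


def build_header(payload: bytes, secret: str, timestamp: int) -> str:
    """Build a header in the 't=...,v1=...' shape (useful in tests)."""
    return f"t={timestamp},v1={compute_v1(payload, secret, timestamp)}"


def verify_header(payload: bytes, header: str, secret: str,
                  tolerance: int = 300, now: Optional[int] = None) -> bool:
    """Return True if any v1 signature matches and the timestamp is recent."""
    pairs = [item.split("=", 1) for item in header.split(",") if "=" in item]
    timestamps = [value for key, value in pairs if key == "t"]
    signatures = [value for key, value in pairs if key == "v1"]
    if not timestamps or not signatures or not timestamps[0].isdigit():
        return False
    timestamp = int(timestamps[0])
    current = int(time.time()) if now is None else now
    if abs(current - timestamp) > tolerance:
        return False  # too old (or too far in the future): possible replay
    expected = compute_v1(payload, secret, timestamp).encode()
    return any(hmac.compare_digest(expected, s.encode()) for s in signatures)
```

This is also why the handler reads request.body() rather than a parsed JSON object: the signature covers the exact bytes Stripe sent, so any re-serialisation breaks verification.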

Subscription Lifecycle Handlers

from datetime import datetime
from sqlalchemy import select

async def handle_checkout_completed(session, db: AsyncSession):
    """Handle successful checkout completion."""
    
    # Extract metadata
    user_id = session['metadata'].get('user_id')
    plan_id = session['metadata'].get('plan_id')
    
    if not user_id or not plan_id:
        logger.error("Missing metadata in checkout session")
        return
    
    # Get user and update subscription
    user = await db.get(User, int(user_id))
    if not user:
        logger.error(f"User {user_id} not found")
        return
    
    # Get company (assuming user has current company)
    company = user.current_company
    if not company:
        logger.error(f"No company found for user {user_id}")
        return
    
    # Update subscription details
    company.subscription_plan = plan_id
    company.subscription_status = "active"
    company.stripe_subscription_id = session.get('subscription')
    
    # Handle subscription period
    if session.get('subscription'):
        subscription = stripe.Subscription.retrieve(session['subscription'])
        if hasattr(subscription, 'current_period_end'):
            company.current_period_end = datetime.fromtimestamp(
                subscription.current_period_end
            )
    
    await db.commit()
    logger.info(f"Subscription activated for user {user_id}")

async def handle_payment_failed(invoice, db: AsyncSession):
    """Handle failed payment."""
    
    subscription_id = invoice.get('subscription')
    if not subscription_id:
        return
    
    # Find company by subscription ID
    result = await db.execute(
        select(Company).where(Company.stripe_subscription_id == subscription_id)
    )
    company = result.scalars().first()
    
    if company:
        company.subscription_status = "past_due"
        await db.commit()
        logger.warning(f"Payment failed for company {company.id}")
        
        # Could trigger email notifications here
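
The handlers for customer.subscription.updated and customer.subscription.deleted are registered in the routing table but not shown. Both mostly come down to translating Stripe's subscription status into the local subscription_status column. A minimal sketch of that translation, assuming the local values 'free', 'active', and 'past_due' seen elsewhere in this article; the mapping itself is a design choice, not something Stripe prescribes:

```python
# Stripe subscription statuses mapped to the local subscription_status values.
STRIPE_TO_LOCAL_STATUS = {
    "trialing": "active",
    "active": "active",
    "past_due": "past_due",
    "unpaid": "past_due",
    "canceled": "free",
    "incomplete": "free",
    "incomplete_expired": "free",
    "paused": "free",
}


def local_status(stripe_status: str, current: str) -> str:
    """Translate a Stripe status; keep the current value for unknown statuses."""
    return STRIPE_TO_LOCAL_STATUS.get(stripe_status, current)
```

Falling back to the current value for an unrecognised status avoids downgrading a paying customer just because Stripe introduced a status the mapping does not know yet.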

Part 6: Admin Notification System

Email Alerts for System Events

class EmailService:
    @staticmethod
    async def send_missing_customer_notification(
        customer_id: str, user_email: str, user_name: str, user_id: int
    ) -> bool:
        """Alert admin when stored customer ID is invalid."""
        
        html_content = f"""
        <h2>⚠️ Missing Stripe Customer Alert!</h2>
        <p>A stored customer ID was not found in Stripe:</p>
        <div style="background-color: #fff3cd; padding: 15px; border-radius: 5px;">
            <p><strong>Missing Customer ID:</strong> <code>{customer_id}</code></p>
            <p><strong>User:</strong> {user_name} ({user_email})</p>
            <p><strong>User ID:</strong> {user_id}</p>
            <p><strong>Time:</strong> {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}</p>
        </div>
        <p><strong>Automatic Recovery Actions:</strong></p>
        <ul>
            <li>✅ Cleared invalid customer ID from database</li>
            <li>🔍 System will search for existing customers</li>
            <li>🆕 Will create new customer if needed</li>
        </ul>
        <p>No action required - system will recover automatically.</p>
        """
        
        return await EmailService.send_custom_email(
            to="admin@yourcompany.com",
            subject="⚠️ Missing Stripe Customer - Auto Recovery",
            html_content=html_content
        )
    
    @staticmethod
    async def send_customer_recovery_success_notification(
        customer_id: str, user_email: str, user_name: str, user_id: int
    ) -> bool:
        """Confirm successful customer recovery."""
        
        html_content = f"""
        <h2>✅ Customer Recovery Successful!</h2>
        <p>New customer created during automatic recovery:</p>
        <div style="background-color: #d4edda; padding: 15px; border-radius: 5px;">
            <p><strong>New Customer ID:</strong> <code>{customer_id}</code></p>
            <p><strong>User:</strong> {user_name} ({user_email})</p>
            <p><strong>Recovery completed:</strong> {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}</p>
        </div>
        <p>User can now proceed with billing operations normally.</p>
        """
        
        return await EmailService.send_custom_email(
            to="admin@yourcompany.com",
            subject="✅ Customer Recovery Success",
            html_content=html_content
        )
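
Note that the templates above interpolate user_name and user_email straight into HTML. Both values are user-controlled, so a name containing markup would be rendered by the admin's mail client. A minimal sketch of escaping them first with the standard library; render_user_line is a hypothetical helper:

```python
from html import escape


def render_user_line(user_name: str, user_email: str) -> str:
    """Build the 'User' line of the alert with user-supplied values escaped."""
    return (
        f"<p><strong>User:</strong> {escape(user_name)} "
        f"({escape(user_email)})</p>"
    )
```

The same escape call applies to any other user-supplied value placed in the email body.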

Part 7: Testing Workflows

Local Development Testing

# Terminal 1: Start your development server
python -m uvicorn main:app --reload --port 5010  # must match the port stripe listen forwards to

# Terminal 2: Start Stripe CLI webhook forwarding
stripe listen --forward-to localhost:5010/api/billing/webhook

# Terminal 3: Trigger test events
stripe trigger checkout.session.completed
stripe trigger invoice.payment_succeeded
stripe trigger customer.subscription.updated

Integration Testing

# test_subscriptions.py
import pytest
from services.customer_service import CustomerService

@pytest.mark.asyncio
async def test_customer_recovery(db_session, test_user):
    """Test automatic customer recovery when stored ID is invalid."""
    
    # Set invalid customer ID
    test_user.stripe_customer_id_test = "cus_invalid123"
    await db_session.commit()
    
    # Should recover automatically
    customer = await CustomerService.get_customer_object(test_user, db_session)
    
    # Verify recovery
    assert customer is not None
    assert customer.id != "cus_invalid123"
    assert customer.email == test_user.email
    
    # Verify database updated
    await db_session.refresh(test_user)
    assert test_user.stripe_customer_id_test == customer.id

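import hashlib
import hmac
import json
import time
from config import get_settings

settings = get_settings()

@pytest.mark.asyncio
async def test_webhook_processing(client, db_session):
    """Test webhook event processing."""
    
    webhook_payload = {
        "type": "checkout.session.completed",
        "data": {
            "object": {
                "id": "cs_test_123",
                "customer": "cus_test_123",
                "subscription": "sub_test_123",
                "metadata": {
                    "user_id": "1",
                    "plan_id": "startup"
                }
            }
        }
    }
    
    # The endpoint verifies the Stripe-Signature header, so an unsigned
    # request would be rejected with 400. Sign the exact bytes we send.
    body = json.dumps(webhook_payload)
    timestamp = int(time.time())
    signature = hmac.new(
        settings.STRIPE_WEBHOOK_SECRET.encode(),
        f"{timestamp}.{body}".encode(),
        hashlib.sha256,
    ).hexdigest()
    
    response = await client.post(
        "/api/billing/webhook",
        content=body,
        headers={"stripe-signature": f"t={timestamp},v1={signature}"},
    )
    assert response.status_code == 200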

Part 8: Production Deployment Considerations

Environment Variables

# Production .env
STRIPE_SECRET_KEY=sk_live_...  # Live secret key
STRIPE_WEBHOOK_SECRET=whsec_... # Production webhook secret
STRIPE_PUBLISHABLE_KEY=pk_live_...

# Database
DATABASE_URL=postgresql://...

# Email notifications
SMTP_HOST=smtp.gmail.com
SMTP_USERNAME=noreply@yourcompany.com
SMTP_PASSWORD=...

Webhook Endpoint Security

# Webhook endpoint configuration in Stripe Dashboard:
# URL: https://yourapp.com/api/billing/webhook
# Events to send:
# - checkout.session.completed
# - invoice.payment_succeeded
# - invoice.payment_failed
# - customer.subscription.updated
# - customer.subscription.deleted

# Security headers
@router.post("/webhook")
async def handle_stripe_webhook(request: Request):
    # Always verify signature first
    signature = request.headers.get('stripe-signature')
    if not signature:
        raise HTTPException(status_code=400, detail="Missing signature")
    
    # ... rest of webhook handling

Database Migrations

-- Migration: Add customer ID fields to users
ALTER TABLE users ADD COLUMN stripe_customer_id_test VARCHAR NULL;
ALTER TABLE users ADD COLUMN stripe_customer_id_live VARCHAR NULL;

-- Add indexes for performance
CREATE INDEX idx_users_customer_test ON users(stripe_customer_id_test);
CREATE INDEX idx_users_customer_live ON users(stripe_customer_id_live);

-- Migrate existing data if needed
UPDATE users 
SET stripe_customer_id_test = stripe_customer_id_legacy 
WHERE stripe_customer_id_legacy IS NOT NULL;

Part 9: Monitoring and Observability

Key Metrics to Track

# Custom metrics for monitoring
import logging
from datetime import datetime

logger = logging.getLogger(__name__)

class SubscriptionMetrics:
    @staticmethod
    def log_customer_recovery(user_id: int, old_customer_id: str, new_customer_id: str):
        logger.info(
            "customer_recovery",
            extra={
                "user_id": user_id,
                "old_customer_id": old_customer_id,
                "new_customer_id": new_customer_id,
                "timestamp": datetime.now().isoformat()
            }
        )
    
    @staticmethod
    def log_subscription_event(event_type: str, user_id: int, subscription_id: str):
        logger.info(
            "subscription_event",
            extra={
                "event_type": event_type,
                "user_id": user_id,
                "subscription_id": subscription_id,
                "timestamp": datetime.now().isoformat()
            }
        )
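
Be aware that Python's default log formatter ignores fields passed via extra, so the structured data above never reaches the log output unless the formatter is told to include it. A minimal sketch of a JSON formatter that does so, using only the standard library; JsonFormatter is a hypothetical class, and a dedicated structured-logging library would work equally well:

```python
import json
import logging

# Attributes every LogRecord carries; anything else arrived via `extra=`.
_STANDARD_ATTRS = set(
    logging.LogRecord("", 0, "", 0, "", (), None).__dict__
) | {"message", "asctime"}


class JsonFormatter(logging.Formatter):
    """Emit one JSON object per record, including any `extra` fields."""

    def format(self, record: logging.LogRecord) -> str:
        payload = {
            "level": record.levelname,
            "logger": record.name,
            "event": record.getMessage(),
        }
        payload.update(
            {k: v for k, v in record.__dict__.items() if k not in _STANDARD_ATTRS}
        )
        # default=str keeps non-JSON types (e.g. datetime) from raising
        return json.dumps(payload, default=str)
```

To use it, attach the formatter to a handler: handler = logging.StreamHandler(); handler.setFormatter(JsonFormatter()); logger.addHandler(handler).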

Health Checks

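from fastapi.responses import JSONResponse
from sqlalchemy import text

@router.get("/health/billing")
async def billing_health_check(db: AsyncSession = Depends(get_db)):
    """Health check for billing system."""
    
    checks = {
        "stripe_api": False,
        "webhook_secret": bool(settings.STRIPE_WEBHOOK_SECRET),
        "database": False
    }
    
    # Test Stripe API
    try:
        stripe.Account.retrieve()
        checks["stripe_api"] = True
    except stripe.error.StripeError as e:
        logger.warning(f"Stripe health check failed: {e}")
    
    # Test database
    try:
        # Simple query to test DB connection (raw SQL must be wrapped in text())
        await db.execute(text("SELECT 1"))
        checks["database"] = True
    except Exception as e:
        logger.warning(f"Database health check failed: {e}")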
    
    healthy = all(checks.values())
    status_code = 200 if healthy else 503
    
    return JSONResponse(
        status_code=status_code,
        content={
            "status": "healthy" if healthy else "unhealthy",
            "checks": checks,
            "timestamp": datetime.now().isoformat()
        }
    )

Part 10: Advanced Features

Subscription Plan Changes

@router.post("/change-plan")
async def change_subscription_plan(
    request: ChangePlanRequest,
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db)
):
    """Handle subscription plan changes with proration."""
    
    company = current_user.current_company
    if not company or not company.stripe_subscription_id:
        raise HTTPException(status_code=400, detail="No active subscription")
    
    try:
        # Get current subscription
        subscription = stripe.Subscription.retrieve(company.stripe_subscription_id)
        
        # Update subscription with new price
        updated_subscription = stripe.Subscription.modify(
            company.stripe_subscription_id,
            items=[{
                'id': subscription['items']['data'][0]['id'],
                'price': get_price_id(request.new_plan_id, request.billing_cycle),
            }],
            proration_behavior='create_prorations'  # Handle prorations
        )
        
        # Update local database
        company.subscription_plan = request.new_plan_id
        await db.commit()
        
        return {
            "success": True,
            "message": f"Plan changed to {request.new_plan_id}",
            "proration_amount": calculate_proration(updated_subscription)
        }
        
    except stripe.error.StripeError as e:
        raise HTTPException(status_code=400, detail=f"Stripe error: {str(e)}")
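
The endpoint returns calculate_proration(updated_subscription), which is not shown. Stripe computes the authoritative proration itself and puts it on the invoice, so any local figure is only an estimate. As a rough illustration of the arithmetic, here is a sketch of time-based proration; estimate_proration is a hypothetical helper with amounts in cents and times as Unix timestamps, and it is not Stripe's exact algorithm:

```python
def estimate_proration(old_amount: int, new_amount: int,
                       period_start: int, period_end: int,
                       change_time: int) -> int:
    """Estimate the net charge (positive) or credit (negative) in cents.

    The customer is credited for the unused share of the old price and
    charged for the same share of the new price.
    """
    if period_end <= period_start:
        raise ValueError("period_end must be after period_start")
    if not period_start <= change_time <= period_end:
        raise ValueError("change_time must fall inside the billing period")
    remaining = (period_end - change_time) / (period_end - period_start)
    return round((new_amount - old_amount) * remaining)
```

For example, upgrading from a 1000-cent plan to a 3000-cent plan exactly halfway through the period gives an estimated charge of 1000 cents.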

Usage-Based Billing

@router.post("/report-usage")
async def report_usage_to_stripe(
    usage_data: UsageReportRequest,
    current_user: User = Depends(get_current_user)
):
    """Report usage for metered billing."""
    
    company = current_user.current_company
    # Guard against users without a company before reading its fields
    if not company or not company.stripe_subscription_id:
        raise HTTPException(status_code=400, detail="No active subscription")
    
    try:
        # Report usage to Stripe
        stripe.UsageRecord.create(
            subscription_item=usage_data.subscription_item_id,
            quantity=usage_data.quantity,
            timestamp=int(usage_data.timestamp.timestamp()),
            action='increment'  # or 'set' for absolute values
        )
        
        return {"success": True, "usage_reported": usage_data.quantity}
        
    except stripe.error.StripeError as e:
        raise HTTPException(status_code=400, detail=f"Usage reporting failed: {str(e)}")

Best Practices and Lessons Learned

1. Always Handle Customer Recovery

Never assume stored customer IDs are valid. Customers can be deleted manually from the Stripe Dashboard, during testing, or through bulk operations.

2. Environment Separation is Critical

Use separate customer IDs for test and live environments. This prevents accidental charges and data corruption.

3. Webhook Idempotency

Always handle duplicate webhook events gracefully:

@router.post("/webhook")
async def handle_webhook(request: Request, db: AsyncSession = Depends(get_db)):
    # Verify the signature first (as in Part 5) to obtain the event
    event = stripe.Webhook.construct_event(
        await request.body(),
        request.headers.get('stripe-signature'),
        settings.STRIPE_WEBHOOK_SECRET,
    )
    event_id = event['id']
    
    # Check if we've already processed this event
    existing = await db.execute(
        select(ProcessedWebhook).where(ProcessedWebhook.stripe_event_id == event_id)
    )
    if existing.scalar():
        return {"status": "already_processed"}
    
    # Process event...
    
    # Mark as processed
    processed = ProcessedWebhook(stripe_event_id=event_id, processed_at=datetime.now())
    db.add(processed)
    await db.commit()
    return {"status": "success"}
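
The check-then-insert pattern above leaves a small window in which two deliveries of the same event can both pass the check. A unique constraint on the event ID closes that window by letting the database decide who wins. A minimal sketch using the standard library's sqlite3 for brevity; the table and function names are assumptions, and PostgreSQL offers the equivalent INSERT ... ON CONFLICT DO NOTHING:

```python
import sqlite3


def init_schema(conn: sqlite3.Connection) -> None:
    """Create the dedup table; the primary key enforces uniqueness."""
    conn.execute(
        "CREATE TABLE IF NOT EXISTS processed_webhooks ("
        " stripe_event_id TEXT PRIMARY KEY,"
        " processed_at TEXT DEFAULT CURRENT_TIMESTAMP)"
    )


def claim_event(conn: sqlite3.Connection, event_id: str) -> bool:
    """Atomically record the event; return True only for the first caller."""
    cursor = conn.execute(
        "INSERT OR IGNORE INTO processed_webhooks (stripe_event_id) VALUES (?)",
        (event_id,),
    )
    conn.commit()
    # rowcount is 1 when the row was inserted, 0 when it already existed
    return cursor.rowcount == 1
```

One trade-off to keep in mind: if the event is claimed before it is processed and the handler then fails, the claim must be rolled back or removed, otherwise Stripe's retry will be skipped as a duplicate.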

4. Comprehensive Logging

Log everything with context. It makes future debugging far easier:

logger.info(
    "subscription_created",
    extra={
        "user_id": user.id,
        "customer_id": customer.id,
        "subscription_id": subscription.id,
        "plan_id": plan_id,
        "environment": "live" if is_live_mode else "test"
    }
)

5. Admin Notifications

Set up alerts for critical events. Silent failures in billing can be expensive:

  • Customer recovery events
  • Failed payments
  • Webhook processing errors
  • Subscription cancellations

6. Test Everything

Use Stripe’s test mode extensively:

# Test different scenarios
stripe trigger checkout.session.completed
stripe trigger invoice.payment_failed
stripe trigger customer.subscription.deleted

# Test with different amounts
stripe trigger checkout.session.completed --add checkout_session:amount_total=2000

Conclusion

Building a robust subscription system requires thinking beyond the happy path. By implementing automatic customer recovery, comprehensive error handling, and thorough testing workflows, you can build a system that handles real-world complexity gracefully.

Key takeaways:

  1. Customer Recovery: Always handle invalid customer IDs automatically
  2. Environment Separation: Use separate customer IDs for test/live
  3. Webhook Reliability: Handle failures, duplicates, and edge cases
  4. Admin Visibility: Alert on critical events and recovery actions
  5. Testing Workflows: Use Stripe CLI for efficient development
  6. Comprehensive Logging: Log everything with context
  7. Graceful Degradation: Never fail user operations due to billing issues

The investment in building these robust systems pays dividends in reduced support tickets, improved user experience, and developer confidence when making changes to critical billing code.


This implementation powers WorkSite Diary’s subscription system, handling thousands of billing operations with zero manual intervention required for customer recovery scenarios.

Code Repository

The complete implementation is available with:

  • Full CustomerService with recovery logic
  • Comprehensive webhook handlers
  • Email notification system
  • Test suites and integration tests
  • Production deployment configurations

Ready to implement bulletproof subscriptions in your SaaS? Start with the CustomerService pattern and build from there. Your future self (and your users) will thank you.

