Building a Model Gateway - Routing, Rate Limiting, and Observability
You're managing multiple LLM providers. OpenAI for general tasks, Anthropic for long contexts, vLLM for self-hosted inference. Each has different APIs, rate limits, pricing models, and failure patterns. Your team's burning money on redundant calls. Users are hitting provider limits with no fallback. You're flying blind on which models actually cost what, and on why latency spikes at 3 AM.
This is the model gateway problem - and it's solvable with the right architecture.
A model gateway sits between your applications and your inference providers, abstracting away provider differences while adding intelligent routing, cost tracking, and observability. In this article, we'll build a production-ready gateway in Python using FastAPI, showing you exactly how to route requests intelligently, throttle at the token level, track costs per user, and instrument everything so you know what's happening.
Table of Contents
- The Gateway Pattern: Why You Need It
- Why Naive Approaches Fail at Scale
- Architecture Overview
- Building the Gateway: Core Structure
- Intelligent Routing: Cost and Capability Matching
- Rate Limiting at the Token Level
- Cost Tracking: Attribution and Budget Enforcement
- Bringing It Together: The Request Lifecycle
- Observability: From Metrics to Dashboards
- Streaming Responses: Handling Real-Time Output
- Fallback and Failover: Resilience Through Redundancy
- Production Considerations
- Operational Excellence Through Gateway Design
- Summary
The Gateway Pattern: Why You Need It
Before diving into code, let's ground this in the problem it solves.
You have three providers:
- OpenAI: Fast, expensive, rate-limited per API key
- Anthropic: Better for long contexts, moderate cost
- vLLM cluster: Your own hardware, cheap but latency-variable
Without a gateway, your app code has to know about all three. You're mixing business logic (which provider to use?) with application logic (what to ask?). When vLLM goes down, you need to rewrite your app. When you hit OpenAI rate limits, you're stuck. Your data science team can't iterate on routing strategies without touching production code. And every new provider integration requires touching multiple systems. This is fragile and doesn't scale.
A gateway inverts this. Your app makes a single, simple request to a unified API, while the gateway handles:
- Provider routing: Which backend serves this request?
- Authentication: Is this user authorized? Do they have budget?
- Rate limiting: Are they within their token quota?
- Failover: If provider A fails, try provider B
- Cost tracking: Who paid for this?
- Observability: What happened and when?
This is architecture as the solution to complexity. By centralizing these concerns, you enable your teams to operate independently while maintaining safety guardrails. Your infrastructure becomes aware of costs and constraints, enforcing them automatically rather than discovering problems in production.
Why Naive Approaches Fail at Scale
We've seen many teams try to manage multiple inference providers without a gateway, and it always ends the same way. Initially, they hardcode provider selection logic in their application layer. This works for a while, but as the system grows, problems cascade. One team we worked with had provider selection logic spread across five different services. When they discovered a new use case where one provider performed better, they had to update code in five places. When they wanted to do canary testing on a new provider, they couldn't do it - the logic was baked into their services.
The cost tracking problem is even worse. Without a centralized gateway, each service logs its own API calls in its own format. When someone asks "how much did we spend on inference last month," the answer requires querying five different log streams and reconciling different formats. Or worse, you don't know at all because the data was never tracked. Then your CEO looks at the cloud bill and asks why it's suddenly doubled, and you have no way to trace the expense back to the features that caused it.
Rate limiting without a gateway becomes a nightmare. Different services might hit the same provider's rate limits independently. One service exhausts the rate limit, impacting other services. Without visibility into total consumption, you can't anticipate rate limit hits. You're constantly surprising yourself with "oh, we hit that limit again" incidents.
The gateway pattern solves all of this by creating a single point of control. All inference requests flow through one place, so you have one source of truth for routing, rate limiting, cost tracking, and authentication. This is what allows you to manage the inherent complexity of multi-provider inference at scale.
The practical benefit is that your gateway becomes a policy engine. When business requirements change, you update the routing rules in one place. All applications immediately benefit from the new policy. You might decide that requests under fifty tokens should always go to the cheap vLLM cluster, while requests over five hundred tokens should go to OpenAI because it handles them faster. This logic lives in the gateway, not scattered across your application layer.
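A policy like that can be expressed in a few lines of routing logic (a sketch; the thresholds mirror the example above, and the middle-tier provider choice is illustrative, not a recommendation):

```python
def route_by_size(estimated_tokens: int) -> str:
    """Illustrative policy: tiny requests go to the cheap self-hosted
    cluster, very large ones to the provider that handles them fastest.
    Thresholds are examples, tuned per deployment."""
    if estimated_tokens < 50:
        return "vllm"
    if estimated_tokens > 500:
        return "openai"
    return "anthropic"
```

When the business rule changes, only this function changes; no application code is touched.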
Financial visibility is another reason why this pattern scales. Your CEO can ask how much you're spending on inference, and you can give them a complete answer. You can see cost broken down by provider, by user, by day. You can forecast future costs based on current growth rates. You can identify cost anomalies immediately when a user suddenly submits a thousand requests in an hour. This visibility enables better decision making. You might discover that one feature is accidentally expensive and optimize it. You might find that a particular model is overpriced for what it delivers and switch to a cheaper alternative.
The human factor matters too. Your team needs to be able to understand what the gateway is doing. If routing decisions are opaque, engineers will bypass the gateway and call providers directly when something doesn't work. This defeats the entire point. The gateway should expose clear logging about why it made each routing decision. What was the request size? How much did it cost to route to each provider? Why was OpenAI chosen instead of Anthropic? This visibility builds trust and enables learning.
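One lightweight way to make routing decisions inspectable is to log each decision together with the inputs that drove it (a sketch; the field names and rejection reasons are illustrative):

```python
import json

def explain_routing(trace_id: str, chosen: str,
                    rejected: dict, request_tokens: int) -> str:
    """Emit a structured record of a routing decision: what was chosen,
    what was passed over, and why. Feed this to your logger so engineers
    can answer 'why OpenAI and not Anthropic?' without guessing."""
    return json.dumps({
        "trace_id": trace_id,
        "request_tokens": request_tokens,
        "chosen_provider": chosen,
        "rejected": rejected,  # provider -> reason, e.g. "rate_limited"
    })
```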
Architecture Overview
Let's map the system:
┌─────────────────────────────────────────────────────────┐
│ Application │
│ (chat app, agent, batch job) │
└────────────────────┬────────────────────────────────────┘
│ OpenAI-compatible requests
▼
┌─────────────────────────────────────────────────────────┐
│ Model Gateway (FastAPI) │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ Auth Layer │ │ Router │ │ Limiter │ │
│ │ (validate │ │ (select │ │ (token │ │
│ │ tokens) │ │ provider) │ │ bucket) │ │
│ └──────────────┘ └──────────────┘ └──────────────┘ │
└────┬─────────────┬─────────────┬──────────────────────┘
│ │ │
▼ ▼ ▼
┌──────────┐ ┌──────────┐ ┌──────────┐
│ OpenAI │ │Anthropic │ │ vLLM │
│ Client │ │ Client │ │ Client │
└──────────┘ └──────────┘ └──────────┘
│ │ │
▼ ▼ ▼
┌──────────┐ ┌──────────┐ ┌──────────┐
│ OpenAI │ │Anthropic │ │vLLM │
│ API │ │ API │ │Cluster │
└──────────┘ └──────────┘ └──────────┘
┌──────────────────────────────┐
│ Observability Stack │
│ (PostgreSQL, Redis, Metrics) │
└──────────────────────────────┘
The gateway touches every request. That's why observability - tracking trace IDs, latency, costs - matters from day one. Every decision made in the gateway has consequences: choosing OpenAI saves $0.02 per request but adds 200ms latency. Choosing vLLM saves money but might fail under load. The gateway must understand these tradeoffs and route accordingly.
Building the Gateway: Core Structure
Let's start with the foundation. We'll use FastAPI because it's async-native (critical for handling many concurrent provider requests), and we'll make it OpenAI-compatible so existing SDKs work unchanged.
```python
from fastapi import FastAPI, Header, HTTPException
from fastapi.responses import StreamingResponse, JSONResponse
import httpx
import json
import uuid
import time
from typing import Optional
from datetime import datetime
import logging

app = FastAPI()
logger = logging.getLogger(__name__)


class GatewayConfig:
    """Centralized gateway configuration"""
    PROVIDERS = {
        "openai": {
            "base_url": "https://api.openai.com/v1",
            "api_key_env": "OPENAI_API_KEY",
            "models": ["gpt-4", "gpt-4-turbo", "gpt-3.5-turbo"],
            "cost_per_1k_input": 0.03,
            "cost_per_1k_output": 0.06,
        },
        "anthropic": {
            "base_url": "https://api.anthropic.com/v1",
            "api_key_env": "ANTHROPIC_API_KEY",
            "models": ["claude-3-opus", "claude-3-sonnet"],
            "cost_per_1k_input": 0.003,
            "cost_per_1k_output": 0.015,
        },
        "vllm": {
            "base_url": "http://vllm-cluster:8000/v1",
            "models": ["llama-2-70b", "mistral-7b"],
            "cost_per_1k_tokens": 0.0001,  # your compute cost
        },
    }


class RequestContext:
    """Track metadata for a single request through the gateway"""

    def __init__(self, user_id: str, trace_id: Optional[str] = None):
        self.user_id = user_id
        self.trace_id = trace_id or str(uuid.uuid4())
        self.created_at = datetime.utcnow()
        self.provider = None
        self.model = None
        self.input_tokens = 0
        self.output_tokens = 0
        self.cost = 0.0
        self.latency_ms = 0

    def to_log(self):
        return {
            "trace_id": self.trace_id,
            "user_id": self.user_id,
            "provider": self.provider,
            "model": self.model,
            "input_tokens": self.input_tokens,
            "output_tokens": self.output_tokens,
            "cost_usd": self.cost,
            "latency_ms": self.latency_ms,
            "timestamp": self.created_at.isoformat(),
        }
```

Why this structure? The RequestContext travels with every request through the gateway. By the time we log it, we know everything: which user, which provider, actual token counts, actual cost. This is your window into system behavior. When a customer asks "why was I charged $50?", you can retrieve their trace ID and see exactly which requests, models, and providers were involved.
The GatewayConfig hardcodes provider metadata. In production, you'd load this from environment variables or a configuration service, but the pattern is clear: each provider declares its models, API endpoint, and pricing. This is what enables intelligent routing. When a new model becomes available or pricing changes, you update one place.
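At startup, resolving those api_key_env entries might look like this (a minimal sketch, assuming standard environment variables; providers like a local vLLM cluster simply have no key):

```python
import os

def resolve_api_keys(providers: dict) -> dict:
    """Map provider name -> API key, reading the environment variable
    each provider's config declares. Providers without an api_key_env
    entry resolve to None (e.g. self-hosted clusters)."""
    keys = {}
    for name, cfg in providers.items():
        env_var = cfg.get("api_key_env")
        keys[name] = os.environ.get(env_var) if env_var else None
    return keys
```

Failing fast at startup when a required key is missing is usually preferable to discovering it on the first request.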
Understanding the cost dynamics of different providers is crucial for making intelligent routing decisions. OpenAI pricing is transparent and straightforward, but on the higher end. You pay per million input tokens and separately per million output tokens. Anthropic pricing is slightly lower, and they offer longer context windows which means fewer round trips for certain tasks. vLLM running on your own infrastructure has completely different economics. You pay for GPU time regardless of usage, so once it's running, each inference is effectively free until you hit capacity constraints.
This economic diversity is why the routing engine needs to be model-agnostic and provider-aware. The same logical request might have three different implementations, each with different cost and performance characteristics. A straightforward summarization task might be cheap on vLLM, more expensive on Anthropic, and very expensive on OpenAI. But a request requiring deep reasoning or very long context handling might be faster and better quality on Anthropic even if it costs more.
The real-world implication is that your routing logic needs to be flexible and data-driven. Early in your implementation, you might route everything to OpenAI because it's familiar. But as your system grows and costs become a significant operational concern, you'll want to shift traffic to cheaper providers for appropriate use cases. The gateway enables this evolution naturally. You're not rewriting application code; you're refining the router logic.
Another benefit of centralizing routing is that you can implement A/B testing at the infrastructure level. You can route certain users to one provider while others use a different provider, then measure quality metrics for each. Maybe vLLM is cheaper but Anthropic has better quality for your specific use case. By measuring, you can make an informed decision. This kind of measurement-driven optimization is nearly impossible without a gateway. With a gateway, it's a straightforward feature request.
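One way to implement such an A/B split deterministically is to hash the user ID into a bucket (a sketch; the 10% split and provider names are assumptions):

```python
import hashlib

def ab_bucket(user_id: str, experiment_pct: int = 10) -> str:
    """Deterministically assign a user to an experiment arm.
    The same user always lands in the same bucket, so per-arm
    quality and cost metrics stay comparable over time."""
    digest = hashlib.sha256(user_id.encode()).hexdigest()
    bucket = int(digest, 16) % 100
    return "vllm" if bucket < experiment_pct else "openai"
```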
Intelligent Routing: Cost and Capability Matching
Not all models are created equal. Some are fast but expensive. Others are slow but cheap. Some support long contexts. Your job is to match the request to the right provider based on what the request actually needs. This is where routing becomes more than a simple lookup.
```python
import tiktoken


class RoutingEngine:
    """Select the best provider for a given request"""

    def __init__(self, config: GatewayConfig):
        self.config = config
        self.model_to_provider = self._build_model_map()

    def _build_model_map(self):
        """Build a quick lookup: model name -> list of providers"""
        mapping = {}
        for provider_name, provider_cfg in self.config.PROVIDERS.items():
            for model in provider_cfg.get("models", []):
                mapping.setdefault(model, []).append(provider_name)
        return mapping

    async def select_provider(
        self,
        model: str,
        messages: list,
        user_preferences: dict = None,
    ) -> tuple[str, str]:
        """
        Select provider for this request.
        Returns: (provider_name, actual_model_name)

        Strategy:
        1. If the requested model maps to a single provider, use it
        2. Route long contexts to the provider built for them
        3. Fall back to the cost-optimized choice
        """
        user_preferences = user_preferences or {}

        # Check if the explicit model maps to a single provider
        if model in self.model_to_provider:
            providers = self.model_to_provider[model]
            if len(providers) == 1:
                return providers[0], model

        # Count input tokens to estimate cost
        try:
            encoding = tiktoken.encoding_for_model(model)
            token_count = sum(
                len(encoding.encode(msg.get("content", "")))
                for msg in messages
            )
        except Exception:
            # Unknown model: fall back to a rough word-count heuristic
            # (~1.3 tokens per word)
            token_count = int(sum(
                len(msg.get("content", "").split())
                for msg in messages
            ) * 1.3)

        # Route based on context length requirement
        context_chars = sum(len(msg.get("content", "")) for msg in messages)
        if context_chars > 50000:
            # Long context: Anthropic handles it best
            all_providers = {
                p for provs in self.model_to_provider.values() for p in provs
            }
            if "anthropic" in all_providers:
                return "anthropic", "claude-3-opus"

        # Default: cost-optimized routing.
        # For small requests, use vLLM (cheapest)
        if token_count < 1000 and "vllm" in self.model_to_provider.get(model, []):
            return "vllm", model

        # For standard requests, gpt-3.5 is a good cost/quality balance
        if "openai" in self.model_to_provider.get(model, []):
            return "openai", "gpt-3.5-turbo"

        # Fallback: first provider that serves this model
        providers = self.model_to_provider.get(model)
        if not providers:
            raise HTTPException(status_code=400, detail=f"Unknown model: {model}")
        return providers[0], model


routing_engine = RoutingEngine(GatewayConfig())
```

The hidden layer here: this router is where business logic lives. "Cost-optimized" might mean something different for your team. Maybe you care about latency instead. Maybe you want to gradually shift load to a new provider to test it before full commitment. The code structure lets you change strategy in one place, not scattered across your app. That's the real power of the gateway pattern: your infrastructure becomes data-driven and adaptable.
Rate Limiting at the Token Level
Request-count rate limiting is yesterday's problem. Token-count rate limiting is today's. A user who makes one request with 100K tokens has used more of your API budget than someone who made 100 requests with 1K tokens each. You need to throttle by actual consumption, not request count. This is critical for fairness and cost control.
```python
import redis
from datetime import timedelta
import math


class TokenBucketLimiter:
    """
    Token bucket algorithm: tracks token consumption per user.

    Why token bucket?
    - Allows bursts (a user can spike temporarily)
    - Fair to heavy and light users
    - Easy to implement with Redis (atomic operations)
    - Matches provider rate limits (which count tokens)
    """

    def __init__(
        self,
        redis_client: redis.Redis,
        tokens_per_minute: int = 90000,
        burst_multiplier: float = 1.5,
    ):
        self.redis = redis_client
        self.tokens_per_minute = tokens_per_minute
        self.burst_allowance = int(tokens_per_minute * burst_multiplier)
        self.refill_rate = tokens_per_minute / 60  # tokens per second

    async def check_rate_limit(
        self,
        user_id: str,
        prompt_tokens: int,
        completion_tokens: int = 0,
    ) -> tuple[bool, int, int]:
        """
        Check if the user is within their rate limit.
        Returns: (allowed, tokens_remaining, retry_after_seconds)
        """
        key = f"ratelimit:{user_id}"

        # Get current bucket state
        bucket_data = self.redis.get(key)
        if bucket_data:
            current = json.loads(bucket_data)
            tokens_available = current["tokens"]
            last_refill = current["last_refill"]
        else:
            tokens_available = self.burst_allowance
            last_refill = time.time()

        # Refill tokens based on time elapsed, capped at the burst allowance
        now = time.time()
        seconds_elapsed = now - last_refill
        refill_tokens = int(seconds_elapsed * self.refill_rate)
        tokens_available = min(
            tokens_available + refill_tokens,
            self.burst_allowance,
        )

        # Check if the request fits (assume completions are roughly half
        # the prompt size when the client didn't specify)
        required_tokens = prompt_tokens + (completion_tokens or prompt_tokens // 2)
        allowed = required_tokens <= tokens_available

        if allowed:
            tokens_available -= required_tokens
            retry_after = 0
        else:
            # How long until the user can retry?
            tokens_needed = required_tokens - tokens_available
            retry_after = math.ceil(tokens_needed / self.refill_rate)

        # Persist the updated bucket
        self.redis.setex(
            key,
            timedelta(hours=24),
            json.dumps({
                "tokens": int(tokens_available),
                "last_refill": now,
            }),
        )

        return allowed, int(tokens_available), retry_after


limiter = TokenBucketLimiter(redis.Redis(host="localhost"))
```

What makes this different from typical rate limiters:
- Token-aware: We count actual tokens consumed, not requests
- Burst-tolerant: User can spike to 150% for a moment, enabling sustained throughput while preventing sustained abuse
- Transparent retry: We tell the client exactly when to retry (Retry-After header), enabling intelligent client-side backoff
- Per-user: Not per-API-key or per-IP, but actual user identity (from your auth system)
This is production-grade throttling because it matches how LLM providers actually throttle you. When OpenAI says "90K tokens per minute," they mean tokens, not requests.
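To see the refill arithmetic in isolation, here is the same math check_rate_limit performs, pulled out as a pure function for illustration:

```python
def refill(tokens_available: float, last_refill: float, now: float,
           refill_rate: float, burst_allowance: int) -> int:
    """Token-bucket refill: credit tokens for the elapsed time,
    capped at the burst allowance."""
    elapsed = now - last_refill
    return min(int(tokens_available) + int(elapsed * refill_rate),
               burst_allowance)

# A 90K tokens/minute quota refills at 1500 tokens/second, so after
# 10 idle seconds a drained bucket recovers 15,000 tokens:
recovered = refill(0, 100.0, 110.0, 90_000 / 60, 135_000)
```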
Cost Tracking: Attribution and Budget Enforcement
Every request costs money. Your job is to know who's paying and how much. Two mechanisms: soft limits (warn the user, log alerts) and hard limits (block the request). Without this, a single runaway job can generate unexpected bills.
```python
import asyncpg


class CostTracker:
    """Track token usage and costs per user, enforce budgets"""

    def __init__(self, db_pool: asyncpg.Pool):
        self.db = db_pool

    async def record_request(self, ctx: RequestContext):
        """Log a completed request to the database"""
        query = """
            INSERT INTO api_calls (
                trace_id, user_id, provider, model,
                input_tokens, output_tokens, cost_usd,
                latency_ms, created_at
            ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
        """
        await self.db.execute(
            query,
            ctx.trace_id,
            ctx.user_id,
            ctx.provider,
            ctx.model,
            ctx.input_tokens,
            ctx.output_tokens,
            ctx.cost,
            ctx.latency_ms,
            ctx.created_at,
        )

    async def check_budget(
        self,
        user_id: str,
        estimated_cost: float,
        hard_limit: float = 100.0,
        soft_limit: float = 80.0,
    ) -> tuple[bool, dict]:
        """
        Check if the user is within budget.
        Returns: (allowed, budget_info)
        """
        query = """
            SELECT
                COALESCE(SUM(cost_usd), 0) AS total_cost,
                COUNT(*) AS request_count,
                MAX(created_at) AS last_request
            FROM api_calls
            WHERE user_id = $1
              AND created_at > NOW() - INTERVAL '30 days'
        """
        row = await self.db.fetchrow(query, user_id)
        monthly_cost = row["total_cost"]
        projected_cost = monthly_cost + estimated_cost
        allowed = projected_cost <= hard_limit

        budget_info = {
            "user_id": user_id,
            "monthly_cost_usd": round(monthly_cost, 2),
            "projected_cost_usd": round(projected_cost, 2),
            "hard_limit_usd": hard_limit,
            "soft_limit_usd": soft_limit,
            "percent_of_limit": round((projected_cost / hard_limit) * 100, 1),
            "allowed": allowed,
            "warning": projected_cost > soft_limit,
            "days_remaining": 30 - datetime.utcnow().date().day,
        }

        return allowed, budget_info


# db_pool is created at application startup, e.g. via asyncpg.create_pool()
cost_tracker = CostTracker(db_pool)
```

The business logic embedded here: hard limits are hard stops; soft limits trigger warnings (email, dashboard, logs). Teams burn through their budgets when nobody is watching. With this pattern, you can escalate intelligently:
- 50% of limit → informational log
- 80% of limit (soft) → alert to team lead
- 100% of limit (hard) → 429 Too Many Requests, block the call
This transforms cost management from reactive (after the fact surprise bills) to proactive (users are warned before hitting limits).
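The escalation tiers above can be expressed as a tiny policy function (thresholds mirror the list; the action names are illustrative):

```python
def budget_action(percent_of_limit: float) -> str:
    """Map budget consumption to an escalation action:
    50% -> log, 80% (soft) -> alert, 100% (hard) -> block."""
    if percent_of_limit >= 100:
        return "block"  # return HTTP 429/402, stop the call
    if percent_of_limit >= 80:
        return "alert"  # notify the team lead
    if percent_of_limit >= 50:
        return "log"    # informational only
    return "allow"
```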
Bringing It Together: The Request Lifecycle
Here's the full request flow:
```python
@app.post("/v1/chat/completions")
async def chat_completions(
    request_body: dict,
    authorization: str = Header(None),
    x_trace_id: str = Header(None),
):
    """OpenAI-compatible chat completions endpoint"""
    # Step 1: Authenticate and extract user identity
    user_id = await authenticate_user(authorization)
    if not user_id:
        raise HTTPException(status_code=401, detail="Invalid auth")

    ctx = RequestContext(user_id, x_trace_id)
    start_time = time.time()

    try:
        # Step 2: Route to the best provider
        provider, actual_model = await routing_engine.select_provider(
            model=request_body.get("model"),
            messages=request_body.get("messages", []),
        )
        ctx.provider = provider
        ctx.model = actual_model

        # Step 3: Estimate tokens (for rate limiting).
        # In real code, use the tokenizer for an exact count.
        estimated_input_tokens = estimate_tokens(request_body)

        # Step 4: Check rate limit
        allowed, tokens_remaining, retry_after = await limiter.check_rate_limit(
            user_id=user_id,
            prompt_tokens=estimated_input_tokens,
        )
        if not allowed:
            logger.warning(
                f"Rate limit exceeded for {user_id}",
                extra={"trace_id": ctx.trace_id},
            )
            return JSONResponse(
                status_code=429,
                content={"error": "Rate limit exceeded"},
                headers={"Retry-After": str(retry_after)},
            )

        # Step 5: Check budget
        estimated_cost = calculate_cost(provider, estimated_input_tokens)
        budget_allowed, budget_info = await cost_tracker.check_budget(
            user_id=user_id,
            estimated_cost=estimated_cost,
        )
        if not budget_allowed:
            logger.warning(
                f"Budget exceeded for {user_id}",
                extra={"trace_id": ctx.trace_id, "budget": budget_info},
            )
            return JSONResponse(
                status_code=402,  # Payment Required
                content={"error": "Budget limit exceeded"},
            )

        # Step 6: Forward to the actual provider
        provider_client = get_provider_client(provider)
        logger.info(
            f"Routing request to {provider}",
            extra={
                "trace_id": ctx.trace_id,
                "user_id": user_id,
                "provider": provider,
                "model": actual_model,
            },
        )
        response = await provider_client.chat.completions.create(
            model=actual_model,
            messages=request_body.get("messages"),
            **request_body.get("kwargs", {}),
        )

        # Step 7: Extract actual token counts from the response
        ctx.input_tokens = response.usage.prompt_tokens
        ctx.output_tokens = response.usage.completion_tokens
        ctx.cost = calculate_actual_cost(provider, ctx.input_tokens, ctx.output_tokens)
        ctx.latency_ms = int((time.time() - start_time) * 1000)

        # Step 8: Record for accounting and observability
        await cost_tracker.record_request(ctx)
        logger.info("Request completed", extra=ctx.to_log())

        return response

    except Exception as e:
        ctx.latency_ms = int((time.time() - start_time) * 1000)
        logger.error(
            f"Request failed: {str(e)}",
            extra=ctx.to_log(),
            exc_info=True,
        )
        raise


async def authenticate_user(auth_header: str) -> Optional[str]:
    """Extract user_id from the Authorization header"""
    if not auth_header or not auth_header.startswith("Bearer "):
        return None
    token = auth_header.split(" ")[1]
    # Validate the token and extract the user_id
    # (in production: check the JWT, consult the user database, etc.)
    return "user_123"  # placeholder


def estimate_tokens(request_body: dict) -> int:
    """Rough token estimate from messages"""
    total_chars = sum(
        len(msg.get("content", ""))
        for msg in request_body.get("messages", [])
    )
    return int(total_chars / 4)  # rough rule of thumb: ~4 chars per token


def calculate_cost(provider: str, input_tokens: int) -> float:
    """Estimate cost before the request"""
    config = GatewayConfig.PROVIDERS[provider]
    return (input_tokens / 1000) * config.get("cost_per_1k_input", 0.001)


def calculate_actual_cost(provider: str, input_tokens: int, output_tokens: int) -> float:
    """Calculate actual cost after the request completes"""
    config = GatewayConfig.PROVIDERS[provider]
    input_cost = (input_tokens / 1000) * config.get("cost_per_1k_input", 0.001)
    output_cost = (output_tokens / 1000) * config.get("cost_per_1k_output", 0.001)
    return input_cost + output_cost
```

What just happened:
- User makes a request with their auth token
- Gateway validates identity and creates a trace ID
- Router picks the best provider based on request characteristics
- Limiter checks tokens remaining in their quota
- Budget enforcer checks monthly spend
- Request goes to provider
- Response comes back with actual token counts
- Cost and latency recorded to database
- Structured logs with trace ID enable end-to-end debugging
Every step is observable. If something goes wrong, you have the trace ID. If costs spike, you can see which provider and which user. If latency increases, you can correlate it with which models were in use.
Rate limiting is not just a technical control; it's a business control. Without proper rate limiting, users can accidentally burn through their budgets in minutes. A user runs an experiment that sends one thousand requests in rapid succession, and suddenly they've spent five hundred dollars. With proper rate limiting, you can prevent this. You set a hard limit on tokens per minute, and the user's requests queue or fail gracefully once they hit the limit.
The token bucket algorithm is particularly elegant because it handles both steady-state and burst traffic. A user with a quota of ten thousand tokens per minute can consume all ten thousand in the first second if they want to, but then they're done for the minute. Or they can spread their consumption evenly throughout the minute. The algorithm doesn't care; it just ensures fairness and predictability.
In production implementations, you need to think carefully about what happens when a user hits their rate limit. Do you queue their requests and process them when capacity becomes available? Do you return an HTTP 429 error immediately? Do you return a degraded response from a cheaper provider? Different choices have different implications for user experience and cost. Some teams implement tiered fallback: if a user hits their limit on OpenAI, automatically downgrade their request to a cheaper provider without failing their request. This provides a better user experience while still protecting your cost structure.
The interaction between rate limiting and routing is subtle but important. If your routing engine doesn't know about rate limit constraints, it might route a request to a provider that the user has no quota left with, causing an unnecessary failure. Better implementations coordinate routing and rate limiting. The router queries the rate limiter to see which providers the user has capacity with, then routes to the best option among those providers. This requires a feedback loop between these components, but it significantly improves system reliability.
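A sketch of that feedback loop: filter the fallback chain by remaining quota before picking the most-preferred option (the provider names and the quota-lookup shape are assumptions):

```python
def route_with_quota(candidates: list[str],
                     remaining_tokens: dict[str, int],
                     needed_tokens: int):
    """Pick the first candidate (already ordered by preference)
    that still has enough per-user quota for this request.
    Returns None if every provider is exhausted, in which case
    the caller queues the request or returns 429."""
    for provider in candidates:
        if remaining_tokens.get(provider, 0) >= needed_tokens:
            return provider
    return None

choice = route_with_quota(
    ["vllm", "anthropic", "openai"],
    {"vllm": 0, "anthropic": 5_000, "openai": 90_000},
    needed_tokens=2_000,
)
```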
Observability: From Metrics to Dashboards
Logging token counts is table-stakes. Real observability means tracing requests end-to-end and correlating failures with cost anomalies.
```python
from prometheus_client import Counter, Histogram, Gauge

# Metrics
requests_total = Counter(
    "gateway_requests_total",
    "Total requests processed",
    labelnames=["provider", "model", "status"],
)

request_latency = Histogram(
    "gateway_request_latency_ms",
    "Request latency in milliseconds",
    labelnames=["provider"],
    buckets=[100, 500, 1000, 2000, 5000, 10000],
)

tokens_processed = Counter(
    "gateway_tokens_processed_total",
    "Total tokens processed",
    labelnames=["provider", "token_type"],  # input or output
)

cost_per_user = Gauge(
    "gateway_user_monthly_cost_usd",
    "Monthly cost per user",
    labelnames=["user_id"],
)

# In your request handler, after the response:
requests_total.labels(
    provider=ctx.provider,
    model=ctx.model,
    status="success",
).inc()
request_latency.labels(provider=ctx.provider).observe(ctx.latency_ms)
tokens_processed.labels(provider=ctx.provider, token_type="input").inc(ctx.input_tokens)
tokens_processed.labels(provider=ctx.provider, token_type="output").inc(ctx.output_tokens)
```

In Grafana, you now have:
- "Requests per provider per minute" - spot provider issues
- "Cost by user over time" - budget forecasting
- "Latency percentiles" - SLA tracking
- "Error rate by provider" - reliability
The trace ID in your logs means you can correlate: "At 03:45 UTC, vLLM latency spiked, requests queued, user_456 hit rate limit, cost_tracker recorded $150 in one hour."
Streaming Responses: Handling Real-Time Output
Most LLM applications need streaming - users see tokens appear in real time rather than waiting for the entire response. A gateway should be transparent to streaming too.
```python
@app.post("/v1/chat/completions/stream")
async def chat_completions_stream(
    request_body: dict,
    authorization: str = Header(None),
    x_trace_id: str = Header(None),
):
    """OpenAI-compatible streaming endpoint"""
    user_id = await authenticate_user(authorization)
    ctx = RequestContext(user_id, x_trace_id)
    start_time = time.time()

    # Same routing, rate limiting, and budget checks as the unary endpoint
    provider, actual_model = await routing_engine.select_provider(
        model=request_body.get("model"),
        messages=request_body.get("messages", []),
    )
    ctx.provider = provider
    ctx.model = actual_model

    # Get the streaming response
    provider_client = get_provider_client(provider)

    async def event_stream():
        """Wrapper that tracks tokens as they arrive"""
        accumulated_tokens = 0
        try:
            # Stream from the provider
            async with provider_client.chat.completions.create(
                model=actual_model,
                messages=request_body.get("messages"),
                stream=True,
                **request_body.get("kwargs", {}),
            ) as response:
                async for chunk in response:
                    # Extract token counts if available
                    if hasattr(chunk, "usage") and chunk.usage:
                        accumulated_tokens = chunk.usage.completion_tokens
                    # Forward to the client
                    yield f"data: {json.dumps(chunk.model_dump())}\n\n"

            # After streaming completes, record the request
            ctx.output_tokens = accumulated_tokens
            ctx.latency_ms = int((time.time() - start_time) * 1000)
            await cost_tracker.record_request(ctx)
            logger.info("Streaming request completed", extra=ctx.to_log())

        except Exception as e:
            ctx.latency_ms = int((time.time() - start_time) * 1000)
            logger.error(
                f"Streaming request failed: {str(e)}",
                extra=ctx.to_log(),
                exc_info=True,
            )
            yield f"data: {json.dumps({'error': str(e)})}\n\n"

    return StreamingResponse(event_stream(), media_type="text/event-stream")
```

Why streaming needs special handling: you can't accurately count output tokens until the response finishes. Some providers send token counts in the final chunk; others don't. The cost accounting happens after the stream closes. This is why streaming cost tracking is harder than it looks - you're attributing cost based on incomplete information mid-stream, then correcting once it's done.
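When a provider never reports usage in the final chunk, a common workaround is to count the streamed text yourself after the stream closes (a rough sketch; ~4 characters per token is a heuristic, not an exact count):

```python
def estimate_streamed_tokens(chunks: list, reported=None) -> int:
    """Prefer the provider-reported completion token count; fall
    back to a chars/4 heuristic over the accumulated output text."""
    if reported is not None:
        return reported
    total_chars = sum(len(c) for c in chunks)
    return max(1, total_chars // 4)
```

The heuristic over- or under-counts by model and language, so flag heuristic-derived costs in your accounting records rather than mixing them silently with exact counts.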
Fallback and Failover: Resilience Through Redundancy
What happens when your primary provider fails? A good gateway has a fallback chain.
```python
class ResilientRouter:
    """Route with automatic fallback on provider failure."""

    def __init__(self, routing_engine: RoutingEngine):
        self.routing_engine = routing_engine
        self.provider_health = {}  # Track recent failures per provider

    async def select_provider_with_fallback(
        self,
        model: str,
        messages: list,
        max_retries: int = 2,
    ) -> tuple[str, str]:
        """
        Select a provider with automatic fallback.

        Strategy:
        1. Get the primary provider from the routing engine
        2. If it's unhealthy, try the next best option
        3. Keep trying until we find a healthy provider
        """
        fallback_chain = self._build_fallback_chain(model)
        for attempt, provider in enumerate(fallback_chain):
            if attempt >= max_retries:
                break
            # Check whether the provider is currently healthy
            if self._is_healthy(provider):
                return provider, model
            logger.warning(
                f"Provider {provider} unhealthy, trying fallback",
                extra={"model": model, "attempt": attempt},
            )
        # If all else fails, return the primary (it will likely error, but we tried)
        return fallback_chain[0], model

    def _build_fallback_chain(self, model: str) -> list[str]:
        """Order providers by preference for fallback."""
        # Explicit fallback order: OpenAI -> Anthropic -> vLLM
        preferred_order = ["openai", "anthropic", "vllm"]
        available = self.routing_engine.model_to_provider.get(model, [])
        return [p for p in preferred_order if p in available]

    def _is_healthy(self, provider: str) -> bool:
        """Check whether the provider has had recent failures."""
        failures = self.provider_health.get(provider, {})
        recent_failures = sum(
            1 for ts in failures.get("timestamps", [])
            if time.time() - ts < 60  # Last 60 seconds
        )
        # Unhealthy if 3+ failures in the last minute
        return recent_failures < 3

    def record_failure(self, provider: str):
        """Mark the provider as having failed."""
        if provider not in self.provider_health:
            self.provider_health[provider] = {"timestamps": []}
        self.provider_health[provider]["timestamps"].append(time.time())
```

The hidden logic: this is a circuit breaker pattern. If OpenAI fails 3 times in a minute, stop sending requests to it for the next period and automatically try Anthropic instead. This prevents cascading failures where your gateway queues up requests to a dead provider, building up latency that affects all users. Real implementations use libraries like pybreaker that handle exponential backoff, timeout calculation, and half-open states (testing the provider occasionally to see if it has recovered).
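To make that half-open behavior concrete, here's a minimal circuit breaker sketch independent of pybreaker. The thresholds are illustrative defaults, not tuned values:

```python
import time


class SimpleCircuitBreaker:
    """Minimal circuit breaker: closed -> open after N failures in a window,
    half-open after a cooldown (one probe allowed through), closed again
    on a success. Thresholds are illustrative, not tuned values."""

    def __init__(self, failure_threshold=3, window_s=60, cooldown_s=30):
        self.failure_threshold = failure_threshold
        self.window_s = window_s
        self.cooldown_s = cooldown_s
        self.failures = []     # timestamps of recent failures
        self.opened_at = None  # when the breaker tripped, if it did

    def record_failure(self):
        now = time.time()
        self.failures.append(now)
        # Keep only failures inside the sliding window
        self.failures = [t for t in self.failures if now - t < self.window_s]
        if len(self.failures) >= self.failure_threshold:
            self.opened_at = now  # trip the breaker

    def record_success(self):
        # A success (e.g. from a half-open probe) closes the breaker
        self.failures.clear()
        self.opened_at = None

    def allow_request(self) -> bool:
        if self.opened_at is None:
            return True   # closed: normal operation
        if time.time() - self.opened_at >= self.cooldown_s:
            return True   # half-open: let a probe request through
        return False      # open: fail fast, route to a fallback
```

In the gateway, `allow_request` would gate each provider before `select_provider_with_fallback` commits to it, and `record_success`/`record_failure` would be called from the request lifecycle after each provider response.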
Production Considerations
This example is a foundation. Real deployments need additional layers of sophistication. You need response caching in front of your providers so identical prompts don't trigger redundant, billable inference calls, plus local caching of rate-limit state to cut down on Redis round-trips. You need automatic retries with exponential backoff and jitter, so a briefly failing provider isn't hammered by a thundering herd of synchronized retries. And you need circuit breakers that track provider reliability and automatically stop sending requests to providers that are misbehaving.
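A sketch of retry-with-backoff under those assumptions - `fn` stands in for any async provider call, and the delays use "full jitter" (a random sleep up to the exponential cap) so simultaneous retries from many clients spread out rather than synchronize:

```python
import asyncio
import random


async def call_with_retries(fn, max_attempts=3, base_delay=0.5):
    """Retry an async call with exponential backoff plus full jitter.
    Illustrative sketch: a real gateway would retry only on transient
    errors (timeouts, 429s, 5xx), not on every exception."""
    last_exc = None
    for attempt in range(max_attempts):
        try:
            return await fn()
        except Exception as exc:
            last_exc = exc
            if attempt == max_attempts - 1:
                break
            # Full jitter: random sleep in [0, base * 2^attempt]
            delay = random.uniform(0, base_delay * (2 ** attempt))
            await asyncio.sleep(delay)
    raise last_exc
```

Usage would wrap the provider call inside the gateway's request handler, e.g. `await call_with_retries(lambda: provider_client.chat.completions.create(...))`, leaving the caller unaware that retries happened at all.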
Health checking is critical. You should ping each provider periodically and track their response times and error rates. If a provider's latency starts degrading, the router should gradually shift traffic to healthier providers. Some teams implement adaptive routing where the router tracks real-time provider performance and automatically weights providers based on their observed reliability and latency.
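One way to implement that adaptive weighting is an exponentially weighted moving average (EWMA) of observed latency per provider. In this sketch (the class name, provider names, and the alpha value are illustrative), traffic is distributed inversely proportional to smoothed latency, so slow providers still receive occasional probes and can recover:

```python
import random


class AdaptiveRouter:
    """Weights providers inversely to an EWMA of observed latency."""

    def __init__(self, providers, alpha=0.2):
        self.alpha = alpha
        # Start with an optimistic prior so new providers get traffic
        self.ewma_ms = {p: 1000.0 for p in providers}

    def record_latency(self, provider: str, latency_ms: float):
        # EWMA update: recent observations dominate, old ones decay
        prev = self.ewma_ms[provider]
        self.ewma_ms[provider] = (1 - self.alpha) * prev + self.alpha * latency_ms

    def pick(self) -> str:
        # Weight each provider by 1/latency: faster providers get more
        # traffic, but slow ones still receive some probe requests
        weights = {p: 1.0 / ms for p, ms in self.ewma_ms.items()}
        total = sum(weights.values())
        r = random.uniform(0, total)
        for p, w in weights.items():
            r -= w
            if r <= 0:
                return p
        return p  # fallback for floating-point rounding
```

The gateway would call `record_latency` from the same place it sets `ctx.latency_ms`, so routing quality improves as a side effect of the observability you've already built.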
Fallback chains are where your system's resilience lives. When your primary provider fails, you need a predetermined chain of fallbacks. But the fallback logic isn't just "try A, if it fails try B." Good fallback implementations track which providers are currently in good health and route accordingly. Some teams implement exponential backoff for failing providers, giving them a chance to recover without immediately load-balancing back to them.
Streaming is where the architectural patterns we discussed become critical. Many applications need to stream LLM responses token by token, or stream structured outputs as they're generated. Your gateway needs to handle this transparently, without buffering the entire response in memory. This is why the streaming patterns matter at the gateway level - if you buffer entire responses, time-to-first-token stretches to the full completion time and memory use grows with response length.
Multi-region routing is valuable if you're serving global users. A request from a user in Europe might be served faster by a European LLM provider than by OpenAI's US-based API. Your router should understand geography and route accordingly.
Team isolation is crucial for multi-tenant scenarios. If you're hosting a platform where multiple teams can use your inference gateway, you need to enforce quotas per team, not per user. You need to prevent one team's runaway job from consuming all the gateway's capacity. You need billing that's accurate per team.
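A per-team quota can be modeled as a token budget over a rolling window. This in-memory sketch illustrates the shape of the check; a real gateway would keep this state in Redis so all replicas share it, and the team names here are hypothetical:

```python
import time


class TeamQuota:
    """Per-team token budget over a rolling one-minute window.
    In-memory sketch; production state belongs in Redis so every
    gateway replica enforces the same budget."""

    def __init__(self, tokens_per_minute: int):
        self.limit = tokens_per_minute
        self.usage = {}  # team -> list of (timestamp, tokens)

    def try_consume(self, team: str, tokens: int) -> bool:
        now = time.time()
        # Drop events that have aged out of the window
        events = [(t, n) for t, n in self.usage.get(team, []) if now - t < 60]
        used = sum(n for _, n in events)
        if used + tokens > self.limit:
            self.usage[team] = events
            return False  # over quota: reject or queue the request
        events.append((now, tokens))
        self.usage[team] = events
        return True
```

Because the budget is keyed by team rather than user, one team's runaway batch job exhausts only its own window; other teams' `try_consume` calls keep succeeding.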
Each of these plugs into the architecture you've built. The core pattern - unified API, intelligent routing, rate limiting, cost tracking, observability - is the skeleton everything else hangs on. Build this core first, then add the production layers as you hit the specific scaling challenges that require them.
Advanced routing strategies become valuable as your gateway matures and you have more operational data. You might discover that certain providers have specific strengths. Anthropic might consistently produce higher quality outputs for reasoning tasks. OpenAI might be faster for simple completions. vLLM might be cheaper for certain use cases. Once you have this data, you can embed it into your routing logic as user preferences or request type signatures.
Some teams implement feature-based routing where certain features in your application always route to a particular provider. You might have one team building code generation features that always use OpenAI, while another team building analysis features uses Anthropic. The gateway enforces this routing without application code needing to know about it. This is powerful because it decouples team decisions about which provider to use from the application logic.
Cost optimization through the gateway becomes a continuous process. You monitor costs over time, identify expensive features or user segments, and optimize them. Maybe you discover that one user is spending ten times more than similar users because they're using a less efficient feature. The gateway lets you experiment with optimization strategies. You could route their requests to a cheaper provider and measure if quality is acceptable. You could implement response caching to avoid repeated expensive calls. The gateway enables these optimizations without changing application code.
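Response caching is one of the cheapest of those experiments. Here's a minimal sketch keyed on a hash of the full request - note this is only safe for deterministic settings (e.g. temperature=0), and semantic caching of near-duplicate prompts is a separate, harder problem:

```python
import hashlib
import json


class ResponseCache:
    """Exact-match response cache keyed on (model, messages, params).
    Sketch only: a real deployment would add TTLs and live in Redis."""

    def __init__(self):
        self.store = {}

    def _key(self, model: str, messages: list, params: dict) -> str:
        # Canonical JSON so key order in dicts doesn't change the hash
        payload = json.dumps(
            {"model": model, "messages": messages, "params": params},
            sort_keys=True,
        )
        return hashlib.sha256(payload.encode()).hexdigest()

    def get(self, model, messages, params):
        return self.store.get(self._key(model, messages, params))

    def put(self, model, messages, params, response):
        self.store[self._key(model, messages, params)] = response
```

In the gateway's request lifecycle, a cache hit short-circuits before routing and rate limiting entirely, which is also why it belongs in the gateway rather than in any single application.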
Fallback and retry strategies in the gateway provide resilience that would be difficult to achieve at the application layer. If OpenAI returns an error, the gateway automatically retries with Anthropic. If that fails, it tries vLLM. From the application's perspective, the request simply succeeded using a different provider. The application doesn't need to know about the fallback; the gateway handles it transparently. This is incredibly powerful for reliability.
The gateway also enables testing and staging. You can test a new provider in production without affecting users. Route one percent of traffic to the new provider, measure its behavior, and then roll back if there are issues. Or gradually increase the percentage until you're confident. This is canary deployment at the infrastructure level, which is much simpler to implement and manage than doing it at the application layer.
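The one-percent split can be as simple as hashing the user id into a bucket, which also keeps the assignment sticky: each user sees a consistent provider for the whole experiment. A sketch (the function name and provider labels are illustrative):

```python
import hashlib


def canary_provider(user_id: str, canary: str, stable: str, percent: float) -> str:
    """Deterministic canary split: hash the user id into [0, 100) and
    send `percent` of users to the canary provider. Hashing (rather
    than random choice) makes assignment sticky per user."""
    digest = int(hashlib.md5(user_id.encode()).hexdigest(), 16)
    bucket = (digest % 10000) / 100.0  # evenly spread over [0, 100)
    return canary if bucket < percent else stable
```

Ramping up the rollout is then a config change: raise `percent` from 1 to 10 to 50 as confidence grows, and drop it back to 0 to roll back instantly, with no application deploys involved.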
Team velocity improves significantly when you have a good gateway. Multiple teams can iterate on their features independently without worrying about provider integration. The platform team manages the gateway and provider integrations. Application teams focus on building features. This separation of concerns enables faster iteration and cleaner code.
Monitoring and cost visibility enable business decisions that wouldn't otherwise be possible. When your CEO asks whether you should invest in fine-tuning a private model, you can look at the cost data. If you're spending five thousand dollars a month on inference, fine-tuning might be worthwhile. If you're spending two hundred dollars a month, it's not. The gateway gives you the data to make informed business decisions.
Operational Excellence Through Gateway Design
The real value of a well-designed gateway emerges over time in production. What looks like elegant architecture on day one becomes invaluable infrastructure by month six. Consider the scenario where you need to respond to a security incident involving one provider. If requests are tightly coupled to that provider's API throughout your codebase, responding requires coordinating changes across multiple services. With a gateway, you flip a flag and routes change instantly. That's the difference between panic and calm problem-solving. Every team member can focus on addressing the security issue instead of coordinating deployments.
Another operational benefit is the ability to absorb provider API changes without affecting your users. When a provider deprecates an endpoint or changes response format, the gateway translates. Your application code doesn't need updates. This shields your team from the constant churn of provider API evolution. Imagine running a service where every provider update would require application code changes. That's hundreds of hours of engineering time annually. A gateway makes provider updates a platform concern, not an application concern.
Capacity planning becomes significantly easier with a gateway. You have centralized visibility into usage patterns. You know which providers are being used heavily, which features drive the most load, which times of day see the heaviest traffic. This data drives infrastructure decisions. Maybe you discover that ninety percent of requests happen during business hours, suggesting you could use cheaper spot instances for off-peak time. Maybe you find that one feature drives forty percent of costs, suggesting an opportunity for optimization or monetization. The gateway gives you the data to make informed capacity decisions.
The gateway also becomes your insurance policy against vendor lock-in. If you invest heavily in a single provider and then their pricing changes dramatically, you're stuck. With a gateway, switching providers becomes an operational change, not an architectural one. Some teams maintain relationships with multiple providers specifically to avoid lock-in risk. When provider A raises prices, you shift traffic to provider B seamlessly. From the application's perspective, nothing changed. From a cost perspective, you just saved ten thousand dollars a month.
Summary
A model gateway isn't a nice-to-have. It's the infrastructure that makes multiple providers manageable. You're building:
- Abstraction: Apps don't need to know about provider differences
- Intelligence: Routing decisions based on cost, latency, capability
- Safety: Rate limiting by token, budget enforcement, soft/hard limits
- Visibility: Every request tracked, costs attributed, latency understood
Start with this foundation. Add retry logic and circuit breakers next. Then multi-region routing. Then predictive scaling. The pattern is the same: layer by layer, observation informs the next iteration.
Your LLM infrastructure will be more resilient, cheaper, and faster. Your team will sleep better knowing they're not silently burning cash on the most expensive provider for every request.
One more aspect worth emphasizing is the gateway as a platform for experimentation. As an organization, you'll have questions about whether to upgrade models, switch providers, or adopt new inference techniques. A gateway makes answering these questions possible. Run an experiment routing fifty percent of traffic to the new model while the rest use the old model. Compare quality metrics, latency, and cost. Make a data-driven decision. Without a gateway, running this kind of experiment requires changes to application code and careful coordination across teams. With a gateway, it's straightforward.
The maturity of your gateway should evolve with your business needs. Initially, you might need basic routing and cost tracking. As you grow, you'll need more sophisticated features like canary deployments, multi-region routing, and predictive cost forecasting. The architecture you've built provides the foundation for all of these enhancements. You're not ripping and replacing; you're evolving the gateway gradually.
Team training becomes important as your gateway grows more sophisticated. New engineers need to understand how routing works, how to add a new provider, how to debug issues. Good documentation and runbooks are invaluable. Some teams maintain example configurations showing common scenarios. They maintain decision trees for choosing between providers. They maintain playbooks for common troubleshooting scenarios. This institutional knowledge makes your gateway more valuable and easier to operate.
The business value of a well-built gateway extends beyond just infrastructure. It enables your team to make better decisions faster. It enables experimentation that would otherwise be too risky or complex. It enables cost optimization that directly impacts your bottom line. It enables reliability that improves customer experience. These benefits accumulate over time, making a good gateway one of the best investments your infrastructure team can make.
The future of model gateways will likely involve more automation and intelligence. Gateways might automatically optimize routing based on real-time performance data. They might predict future cost and route to minimize expected cost. They might route based on user location, device type, and context in addition to request characteristics. They might integrate with external optimization services to discover new routing strategies. The core pattern remains the same: centralize control, enable intelligence, provide visibility. The implementation details evolve as the technology landscape changes.