第22章:429 Too Many Requests – 速率限制实现深度分析
22.1 定义与语义
429 Too Many Requests 状态码表示用户在给定时间内发送了太多请求("速率限制")。该状态码的核心含义是:
-
客户端请求频率超过服务器限制
-
服务器暂时拒绝处理当前请求
-
客户端应该在等待一段时间后重试
关键特性:
-
速率限制:限制单位时间内的请求数量
-
可恢复错误:等待一段时间后可恢复正常
-
包含重试信息:响应通常包含何时可重试的指示
协议要求:
-
应包含描述限制详情的响应体
-
可以包含 Retry-After 头部指示重试时间
-
可能包含速率限制配额信息
22.2 速率限制策略分类
22.2.1 常见速率限制算法
python
# 速率限制算法分类与实现
from abc import ABC, abstractmethod
import time
from typing import Dict, List, Optional, Tuple
import math
from collections import deque
import threading
class RateLimiter(ABC):
"""速率限制器抽象基类"""
@abstractmethod
def is_allowed(self, key: str, weight: int = 1) -> Tuple[bool, Dict]:
"""检查是否允许请求"""
pass
@abstractmethod
def get_remaining(self, key: str) -> int:
"""获取剩余配额"""
pass
@abstractmethod
def get_reset_time(self, key: str) -> float:
"""获取重置时间"""
pass
class TokenBucketLimiter(RateLimiter):
"""令牌桶算法实现"""
def __init__(self, capacity: int, refill_rate: float):
"""
Args:
capacity: 桶容量
refill_rate: 每秒补充的令牌数
"""
self.capacity = capacity
self.refill_rate = refill_rate
self.buckets: Dict[str, Dict] = {}
self.lock = threading.RLock()
def is_allowed(self, key: str, weight: int = 1) -> Tuple[bool, Dict]:
with self.lock:
current_time = time.time()
# 获取或创建桶
if key not in self.buckets:
self.buckets[key] = {
'tokens': self.capacity,
'last_refill': current_time
}
bucket = self.buckets[key]
# 补充令牌
time_passed = current_time – bucket['last_refill']
tokens_to_add = time_passed * self.refill_rate
bucket['tokens'] = min(self.capacity, bucket['tokens'] + tokens_to_add)
bucket['last_refill'] = current_time
# 检查是否有足够令牌
if bucket['tokens'] >= weight:
bucket['tokens'] -= weight
remaining = bucket['tokens']
reset_time = current_time + (self.capacity – bucket['tokens']) / self.refill_rate
return True, {
'remaining': int(remaining),
'reset_time': reset_time,
'limit': self.capacity
}
else:
# 计算需要等待的时间
tokens_needed = weight – bucket['tokens']
wait_time = tokens_needed / self.refill_rate
remaining = 0
reset_time = current_time + wait_time
return False, {
'remaining': remaining,
'reset_time': reset_time,
'retry_after': math.ceil(wait_time),
'limit': self.capacity
}
def get_remaining(self, key: str) -> int:
with self.lock:
if key not in self.buckets:
return self.capacity
bucket = self.buckets[key]
current_time = time.time()
# 补充令牌
time_passed = current_time – bucket['last_refill']
tokens_to_add = time_passed * self.refill_rate
tokens = min(self.capacity, bucket['tokens'] + tokens_to_add)
return int(tokens)
def get_reset_time(self, key: str) -> float:
with self.lock:
if key not in self.buckets:
return time.time()
bucket = self.buckets[key]
current_time = time.time()
# 计算令牌完全补充的时间
if bucket['tokens'] < self.capacity:
tokens_needed = self.capacity – bucket['tokens']
return current_time + tokens_needed / self.refill_rate
else:
return current_time
class FixedWindowLimiter(RateLimiter):
"""固定窗口算法实现"""
def __init__(self, requests_per_window: int, window_seconds: int):
"""
Args:
requests_per_window: 每个窗口允许的请求数
window_seconds: 窗口大小(秒)
"""
self.requests_per_window = requests_per_window
self.window_seconds = window_seconds
self.windows: Dict[str, Dict] = {}
self.lock = threading.RLock()
def is_allowed(self, key: str, weight: int = 1) -> Tuple[bool, Dict]:
with self.lock:
current_time = time.time()
window_start = math.floor(current_time / self.window_seconds) * self.window_seconds
# 获取或创建窗口
if key not in self.windows:
self.windows[key] = {
'window_start': window_start,
'count': 0
}
window = self.windows[key]
# 如果窗口已过期,重置
if window['window_start'] < window_start:
window['window_start'] = window_start
window['count'] = 0
# 检查是否超过限制
if window['count'] + weight <= self.requests_per_window:
window['count'] += weight
remaining = self.requests_per_window – window['count']
reset_time = window['window_start'] + self.window_seconds
return True, {
'remaining': remaining,
'reset_time': reset_time,
'limit': self.requests_per_window
}
else:
# 已超过限制
remaining = 0
reset_time = window['window_start'] + self.window_seconds
retry_after = math.ceil(reset_time – current_time)
return False, {
'remaining': remaining,
'reset_time': reset_time,
'retry_after': retry_after,
'limit': self.requests_per_window
}
def get_remaining(self, key: str) -> int:
with self.lock:
current_time = time.time()
window_start = math.floor(current_time / self.window_seconds) * self.window_seconds
if key not in self.windows:
return self.requests_per_window
window = self.windows[key]
# 如果窗口已过期,返回完整配额
if window['window_start'] < window_start:
return self.requests_per_window
return max(0, self.requests_per_window – window['count'])
def get_reset_time(self, key: str) -> float:
with self.lock:
current_time = time.time()
window_start = math.floor(current_time / self.window_seconds) * self.window_seconds
if key not in self.windows:
return window_start + self.window_seconds
window = self.windows[key]
# 如果窗口已过期,返回下一个窗口结束时间
if window['window_start'] < window_start:
return window_start + self.window_seconds
return window['window_start'] + self.window_seconds
class SlidingWindowLogLimiter(RateLimiter):
"""滑动窗口日志算法实现"""
def __init__(self, requests_per_window: int, window_seconds: int):
"""
Args:
requests_per_window: 每个窗口允许的请求数
window_seconds: 窗口大小(秒)
"""
self.requests_per_window = requests_per_window
self.window_seconds = window_seconds
self.logs: Dict[str, deque] = {}
self.lock = threading.RLock()
def is_allowed(self, key: str, weight: int = 1) -> Tuple[bool, Dict]:
with self.lock:
current_time = time.time()
window_start = current_time – self.window_seconds
# 获取或创建日志队列
if key not in self.logs:
self.logs[key] = deque()
log = self.logs[key]
# 清理过期记录
while log and log[0] < window_start:
log.popleft()
# 检查是否超过限制
if len(log) + weight <= self.requests_per_window:
# 允许请求,记录时间戳
for _ in range(weight):
log.append(current_time)
remaining = self.requests_per_window – len(log)
# 计算重置时间(最早请求的时间 + 窗口大小)
reset_time = log[0] + self.window_seconds if log else current_time + self.window_seconds
return True, {
'remaining': remaining,
'reset_time': reset_time,
'limit': self.requests_per_window
}
else:
# 已超过限制
remaining = 0
# 计算最早可重试时间
retry_time = log[0] + self.window_seconds if log else current_time
retry_after = math.ceil(retry_time – current_time)
reset_time = retry_time
return False, {
'remaining': remaining,
'reset_time': reset_time,
'retry_after': max(0, retry_after),
'limit': self.requests_per_window
}
def get_remaining(self, key: str) -> int:
with self.lock:
current_time = time.time()
window_start = current_time – self.window_seconds
if key not in self.logs:
return self.requests_per_window
log = self.logs[key]
# 清理过期记录
while log and log[0] < window_start:
log.popleft()
return max(0, self.requests_per_window – len(log))
def get_reset_time(self, key: str) -> float:
with self.lock:
current_time = time.time()
window_start = current_time – self.window_seconds
if key not in self.logs:
return current_time + self.window_seconds
log = self.logs[key]
# 清理过期记录
while log and log[0] < window_start:
log.popleft()
if log:
# 最早请求的时间 + 窗口大小
return log[0] + self.window_seconds
else:
return current_time + self.window_seconds
class AdaptiveRateLimiter:
"""自适应速率限制器"""
def __init__(self, base_limiter: RateLimiter, adaptation_config: Dict = None):
self.base_limiter = base_limiter
self.config = {
'min_requests_per_second': 1,
'max_requests_per_second': 100,
'scale_up_factor': 1.1, # 增加10%
'scale_down_factor': 0.9, # 减少10%
'scale_up_interval': 60, # 每60秒评估一次增加
'scale_down_threshold': 0.8, # 使用率80%时考虑增加
'load_shedding_threshold': 0.95, # 使用率95%时开始降级
** (adaptation_config or {})
}
self.usage_history = deque(maxlen=100)
self.adaptation_history = []
self.last_adaptation_time = time.time()
def is_allowed(self, key: str, weight: int = 1) -> Tuple[bool, Dict]:
# 检查基础限制器
allowed, result = self.base_limiter.is_allowed(key, weight)
# 记录使用情况
self.record_usage(key, allowed, result)
# 自适应调整
self.adaptive_adjustment()
return allowed, result
def record_usage(self, key: str, allowed: bool, result: Dict):
"""记录使用情况"""
current_time = time.time()
usage = {
'timestamp': current_time,
'key': key,
'allowed': allowed,
'remaining': result.get('remaining', 0),
'limit': result.get('limit', 0)
}
self.usage_history.append(usage)
def adaptive_adjustment(self):
"""自适应调整"""
current_time = time.time()
# 检查是否到达调整间隔
if current_time – self.last_adaptation_time < self.config['scale_up_interval']:
return
# 分析最近的使用情况
recent_history = [u for u in self.usage_history
if u['timestamp'] > current_time – self.config['scale_up_interval']]
if not recent_history:
return
# 计算平均使用率
total_requests = len(recent_history)
allowed_requests = sum(1 for u in recent_history if u['allowed'])
usage_rate = allowed_requests / total_requests if total_requests > 0 else 0
# 调整逻辑
if usage_rate > self.config['scale_down_threshold']:
# 使用率高,考虑增加限制
self.scale_up()
elif usage_rate < 0.5:
# 使用率低,考虑减少限制
self.scale_down()
self.last_adaptation_time = current_time
def scale_up(self):
"""增加速率限制"""
# 获取当前限制器配置
if isinstance(self.base_limiter, TokenBucketLimiter):
new_capacity = min(
self.base_limiter.capacity * self.config['scale_up_factor'],
self.config['max_requests_per_second'] * 60 # 转换为每分钟
)
self.base_limiter.capacity = new_capacity
self.adaptation_history.append({
'timestamp': time.time(),
'action': 'scale_up',
'new_capacity': new_capacity,
'reason': 'high_usage_rate'
})
elif isinstance(self.base_limiter, FixedWindowLimiter):
new_limit = min(
self.base_limiter.requests_per_window * self.config['scale_up_factor'],
self.config['max_requests_per_second'] * self.base_limiter.window_seconds
)
self.base_limiter.requests_per_window = int(new_limit)
self.adaptation_history.append({
'timestamp': time.time(),
'action': 'scale_up',
'new_limit': new_limit,
'reason': 'high_usage_rate'
})
def scale_down(self):
"""减少速率限制"""
if isinstance(self.base_limiter, TokenBucketLimiter):
new_capacity = max(
self.base_limiter.capacity * self.config['scale_down_factor'],
self.config['min_requests_per_second'] * 60
)
self.base_limiter.capacity = new_capacity
self.adaptation_history.append({
'timestamp': time.time(),
'action': 'scale_down',
'new_capacity': new_capacity,
'reason': 'low_usage_rate'
})
elif isinstance(self.base_limiter, FixedWindowLimiter):
new_limit = max(
self.base_limiter.requests_per_window * self.config['scale_down_factor'],
self.config['min_requests_per_second'] * self.base_limiter.window_seconds
)
self.base_limiter.requests_per_window = int(new_limit)
self.adaptation_history.append({
'timestamp': time.time(),
'action': 'scale_down',
'new_limit': new_limit,
'reason': 'low_usage_rate'
})
22.2.2 多层速率限制策略
python
# 多层速率限制策略
from typing import List, Dict, Optional, Tuple
import hashlib
class MultiLayerRateLimiter:
"""多层速率限制器"""
def __init__(self, layers_config: List[Dict]):
"""
Args:
layers_config: 各层配置列表,按顺序检查
[
{
'limiter_class': TokenBucketLimiter,
'args': [100, 1.0], # capacity, refill_rate
'kwargs': {},
'key_builder': 'ip', # ip, user, endpoint, custom
'cost': 1, # 请求成本
'name': 'ip_layer'
},
…
]
"""
self.layers = []
self.layer_configs = layers_config
self.init_layers()
def init_layers(self):
"""初始化各层限制器"""
for config in self.layer_configs:
limiter_class = config['limiter_class']
args = config.get('args', [])
kwargs = config.get('kwargs', {})
limiter = limiter_class(*args, **kwargs)
self.layers.append({
'limiter': limiter,
'key_builder': config.get('key_builder', 'ip'),
'cost': config.get('cost', 1),
'name': config.get('name', 'unnamed_layer'),
'priority': config.get('priority', 0)
})
# 按优先级排序
self.layers.sort(key=lambda x: x['priority'], reverse=True)
def is_allowed(self, request_info: Dict) -> Tuple[bool, Dict]:
"""
检查请求是否被允许
Args:
request_info: 请求信息
{
'ip': '127.0.0.1',
'user_id': 'user123',
'endpoint': '/api/users',
'method': 'GET',
'headers': {…}
}
Returns:
(allowed, result)
"""
results = []
for layer in self.layers:
# 构建键
key = self.build_key(layer['key_builder'], request_info)
cost = layer['cost']
# 检查该层限制
allowed, result = layer['limiter'].is_allowed(key, cost)
result['layer'] = layer['name']
results.append(result)
if not allowed:
# 该层被限制,立即返回
return False, {
'allowed': False,
'first_blocking_layer': layer['name'],
'details': results,
'retry_after': result.get('retry_after'),
'reset_time': result.get('reset_time')
}
# 所有层都允许
return True, {
'allowed': True,
'details': results
}
def build_key(self, key_builder: str, request_info: Dict) -> str:
"""构建限制键"""
if key_builder == 'ip':
return f"ip:{request_info.get('ip', 'unknown')}"
elif key_builder == 'user':
user_id = request_info.get('user_id')
if user_id:
return f"user:{user_id}"
else:
# 未认证用户使用IP
return f"ip:{request_info.get('ip', 'unknown')}"
elif key_builder == 'endpoint':
endpoint = request_info.get('endpoint', 'unknown')
method = request_info.get('method', 'GET')
return f"endpoint:{method}:{endpoint}"
elif key_builder == 'user_endpoint':
user_id = request_info.get('user_id', 'anonymous')
endpoint = request_info.get('endpoint', 'unknown')
method = request_info.get('method', 'GET')
return f"user_endpoint:{user_id}:{method}:{endpoint}"
elif key_builder == 'custom':
# 自定义键构建逻辑
custom_data = request_info.get('custom_key_data', '')
key_hash = hashlib.md5(custom_data.encode()).hexdigest()
return f"custom:{key_hash}"
else:
# 默认使用IP
return f"ip:{request_info.get('ip', 'unknown')}"
def get_quotas(self, request_info: Dict) -> Dict:
"""获取所有层的配额信息"""
quotas = {}
for layer in self.layers:
key = self.build_key(layer['key_builder'], request_info)
limiter = layer['limiter']
remaining = limiter.get_remaining(key)
reset_time = limiter.get_reset_time(key)
quotas[layer['name']] = {
'remaining': remaining,
'reset_time': reset_time,
'limit': self.get_layer_limit(layer),
'key_builder': layer['key_builder']
}
return quotas
def get_layer_limit(self, layer: Dict) -> int:
"""获取层的限制值"""
limiter = layer['limiter']
if isinstance(limiter, TokenBucketLimiter):
return limiter.capacity
elif isinstance(limiter, FixedWindowLimiter):
return limiter.requests_per_window
elif isinstance(limiter, SlidingWindowLogLimiter):
return limiter.requests_per_window
else:
return 0
def get_rate_limit_headers(self, request_info: Dict) -> Dict[str, str]:
"""获取速率限制头部信息"""
quotas = self.get_quotas(request_info)
# 找到最严格的限制
strictest_layer = None
strictest_remaining = float('inf')
for name, quota in quotas.items():
if quota['remaining'] < strictest_remaining:
strictest_remaining = quota['remaining']
strictest_layer = name
if strictest_layer:
quota = quotas[strictest_layer]
headers = {
'X-RateLimit-Limit': str(quota['limit']),
'X-RateLimit-Remaining': str(quota['remaining']),
'X-RateLimit-Reset': str(int(quota['reset_time']))
}
# 添加重试头部(如果剩余为0)
if quota['remaining'] <= 0:
retry_after = max(0, math.ceil(quota['reset_time'] – time.time()))
headers['Retry-After'] = str(retry_after)
return headers
return {}
# 使用示例
def create_production_rate_limiter():
"""创建生产环境使用的多层速率限制器"""
layers_config = [
# 第一层:IP基础限制(防止滥用)
{
'limiter_class': TokenBucketLimiter,
'args': [100, 1.0], # 100请求,每秒补充1个
'key_builder': 'ip',
'cost': 1,
'name': 'ip_basic',
'priority': 10
},
# 第二层:用户级别限制
{
'limiter_class': FixedWindowLimiter,
'args': [1000, 3600], # 1000请求/小时
'key_builder': 'user',
'cost': 1,
'name': 'user_hourly',
'priority': 20
},
# 第三层:端点级别限制
{
'limiter_class': SlidingWindowLogLimiter,
'args': [60, 60], # 60请求/分钟
'key_builder': 'endpoint',
'cost': 1,
'name': 'endpoint_minute',
'priority': 30
},
# 第四层:敏感端点更严格限制
{
'limiter_class': FixedWindowLimiter,
'args': [10, 60], # 10请求/分钟(用于敏感操作)
'key_builder': 'user_endpoint',
'cost': 1,
'name': 'sensitive_endpoint',
'priority': 40,
'condition': lambda req: req.get('endpoint', '').startswith('/api/admin/')
}
]
return MultiLayerRateLimiter(layers_config)
22.3 详细实现与最佳实践
22.3.1 分布式速率限制实现
python
# 基于Redis的分布式速率限制器
import redis
import json
import pickle
from typing import Optional, Tuple, Dict, Any
import time
import hashlib
class RedisRateLimiter:
"""基于Redis的分布式速率限制器"""
def __init__(self, redis_client: redis.Redis, namespace: str = "ratelimit"):
self.redis = redis_client
self.namespace = namespace
self.lua_scripts = self.load_lua_scripts()
def load_lua_scripts(self) -> Dict[str, str]:
"""加载Lua脚本(原子操作)"""
# 令牌桶算法的Lua脚本
token_bucket_script = """
local key = KEYS[1]
local capacity = tonumber(ARGV[1])
local refill_rate = tonumber(ARGV[2])
local weight = tonumber(ARGV[3])
local now = tonumber(ARGV[4])
local bucket = redis.call('HMGET', key, 'tokens', 'last_refill')
local tokens
local last_refill
if bucket[1] == false then
— 初始化桶
tokens = capacity
last_refill = now
redis.call('HMSET', key, 'tokens', tokens, 'last_refill', last_refill)
else
tokens = tonumber(bucket[1])
last_refill = tonumber(bucket[2])
— 补充令牌
local time_passed = now – last_refill
local tokens_to_add = time_passed * refill_rate
tokens = math.min(capacity, tokens + tokens_to_add)
last_refill = now
end
— 检查是否有足够令牌
if tokens >= weight then
tokens = tokens – weight
redis.call('HMSET', key, 'tokens', tokens, 'last_refill', last_refill)
— 计算重置时间
local reset_time = now + (capacity – tokens) / refill_rate
return {1, tokens, reset_time, capacity}
else
— 计算需要等待的时间
local tokens_needed = weight – tokens
local wait_time = tokens_needed / refill_rate
local reset_time = now + wait_time
return {0, tokens, reset_time, capacity, wait_time}
end
"""
# 固定窗口算法的Lua脚本
fixed_window_script = """
local key = KEYS[1]
local limit = tonumber(ARGV[1])
local window = tonumber(ARGV[2])
local weight = tonumber(ARGV[3])
local now = tonumber(ARGV[4])
local window_start = math.floor(now / window) * window
local window_key = key .. ':' .. tostring(window_start)
— 获取当前计数
local count = redis.call('GET', window_key)
if count == false then
count = 0
else
count = tonumber(count)
end
— 检查是否超过限制
if count + weight <= limit then
— 增加计数
redis.call('INCRBY', window_key, weight)
— 设置过期时间(窗口结束时)
redis.call('EXPIRE', window_key, window)
local remaining = limit – (count + weight)
local reset_time = window_start + window
return {1, remaining, reset_time, limit}
else
— 已超过限制
local remaining = 0
local reset_time = window_start + window
local retry_after = reset_time – now
return {0, remaining, reset_time, limit, retry_after}
end
"""
# 滑动窗口算法的Lua脚本
sliding_window_script = """
local key = KEYS[1]
local limit = tonumber(ARGV[1])
local window = tonumber(ARGV[2])
local weight = tonumber(ARGV[3])
local now = tonumber(ARGV[4])
local window_start = now – window
— 获取所有记录
local records = redis.call('ZRANGEBYSCORE', key, window_start, now)
local count = #records
— 检查是否超过限制
if count + weight <= limit then
— 添加新记录
for i = 1, weight do
redis.call('ZADD', key, now, now .. ':' .. i)
end
— 清理过期记录
redis.call('ZREMRANGEBYSCORE', key, 0, window_start)
— 设置过期时间
redis.call('EXPIRE', key, window)
— 计算剩余配额
local remaining = limit – (count + weight)
— 计算重置时间(最早记录的时间 + 窗口大小)
local reset_time
if count > 0 then
local oldest = redis.call('ZRANGE', key, 0, 0, 'WITHSCORES')[2]
reset_time = oldest + window
else
reset_time = now + window
end
return {1, remaining, reset_time, limit}
else
— 已超过限制
local remaining = 0
— 计算最早可重试时间
local reset_time
if count > 0 then
local oldest = redis.call('ZRANGE', key, 0, 0, 'WITHSCORES')[2]
reset_time = oldest + window
else
reset_time = now + window
end
local retry_after = reset_time – now
return {0, remaining, reset_time, limit, retry_after}
end
"""
return {
'token_bucket': self.redis.script_load(token_bucket_script),
'fixed_window': self.redis.script_load(fixed_window_script),
'sliding_window': self.redis.script_load(sliding_window_script)
}
def token_bucket_is_allowed(self, key: str, capacity: int,
refill_rate: float, weight: int = 1) -> Tuple[bool, Dict]:
"""令牌桶算法检查"""
redis_key = f"{self.namespace}:token_bucket:{key}"
now = time.time()
result = self.redis.evalsha(
self.lua_scripts['token_bucket'],
1, # key数量
redis_key,
capacity,
refill_rate,
weight,
now
)
allowed = bool(result[0])
tokens = result[1]
reset_time = result[2]
limit = result[3]
if allowed:
return True, {
'remaining': int(tokens),
'reset_time': reset_time,
'limit': limit
}
else:
wait_time = result[4]
return False, {
'remaining': int(tokens),
'reset_time': reset_time,
'retry_after': math.ceil(wait_time),
'limit': limit
}
def fixed_window_is_allowed(self, key: str, limit: int,
window_seconds: int, weight: int = 1) -> Tuple[bool, Dict]:
"""固定窗口算法检查"""
redis_key = f"{self.namespace}:fixed_window:{key}"
now = time.time()
result = self.redis.evalsha(
self.lua_scripts['fixed_window'],
1,
redis_key,
limit,
window_seconds,
weight,
now
)
allowed = bool(result[0])
remaining = result[1]
reset_time = result[2]
limit_val = result[3]
if allowed:
return True, {
'remaining': int(remaining),
'reset_time': reset_time,
'limit': limit_val
}
else:
retry_after = result[4]
return False, {
'remaining': int(remaining),
'reset_time': reset_time,
'retry_after': math.ceil(retry_after),
'limit': limit_val
}
def sliding_window_is_allowed(self, key: str, limit: int,
window_seconds: int, weight: int = 1) -> Tuple[bool, Dict]:
"""滑动窗口算法检查"""
redis_key = f"{self.namespace}:sliding_window:{key}"
now = time.time()
result = self.redis.evalsha(
self.lua_scripts['sliding_window'],
1,
redis_key,
limit,
window_seconds,
weight,
now
)
allowed = bool(result[0])
remaining = result[1]
reset_time = result[2]
limit_val = result[3]
if allowed:
return True, {
'remaining': int(remaining),
'reset_time': reset_time,
'limit': limit_val
}
else:
retry_after = result[4]
return False, {
'remaining': int(remaining),
'reset_time': reset_time,
'retry_after': math.ceil(retry_after),
'limit': limit_val
}
def get_quotas(self, key: str, algorithm: str = 'token_bucket',
**kwargs) -> Optional[Dict]:
"""获取配额信息"""
if algorithm == 'token_bucket':
capacity = kwargs.get('capacity', 100)
refill_rate = kwargs.get('refill_rate', 1.0)
redis_key = f"{self.namespace}:token_bucket:{key}"
bucket = self.redis.hgetall(redis_key)
if not bucket:
return {
'remaining': capacity,
'limit': capacity,
'reset_time': time.time()
}
tokens = float(bucket.get(b'tokens', 0))
last_refill = float(bucket.get(b'last_refill', time.time()))
# 补充令牌
now = time.time()
time_passed = now – last_refill
tokens_to_add = time_passed * refill_rate
tokens = min(capacity, tokens + tokens_to_add)
# 计算重置时间
if tokens < capacity:
reset_time = now + (capacity – tokens) / refill_rate
else:
reset_time = now
return {
'remaining': int(tokens),
'limit': capacity,
'reset_time': reset_time
}
elif algorithm == 'fixed_window':
limit = kwargs.get('limit', 100)
window_seconds = kwargs.get('window_seconds', 60)
now = time.time()
window_start = math.floor(now / window_seconds) * window_seconds
redis_key = f"{self.namespace}:fixed_window:{key}:{window_start}"
count = int(self.redis.get(redis_key) or 0)
remaining = max(0, limit – count)
reset_time = window_start + window_seconds
return {
'remaining': remaining,
'limit': limit,
'reset_time': reset_time
}
return None
def reset_limits(self, key: str, algorithm: str = 'token_bucket'):
"""重置限制"""
if algorithm == 'token_bucket':
redis_key = f"{self.namespace}:token_bucket:{key}"
self.redis.delete(redis_key)
elif algorithm == 'fixed_window':
# 删除所有相关的窗口键
pattern = f"{self.namespace}:fixed_window:{key}:*"
keys = self.redis.keys(pattern)
if keys:
self.redis.delete(*keys)
elif algorithm == 'sliding_window':
redis_key = f"{self.namespace}:sliding_window:{key}"
self.redis.delete(redis_key)
class DistributedRateLimiter:
"""分布式速率限制管理器"""
def __init__(self, redis_client: redis.Redis, config: Dict):
self.redis = redis_client
self.config = config
self.redis_limiter = RedisRateLimiter(redis_client)
# 本地缓存,减少Redis访问
self.local_cache = {}
self.cache_ttl = 1 # 本地缓存1秒
def is_allowed(self, identifier: str, rule_name: str) -> Tuple[bool, Dict]:
"""检查是否允许请求"""
# 检查本地缓存
cache_key = f"{identifier}:{rule_name}"
cached = self.local_cache.get(cache_key)
if cached and time.time() – cached['timestamp'] < self.cache_ttl:
return cached['allowed'], cached['result']
# 获取规则配置
rule = self.config.get('rules', {}).get(rule_name)
if not rule:
# 没有规则配置,默认允许
return True, {'allowed': True, 'rule': rule_name}
algorithm = rule.get('algorithm', 'token_bucket')
params = rule.get('params', {})
# 根据算法调用相应的Redis限制器
if algorithm == 'token_bucket':
allowed, result = self.redis_limiter.token_bucket_is_allowed(
f"{rule_name}:{identifier}",
params.get('capacity', 100),
params.get('refill_rate', 1.0),
params.get('cost', 1)
)
elif algorithm == 'fixed_window':
allowed, result = self.redis_limiter.fixed_window_is_allowed(
f"{rule_name}:{identifier}",
params.get('limit', 100),
params.get('window_seconds', 60),
params.get('cost', 1)
)
elif algorithm == 'sliding_window':
allowed, result = self.redis_limiter.sliding_window_is_allowed(
f"{rule_name}:{identifier}",
params.get('limit', 100),
params.get('window_seconds', 60),
params.get('cost', 1)
)
else:
# 未知算法,默认允许
return True, {'allowed': True, 'rule': rule_name}
# 添加规则信息到结果
result['rule'] = rule_name
result['algorithm'] = algorithm
# 缓存结果
self.local_cache[cache_key] = {
'allowed': allowed,
'result': result,
'timestamp': time.time()
}
return allowed, result
def get_all_quotas(self, identifier: str) -> Dict:
"""获取所有规则的配额信息"""
quotas = {}
for rule_name, rule in self.config.get('rules', {}).items():
algorithm = rule.get('algorithm', 'token_bucket')
params = rule.get('params', {})
quota = self.redis_limiter.get_quotas(
f"{rule_name}:{identifier}",
algorithm,
**params
)
if quota:
quotas[rule_name] = {
**quota,
'algorithm': algorithm,
'rule': rule_name
}
return quotas
def create_rate_limit_response(self, identifier: str, rule_name: str,
result: Dict) -> Tuple[Dict, Dict]:
"""创建速率限制响应"""
if result.get('allowed', True):
# 允许请求,添加速率限制头部
headers = {
'X-RateLimit-Limit': str(result.get('limit', 0)),
'X-RateLimit-Remaining': str(result.get('remaining', 0)),
'X-RateLimit-Reset': str(int(result.get('reset_time', time.time()))),
'X-RateLimit-Rule': rule_name
}
return {'allowed': True}, headers
else:
# 请求被限制
retry_after = result.get('retry_after', 60)
reset_time = result.get('reset_time', time.time() + retry_after)
# 响应体
response_body = {
'error': {
'code': 'RATE_LIMIT_EXCEEDED',
'message': 'Too many requests',
'details': {
'rule': rule_name,
'retry_after': retry_after,
'reset_time': reset_time,
'limit': result.get('limit', 0),
'remaining': result.get('remaining', 0)
}
}
}
# 响应头部
headers = {
'Retry-After': str(retry_after),
'X-RateLimit-Limit': str(result.get('limit', 0)),
'X-RateLimit-Remaining': str(result.get('remaining', 0)),
'X-RateLimit-Reset': str(int(reset_time)),
'X-RateLimit-Rule': rule_name
}
return response_body, headers
# 配置示例
RATE_LIMIT_CONFIG = {
'rules': {
'ip_global': {
'algorithm': 'token_bucket',
'params': {
'capacity': 1000,
'refill_rate': 10.0, # 10请求/秒
'cost': 1
},
'description': '全局IP限制'
},
'user_auth': {
'algorithm': 'fixed_window',
'params': {
'limit': 10,
'window_seconds': 60, # 10请求/分钟
'cost': 1
},
'description': '用户认证限制'
},
'api_endpoint': {
'algorithm': 'sliding_window',
'params': {
'limit': 60,
'window_seconds': 60, # 60请求/分钟
'cost': 1
},
'description': 'API端点限制'
},
'upload_endpoint': {
'algorithm': 'token_bucket',
'params': {
'capacity': 10,
'refill_rate': 0.1, # 每10秒1个请求
'cost': 1
},
'description': '上传端点限制'
}
},
'default_rule': 'ip_global'
}
22.3.2 智能速率限制中间件
python
# 智能速率限制中间件
from flask import Flask, request, jsonify, g
import time
from functools import wraps
import uuid
class IntelligentRateLimitMiddleware:
"""智能速率限制中间件"""
def __init__(self, app: Flask = None, config: Dict = None):
self.app = app
self.config = {
'enabled': True,
'default_limits': {
'ip': '100 per hour',
'user': '1000 per day',
'endpoint': '60 per minute'
},
'exempt_paths': ['/health', '/metrics'],
'exempt_methods': ['OPTIONS'],
'cost_calculator': self.default_cost_calculator,
'identifier_extractor': self.default_identifier_extractor,
'response_handler': self.default_response_handler,
'adaptive_enabled': True,
'anomaly_detection_enabled': True,
** (config or {})
}
# 初始化限制器
self.rate_limiter = DistributedRateLimiter(
redis_client=redis.Redis(),
config=RATE_LIMIT_CONFIG
)
# 异常检测器
self.anomaly_detector = RateLimitAnomalyDetector()
# 自适应调整器
self.adaptive_adjuster = AdaptiveRateAdjuster()
if app:
self.init_app(app)
def init_app(self, app: Flask):
"""初始化Flask应用"""
self.app = app
# 注册中间件
@app.before_request
def rate_limit_check():
self.check_rate_limit()
# 注册错误处理器
@app.errorhandler(429)
def handle_rate_limit(e):
return self.handle_rate_limit_error(e)
def check_rate_limit(self):
"""检查速率限制"""
# 检查是否启用
if not self.config['enabled']:
return
# 检查豁免路径
if request.path in self.config['exempt_paths']:
return
# 检查豁免方法
if request.method in self.config['exempt_methods']:
return
# 提取标识符
identifiers = self.extract_identifiers(request)
# 确定适用的规则
rules = self.determine_applicable_rules(request, identifiers)
# 检查每个规则
for rule_name, identifier in rules:
allowed, result = self.rate_limiter.is_allowed(identifier, rule_name)
if not allowed:
# 检测异常行为
if self.config['anomaly_detection_enabled']:
self.anomaly_detector.record_violation(
identifier, rule_name, request
)
# 触发速率限制
self.trigger_rate_limit(identifier, rule_name, result)
break
# 记录请求(用于自适应调整)
if self.config['adaptive_enabled']:
self.adaptive_adjuster.record_request(request, identifiers)
def extract_identifiers(self, request) -> Dict[str, str]:
"""提取标识符"""
identifiers = {}
# 提取IP
if request.headers.get('X-Forwarded-For'):
ip = request.headers['X-Forwarded-For'].split(',')[0].strip()
else:
ip = request.remote_addr
identifiers['ip'] = ip
# 提取用户ID(如果已认证)
if hasattr(g, 'user') and g.user:
identifiers['user'] = g.user.id
elif request.headers.get('X-User-ID'):
identifiers['user'] = request.headers['X-User-ID']
# 提取API密钥
if request.headers.get('X-API-Key'):
identifiers['api_key'] = request.headers['X-API-Key']
# 提取会话ID
if request.cookies.get('session_id'):
identifiers['session'] = request.cookies['session_id']
return identifiers
def determine_applicable_rules(self, request, identifiers: Dict) -> List[Tuple[str, str]]:
"""确定适用的规则"""
rules = []
# 基础规则:IP限制
if 'ip' in identifiers:
rules.append(('ip_global', identifiers['ip']))
# 用户规则
if 'user' in identifiers:
# 用户特定限制
rules.append(('user_auth', identifiers['user']))
# 端点特定用户限制
endpoint_rule = f"user_endpoint:{request.path}"
rules.append((endpoint_rule, identifiers['user']))
# API密钥规则
if 'api_key' in identifiers:
rules.append(('api_key', identifiers['api_key']))
# 端点规则
endpoint_key = f"endpoint:{request.method}:{request.path}"
rules.append(('api_endpoint', endpoint_key))
# 特殊端点规则
if request.path.startswith('/api/upload'):
rules.append(('upload_endpoint', identifiers.get('ip', 'unknown')))
return rules
def default_cost_calculator(self, request) -> int:
"""默认成本计算器"""
# 根据请求大小、复杂度等计算成本
cost = 1
# 请求体大小成本
content_length = request.content_length or 0
if content_length > 1024 * 1024: # 1MB
cost += 1
elif content_length > 10 * 1024 * 1024: # 10MB
cost += 5
# 复杂操作成本
if request.method in ['POST', 'PUT', 'DELETE']:
cost += 1
# 敏感端点成本
if request.path.startswith('/api/admin/'):
cost += 2
return cost
def default_identifier_extractor(self, request) -> str:
"""默认标识符提取器"""
# 使用IP作为默认标识符
if request.headers.get('X-Forwarded-For'):
return request.headers['X-Forwarded-For'].split(',')[0].strip()
return request.remote_addr
def default_response_handler(self, rule_name: str, result: Dict) -> Tuple[Dict, int, Dict]:
"""默认响应处理器"""
retry_after = result.get('retry_after', 60)
response_body = {
'error': {
'code': 'RATE_LIMIT_EXCEEDED',
'message': 'Too many requests. Please try again later.',
'details': {
'rule': rule_name,
'retry_after': retry_after,
'reset_time': result.get('reset_time'),
'limit': result.get('limit'),
'remaining': result.get('remaining')
}
}
}
headers = {
'Retry-After': str(retry_after),
'X-RateLimit-Limit': str(result.get('limit', 0)),
'X-RateLimit-Remaining': str(result.get('remaining', 0)),
'X-RateLimit-Reset': str(int(result.get('reset_time', time.time()))),
'X-RateLimit-Rule': rule_name
}
return response_body, 429, headers
def trigger_rate_limit(self, identifier: str, rule_name: str, result: Dict):
"""触发速率限制"""
# 记录限制事件
self.log_rate_limit_event(identifier, rule_name, result)
# 生成响应
response_body, status_code, headers = self.config['response_handler'](
rule_name, result
)
# 抛出异常,由错误处理器处理
from werkzeug.exceptions import TooManyRequests
raise TooManyRequests(response=jsonify(response_body), headers=headers)
def log_rate_limit_event(self, identifier: str, rule_name: str, result: Dict):
"""记录速率限制事件"""
event = {
'timestamp': time.time(),
'identifier': identifier,
'rule': rule_name,
'remaining': result.get('remaining', 0),
'limit': result.get('limit', 0),
'retry_after': result.get('retry_after', 0),
'request_path': request.path,
'request_method': request.method,
'user_agent': request.headers.get('User-Agent'),
'client_ip': request.remote_addr
}
# 在实际应用中,这里会记录到日志系统
print(f"[Rate Limit] {json.dumps(event)}")
# 写入文件日志
with open('rate_limit_events.log', 'a') as f:
f.write(json.dumps(event) + '\\n')
def handle_rate_limit_error(self, error):
"""处理速率限制错误"""
# 从错误中提取响应和头部
response = error.response
return response
def get_rate_limit_info(self, identifier: str) -> Dict:
"""获取速率限制信息"""
return self.rate_limiter.get_all_quotas(identifier)
class RateLimitAnomalyDetector:
"""速率限制异常检测器"""
def __init__(self):
self.violation_history = {}
self.anomaly_patterns = {}
def record_violation(self, identifier: str, rule_name: str, request):
"""记录违规"""
key = f"{identifier}:{rule_name}"
if key not in self.violation_history:
self.violation_history[key] = {
'count': 0,
'first_violation': time.time(),
'last_violation': time.time(),
'requests': []
}
record = self.violation_history[key]
record['count'] += 1
record['last_violation'] = time.time()
# 记录请求详情
request_info = {
'timestamp': time.time(),
'path': request.path,
'method': request.method,
'user_agent': request.headers.get('User-Agent'),
'client_ip': request.remote_addr
}
record['requests'].append(request_info)
# 检测异常模式
self.detect_anomaly(key, record)
def detect_anomaly(self, key: str, record: Dict):
"""检测异常模式"""
# 检测频繁违规
time_window = 300 # 5分钟
violations_in_window = sum(
1 for req in record['requests']
if time.time() – req['timestamp'] < time_window
)
if violations_in_window > 10:
# 频繁违规,可能是恶意攻击
self.trigger_anomaly_alert(key, 'frequent_violations', {
'violations_in_window': violations_in_window,
'time_window': time_window
})
# 检测分布式攻击模式
if self.is_distributed_attack_pattern(record):
self.trigger_anomaly_alert(key, 'distributed_attack', {
'request_patterns': len(set(r['client_ip'] for r in record['requests']))
})
def is_distributed_attack_pattern(self, record: Dict) -> bool:
"""检测分布式攻击模式"""
if len(record['requests']) < 20:
return False
# 检查是否来自多个IP地址
unique_ips = set(r['client_ip'] for r in record['requests'])
if len(unique_ips) > 5:
return True
# 检查是否有规律的请求模式
timestamps = [r['timestamp'] for r in record['requests']]
if len(timestamps) >= 10:
intervals = [timestamps[i+1] – timestamps[i] for i in range(len(timestamps)-1)]
avg_interval = sum(intervals) / len(intervals)
# 检查间隔是否过于规律
if avg_interval > 0:
deviation = sum(abs(i – avg_interval) for i in intervals) / len(intervals)
if deviation / avg_interval < 0.1: # 偏差小于10%
return True
return False
def trigger_anomaly_alert(self, key: str, anomaly_type: str, details: Dict):
"""触发异常警报"""
alert = {
'timestamp': time.time(),
'key': key,
'type': anomaly_type,
'details': details,
'severity': 'high'
}
print(f"[Rate Limit Anomaly] {json.dumps(alert)}")
with open('rate_limit_anomalies.log', 'a') as f:
f.write(json.dumps(alert) + '\\n')
class AdaptiveRateAdjuster:
"""自适应速率调整器"""
def __init__(self):
self.request_history = deque(maxlen=1000)
self.adjustment_history = []
def record_request(self, request, identifiers: Dict):
"""记录请求"""
request_record = {
'timestamp': time.time(),
'path': request.path,
'method': request.method,
'identifiers': identifiers,
'response_time': None, # 将在响应时填充
'status_code': None
}
self.request_history.append(request_record)
# 定期分析并调整
if len(self.request_history) % 100 == 0:
self.analyze_and_adjust()
def analyze_and_adjust(self):
"""分析并调整速率限制"""
# 分析请求模式
recent_requests = list(self.request_history)[-100:] # 最近100个请求
if not recent_requests:
return
# 计算请求速率
time_window = recent_requests[-1]['timestamp'] – recent_requests[0]['timestamp']
request_rate = len(recent_requests) / time_window if time_window > 0 else 0
# 计算错误率
error_requests = [r for r in recent_requests if r.get('status_code', 200) >= 400]
error_rate = len(error_requests) / len(recent_requests) if recent_requests else 0
# 根据分析结果调整
if error_rate > 0.1: # 错误率超过10%
# 降低限制,可能是服务器压力大
self.adjust_limits('decrease', {
'reason': 'high_error_rate',
'error_rate': error_rate,
'request_rate': request_rate
})
elif request_rate > 100 and error_rate < 0.01: # 高请求率,低错误率
# 可以考虑增加限制
self.adjust_limits('increase', {
'reason': 'high_throughput_low_errors',
'request_rate': request_rate,
'error_rate': error_rate
})
def adjust_limits(self, direction: str, context: Dict):
"""调整限制"""
adjustment = {
'timestamp': time.time(),
'direction': direction,
'context': context
}
self.adjustment_history.append(adjustment)
print(f"[Rate Limit Adjustment] {json.dumps(adjustment)}")
22.4 客户端处理策略
22.4.1 智能重试与退避策略
javascript
// 客户端速率限制处理策略
class RateLimitAwareClient {
constructor(config = {}) {
this.config = {
baseDelay: 1000,
maxDelay: 60000,
backoffFactor: 2,
jitter: 0.1,
maxRetries: 5,
respectRetryAfter: true,
quotaMonitoring: true,
adaptiveBackoff: true,
…config
};
this.rateLimitInfo = new Map();
this.retryQueue = [];
this.quotaHistory = [];
this.circuitBreakers = new Map();
// 初始化监控
this.initQuotaMonitoring();
}
async request(endpoint, options = {}) {
const requestId = this.generateRequestId();
const startTime = Date.now();
// 检查速率限制状态
const rateLimitCheck = this.checkRateLimitState(endpoint, options);
if (!rateLimitCheck.allowed) {
// 速率限制已触发,等待或拒绝
if (options.ignoreRateLimit) {
return this.executeRequest(requestId, endpoint, options);
} else {
return this.handleRateLimitedRequest(requestId, endpoint, options, rateLimitCheck);
}
}
// 执行请求
try {
const response = await this.executeRequest(requestId, endpoint, options);
// 更新速率限制信息
this.updateRateLimitInfo(endpoint, response);
return response;
} catch (error) {
// 处理错误
if (error.name === 'RateLimitError') {
return this.handleRateLimitError(requestId, endpoint, options, error);
}
throw error;
}
}
checkRateLimitState(endpoint, options) {
const endpointKey = this.getEndpointKey(endpoint, options);
const info = this.rateLimitInfo.get(endpointKey);
if (!info) {
return { allowed: true };
}
// 检查剩余配额
if (info.remaining <= 0) {
const now = Date.now();
const resetTime = info.resetTime * 1000; // 转换为毫秒
if (now < resetTime) {
// 仍在限制期内
const retryAfter = Math.ceil((resetTime – now) / 1000);
return {
allowed: false,
reason: 'quota_exhausted',
retryAfter,
resetTime
};
} else {
// 限制已过期
return { allowed: true };
}
}
// 检查电路断路器
const circuitKey = this.getCircuitKey(endpoint, options);
if (!this.isCircuitClosed(circuitKey)) {
return {
allowed: false,
reason: 'circuit_open',
circuitKey
};
}
return { allowed: true };
}
async executeRequest(requestId, endpoint, options) {
const controller = new AbortController();
const timeoutId = setTimeout(() => controller.abort(), options.timeout || 30000);
try {
const response = await fetch(endpoint, {
…options,
signal: controller.signal,
headers: {
…options.headers,
'X-Request-ID': requestId,
'X-Client-Version': '1.0.0'
}
});
clearTimeout(timeoutId);
// 检查速率限制头部
this.extractRateLimitHeaders(endpoint, response);
// 检查状态码
if (response.status === 429) {
throw new RateLimitError('Rate limit exceeded', response);
}
return response;
} catch (error) {
clearTimeout(timeoutId);
throw error;
}
}
extractRateLimitHeaders(endpoint, response) {
const endpointKey = this.getEndpointKey(endpoint);
const limit = response.headers.get('X-RateLimit-Limit');
const remaining = response.headers.get('X-RateLimit-Remaining');
const reset = response.headers.get('X-RateLimit-Reset');
const retryAfter = response.headers.get('Retry-After');
const rule = response.headers.get('X-RateLimit-Rule');
if (limit !== null && remaining !== null && reset !== null) {
this.rateLimitInfo.set(endpointKey, {
limit: parseInt(limit, 10),
remaining: parseInt(remaining, 10),
resetTime: parseInt(reset, 10),
rule: rule,
lastUpdated: Date.now()
});
// 记录配额历史
this.recordQuotaHistory(endpointKey, {
limit: parseInt(limit, 10),
remaining: parseInt(remaining, 10),
timestamp: Date.now()
});
}
// 处理Retry-After头部
if (retryAfter !== null && this.config.respectRetryAfter) {
const retryAfterSeconds = parseInt(retryAfter, 10);
this.scheduleRetryDelay(endpointKey, retryAfterSeconds * 1000);
}
}
async handleRateLimitedRequest(requestId, endpoint, options, rateLimitCheck) {
const endpointKey = this.getEndpointKey(endpoint, options);
// 根据原因采取不同策略
switch (rateLimitCheck.reason) {
case 'quota_exhausted':
return this.handleQuotaExhausted(requestId, endpoint, options, rateLimitCheck);
case 'circuit_open':
return this.handleCircuitOpen(requestId, endpoint, options, rateLimitCheck);
default:
throw new RateLimitError(`Rate limited: ${rateLimitCheck.reason}`);
}
}
async handleQuotaExhausted(requestId, endpoint, options, rateLimitCheck) {
const { retryAfter, resetTime } = rateLimitCheck;
// 计算等待时间
let waitTime = retryAfter * 1000; // 转换为毫秒
if (this.config.adaptiveBackoff) {
// 自适应调整等待时间
waitTime = this.calculateAdaptiveWaitTime(endpoint, waitTime);
}
// 添加抖动
waitTime = this.addJitter(waitTime);
// 记录等待
this.logRateLimitWait(endpoint, waitTime, 'quota_exhausted');
// 等待后重试
await this.sleep(waitTime);
// 重试请求
return this.request(endpoint, {
…options,
retryCount: (options.retryCount || 0) + 1
});
}
async handleRateLimitError(requestId, endpoint, options, error) {
const retryCount = options.retryCount || 0;
if (retryCount >= this.config.maxRetries) {
throw new MaxRetriesError('Maximum retries exceeded', error);
}
// 从响应中提取Retry-After
let retryAfter = 60; // 默认60秒
if (error.response) {
const retryAfterHeader = error.response.headers.get('Retry-After');
if (retryAfterHeader) {
retryAfter = parseInt(retryAfterHeader, 10);
}
}
// 计算退避延迟
const delay = this.calculateBackoffDelay(retryCount, retryAfter);
// 记录重试
this.logRetry(endpoint, retryCount, delay, 'rate_limit');
// 等待后重试
await this.sleep(delay);
return this.request(endpoint, {
…options,
retryCount: retryCount + 1
});
}
calculateBackoffDelay(retryCount, retryAfter = 0) {
// 指数退避
let delay = this.config.baseDelay * Math.pow(this.config.backoffFactor, retryCount);
// 考虑Retry-After头部
if (retryAfter > 0) {
delay = Math.max(delay, retryAfter * 1000);
}
// 限制最大延迟
delay = Math.min(delay, this.config.maxDelay);
// 添加抖动
delay = this.addJitter(delay);
return delay;
}
calculateAdaptiveWaitTime(endpoint, baseWaitTime) {
const endpointKey = this.getEndpointKey(endpoint);
const history = this.getQuotaHistory(endpointKey);
if (history.length < 5) {
return baseWaitTime;
}
// 分析历史配额使用模式
const recentUsage = history.slice(-10);
const avgUsageRate = this.calculateUsageRate(recentUsage);
// 根据使用率调整等待时间
if (avgUsageRate > 0.8) {
// 高使用率,增加等待时间
return baseWaitTime * 1.5;
} else if (avgUsageRate < 0.3) {
// 低使用率,减少等待时间
return baseWaitTime * 0.7;
}
return baseWaitTime;
}
calculateUsageRate(quotaHistory) {
if (quotaHistory.length < 2) {
return 0;
}
let totalUsage = 0;
let totalTime = 0;
for (let i = 1; i < quotaHistory.length; i++) {
const prev = quotaHistory[i – 1];
const curr = quotaHistory[i];
const usage = prev.remaining – curr.remaining;
const timeDiff = curr.timestamp – prev.timestamp;
if (timeDiff > 0 && prev.limit > 0) {
totalUsage += usage;
totalTime += timeDiff;
}
}
if (totalTime === 0) {
return 0;
}
const avgUsagePerMs = totalUsage / totalTime;
const limitPerMs = quotaHistory[0].limit / (24 * 60 * 60 * 1000); // 每天的限制
return avgUsagePerMs / limitPerMs;
}
addJitter(delay) {
const jitter = this.config.jitter;
const jitterAmount = delay * jitter;
const randomJitter = (Math.random() * 2 – 1) * jitterAmount;
return Math.max(0, delay + randomJitter);
}
// 电路断路器
isCircuitClosed(circuitKey) {
if (!this.circuitBreakers.has(circuitKey)) {
return true;
}
const circuit = this.circuitBreakers.get(circuitKey);
if (circuit.state === 'open') {
// 检查是否应该进入半开状态
if (Date.now() – circuit.openedAt > 60000) { // 60秒后
circuit.state = 'half_open';
circuit.halfOpenAttempts = 0;
return true;
}
return false;
}
if (circuit.state === 'half_open') {
if (circuit.halfOpenAttempts >= 3) {
return false;
}
circuit.halfOpenAttempts++;
return true;
}
return true; // closed
}
updateCircuitBreaker(circuitKey, success) {
if (!this.circuitBreakers.has(circuitKey)) {
this.circuitBreakers.set(circuitKey, {
state: 'closed',
failureCount: 0,
successCount: 0
});
}
const circuit = this.circuitBreakers.get(circuitKey);
if (success) {
circuit.successCount++;
circuit.failureCount = Math.max(0, circuit.failureCount – 1);
if (circuit.state === 'half_open' && circuit.successCount >= 3) {
circuit.state = 'closed';
circuit.failureCount = 0;
circuit.successCount = 0;
}
} else {
circuit.failureCount++;
if (circuit.failureCount >= 5 && circuit.state === 'closed') {
circuit.state = 'open';
circuit.openedAt = Date.now();
}
}
}
// 配额监控
initQuotaMonitoring() {
if (this.config.quotaMonitoring) {
// 定期刷新配额信息
setInterval(() => this.refreshQuotaInfo(), 30000);
// 监控配额使用率
setInterval(() => this.monitorQuotaUsage(), 60000);
}
}
refreshQuotaInfo() {
// 清理过期的配额信息
const now = Date.now();
for (const [key, info] of this.rateLimitInfo.entries()) {
const resetTime = info.resetTime * 1000;
if (now > resetTime + 60000) { // 重置时间后1分钟
this.rateLimitInfo.delete(key);
}
}
}
monitorQuotaUsage() {
const warnings = [];
for (const [endpointKey, info] of this.rateLimitInfo.entries()) {
const usageRate = info.remaining / info.limit;
if (usageRate < 0.1) { // 剩余不足10%
warnings.push({
endpointKey,
remaining: info.remaining,
limit: info.limit,
resetTime: new Date(info.resetTime * 1000).toISOString()
});
}
}
if (warnings.length > 0) {
this.emitQuotaWarning(warnings);
}
}
recordQuotaHistory(endpointKey, quota) {
this.quotaHistory.push({
endpointKey,
…quota
});
// 保持历史记录大小
if (this.quotaHistory.length > 1000) {
this.quotaHistory = this.quotaHistory.slice(-500);
}
}
getQuotaHistory(endpointKey, limit = 50) {
return this.quotaHistory
.filter(q => q.endpointKey === endpointKey)
.slice(-limit);
}
// 工具方法
generateRequestId() {
return `req_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`;
}
getEndpointKey(endpoint, options = {}) {
const url = new URL(endpoint, window.location.origin);
const method = options.method || 'GET';
return `${method}:${url.pathname}`;
}
getCircuitKey(endpoint, options = {}) {
const endpointKey = this.getEndpointKey(endpoint, options);
return `circuit:${endpointKey}`;
}
sleep(ms) {
return new Promise(resolve => setTimeout(resolve, ms));
}
logRateLimitWait(endpoint, waitTime, reason) {
console.log(`Rate limit wait for ${endpoint}: ${waitTime}ms (${reason})`);
}
logRetry(endpoint, retryCount, delay, reason) {
console.log(`Retry ${retryCount + 1} for ${endpoint} in ${delay}ms (${reason})`);
}
emitQuotaWarning(warnings) {
// 在实际应用中,这里会触发事件或显示通知
console.warn('Quota warnings:', warnings);
}
getRateLimitInfo() {
return Array.from(this.rateLimitInfo.entries()).map(([key, info]) => ({
endpoint: key,
…info,
lastUpdated: new Date(info.lastUpdated).toISOString()
}));
}
}
class RateLimitError extends Error {
constructor(message, response) {
super(message);
this.name = 'RateLimitError';
this.response = response;
}
}
class MaxRetriesError extends Error {
constructor(message, originalError) {
super(message);
this.name = 'MaxRetriesError';
this.originalError = originalError;
}
}
// 使用示例
const client = new RateLimitAwareClient({
baseDelay: 1000,
maxRetries: 3,
adaptiveBackoff: true,
quotaMonitoring: true
});
// 发起请求
async function fetchData() {
try {
const response = await client.request('/api/data', {
headers: {
'Authorization': `Bearer ${token}`
}
});
return await response.json();
} catch (error) {
if (error.name === 'RateLimitError') {
// 显示用户友好的错误消息
showRateLimitError(error);
return null;
}
throw error;
}
}
// 显示速率限制错误的UI组件
function showRateLimitError(error) {
const container = document.getElementById('error-container');
let retryAfter = 60;
if (error.response) {
const retryAfterHeader = error.response.headers.get('Retry-After');
if (retryAfterHeader) {
retryAfter = parseInt(retryAfterHeader, 10);
}
}
const html = `
<div class="rate-limit-error">
<h3>⚠️ 请求过于频繁</h3>
<p>您发送了太多请求,请稍后再试。</p>
<div class="retry-info">
<p>建议在 <strong>${retryAfter}</strong> 秒后重试</p>
<div class="countdown" data-seconds="${retryAfter}">
倒计时: <span class="seconds">${retryAfter}</span> 秒
</div>
</div>
<div class="actions">
<button οnclick="retryNow()" class="retry-btn">
立即重试
</button>
<button οnclick="dismissError()" class="dismiss-btn">
取消
</button>
</div>
<div class="tips">
<h4>如何避免此问题:</h4>
<ul>
<li>避免频繁刷新页面</li>
<li>使用分页加载更多数据</li>
<li>合理使用缓存</li>
</ul>
</div>
</div>
`;
container.innerHTML = html;
// 启动倒计时
startCountdown(retryAfter);
}
22.4.2 配额使用可视化仪表板
javascript
// 速率限制配额可视化仪表板
import React, { useState, useEffect, useCallback } from 'react';
import {
LineChart, Line, BarChart, Bar, PieChart, Pie, Cell,
XAxis, YAxis, CartesianGrid, Tooltip, Legend, ResponsiveContainer,
AreaChart, Area, RadarChart, Radar, PolarGrid, PolarAngleAxis, PolarRadiusAxis
} from 'recharts';
import { AlertTriangle, Clock, Zap, Battery, RefreshCw, Shield } from 'lucide-react';
function RateLimitDashboard() {
const [timeRange, setTimeRange] = useState('1h');
const [quotaData, setQuotaData] = useState([]);
const [violationHistory, setViolationHistory] = useState([]);
const [endpointStats, setEndpointStats] = useState({});
const [realTimeUpdates, setRealTimeUpdates] = useState([]);
// 获取配额数据
useEffect(() => {
const fetchQuotaData = async () => {
try {
const response = await fetch(`/api/rate-limit/quotas?range=${timeRange}`);
const data = await response.json();
setQuotaData(data.quotas || []);
setViolationHistory(data.violations || []);
setEndpointStats(data.endpointStats || {});
} catch (error) {
console.error('Failed to fetch quota data:', error);
}
};
fetchQuotaData();
const interval = setInterval(fetchQuotaData, 30000); // 每30秒更新
return () => clearInterval(interval);
}, [timeRange]);
// WebSocket连接接收实时更新
useEffect(() => {
const ws = new WebSocket('wss://api.example.com/rate-limit/updates');
ws.onmessage = (event) => {
const update = JSON.parse(event.data);
setRealTimeUpdates(prev => [update, …prev.slice(0, 49)]);
};
return () => ws.close();
}, []);
// 处理重置配额
const handleResetQuota = useCallback(async (endpoint, rule) => {
try {
await fetch('/api/rate-limit/reset', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ endpoint, rule })
});
alert('配额已重置');
} catch (error) {
alert('重置失败: ' + error.message);
}
}, []);
// 处理调整限制
const handleAdjustLimit = useCallback(async (endpoint, rule, newLimit) => {
try {
await fetch('/api/rate-limit/adjust', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ endpoint, rule, newLimit })
});
alert('限制已调整');
} catch (error) {
alert('调整失败: ' + error.message);
}
}, []);
// 生成图表数据
const generateUsageChartData = () => {
if (!quotaData.length) return [];
// 按时间分组
const timeGroups = {};
quotaData.forEach(quota => {
const date = new Date(quota.timestamp);
const hour = `${date.getHours()}:00`;
if (!timeGroups[hour]) {
timeGroups[hour] = {
time: hour,
used: 0,
total: 0,
violations: 0
};
}
timeGroups[hour].used += quota.used || 0;
timeGroups[hour].total += quota.limit || 0;
if (quota.remaining === 0) {
timeGroups[hour].violations++;
}
});
return Object.values(timeGroups).sort((a, b) => a.time.localeCompare(b.time));
};
const generateEndpointDistributionData = () => {
const endpointData = [];
Object.entries(endpointStats).forEach(([endpoint, stats]) => {
endpointData.push({
name: endpoint.length > 20 ? endpoint.substring(0, 20) + '…' : endpoint,
violations: stats.violations || 0,
requests: stats.requests || 0,
usageRate: stats.usageRate || 0
});
});
return endpointData.sort((a, b) => b.violations – a.violations).slice(0, 10);
};
const generateRuleDistributionData = () => {
const ruleData = [];
const ruleMap = {};
quotaData.forEach(quota => {
const rule = quota.rule || 'default';
if (!ruleMap[rule]) {
ruleMap[rule] = {
name: rule,
violations: 0,
total: 0
};
}
ruleMap[rule].violations += quota.remaining === 0 ? 1 : 0;
ruleMap[rule].total++;
});
Object.values(ruleMap).forEach(rule => {
ruleData.push({
name: rule.name,
value: rule.violations,
percentage: rule.total > 0 ? (rule.violations / rule.total * 100).toFixed(1) : 0
});
});
return ruleData.sort((a, b) => b.value – a.value).slice(0, 8);
};
// 颜色配置
const COLORS = ['#0088FE', '#00C49F', '#FFBB28', '#FF8042', '#8884D8', '#82CA9D', '#FF6B6B', '#FFD166'];
return (
<div className="dashboard rate-limit-dashboard">
{/* 仪表板头部 */}
<div className="dashboard-header">
<div className="header-left">
<Shield size={32} className="header-icon" />
<h1>速率限制监控</h1>
<span className="time-range-label">
{timeRange === '1h' ? '1小时' :
timeRange === '24h' ? '24小时' :
timeRange === '7d' ? '7天' : '30天'}
</span>
</div>
<div className="header-right">
<div className="time-range-selector">
{['1h', '24h', '7d', '30d'].map(range => (
<button
key={range}
className={`time-range-btn ${timeRange === range ? 'active' : ''}`}
onClick={() => setTimeRange(range)}
>
{range}
</button>
))}
</div>
<button className="refresh-btn" onClick={() => window.location.reload()}>
<RefreshCw size={16} /> 刷新
</button>
</div>
</div>
{/* 关键指标卡片 */}
<div className="key-metrics-cards">
<div className="metric-card total-requests">
<div className="metric-icon">
<Zap size={24} />
</div>
<div className="metric-content">
<h3>总请求数</h3>
<p className="metric-value">
{endpointStats.totalRequests?.toLocaleString() || '0'}
</p>
<p className="metric-trend">
成功率: {endpointStats.successRate ? (endpointStats.successRate * 100).toFixed(1) + '%' : 'N/A'}
</p>
</div>
</div>
<div className="metric-card violations">
<div className="metric-icon">
<AlertTriangle size={24} />
</div>
<div className="metric-content">
<h3>违规次数</h3>
<p className="metric-value">
{violationHistory.length.toLocaleString()}
</p>
<p className="metric-trend">
违规率: {quotaData.length > 0 ?
((violationHistory.length / quotaData.length) * 100).toFixed(1) + '%' : '0%'}
</p>
</div>
</div>
<div className="metric-card quota-usage">
<div className="metric-icon">
<Battery size={24} />
</div>
<div className="metric-content">
<h3>配额使用率</h3>
<p className="metric-value">
{endpointStats.avgUsageRate ?
(endpointStats.avgUsageRate * 100).toFixed(1) + '%' : '0%'}
</p>
<p className="metric-trend">
峰值: {endpointStats.peakUsageRate ?
(endpointStats.peakUsageRate * 100).toFixed(1) + '%' : 'N/A'}
</p>
</div>
</div>
<div className="metric-card avg-wait-time">
<div className="metric-icon">
<Clock size={24} />
</div>
<div className="metric-content">
<h3>平均等待时间</h3>
<p className="metric-value">
{endpointStats.avgWaitTime ?
endpointStats.avgWaitTime.toFixed(1) + 's' : '0s'}
</p>
<p className="metric-trend">
最长: {endpointStats.maxWaitTime ?
endpointStats.maxWaitTime.toFixed(1) + 's' : 'N/A'}
</p>
</div>
</div>
</div>
{/* 图表区域 */}
<div className="charts-section">
<div className="chart-row">
<div className="chart-container large">
<h3>配额使用趋势</h3>
<ResponsiveContainer width="100%" height={300}>
<AreaChart data={generateUsageChartData()}>
<CartesianGrid strokeDasharray="3 3" />
<XAxis dataKey="time" />
<YAxis />
<Tooltip />
<Legend />
<Area
type="monotone"
dataKey="used"
stackId="1"
stroke="#8884d8"
fill="#8884d8"
fillOpacity={0.3}
name="已使用"
/>
<Area
type="monotone"
dataKey="total"
stackId="1"
stroke="#82ca9d"
fill="#82ca9d"
fillOpacity={0.3}
name="总配额"
/>
<Line
type="monotone"
dataKey="violations"
stroke="#ff6b6b"
strokeWidth={2}
dot={{ r: 4 }}
name="违规次数"
/>
</AreaChart>
</ResponsiveContainer>
</div>
<div className="chart-container">
<h3>规则违规分布</h3>
<ResponsiveContainer width="100%" height={300}>
<PieChart>
<Pie
data={generateRuleDistributionData()}
cx="50%"
cy="50%"
labelLine={false}
label={({ name, percentage }) => `${name}: ${percentage}%`}
outerRadius={80}
fill="#8884d8"
dataKey="value"
>
{generateRuleDistributionData().map((entry, index) => (
<Cell key={`cell-${index}`} fill={COLORS[index % COLORS.length]} />
))}
</Pie>
<Tooltip formatter={(value, name, props) => [
`${value}次 (${props.payload.percentage}%)`,
'违规次数'
]} />
</PieChart>
</ResponsiveContainer>
</div>
</div>
<div className="chart-row">
<div className="chart-container large">
<h3>端点违规排名</h3>
<ResponsiveContainer width="100%" height={300}>
<BarChart data={generateEndpointDistributionData()}>
<CartesianGrid strokeDasharray="3 3" />
<XAxis dataKey="name" angle={-45} textAnchor="end" height={80} />
<YAxis />
<Tooltip />
<Legend />
<Bar dataKey="violations" fill="#ff6b6b" name="违规次数" />
<Bar dataKey="requests" fill="#8884d8" name="总请求数" />
</BarChart>
</ResponsiveContainer>
</div>
<div className="chart-container">
<h3>配额使用率雷达图</h3>
<ResponsiveContainer width="100%" height={300}>
<RadarChart data={generateRadarData()}>
<PolarGrid />
<PolarAngleAxis dataKey="subject" />
<PolarRadiusAxis angle={30} domain={[0, 100]} />
<Radar
name="使用率"
dataKey="usage"
stroke="#8884d8"
fill="#8884d8"
fillOpacity={0.3}
/>
<Radar
name="目标"
dataKey="target"
stroke="#82ca9d"
fill="#82ca9d"
fillOpacity={0.3}
/>
<Legend />
<Tooltip />
</RadarChart>
</ResponsiveContainer>
</div>
</div>
</div>
{/* 实时违规事件 */}
<div className="realtime-events-section">
<h3>实时违规事件</h3>
<div className="events-table-container">
<table className="events-table">
<thead>
<tr>
<th>时间</th>
<th>端点</th>
<th>规则</th>
<th>客户端IP</th>
<th>剩余配额</th>
<th>重试等待</th>
<th>操作</th>
</tr>
</thead>
<tbody>
{realTimeUpdates.map((event, index) => (
<tr key={index} className="event-row">
<td>
{new Date(event.timestamp).toLocaleTimeString()}
</td>
<td className="endpoint-cell">
<code title={event.endpoint}>
{event.endpoint?.length > 30 ?
event.endpoint.substring(0, 30) + '…' :
event.endpoint}
</code>
</td>
<td>
<span className="rule-badge">
{event.rule || 'default'}
</span>
</td>
<td>
<code>{event.client_ip}</code>
</td>
<td>
<span className={`quota-badge ${event.remaining === 0 ? 'exhausted' : 'available'}`}>
{event.remaining}/{event.limit}
</span>
</td>
<td>
<span className="wait-time">
{event.retry_after || 0}s
</span>
</td>
<td>
<button
className="action-btn view-details"
onClick={() => viewEventDetails(event)}
>
详情
</button>
<button
className="action-btn reset-quota"
onClick={() => handleResetQuota(event.endpoint, event.rule)}
>
重置配额
</button>
</td>
</tr>
))}
</tbody>
</table>
</div>
</div>
{/* 配额管理 */}
<div className="quota-management-section">
<h3>配额管理</h3>
<div className="quota-list">
{quotaData.slice(0, 10).map((quota, index) => (
<div key={index} className="quota-item">
<div className="quota-header">
<span className="quota-endpoint">{quota.endpoint}</span>
<span className="quota-rule">{quota.rule}</span>
</div>
<div className="quota-progress">
<div className="progress-bar">
<div
className="progress-fill"
style={{ width: `${(quota.used / quota.limit) * 100}%` }}
></div>
</div>
<div className="quota-numbers">
<span>{quota.used} / {quota.limit}</span>
<span>{((quota.used / quota.limit) * 100).toFixed(1)}%</span>
</div>
</div>
<div className="quota-actions">
<button
className="btn btn-sm btn-primary"
onClick={() => handleAdjustLimit(quota.endpoint, quota.rule, quota.limit * 1.1)}
>
增加10%
</button>
<button
className="btn btn-sm btn-secondary"
onClick={() => handleAdjustLimit(quota.endpoint, quota.rule, quota.limit * 0.9)}
>
减少10%
</button>
<button
className="btn btn-sm btn-tertiary"
onClick={() => handleResetQuota(quota.endpoint, quota.rule)}
>
立即重置
</button>
</div>
</div>
))}
</div>
</div>
{/* 配置建议 */}
<div className="recommendations-section">
<h3>配置建议</h3>
<div className="recommendations-list">
{generateRecommendations().map((rec, index) => (
<div key={index} className="recommendation-card">
<div className="recommendation-icon">💡</div>
<div className="recommendation-content">
<h4>{rec.title}</h4>
<p>{rec.description}</p>
<div className="recommendation-actions">
<button
className="btn btn-sm btn-primary"
onClick={() => applyRecommendation(rec)}
>
应用建议
</button>
<button
className="btn btn-sm btn-secondary"
onClick={() => dismissRecommendation(rec.id)}
>
忽略
</button>
</div>
</div>
</div>
))}
</div>
</div>
</div>
);
// 辅助函数
function generateRadarData() {
return [
{ subject: '认证端点', usage: 85, target: 70 },
{ subject: '数据查询', usage: 60, target: 80 },
{ subject: '文件上传', usage: 95, target: 50 },
{ subject: '管理接口', usage: 40, target: 30 },
{ subject: '公共API', usage: 75, target: 90 },
{ subject: 'Webhook', usage: 20, target: 40 }
];
}
function generateRecommendations() {
const recommendations = [];
// 基于数据分析生成建议
const highViolationEndpoints = generateEndpointDistributionData()
.filter(e => e.violations > 10);
if (highViolationEndpoints.length > 0) {
highViolationEndpoints.forEach(endpoint => {
recommendations.push({
id: `increase-${endpoint.name}`,
title: `增加 ${endpoint.name} 的配额限制`,
description: `该端点有 ${endpoint.violations} 次违规,建议增加配额限制以提高可用性。`,
type: 'increase_limit',
endpoint: endpoint.name,
currentLimit: endpoint.requests,
suggestedLimit: Math.ceil(endpoint.requests * 1.5)
});
});
}
// 检查使用率低的端点
const lowUsageEndpoints = generateEndpointDistributionData()
.filter(e => e.usageRate < 0.3 && e.requests > 100);
if (lowUsageEndpoints.length > 0) {
lowUsageEndpoints.forEach(endpoint => {
recommendations.push({
id: `decrease-${endpoint.name}`,
title: `优化 ${endpoint.name} 的配额配置`,
description: `该端点使用率较低(${(endpoint.usageRate * 100).toFixed(1)}%),可以考虑减少配额以释放资源。`,
type: 'decrease_limit',
endpoint: endpoint.name,
currentLimit: endpoint.requests,
suggestedLimit: Math.ceil(endpoint.requests * 0.7)
});
});
}
// 通用建议
recommendations.push({
id: 'implement-adaptive',
title: '实施自适应速率限制',
description: '根据历史使用模式动态调整限制,提高系统效率和用户体验。',
type: 'feature',
priority: 'high'
});
recommendations.push({
id: 'add-client-education',
title: '添加客户端配额教育',
description: '在API响应中添加配额使用信息,帮助客户端更好地管理请求。',
type: 'improvement',
priority: 'medium'
});
return recommendations.slice(0, 5);
}
function viewEventDetails(event) {
console.log('Event details:', event);
alert(`事件详情:\\n${JSON.stringify(event, null, 2)}`);
}
function applyRecommendation(recommendation) {
console.log('Applying recommendation:', recommendation);
if (recommendation.type === 'increase_limit' || recommendation.type === 'decrease_limit') {
handleAdjustLimit(
recommendation.endpoint,
'custom_rule',
recommendation.suggestedLimit
);
}
alert(`已应用建议: ${recommendation.title}`);
}
function dismissRecommendation(id) {
console.log('Dismissing recommendation:', id);
// 在实际应用中,这里会标记建议为已忽略
}
}
第23章:其他4xx状态码详解(402、406-418、421-428、431、451)
引言
HTTP状态码是Web通信的基础语言,其中4xx系列专门指示客户端错误。虽然404、403和400广为人知,但HTTP协议还定义了许多专门化的4xx状态码,为特定错误场景提供精确语义。这些"边缘案例"状态码在现代API设计、微服务架构和RESTful服务中扮演着越来越重要的角色。本章将深入探讨23个不常用但重要的4xx状态码,涵盖从支付系统到法律合规的广泛场景。
23.1 402 Payment Required(需要付款)
23.1.1 定义与语义
402状态码在HTTP/1.1规范中定义为"保留用于未来使用",但实际上已被广泛接受为表示需要付款才能访问资源的状态。虽然从未被正式标准化,但它已成为支付网关、订阅服务和微交易API的事实标准。
23.1.2 使用场景
订阅内容访问:用户尝试访问需要订阅的内容
API调用计费:超过免费额度的API请求
数字商品购买:尝试下载未购买的电子书、软件等
服务额度耗尽:云服务、SaaS平台的用量超限
23.1.3 实现示例
http
HTTP/1.1 402 Payment Required
Content-Type: application/json
X-Payment-URL: https://api.example.com/payment/order_123
Retry-After: 3600
{
"error": "payment_required",
"message": "You have exceeded your free API call limit",
"upgrade_url": "https://api.example.com/plans",
"current_usage": {
"used": 1000,
"limit": 100
}
}
23.1.4 最佳实践
-
始终提供清晰的错误信息和解决指引
-
包含支付URL或升级路径
-
对于API限流,使用429状态码可能更合适
-
考虑支持多种支付协议(如Web Monetization API)
23.2 406 Not Acceptable(不可接受)
23.2.1 定义与语义
406状态码表示服务器无法生成客户端Accept头指定的响应内容。当服务器不支持客户端请求的媒体类型时返回此状态码。
23.2.2 内容协商机制
HTTP内容协商涉及多个请求头:
-
Accept:客户端接受的媒体类型
-
Accept-Charset:字符集偏好
-
Accept-Encoding:内容编码偏好
-
Accept-Language:语言偏好
23.2.3 实现模式
python
# Flask示例:处理内容协商
@app.route('/resource')
def get_resource():
accepted_types = request.accept_mimetypes
if 'application/json' in accepted_types:
return jsonify(data)
elif 'application/xml' in accepted_types:
return xml_response(data)
else:
# 返回406并提供支持的格式
response = jsonify({
'error': 'Unsupported media type requested',
'supported_types': [
'application/json',
'application/xml',
'text/html'
]
})
response.status_code = 406
response.headers['Content-Type'] = 'application/json'
return response
23.2.4 变体选择头
HTTP/1.1引入了Vary头,指示缓存服务器根据哪些请求头生成不同响应:
text
Vary: Accept, Accept-Language, User-Agent
23.2.5 最佳实践
-
实现优雅降级(如JSON作为默认格式)
-
在406响应中列出支持的媒体类型
-
正确设置Vary头以支持缓存
-
考虑使用格式参数作为后备方案(如?format=json)
23.3 407 Proxy Authentication Required(需要代理认证)
23.3.1 定义与语义
407状态码表示客户端必须首先通过代理服务器的身份验证。与401类似,但专门用于代理认证。
23.3.2 代理认证方案
Basic:基本认证(Base64编码)
Digest:摘要认证(更安全)
Bearer:令牌认证
NTLM:Windows认证协议
23.3.3 工作流程
text
客户端 → 代理: GET /resource HTTP/1.1
代理 → 客户端: HTTP/1.1 407 Proxy Authentication Required
Proxy-Authenticate: Basic realm="CorpProxy"
客户端 → 代理: GET /resource HTTP/1.1
Proxy-Authorization: Basic dXNlcjpwYXNz
代理 → 后端: 转发请求
后端 → 客户端: 通过代理返回响应
23.3.4 安全考虑
-
始终使用HTTPS传输代理凭证
-
考虑使用摘要认证避免明文传输
-
实现凭证轮换机制
-
监控代理认证失败尝试
23.3.5 实现示例
nginx
# Nginx代理认证配置
location / {
auth_basic "Proxy Authentication Required";
auth_basic_user_file /etc/nginx/.htpasswd;
proxy_pass http://backend;
# 转发认证头到后端(如果需要)
proxy_set_header Authorization $http_authorization;
}
23.4 408 Request Timeout(请求超时)
23.4.1 定义与语义
408状态码表示服务器在等待请求时超时。服务器愿意继续等待,但客户端没有在服务器准备等待的时间内完成请求发送。
23.4.2 超时原因分析
网络延迟:客户端与服务器间的高延迟
请求体过大:上传大文件时传输缓慢
客户端故障:客户端崩溃或连接中断
网络中断:中间网络设备问题
23.4.3 服务器配置
nginx
# Nginx超时配置
http {
# 客户端读取超时(两个连续操作间隔)
client_header_timeout 60s;
# 客户端发送请求体超时
client_body_timeout 60s;
# 保持连接超时
keepalive_timeout 75s;
# 发送响应到客户端超时
send_timeout 60s;
}
23.4.4 重试策略
javascript
// 指数退避重试策略
async function fetchWithRetry(url, options, maxRetries = 3) {
let lastError;
for (let i = 0; i < maxRetries; i++) {
try {
const response = await fetch(url, options);
return response;
} catch (error) {
lastError = error;
// 检查是否为超时错误
if (error.name === 'TimeoutError' || error.status === 408) {
// 指数退避:1s, 2s, 4s…
const delay = Math.pow(2, i) * 1000;
await new Promise(resolve => setTimeout(resolve, delay));
continue;
}
// 非超时错误,立即抛出
throw error;
}
}
throw lastError;
}
23.4.5 最佳实践
-
实现幂等请求以安全重试
-
客户端使用指数退避算法
-
服务器设置合理的超时阈值
-
监控超时率以识别性能问题
23.5 409 Conflict(冲突)
23.5.1 定义与语义
409状态码表示请求与服务器当前状态冲突。通常用于并发修改场景,如版本冲突或资源锁定。
23.5.2 使用场景
版本冲突:基于版本号的乐观锁
唯一约束违反:数据库唯一键冲突
业务规则冲突:违反领域特定规则
资源状态冲突:尝试修改已删除的资源
23.5.3 乐观锁实现
java
// Spring Boot示例:使用ETag实现乐观锁
@PutMapping("/products/{id}")
public ResponseEntity<?> updateProduct(
@PathVariable Long id,
@RequestBody Product product,
@RequestHeader("If-Match") String ifMatch) {
Product existing = productRepository.findById(id)
.orElseThrow(() -> new ResourceNotFoundException());
// 检查ETag是否匹配
String currentETag = generateETag(existing);
if (!currentETag.equals(ifMatch)) {
return ResponseEntity.status(409)
.header("ETag", currentETag)
.body(Map.of(
"error", "conflict",
"message", "Resource has been modified by another user",
"current_version", currentETag
));
}
// 更新资源并生成新ETag
Product updated = productRepository.save(product);
String newETag = generateETag(updated);
return ResponseEntity.ok()
.header("ETag", newETag)
.body(updated);
}
23.5.4 冲突解决策略
最后写入获胜:简单的覆盖策略
手动合并:提示用户解决冲突
自动合并:基于规则的自动合并
操作转换:实时协作系统使用
23.5.5 响应格式建议
json
{
"error": "conflict",
"message": "Resource update conflict",
"conflict_details": {
"field": "email",
"reason": "already_exists",
"existing_value": "user@example.com"
},
"resolution_options": [
{
"action": "overwrite",
"method": "PUT",
"url": "/resource/123?force=true"
},
{
"action": "merge",
"method": "PATCH",
"url": "/resource/123"
}
]
}
23.6 410 Gone(已删除)
23.6.1 定义与语义
410状态码表示请求的资源在服务器上已永久删除,且没有转发地址。与404不同,410明确表示资源曾经存在但被有意删除。
23.6.2 与404的区别
| 资源是否存在过 | 未知 | 曾经存在 |
| 是否永久性 | 可能是临时的 | 永久删除 |
| 缓存建议 | 可短期缓存 | 应长期缓存 |
| 客户端响应 | 可重试 | 应删除引用 |
23.6.3 使用场景
用户删除内容:社交媒体帖子、评论
内容下架:已停止的产品页面
URL重构:旧URL不再可用且无重定向
合规要求:法律要求删除的内容
23.6.4 SEO考虑
html
<!– 对于SEO友好的410页面 –>
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="robots" content="noindex, follow">
<title>Content Removed</title>
</head>
<body>
<h1>This content has been permanently removed</h1>
<p>The resource you requested was intentionally removed and is no longer available.</p>
<!– 提供相关替代内容 –>
<div class="suggestions">
<h2>You might be interested in:</h2>
<ul>
<li><a href="/new-location">Updated version of this content</a></li>
<li><a href="/related-topic">Related information</a></li>
</ul>
</div>
<!– 结构化数据帮助搜索引擎理解 –>
<script type="application/ld+json">
{
"@context": "https://schema.org",
"@type": "WebPage",
"name": "Content Removed",
"description": "This page indicates that previously available content has been permanently removed.",
"mainEntity": {
"@type": "Message",
"text": "The requested resource was intentionally removed and is no longer available."
}
}
</script>
</body>
</html>
23.6.5 实现模式
python
# Django视图示例
from django.http import HttpResponseGone
from django.views.generic import DetailView
class DeletedResourceView(DetailView):
def get(self, request, *args, **kwargs):
# 检查资源是否被标记为删除
resource = self.get_object()
if resource.deleted_at:
response = HttpResponseGone()
response['Link'] = '</related-resource>; rel="related"'
response['X-Deleted-At'] = resource.deleted_at.isoformat()
response['X-Deleted-Reason'] = resource.deletion_reason
return response
return super().get(request, *args, **kwargs)
23.6.6 最佳实践
-
为已删除资源保留元数据一段时间
-
在响应中包含删除原因和时间戳
-
提供相关资源的链接
-
更新内部链接和站点地图
23.7 411 Length Required(需要内容长度)
23.7.1 定义与语义
411状态码表示服务器拒绝处理没有Content-Length头的请求。某些服务器要求提前知道请求体大小以进行安全或效率优化。
23.7.2 使用场景
文件上传:需要知道上传文件大小
API限流:基于内容长度的配额检查
安全扫描:提前检测过大请求
资源预分配:提前分配存储空间
23.7.3 分块传输编码
当无法提前知道内容大小时,可使用分块传输编码:
http
POST /upload HTTP/1.1
Host: example.com
Transfer-Encoding: chunked
7\\r\\n
Mozilla\\r\\n
9\\r\\n
Developer\\r\\n
7\\r\\n
Network\\r\\n
0\\r\\n
\\r\\n
23.7.4 客户端处理
javascript
// 现代JavaScript中自动处理Content-Length
async function uploadFile(file) {
const formData = new FormData();
formData.append('file', file);
// Fetch API会自动计算和设置Content-Length
const response = await fetch('/upload', {
method: 'POST',
body: formData,
// 注意:FormData使用multipart/form-data时
// 浏览器会自动处理,不需要手动设置
});
return response.json();
}
// 手动设置Content-Length的示例
async function sendJsonData(data) {
const jsonString = JSON.stringify(data);
const response = await fetch('/api/data', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Content-Length': Buffer.byteLength(jsonString, 'utf8').toString()
},
body: jsonString
});
return response.json();
}
23.7.5 服务器配置
nginx
# Nginx配置:限制请求体大小并需要Content-Length
http {
# 启用请求体大小检查
client_max_body_size 10m;
# 对于某些端点要求Content-Length
location /api/upload {
# 如果请求体超过1M,要求Content-Length
if ($request_body_length > 1m) {
# 检查是否有Content-Length头
if ($http_content_length = "") {
return 411;
}
}
# 正常处理
proxy_pass http://backend;
}
}
23.7.6 最佳实践
-
对于小请求体,服务器可更灵活
-
大文件上传应要求Content-Length
-
支持分块传输作为备选方案
-
提供清晰的错误信息说明要求
23.8 412 Precondition Failed(前提条件失败)
23.8.1 定义与语义
412状态码表示请求中提供的条件头(如If-Match、If-None-Match、If-Modified-Since等)评估失败。
23.8.2 条件请求头
If-Match:要求ETag匹配
If-None-Match:要求ETag不匹配
If-Modified-Since:要求资源在指定时间后被修改
If-Unmodified-Since:要求资源在指定时间后未修改
If-Range:用于断点续传
23.8.3 乐观锁实现
python
# FastAPI示例:使用ETag实现乐观锁
from fastapi import FastAPI, Header, HTTPException
import hashlib
import json
app = FastAPI()
def generate_etag(data: dict) -> str:
"""生成资源的ETag"""
data_str = json.dumps(data, sort_keys=True)
return hashlib.md5(data_str.encode()).hexdigest()
resources = {
"1": {"id": 1, "name": "Resource 1", "version": 1}
}
@app.put("/resources/{resource_id}")
async def update_resource(
resource_id: str,
updates: dict,
if_match: str = Header(None, alias="If-Match")
):
if resource_id not in resources:
raise HTTPException(status_code=404)
current = resources[resource_id]
current_etag = generate_etag(current)
# 检查ETag是否匹配
if if_match and if_match != current_etag:
raise HTTPException(
status_code=412,
detail={
"error": "precondition_failed",
"message": "Resource has been modified",
"current_etag": current_etag
}
)
# 更新资源
current.update(updates)
current["version"] += 1
new_etag = generate_etag(current)
return {
"data": current,
"etag": new_etag
}
23.8.4 缓存验证场景
http
# 缓存验证请求
GET /resource HTTP/1.1
Host: api.example.com
If-None-Match: "abc123"
# 响应(如果未修改)
HTTP/1.1 304 Not Modified
ETag: "abc123"
Cache-Control: max-age=3600
# 响应(如果已修改)
HTTP/1.1 200 OK
ETag: "def456"
Cache-Control: max-age=3600
Content-Type: application/json
{"data": "updated"}
23.8.5 最佳实践
-
为可变资源实现ETag支持
-
在响应中始终返回ETag头
-
支持弱ETag(以W/开头)用于启发式比较
-
提供清晰的错误信息说明前提条件
23.9 413 Payload Too Large(请求体过大)
23.9.1 定义与语义
413状态码表示请求体超过服务器能够处理的大小限制。服务器可以关闭连接或返回Retry-After头指示何时重试。
23.9.2 配置示例
nginx
# Nginx请求体大小限制
http {
# 全局请求体大小限制
client_max_body_size 10m;
# 特定位置更大限制
location /api/upload {
client_max_body_size 100m;
# 大文件上传超时设置
client_body_timeout 300s;
client_header_timeout 300s;
# 缓冲区设置
client_body_buffer_size 128k;
client_body_temp_path /tmp/nginx_upload;
}
# 特定位置更小限制
location /api/json {
client_max_body_size 1m;
}
}
23.9.3 错误处理
http
HTTP/1.1 413 Payload Too Large
Content-Type: application/problem+json
Retry-After: 3600
{
"type": "https://example.com/errors/payload-too-large",
"title": "Request payload too large",
"status": 413,
"detail": "Maximum request body size is 10MB",
"instance": "/api/upload",
"max_size": 10485760,
"current_size": 15728640,
"suggestions": [
"Compress your data before uploading",
"Split the data into smaller chunks",
"Use our resumable upload API at /api/resumable-upload"
]
}
23.9.4 大文件上传策略
分块上传:将大文件分成小块
断点续传:支持从中断处继续
流式上传:边上传边处理
外部存储:上传到S3等对象存储
23.9.5 实现分块上传
javascript
// 客户端分块上传实现
class ChunkedUploader {
constructor(file, chunkSize = 5 * 1024 * 1024) { // 5MB
this.file = file;
this.chunkSize = chunkSize;
this.totalChunks = Math.ceil(file.size / chunkSize);
this.uploadedChunks = new Set();
this.uploadId = null;
}
async startUpload() {
// 初始化上传会话
const response = await fetch('/api/upload/init', {
method: 'POST',
headers: {'Content-Type': 'application/json'},
body: JSON.stringify({
filename: this.file.name,
file_size: this.file.size,
chunk_size: this.chunkSize,
total_chunks: this.totalChunks
})
});
const { upload_id } = await response.json();
this.uploadId = upload_id;
// 上传所有分块
for (let chunkIndex = 0; chunkIndex < this.totalChunks; chunkIndex++) {
await this.uploadChunk(chunkIndex);
}
// 完成上传
await this.completeUpload();
}
async uploadChunk(chunkIndex) {
if (this.uploadedChunks.has(chunkIndex)) {
return; // 已上传
}
const start = chunkIndex * this.chunkSize;
const end = Math.min(start + this.chunkSize, this.file.size);
const chunk = this.file.slice(start, end);
const formData = new FormData();
formData.append('chunk', chunk);
formData.append('chunk_index', chunkIndex);
formData.append('upload_id', this.uploadId);
await fetch('/api/upload/chunk', {
method: 'POST',
body: formData
});
this.uploadedChunks.add(chunkIndex);
}
async completeUpload() {
await fetch('/api/upload/complete', {
method: 'POST',
headers: {'Content-Type': 'application/json'},
body: JSON.stringify({
upload_id: this.uploadId,
uploaded_chunks: Array.from(this.uploadedChunks)
})
});
}
}
23.9.6 最佳实践
-
提供清晰的错误信息,包括最大允许大小
-
实现分块上传支持大文件
-
考虑使用CDN或对象存储处理大文件
-
监控大文件上传频率和大小分布
23.10 414 URI Too Long(URI过长)
23.10.1 定义与语义
414状态码表示客户端请求的URI长度超过服务器能够处理的限制。虽然HTTP规范没有指定最大URI长度,但服务器实现通常有限制。
23.10.2 各服务器限制
| Nginx | 取决于系统,通常8KB | large_client_header_buffers |
| Apache | 8KB | LimitRequestLine |
| IIS | 16KB | maxUrl in web.config |
| Node.js | 取决于实现 | 可配置 |
| 浏览器 | 约2000字符 | 无标准 |
23.10.3 常见原因
过度使用查询参数:GET请求携带大量参数
深度嵌套路径:RESTful API的深度资源引用
Base64编码数据:在URL中嵌入数据
错误的链接生成:无限循环生成参数
23.10.4 解决方案
javascript
// 将长GET请求转换为POST
function convertLongGetToPost(url) {
const urlObj = new URL(url);
// 如果URL超过安全长度,转换为POST
if (url.length > 2000) {
const params = {};
urlObj.searchParams.forEach((value, key) => {
params[key] = value;
});
return {
method: 'POST',
url: urlObj.pathname,
body: JSON.stringify(params),
headers: {
'Content-Type': 'application/json',
'X-Original-Method': 'GET'
}
};
}
return { method: 'GET', url };
}
// 服务器端处理
app.post('/api/data', (req, res) => {
// 检查是否为GET转换的POST请求
if (req.get('X-Original-Method') === 'GET') {
// 按照GET语义处理,但使用POST主体
const params = req.body;
// 处理逻辑…
}
});
23.10.5 最佳实践
-
对于复杂查询,使用POST替代GET
-
避免在URL中嵌入大块数据
-
使用分页和过滤减少数据量
-
监控URI长度异常
23.11 415 Unsupported Media Type(不支持的媒体类型)
23.11.1 定义与语义
415状态码表示服务器不支持请求的媒体类型。通常发生在请求的Content-Type不被服务器接受时。
23.11.2 常见媒体类型
| application/json | JSON数据 | REST API |
| application/xml | XML数据 | SOAP API |
| multipart/form-data | 表单数据 | 文件上传 |
| application/x-www-form-urlencoded | URL编码表单 | 简单表单 |
| text/plain | 纯文本 | 简单数据 |
23.11.3 实现示例
python
# FastAPI媒体类型验证
from fastapi import FastAPI, UploadFile, File, HTTPException
from fastapi.responses import JSONResponse
app = FastAPI()
# 允许的媒体类型
ALLOWED_MEDIA_TYPES = {
'application/json',
'application/xml',
'text/plain',
'multipart/form-data'
}
@app.middleware("http")
async def check_media_type(request, call_next):
# 检查请求方法是否需要body
if request.method in ['POST', 'PUT', 'PATCH']:
content_type = request.headers.get('content-type', '')
# 提取主媒体类型
main_type = content_type.split(';')[0].strip() if content_type else ''
# 如果提供了Content-Type但不被支持
if content_type and main_type not in ALLOWED_MEDIA_TYPES:
return JSONResponse(
status_code=415,
content={
"error": "unsupported_media_type",
"message": f"Media type '{main_type}' is not supported",
"supported_types": list(ALLOWED_MEDIA_TYPES),
"received_type": content_type
}
)
response = await call_next(request)
return response
@app.post("/api/data")
async def create_data(data: dict):
return {"id": 123, **data}
23.11.4 内容类型协商
http
# 客户端请求
POST /api/resource HTTP/1.1
Host: api.example.com
Content-Type: application/json
Accept: application/json, application/xml;q=0.9, text/plain;q=0.5
{"name": "test"}
# 服务器响应
HTTP/1.1 200 OK
Content-Type: application/json
Vary: Accept
{"id": 1, "name": "test"}
23.11.5 最佳实践
-
在415响应中列出支持的媒体类型
-
提供默认媒体类型作为后备
-
实现严格模式用于生产,宽松模式用于开发
-
记录API期望的媒体类型
23.12 416 Range Not Satisfiable(范围请求无法满足)
23.12.1 定义与语义
416状态码表示服务器无法提供请求的Range头指定的字节范围。通常发生在请求的范围超出资源大小时。
23.12.2 范围请求机制
http
# 请求特定范围
GET /large-file.pdf HTTP/1.1
Host: example.com
Range: bytes=0-999
# 成功响应
HTTP/1.1 206 Partial Content
Content-Type: application/pdf
Content-Range: bytes 0-999/5000
Content-Length: 1000
[二进制数据]
# 无效范围请求
GET /large-file.pdf HTTP/1.1
Host: example.com
Range: bytes=5000-6000
# 416响应
HTTP/1.1 416 Range Not Satisfiable
Content-Range: bytes */5000
23.12.3 断点续传实现
python
# Flask断点续传实现
from flask import Flask, request, send_file, Response
import os
app = Flask(__name__)
@app.route('/download/<filename>')
def download_file(filename):
filepath = os.path.join('uploads', filename)
if not os.path.exists(filepath):
return {'error': 'file_not_found'}, 404
file_size = os.path.getsize(filepath)
# 检查Range头
range_header = request.headers.get('Range')
if range_header:
# 解析范围请求
try:
byte1, byte2 = 0, None
# 格式:bytes=start-end
range_ = range_header.replace('bytes=', '').split('-')
byte1 = int(range_[0]) if range_[0] else 0
if range_[1]:
byte2 = int(range_[1])
if byte2 is None:
byte2 = file_size – 1
elif byte2 >= file_size:
# 范围超出文件大小
return Response(
status=416,
headers={
'Content-Range': f'bytes */{file_size}',
'Accept-Ranges': 'bytes'
}
)
length = byte2 – byte1 + 1
# 发送部分内容
def generate():
with open(filepath, 'rb') as f:
f.seek(byte1)
remaining = length
chunk_size = 8192
while remaining > 0:
chunk = f.read(min(chunk_size, remaining))
if not chunk:
break
remaining -= len(chunk)
yield chunk
response = Response(generate(), 206)
response.headers.add('Content-Range', f'bytes {byte1}-{byte2}/{file_size}')
response.headers.add('Accept-Ranges', 'bytes')
response.headers.add('Content-Length', str(length))
response.headers.add('Content-Type', 'application/octet-stream')
return response
except ValueError:
# 无效的Range头格式
pass
# 完整文件下载
return send_file(filepath, as_attachment=True)
23.12.4 客户端实现
javascript
// 支持断点续传的下载器
class ResumableDownloader {
constructor(url, filename) {
this.url = url;
this.filename = filename;
this.downloadedBytes = 0;
this.totalBytes = 0;
this.isPaused = false;
}
async start() {
try {
// 尝试获取文件信息
const headResponse = await fetch(this.url, { method: 'HEAD' });
if (!headResponse.ok) {
throw new Error(`Failed to get file info: ${headResponse.status}`);
}
this.totalBytes = parseInt(headResponse.headers.get('Content-Length')) || 0;
const acceptRanges = headResponse.headers.get('Accept-Ranges') === 'bytes';
// 检查本地已下载部分
const downloaded = await this.getDownloadedSize();
this.downloadedBytes = downloaded;
if (acceptRanges && downloaded > 0 && downloaded < this.totalBytes) {
// 断点续传
await this.resumeDownload();
} else {
// 全新下载
await this.startNewDownload();
}
} catch (error) {
console.error('Download failed:', error);
throw error;
}
}
async resumeDownload() {
const headers = {
'Range': `bytes=${this.downloadedBytes}-`
};
const response = await fetch(this.url, { headers });
if (response.status === 416) {
// 范围无效,重新开始
this.downloadedBytes = 0;
await this.startNewDownload();
return;
}
if (response.status !== 206) {
throw new Error(`Unexpected status: ${response.status}`);
}
await this.processResponse(response);
}
async startNewDownload() {
const response = await fetch(this.url);
if (!response.ok) {
throw new Error(`Download failed: ${response.status}`);
}
this.downloadedBytes = 0;
await this.processResponse(response);
}
async processResponse(response) {
const reader = response.body.getReader();
const fileStream = await this.getFileStream();
while (true) {
if (this.isPaused) {
await new Promise(resolve => {
this.resumeCallback = resolve;
});
}
const { done, value } = await reader.read();
if (done) {
fileStream.close();
console.log('Download completed');
return;
}
await fileStream.write(value);
this.downloadedBytes += value.length;
// 更新进度
const progress = this.totalBytes > 0
? (this.downloadedBytes / this.totalBytes * 100).toFixed(2)
: 'unknown';
console.log(`Progress: ${progress}%`);
}
}
pause() {
this.isPaused = true;
}
resume() {
this.isPaused = false;
if (this.resumeCallback) {
this.resumeCallback();
this.resumeCallback = null;
}
}
async getDownloadedSize() {
// 从本地存储获取已下载大小
// 实现取决于存储方式(IndexedDB、文件系统API等)
return 0;
}
async getFileStream() {
// 获取文件写入流
// 实现取决于存储方式
return {
write: async () => {},
close: () => {}
};
}
}
23.12.5 最佳实践
-
始终在响应中包含Accept-Ranges头
-
对于416响应,包含Content-Range头显示有效范围
-
实现完整的分块下载支持
-
考虑支持多范围请求(multipart/byteranges)
23.13 417 Expectation Failed(期望失败)
23.13.1 定义与语义
417状态码表示服务器无法满足请求的Expect头中的期望。最常见的场景是Expect: 100-continue。
23.13.2 100 Continue机制
http
# 客户端请求(使用100-continue)
POST /upload HTTP/1.1
Host: example.com
Content-Type: application/json
Content-Length: 1024
Expect: 100-continue
# 服务器响应
# 情况1:接受请求
HTTP/1.1 100 Continue
# 客户端发送请求体
{"data": "…"}
# 服务器最终响应
HTTP/1.1 201 Created
# 情况2:拒绝请求
HTTP/1.1 417 Expectation Failed
Content-Type: application/json
{
"error": "expectation_failed",
"message": "Server cannot accept the request",
"reason": "File size limit exceeded"
}
23.13.3 使用场景
大文件上传验证:检查权限和配额后再接收数据
实时处理验证:确保服务器能处理请求
条件请求:基于特定条件接受请求
资源预留:检查资源可用性
23.13.4 实现示例
python
# FastAPI Expect头处理
from fastapi import FastAPI, Request, HTTPException
from fastapi.responses import JSONResponse
app = FastAPI()
@app.middleware("http")
async def handle_expect_header(request: Request, call_next):
expect_header = request.headers.get("expect", "").lower()
if expect_header == "100-continue":
# 执行预检查
content_length = request.headers.get("content-length")
if content_length and int(content_length) > 10 * 1024 * 1024: # 10MB
# 请求体太大,拒绝
return JSONResponse(
status_code=417,
content={
"error": "expectation_failed",
"message": "File size exceeds limit",
"max_size": 10 * 1024 * 1024,
"requested_size": int(content_length)
}
)
# 检查认证
auth_header = request.headers.get("authorization")
if not auth_header or not validate_token(auth_header):
return JSONResponse(
status_code=417,
content={
"error": "expectation_failed",
"message": "Authentication required"
}
)
# 发送100 Continue响应
from starlette.responses import Response
response = Response(status_code=100)
await response(scope=request.scope, receive=request.receive, send=request.send)
# 继续正常处理
response = await call_next(request)
return response
def validate_token(token: str) -> bool:
# 实现令牌验证
return token.startswith("Bearer ")
23.13.5 客户端实现
javascript
// 使用Expect: 100-continue的客户端
async function uploadWithContinue(file, url) {
return new Promise((resolve, reject) => {
const xhr = new XMLHttpRequest();
xhr.open('POST', url, true);
// 设置Expect头
xhr.setRequestHeader('Expect', '100-continue');
xhr.setRequestHeader('Content-Type', 'application/octet-stream');
// 监听100-continue响应
xhr.onreadystatechange = function() {
if (xhr.readyState === XMLHttpRequest.HEADERS_RECEIVED) {
// 检查状态码
if (xhr.status === 100) {
console.log('Server ready to receive data');
} else if (xhr.status === 417) {
reject(new Error('Server rejected the request'));
}
}
};
xhr.onload = function() {
if (xhr.status >= 200 && xhr.status < 300) {
resolve(xhr.response);
} else {
reject(new Error(`Upload failed: ${xhr.status}`));
}
};
xhr.onerror = function() {
reject(new Error('Network error'));
};
xhr.send(file);
});
}
// 使用Fetch API(自动处理100-continue)
async function uploadWithFetch(file, url) {
const response = await fetch(url, {
method: 'POST',
headers: {
'Content-Type': 'application/octet-stream',
// Fetch API会自动处理Expect头
},
body: file
});
if (!response.ok) {
throw new Error(`Upload failed: ${response.status}`);
}
return response.json();
}
23.13.6 最佳实践
-
对于大文件上传实现100-continue支持
-
提供清晰的拒绝原因
-
考虑超时处理(客户端等待100响应的超时)
-
监控417错误频率以识别客户端问题
23.14 418 I'm a Teapot(我是茶壶)
23.14.1 起源与定义
418状态码最初是1998年愚人节的RFC 2324(超文本咖啡壶控制协议)的一部分,用于表示服务器是一个茶壶,无法煮咖啡。虽然最初是玩笑,但已被许多服务器实现并用于表示"这个端点永远不会工作"。
23.14.2 现代用途
API端点占位符:表示尚未实现的端点
内部玩笑:开发团队间的幽默
安全措施:混淆攻击者
复活节彩蛋:隐藏功能或玩笑响应
23.14.3 实现示例
python
# Flask 418实现
from flask import Flask, jsonify
app = Flask(__name__)
@app.route('/brew-coffee')
def brew_coffee():
"""HTCPCP端点 – 总是返回418"""
response = jsonify({
"error": "I'm a teapot",
"message": "The requested entity body is short and stout.",
"hint": "Tip me over and pour me out.",
"rfc": "https://tools.ietf.org/html/rfc2324",
"solution": "Try making tea instead."
})
response.status_code = 418
response.headers['X-Teapot-Type'] = 'Robust Brown Betty'
response.headers['X-Brewing-Time'] = '4 minutes'
return response
@app.route('/make-tea')
def make_tea():
"""正确的HTCPCP端点"""
return jsonify({
"status": "brewing",
"type": "Earl Grey",
"strength": "medium",
"message": "Your tea will be ready shortly."
})
23.14.4 创意使用
javascript
// 创意418页面
function createTeapotResponse() {
return {
status: 418,
headers: {
'Content-Type': 'application/json',
'X-Teapot': 'true',
'X-Brew-Advice': 'Use freshly boiled water at 95°C',
'X-Tea-Type': 'Oolong'
},
body: JSON.stringify({
error: "I'm a teapot",
message: "This server is a teapot, not a coffee pot.",
instructions: {
step1: "Fill me with fresh water",
step2: "Heat to 95°C (203°F)",
step3: "Add 1 teaspoon of tea leaves per cup",
step4: "Steep for 3-5 minutes",
step5: "Pour and enjoy"
},
tea_suggestions: [
{
name: "Green Tea",
temperature: "80°C",
time: "2-3 minutes"
},
{
name: "Black Tea",
temperature: "95°C",
time: "3-5 minutes"
},
{
name: "Herbal Tea",
temperature: "100°C",
time: "5-7 minutes"
}
],
fun_fact: "The 418 status code was defined in RFC 2324 (Hyper Text Coffee Pot Control Protocol) as an April Fools' joke in 1998."
}, null, 2)
};
}
// Express中间件检测418请求
app.use((req, res, next) => {
// 检查是否为咖啡相关请求
if (req.path.includes('coffee') ||
req.get('X-Beverage') === 'coffee') {
const teapotResponse = createTeapotResponse();
res.status(teapotResponse.status);
Object.entries(teapotResponse.headers)
.forEach(([key, value]) => res.set(key, value));
res.send(teapotResponse.body);
return;
}
next();
});
23.14.5 最佳实践
-
谨慎使用418,避免混淆真正的错误
-
考虑在开发环境使用,生产环境禁用
-
可以用于API版本控制(旧版本返回418)
-
确保不会影响SEO或爬虫
23.15 421 Misdirected Request(错误定向请求)
23.15.1 定义与语义
421状态码在HTTP/2中引入,表示请求被发送到无法生成响应的服务器。通常发生在连接重用和服务器名称指示(SNI)不匹配时。
23.15.2 HTTP/2连接重用
HTTP/2允许在单个连接上多路复用多个请求。但当客户端尝试在连接上发送针对不同主机的请求时,服务器可能返回421。
23.15.3 SNI(服务器名称指示)
SNI是TLS扩展,允许客户端在握手时指定要连接的主机名。这对于虚拟主机托管至关重要。
23.15.4 问题场景
text
客户端 → 服务器: TLS握手(SNI: api.example.com)
客户端 → 服务器: 请求1: GET /users (Host: api.example.com)
服务器 → 客户端: 200 OK
客户端 → 服务器: 请求2: GET /orders (Host: shop.example.com)
服务器 → 客户端: 421 Misdirected Request
(因为连接是为api.example.com建立的)
23.15.5 解决方案
nginx
# Nginx配置处理多个主机名
server {
listen 443 ssl http2;
server_name api.example.com;
ssl_certificate /certs/api.example.com.crt;
ssl_certificate_key /certs/api.example.com.key;
location / {
proxy_pass http://api_backend;
}
}
server {
listen 443 ssl http2;
server_name shop.example.com;
ssl_certificate /certs/shop.example.com.crt;
ssl_certificate_key /certs/shop.example.com.key;
location / {
proxy_pass http://shop_backend;
}
}
# 或者使用通配符证书
server {
listen 443 ssl http2;
server_name api.example.com shop.example.com;
ssl_certificate /certs/wildcard.example.com.crt;
ssl_certificate_key /certs/wildcard.example.com.key;
# 基于Host头路由
location / {
if ($host = "api.example.com") {
proxy_pass http://api_backend;
}
if ($host = "shop.example.com") {
proxy_pass http://shop_backend;
}
}
}
23.15.6 客户端处理
javascript
// HTTP/2客户端连接管理
class H2ConnectionManager {
constructor() {
this.connections = new Map(); // host -> connection
this.pendingRequests = new Map(); // host -> [requests]
}
async request(host, path, options = {}) {
// 检查是否有现有连接
let connection = this.connections.get(host);
if (!connection) {
// 创建新连接
connection = await this.createConnection(host);
this.connections.set(host, connection);
// 监听连接错误
connection.on('error', (error) => {
console.error(`Connection to ${host} failed:`, error);
this.connections.delete(host);
});
// 监听421错误
connection.on('stream', (stream) => {
stream.on('response', (headers) => {
if (headers[':status'] === '421') {
console.log(`Received 421 for ${host}, closing connection`);
connection.close();
this.connections.delete(host);
// 重试待处理请求
this.retryPendingRequests(host);
}
});
});
}
// 发送请求
return new Promise((resolve, reject) => {
const stream = connection.request({
':method': options.method || 'GET',
':path': path,
':authority': host,
…options.headers
});
stream.on('response', (headers) => {
const chunks = [];
stream.on('data', (chunk) => {
chunks.push(chunk);
});
stream.on('end', () => {
const response = {
status: headers[':status'],
headers,
body: Buffer.concat(chunks)
};
resolve(response);
});
stream.on('error', reject);
});
if (options.body) {
stream.end(options.body);
} else {
stream.end();
}
});
}
async createConnection(host) {
// 实现HTTP/2连接创建
// 注意:需要适当的HTTP/2客户端库
throw new Error('Not implemented');
}
retryPendingRequests(host) {
const pending = this.pendingRequests.get(host) || [];
this.pendingRequests.delete(host);
pending.forEach(request => {
this.request(host, request.path, request.options)
.then(request.resolve)
.catch(request.reject);
});
}
}
23.15.7 最佳实践
-
为不同主机名使用通配符或多域名证书
-
客户端正确管理HTTP/2连接池
-
监控421错误以识别配置问题
-
考虑使用HTTP/1.1作为备用方案
23.16 422 Unprocessable Entity(无法处理的实体)
23.16.1 定义与语义
422状态码来自WebDAV规范,表示服务器理解请求实体的内容类型,且语法正确,但无法处理其中包含的指令。常用于验证错误。
23.16.2 与400的区别
-
400 Bad Request:请求语法错误或无法解析
-
422 Unprocessable Entity:请求语法正确,但语义错误
23.16.3 验证错误示例
json
// 请求
POST /api/users HTTP/1.1
Content-Type: application/json
{
"email": "invalid-email",
"age": -5,
"password": "123"
}
// 响应
HTTP/1.1 422 Unprocessable Entity
Content-Type: application/problem+json
{
"type": "https://example.com/errors/validation",
"title": "Validation Failed",
"status": 422,
"detail": "The request contains validation errors",
"instance": "/api/users",
"errors": [
{
"field": "email",
"code": "invalid_format",
"message": "Must be a valid email address",
"value": "invalid-email"
},
{
"field": "age",
"code": "minimum",
"message": "Must be greater than 0",
"value": -5,
"constraint": 0
},
{
"field": "password",
"code": "min_length",
"message": "Must be at least 8 characters",
"value": "123",
"constraint": 8
}
]
}
23.16.4 实现模式
python
# FastAPI验证错误处理
from fastapi import FastAPI, HTTPException, Request
from fastapi.responses import JSONResponse
from pydantic import BaseModel, EmailStr, validator
from typing import List
app = FastAPI()
class UserCreate(BaseModel):
email: EmailStr
age: int
password: str
@validator('age')
def age_must_be_positive(cls, v):
if v <= 0:
raise ValueError('age must be positive')
return v
@validator('password')
def password_length(cls, v):
if len(v) < 8:
raise ValueError('password must be at least 8 characters')
return v
@app.exception_handler(ValueError)
async def validation_exception_handler(request: Request, exc: ValueError):
# 将验证错误转换为422响应
return JSONResponse(
status_code=422,
content={
"detail": str(exc),
"errors": [
{
"loc": ["body"],
"msg": str(exc),
"type": "value_error"
}
]
}
)
@app.post("/users/")
async def create_user(user: UserCreate):
# 如果验证通过,创建用户
return {"message": "User created", "user": user.dict()}
# 更精细的错误处理
from pydantic import ValidationError
@app.exception_handler(ValidationError)
async def pydantic_validation_handler(request: Request, exc: ValidationError):
errors = []
for error in exc.errors():
errors.append({
"field": ".".join(str(loc) for loc in error["loc"]),
"code": error["type"],
"message": error["msg"],
"context": error.get("ctx")
})
return JSONResponse(
status_code=422,
content={
"error": "validation_failed",
"message": "The request contains validation errors",
"errors": errors
}
)
23.16.5 前端集成
javascript
// 前端表单验证和422错误处理
class FormValidator {
constructor(formId) {
this.form = document.getElementById(formId);
this.errorsContainer = document.createElement('div');
this.errorsContainer.className = 'validation-errors';
this.form.parentNode.insertBefore(this.errorsContainer, this.form);
this.form.addEventListener('submit', this.handleSubmit.bind(this));
}
async handleSubmit(event) {
event.preventDefault();
// 清除之前的错误
this.clearErrors();
// 收集表单数据
const formData = new FormData(this.form);
const data = Object.fromEntries(formData.entries());
try {
const response = await fetch(this.form.action, {
method: this.form.method,
headers: {
'Content-Type': 'application/json'
},
body: JSON.stringify(data)
});
if (response.status === 422) {
// 处理验证错误
const result = await response.json();
this.displayErrors(result.errors);
return;
}
if (!response.ok) {
throw new Error(`Request failed: ${response.status}`);
}
// 成功处理
const result = await response.json();
this.onSuccess(result);
} catch (error) {
console.error('Form submission error:', error);
this.displayGenericError(error.message);
}
}
displayErrors(errors) {
this.errorsContainer.innerHTML = '';
// 按字段分组错误
const fieldErrors = {};
errors.forEach(error => {
if (!fieldErrors[error.field]) {
fieldErrors[error.field] = [];
}
fieldErrors[error.field].push(error.message);
});
// 显示错误
Object.entries(fieldErrors).forEach(([field, messages]) => {
const fieldElement = this.form.querySelector(`[name="${field}"]`);
if (fieldElement) {
// 添加错误类
fieldElement.classList.add('error');
// 创建错误消息
const errorElement = document.createElement('div');
errorElement.className = 'field-error';
errorElement.textContent = messages.join(', ');
fieldElement.parentNode.appendChild(errorElement);
}
// 添加到错误容器
const errorSummary = document.createElement('div');
errorSummary.className = 'error-summary';
errorSummary.innerHTML = `
<strong>${field}:</strong>
<ul>
${messages.map(msg => `<li>${msg}</li>`).join('')}
</ul>
`;
this.errorsContainer.appendChild(errorSummary);
});
// 滚动到错误
this.errorsContainer.scrollIntoView({ behavior: 'smooth' });
}
clearErrors() {
// 清除字段错误样式
this.form.querySelectorAll('.error').forEach(el => {
el.classList.remove('error');
});
// 移除错误消息
this.form.querySelectorAll('.field-error').forEach(el => {
el.remove();
});
// 清空错误容器
this.errorsContainer.innerHTML = '';
}
displayGenericError(message) {
this.errorsContainer.innerHTML = `
<div class="alert alert-danger">
<strong>Error:</strong> ${message}
</div>
`;
}
onSuccess(result) {
// 成功回调
console.log('Form submitted successfully:', result);
this.form.reset();
this.clearErrors();
alert('Form submitted successfully!');
}
}
23.16.6 最佳实践
-
提供详细的验证错误信息
-
使用标准错误格式(如RFC 7807)
-
实现客户端验证以减少422请求
-
考虑使用JSON Schema进行验证
23.17 423 Locked(已锁定)
23.17.1 定义与语义
423状态码来自WebDAV规范,表示请求的资源被锁定。客户端应等待锁释放或联系锁持有者。
23.17.2 锁类型
独占锁:只有一个客户端可修改资源
共享锁:多个客户端可读取,但只有一个可写入
深度锁:锁住资源及其所有子资源
23.17.3 锁机制
http
# 锁定请求
LOCK /document.txt HTTP/1.1
Host: example.com
Timeout: Infinite, Second-3600
Depth: infinity
Content-Type: application/xml
<?xml version="1.0" encoding="utf-8" ?>
<D:lockinfo xmlns:D="DAV:">
<D:lockscope><D:exclusive/></D:lockscope>
<D:locktype><D:write/></D:locktype>
<D:owner>
<D:href>mailto:user@example.com</D:href>
</D:owner>
</D:lockinfo>
# 锁定响应
HTTP/1.1 200 OK
Content-Type: application/xml
<?xml version="1.0" encoding="utf-8" ?>
<D:prop xmlns:D="DAV:">
<D:lockdiscovery>
<D:activelock>
<D:locktype><D:write/></D:locktype>
<D:lockscope><D:exclusive/></D:lockscope>
<D:depth>infinity</D:depth>
<D:owner>
<D:href>mailto:user@example.com</D:href>
</D:owner>
<D:locktoken>
<D:href>urn:uuid:e71d4fae-5dec-11d0-a765-00a0c91e6bf6</D:href>
</D:locktoken>
<D:timeout>Second-3600</D:timeout>
</D:activelock>
</D:lockdiscovery>
</D:prop>
# 尝试修改已锁定的资源
PUT /document.txt HTTP/1.1
Host: example.com
If: <urn:uuid:e71d4fae-5dec-11d0-a765-00a0c91e6bf6>
Content-Type: text/plain
New content
# 响应(如果没有正确的锁令牌)
HTTP/1.1 423 Locked
Content-Type: application/xml
<?xml version="1.0" encoding="utf-8" ?>
<D:error xmlns:D="DAV:">
<D:lock-token-submitted>
<D:href>/document.txt</D:href>
</D:lock-token-submitted>
</D:error>
23.17.4 现代API实现
python
# Flask资源锁实现
from flask import Flask, request, jsonify
import threading
import time
import uuid
from datetime import datetime, timedelta
app = Flask(__name__)
class ResourceLock:
def __init__(self):
self.locks = {}
self.lock = threading.Lock()
def acquire(self, resource_id, client_id, timeout=300):
"""获取资源锁"""
with self.lock:
current_time = datetime.now()
# 检查是否已锁定
if resource_id in self.locks:
lock_info = self.locks[resource_id]
# 检查锁是否过期
if lock_info['expires'] < current_time:
# 锁已过期,可以获取
pass
elif lock_info['client_id'] == client_id:
# 同一客户端,更新超时
lock_info['expires'] = current_time + timedelta(seconds=timeout)
return True
else:
# 资源被其他客户端锁定
return False
# 获取新锁
lock_token = str(uuid.uuid4())
self.locks[resource_id] = {
'client_id': client_id,
'token': lock_token,
'created': current_time,
'expires': current_time + timedelta(seconds=timeout),
'timeout': timeout
}
return lock_token
def release(self, resource_id, client_id=None, token=None):
"""释放资源锁"""
with self.lock:
if resource_id not in self.locks:
return True
lock_info = self.locks[resource_id]
# 检查权限
if client_id and lock_info['client_id'] != client_id:
return False
if token and lock_info['token'] != token:
return False
# 释放锁
del self.locks[resource_id]
return True
def check(self, resource_id, token=None):
"""检查资源是否被锁定"""
with self.lock:
if resource_id not in self.locks:
return True
lock_info = self.locks[resource_id]
# 检查是否过期
if lock_info['expires'] < datetime.now():
del self.locks[resource_id]
return True
# 检查令牌
if token and lock_info['token'] == token:
return True
return False
# 全局锁管理器
lock_manager = ResourceLock()
@app.route('/api/documents/<document_id>/lock', methods=['POST'])
def lock_document(document_id):
"""锁定文档"""
client_id = request.headers.get('X-Client-ID')
if not client_id:
return jsonify({'error': 'client_id_required'}), 400
timeout = request.json.get('timeout', 300)
# 尝试获取锁
result = lock_manager.acquire(document_id, client_id, timeout)
if result is True:
# 已持有锁
return jsonify({'status': 'already_locked'}), 200
elif result is False:
# 被其他客户端锁定
lock_info = get_lock_info(document_id)
return jsonify({
'error': 'resource_locked',
'message': 'Document is locked by another client',
'locked_by': lock_info['client_id'],
'expires_at': lock_info['expires'].isoformat()
}), 423
else:
# 成功获取锁
return jsonify({
'lock_token': result,
'timeout': timeout,
'expires_in': timeout
}), 200
@app.route('/api/documents/<document_id>', methods=['PUT'])
def update_document(document_id):
"""更新文档(需要锁)"""
lock_token = request.headers.get('X-Lock-Token')
# 检查锁
if not lock_manager.check(document_id, lock_token):
return jsonify({
'error': 'resource_locked',
'message': 'Document is locked. Provide valid lock token.',
'required_header': 'X-Lock-Token'
}), 423
# 更新文档逻辑
# …
return jsonify({'status': 'updated'}), 200
def get_lock_info(resource_id):
"""获取锁信息(简化实现)"""
# 实际实现需要线程安全地访问lock_manager
return {'client_id': 'unknown', 'expires': datetime.now()}
23.17.5 最佳实践
-
实现锁超时机制防止死锁
-
提供锁令牌以便客户端后续操作
-
支持锁查询和释放操作
-
考虑分布式锁方案(如Redis)
23.18 424 Failed Dependency(依赖失败)
23.18.1 定义与语义
424状态码来自WebDAV规范,表示请求的操作依赖于另一个操作,且那个操作失败。用于事务性操作或多资源操作。
23.18.2 使用场景
批量操作:部分操作失败导致整体失败
事务处理:数据库事务回滚
文件操作:移动包含多个文件的文件夹
API组合:调用多个微服务
23.18.3 实现示例
python
# Django事务性操作
from django.db import transaction
from django.http import JsonResponse
from django.views import View
import logging
logger = logging.getLogger(__name__)
class BatchCreateView(View):
@transaction.atomic
def post(self, request):
data = request.json
created_resources = []
errors = []
# 创建保存点
sid = transaction.savepoint()
try:
for item in data['items']:
try:
# 创建资源
resource = self.create_resource(item)
created_resources.append(resource)
except ValidationError as e:
# 单个资源创建失败
errors.append({
'item': item,
'error': str(e),
'field': getattr(e, 'field', None)
})
# 回滚到保存点
transaction.savepoint_rollback(sid)
raise BatchOperationError(f"Failed to create item: {item}")
# 所有操作成功,提交事务
transaction.savepoint_commit(sid)
return JsonResponse({
'status': 'success',
'created': len(created_resources),
'resources': created_resources
})
except BatchOperationError as e:
# 返回424响应
return JsonResponse({
'error': 'failed_dependency',
'message': 'Batch operation failed due to dependency errors',
'details': errors,
'successful_count': len(created_resources),
'failed_count': len(errors)
}, status=424)
except Exception as e:
# 其他错误
logger.error(f"Batch operation failed: {e}")
return JsonResponse({
'error': 'internal_error',
'message': 'Internal server error'
}, status=500)
def create_resource(self, item):
# 实现资源创建逻辑
# 可能抛出ValidationError
pass
class BatchOperationError(Exception):
pass
23.18.4 微服务场景
javascript
// 微服务编排中的依赖失败处理
class Orchestrator {
async processOrder(order) {
const steps = [
this.validateOrder.bind(this, order),
this.reserveInventory.bind(this, order),
this.processPayment.bind(this, order),
this.shipOrder.bind(this, order),
this.sendConfirmation.bind(this, order)
];
const results = [];
const errors = [];
for (let i = 0; i < steps.length; i++) {
try {
const result = await steps[i]();
results.push({
step: i + 1,
success: true,
result
});
} catch (error) {
errors.push({
step: i + 1,
success: false,
error: error.message,
dependency_failed: i > 0 ? i : null
});
// 如果依赖步骤失败,后续步骤无法执行
if (i > 0) {
// 补偿已完成的步骤
await this.compensate(results.slice(0, i));
return {
status: 'failed',
error_type: 'dependency_failed',
status_code: 424,
completed_steps: results.length,
failed_step: i + 1,
errors,
compensation_status: 'executed'
};
}
// 第一步失败,直接返回错误
return {
status: 'failed',
error_type: 'initial_failure',
status_code: 400,
failed_step: 1,
error: error.message
};
}
}
return {
status: 'success',
results
};
}
async compensate(completedSteps) {
// 执行补偿操作
for (const step of completedSteps.reverse()) {
try {
await this.reverseStep(step);
} catch (error) {
console.error(`Compensation failed for step ${step.step}:`, error);
}
}
}
async reverseStep(step) {
// 实现步骤回滚逻辑
switch (step.step) {
case 2: // 库存预留
await this.releaseInventory(step.result.inventory_id);
break;
case 3: // 支付处理
await this.refundPayment(step.result.payment_id);
break;
// 其他步骤的回滚…
}
}
}
23.18.5 最佳实践
-
实现原子操作或事务补偿
-
提供详细的失败依赖信息
-
设计幂等操作以便重试
-
监控依赖失败率以识别系统弱点
23.19 425 Too Early(太早)
23.19.1 定义与语义
425状态码来自RFC 8470,表示服务器不愿意处理请求,因为它可能被重放。用于防止重放攻击,特别是在0-RTT(零往返时间)TLS连接中。
23.19.2 TLS 1.3和0-RTT
TLS 1.3引入了0-RTT特性,允许客户端在TLS握手完成前发送数据。这带来了重放攻击的风险。
23.19.3 重放攻击防护
python
# Flask 0-RTT请求处理
from flask import Flask, request, jsonify
import hashlib
import time
from collections import deque
app = Flask(__name__)
class ReplayProtection:
def __init__(self, window_size=300): # 5分钟窗口
self.window_size = window_size
self.seen_requests = deque(maxlen=10000)
def is_replay(self, request_data, client_id):
"""检查是否为重放请求"""
# 创建请求指纹
fingerprint = self.create_fingerprint(request_data, client_id)
# 检查是否已见过
if fingerprint in self.seen_requests:
return True
# 添加到已见列表
self.seen_requests.append(fingerprint)
# 清理过期指纹(简化实现)
if len(self.seen_requests) % 100 == 0:
self.cleanup()
return False
def create_fingerprint(self, request_data, client_id):
"""创建请求指纹"""
components = [
client_id,
str(time.time() // self.window_size), # 时间窗口
request.method,
request.path,
hashlib.sha256(request.get_data()).hexdigest()[:16]
]
return hashlib.sha256('|'.join(components).encode()).hexdigest()
def cleanup(self):
"""清理过期指纹(简化)"""
# 实际实现需要基于时间戳清理
pass
replay_protection = ReplayProtection()
@app.before_request
def check_replay():
# 检查是否为0-RTT请求
early_data = request.headers.get('Early-Data') == '1'
if early_data:
# 提取客户端标识
client_id = request.headers.get('X-Client-ID') or request.remote_addr
# 检查重放
if replay_protection.is_replay(request, client_id):
return jsonify({
'error': 'too_early',
'message': 'Request may be replayed',
'retry_after': 1,
'hint': 'Wait 1 second and retry without early data'
}), 425
@app.route('/api/order', methods=['POST'])
def create_order():
# 检查是否为0-RTT请求
early_data = request.headers.get('Early-Data') == '1'
if early_data:
# 对于0-RTT请求,只允许幂等操作
# 或者执行额外验证
return jsonify({
'status': 'accepted_with_caution',
'message': 'Order accepted via 0-RTT, additional verification may be required',
'order_id': generate_order_id(),
'warning': 'This request was sent via 0-RTT TLS'
})
# 正常请求处理
return jsonify({
'status': 'created',
'order_id': generate_order_id()
})
def generate_order_id():
import uuid
return str(uuid.uuid4())
23.19.4 客户端实现
javascript
// 支持0-RTT的客户端
class EarlyDataClient {
constructor(baseURL) {
this.baseURL = baseURL;
this.earlyDataSupported = false;
this.earlyDataAttempts = new Map(); // requestId -> timestamp
}
async request(endpoint, options = {}) {
const requestId = this.generateRequestId();
const url = `${this.baseURL}${endpoint}`;
// 检查是否支持0-RTT
if (this.earlyDataSupported && this.isIdempotent(options.method)) {
// 尝试0-RTT请求
try {
const earlyOptions = {
…options,
headers: {
…options.headers,
'Early-Data': '1',
'X-Request-ID': requestId
}
};
this.earlyDataAttempts.set(requestId, Date.now());
const response = await fetch(url, earlyOptions);
if (response.status === 425) {
// 太早,需要重试
console.log('Received 425 Too Early, retrying without early data');
// 移除Early-Data头
const retryOptions = { …options };
if (retryOptions.headers) {
delete retryOptions.headers['Early-Data'];
}
// 等待建议的时间
const retryAfter = response.headers.get('Retry-After');
if (retryAfter) {
await this.delay(parseInt(retryAfter) * 1000);
}
return this.requestWithoutEarlyData(url, retryOptions);
}
// 检查是否为谨慎接受的响应
if (response.status === 200) {
const data = await response.json();
if (data.warning && data.warning.includes('0-RTT')) {
console.warn('Request accepted via 0-RTT with caution');
}
}
return response;
} catch (error) {
console.error('Early data request failed:', error);
// 回退到正常请求
return this.requestWithoutEarlyData(url, options);
}
}
// 正常请求
return this.requestWithoutEarlyData(url, options);
}
async requestWithoutEarlyData(url, options) {
// 移除可能的Early-Data头
const cleanOptions = { …options };
if (cleanOptions.headers) {
delete cleanOptions.headers['Early-Data'];
}
return fetch(url, cleanOptions);
}
isIdempotent(method) {
// 检查方法是否幂等
const idempotentMethods = ['GET', 'HEAD', 'PUT', 'DELETE', 'OPTIONS', 'TRACE'];
return idempotentMethods.includes(method.toUpperCase());
}
generateRequestId() {
return `${Date.now()}-${Math.random().toString(36).substr(2, 9)}`;
}
delay(ms) {
return new Promise(resolve => setTimeout(resolve, ms));
}
async detectEarlyDataSupport() {
// 检测服务器是否支持0-RTT
try {
const response = await fetch(`${this.baseURL}/.well-known/support-early-data`, {
method: 'HEAD'
});
this.earlyDataSupported = response.headers.get('Supports-Early-Data') === '1';
return this.earlyDataSupported;
} catch (error) {
this.earlyDataSupported = false;
return false;
}
}
}
23.19.5 最佳实践
-
只对幂等操作使用0-RTT
-
实现重放保护机制
-
为0-RTT请求提供明确的警告
-
监控425错误率以调整重放窗口
23.20 426 Upgrade Required(需要升级)
23.20.1 定义与语义
426状态码表示服务器拒绝使用当前协议处理请求,但愿意在客户端升级到不同协议后处理。通常用于协议升级协商。
23.20.2 协议升级机制
http
# 客户端请求升级
GET /chat HTTP/1.1
Host: server.example.com
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==
Sec-WebSocket-Version: 13
# 服务器同意升级
HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: s3pPLMBiTxaQ9kYGzzhZRbK+xOo=
# 服务器拒绝升级(要求其他协议)
HTTP/1.1 426 Upgrade Required
Upgrade: TLS/1.2, HTTP/1.1
Connection: Upgrade
Content-Type: application/json
{
"error": "upgrade_required",
"message": "Please upgrade to a more secure protocol",
"supported_upgrades": [
{
"protocol": "TLS/1.2",
"description": "Transport Layer Security 1.2"
},
{
"protocol": "HTTP/2",
"description": "HTTP version 2"
}
]
}
23.20.3 使用场景
安全协议升级:要求从HTTP升级到HTTPS
API版本弃用:要求使用新版本API
传输协议升级:从HTTP/1.1升级到HTTP/2或HTTP/3
WebSocket握手:协商WebSocket协议
23.20.4 实现示例
python
# Flask协议升级处理
from flask import Flask, request, jsonify
import ssl
app = Flask(__name__)
@app.before_request
def require_secure_connection():
# 检查是否为HTTPS
if not request.is_secure:
# 检查是否支持升级
upgrade_header = request.headers.get('Upgrade')
if upgrade_header and 'TLS' in upgrade_header:
# 客户端已请求升级
return # 允许继续,将由底层服务器处理
# 返回426要求升级
response = jsonify({
'error': 'upgrade_required',
'message': 'Secure connection required',
'upgrade_to': 'HTTPS',
'url': f'https://{request.host}{request.path}',
'status_code': 301 # 也提供重定向选项
})
response.status_code = 426
response.headers['Upgrade'] = 'TLS/1.2, TLS/1.3'
response.headers['Connection'] = 'Upgrade'
return response
@app.route('/api/deprecated', methods=['GET'])
def deprecated_api():
"""已弃用的API端点"""
# 检查客户端是否使用新版本
api_version = request.headers.get('X-API-Version', '1.0')
if api_version == '1.0':
# 要求升级到v2
response = jsonify({
'error': 'api_version_deprecated',
'message': 'API v1.0 is deprecated',
'upgrade_to': 'v2.0',
'documentation': 'https://api.example.com/v2/docs',
'sunset_date': '2024-12-31'
})
response.status_code = 426
response.headers['X-API-Deprecated'] = 'true'
response.headers['X-API-Sunset'] = '2024-12-31T23:59:59Z'
return response
# 新版本API处理
return jsonify({'data': 'from v2 API'})
# WebSocket升级处理
@app.route('/ws')
def websocket_endpoint():
# Flask本身不直接处理WebSocket
# 这通常由专门的WebSocket服务器处理
# 以下为概念性代码
upgrade = request.headers.get('Upgrade', '').lower()
if upgrade == 'websocket':
# 检查WebSocket版本
ws_version = request.headers.get('Sec-WebSocket-Version')
if ws_version != '13':
response = jsonify({
'error': 'unsupported_websocket_version',
'message': f'WebSocket version {ws_version} not supported',
'supported_versions': ['13']
})
response.status_code = 426
response.headers['Sec-WebSocket-Version'] = '13'
return response
# WebSocket握手将由底层服务器处理
return '', 101 # 实际应由WebSocket服务器处理
return jsonify({'error': 'websocket_upgrade_required'}), 426
23.20.5 客户端处理
javascript
// 处理426升级的客户端
class UpgradeAwareClient {
constructor(baseURL) {
this.baseURL = baseURL;
this.upgradeHandlers = new Map();
// 注册升级处理器
this.registerUpgradeHandler('TLS', this.handleTlsUpgrade.bind(this));
this.registerUpgradeHandler('websocket', this.handleWebSocketUpgrade.bind(this));
this.registerUpgradeHandler('h2', this.handleHttp2Upgrade.bind(this));
}
async request(options) {
try {
const response = await fetch(this.baseURL + options.path, options);
if (response.status === 426) {
// 需要升级
return this.handleUpgradeResponse(response, options);
}
return response;
} catch (error) {
console.error('Request failed:', error);
throw error;
}
}
async handleUpgradeResponse(response, originalOptions) {
const upgradeHeader = response.headers.get('Upgrade');
if (!upgradeHeader) {
throw new Error('426 response without Upgrade header');
}
// 解析支持的升级选项
const upgrades = upgradeHeader.split(',').map(s => s.trim());
// 选择支持的升级
for (const upgrade of upgrades) {
const handler = this.upgradeHandlers.get(upgrade.toLowerCase());
if (handler) {
return handler(response, originalOptions);
}
}
throw new Error(`No handler for required upgrade: ${upgrades.join(', ')}`);
}
async handleTlsUpgrade(response, originalOptions) {
// 从HTTP升级到HTTPS
const httpsUrl = this.baseURL.replace('http://', 'https://');
console.log('Upgrading to HTTPS…');
// 重试请求到HTTPS端点
const upgradedOptions = { …originalOptions };
upgradedOptions.path = originalOptions.path;
return fetch(httpsUrl + upgradedOptions.path, upgradedOptions);
}
async handleWebSocketUpgrade(response, originalOptions) {
// 升级到WebSocket
const wsUrl = this.baseURL.replace('http', 'ws') + originalOptions.path;
console.log('Upgrading to WebSocket…');
// 创建WebSocket连接
return new Promise((resolve, reject) => {
const ws = new WebSocket(wsUrl);
ws.onopen = () => {
console.log('WebSocket connected');
// 如果原始请求是GET,可以解析为成功
if (originalOptions.method === 'GET') {
resolve({
status: 101,
ok: true,
webSocket: ws
});
} else {
// 对于其他方法,需要额外处理
reject(new Error('WebSocket upgrade only supported for GET requests'));
}
};
ws.onerror = (error) => {
reject(new Error(`WebSocket connection failed: ${error}`));
};
});
}
async handleHttp2Upgrade(response, originalOptions) {
// HTTP/2升级(浏览器中通常自动处理)
console.log('Upgrading to HTTP/2…');
// 对于浏览器,只需重试请求
return fetch(this.baseURL + originalOptions.path, originalOptions);
}
registerUpgradeHandler(protocol, handler) {
this.upgradeHandlers.set(protocol.toLowerCase(), handler);
}
}
23.20.6 最佳实践
-
提供清晰的升级说明和文档链接
-
支持多种升级选项
-
实现优雅降级
-
监控协议使用情况以计划弃用
23.21 428 Precondition Required(需要前提条件)
23.21.1 定义与语义
428状态码表示源服务器要求请求是条件性的。用于防止"丢失更新"问题,客户端在更新资源前必须先获取当前状态。
23.21.2 与412的区别
-
412 Precondition Failed:客户端提供了条件头,但条件不满足
-
428 Precondition Required:服务器要求条件请求,但客户端没有提供条件头
23.21.3 实现模式
python
# Django条件请求要求
from django.http import JsonResponse
from django.views import View
from django.utils.decorators import method_decorator
from django.views.decorators.http import require_http_methods
import hashlib
import json
class ConditionalUpdateMixin:
"""要求条件更新的Mixin"""
def require_conditional_request(self, request, *args, **kwargs):
"""检查是否满足条件请求要求"""
# 对于安全方法(GET、HEAD)不需要条件
if request.method in ['GET', 'HEAD', 'OPTIONS']:
return True
# 检查条件头
has_condition = any([
request.META.get('HTTP_IF_MATCH'),
request.META.get('HTTP_IF_NONE_MATCH'),
request.META.get('HTTP_IF_MODIFIED_SINCE'),
request.META.get('HTTP_IF_UNMODIFIED_SINCE')
])
if not has_condition:
# 返回428,要求条件请求
response = JsonResponse({
'error': 'precondition_required',
'message': 'This request requires conditional headers',
'required_headers': [
'If-Match or If-None-Match',
'If-Modified-Since or If-Unmodified-Since'
],
'how_to': 'First perform a GET request to get the current ETag, then include it in If-Match header'
})
response.status_code = 428
response['Precondition-Required'] = 'true'
return response
return True
class ResourceDetailView(ConditionalUpdateMixin, View):
"""需要条件更新的资源详情视图"""
def dispatch(self, request, *args, **kwargs):
# 检查条件请求要求
requirement_check = self.require_conditional_request(request, *args, **kwargs)
if requirement_check is not True:
return requirement_check
return super().dispatch(request, *args, **kwargs)
def get(self, request, resource_id):
"""获取资源(包含ETag)"""
resource = self.get_resource(resource_id)
# 生成ETag
etag = self.generate_etag(resource)
response = JsonResponse(resource)
response['ETag'] = etag
response['Last-Modified'] = resource['modified_at']
return response
def put(self, request, resource_id):
"""更新资源(需要If-Match)"""
resource = self.get_resource(resource_id)
current_etag = self.generate_etag(resource)
# 检查If-Match头
if_match = request.META.get('HTTP_IF_MATCH')
if not if_match:
# 这不应该发生,因为dispatch已经检查了
return JsonResponse({
'error': 'precondition_required',
'message': 'If-Match header is required'
}, status=428)
# 验证ETag
if if_match != current_etag and if_match != '*':
return JsonResponse({
'error': 'precondition_failed',
'message': 'Resource has been modified',
'current_etag': current_etag
}, status=412)
# 更新资源
updated_resource = self.update_resource(resource_id, request.body)
new_etag = self.generate_etag(updated_resource)
response = JsonResponse(updated_resource)
response['ETag'] = new_etag
return response
def get_resource(self, resource_id):
"""获取资源(简化实现)"""
return {
'id': resource_id,
'name': f'Resource {resource_id}',
'modified_at': '2024-01-15T10:30:00Z',
'data': 'Some data'
}
def generate_etag(self, resource):
"""生成ETag"""
content = json.dumps(resource, sort_keys=True)
return hashlib.md5(content.encode()).hexdigest()
def update_resource(self, resource_id, data):
"""更新资源(简化实现)"""
resource = self.get_resource(resource_id)
# 实际实现会解析和更新数据
resource['modified_at'] = '2024-01-15T11:00:00Z'
return resource
23.21.4 客户端流程
javascript
// 处理428的客户端
class ConditionalRequestClient {
async updateResource(resourceId, updates) {
// 1. 首先获取资源以获取ETag
const getResponse = await fetch(`/api/resources/${resourceId}`);
if (!getResponse.ok) {
throw new Error(`Failed to get resource: ${getResponse.status}`);
}
const etag = getResponse.headers.get('ETag');
const resource = await getResponse.json();
// 2. 使用ETag进行条件更新
const updateResponse = await fetch(`/api/resources/${resourceId}`, {
method: 'PUT',
headers: {
'Content-Type': 'application/json',
'If-Match': etag
},
body: JSON.stringify({
…resource,
…updates
})
});
if (updateResponse.status === 428) {
// 服务器要求条件请求,但我们已经提供了
// 这可能是配置错误
throw new Error('Server requires conditional request but rejected our If-Match header');
}
if (updateResponse.status === 412) {
// 前提条件失败(资源已被修改)
// 获取新版本并重试或通知用户
const error = await updateResponse.json();
throw new Error(`Update conflict: ${error.message}`);
}
if (!updateResponse.ok) {
throw new Error(`Update failed: ${updateResponse.status}`);
}
return updateResponse.json();
}
async safeUpdateWithRetry(resourceId, updates, maxRetries = 3) {
for (let attempt = 0; attempt < maxRetries; attempt++) {
try {
return await this.updateResource(resourceId, updates);
} catch (error) {
if (error.message.includes('Update conflict') && attempt < maxRetries – 1) {
// 冲突错误,等待后重试
console.log(`Update conflict, retrying (attempt ${attempt + 1})…`);
await this.delay(100 * Math.pow(2, attempt)); // 指数退避
continue;
}
throw error;
}
}
}
delay(ms) {
return new Promise(resolve => setTimeout(resolve, ms));
}
}
23.21.5 最佳实践
-
为所有非安全方法要求条件请求
-
提供清晰的错误信息和解决步骤
-
实现乐观锁避免更新冲突
-
考虑支持弱ETag用于性能优化
23.22 429 Too Many Requests(请求过多)
注:虽然标题中未列出429,但由于它常与其他4xx状态码一起讨论,且非常重要,此处简要涵盖。
23.22.1 定义与语义
429状态码表示用户在给定时间内发送了太多请求("限流")。用于防止滥用和保证服务可用性。
23.22.2 限流头信息
http
HTTP/1.1 429 Too Many Requests
Content-Type: application/json
Retry-After: 3600
X-RateLimit-Limit: 100
X-RateLimit-Remaining: 0
X-RateLimit-Reset: 1673780400
{
"error": "rate_limit_exceeded",
"message": "Too many requests, please try again later",
"retry_after": 3600,
"limit": 100,
"period": "hour",
"documentation": "https://api.example.com/docs/rate-limiting"
}
23.22.3 限流策略
令牌桶算法:平滑限流,允许突发
漏桶算法:恒定速率处理
固定窗口计数器:简单计数,但可能允许瞬时超限
滑动窗口日志:精确但内存消耗大
分布式限流:用于微服务架构
23.23 431 Request Header Fields Too Large(请求头字段太大)
23.23.1 定义与语义
431状态码表示服务器不愿意处理请求,因为单个头字段或所有头字段的总大小超过限制。
23.23.2 常见限制
| Nginx | 4KB/8KB(取决于系统) | large_client_header_buffers |
| Apache | 8KB | LimitRequestFieldSize |
| IIS | 16KB | maxRequestLength |
| Node.js | 16KB | maxHeaderSize |
23.23.3 问题诊断
python
# 诊断头大小问题
def diagnose_header_size(request):
total_size = 0
header_sizes = {}
for key, value in request.headers.items():
size = len(key) + len(value) + 2 # +2 for ": "
header_sizes[key] = size
total_size += size
return {
'total_size': total_size,
'header_sizes': header_sizes,
'largest_headers': sorted(
header_sizes.items(),
key=lambda x: x[1],
reverse=True
)[:5]
}
# 中间件检查头大小
from django.http import JsonResponse
class HeaderSizeMiddleware:
def __init__(self, get_response):
self.get_response = get_response
self.max_header_size = 8 * 1024 # 8KB
def __call__(self, request):
# 计算请求头大小
header_size = sum(
len(key) + len(value) + 2
for key, value in request.headers.items()
)
if header_size > self.max_header_size:
return JsonResponse({
'error': 'request_header_fields_too_large',
'message': 'Request headers exceed size limit',
'max_size': self.max_header_size,
'actual_size': header_size,
'largest_headers': self.get_largest_headers(request)
}, status=431)
return self.get_response(request)
def get_largest_headers(self, request):
headers = []
for key, value in request.headers.items():
size = len(key) + len(value) + 2
headers.append({'name': key, 'size': size})
return sorted(headers, key=lambda x: x['size'], reverse=True)[:3]
23.23.4 客户端优化
javascript
// 优化请求头大小
class HeaderOptimizer {
constructor() {
this.essentialHeaders = new Set([
'authorization',
'content-type',
'accept',
'user-agent'
]);
}
optimizeHeaders(headers) {
const optimized = {};
let totalSize = 0;
for (const [key, value] of Object.entries(headers)) {
const lowerKey = key.toLowerCase();
// 只保留必要头
if (this.essentialHeaders.has(lowerKey) ||
lowerKey.startsWith('x-') ||
lowerKey.startsWith('sec-')) {
optimized[key] = value;
totalSize += key.length + value.length + 2;
}
}
// 检查是否超过限制
if (totalSize > 8 * 1024) { // 8KB
console.warn(`Request headers are large: ${totalSize} bytes`);
// 尝试压缩大值头
return this.compressHeaders(optimized);
}
return optimized;
}
compressHeaders(headers) {
const compressed = { …headers };
// 对于大值的自定义头,可以考虑压缩
for (const [key, value] of Object.entries(headers)) {
if (key.startsWith('X-') && value.length > 1024) {
// 使用Base64编码或其他压缩
compressed[key] = this.compressValue(value);
}
}
return compressed;
}
compressValue(value) {
// 简单压缩示例
if (typeof value === 'object') {
// 如果是对象,转为紧凑JSON
return JSON.stringify(value);
}
return value;
}
async makeRequest(url, options) {
const optimizedHeaders = this.optimizeHeaders(options.headers || {});
const optimizedOptions = {
…options,
headers: optimizedHeaders
};
const response = await fetch(url, optimizedOptions);
if (response.status === 431) {
// 头仍然太大,需要进一步优化
console.error('Headers still too large after optimization');
// 获取诊断信息
const error = await response.json();
console.error('Large headers:', error.largest_headers);
throw new Error('Request headers too large');
}
return response;
}
}
23.23.5 服务器配置
nginx
# Nginx头大小配置
http {
# 客户端头缓冲区大小
client_header_buffer_size 1k;
# 大客户端头缓冲区
large_client_header_buffers 4 8k;
# 请求行最大大小
client_max_body_size 10m;
# 特定位置更严格的限制
location /api/ {
# 更严格的头大小限制
large_client_header_buffers 2 4k;
# 自定义错误处理
error_page 431 /431.json;
location = /431.json {
internal;
default_type application/json;
return 200 '{"error":"headers_too_large","message":"Request headers exceed 4KB limit"}';
}
}
}
23.23.6 最佳实践
-
监控头大小分布以设置合理限制
-
提供清晰的错误信息指出哪些头太大
-
考虑支持头压缩(如HTTP/2 HPACK)
-
避免在头中存储大量数据
23.24 451 Unavailable For Legal Reasons(因法律原因不可用)
23.24.1 定义与语义
451状态码表示服务器由于法律原因无法提供请求的资源。引用自雷·布拉德伯里的《华氏451》,该状态码专门用于内容审查场景。
23.24.2 使用场景
政府审查:政府要求屏蔽的内容
版权问题:侵权内容移除
法院命令:根据法律命令屏蔽
地区限制:内容地理封锁
GDPR合规:数据保护要求移除
23.24.3 响应格式
http
HTTP/1.1 451 Unavailable For Legal Reasons
Content-Type: application/json
X-Censorship-Reason: government-order
X-Blocking-Authority: Ministry of Truth
X-Appeal-URL: https://example.com/appeal
Link: <https://eff.org/censorship>; rel="related"
{
"error": "unavailable_for_legal_reasons",
"message": "This resource is not available in your country due to legal restrictions",
"reference": "Government Order 2024-01",
"reason": "violates_local_laws",
"country": "DE",
"blocking_authority": "Federal Agency",
"block_date": "2024-01-15",
"appeal_process": {
"url": "https://example.com/appeal",
"email": "legal@example.com",
"deadline": "2024-02-15"
},
"alternatives": [
{
"description": "Similar content available in other regions",
"url": "https://global.example.com/content/123"
}
]
}
23.24.4 实现示例
python
# Django地理封锁和内容审查
from django.http import JsonResponse
from django.views import View
from django.utils.decorators import method_decorator
from django.views.decorators.vary import vary_on_headers
import geoip2.database
import json
class LegalComplianceMiddleware:
"""法律合规中间件"""
def __init__(self, get_response):
self.get_response = get_response
self.geoip_reader = geoip2.database.Reader('/path/to/GeoLite2-Country.mmdb')
self.blocked_content = self.load_blocked_content()
def __call__(self, request):
response = self.get_response(request)
# 检查是否需要地理封锁
if self.is_blocked_in_country(request):
return self.create_451_response(request)
return response
def is_blocked_in_country(self, request):
"""检查内容是否在用户所在国家被封锁"""
try:
# 获取用户国家
country_code = self.get_user_country(request)
# 获取请求的路径
path = request.path
# 检查是否在封锁列表中
if path in self.blocked_content:
blocked_in = self.blocked_content[path].get('blocked_countries', [])
return country_code in blocked_in
except Exception as e:
print(f"Error checking geo-block: {e}")
return False
def get_user_country(self, request):
"""从IP获取用户国家"""
# 从X-Forwarded-For或直接IP获取
ip = self.get_client_ip(request)
try:
response = self.geoip_reader.country(ip)
return response.country.iso_code
except:
# 默认国家或基于其他头判断
return request.META.get('HTTP_CF_IPCOUNTRY', 'US')
def get_client_ip(self, request):
"""获取客户端IP"""
x_forwarded_for = request.META.get('HTTP_X_FORWARDED_FOR')
if x_forwarded_for:
ip = x_forwarded_for.split(',')[0]
else:
ip = request.META.get('REMOTE_ADDR')
return ip
def create_451_response(self, request):
"""创建451响应"""
path = request.path
blocking_info = self.blocked_content.get(path, {})
response = JsonResponse({
'error': 'unavailable_for_legal_reasons',
'message': blocking_info.get('message', 'Content not available in your region'),
'reason': blocking_info.get('reason', 'legal_restriction'),
'reference': blocking_info.get('reference'),
'country': self.get_user_country(request),
'blocking_authority': blocking_info.get('authority'),
'block_date': blocking_info.get('block_date'),
'appeal': blocking_info.get('appeal_url'),
'alternatives': blocking_info.get('alternatives', [])
})
response.status_code = 451
# 添加信息头
response['X-Censorship-Reason'] = blocking_info.get('reason', 'unknown')
if blocking_info.get('authority'):
response['X-Blocking-Authority'] = blocking_info['authority']
if blocking_info.get('appeal_url'):
response['X-Appeal-URL'] = blocking_info['appeal_url']
return response
def load_blocked_content(self):
"""加载封锁内容列表(从数据库或文件)"""
# 示例数据
return {
'/content/restricted-article': {
'blocked_countries': ['CN', 'RU', 'IR'],
'reason': 'government_order',
'authority': 'Ministry of Culture',
'reference': 'GO-2024-001',
'block_date': '2024-01-01',
'appeal_url': 'https://example.com/appeal/restricted-article',
'alternatives': [
{
'url': '/content/similar-article',
'description': 'Similar topic available'
}
]
}
}
class ContentDetailView(View):
"""内容详情视图(可能返回451)"""
@method_decorator(vary_on_headers('X-Country-Code'))
def get(self, request, content_id):
content = self.get_content(content_id)
if content.get('blocked', False):
# 检查用户国家
user_country = request.META.get('HTTP_X_COUNTRY_CODE', 'US')
if user_country in content.get('blocked_countries', []):
return JsonResponse({
'error': 'unavailable_for_legal_reasons',
'message': f'This content is not available in {user_country}',
'content_id': content_id,
'country': user_country
}, status=451)
return JsonResponse(content)
def get_content(self, content_id):
"""获取内容(简化实现)"""
return {
'id': content_id,
'title': 'Sample Content',
'blocked': True,
'blocked_countries': ['CN', 'RU'],
'available_countries': ['US', 'UK', 'DE']
}
23.24.5 透明度报告
json
{
"transparency_report": {
"period": "2024-Q1",
"total_requests": 1000000,
"451_responses": 1250,
"blocking_reasons": {
"government_order": 800,
"copyright": 300,
"court_order": 100,
"terms_violation": 50
},
"countries_affected": [
{"country": "CN", "blocks": 500},
{"country": "RU", "blocks": 300},
{"country": "IR", "blocks": 200},
{"country": "DE", "blocks": 50}
],
"appeals": {
"received": 200,
"granted": 50,
"denied": 100,
"pending": 50
},
"documentation": "https://example.com/transparency/2024-Q1"
}
}
23.24.6 最佳实践
-
提供详细的封锁原因和法律依据
-
实现透明的上诉流程
-
发布透明度报告
-
考虑用户隐私(避免过度追踪)
-
提供替代内容选项
总结
本章详细探讨了23个"其他"4xx HTTP状态码,每个状态码都有其特定的使用场景和最佳实践。从支付系统(402)到法律合规(451),这些状态码为现代Web开发和API设计提供了丰富的语义工具。
关键要点:
精确语义:使用正确的状态码可以提供更清晰的错误信息
用户体验:良好的错误响应可以指导用户解决问题
安全性:适当的状态码有助于防止滥用和攻击
合规性:特定状态码(如451)支持法律合规要求
互操作性:标准状态码促进系统间的互操作性
实施建议:
渐进采用:从最重要的状态码开始,逐步实现更多
文档化:为API消费者提供状态码的详细文档
监控:跟踪状态码分布以识别问题
测试:为各种错误场景编写测试用例
客户端处理:确保客户端能优雅处理所有状态码
网硕互联帮助中心



评论前必须登录!
注册