云计算百科
云计算领域专业知识百科平台

HTTP 状态码:客户端与服务器的通信语言——第四部分:客户端错误状态码(4xx)深度解读(三)

第22章:429 Too Many Requests – 速率限制实现深度分析

22.1 定义与语义

429 Too Many Requests 状态码表示用户在给定时间内发送了太多请求("速率限制")。该状态码的核心含义是:

  • 客户端请求频率超过服务器限制

  • 服务器暂时拒绝处理当前请求

  • 客户端应该在等待一段时间后重试

关键特性:

  • 速率限制:限制单位时间内的请求数量

  • 可恢复错误:等待一段时间后可恢复正常

  • 包含重试信息:响应通常包含何时可重试的指示

协议要求:

  • 应包含描述限制详情的响应体

  • 可以包含 Retry-After 头部指示重试时间

  • 可能包含速率限制配额信息

22.2 速率限制策略分类

22.2.1 常见速率限制算法

python

# 速率限制算法分类与实现
from abc import ABC, abstractmethod
import time
from typing import Dict, List, Optional, Tuple
import math
from collections import deque
import threading

class RateLimiter(ABC):
"""速率限制器抽象基类"""

@abstractmethod
def is_allowed(self, key: str, weight: int = 1) -> Tuple[bool, Dict]:
"""检查是否允许请求"""
pass

@abstractmethod
def get_remaining(self, key: str) -> int:
"""获取剩余配额"""
pass

@abstractmethod
def get_reset_time(self, key: str) -> float:
"""获取重置时间"""
pass

class TokenBucketLimiter(RateLimiter):
"""令牌桶算法实现"""

def __init__(self, capacity: int, refill_rate: float):
"""
Args:
capacity: 桶容量
refill_rate: 每秒补充的令牌数
"""
self.capacity = capacity
self.refill_rate = refill_rate
self.buckets: Dict[str, Dict] = {}
self.lock = threading.RLock()

def is_allowed(self, key: str, weight: int = 1) -> Tuple[bool, Dict]:
with self.lock:
current_time = time.time()

# 获取或创建桶
if key not in self.buckets:
self.buckets[key] = {
'tokens': self.capacity,
'last_refill': current_time
}

bucket = self.buckets[key]

# 补充令牌
time_passed = current_time – bucket['last_refill']
tokens_to_add = time_passed * self.refill_rate
bucket['tokens'] = min(self.capacity, bucket['tokens'] + tokens_to_add)
bucket['last_refill'] = current_time

# 检查是否有足够令牌
if bucket['tokens'] >= weight:
bucket['tokens'] -= weight
remaining = bucket['tokens']
reset_time = current_time + (self.capacity – bucket['tokens']) / self.refill_rate

return True, {
'remaining': int(remaining),
'reset_time': reset_time,
'limit': self.capacity
}
else:
# 计算需要等待的时间
tokens_needed = weight – bucket['tokens']
wait_time = tokens_needed / self.refill_rate
remaining = 0
reset_time = current_time + wait_time

return False, {
'remaining': remaining,
'reset_time': reset_time,
'retry_after': math.ceil(wait_time),
'limit': self.capacity
}

def get_remaining(self, key: str) -> int:
with self.lock:
if key not in self.buckets:
return self.capacity

bucket = self.buckets[key]
current_time = time.time()

# 补充令牌
time_passed = current_time – bucket['last_refill']
tokens_to_add = time_passed * self.refill_rate
tokens = min(self.capacity, bucket['tokens'] + tokens_to_add)

return int(tokens)

def get_reset_time(self, key: str) -> float:
with self.lock:
if key not in self.buckets:
return time.time()

bucket = self.buckets[key]
current_time = time.time()

# 计算令牌完全补充的时间
if bucket['tokens'] < self.capacity:
tokens_needed = self.capacity – bucket['tokens']
return current_time + tokens_needed / self.refill_rate
else:
return current_time

class FixedWindowLimiter(RateLimiter):
"""固定窗口算法实现"""

def __init__(self, requests_per_window: int, window_seconds: int):
"""
Args:
requests_per_window: 每个窗口允许的请求数
window_seconds: 窗口大小(秒)
"""
self.requests_per_window = requests_per_window
self.window_seconds = window_seconds
self.windows: Dict[str, Dict] = {}
self.lock = threading.RLock()

def is_allowed(self, key: str, weight: int = 1) -> Tuple[bool, Dict]:
with self.lock:
current_time = time.time()
window_start = math.floor(current_time / self.window_seconds) * self.window_seconds

# 获取或创建窗口
if key not in self.windows:
self.windows[key] = {
'window_start': window_start,
'count': 0
}

window = self.windows[key]

# 如果窗口已过期,重置
if window['window_start'] < window_start:
window['window_start'] = window_start
window['count'] = 0

# 检查是否超过限制
if window['count'] + weight <= self.requests_per_window:
window['count'] += weight
remaining = self.requests_per_window – window['count']
reset_time = window['window_start'] + self.window_seconds

return True, {
'remaining': remaining,
'reset_time': reset_time,
'limit': self.requests_per_window
}
else:
# 已超过限制
remaining = 0
reset_time = window['window_start'] + self.window_seconds
retry_after = math.ceil(reset_time – current_time)

return False, {
'remaining': remaining,
'reset_time': reset_time,
'retry_after': retry_after,
'limit': self.requests_per_window
}

def get_remaining(self, key: str) -> int:
with self.lock:
current_time = time.time()
window_start = math.floor(current_time / self.window_seconds) * self.window_seconds

if key not in self.windows:
return self.requests_per_window

window = self.windows[key]

# 如果窗口已过期,返回完整配额
if window['window_start'] < window_start:
return self.requests_per_window

return max(0, self.requests_per_window – window['count'])

def get_reset_time(self, key: str) -> float:
with self.lock:
current_time = time.time()
window_start = math.floor(current_time / self.window_seconds) * self.window_seconds

if key not in self.windows:
return window_start + self.window_seconds

window = self.windows[key]

# 如果窗口已过期,返回下一个窗口结束时间
if window['window_start'] < window_start:
return window_start + self.window_seconds

return window['window_start'] + self.window_seconds

class SlidingWindowLogLimiter(RateLimiter):
"""滑动窗口日志算法实现"""

def __init__(self, requests_per_window: int, window_seconds: int):
"""
Args:
requests_per_window: 每个窗口允许的请求数
window_seconds: 窗口大小(秒)
"""
self.requests_per_window = requests_per_window
self.window_seconds = window_seconds
self.logs: Dict[str, deque] = {}
self.lock = threading.RLock()

def is_allowed(self, key: str, weight: int = 1) -> Tuple[bool, Dict]:
with self.lock:
current_time = time.time()
window_start = current_time – self.window_seconds

# 获取或创建日志队列
if key not in self.logs:
self.logs[key] = deque()

log = self.logs[key]

# 清理过期记录
while log and log[0] < window_start:
log.popleft()

# 检查是否超过限制
if len(log) + weight <= self.requests_per_window:
# 允许请求,记录时间戳
for _ in range(weight):
log.append(current_time)

remaining = self.requests_per_window – len(log)
# 计算重置时间(最早请求的时间 + 窗口大小)
reset_time = log[0] + self.window_seconds if log else current_time + self.window_seconds

return True, {
'remaining': remaining,
'reset_time': reset_time,
'limit': self.requests_per_window
}
else:
# 已超过限制
remaining = 0
# 计算最早可重试时间
retry_time = log[0] + self.window_seconds if log else current_time
retry_after = math.ceil(retry_time – current_time)
reset_time = retry_time

return False, {
'remaining': remaining,
'reset_time': reset_time,
'retry_after': max(0, retry_after),
'limit': self.requests_per_window
}

def get_remaining(self, key: str) -> int:
with self.lock:
current_time = time.time()
window_start = current_time – self.window_seconds

if key not in self.logs:
return self.requests_per_window

log = self.logs[key]

# 清理过期记录
while log and log[0] < window_start:
log.popleft()

return max(0, self.requests_per_window – len(log))

def get_reset_time(self, key: str) -> float:
with self.lock:
current_time = time.time()
window_start = current_time – self.window_seconds

if key not in self.logs:
return current_time + self.window_seconds

log = self.logs[key]

# 清理过期记录
while log and log[0] < window_start:
log.popleft()

if log:
# 最早请求的时间 + 窗口大小
return log[0] + self.window_seconds
else:
return current_time + self.window_seconds

class AdaptiveRateLimiter:
"""自适应速率限制器"""

def __init__(self, base_limiter: RateLimiter, adaptation_config: Dict = None):
self.base_limiter = base_limiter
self.config = {
'min_requests_per_second': 1,
'max_requests_per_second': 100,
'scale_up_factor': 1.1, # 增加10%
'scale_down_factor': 0.9, # 减少10%
'scale_up_interval': 60, # 每60秒评估一次增加
'scale_down_threshold': 0.8, # 使用率80%时考虑增加
'load_shedding_threshold': 0.95, # 使用率95%时开始降级
** (adaptation_config or {})
}

self.usage_history = deque(maxlen=100)
self.adaptation_history = []
self.last_adaptation_time = time.time()

def is_allowed(self, key: str, weight: int = 1) -> Tuple[bool, Dict]:
# 检查基础限制器
allowed, result = self.base_limiter.is_allowed(key, weight)

# 记录使用情况
self.record_usage(key, allowed, result)

# 自适应调整
self.adaptive_adjustment()

return allowed, result

def record_usage(self, key: str, allowed: bool, result: Dict):
"""记录使用情况"""
current_time = time.time()

usage = {
'timestamp': current_time,
'key': key,
'allowed': allowed,
'remaining': result.get('remaining', 0),
'limit': result.get('limit', 0)
}

self.usage_history.append(usage)

def adaptive_adjustment(self):
"""自适应调整"""
current_time = time.time()

# 检查是否到达调整间隔
if current_time – self.last_adaptation_time < self.config['scale_up_interval']:
return

# 分析最近的使用情况
recent_history = [u for u in self.usage_history
if u['timestamp'] > current_time – self.config['scale_up_interval']]

if not recent_history:
return

# 计算平均使用率
total_requests = len(recent_history)
allowed_requests = sum(1 for u in recent_history if u['allowed'])
usage_rate = allowed_requests / total_requests if total_requests > 0 else 0

# 调整逻辑
if usage_rate > self.config['scale_down_threshold']:
# 使用率高,考虑增加限制
self.scale_up()
elif usage_rate < 0.5:
# 使用率低,考虑减少限制
self.scale_down()

self.last_adaptation_time = current_time

def scale_up(self):
"""增加速率限制"""
# 获取当前限制器配置
if isinstance(self.base_limiter, TokenBucketLimiter):
new_capacity = min(
self.base_limiter.capacity * self.config['scale_up_factor'],
self.config['max_requests_per_second'] * 60 # 转换为每分钟
)
self.base_limiter.capacity = new_capacity

self.adaptation_history.append({
'timestamp': time.time(),
'action': 'scale_up',
'new_capacity': new_capacity,
'reason': 'high_usage_rate'
})

elif isinstance(self.base_limiter, FixedWindowLimiter):
new_limit = min(
self.base_limiter.requests_per_window * self.config['scale_up_factor'],
self.config['max_requests_per_second'] * self.base_limiter.window_seconds
)
self.base_limiter.requests_per_window = int(new_limit)

self.adaptation_history.append({
'timestamp': time.time(),
'action': 'scale_up',
'new_limit': new_limit,
'reason': 'high_usage_rate'
})

def scale_down(self):
"""减少速率限制"""
if isinstance(self.base_limiter, TokenBucketLimiter):
new_capacity = max(
self.base_limiter.capacity * self.config['scale_down_factor'],
self.config['min_requests_per_second'] * 60
)
self.base_limiter.capacity = new_capacity

self.adaptation_history.append({
'timestamp': time.time(),
'action': 'scale_down',
'new_capacity': new_capacity,
'reason': 'low_usage_rate'
})

elif isinstance(self.base_limiter, FixedWindowLimiter):
new_limit = max(
self.base_limiter.requests_per_window * self.config['scale_down_factor'],
self.config['min_requests_per_second'] * self.base_limiter.window_seconds
)
self.base_limiter.requests_per_window = int(new_limit)

self.adaptation_history.append({
'timestamp': time.time(),
'action': 'scale_down',
'new_limit': new_limit,
'reason': 'low_usage_rate'
})

22.2.2 多层速率限制策略

python

# 多层速率限制策略
from typing import List, Dict, Optional, Tuple
import hashlib

class MultiLayerRateLimiter:
"""多层速率限制器"""

def __init__(self, layers_config: List[Dict]):
"""
Args:
layers_config: 各层配置列表,按顺序检查
[
{
'limiter_class': TokenBucketLimiter,
'args': [100, 1.0], # capacity, refill_rate
'kwargs': {},
'key_builder': 'ip', # ip, user, endpoint, custom
'cost': 1, # 请求成本
'name': 'ip_layer'
},

]
"""
self.layers = []
self.layer_configs = layers_config
self.init_layers()

def init_layers(self):
"""初始化各层限制器"""
for config in self.layer_configs:
limiter_class = config['limiter_class']
args = config.get('args', [])
kwargs = config.get('kwargs', {})

limiter = limiter_class(*args, **kwargs)

self.layers.append({
'limiter': limiter,
'key_builder': config.get('key_builder', 'ip'),
'cost': config.get('cost', 1),
'name': config.get('name', 'unnamed_layer'),
'priority': config.get('priority', 0)
})

# 按优先级排序
self.layers.sort(key=lambda x: x['priority'], reverse=True)

def is_allowed(self, request_info: Dict) -> Tuple[bool, Dict]:
"""
检查请求是否被允许

Args:
request_info: 请求信息
{
'ip': '127.0.0.1',
'user_id': 'user123',
'endpoint': '/api/users',
'method': 'GET',
'headers': {…}
}

Returns:
(allowed, result)
"""
results = []

for layer in self.layers:
# 构建键
key = self.build_key(layer['key_builder'], request_info)
cost = layer['cost']

# 检查该层限制
allowed, result = layer['limiter'].is_allowed(key, cost)
result['layer'] = layer['name']

results.append(result)

if not allowed:
# 该层被限制,立即返回
return False, {
'allowed': False,
'first_blocking_layer': layer['name'],
'details': results,
'retry_after': result.get('retry_after'),
'reset_time': result.get('reset_time')
}

# 所有层都允许
return True, {
'allowed': True,
'details': results
}

def build_key(self, key_builder: str, request_info: Dict) -> str:
"""构建限制键"""
if key_builder == 'ip':
return f"ip:{request_info.get('ip', 'unknown')}"

elif key_builder == 'user':
user_id = request_info.get('user_id')
if user_id:
return f"user:{user_id}"
else:
# 未认证用户使用IP
return f"ip:{request_info.get('ip', 'unknown')}"

elif key_builder == 'endpoint':
endpoint = request_info.get('endpoint', 'unknown')
method = request_info.get('method', 'GET')
return f"endpoint:{method}:{endpoint}"

elif key_builder == 'user_endpoint':
user_id = request_info.get('user_id', 'anonymous')
endpoint = request_info.get('endpoint', 'unknown')
method = request_info.get('method', 'GET')
return f"user_endpoint:{user_id}:{method}:{endpoint}"

elif key_builder == 'custom':
# 自定义键构建逻辑
custom_data = request_info.get('custom_key_data', '')
key_hash = hashlib.md5(custom_data.encode()).hexdigest()
return f"custom:{key_hash}"

else:
# 默认使用IP
return f"ip:{request_info.get('ip', 'unknown')}"

def get_quotas(self, request_info: Dict) -> Dict:
"""获取所有层的配额信息"""
quotas = {}

for layer in self.layers:
key = self.build_key(layer['key_builder'], request_info)
limiter = layer['limiter']

remaining = limiter.get_remaining(key)
reset_time = limiter.get_reset_time(key)

quotas[layer['name']] = {
'remaining': remaining,
'reset_time': reset_time,
'limit': self.get_layer_limit(layer),
'key_builder': layer['key_builder']
}

return quotas

def get_layer_limit(self, layer: Dict) -> int:
"""获取层的限制值"""
limiter = layer['limiter']

if isinstance(limiter, TokenBucketLimiter):
return limiter.capacity
elif isinstance(limiter, FixedWindowLimiter):
return limiter.requests_per_window
elif isinstance(limiter, SlidingWindowLogLimiter):
return limiter.requests_per_window
else:
return 0

def get_rate_limit_headers(self, request_info: Dict) -> Dict[str, str]:
"""获取速率限制头部信息"""
quotas = self.get_quotas(request_info)

# 找到最严格的限制
strictest_layer = None
strictest_remaining = float('inf')

for name, quota in quotas.items():
if quota['remaining'] < strictest_remaining:
strictest_remaining = quota['remaining']
strictest_layer = name

if strictest_layer:
quota = quotas[strictest_layer]

headers = {
'X-RateLimit-Limit': str(quota['limit']),
'X-RateLimit-Remaining': str(quota['remaining']),
'X-RateLimit-Reset': str(int(quota['reset_time']))
}

# 添加重试头部(如果剩余为0)
if quota['remaining'] <= 0:
retry_after = max(0, math.ceil(quota['reset_time'] – time.time()))
headers['Retry-After'] = str(retry_after)

return headers

return {}

# 使用示例
def create_production_rate_limiter():
"""创建生产环境使用的多层速率限制器"""

layers_config = [
# 第一层:IP基础限制(防止滥用)
{
'limiter_class': TokenBucketLimiter,
'args': [100, 1.0], # 100请求,每秒补充1个
'key_builder': 'ip',
'cost': 1,
'name': 'ip_basic',
'priority': 10
},

# 第二层:用户级别限制
{
'limiter_class': FixedWindowLimiter,
'args': [1000, 3600], # 1000请求/小时
'key_builder': 'user',
'cost': 1,
'name': 'user_hourly',
'priority': 20
},

# 第三层:端点级别限制
{
'limiter_class': SlidingWindowLogLimiter,
'args': [60, 60], # 60请求/分钟
'key_builder': 'endpoint',
'cost': 1,
'name': 'endpoint_minute',
'priority': 30
},

# 第四层:敏感端点更严格限制
{
'limiter_class': FixedWindowLimiter,
'args': [10, 60], # 10请求/分钟(用于敏感操作)
'key_builder': 'user_endpoint',
'cost': 1,
'name': 'sensitive_endpoint',
'priority': 40,
'condition': lambda req: req.get('endpoint', '').startswith('/api/admin/')
}
]

return MultiLayerRateLimiter(layers_config)

22.3 详细实现与最佳实践

22.3.1 分布式速率限制实现

python

# 基于Redis的分布式速率限制器
import redis
import json
import pickle
from typing import Optional, Tuple, Dict, Any
import time
import hashlib

class RedisRateLimiter:
"""基于Redis的分布式速率限制器"""

def __init__(self, redis_client: redis.Redis, namespace: str = "ratelimit"):
self.redis = redis_client
self.namespace = namespace
self.lua_scripts = self.load_lua_scripts()

def load_lua_scripts(self) -> Dict[str, str]:
"""加载Lua脚本(原子操作)"""

# 令牌桶算法的Lua脚本
token_bucket_script = """
local key = KEYS[1]
local capacity = tonumber(ARGV[1])
local refill_rate = tonumber(ARGV[2])
local weight = tonumber(ARGV[3])
local now = tonumber(ARGV[4])

local bucket = redis.call('HMGET', key, 'tokens', 'last_refill')
local tokens
local last_refill

if bucket[1] == false then
— 初始化桶
tokens = capacity
last_refill = now
redis.call('HMSET', key, 'tokens', tokens, 'last_refill', last_refill)
else
tokens = tonumber(bucket[1])
last_refill = tonumber(bucket[2])

— 补充令牌
local time_passed = now – last_refill
local tokens_to_add = time_passed * refill_rate
tokens = math.min(capacity, tokens + tokens_to_add)
last_refill = now
end

— 检查是否有足够令牌
if tokens >= weight then
tokens = tokens – weight
redis.call('HMSET', key, 'tokens', tokens, 'last_refill', last_refill)

— 计算重置时间
local reset_time = now + (capacity – tokens) / refill_rate

return {1, tokens, reset_time, capacity}
else
— 计算需要等待的时间
local tokens_needed = weight – tokens
local wait_time = tokens_needed / refill_rate
local reset_time = now + wait_time

return {0, tokens, reset_time, capacity, wait_time}
end
"""

# 固定窗口算法的Lua脚本
fixed_window_script = """
local key = KEYS[1]
local limit = tonumber(ARGV[1])
local window = tonumber(ARGV[2])
local weight = tonumber(ARGV[3])
local now = tonumber(ARGV[4])

local window_start = math.floor(now / window) * window
local window_key = key .. ':' .. tostring(window_start)

— 获取当前计数
local count = redis.call('GET', window_key)
if count == false then
count = 0
else
count = tonumber(count)
end

— 检查是否超过限制
if count + weight <= limit then
— 增加计数
redis.call('INCRBY', window_key, weight)
— 设置过期时间(窗口结束时)
redis.call('EXPIRE', window_key, window)

local remaining = limit – (count + weight)
local reset_time = window_start + window

return {1, remaining, reset_time, limit}
else
— 已超过限制
local remaining = 0
local reset_time = window_start + window
local retry_after = reset_time – now

return {0, remaining, reset_time, limit, retry_after}
end
"""

# 滑动窗口算法的Lua脚本
sliding_window_script = """
local key = KEYS[1]
local limit = tonumber(ARGV[1])
local window = tonumber(ARGV[2])
local weight = tonumber(ARGV[3])
local now = tonumber(ARGV[4])

local window_start = now – window

— 获取所有记录
local records = redis.call('ZRANGEBYSCORE', key, window_start, now)
local count = #records

— 检查是否超过限制
if count + weight <= limit then
— 添加新记录
for i = 1, weight do
redis.call('ZADD', key, now, now .. ':' .. i)
end

— 清理过期记录
redis.call('ZREMRANGEBYSCORE', key, 0, window_start)

— 设置过期时间
redis.call('EXPIRE', key, window)

— 计算剩余配额
local remaining = limit – (count + weight)

— 计算重置时间(最早记录的时间 + 窗口大小)
local reset_time
if count > 0 then
local oldest = redis.call('ZRANGE', key, 0, 0, 'WITHSCORES')[2]
reset_time = oldest + window
else
reset_time = now + window
end

return {1, remaining, reset_time, limit}
else
— 已超过限制
local remaining = 0

— 计算最早可重试时间
local reset_time
if count > 0 then
local oldest = redis.call('ZRANGE', key, 0, 0, 'WITHSCORES')[2]
reset_time = oldest + window
else
reset_time = now + window
end

local retry_after = reset_time – now

return {0, remaining, reset_time, limit, retry_after}
end
"""

return {
'token_bucket': self.redis.script_load(token_bucket_script),
'fixed_window': self.redis.script_load(fixed_window_script),
'sliding_window': self.redis.script_load(sliding_window_script)
}

def token_bucket_is_allowed(self, key: str, capacity: int,
refill_rate: float, weight: int = 1) -> Tuple[bool, Dict]:
"""令牌桶算法检查"""
redis_key = f"{self.namespace}:token_bucket:{key}"
now = time.time()

result = self.redis.evalsha(
self.lua_scripts['token_bucket'],
1, # key数量
redis_key,
capacity,
refill_rate,
weight,
now
)

allowed = bool(result[0])
tokens = result[1]
reset_time = result[2]
limit = result[3]

if allowed:
return True, {
'remaining': int(tokens),
'reset_time': reset_time,
'limit': limit
}
else:
wait_time = result[4]
return False, {
'remaining': int(tokens),
'reset_time': reset_time,
'retry_after': math.ceil(wait_time),
'limit': limit
}

def fixed_window_is_allowed(self, key: str, limit: int,
window_seconds: int, weight: int = 1) -> Tuple[bool, Dict]:
"""固定窗口算法检查"""
redis_key = f"{self.namespace}:fixed_window:{key}"
now = time.time()

result = self.redis.evalsha(
self.lua_scripts['fixed_window'],
1,
redis_key,
limit,
window_seconds,
weight,
now
)

allowed = bool(result[0])
remaining = result[1]
reset_time = result[2]
limit_val = result[3]

if allowed:
return True, {
'remaining': int(remaining),
'reset_time': reset_time,
'limit': limit_val
}
else:
retry_after = result[4]
return False, {
'remaining': int(remaining),
'reset_time': reset_time,
'retry_after': math.ceil(retry_after),
'limit': limit_val
}

def sliding_window_is_allowed(self, key: str, limit: int,
window_seconds: int, weight: int = 1) -> Tuple[bool, Dict]:
"""滑动窗口算法检查"""
redis_key = f"{self.namespace}:sliding_window:{key}"
now = time.time()

result = self.redis.evalsha(
self.lua_scripts['sliding_window'],
1,
redis_key,
limit,
window_seconds,
weight,
now
)

allowed = bool(result[0])
remaining = result[1]
reset_time = result[2]
limit_val = result[3]

if allowed:
return True, {
'remaining': int(remaining),
'reset_time': reset_time,
'limit': limit_val
}
else:
retry_after = result[4]
return False, {
'remaining': int(remaining),
'reset_time': reset_time,
'retry_after': math.ceil(retry_after),
'limit': limit_val
}

def get_quotas(self, key: str, algorithm: str = 'token_bucket',
**kwargs) -> Optional[Dict]:
"""获取配额信息"""
if algorithm == 'token_bucket':
capacity = kwargs.get('capacity', 100)
refill_rate = kwargs.get('refill_rate', 1.0)

redis_key = f"{self.namespace}:token_bucket:{key}"
bucket = self.redis.hgetall(redis_key)

if not bucket:
return {
'remaining': capacity,
'limit': capacity,
'reset_time': time.time()
}

tokens = float(bucket.get(b'tokens', 0))
last_refill = float(bucket.get(b'last_refill', time.time()))

# 补充令牌
now = time.time()
time_passed = now – last_refill
tokens_to_add = time_passed * refill_rate
tokens = min(capacity, tokens + tokens_to_add)

# 计算重置时间
if tokens < capacity:
reset_time = now + (capacity – tokens) / refill_rate
else:
reset_time = now

return {
'remaining': int(tokens),
'limit': capacity,
'reset_time': reset_time
}

elif algorithm == 'fixed_window':
limit = kwargs.get('limit', 100)
window_seconds = kwargs.get('window_seconds', 60)

now = time.time()
window_start = math.floor(now / window_seconds) * window_seconds
redis_key = f"{self.namespace}:fixed_window:{key}:{window_start}"

count = int(self.redis.get(redis_key) or 0)
remaining = max(0, limit – count)
reset_time = window_start + window_seconds

return {
'remaining': remaining,
'limit': limit,
'reset_time': reset_time
}

return None

def reset_limits(self, key: str, algorithm: str = 'token_bucket'):
"""重置限制"""
if algorithm == 'token_bucket':
redis_key = f"{self.namespace}:token_bucket:{key}"
self.redis.delete(redis_key)
elif algorithm == 'fixed_window':
# 删除所有相关的窗口键
pattern = f"{self.namespace}:fixed_window:{key}:*"
keys = self.redis.keys(pattern)
if keys:
self.redis.delete(*keys)
elif algorithm == 'sliding_window':
redis_key = f"{self.namespace}:sliding_window:{key}"
self.redis.delete(redis_key)

class DistributedRateLimiter:
"""分布式速率限制管理器"""

def __init__(self, redis_client: redis.Redis, config: Dict):
self.redis = redis_client
self.config = config
self.redis_limiter = RedisRateLimiter(redis_client)

# 本地缓存,减少Redis访问
self.local_cache = {}
self.cache_ttl = 1 # 本地缓存1秒

def is_allowed(self, identifier: str, rule_name: str) -> Tuple[bool, Dict]:
"""检查是否允许请求"""

# 检查本地缓存
cache_key = f"{identifier}:{rule_name}"
cached = self.local_cache.get(cache_key)
if cached and time.time() – cached['timestamp'] < self.cache_ttl:
return cached['allowed'], cached['result']

# 获取规则配置
rule = self.config.get('rules', {}).get(rule_name)
if not rule:
# 没有规则配置,默认允许
return True, {'allowed': True, 'rule': rule_name}

algorithm = rule.get('algorithm', 'token_bucket')
params = rule.get('params', {})

# 根据算法调用相应的Redis限制器
if algorithm == 'token_bucket':
allowed, result = self.redis_limiter.token_bucket_is_allowed(
f"{rule_name}:{identifier}",
params.get('capacity', 100),
params.get('refill_rate', 1.0),
params.get('cost', 1)
)
elif algorithm == 'fixed_window':
allowed, result = self.redis_limiter.fixed_window_is_allowed(
f"{rule_name}:{identifier}",
params.get('limit', 100),
params.get('window_seconds', 60),
params.get('cost', 1)
)
elif algorithm == 'sliding_window':
allowed, result = self.redis_limiter.sliding_window_is_allowed(
f"{rule_name}:{identifier}",
params.get('limit', 100),
params.get('window_seconds', 60),
params.get('cost', 1)
)
else:
# 未知算法,默认允许
return True, {'allowed': True, 'rule': rule_name}

# 添加规则信息到结果
result['rule'] = rule_name
result['algorithm'] = algorithm

# 缓存结果
self.local_cache[cache_key] = {
'allowed': allowed,
'result': result,
'timestamp': time.time()
}

return allowed, result

def get_all_quotas(self, identifier: str) -> Dict:
"""获取所有规则的配额信息"""
quotas = {}

for rule_name, rule in self.config.get('rules', {}).items():
algorithm = rule.get('algorithm', 'token_bucket')
params = rule.get('params', {})

quota = self.redis_limiter.get_quotas(
f"{rule_name}:{identifier}",
algorithm,
**params
)

if quota:
quotas[rule_name] = {
**quota,
'algorithm': algorithm,
'rule': rule_name
}

return quotas

def create_rate_limit_response(self, identifier: str, rule_name: str,
result: Dict) -> Tuple[Dict, Dict]:
"""创建速率限制响应"""

if result.get('allowed', True):
# 允许请求,添加速率限制头部
headers = {
'X-RateLimit-Limit': str(result.get('limit', 0)),
'X-RateLimit-Remaining': str(result.get('remaining', 0)),
'X-RateLimit-Reset': str(int(result.get('reset_time', time.time()))),
'X-RateLimit-Rule': rule_name
}

return {'allowed': True}, headers

else:
# 请求被限制
retry_after = result.get('retry_after', 60)
reset_time = result.get('reset_time', time.time() + retry_after)

# 响应体
response_body = {
'error': {
'code': 'RATE_LIMIT_EXCEEDED',
'message': 'Too many requests',
'details': {
'rule': rule_name,
'retry_after': retry_after,
'reset_time': reset_time,
'limit': result.get('limit', 0),
'remaining': result.get('remaining', 0)
}
}
}

# 响应头部
headers = {
'Retry-After': str(retry_after),
'X-RateLimit-Limit': str(result.get('limit', 0)),
'X-RateLimit-Remaining': str(result.get('remaining', 0)),
'X-RateLimit-Reset': str(int(reset_time)),
'X-RateLimit-Rule': rule_name
}

return response_body, headers

# 配置示例
RATE_LIMIT_CONFIG = {
'rules': {
'ip_global': {
'algorithm': 'token_bucket',
'params': {
'capacity': 1000,
'refill_rate': 10.0, # 10请求/秒
'cost': 1
},
'description': '全局IP限制'
},
'user_auth': {
'algorithm': 'fixed_window',
'params': {
'limit': 10,
'window_seconds': 60, # 10请求/分钟
'cost': 1
},
'description': '用户认证限制'
},
'api_endpoint': {
'algorithm': 'sliding_window',
'params': {
'limit': 60,
'window_seconds': 60, # 60请求/分钟
'cost': 1
},
'description': 'API端点限制'
},
'upload_endpoint': {
'algorithm': 'token_bucket',
'params': {
'capacity': 10,
'refill_rate': 0.1, # 每10秒1个请求
'cost': 1
},
'description': '上传端点限制'
}
},
'default_rule': 'ip_global'
}

22.3.2 智能速率限制中间件

python

# 智能速率限制中间件
from flask import Flask, request, jsonify, g
import time
from functools import wraps
import uuid

class IntelligentRateLimitMiddleware:
"""智能速率限制中间件"""

def __init__(self, app: Flask = None, config: Dict = None):
self.app = app
self.config = {
'enabled': True,
'default_limits': {
'ip': '100 per hour',
'user': '1000 per day',
'endpoint': '60 per minute'
},
'exempt_paths': ['/health', '/metrics'],
'exempt_methods': ['OPTIONS'],
'cost_calculator': self.default_cost_calculator,
'identifier_extractor': self.default_identifier_extractor,
'response_handler': self.default_response_handler,
'adaptive_enabled': True,
'anomaly_detection_enabled': True,
** (config or {})
}

# 初始化限制器
self.rate_limiter = DistributedRateLimiter(
redis_client=redis.Redis(),
config=RATE_LIMIT_CONFIG
)

# 异常检测器
self.anomaly_detector = RateLimitAnomalyDetector()

# 自适应调整器
self.adaptive_adjuster = AdaptiveRateAdjuster()

if app:
self.init_app(app)

def init_app(self, app: Flask):
"""初始化Flask应用"""
self.app = app

# 注册中间件
@app.before_request
def rate_limit_check():
self.check_rate_limit()

# 注册错误处理器
@app.errorhandler(429)
def handle_rate_limit(e):
return self.handle_rate_limit_error(e)

def check_rate_limit(self):
"""检查速率限制"""

# 检查是否启用
if not self.config['enabled']:
return

# 检查豁免路径
if request.path in self.config['exempt_paths']:
return

# 检查豁免方法
if request.method in self.config['exempt_methods']:
return

# 提取标识符
identifiers = self.extract_identifiers(request)

# 确定适用的规则
rules = self.determine_applicable_rules(request, identifiers)

# 检查每个规则
for rule_name, identifier in rules:
allowed, result = self.rate_limiter.is_allowed(identifier, rule_name)

if not allowed:
# 检测异常行为
if self.config['anomaly_detection_enabled']:
self.anomaly_detector.record_violation(
identifier, rule_name, request
)

# 触发速率限制
self.trigger_rate_limit(identifier, rule_name, result)
break

# 记录请求(用于自适应调整)
if self.config['adaptive_enabled']:
self.adaptive_adjuster.record_request(request, identifiers)

def extract_identifiers(self, request) -> Dict[str, str]:
"""提取标识符"""
identifiers = {}

# 提取IP
if request.headers.get('X-Forwarded-For'):
ip = request.headers['X-Forwarded-For'].split(',')[0].strip()
else:
ip = request.remote_addr
identifiers['ip'] = ip

# 提取用户ID(如果已认证)
if hasattr(g, 'user') and g.user:
identifiers['user'] = g.user.id
elif request.headers.get('X-User-ID'):
identifiers['user'] = request.headers['X-User-ID']

# 提取API密钥
if request.headers.get('X-API-Key'):
identifiers['api_key'] = request.headers['X-API-Key']

# 提取会话ID
if request.cookies.get('session_id'):
identifiers['session'] = request.cookies['session_id']

return identifiers

def determine_applicable_rules(self, request, identifiers: Dict) -> List[Tuple[str, str]]:
"""确定适用的规则"""
rules = []

# 基础规则:IP限制
if 'ip' in identifiers:
rules.append(('ip_global', identifiers['ip']))

# 用户规则
if 'user' in identifiers:
# 用户特定限制
rules.append(('user_auth', identifiers['user']))

# 端点特定用户限制
endpoint_rule = f"user_endpoint:{request.path}"
rules.append((endpoint_rule, identifiers['user']))

# API密钥规则
if 'api_key' in identifiers:
rules.append(('api_key', identifiers['api_key']))

# 端点规则
endpoint_key = f"endpoint:{request.method}:{request.path}"
rules.append(('api_endpoint', endpoint_key))

# 特殊端点规则
if request.path.startswith('/api/upload'):
rules.append(('upload_endpoint', identifiers.get('ip', 'unknown')))

return rules

def default_cost_calculator(self, request) -> int:
"""默认成本计算器"""
# 根据请求大小、复杂度等计算成本
cost = 1

# 请求体大小成本
content_length = request.content_length or 0
if content_length > 1024 * 1024: # 1MB
cost += 1
elif content_length > 10 * 1024 * 1024: # 10MB
cost += 5

# 复杂操作成本
if request.method in ['POST', 'PUT', 'DELETE']:
cost += 1

# 敏感端点成本
if request.path.startswith('/api/admin/'):
cost += 2

return cost

def default_identifier_extractor(self, request) -> str:
"""默认标识符提取器"""
# 使用IP作为默认标识符
if request.headers.get('X-Forwarded-For'):
return request.headers['X-Forwarded-For'].split(',')[0].strip()
return request.remote_addr

def default_response_handler(self, rule_name: str, result: Dict) -> Tuple[Dict, int, Dict]:
"""默认响应处理器"""
retry_after = result.get('retry_after', 60)

response_body = {
'error': {
'code': 'RATE_LIMIT_EXCEEDED',
'message': 'Too many requests. Please try again later.',
'details': {
'rule': rule_name,
'retry_after': retry_after,
'reset_time': result.get('reset_time'),
'limit': result.get('limit'),
'remaining': result.get('remaining')
}
}
}

headers = {
'Retry-After': str(retry_after),
'X-RateLimit-Limit': str(result.get('limit', 0)),
'X-RateLimit-Remaining': str(result.get('remaining', 0)),
'X-RateLimit-Reset': str(int(result.get('reset_time', time.time()))),
'X-RateLimit-Rule': rule_name
}

return response_body, 429, headers

def trigger_rate_limit(self, identifier: str, rule_name: str, result: Dict):
"""触发速率限制"""
# 记录限制事件
self.log_rate_limit_event(identifier, rule_name, result)

# 生成响应
response_body, status_code, headers = self.config['response_handler'](
rule_name, result
)

# 抛出异常,由错误处理器处理
from werkzeug.exceptions import TooManyRequests
raise TooManyRequests(response=jsonify(response_body), headers=headers)

def log_rate_limit_event(self, identifier: str, rule_name: str, result: Dict):
"""记录速率限制事件"""
event = {
'timestamp': time.time(),
'identifier': identifier,
'rule': rule_name,
'remaining': result.get('remaining', 0),
'limit': result.get('limit', 0),
'retry_after': result.get('retry_after', 0),
'request_path': request.path,
'request_method': request.method,
'user_agent': request.headers.get('User-Agent'),
'client_ip': request.remote_addr
}

# 在实际应用中,这里会记录到日志系统
print(f"[Rate Limit] {json.dumps(event)}")

# 写入文件日志
with open('rate_limit_events.log', 'a') as f:
f.write(json.dumps(event) + '\\n')

def handle_rate_limit_error(self, error):
"""处理速率限制错误"""
# 从错误中提取响应和头部
response = error.response
return response

def get_rate_limit_info(self, identifier: str) -> Dict:
"""获取速率限制信息"""
return self.rate_limiter.get_all_quotas(identifier)

class RateLimitAnomalyDetector:
"""速率限制异常检测器"""

def __init__(self):
self.violation_history = {}
self.anomaly_patterns = {}

def record_violation(self, identifier: str, rule_name: str, request):
"""记录违规"""
key = f"{identifier}:{rule_name}"

if key not in self.violation_history:
self.violation_history[key] = {
'count': 0,
'first_violation': time.time(),
'last_violation': time.time(),
'requests': []
}

record = self.violation_history[key]
record['count'] += 1
record['last_violation'] = time.time()

# 记录请求详情
request_info = {
'timestamp': time.time(),
'path': request.path,
'method': request.method,
'user_agent': request.headers.get('User-Agent'),
'client_ip': request.remote_addr
}
record['requests'].append(request_info)

# 检测异常模式
self.detect_anomaly(key, record)

def detect_anomaly(self, key: str, record: Dict):
"""检测异常模式"""

# 检测频繁违规
time_window = 300 # 5分钟
violations_in_window = sum(
1 for req in record['requests']
if time.time() – req['timestamp'] < time_window
)

if violations_in_window > 10:
# 频繁违规,可能是恶意攻击
self.trigger_anomaly_alert(key, 'frequent_violations', {
'violations_in_window': violations_in_window,
'time_window': time_window
})

# 检测分布式攻击模式
if self.is_distributed_attack_pattern(record):
self.trigger_anomaly_alert(key, 'distributed_attack', {
'request_patterns': len(set(r['client_ip'] for r in record['requests']))
})

def is_distributed_attack_pattern(self, record: Dict) -> bool:
"""检测分布式攻击模式"""
if len(record['requests']) < 20:
return False

# 检查是否来自多个IP地址
unique_ips = set(r['client_ip'] for r in record['requests'])
if len(unique_ips) > 5:
return True

# 检查是否有规律的请求模式
timestamps = [r['timestamp'] for r in record['requests']]
if len(timestamps) >= 10:
intervals = [timestamps[i+1] – timestamps[i] for i in range(len(timestamps)-1)]
avg_interval = sum(intervals) / len(intervals)

# 检查间隔是否过于规律
if avg_interval > 0:
deviation = sum(abs(i – avg_interval) for i in intervals) / len(intervals)
if deviation / avg_interval < 0.1: # 偏差小于10%
return True

return False

def trigger_anomaly_alert(self, key: str, anomaly_type: str, details: Dict):
"""触发异常警报"""
alert = {
'timestamp': time.time(),
'key': key,
'type': anomaly_type,
'details': details,
'severity': 'high'
}

print(f"[Rate Limit Anomaly] {json.dumps(alert)}")

with open('rate_limit_anomalies.log', 'a') as f:
f.write(json.dumps(alert) + '\\n')

class AdaptiveRateAdjuster:
"""自适应速率调整器"""

def __init__(self):
self.request_history = deque(maxlen=1000)
self.adjustment_history = []

def record_request(self, request, identifiers: Dict):
"""记录请求"""
request_record = {
'timestamp': time.time(),
'path': request.path,
'method': request.method,
'identifiers': identifiers,
'response_time': None, # 将在响应时填充
'status_code': None
}

self.request_history.append(request_record)

# 定期分析并调整
if len(self.request_history) % 100 == 0:
self.analyze_and_adjust()

def analyze_and_adjust(self):
"""分析并调整速率限制"""
# 分析请求模式
recent_requests = list(self.request_history)[-100:] # 最近100个请求

if not recent_requests:
return

# 计算请求速率
time_window = recent_requests[-1]['timestamp'] – recent_requests[0]['timestamp']
request_rate = len(recent_requests) / time_window if time_window > 0 else 0

# 计算错误率
error_requests = [r for r in recent_requests if r.get('status_code', 200) >= 400]
error_rate = len(error_requests) / len(recent_requests) if recent_requests else 0

# 根据分析结果调整
if error_rate > 0.1: # 错误率超过10%
# 降低限制,可能是服务器压力大
self.adjust_limits('decrease', {
'reason': 'high_error_rate',
'error_rate': error_rate,
'request_rate': request_rate
})
elif request_rate > 100 and error_rate < 0.01: # 高请求率,低错误率
# 可以考虑增加限制
self.adjust_limits('increase', {
'reason': 'high_throughput_low_errors',
'request_rate': request_rate,
'error_rate': error_rate
})

def adjust_limits(self, direction: str, context: Dict):
"""调整限制"""
adjustment = {
'timestamp': time.time(),
'direction': direction,
'context': context
}

self.adjustment_history.append(adjustment)

print(f"[Rate Limit Adjustment] {json.dumps(adjustment)}")

22.4 客户端处理策略

22.4.1 智能重试与退避策略

javascript

// 客户端速率限制处理策略
class RateLimitAwareClient {
constructor(config = {}) {
this.config = {
baseDelay: 1000,
maxDelay: 60000,
backoffFactor: 2,
jitter: 0.1,
maxRetries: 5,
respectRetryAfter: true,
quotaMonitoring: true,
adaptiveBackoff: true,
…config
};

this.rateLimitInfo = new Map();
this.retryQueue = [];
this.quotaHistory = [];
this.circuitBreakers = new Map();

// 初始化监控
this.initQuotaMonitoring();
}

async request(endpoint, options = {}) {
const requestId = this.generateRequestId();
const startTime = Date.now();

// 检查速率限制状态
const rateLimitCheck = this.checkRateLimitState(endpoint, options);
if (!rateLimitCheck.allowed) {
// 速率限制已触发,等待或拒绝
if (options.ignoreRateLimit) {
return this.executeRequest(requestId, endpoint, options);
} else {
return this.handleRateLimitedRequest(requestId, endpoint, options, rateLimitCheck);
}
}

// 执行请求
try {
const response = await this.executeRequest(requestId, endpoint, options);

// 更新速率限制信息
this.updateRateLimitInfo(endpoint, response);

return response;

} catch (error) {
// 处理错误
if (error.name === 'RateLimitError') {
return this.handleRateLimitError(requestId, endpoint, options, error);
}

throw error;
}
}

checkRateLimitState(endpoint, options) {
const endpointKey = this.getEndpointKey(endpoint, options);
const info = this.rateLimitInfo.get(endpointKey);

if (!info) {
return { allowed: true };
}

// 检查剩余配额
if (info.remaining <= 0) {
const now = Date.now();
const resetTime = info.resetTime * 1000; // 转换为毫秒

if (now < resetTime) {
// 仍在限制期内
const retryAfter = Math.ceil((resetTime – now) / 1000);

return {
allowed: false,
reason: 'quota_exhausted',
retryAfter,
resetTime
};
} else {
// 限制已过期
return { allowed: true };
}
}

// 检查电路断路器
const circuitKey = this.getCircuitKey(endpoint, options);
if (!this.isCircuitClosed(circuitKey)) {
return {
allowed: false,
reason: 'circuit_open',
circuitKey
};
}

return { allowed: true };
}

async executeRequest(requestId, endpoint, options) {
const controller = new AbortController();
const timeoutId = setTimeout(() => controller.abort(), options.timeout || 30000);

try {
const response = await fetch(endpoint, {
…options,
signal: controller.signal,
headers: {
…options.headers,
'X-Request-ID': requestId,
'X-Client-Version': '1.0.0'
}
});

clearTimeout(timeoutId);

// 检查速率限制头部
this.extractRateLimitHeaders(endpoint, response);

// 检查状态码
if (response.status === 429) {
throw new RateLimitError('Rate limit exceeded', response);
}

return response;

} catch (error) {
clearTimeout(timeoutId);
throw error;
}
}

extractRateLimitHeaders(endpoint, response) {
const endpointKey = this.getEndpointKey(endpoint);

const limit = response.headers.get('X-RateLimit-Limit');
const remaining = response.headers.get('X-RateLimit-Remaining');
const reset = response.headers.get('X-RateLimit-Reset');
const retryAfter = response.headers.get('Retry-After');
const rule = response.headers.get('X-RateLimit-Rule');

if (limit !== null && remaining !== null && reset !== null) {
this.rateLimitInfo.set(endpointKey, {
limit: parseInt(limit, 10),
remaining: parseInt(remaining, 10),
resetTime: parseInt(reset, 10),
rule: rule,
lastUpdated: Date.now()
});

// 记录配额历史
this.recordQuotaHistory(endpointKey, {
limit: parseInt(limit, 10),
remaining: parseInt(remaining, 10),
timestamp: Date.now()
});
}

// 处理Retry-After头部
if (retryAfter !== null && this.config.respectRetryAfter) {
const retryAfterSeconds = parseInt(retryAfter, 10);
this.scheduleRetryDelay(endpointKey, retryAfterSeconds * 1000);
}
}

async handleRateLimitedRequest(requestId, endpoint, options, rateLimitCheck) {
const endpointKey = this.getEndpointKey(endpoint, options);

// 根据原因采取不同策略
switch (rateLimitCheck.reason) {
case 'quota_exhausted':
return this.handleQuotaExhausted(requestId, endpoint, options, rateLimitCheck);

case 'circuit_open':
return this.handleCircuitOpen(requestId, endpoint, options, rateLimitCheck);

default:
throw new RateLimitError(`Rate limited: ${rateLimitCheck.reason}`);
}
}

async handleQuotaExhausted(requestId, endpoint, options, rateLimitCheck) {
const { retryAfter, resetTime } = rateLimitCheck;

// 计算等待时间
let waitTime = retryAfter * 1000; // 转换为毫秒

if (this.config.adaptiveBackoff) {
// 自适应调整等待时间
waitTime = this.calculateAdaptiveWaitTime(endpoint, waitTime);
}

// 添加抖动
waitTime = this.addJitter(waitTime);

// 记录等待
this.logRateLimitWait(endpoint, waitTime, 'quota_exhausted');

// 等待后重试
await this.sleep(waitTime);

// 重试请求
return this.request(endpoint, {
…options,
retryCount: (options.retryCount || 0) + 1
});
}

async handleRateLimitError(requestId, endpoint, options, error) {
const retryCount = options.retryCount || 0;

if (retryCount >= this.config.maxRetries) {
throw new MaxRetriesError('Maximum retries exceeded', error);
}

// 从响应中提取Retry-After
let retryAfter = 60; // 默认60秒

if (error.response) {
const retryAfterHeader = error.response.headers.get('Retry-After');
if (retryAfterHeader) {
retryAfter = parseInt(retryAfterHeader, 10);
}
}

// 计算退避延迟
const delay = this.calculateBackoffDelay(retryCount, retryAfter);

// 记录重试
this.logRetry(endpoint, retryCount, delay, 'rate_limit');

// 等待后重试
await this.sleep(delay);

return this.request(endpoint, {
…options,
retryCount: retryCount + 1
});
}

calculateBackoffDelay(retryCount, retryAfter = 0) {
// 指数退避
let delay = this.config.baseDelay * Math.pow(this.config.backoffFactor, retryCount);

// 考虑Retry-After头部
if (retryAfter > 0) {
delay = Math.max(delay, retryAfter * 1000);
}

// 限制最大延迟
delay = Math.min(delay, this.config.maxDelay);

// 添加抖动
delay = this.addJitter(delay);

return delay;
}

calculateAdaptiveWaitTime(endpoint, baseWaitTime) {
const endpointKey = this.getEndpointKey(endpoint);
const history = this.getQuotaHistory(endpointKey);

if (history.length < 5) {
return baseWaitTime;
}

// 分析历史配额使用模式
const recentUsage = history.slice(-10);
const avgUsageRate = this.calculateUsageRate(recentUsage);

// 根据使用率调整等待时间
if (avgUsageRate > 0.8) {
// 高使用率,增加等待时间
return baseWaitTime * 1.5;
} else if (avgUsageRate < 0.3) {
// 低使用率,减少等待时间
return baseWaitTime * 0.7;
}

return baseWaitTime;
}

calculateUsageRate(quotaHistory) {
if (quotaHistory.length < 2) {
return 0;
}

let totalUsage = 0;
let totalTime = 0;

for (let i = 1; i < quotaHistory.length; i++) {
const prev = quotaHistory[i – 1];
const curr = quotaHistory[i];

const usage = prev.remaining – curr.remaining;
const timeDiff = curr.timestamp – prev.timestamp;

if (timeDiff > 0 && prev.limit > 0) {
totalUsage += usage;
totalTime += timeDiff;
}
}

if (totalTime === 0) {
return 0;
}

const avgUsagePerMs = totalUsage / totalTime;
const limitPerMs = quotaHistory[0].limit / (24 * 60 * 60 * 1000); // 每天的限制

return avgUsagePerMs / limitPerMs;
}

addJitter(delay) {
const jitter = this.config.jitter;
const jitterAmount = delay * jitter;
const randomJitter = (Math.random() * 2 – 1) * jitterAmount;

return Math.max(0, delay + randomJitter);
}

// 电路断路器
isCircuitClosed(circuitKey) {
if (!this.circuitBreakers.has(circuitKey)) {
return true;
}

const circuit = this.circuitBreakers.get(circuitKey);

if (circuit.state === 'open') {
// 检查是否应该进入半开状态
if (Date.now() – circuit.openedAt > 60000) { // 60秒后
circuit.state = 'half_open';
circuit.halfOpenAttempts = 0;
return true;
}
return false;
}

if (circuit.state === 'half_open') {
if (circuit.halfOpenAttempts >= 3) {
return false;
}
circuit.halfOpenAttempts++;
return true;
}

return true; // closed
}

updateCircuitBreaker(circuitKey, success) {
if (!this.circuitBreakers.has(circuitKey)) {
this.circuitBreakers.set(circuitKey, {
state: 'closed',
failureCount: 0,
successCount: 0
});
}

const circuit = this.circuitBreakers.get(circuitKey);

if (success) {
circuit.successCount++;
circuit.failureCount = Math.max(0, circuit.failureCount – 1);

if (circuit.state === 'half_open' && circuit.successCount >= 3) {
circuit.state = 'closed';
circuit.failureCount = 0;
circuit.successCount = 0;
}
} else {
circuit.failureCount++;

if (circuit.failureCount >= 5 && circuit.state === 'closed') {
circuit.state = 'open';
circuit.openedAt = Date.now();
}
}
}

// 配额监控
initQuotaMonitoring() {
if (this.config.quotaMonitoring) {
// 定期刷新配额信息
setInterval(() => this.refreshQuotaInfo(), 30000);

// 监控配额使用率
setInterval(() => this.monitorQuotaUsage(), 60000);
}
}

refreshQuotaInfo() {
// 清理过期的配额信息
const now = Date.now();

for (const [key, info] of this.rateLimitInfo.entries()) {
const resetTime = info.resetTime * 1000;

if (now > resetTime + 60000) { // 重置时间后1分钟
this.rateLimitInfo.delete(key);
}
}
}

monitorQuotaUsage() {
const warnings = [];

for (const [endpointKey, info] of this.rateLimitInfo.entries()) {
const usageRate = info.remaining / info.limit;

if (usageRate < 0.1) { // 剩余不足10%
warnings.push({
endpointKey,
remaining: info.remaining,
limit: info.limit,
resetTime: new Date(info.resetTime * 1000).toISOString()
});
}
}

if (warnings.length > 0) {
this.emitQuotaWarning(warnings);
}
}

recordQuotaHistory(endpointKey, quota) {
this.quotaHistory.push({
endpointKey,
…quota
});

// 保持历史记录大小
if (this.quotaHistory.length > 1000) {
this.quotaHistory = this.quotaHistory.slice(-500);
}
}

getQuotaHistory(endpointKey, limit = 50) {
return this.quotaHistory
.filter(q => q.endpointKey === endpointKey)
.slice(-limit);
}

// 工具方法
generateRequestId() {
return `req_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`;
}

getEndpointKey(endpoint, options = {}) {
const url = new URL(endpoint, window.location.origin);
const method = options.method || 'GET';

return `${method}:${url.pathname}`;
}

getCircuitKey(endpoint, options = {}) {
const endpointKey = this.getEndpointKey(endpoint, options);
return `circuit:${endpointKey}`;
}

sleep(ms) {
return new Promise(resolve => setTimeout(resolve, ms));
}

logRateLimitWait(endpoint, waitTime, reason) {
console.log(`Rate limit wait for ${endpoint}: ${waitTime}ms (${reason})`);
}

logRetry(endpoint, retryCount, delay, reason) {
console.log(`Retry ${retryCount + 1} for ${endpoint} in ${delay}ms (${reason})`);
}

emitQuotaWarning(warnings) {
// 在实际应用中,这里会触发事件或显示通知
console.warn('Quota warnings:', warnings);
}

getRateLimitInfo() {
return Array.from(this.rateLimitInfo.entries()).map(([key, info]) => ({
endpoint: key,
…info,
lastUpdated: new Date(info.lastUpdated).toISOString()
}));
}
}

class RateLimitError extends Error {
constructor(message, response) {
super(message);
this.name = 'RateLimitError';
this.response = response;
}
}

class MaxRetriesError extends Error {
constructor(message, originalError) {
super(message);
this.name = 'MaxRetriesError';
this.originalError = originalError;
}
}

// 使用示例
const client = new RateLimitAwareClient({
baseDelay: 1000,
maxRetries: 3,
adaptiveBackoff: true,
quotaMonitoring: true
});

// 发起请求
async function fetchData() {
try {
const response = await client.request('/api/data', {
headers: {
'Authorization': `Bearer ${token}`
}
});

return await response.json();

} catch (error) {
if (error.name === 'RateLimitError') {
// 显示用户友好的错误消息
showRateLimitError(error);
return null;
}

throw error;
}
}

// 显示速率限制错误的UI组件
function showRateLimitError(error) {
const container = document.getElementById('error-container');

let retryAfter = 60;
if (error.response) {
const retryAfterHeader = error.response.headers.get('Retry-After');
if (retryAfterHeader) {
retryAfter = parseInt(retryAfterHeader, 10);
}
}

const html = `
<div class="rate-limit-error">
<h3>⚠️ 请求过于频繁</h3>
<p>您发送了太多请求,请稍后再试。</p>

<div class="retry-info">
<p>建议在 <strong>${retryAfter}</strong> 秒后重试</p>
<div class="countdown" data-seconds="${retryAfter}">
倒计时: <span class="seconds">${retryAfter}</span> 秒
</div>
</div>

<div class="actions">
<button οnclick="retryNow()" class="retry-btn">
立即重试
</button>
<button οnclick="dismissError()" class="dismiss-btn">
取消
</button>
</div>

<div class="tips">
<h4>如何避免此问题:</h4>
<ul>
<li>避免频繁刷新页面</li>
<li>使用分页加载更多数据</li>
<li>合理使用缓存</li>
</ul>
</div>
</div>
`;

container.innerHTML = html;

// 启动倒计时
startCountdown(retryAfter);
}

22.4.2 配额使用可视化仪表板

javascript

// 速率限制配额可视化仪表板
import React, { useState, useEffect, useCallback } from 'react';
import {
LineChart, Line, BarChart, Bar, PieChart, Pie, Cell,
XAxis, YAxis, CartesianGrid, Tooltip, Legend, ResponsiveContainer,
AreaChart, Area, RadarChart, Radar, PolarGrid, PolarAngleAxis, PolarRadiusAxis
} from 'recharts';
import { AlertTriangle, Clock, Zap, Battery, RefreshCw, Shield } from 'lucide-react';

function RateLimitDashboard() {
const [timeRange, setTimeRange] = useState('1h');
const [quotaData, setQuotaData] = useState([]);
const [violationHistory, setViolationHistory] = useState([]);
const [endpointStats, setEndpointStats] = useState({});
const [realTimeUpdates, setRealTimeUpdates] = useState([]);

// 获取配额数据
useEffect(() => {
const fetchQuotaData = async () => {
try {
const response = await fetch(`/api/rate-limit/quotas?range=${timeRange}`);
const data = await response.json();
setQuotaData(data.quotas || []);
setViolationHistory(data.violations || []);
setEndpointStats(data.endpointStats || {});
} catch (error) {
console.error('Failed to fetch quota data:', error);
}
};

fetchQuotaData();
const interval = setInterval(fetchQuotaData, 30000); // 每30秒更新

return () => clearInterval(interval);
}, [timeRange]);

// WebSocket连接接收实时更新
useEffect(() => {
const ws = new WebSocket('wss://api.example.com/rate-limit/updates');

ws.onmessage = (event) => {
const update = JSON.parse(event.data);
setRealTimeUpdates(prev => [update, …prev.slice(0, 49)]);
};

return () => ws.close();
}, []);

// 处理重置配额
const handleResetQuota = useCallback(async (endpoint, rule) => {
try {
await fetch('/api/rate-limit/reset', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ endpoint, rule })
});

alert('配额已重置');
} catch (error) {
alert('重置失败: ' + error.message);
}
}, []);

// 处理调整限制
const handleAdjustLimit = useCallback(async (endpoint, rule, newLimit) => {
try {
await fetch('/api/rate-limit/adjust', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ endpoint, rule, newLimit })
});

alert('限制已调整');
} catch (error) {
alert('调整失败: ' + error.message);
}
}, []);

// 生成图表数据
const generateUsageChartData = () => {
if (!quotaData.length) return [];

// 按时间分组
const timeGroups = {};

quotaData.forEach(quota => {
const date = new Date(quota.timestamp);
const hour = `${date.getHours()}:00`;

if (!timeGroups[hour]) {
timeGroups[hour] = {
time: hour,
used: 0,
total: 0,
violations: 0
};
}

timeGroups[hour].used += quota.used || 0;
timeGroups[hour].total += quota.limit || 0;

if (quota.remaining === 0) {
timeGroups[hour].violations++;
}
});

return Object.values(timeGroups).sort((a, b) => a.time.localeCompare(b.time));
};

const generateEndpointDistributionData = () => {
const endpointData = [];

Object.entries(endpointStats).forEach(([endpoint, stats]) => {
endpointData.push({
name: endpoint.length > 20 ? endpoint.substring(0, 20) + '…' : endpoint,
violations: stats.violations || 0,
requests: stats.requests || 0,
usageRate: stats.usageRate || 0
});
});

return endpointData.sort((a, b) => b.violations – a.violations).slice(0, 10);
};

const generateRuleDistributionData = () => {
const ruleData = [];
const ruleMap = {};

quotaData.forEach(quota => {
const rule = quota.rule || 'default';

if (!ruleMap[rule]) {
ruleMap[rule] = {
name: rule,
violations: 0,
total: 0
};
}

ruleMap[rule].violations += quota.remaining === 0 ? 1 : 0;
ruleMap[rule].total++;
});

Object.values(ruleMap).forEach(rule => {
ruleData.push({
name: rule.name,
value: rule.violations,
percentage: rule.total > 0 ? (rule.violations / rule.total * 100).toFixed(1) : 0
});
});

return ruleData.sort((a, b) => b.value – a.value).slice(0, 8);
};

// 颜色配置
const COLORS = ['#0088FE', '#00C49F', '#FFBB28', '#FF8042', '#8884D8', '#82CA9D', '#FF6B6B', '#FFD166'];

return (
<div className="dashboard rate-limit-dashboard">
{/* 仪表板头部 */}
<div className="dashboard-header">
<div className="header-left">
<Shield size={32} className="header-icon" />
<h1>速率限制监控</h1>
<span className="time-range-label">
{timeRange === '1h' ? '1小时' :
timeRange === '24h' ? '24小时' :
timeRange === '7d' ? '7天' : '30天'}
</span>
</div>

<div className="header-right">
<div className="time-range-selector">
{['1h', '24h', '7d', '30d'].map(range => (
<button
key={range}
className={`time-range-btn ${timeRange === range ? 'active' : ''}`}
onClick={() => setTimeRange(range)}
>
{range}
</button>
))}
</div>

<button className="refresh-btn" onClick={() => window.location.reload()}>
<RefreshCw size={16} /> 刷新
</button>
</div>
</div>

{/* 关键指标卡片 */}
<div className="key-metrics-cards">
<div className="metric-card total-requests">
<div className="metric-icon">
<Zap size={24} />
</div>
<div className="metric-content">
<h3>总请求数</h3>
<p className="metric-value">
{endpointStats.totalRequests?.toLocaleString() || '0'}
</p>
<p className="metric-trend">
成功率: {endpointStats.successRate ? (endpointStats.successRate * 100).toFixed(1) + '%' : 'N/A'}
</p>
</div>
</div>

<div className="metric-card violations">
<div className="metric-icon">
<AlertTriangle size={24} />
</div>
<div className="metric-content">
<h3>违规次数</h3>
<p className="metric-value">
{violationHistory.length.toLocaleString()}
</p>
<p className="metric-trend">
违规率: {quotaData.length > 0 ?
((violationHistory.length / quotaData.length) * 100).toFixed(1) + '%' : '0%'}
</p>
</div>
</div>

<div className="metric-card quota-usage">
<div className="metric-icon">
<Battery size={24} />
</div>
<div className="metric-content">
<h3>配额使用率</h3>
<p className="metric-value">
{endpointStats.avgUsageRate ?
(endpointStats.avgUsageRate * 100).toFixed(1) + '%' : '0%'}
</p>
<p className="metric-trend">
峰值: {endpointStats.peakUsageRate ?
(endpointStats.peakUsageRate * 100).toFixed(1) + '%' : 'N/A'}
</p>
</div>
</div>

<div className="metric-card avg-wait-time">
<div className="metric-icon">
<Clock size={24} />
</div>
<div className="metric-content">
<h3>平均等待时间</h3>
<p className="metric-value">
{endpointStats.avgWaitTime ?
endpointStats.avgWaitTime.toFixed(1) + 's' : '0s'}
</p>
<p className="metric-trend">
最长: {endpointStats.maxWaitTime ?
endpointStats.maxWaitTime.toFixed(1) + 's' : 'N/A'}
</p>
</div>
</div>
</div>

{/* 图表区域 */}
<div className="charts-section">
<div className="chart-row">
<div className="chart-container large">
<h3>配额使用趋势</h3>
<ResponsiveContainer width="100%" height={300}>
<AreaChart data={generateUsageChartData()}>
<CartesianGrid strokeDasharray="3 3" />
<XAxis dataKey="time" />
<YAxis />
<Tooltip />
<Legend />
<Area
type="monotone"
dataKey="used"
stackId="1"
stroke="#8884d8"
fill="#8884d8"
fillOpacity={0.3}
name="已使用"
/>
<Area
type="monotone"
dataKey="total"
stackId="1"
stroke="#82ca9d"
fill="#82ca9d"
fillOpacity={0.3}
name="总配额"
/>
<Line
type="monotone"
dataKey="violations"
stroke="#ff6b6b"
strokeWidth={2}
dot={{ r: 4 }}
name="违规次数"
/>
</AreaChart>
</ResponsiveContainer>
</div>

<div className="chart-container">
<h3>规则违规分布</h3>
<ResponsiveContainer width="100%" height={300}>
<PieChart>
<Pie
data={generateRuleDistributionData()}
cx="50%"
cy="50%"
labelLine={false}
label={({ name, percentage }) => `${name}: ${percentage}%`}
outerRadius={80}
fill="#8884d8"
dataKey="value"
>
{generateRuleDistributionData().map((entry, index) => (
<Cell key={`cell-${index}`} fill={COLORS[index % COLORS.length]} />
))}
</Pie>
<Tooltip formatter={(value, name, props) => [
`${value}次 (${props.payload.percentage}%)`,
'违规次数'
]} />
</PieChart>
</ResponsiveContainer>
</div>
</div>

<div className="chart-row">
<div className="chart-container large">
<h3>端点违规排名</h3>
<ResponsiveContainer width="100%" height={300}>
<BarChart data={generateEndpointDistributionData()}>
<CartesianGrid strokeDasharray="3 3" />
<XAxis dataKey="name" angle={-45} textAnchor="end" height={80} />
<YAxis />
<Tooltip />
<Legend />
<Bar dataKey="violations" fill="#ff6b6b" name="违规次数" />
<Bar dataKey="requests" fill="#8884d8" name="总请求数" />
</BarChart>
</ResponsiveContainer>
</div>

<div className="chart-container">
<h3>配额使用率雷达图</h3>
<ResponsiveContainer width="100%" height={300}>
<RadarChart data={generateRadarData()}>
<PolarGrid />
<PolarAngleAxis dataKey="subject" />
<PolarRadiusAxis angle={30} domain={[0, 100]} />
<Radar
name="使用率"
dataKey="usage"
stroke="#8884d8"
fill="#8884d8"
fillOpacity={0.3}
/>
<Radar
name="目标"
dataKey="target"
stroke="#82ca9d"
fill="#82ca9d"
fillOpacity={0.3}
/>
<Legend />
<Tooltip />
</RadarChart>
</ResponsiveContainer>
</div>
</div>
</div>

{/* 实时违规事件 */}
<div className="realtime-events-section">
<h3>实时违规事件</h3>
<div className="events-table-container">
<table className="events-table">
<thead>
<tr>
<th>时间</th>
<th>端点</th>
<th>规则</th>
<th>客户端IP</th>
<th>剩余配额</th>
<th>重试等待</th>
<th>操作</th>
</tr>
</thead>
<tbody>
{realTimeUpdates.map((event, index) => (
<tr key={index} className="event-row">
<td>
{new Date(event.timestamp).toLocaleTimeString()}
</td>
<td className="endpoint-cell">
<code title={event.endpoint}>
{event.endpoint?.length > 30 ?
event.endpoint.substring(0, 30) + '…' :
event.endpoint}
</code>
</td>
<td>
<span className="rule-badge">
{event.rule || 'default'}
</span>
</td>
<td>
<code>{event.client_ip}</code>
</td>
<td>
<span className={`quota-badge ${event.remaining === 0 ? 'exhausted' : 'available'}`}>
{event.remaining}/{event.limit}
</span>
</td>
<td>
<span className="wait-time">
{event.retry_after || 0}s
</span>
</td>
<td>
<button
className="action-btn view-details"
onClick={() => viewEventDetails(event)}
>
详情
</button>
<button
className="action-btn reset-quota"
onClick={() => handleResetQuota(event.endpoint, event.rule)}
>
重置配额
</button>
</td>
</tr>
))}
</tbody>
</table>
</div>
</div>

{/* 配额管理 */}
<div className="quota-management-section">
<h3>配额管理</h3>
<div className="quota-list">
{quotaData.slice(0, 10).map((quota, index) => (
<div key={index} className="quota-item">
<div className="quota-header">
<span className="quota-endpoint">{quota.endpoint}</span>
<span className="quota-rule">{quota.rule}</span>
</div>

<div className="quota-progress">
<div className="progress-bar">
<div
className="progress-fill"
style={{ width: `${(quota.used / quota.limit) * 100}%` }}
></div>
</div>
<div className="quota-numbers">
<span>{quota.used} / {quota.limit}</span>
<span>{((quota.used / quota.limit) * 100).toFixed(1)}%</span>
</div>
</div>

<div className="quota-actions">
<button
className="btn btn-sm btn-primary"
onClick={() => handleAdjustLimit(quota.endpoint, quota.rule, quota.limit * 1.1)}
>
增加10%
</button>
<button
className="btn btn-sm btn-secondary"
onClick={() => handleAdjustLimit(quota.endpoint, quota.rule, quota.limit * 0.9)}
>
减少10%
</button>
<button
className="btn btn-sm btn-tertiary"
onClick={() => handleResetQuota(quota.endpoint, quota.rule)}
>
立即重置
</button>
</div>
</div>
))}
</div>
</div>

{/* 配置建议 */}
<div className="recommendations-section">
<h3>配置建议</h3>
<div className="recommendations-list">
{generateRecommendations().map((rec, index) => (
<div key={index} className="recommendation-card">
<div className="recommendation-icon">💡</div>
<div className="recommendation-content">
<h4>{rec.title}</h4>
<p>{rec.description}</p>
<div className="recommendation-actions">
<button
className="btn btn-sm btn-primary"
onClick={() => applyRecommendation(rec)}
>
应用建议
</button>
<button
className="btn btn-sm btn-secondary"
onClick={() => dismissRecommendation(rec.id)}
>
忽略
</button>
</div>
</div>
</div>
))}
</div>
</div>
</div>
);

// 辅助函数
function generateRadarData() {
return [
{ subject: '认证端点', usage: 85, target: 70 },
{ subject: '数据查询', usage: 60, target: 80 },
{ subject: '文件上传', usage: 95, target: 50 },
{ subject: '管理接口', usage: 40, target: 30 },
{ subject: '公共API', usage: 75, target: 90 },
{ subject: 'Webhook', usage: 20, target: 40 }
];
}

function generateRecommendations() {
const recommendations = [];

// 基于数据分析生成建议
const highViolationEndpoints = generateEndpointDistributionData()
.filter(e => e.violations > 10);

if (highViolationEndpoints.length > 0) {
highViolationEndpoints.forEach(endpoint => {
recommendations.push({
id: `increase-${endpoint.name}`,
title: `增加 ${endpoint.name} 的配额限制`,
description: `该端点有 ${endpoint.violations} 次违规,建议增加配额限制以提高可用性。`,
type: 'increase_limit',
endpoint: endpoint.name,
currentLimit: endpoint.requests,
suggestedLimit: Math.ceil(endpoint.requests * 1.5)
});
});
}

// 检查使用率低的端点
const lowUsageEndpoints = generateEndpointDistributionData()
.filter(e => e.usageRate < 0.3 && e.requests > 100);

if (lowUsageEndpoints.length > 0) {
lowUsageEndpoints.forEach(endpoint => {
recommendations.push({
id: `decrease-${endpoint.name}`,
title: `优化 ${endpoint.name} 的配额配置`,
description: `该端点使用率较低(${(endpoint.usageRate * 100).toFixed(1)}%),可以考虑减少配额以释放资源。`,
type: 'decrease_limit',
endpoint: endpoint.name,
currentLimit: endpoint.requests,
suggestedLimit: Math.ceil(endpoint.requests * 0.7)
});
});
}

// 通用建议
recommendations.push({
id: 'implement-adaptive',
title: '实施自适应速率限制',
description: '根据历史使用模式动态调整限制,提高系统效率和用户体验。',
type: 'feature',
priority: 'high'
});

recommendations.push({
id: 'add-client-education',
title: '添加客户端配额教育',
description: '在API响应中添加配额使用信息,帮助客户端更好地管理请求。',
type: 'improvement',
priority: 'medium'
});

return recommendations.slice(0, 5);
}

function viewEventDetails(event) {
console.log('Event details:', event);
alert(`事件详情:\\n${JSON.stringify(event, null, 2)}`);
}

function applyRecommendation(recommendation) {
console.log('Applying recommendation:', recommendation);

if (recommendation.type === 'increase_limit' || recommendation.type === 'decrease_limit') {
handleAdjustLimit(
recommendation.endpoint,
'custom_rule',
recommendation.suggestedLimit
);
}

alert(`已应用建议: ${recommendation.title}`);
}

function dismissRecommendation(id) {
console.log('Dismissing recommendation:', id);
// 在实际应用中,这里会标记建议为已忽略
}
}

第23章:其他4xx状态码详解(402、406-418、421-428、431、451)

引言

HTTP状态码是Web通信的基础语言,其中4xx系列专门指示客户端错误。虽然404、403和400广为人知,但HTTP协议还定义了许多专门化的4xx状态码,为特定错误场景提供精确语义。这些"边缘案例"状态码在现代API设计、微服务架构和RESTful服务中扮演着越来越重要的角色。本章将深入探讨23个不常用但重要的4xx状态码,涵盖从支付系统到法律合规的广泛场景。

23.1 402 Payment Required(需要付款)

23.1.1 定义与语义

402状态码在HTTP/1.1规范中定义为"保留用于未来使用",但实际上已被广泛接受为表示需要付款才能访问资源的状态。虽然从未被正式标准化,但它已成为支付网关、订阅服务和微交易API的事实标准。

23.1.2 使用场景

  • 订阅内容访问:用户尝试访问需要订阅的内容

  • API调用计费:超过免费额度的API请求

  • 数字商品购买:尝试下载未购买的电子书、软件等

  • 服务额度耗尽:云服务、SaaS平台的用量超限

  • 23.1.3 实现示例

    http

    HTTP/1.1 402 Payment Required
    Content-Type: application/json
    X-Payment-URL: https://api.example.com/payment/order_123
    Retry-After: 3600

    {
    "error": "payment_required",
    "message": "You have exceeded your free API call limit",
    "upgrade_url": "https://api.example.com/plans",
    "current_usage": {
    "used": 1000,
    "limit": 100
    }
    }

    23.1.4 最佳实践

    • 始终提供清晰的错误信息和解决指引

    • 包含支付URL或升级路径

    • 对于API限流,使用429状态码可能更合适

    • 考虑支持多种支付协议(如Web Monetization API)

    23.2 406 Not Acceptable(不可接受)

    23.2.1 定义与语义

    406状态码表示服务器无法生成客户端Accept头指定的响应内容。当服务器不支持客户端请求的媒体类型时返回此状态码。

    23.2.2 内容协商机制

    HTTP内容协商涉及多个请求头:

    • Accept:客户端接受的媒体类型

    • Accept-Charset:字符集偏好

    • Accept-Encoding:内容编码偏好

    • Accept-Language:语言偏好

    23.2.3 实现模式

    python

    # Flask示例:处理内容协商
    @app.route('/resource')
    def get_resource():
    accepted_types = request.accept_mimetypes
    if 'application/json' in accepted_types:
    return jsonify(data)
    elif 'application/xml' in accepted_types:
    return xml_response(data)
    else:
    # 返回406并提供支持的格式
    response = jsonify({
    'error': 'Unsupported media type requested',
    'supported_types': [
    'application/json',
    'application/xml',
    'text/html'
    ]
    })
    response.status_code = 406
    response.headers['Content-Type'] = 'application/json'
    return response

    23.2.4 变体选择头

    HTTP/1.1引入了Vary头,指示缓存服务器根据哪些请求头生成不同响应:

    text

    Vary: Accept, Accept-Language, User-Agent

    23.2.5 最佳实践

    • 实现优雅降级(如JSON作为默认格式)

    • 在406响应中列出支持的媒体类型

    • 正确设置Vary头以支持缓存

    • 考虑使用格式参数作为后备方案(如?format=json)

    23.3 407 Proxy Authentication Required(需要代理认证)

    23.3.1 定义与语义

    407状态码表示客户端必须首先通过代理服务器的身份验证。与401类似,但专门用于代理认证。

    23.3.2 代理认证方案

  • Basic:基本认证(Base64编码)

  • Digest:摘要认证(更安全)

  • Bearer:令牌认证

  • NTLM:Windows认证协议

  • 23.3.3 工作流程

    text

    客户端 → 代理: GET /resource HTTP/1.1
    代理 → 客户端: HTTP/1.1 407 Proxy Authentication Required
    Proxy-Authenticate: Basic realm="CorpProxy"
    客户端 → 代理: GET /resource HTTP/1.1
    Proxy-Authorization: Basic dXNlcjpwYXNz
    代理 → 后端: 转发请求
    后端 → 客户端: 通过代理返回响应

    23.3.4 安全考虑

    • 始终使用HTTPS传输代理凭证

    • 考虑使用摘要认证避免明文传输

    • 实现凭证轮换机制

    • 监控代理认证失败尝试

    23.3.5 实现示例

    nginx

    # Nginx代理认证配置
    location / {
    auth_basic "Proxy Authentication Required";
    auth_basic_user_file /etc/nginx/.htpasswd;
    proxy_pass http://backend;

    # 转发认证头到后端(如果需要)
    proxy_set_header Authorization $http_authorization;
    }

    23.4 408 Request Timeout(请求超时)

    23.4.1 定义与语义

    408状态码表示服务器在等待请求时超时。服务器愿意继续等待,但客户端没有在服务器准备等待的时间内完成请求发送。

    23.4.2 超时原因分析

  • 网络延迟:客户端与服务器间的高延迟

  • 请求体过大:上传大文件时传输缓慢

  • 客户端故障:客户端崩溃或连接中断

  • 网络中断:中间网络设备问题

  • 23.4.3 服务器配置

    nginx

    # Nginx超时配置
    http {
    # 客户端读取超时(两个连续操作间隔)
    client_header_timeout 60s;

    # 客户端发送请求体超时
    client_body_timeout 60s;

    # 保持连接超时
    keepalive_timeout 75s;

    # 发送响应到客户端超时
    send_timeout 60s;
    }

    23.4.4 重试策略

    javascript

    // 指数退避重试策略
    async function fetchWithRetry(url, options, maxRetries = 3) {
    let lastError;

    for (let i = 0; i < maxRetries; i++) {
    try {
    const response = await fetch(url, options);
    return response;
    } catch (error) {
    lastError = error;

    // 检查是否为超时错误
    if (error.name === 'TimeoutError' || error.status === 408) {
    // 指数退避:1s, 2s, 4s…
    const delay = Math.pow(2, i) * 1000;
    await new Promise(resolve => setTimeout(resolve, delay));
    continue;
    }

    // 非超时错误,立即抛出
    throw error;
    }
    }

    throw lastError;
    }

    23.4.5 最佳实践

    • 实现幂等请求以安全重试

    • 客户端使用指数退避算法

    • 服务器设置合理的超时阈值

    • 监控超时率以识别性能问题

    23.5 409 Conflict(冲突)

    23.5.1 定义与语义

    409状态码表示请求与服务器当前状态冲突。通常用于并发修改场景,如版本冲突或资源锁定。

    23.5.2 使用场景

  • 版本冲突:基于版本号的乐观锁

  • 唯一约束违反:数据库唯一键冲突

  • 业务规则冲突:违反领域特定规则

  • 资源状态冲突:尝试修改已删除的资源

  • 23.5.3 乐观锁实现

    java

    // Spring Boot示例:使用ETag实现乐观锁
    @PutMapping("/products/{id}")
    public ResponseEntity<?> updateProduct(
    @PathVariable Long id,
    @RequestBody Product product,
    @RequestHeader("If-Match") String ifMatch) {

    Product existing = productRepository.findById(id)
    .orElseThrow(() -> new ResourceNotFoundException());

    // 检查ETag是否匹配
    String currentETag = generateETag(existing);
    if (!currentETag.equals(ifMatch)) {
    return ResponseEntity.status(409)
    .header("ETag", currentETag)
    .body(Map.of(
    "error", "conflict",
    "message", "Resource has been modified by another user",
    "current_version", currentETag
    ));
    }

    // 更新资源并生成新ETag
    Product updated = productRepository.save(product);
    String newETag = generateETag(updated);

    return ResponseEntity.ok()
    .header("ETag", newETag)
    .body(updated);
    }

    23.5.4 冲突解决策略

  • 最后写入获胜:简单的覆盖策略

  • 手动合并:提示用户解决冲突

  • 自动合并:基于规则的自动合并

  • 操作转换:实时协作系统使用

  • 23.5.5 响应格式建议

    json

    {
    "error": "conflict",
    "message": "Resource update conflict",
    "conflict_details": {
    "field": "email",
    "reason": "already_exists",
    "existing_value": "user@example.com"
    },
    "resolution_options": [
    {
    "action": "overwrite",
    "method": "PUT",
    "url": "/resource/123?force=true"
    },
    {
    "action": "merge",
    "method": "PATCH",
    "url": "/resource/123"
    }
    ]
    }

    23.6 410 Gone(已删除)

    23.6.1 定义与语义

    410状态码表示请求的资源在服务器上已永久删除,且没有转发地址。与404不同,410明确表示资源曾经存在但被有意删除。

    23.6.2 与404的区别

    特性404 Not Found410 Gone
    资源是否存在过 未知 曾经存在
    是否永久性 可能是临时的 永久删除
    缓存建议 可短期缓存 应长期缓存
    客户端响应 可重试 应删除引用

    23.6.3 使用场景

  • 用户删除内容:社交媒体帖子、评论

  • 内容下架:已停止的产品页面

  • URL重构:旧URL不再可用且无重定向

  • 合规要求:法律要求删除的内容

  • 23.6.4 SEO考虑

    html

    <!– 对于SEO友好的410页面 –>
    <!DOCTYPE html>
    <html lang="en">
    <head>
    <meta charset="UTF-8">
    <meta name="robots" content="noindex, follow">
    <title>Content Removed</title>
    </head>
    <body>
    <h1>This content has been permanently removed</h1>
    <p>The resource you requested was intentionally removed and is no longer available.</p>

    <!– 提供相关替代内容 –>
    <div class="suggestions">
    <h2>You might be interested in:</h2>
    <ul>
    <li><a href="/new-location">Updated version of this content</a></li>
    <li><a href="/related-topic">Related information</a></li>
    </ul>
    </div>

    <!– 结构化数据帮助搜索引擎理解 –>
    <script type="application/ld+json">
    {
    "@context": "https://schema.org",
    "@type": "WebPage",
    "name": "Content Removed",
    "description": "This page indicates that previously available content has been permanently removed.",
    "mainEntity": {
    "@type": "Message",
    "text": "The requested resource was intentionally removed and is no longer available."
    }
    }
    </script>
    </body>
    </html>

    23.6.5 实现模式

    python

    # Django视图示例
    from django.http import HttpResponseGone
    from django.views.generic import DetailView

    class DeletedResourceView(DetailView):
    def get(self, request, *args, **kwargs):
    # 检查资源是否被标记为删除
    resource = self.get_object()

    if resource.deleted_at:
    response = HttpResponseGone()
    response['Link'] = '</related-resource>; rel="related"'
    response['X-Deleted-At'] = resource.deleted_at.isoformat()
    response['X-Deleted-Reason'] = resource.deletion_reason
    return response

    return super().get(request, *args, **kwargs)

    23.6.6 最佳实践

    • 为已删除资源保留元数据一段时间

    • 在响应中包含删除原因和时间戳

    • 提供相关资源的链接

    • 更新内部链接和站点地图

    23.7 411 Length Required(需要内容长度)

    23.7.1 定义与语义

    411状态码表示服务器拒绝处理没有Content-Length头的请求。某些服务器要求提前知道请求体大小以进行安全或效率优化。

    23.7.2 使用场景

  • 文件上传:需要知道上传文件大小

  • API限流:基于内容长度的配额检查

  • 安全扫描:提前检测过大请求

  • 资源预分配:提前分配存储空间

  • 23.7.3 分块传输编码

    当无法提前知道内容大小时,可使用分块传输编码:

    http

    POST /upload HTTP/1.1
    Host: example.com
    Transfer-Encoding: chunked

    7\\r\\n
    Mozilla\\r\\n
    9\\r\\n
    Developer\\r\\n
    7\\r\\n
    Network\\r\\n
    0\\r\\n
    \\r\\n

    23.7.4 客户端处理

    javascript

    // 现代JavaScript中自动处理Content-Length
    async function uploadFile(file) {
    const formData = new FormData();
    formData.append('file', file);

    // Fetch API会自动计算和设置Content-Length
    const response = await fetch('/upload', {
    method: 'POST',
    body: formData,
    // 注意:FormData使用multipart/form-data时
    // 浏览器会自动处理,不需要手动设置
    });

    return response.json();
    }

    // 手动设置Content-Length的示例
    async function sendJsonData(data) {
    const jsonString = JSON.stringify(data);

    const response = await fetch('/api/data', {
    method: 'POST',
    headers: {
    'Content-Type': 'application/json',
    'Content-Length': Buffer.byteLength(jsonString, 'utf8').toString()
    },
    body: jsonString
    });

    return response.json();
    }

    23.7.5 服务器配置

    nginx

    # Nginx配置:限制请求体大小并需要Content-Length
    http {
    # 启用请求体大小检查
    client_max_body_size 10m;

    # 对于某些端点要求Content-Length
    location /api/upload {
    # 如果请求体超过1M,要求Content-Length
    if ($request_body_length > 1m) {
    # 检查是否有Content-Length头
    if ($http_content_length = "") {
    return 411;
    }
    }

    # 正常处理
    proxy_pass http://backend;
    }
    }

    23.7.6 最佳实践

    • 对于小请求体,服务器可更灵活

    • 大文件上传应要求Content-Length

    • 支持分块传输作为备选方案

    • 提供清晰的错误信息说明要求

    23.8 412 Precondition Failed(前提条件失败)

    23.8.1 定义与语义

    412状态码表示请求中提供的条件头(如If-Match、If-None-Match、If-Modified-Since等)评估失败。

    23.8.2 条件请求头

  • If-Match:要求ETag匹配

  • If-None-Match:要求ETag不匹配

  • If-Modified-Since:要求资源在指定时间后被修改

  • If-Unmodified-Since:要求资源在指定时间后未修改

  • If-Range:用于断点续传

  • 23.8.3 乐观锁实现

    python

    # FastAPI示例:使用ETag实现乐观锁
    from fastapi import FastAPI, Header, HTTPException
    import hashlib
    import json

    app = FastAPI()

    def generate_etag(data: dict) -> str:
    """生成资源的ETag"""
    data_str = json.dumps(data, sort_keys=True)
    return hashlib.md5(data_str.encode()).hexdigest()

    resources = {
    "1": {"id": 1, "name": "Resource 1", "version": 1}
    }

    @app.put("/resources/{resource_id}")
    async def update_resource(
    resource_id: str,
    updates: dict,
    if_match: str = Header(None, alias="If-Match")
    ):
    if resource_id not in resources:
    raise HTTPException(status_code=404)

    current = resources[resource_id]
    current_etag = generate_etag(current)

    # 检查ETag是否匹配
    if if_match and if_match != current_etag:
    raise HTTPException(
    status_code=412,
    detail={
    "error": "precondition_failed",
    "message": "Resource has been modified",
    "current_etag": current_etag
    }
    )

    # 更新资源
    current.update(updates)
    current["version"] += 1

    new_etag = generate_etag(current)

    return {
    "data": current,
    "etag": new_etag
    }

    23.8.4 缓存验证场景

    http

    # 缓存验证请求
    GET /resource HTTP/1.1
    Host: api.example.com
    If-None-Match: "abc123"

    # 响应(如果未修改)
    HTTP/1.1 304 Not Modified
    ETag: "abc123"
    Cache-Control: max-age=3600

    # 响应(如果已修改)
    HTTP/1.1 200 OK
    ETag: "def456"
    Cache-Control: max-age=3600
    Content-Type: application/json

    {"data": "updated"}

    23.8.5 最佳实践

    • 为可变资源实现ETag支持

    • 在响应中始终返回ETag头

    • 支持弱ETag(以W/开头)用于启发式比较

    • 提供清晰的错误信息说明前提条件

    23.9 413 Payload Too Large(请求体过大)

    23.9.1 定义与语义

    413状态码表示请求体超过服务器能够处理的大小限制。服务器可以关闭连接或返回Retry-After头指示何时重试。

    23.9.2 配置示例

    nginx

    # Nginx请求体大小限制
    http {
    # 全局请求体大小限制
    client_max_body_size 10m;

    # 特定位置更大限制
    location /api/upload {
    client_max_body_size 100m;

    # 大文件上传超时设置
    client_body_timeout 300s;
    client_header_timeout 300s;

    # 缓冲区设置
    client_body_buffer_size 128k;
    client_body_temp_path /tmp/nginx_upload;
    }

    # 特定位置更小限制
    location /api/json {
    client_max_body_size 1m;
    }
    }

    23.9.3 错误处理

    http

    HTTP/1.1 413 Payload Too Large
    Content-Type: application/problem+json
    Retry-After: 3600

    {
    "type": "https://example.com/errors/payload-too-large",
    "title": "Request payload too large",
    "status": 413,
    "detail": "Maximum request body size is 10MB",
    "instance": "/api/upload",
    "max_size": 10485760,
    "current_size": 15728640,
    "suggestions": [
    "Compress your data before uploading",
    "Split the data into smaller chunks",
    "Use our resumable upload API at /api/resumable-upload"
    ]
    }

    23.9.4 大文件上传策略

  • 分块上传:将大文件分成小块

  • 断点续传:支持从中断处继续

  • 流式上传:边上传边处理

  • 外部存储:上传到S3等对象存储

  • 23.9.5 实现分块上传

    javascript

    // 客户端分块上传实现
    class ChunkedUploader {
    constructor(file, chunkSize = 5 * 1024 * 1024) { // 5MB
    this.file = file;
    this.chunkSize = chunkSize;
    this.totalChunks = Math.ceil(file.size / chunkSize);
    this.uploadedChunks = new Set();
    this.uploadId = null;
    }

    async startUpload() {
    // 初始化上传会话
    const response = await fetch('/api/upload/init', {
    method: 'POST',
    headers: {'Content-Type': 'application/json'},
    body: JSON.stringify({
    filename: this.file.name,
    file_size: this.file.size,
    chunk_size: this.chunkSize,
    total_chunks: this.totalChunks
    })
    });

    const { upload_id } = await response.json();
    this.uploadId = upload_id;

    // 上传所有分块
    for (let chunkIndex = 0; chunkIndex < this.totalChunks; chunkIndex++) {
    await this.uploadChunk(chunkIndex);
    }

    // 完成上传
    await this.completeUpload();
    }

    async uploadChunk(chunkIndex) {
    if (this.uploadedChunks.has(chunkIndex)) {
    return; // 已上传
    }

    const start = chunkIndex * this.chunkSize;
    const end = Math.min(start + this.chunkSize, this.file.size);
    const chunk = this.file.slice(start, end);

    const formData = new FormData();
    formData.append('chunk', chunk);
    formData.append('chunk_index', chunkIndex);
    formData.append('upload_id', this.uploadId);

    await fetch('/api/upload/chunk', {
    method: 'POST',
    body: formData
    });

    this.uploadedChunks.add(chunkIndex);
    }

    async completeUpload() {
    await fetch('/api/upload/complete', {
    method: 'POST',
    headers: {'Content-Type': 'application/json'},
    body: JSON.stringify({
    upload_id: this.uploadId,
    uploaded_chunks: Array.from(this.uploadedChunks)
    })
    });
    }
    }

    23.9.6 最佳实践

    • 提供清晰的错误信息,包括最大允许大小

    • 实现分块上传支持大文件

    • 考虑使用CDN或对象存储处理大文件

    • 监控大文件上传频率和大小分布

    23.10 414 URI Too Long(URI过长)

    23.10.1 定义与语义

    414状态码表示客户端请求的URI长度超过服务器能够处理的限制。虽然HTTP规范没有指定最大URI长度,但服务器实现通常有限制。

    23.10.2 各服务器限制

    服务器/组件默认限制配置项
    Nginx 取决于系统,通常8KB large_client_header_buffers
    Apache 8KB LimitRequestLine
    IIS 16KB maxUrl in web.config
    Node.js 取决于实现 可配置
    浏览器 约2000字符 无标准

    23.10.3 常见原因

  • 过度使用查询参数:GET请求携带大量参数

  • 深度嵌套路径:RESTful API的深度资源引用

  • Base64编码数据:在URL中嵌入数据

  • 错误的链接生成:无限循环生成参数

  • 23.10.4 解决方案

    javascript

    // 将长GET请求转换为POST
    function convertLongGetToPost(url) {
    const urlObj = new URL(url);

    // 如果URL超过安全长度,转换为POST
    if (url.length > 2000) {
    const params = {};
    urlObj.searchParams.forEach((value, key) => {
    params[key] = value;
    });

    return {
    method: 'POST',
    url: urlObj.pathname,
    body: JSON.stringify(params),
    headers: {
    'Content-Type': 'application/json',
    'X-Original-Method': 'GET'
    }
    };
    }

    return { method: 'GET', url };
    }

    // 服务器端处理
    app.post('/api/data', (req, res) => {
    // 检查是否为GET转换的POST请求
    if (req.get('X-Original-Method') === 'GET') {
    // 按照GET语义处理,但使用POST主体
    const params = req.body;
    // 处理逻辑…
    }
    });

    23.10.5 最佳实践

    • 对于复杂查询,使用POST替代GET

    • 避免在URL中嵌入大块数据

    • 使用分页和过滤减少数据量

    • 监控URI长度异常

    23.11 415 Unsupported Media Type(不支持的媒体类型)

    23.11.1 定义与语义

    415状态码表示服务器不支持请求的媒体类型。通常发生在请求的Content-Type不被服务器接受时。

    23.11.2 常见媒体类型

    类型描述使用场景
    application/json JSON数据 REST API
    application/xml XML数据 SOAP API
    multipart/form-data 表单数据 文件上传
    application/x-www-form-urlencoded URL编码表单 简单表单
    text/plain 纯文本 简单数据

    23.11.3 实现示例

    python

    # FastAPI媒体类型验证
    from fastapi import FastAPI, UploadFile, File, HTTPException
    from fastapi.responses import JSONResponse

    app = FastAPI()

    # 允许的媒体类型
    ALLOWED_MEDIA_TYPES = {
    'application/json',
    'application/xml',
    'text/plain',
    'multipart/form-data'
    }

    @app.middleware("http")
    async def check_media_type(request, call_next):
    # 检查请求方法是否需要body
    if request.method in ['POST', 'PUT', 'PATCH']:
    content_type = request.headers.get('content-type', '')

    # 提取主媒体类型
    main_type = content_type.split(';')[0].strip() if content_type else ''

    # 如果提供了Content-Type但不被支持
    if content_type and main_type not in ALLOWED_MEDIA_TYPES:
    return JSONResponse(
    status_code=415,
    content={
    "error": "unsupported_media_type",
    "message": f"Media type '{main_type}' is not supported",
    "supported_types": list(ALLOWED_MEDIA_TYPES),
    "received_type": content_type
    }
    )

    response = await call_next(request)
    return response

    @app.post("/api/data")
    async def create_data(data: dict):
    return {"id": 123, **data}

    23.11.4 内容类型协商

    http

    # 客户端请求
    POST /api/resource HTTP/1.1
    Host: api.example.com
    Content-Type: application/json
    Accept: application/json, application/xml;q=0.9, text/plain;q=0.5

    {"name": "test"}

    # 服务器响应
    HTTP/1.1 200 OK
    Content-Type: application/json
    Vary: Accept

    {"id": 1, "name": "test"}

    23.11.5 最佳实践

    • 在415响应中列出支持的媒体类型

    • 提供默认媒体类型作为后备

    • 实现严格模式用于生产,宽松模式用于开发

    • 记录API期望的媒体类型

    23.12 416 Range Not Satisfiable(范围请求无法满足)

    23.12.1 定义与语义

    416状态码表示服务器无法提供请求的Range头指定的字节范围。通常发生在请求的范围超出资源大小时。

    23.12.2 范围请求机制

    http

    # 请求特定范围
    GET /large-file.pdf HTTP/1.1
    Host: example.com
    Range: bytes=0-999

    # 成功响应
    HTTP/1.1 206 Partial Content
    Content-Type: application/pdf
    Content-Range: bytes 0-999/5000
    Content-Length: 1000

    [二进制数据]

    # 无效范围请求
    GET /large-file.pdf HTTP/1.1
    Host: example.com
    Range: bytes=5000-6000

    # 416响应
    HTTP/1.1 416 Range Not Satisfiable
    Content-Range: bytes */5000

    23.12.3 断点续传实现

    python

    # Flask断点续传实现
    from flask import Flask, request, send_file, Response
    import os

    app = Flask(__name__)

    @app.route('/download/<filename>')
    def download_file(filename):
    filepath = os.path.join('uploads', filename)

    if not os.path.exists(filepath):
    return {'error': 'file_not_found'}, 404

    file_size = os.path.getsize(filepath)

    # 检查Range头
    range_header = request.headers.get('Range')

    if range_header:
    # 解析范围请求
    try:
    byte1, byte2 = 0, None

    # 格式:bytes=start-end
    range_ = range_header.replace('bytes=', '').split('-')
    byte1 = int(range_[0]) if range_[0] else 0
    if range_[1]:
    byte2 = int(range_[1])

    if byte2 is None:
    byte2 = file_size – 1
    elif byte2 >= file_size:
    # 范围超出文件大小
    return Response(
    status=416,
    headers={
    'Content-Range': f'bytes */{file_size}',
    'Accept-Ranges': 'bytes'
    }
    )

    length = byte2 – byte1 + 1

    # 发送部分内容
    def generate():
    with open(filepath, 'rb') as f:
    f.seek(byte1)
    remaining = length
    chunk_size = 8192

    while remaining > 0:
    chunk = f.read(min(chunk_size, remaining))
    if not chunk:
    break
    remaining -= len(chunk)
    yield chunk

    response = Response(generate(), 206)
    response.headers.add('Content-Range', f'bytes {byte1}-{byte2}/{file_size}')
    response.headers.add('Accept-Ranges', 'bytes')
    response.headers.add('Content-Length', str(length))
    response.headers.add('Content-Type', 'application/octet-stream')

    return response

    except ValueError:
    # 无效的Range头格式
    pass

    # 完整文件下载
    return send_file(filepath, as_attachment=True)

    23.12.4 客户端实现

    javascript

    // 支持断点续传的下载器
    class ResumableDownloader {
    constructor(url, filename) {
    this.url = url;
    this.filename = filename;
    this.downloadedBytes = 0;
    this.totalBytes = 0;
    this.isPaused = false;
    }

    async start() {
    try {
    // 尝试获取文件信息
    const headResponse = await fetch(this.url, { method: 'HEAD' });

    if (!headResponse.ok) {
    throw new Error(`Failed to get file info: ${headResponse.status}`);
    }

    this.totalBytes = parseInt(headResponse.headers.get('Content-Length')) || 0;
    const acceptRanges = headResponse.headers.get('Accept-Ranges') === 'bytes';

    // 检查本地已下载部分
    const downloaded = await this.getDownloadedSize();
    this.downloadedBytes = downloaded;

    if (acceptRanges && downloaded > 0 && downloaded < this.totalBytes) {
    // 断点续传
    await this.resumeDownload();
    } else {
    // 全新下载
    await this.startNewDownload();
    }
    } catch (error) {
    console.error('Download failed:', error);
    throw error;
    }
    }

    async resumeDownload() {
    const headers = {
    'Range': `bytes=${this.downloadedBytes}-`
    };

    const response = await fetch(this.url, { headers });

    if (response.status === 416) {
    // 范围无效,重新开始
    this.downloadedBytes = 0;
    await this.startNewDownload();
    return;
    }

    if (response.status !== 206) {
    throw new Error(`Unexpected status: ${response.status}`);
    }

    await this.processResponse(response);
    }

    async startNewDownload() {
    const response = await fetch(this.url);

    if (!response.ok) {
    throw new Error(`Download failed: ${response.status}`);
    }

    this.downloadedBytes = 0;
    await this.processResponse(response);
    }

    async processResponse(response) {
    const reader = response.body.getReader();
    const fileStream = await this.getFileStream();

    while (true) {
    if (this.isPaused) {
    await new Promise(resolve => {
    this.resumeCallback = resolve;
    });
    }

    const { done, value } = await reader.read();

    if (done) {
    fileStream.close();
    console.log('Download completed');
    return;
    }

    await fileStream.write(value);
    this.downloadedBytes += value.length;

    // 更新进度
    const progress = this.totalBytes > 0
    ? (this.downloadedBytes / this.totalBytes * 100).toFixed(2)
    : 'unknown';

    console.log(`Progress: ${progress}%`);
    }
    }

    pause() {
    this.isPaused = true;
    }

    resume() {
    this.isPaused = false;
    if (this.resumeCallback) {
    this.resumeCallback();
    this.resumeCallback = null;
    }
    }

    async getDownloadedSize() {
    // 从本地存储获取已下载大小
    // 实现取决于存储方式(IndexedDB、文件系统API等)
    return 0;
    }

    async getFileStream() {
    // 获取文件写入流
    // 实现取决于存储方式
    return {
    write: async () => {},
    close: () => {}
    };
    }
    }

    23.12.5 最佳实践

    • 始终在响应中包含Accept-Ranges头

    • 对于416响应,包含Content-Range头显示有效范围

    • 实现完整的分块下载支持

    • 考虑支持多范围请求(multipart/byteranges)

    23.13 417 Expectation Failed(期望失败)

    23.13.1 定义与语义

    417状态码表示服务器无法满足请求的Expect头中的期望。最常见的场景是Expect: 100-continue。

    23.13.2 100 Continue机制

    http

    # 客户端请求(使用100-continue)
    POST /upload HTTP/1.1
    Host: example.com
    Content-Type: application/json
    Content-Length: 1024
    Expect: 100-continue

    # 服务器响应
    # 情况1:接受请求
    HTTP/1.1 100 Continue

    # 客户端发送请求体
    {"data": "…"}

    # 服务器最终响应
    HTTP/1.1 201 Created

    # 情况2:拒绝请求
    HTTP/1.1 417 Expectation Failed
    Content-Type: application/json

    {
    "error": "expectation_failed",
    "message": "Server cannot accept the request",
    "reason": "File size limit exceeded"
    }

    23.13.3 使用场景

  • 大文件上传验证:检查权限和配额后再接收数据

  • 实时处理验证:确保服务器能处理请求

  • 条件请求:基于特定条件接受请求

  • 资源预留:检查资源可用性

  • 23.13.4 实现示例

    python

    # FastAPI Expect头处理
    from fastapi import FastAPI, Request, HTTPException
    from fastapi.responses import JSONResponse

    app = FastAPI()

    @app.middleware("http")
    async def handle_expect_header(request: Request, call_next):
    expect_header = request.headers.get("expect", "").lower()

    if expect_header == "100-continue":
    # 执行预检查
    content_length = request.headers.get("content-length")

    if content_length and int(content_length) > 10 * 1024 * 1024: # 10MB
    # 请求体太大,拒绝
    return JSONResponse(
    status_code=417,
    content={
    "error": "expectation_failed",
    "message": "File size exceeds limit",
    "max_size": 10 * 1024 * 1024,
    "requested_size": int(content_length)
    }
    )

    # 检查认证
    auth_header = request.headers.get("authorization")
    if not auth_header or not validate_token(auth_header):
    return JSONResponse(
    status_code=417,
    content={
    "error": "expectation_failed",
    "message": "Authentication required"
    }
    )

    # 发送100 Continue响应
    from starlette.responses import Response
    response = Response(status_code=100)
    await response(scope=request.scope, receive=request.receive, send=request.send)

    # 继续正常处理
    response = await call_next(request)
    return response

    def validate_token(token: str) -> bool:
    # 实现令牌验证
    return token.startswith("Bearer ")

    23.13.5 客户端实现

    javascript

    // 使用Expect: 100-continue的客户端
    async function uploadWithContinue(file, url) {
    return new Promise((resolve, reject) => {
    const xhr = new XMLHttpRequest();

    xhr.open('POST', url, true);

    // 设置Expect头
    xhr.setRequestHeader('Expect', '100-continue');
    xhr.setRequestHeader('Content-Type', 'application/octet-stream');

    // 监听100-continue响应
    xhr.onreadystatechange = function() {
    if (xhr.readyState === XMLHttpRequest.HEADERS_RECEIVED) {
    // 检查状态码
    if (xhr.status === 100) {
    console.log('Server ready to receive data');
    } else if (xhr.status === 417) {
    reject(new Error('Server rejected the request'));
    }
    }
    };

    xhr.onload = function() {
    if (xhr.status >= 200 && xhr.status < 300) {
    resolve(xhr.response);
    } else {
    reject(new Error(`Upload failed: ${xhr.status}`));
    }
    };

    xhr.onerror = function() {
    reject(new Error('Network error'));
    };

    xhr.send(file);
    });
    }

    // 使用Fetch API(自动处理100-continue)
    async function uploadWithFetch(file, url) {
    const response = await fetch(url, {
    method: 'POST',
    headers: {
    'Content-Type': 'application/octet-stream',
    // Fetch API会自动处理Expect头
    },
    body: file
    });

    if (!response.ok) {
    throw new Error(`Upload failed: ${response.status}`);
    }

    return response.json();
    }

    23.13.6 最佳实践

    • 对于大文件上传实现100-continue支持

    • 提供清晰的拒绝原因

    • 考虑超时处理(客户端等待100响应的超时)

    • 监控417错误频率以识别客户端问题

    23.14 418 I'm a Teapot(我是茶壶)

    23.14.1 起源与定义

    418状态码最初是1998年愚人节的RFC 2324(超文本咖啡壶控制协议)的一部分,用于表示服务器是一个茶壶,无法煮咖啡。虽然最初是玩笑,但已被许多服务器实现并用于表示"这个端点永远不会工作"。

    23.14.2 现代用途

  • API端点占位符:表示尚未实现的端点

  • 内部玩笑:开发团队间的幽默

  • 安全措施:混淆攻击者

  • 复活节彩蛋:隐藏功能或玩笑响应

  • 23.14.3 实现示例

    python

    # Flask 418实现
    from flask import Flask, jsonify

    app = Flask(__name__)

    @app.route('/brew-coffee')
    def brew_coffee():
    """HTCPCP端点 – 总是返回418"""
    response = jsonify({
    "error": "I'm a teapot",
    "message": "The requested entity body is short and stout.",
    "hint": "Tip me over and pour me out.",
    "rfc": "https://tools.ietf.org/html/rfc2324",
    "solution": "Try making tea instead."
    })
    response.status_code = 418
    response.headers['X-Teapot-Type'] = 'Robust Brown Betty'
    response.headers['X-Brewing-Time'] = '4 minutes'
    return response

    @app.route('/make-tea')
    def make_tea():
    """正确的HTCPCP端点"""
    return jsonify({
    "status": "brewing",
    "type": "Earl Grey",
    "strength": "medium",
    "message": "Your tea will be ready shortly."
    })

    23.14.4 创意使用

    javascript

    // 创意418页面
    function createTeapotResponse() {
    return {
    status: 418,
    headers: {
    'Content-Type': 'application/json',
    'X-Teapot': 'true',
    'X-Brew-Advice': 'Use freshly boiled water at 95°C',
    'X-Tea-Type': 'Oolong'
    },
    body: JSON.stringify({
    error: "I'm a teapot",
    message: "This server is a teapot, not a coffee pot.",
    instructions: {
    step1: "Fill me with fresh water",
    step2: "Heat to 95°C (203°F)",
    step3: "Add 1 teaspoon of tea leaves per cup",
    step4: "Steep for 3-5 minutes",
    step5: "Pour and enjoy"
    },
    tea_suggestions: [
    {
    name: "Green Tea",
    temperature: "80°C",
    time: "2-3 minutes"
    },
    {
    name: "Black Tea",
    temperature: "95°C",
    time: "3-5 minutes"
    },
    {
    name: "Herbal Tea",
    temperature: "100°C",
    time: "5-7 minutes"
    }
    ],
    fun_fact: "The 418 status code was defined in RFC 2324 (Hyper Text Coffee Pot Control Protocol) as an April Fools' joke in 1998."
    }, null, 2)
    };
    }

    // Express中间件检测418请求
    app.use((req, res, next) => {
    // 检查是否为咖啡相关请求
    if (req.path.includes('coffee') ||
    req.get('X-Beverage') === 'coffee') {

    const teapotResponse = createTeapotResponse();

    res.status(teapotResponse.status);
    Object.entries(teapotResponse.headers)
    .forEach(([key, value]) => res.set(key, value));
    res.send(teapotResponse.body);
    return;
    }

    next();
    });

    23.14.5 最佳实践

    • 谨慎使用418,避免混淆真正的错误

    • 考虑在开发环境使用,生产环境禁用

    • 可以用于API版本控制(旧版本返回418)

    • 确保不会影响SEO或爬虫

    23.15 421 Misdirected Request(错误定向请求)

    23.15.1 定义与语义

    421状态码在HTTP/2中引入,表示请求被发送到无法生成响应的服务器。通常发生在连接重用和服务器名称指示(SNI)不匹配时。

    23.15.2 HTTP/2连接重用

    HTTP/2允许在单个连接上多路复用多个请求。但当客户端尝试在连接上发送针对不同主机的请求时,服务器可能返回421。

    23.15.3 SNI(服务器名称指示)

    SNI是TLS扩展,允许客户端在握手时指定要连接的主机名。这对于虚拟主机托管至关重要。

    23.15.4 问题场景

    text

    客户端 → 服务器: TLS握手(SNI: api.example.com)
    客户端 → 服务器: 请求1: GET /users (Host: api.example.com)
    服务器 → 客户端: 200 OK

    客户端 → 服务器: 请求2: GET /orders (Host: shop.example.com)
    服务器 → 客户端: 421 Misdirected Request
    (因为连接是为api.example.com建立的)

    23.15.5 解决方案

    nginx

    # Nginx配置处理多个主机名
    server {
    listen 443 ssl http2;
    server_name api.example.com;
    ssl_certificate /certs/api.example.com.crt;
    ssl_certificate_key /certs/api.example.com.key;

    location / {
    proxy_pass http://api_backend;
    }
    }

    server {
    listen 443 ssl http2;
    server_name shop.example.com;
    ssl_certificate /certs/shop.example.com.crt;
    ssl_certificate_key /certs/shop.example.com.key;

    location / {
    proxy_pass http://shop_backend;
    }
    }

    # 或者使用通配符证书
    server {
    listen 443 ssl http2;
    server_name api.example.com shop.example.com;
    ssl_certificate /certs/wildcard.example.com.crt;
    ssl_certificate_key /certs/wildcard.example.com.key;

    # 基于Host头路由
    location / {
    if ($host = "api.example.com") {
    proxy_pass http://api_backend;
    }
    if ($host = "shop.example.com") {
    proxy_pass http://shop_backend;
    }
    }
    }

    23.15.6 客户端处理

    javascript

    // HTTP/2客户端连接管理
    class H2ConnectionManager {
    constructor() {
    this.connections = new Map(); // host -> connection
    this.pendingRequests = new Map(); // host -> [requests]
    }

    async request(host, path, options = {}) {
    // 检查是否有现有连接
    let connection = this.connections.get(host);

    if (!connection) {
    // 创建新连接
    connection = await this.createConnection(host);
    this.connections.set(host, connection);

    // 监听连接错误
    connection.on('error', (error) => {
    console.error(`Connection to ${host} failed:`, error);
    this.connections.delete(host);
    });

    // 监听421错误
    connection.on('stream', (stream) => {
    stream.on('response', (headers) => {
    if (headers[':status'] === '421') {
    console.log(`Received 421 for ${host}, closing connection`);
    connection.close();
    this.connections.delete(host);

    // 重试待处理请求
    this.retryPendingRequests(host);
    }
    });
    });
    }

    // 发送请求
    return new Promise((resolve, reject) => {
    const stream = connection.request({
    ':method': options.method || 'GET',
    ':path': path,
    ':authority': host,
    …options.headers
    });

    stream.on('response', (headers) => {
    const chunks = [];

    stream.on('data', (chunk) => {
    chunks.push(chunk);
    });

    stream.on('end', () => {
    const response = {
    status: headers[':status'],
    headers,
    body: Buffer.concat(chunks)
    };
    resolve(response);
    });

    stream.on('error', reject);
    });

    if (options.body) {
    stream.end(options.body);
    } else {
    stream.end();
    }
    });
    }

    async createConnection(host) {
    // 实现HTTP/2连接创建
    // 注意:需要适当的HTTP/2客户端库
    throw new Error('Not implemented');
    }

    retryPendingRequests(host) {
    const pending = this.pendingRequests.get(host) || [];
    this.pendingRequests.delete(host);

    pending.forEach(request => {
    this.request(host, request.path, request.options)
    .then(request.resolve)
    .catch(request.reject);
    });
    }
    }

    23.15.7 最佳实践

    • 为不同主机名使用通配符或多域名证书

    • 客户端正确管理HTTP/2连接池

    • 监控421错误以识别配置问题

    • 考虑使用HTTP/1.1作为备用方案

    23.16 422 Unprocessable Entity(无法处理的实体)

    23.16.1 定义与语义

    422状态码来自WebDAV规范,表示服务器理解请求实体的内容类型,且语法正确,但无法处理其中包含的指令。常用于验证错误。

    23.16.2 与400的区别

    • 400 Bad Request:请求语法错误或无法解析

    • 422 Unprocessable Entity:请求语法正确,但语义错误

    23.16.3 验证错误示例

    json

    // 请求
    POST /api/users HTTP/1.1
    Content-Type: application/json

    {
    "email": "invalid-email",
    "age": -5,
    "password": "123"
    }

    // 响应
    HTTP/1.1 422 Unprocessable Entity
    Content-Type: application/problem+json

    {
    "type": "https://example.com/errors/validation",
    "title": "Validation Failed",
    "status": 422,
    "detail": "The request contains validation errors",
    "instance": "/api/users",
    "errors": [
    {
    "field": "email",
    "code": "invalid_format",
    "message": "Must be a valid email address",
    "value": "invalid-email"
    },
    {
    "field": "age",
    "code": "minimum",
    "message": "Must be greater than 0",
    "value": -5,
    "constraint": 0
    },
    {
    "field": "password",
    "code": "min_length",
    "message": "Must be at least 8 characters",
    "value": "123",
    "constraint": 8
    }
    ]
    }

    23.16.4 实现模式

    python

    # FastAPI验证错误处理
    from fastapi import FastAPI, HTTPException, Request
    from fastapi.responses import JSONResponse
    from pydantic import BaseModel, EmailStr, validator
    from typing import List

    app = FastAPI()

    class UserCreate(BaseModel):
    email: EmailStr
    age: int
    password: str

    @validator('age')
    def age_must_be_positive(cls, v):
    if v <= 0:
    raise ValueError('age must be positive')
    return v

    @validator('password')
    def password_length(cls, v):
    if len(v) < 8:
    raise ValueError('password must be at least 8 characters')
    return v

    @app.exception_handler(ValueError)
    async def validation_exception_handler(request: Request, exc: ValueError):
    # 将验证错误转换为422响应
    return JSONResponse(
    status_code=422,
    content={
    "detail": str(exc),
    "errors": [
    {
    "loc": ["body"],
    "msg": str(exc),
    "type": "value_error"
    }
    ]
    }
    )

    @app.post("/users/")
    async def create_user(user: UserCreate):
    # 如果验证通过,创建用户
    return {"message": "User created", "user": user.dict()}

    # 更精细的错误处理
    from pydantic import ValidationError

    @app.exception_handler(ValidationError)
    async def pydantic_validation_handler(request: Request, exc: ValidationError):
    errors = []

    for error in exc.errors():
    errors.append({
    "field": ".".join(str(loc) for loc in error["loc"]),
    "code": error["type"],
    "message": error["msg"],
    "context": error.get("ctx")
    })

    return JSONResponse(
    status_code=422,
    content={
    "error": "validation_failed",
    "message": "The request contains validation errors",
    "errors": errors
    }
    )

    23.16.5 前端集成

    javascript

    // 前端表单验证和422错误处理
    class FormValidator {
    constructor(formId) {
    this.form = document.getElementById(formId);
    this.errorsContainer = document.createElement('div');
    this.errorsContainer.className = 'validation-errors';
    this.form.parentNode.insertBefore(this.errorsContainer, this.form);

    this.form.addEventListener('submit', this.handleSubmit.bind(this));
    }

    async handleSubmit(event) {
    event.preventDefault();

    // 清除之前的错误
    this.clearErrors();

    // 收集表单数据
    const formData = new FormData(this.form);
    const data = Object.fromEntries(formData.entries());

    try {
    const response = await fetch(this.form.action, {
    method: this.form.method,
    headers: {
    'Content-Type': 'application/json'
    },
    body: JSON.stringify(data)
    });

    if (response.status === 422) {
    // 处理验证错误
    const result = await response.json();
    this.displayErrors(result.errors);
    return;
    }

    if (!response.ok) {
    throw new Error(`Request failed: ${response.status}`);
    }

    // 成功处理
    const result = await response.json();
    this.onSuccess(result);

    } catch (error) {
    console.error('Form submission error:', error);
    this.displayGenericError(error.message);
    }
    }

    displayErrors(errors) {
    this.errorsContainer.innerHTML = '';

    // 按字段分组错误
    const fieldErrors = {};

    errors.forEach(error => {
    if (!fieldErrors[error.field]) {
    fieldErrors[error.field] = [];
    }
    fieldErrors[error.field].push(error.message);
    });

    // 显示错误
    Object.entries(fieldErrors).forEach(([field, messages]) => {
    const fieldElement = this.form.querySelector(`[name="${field}"]`);
    if (fieldElement) {
    // 添加错误类
    fieldElement.classList.add('error');

    // 创建错误消息
    const errorElement = document.createElement('div');
    errorElement.className = 'field-error';
    errorElement.textContent = messages.join(', ');

    fieldElement.parentNode.appendChild(errorElement);
    }

    // 添加到错误容器
    const errorSummary = document.createElement('div');
    errorSummary.className = 'error-summary';
    errorSummary.innerHTML = `
    <strong>${field}:</strong>
    <ul>
    ${messages.map(msg => `<li>${msg}</li>`).join('')}
    </ul>
    `;
    this.errorsContainer.appendChild(errorSummary);
    });

    // 滚动到错误
    this.errorsContainer.scrollIntoView({ behavior: 'smooth' });
    }

    clearErrors() {
    // 清除字段错误样式
    this.form.querySelectorAll('.error').forEach(el => {
    el.classList.remove('error');
    });

    // 移除错误消息
    this.form.querySelectorAll('.field-error').forEach(el => {
    el.remove();
    });

    // 清空错误容器
    this.errorsContainer.innerHTML = '';
    }

    displayGenericError(message) {
    this.errorsContainer.innerHTML = `
    <div class="alert alert-danger">
    <strong>Error:</strong> ${message}
    </div>
    `;
    }

    onSuccess(result) {
    // 成功回调
    console.log('Form submitted successfully:', result);
    this.form.reset();
    this.clearErrors();

    alert('Form submitted successfully!');
    }
    }

    23.16.6 最佳实践

    • 提供详细的验证错误信息

    • 使用标准错误格式(如RFC 7807)

    • 实现客户端验证以减少422请求

    • 考虑使用JSON Schema进行验证

    23.17 423 Locked(已锁定)

    23.17.1 定义与语义

    423状态码来自WebDAV规范,表示请求的资源被锁定。客户端应等待锁释放或联系锁持有者。

    23.17.2 锁类型

  • 独占锁:只有一个客户端可修改资源

  • 共享锁:多个客户端可读取,但只有一个可写入

  • 深度锁:锁住资源及其所有子资源

  • 23.17.3 锁机制

    http

    # 锁定请求
    LOCK /document.txt HTTP/1.1
    Host: example.com
    Timeout: Infinite, Second-3600
    Depth: infinity
    Content-Type: application/xml

    <?xml version="1.0" encoding="utf-8" ?>
    <D:lockinfo xmlns:D="DAV:">
    <D:lockscope><D:exclusive/></D:lockscope>
    <D:locktype><D:write/></D:locktype>
    <D:owner>
    <D:href>mailto:user@example.com</D:href>
    </D:owner>
    </D:lockinfo>

    # 锁定响应
    HTTP/1.1 200 OK
    Content-Type: application/xml

    <?xml version="1.0" encoding="utf-8" ?>
    <D:prop xmlns:D="DAV:">
    <D:lockdiscovery>
    <D:activelock>
    <D:locktype><D:write/></D:locktype>
    <D:lockscope><D:exclusive/></D:lockscope>
    <D:depth>infinity</D:depth>
    <D:owner>
    <D:href>mailto:user@example.com</D:href>
    </D:owner>
    <D:locktoken>
    <D:href>urn:uuid:e71d4fae-5dec-11d0-a765-00a0c91e6bf6</D:href>
    </D:locktoken>
    <D:timeout>Second-3600</D:timeout>
    </D:activelock>
    </D:lockdiscovery>
    </D:prop>

    # 尝试修改已锁定的资源
    PUT /document.txt HTTP/1.1
    Host: example.com
    If: <urn:uuid:e71d4fae-5dec-11d0-a765-00a0c91e6bf6>
    Content-Type: text/plain

    New content

    # 响应(如果没有正确的锁令牌)
    HTTP/1.1 423 Locked
    Content-Type: application/xml

    <?xml version="1.0" encoding="utf-8" ?>
    <D:error xmlns:D="DAV:">
    <D:lock-token-submitted>
    <D:href>/document.txt</D:href>
    </D:lock-token-submitted>
    </D:error>

    23.17.4 现代API实现

    python

    # Flask资源锁实现
    from flask import Flask, request, jsonify
    import threading
    import time
    import uuid
    from datetime import datetime, timedelta

    app = Flask(__name__)

    class ResourceLock:
    def __init__(self):
    self.locks = {}
    self.lock = threading.Lock()

    def acquire(self, resource_id, client_id, timeout=300):
    """获取资源锁"""
    with self.lock:
    current_time = datetime.now()

    # 检查是否已锁定
    if resource_id in self.locks:
    lock_info = self.locks[resource_id]

    # 检查锁是否过期
    if lock_info['expires'] < current_time:
    # 锁已过期,可以获取
    pass
    elif lock_info['client_id'] == client_id:
    # 同一客户端,更新超时
    lock_info['expires'] = current_time + timedelta(seconds=timeout)
    return True
    else:
    # 资源被其他客户端锁定
    return False

    # 获取新锁
    lock_token = str(uuid.uuid4())
    self.locks[resource_id] = {
    'client_id': client_id,
    'token': lock_token,
    'created': current_time,
    'expires': current_time + timedelta(seconds=timeout),
    'timeout': timeout
    }

    return lock_token

    def release(self, resource_id, client_id=None, token=None):
    """释放资源锁"""
    with self.lock:
    if resource_id not in self.locks:
    return True

    lock_info = self.locks[resource_id]

    # 检查权限
    if client_id and lock_info['client_id'] != client_id:
    return False

    if token and lock_info['token'] != token:
    return False

    # 释放锁
    del self.locks[resource_id]
    return True

    def check(self, resource_id, token=None):
    """检查资源是否被锁定"""
    with self.lock:
    if resource_id not in self.locks:
    return True

    lock_info = self.locks[resource_id]

    # 检查是否过期
    if lock_info['expires'] < datetime.now():
    del self.locks[resource_id]
    return True

    # 检查令牌
    if token and lock_info['token'] == token:
    return True

    return False

    # 全局锁管理器
    lock_manager = ResourceLock()

    @app.route('/api/documents/<document_id>/lock', methods=['POST'])
    def lock_document(document_id):
    """锁定文档"""
    client_id = request.headers.get('X-Client-ID')
    if not client_id:
    return jsonify({'error': 'client_id_required'}), 400

    timeout = request.json.get('timeout', 300)

    # 尝试获取锁
    result = lock_manager.acquire(document_id, client_id, timeout)

    if result is True:
    # 已持有锁
    return jsonify({'status': 'already_locked'}), 200
    elif result is False:
    # 被其他客户端锁定
    lock_info = get_lock_info(document_id)
    return jsonify({
    'error': 'resource_locked',
    'message': 'Document is locked by another client',
    'locked_by': lock_info['client_id'],
    'expires_at': lock_info['expires'].isoformat()
    }), 423
    else:
    # 成功获取锁
    return jsonify({
    'lock_token': result,
    'timeout': timeout,
    'expires_in': timeout
    }), 200

    @app.route('/api/documents/<document_id>', methods=['PUT'])
    def update_document(document_id):
    """更新文档(需要锁)"""
    lock_token = request.headers.get('X-Lock-Token')

    # 检查锁
    if not lock_manager.check(document_id, lock_token):
    return jsonify({
    'error': 'resource_locked',
    'message': 'Document is locked. Provide valid lock token.',
    'required_header': 'X-Lock-Token'
    }), 423

    # 更新文档逻辑
    # …

    return jsonify({'status': 'updated'}), 200

    def get_lock_info(resource_id):
    """获取锁信息(简化实现)"""
    # 实际实现需要线程安全地访问lock_manager
    return {'client_id': 'unknown', 'expires': datetime.now()}

    23.17.5 最佳实践

    • 实现锁超时机制防止死锁

    • 提供锁令牌以便客户端后续操作

    • 支持锁查询和释放操作

    • 考虑分布式锁方案(如Redis)

    23.18 424 Failed Dependency(依赖失败)

    23.18.1 定义与语义

    424状态码来自WebDAV规范,表示请求的操作依赖于另一个操作,且那个操作失败。用于事务性操作或多资源操作。

    23.18.2 使用场景

  • 批量操作:部分操作失败导致整体失败

  • 事务处理:数据库事务回滚

  • 文件操作:移动包含多个文件的文件夹

  • API组合:调用多个微服务

  • 23.18.3 实现示例

    python

    # Django事务性操作
    from django.db import transaction
    from django.http import JsonResponse
    from django.views import View
    import logging

    logger = logging.getLogger(__name__)

    class BatchCreateView(View):
    @transaction.atomic
    def post(self, request):
    data = request.json
    created_resources = []
    errors = []

    # 创建保存点
    sid = transaction.savepoint()

    try:
    for item in data['items']:
    try:
    # 创建资源
    resource = self.create_resource(item)
    created_resources.append(resource)

    except ValidationError as e:
    # 单个资源创建失败
    errors.append({
    'item': item,
    'error': str(e),
    'field': getattr(e, 'field', None)
    })
    # 回滚到保存点
    transaction.savepoint_rollback(sid)
    raise BatchOperationError(f"Failed to create item: {item}")

    # 所有操作成功,提交事务
    transaction.savepoint_commit(sid)

    return JsonResponse({
    'status': 'success',
    'created': len(created_resources),
    'resources': created_resources
    })

    except BatchOperationError as e:
    # 返回424响应
    return JsonResponse({
    'error': 'failed_dependency',
    'message': 'Batch operation failed due to dependency errors',
    'details': errors,
    'successful_count': len(created_resources),
    'failed_count': len(errors)
    }, status=424)

    except Exception as e:
    # 其他错误
    logger.error(f"Batch operation failed: {e}")
    return JsonResponse({
    'error': 'internal_error',
    'message': 'Internal server error'
    }, status=500)

    def create_resource(self, item):
    # 实现资源创建逻辑
    # 可能抛出ValidationError
    pass

    class BatchOperationError(Exception):
    pass

    23.18.4 微服务场景

    javascript

    // 微服务编排中的依赖失败处理
    class Orchestrator {
    async processOrder(order) {
    const steps = [
    this.validateOrder.bind(this, order),
    this.reserveInventory.bind(this, order),
    this.processPayment.bind(this, order),
    this.shipOrder.bind(this, order),
    this.sendConfirmation.bind(this, order)
    ];

    const results = [];
    const errors = [];

    for (let i = 0; i < steps.length; i++) {
    try {
    const result = await steps[i]();
    results.push({
    step: i + 1,
    success: true,
    result
    });
    } catch (error) {
    errors.push({
    step: i + 1,
    success: false,
    error: error.message,
    dependency_failed: i > 0 ? i : null
    });

    // 如果依赖步骤失败,后续步骤无法执行
    if (i > 0) {
    // 补偿已完成的步骤
    await this.compensate(results.slice(0, i));

    return {
    status: 'failed',
    error_type: 'dependency_failed',
    status_code: 424,
    completed_steps: results.length,
    failed_step: i + 1,
    errors,
    compensation_status: 'executed'
    };
    }

    // 第一步失败,直接返回错误
    return {
    status: 'failed',
    error_type: 'initial_failure',
    status_code: 400,
    failed_step: 1,
    error: error.message
    };
    }
    }

    return {
    status: 'success',
    results
    };
    }

    async compensate(completedSteps) {
    // 执行补偿操作
    for (const step of completedSteps.reverse()) {
    try {
    await this.reverseStep(step);
    } catch (error) {
    console.error(`Compensation failed for step ${step.step}:`, error);
    }
    }
    }

    async reverseStep(step) {
    // 实现步骤回滚逻辑
    switch (step.step) {
    case 2: // 库存预留
    await this.releaseInventory(step.result.inventory_id);
    break;
    case 3: // 支付处理
    await this.refundPayment(step.result.payment_id);
    break;
    // 其他步骤的回滚…
    }
    }
    }

    23.18.5 最佳实践

    • 实现原子操作或事务补偿

    • 提供详细的失败依赖信息

    • 设计幂等操作以便重试

    • 监控依赖失败率以识别系统弱点

    23.19 425 Too Early(太早)

    23.19.1 定义与语义

    425状态码来自RFC 8470,表示服务器不愿意处理请求,因为它可能被重放。用于防止重放攻击,特别是在0-RTT(零往返时间)TLS连接中。

    23.19.2 TLS 1.3和0-RTT

    TLS 1.3引入了0-RTT特性,允许客户端在TLS握手完成前发送数据。这带来了重放攻击的风险。

    23.19.3 重放攻击防护

    python

    # Flask 0-RTT请求处理
    from flask import Flask, request, jsonify
    import hashlib
    import time
    from collections import deque

    app = Flask(__name__)

    class ReplayProtection:
    def __init__(self, window_size=300): # 5分钟窗口
    self.window_size = window_size
    self.seen_requests = deque(maxlen=10000)

    def is_replay(self, request_data, client_id):
    """检查是否为重放请求"""
    # 创建请求指纹
    fingerprint = self.create_fingerprint(request_data, client_id)

    # 检查是否已见过
    if fingerprint in self.seen_requests:
    return True

    # 添加到已见列表
    self.seen_requests.append(fingerprint)

    # 清理过期指纹(简化实现)
    if len(self.seen_requests) % 100 == 0:
    self.cleanup()

    return False

    def create_fingerprint(self, request_data, client_id):
    """创建请求指纹"""
    components = [
    client_id,
    str(time.time() // self.window_size), # 时间窗口
    request.method,
    request.path,
    hashlib.sha256(request.get_data()).hexdigest()[:16]
    ]

    return hashlib.sha256('|'.join(components).encode()).hexdigest()

    def cleanup(self):
    """清理过期指纹(简化)"""
    # 实际实现需要基于时间戳清理
    pass

    replay_protection = ReplayProtection()

    @app.before_request
    def check_replay():
    # 检查是否为0-RTT请求
    early_data = request.headers.get('Early-Data') == '1'

    if early_data:
    # 提取客户端标识
    client_id = request.headers.get('X-Client-ID') or request.remote_addr

    # 检查重放
    if replay_protection.is_replay(request, client_id):
    return jsonify({
    'error': 'too_early',
    'message': 'Request may be replayed',
    'retry_after': 1,
    'hint': 'Wait 1 second and retry without early data'
    }), 425

    @app.route('/api/order', methods=['POST'])
    def create_order():
    # 检查是否为0-RTT请求
    early_data = request.headers.get('Early-Data') == '1'

    if early_data:
    # 对于0-RTT请求,只允许幂等操作
    # 或者执行额外验证
    return jsonify({
    'status': 'accepted_with_caution',
    'message': 'Order accepted via 0-RTT, additional verification may be required',
    'order_id': generate_order_id(),
    'warning': 'This request was sent via 0-RTT TLS'
    })

    # 正常请求处理
    return jsonify({
    'status': 'created',
    'order_id': generate_order_id()
    })

    def generate_order_id():
    import uuid
    return str(uuid.uuid4())

    23.19.4 客户端实现

    javascript

    // 支持0-RTT的客户端
    class EarlyDataClient {
    constructor(baseURL) {
    this.baseURL = baseURL;
    this.earlyDataSupported = false;
    this.earlyDataAttempts = new Map(); // requestId -> timestamp
    }

    async request(endpoint, options = {}) {
    const requestId = this.generateRequestId();
    const url = `${this.baseURL}${endpoint}`;

    // 检查是否支持0-RTT
    if (this.earlyDataSupported && this.isIdempotent(options.method)) {
    // 尝试0-RTT请求
    try {
    const earlyOptions = {
    …options,
    headers: {
    …options.headers,
    'Early-Data': '1',
    'X-Request-ID': requestId
    }
    };

    this.earlyDataAttempts.set(requestId, Date.now());

    const response = await fetch(url, earlyOptions);

    if (response.status === 425) {
    // 太早,需要重试
    console.log('Received 425 Too Early, retrying without early data');

    // 移除Early-Data头
    const retryOptions = { …options };
    if (retryOptions.headers) {
    delete retryOptions.headers['Early-Data'];
    }

    // 等待建议的时间
    const retryAfter = response.headers.get('Retry-After');
    if (retryAfter) {
    await this.delay(parseInt(retryAfter) * 1000);
    }

    return this.requestWithoutEarlyData(url, retryOptions);
    }

    // 检查是否为谨慎接受的响应
    if (response.status === 200) {
    const data = await response.json();
    if (data.warning && data.warning.includes('0-RTT')) {
    console.warn('Request accepted via 0-RTT with caution');
    }
    }

    return response;

    } catch (error) {
    console.error('Early data request failed:', error);
    // 回退到正常请求
    return this.requestWithoutEarlyData(url, options);
    }
    }

    // 正常请求
    return this.requestWithoutEarlyData(url, options);
    }

    async requestWithoutEarlyData(url, options) {
    // 移除可能的Early-Data头
    const cleanOptions = { …options };
    if (cleanOptions.headers) {
    delete cleanOptions.headers['Early-Data'];
    }

    return fetch(url, cleanOptions);
    }

    isIdempotent(method) {
    // 检查方法是否幂等
    const idempotentMethods = ['GET', 'HEAD', 'PUT', 'DELETE', 'OPTIONS', 'TRACE'];
    return idempotentMethods.includes(method.toUpperCase());
    }

    generateRequestId() {
    return `${Date.now()}-${Math.random().toString(36).substr(2, 9)}`;
    }

    delay(ms) {
    return new Promise(resolve => setTimeout(resolve, ms));
    }

    async detectEarlyDataSupport() {
    // 检测服务器是否支持0-RTT
    try {
    const response = await fetch(`${this.baseURL}/.well-known/support-early-data`, {
    method: 'HEAD'
    });

    this.earlyDataSupported = response.headers.get('Supports-Early-Data') === '1';
    return this.earlyDataSupported;
    } catch (error) {
    this.earlyDataSupported = false;
    return false;
    }
    }
    }

    23.19.5 最佳实践

    • 只对幂等操作使用0-RTT

    • 实现重放保护机制

    • 为0-RTT请求提供明确的警告

    • 监控425错误率以调整重放窗口

    23.20 426 Upgrade Required(需要升级)

    23.20.1 定义与语义

    426状态码表示服务器拒绝使用当前协议处理请求,但愿意在客户端升级到不同协议后处理。通常用于协议升级协商。

    23.20.2 协议升级机制

    http

    # 客户端请求升级
    GET /chat HTTP/1.1
    Host: server.example.com
    Upgrade: websocket
    Connection: Upgrade
    Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==
    Sec-WebSocket-Version: 13

    # 服务器同意升级
    HTTP/1.1 101 Switching Protocols
    Upgrade: websocket
    Connection: Upgrade
    Sec-WebSocket-Accept: s3pPLMBiTxaQ9kYGzzhZRbK+xOo=

    # 服务器拒绝升级(要求其他协议)
    HTTP/1.1 426 Upgrade Required
    Upgrade: TLS/1.2, HTTP/1.1
    Connection: Upgrade
    Content-Type: application/json

    {
    "error": "upgrade_required",
    "message": "Please upgrade to a more secure protocol",
    "supported_upgrades": [
    {
    "protocol": "TLS/1.2",
    "description": "Transport Layer Security 1.2"
    },
    {
    "protocol": "HTTP/2",
    "description": "HTTP version 2"
    }
    ]
    }

    23.20.3 使用场景

  • 安全协议升级:要求从HTTP升级到HTTPS

  • API版本弃用:要求使用新版本API

  • 传输协议升级:从HTTP/1.1升级到HTTP/2或HTTP/3

  • WebSocket握手:协商WebSocket协议

  • 23.20.4 实现示例

    python

    # Flask协议升级处理
    from flask import Flask, request, jsonify
    import ssl

    app = Flask(__name__)

    @app.before_request
    def require_secure_connection():
    # 检查是否为HTTPS
    if not request.is_secure:
    # 检查是否支持升级
    upgrade_header = request.headers.get('Upgrade')

    if upgrade_header and 'TLS' in upgrade_header:
    # 客户端已请求升级
    return # 允许继续,将由底层服务器处理

    # 返回426要求升级
    response = jsonify({
    'error': 'upgrade_required',
    'message': 'Secure connection required',
    'upgrade_to': 'HTTPS',
    'url': f'https://{request.host}{request.path}',
    'status_code': 301 # 也提供重定向选项
    })
    response.status_code = 426
    response.headers['Upgrade'] = 'TLS/1.2, TLS/1.3'
    response.headers['Connection'] = 'Upgrade'
    return response

    @app.route('/api/deprecated', methods=['GET'])
    def deprecated_api():
    """已弃用的API端点"""
    # 检查客户端是否使用新版本
    api_version = request.headers.get('X-API-Version', '1.0')

    if api_version == '1.0':
    # 要求升级到v2
    response = jsonify({
    'error': 'api_version_deprecated',
    'message': 'API v1.0 is deprecated',
    'upgrade_to': 'v2.0',
    'documentation': 'https://api.example.com/v2/docs',
    'sunset_date': '2024-12-31'
    })
    response.status_code = 426
    response.headers['X-API-Deprecated'] = 'true'
    response.headers['X-API-Sunset'] = '2024-12-31T23:59:59Z'
    return response

    # 新版本API处理
    return jsonify({'data': 'from v2 API'})

    # WebSocket升级处理
    @app.route('/ws')
    def websocket_endpoint():
    # Flask本身不直接处理WebSocket
    # 这通常由专门的WebSocket服务器处理
    # 以下为概念性代码

    upgrade = request.headers.get('Upgrade', '').lower()

    if upgrade == 'websocket':
    # 检查WebSocket版本
    ws_version = request.headers.get('Sec-WebSocket-Version')

    if ws_version != '13':
    response = jsonify({
    'error': 'unsupported_websocket_version',
    'message': f'WebSocket version {ws_version} not supported',
    'supported_versions': ['13']
    })
    response.status_code = 426
    response.headers['Sec-WebSocket-Version'] = '13'
    return response

    # WebSocket握手将由底层服务器处理
    return '', 101 # 实际应由WebSocket服务器处理

    return jsonify({'error': 'websocket_upgrade_required'}), 426

    23.20.5 客户端处理

    javascript

    // 处理426升级的客户端
    class UpgradeAwareClient {
    constructor(baseURL) {
    this.baseURL = baseURL;
    this.upgradeHandlers = new Map();

    // 注册升级处理器
    this.registerUpgradeHandler('TLS', this.handleTlsUpgrade.bind(this));
    this.registerUpgradeHandler('websocket', this.handleWebSocketUpgrade.bind(this));
    this.registerUpgradeHandler('h2', this.handleHttp2Upgrade.bind(this));
    }

    async request(options) {
    try {
    const response = await fetch(this.baseURL + options.path, options);

    if (response.status === 426) {
    // 需要升级
    return this.handleUpgradeResponse(response, options);
    }

    return response;
    } catch (error) {
    console.error('Request failed:', error);
    throw error;
    }
    }

    async handleUpgradeResponse(response, originalOptions) {
    const upgradeHeader = response.headers.get('Upgrade');

    if (!upgradeHeader) {
    throw new Error('426 response without Upgrade header');
    }

    // 解析支持的升级选项
    const upgrades = upgradeHeader.split(',').map(s => s.trim());

    // 选择支持的升级
    for (const upgrade of upgrades) {
    const handler = this.upgradeHandlers.get(upgrade.toLowerCase());
    if (handler) {
    return handler(response, originalOptions);
    }
    }

    throw new Error(`No handler for required upgrade: ${upgrades.join(', ')}`);
    }

    async handleTlsUpgrade(response, originalOptions) {
    // 从HTTP升级到HTTPS
    const httpsUrl = this.baseURL.replace('http://', 'https://');

    console.log('Upgrading to HTTPS…');

    // 重试请求到HTTPS端点
    const upgradedOptions = { …originalOptions };
    upgradedOptions.path = originalOptions.path;

    return fetch(httpsUrl + upgradedOptions.path, upgradedOptions);
    }

    async handleWebSocketUpgrade(response, originalOptions) {
    // 升级到WebSocket
    const wsUrl = this.baseURL.replace('http', 'ws') + originalOptions.path;

    console.log('Upgrading to WebSocket…');

    // 创建WebSocket连接
    return new Promise((resolve, reject) => {
    const ws = new WebSocket(wsUrl);

    ws.onopen = () => {
    console.log('WebSocket connected');

    // 如果原始请求是GET,可以解析为成功
    if (originalOptions.method === 'GET') {
    resolve({
    status: 101,
    ok: true,
    webSocket: ws
    });
    } else {
    // 对于其他方法,需要额外处理
    reject(new Error('WebSocket upgrade only supported for GET requests'));
    }
    };

    ws.onerror = (error) => {
    reject(new Error(`WebSocket connection failed: ${error}`));
    };
    });
    }

    async handleHttp2Upgrade(response, originalOptions) {
    // HTTP/2升级(浏览器中通常自动处理)
    console.log('Upgrading to HTTP/2…');

    // 对于浏览器,只需重试请求
    return fetch(this.baseURL + originalOptions.path, originalOptions);
    }

    registerUpgradeHandler(protocol, handler) {
    this.upgradeHandlers.set(protocol.toLowerCase(), handler);
    }
    }

    23.20.6 最佳实践

    • 提供清晰的升级说明和文档链接

    • 支持多种升级选项

    • 实现优雅降级

    • 监控协议使用情况以计划弃用

    23.21 428 Precondition Required(需要前提条件)

    23.21.1 定义与语义

    428状态码表示源服务器要求请求是条件性的。用于防止"丢失更新"问题,客户端在更新资源前必须先获取当前状态。

    23.21.2 与412的区别

    • 412 Precondition Failed:客户端提供了条件头,但条件不满足

    • 428 Precondition Required:服务器要求条件请求,但客户端没有提供条件头

    23.21.3 实现模式

    python

    # Django条件请求要求
    from django.http import JsonResponse
    from django.views import View
    from django.utils.decorators import method_decorator
    from django.views.decorators.http import require_http_methods
    import hashlib
    import json

    class ConditionalUpdateMixin:
    """要求条件更新的Mixin"""

    def require_conditional_request(self, request, *args, **kwargs):
    """检查是否满足条件请求要求"""
    # 对于安全方法(GET、HEAD)不需要条件
    if request.method in ['GET', 'HEAD', 'OPTIONS']:
    return True

    # 检查条件头
    has_condition = any([
    request.META.get('HTTP_IF_MATCH'),
    request.META.get('HTTP_IF_NONE_MATCH'),
    request.META.get('HTTP_IF_MODIFIED_SINCE'),
    request.META.get('HTTP_IF_UNMODIFIED_SINCE')
    ])

    if not has_condition:
    # 返回428,要求条件请求
    response = JsonResponse({
    'error': 'precondition_required',
    'message': 'This request requires conditional headers',
    'required_headers': [
    'If-Match or If-None-Match',
    'If-Modified-Since or If-Unmodified-Since'
    ],
    'how_to': 'First perform a GET request to get the current ETag, then include it in If-Match header'
    })
    response.status_code = 428
    response['Precondition-Required'] = 'true'
    return response

    return True

    class ResourceDetailView(ConditionalUpdateMixin, View):
    """需要条件更新的资源详情视图"""

    def dispatch(self, request, *args, **kwargs):
    # 检查条件请求要求
    requirement_check = self.require_conditional_request(request, *args, **kwargs)
    if requirement_check is not True:
    return requirement_check

    return super().dispatch(request, *args, **kwargs)

    def get(self, request, resource_id):
    """获取资源(包含ETag)"""
    resource = self.get_resource(resource_id)

    # 生成ETag
    etag = self.generate_etag(resource)

    response = JsonResponse(resource)
    response['ETag'] = etag
    response['Last-Modified'] = resource['modified_at']

    return response

    def put(self, request, resource_id):
    """更新资源(需要If-Match)"""
    resource = self.get_resource(resource_id)
    current_etag = self.generate_etag(resource)

    # 检查If-Match头
    if_match = request.META.get('HTTP_IF_MATCH')
    if not if_match:
    # 这不应该发生,因为dispatch已经检查了
    return JsonResponse({
    'error': 'precondition_required',
    'message': 'If-Match header is required'
    }, status=428)

    # 验证ETag
    if if_match != current_etag and if_match != '*':
    return JsonResponse({
    'error': 'precondition_failed',
    'message': 'Resource has been modified',
    'current_etag': current_etag
    }, status=412)

    # 更新资源
    updated_resource = self.update_resource(resource_id, request.body)
    new_etag = self.generate_etag(updated_resource)

    response = JsonResponse(updated_resource)
    response['ETag'] = new_etag
    return response

    def get_resource(self, resource_id):
    """获取资源(简化实现)"""
    return {
    'id': resource_id,
    'name': f'Resource {resource_id}',
    'modified_at': '2024-01-15T10:30:00Z',
    'data': 'Some data'
    }

    def generate_etag(self, resource):
    """生成ETag"""
    content = json.dumps(resource, sort_keys=True)
    return hashlib.md5(content.encode()).hexdigest()

    def update_resource(self, resource_id, data):
    """更新资源(简化实现)"""
    resource = self.get_resource(resource_id)
    # 实际实现会解析和更新数据
    resource['modified_at'] = '2024-01-15T11:00:00Z'
    return resource

    23.21.4 客户端流程

    javascript

    // 处理428的客户端
    class ConditionalRequestClient {
    async updateResource(resourceId, updates) {
    // 1. 首先获取资源以获取ETag
    const getResponse = await fetch(`/api/resources/${resourceId}`);

    if (!getResponse.ok) {
    throw new Error(`Failed to get resource: ${getResponse.status}`);
    }

    const etag = getResponse.headers.get('ETag');
    const resource = await getResponse.json();

    // 2. 使用ETag进行条件更新
    const updateResponse = await fetch(`/api/resources/${resourceId}`, {
    method: 'PUT',
    headers: {
    'Content-Type': 'application/json',
    'If-Match': etag
    },
    body: JSON.stringify({
    …resource,
    …updates
    })
    });

    if (updateResponse.status === 428) {
    // 服务器要求条件请求,但我们已经提供了
    // 这可能是配置错误
    throw new Error('Server requires conditional request but rejected our If-Match header');
    }

    if (updateResponse.status === 412) {
    // 前提条件失败(资源已被修改)
    // 获取新版本并重试或通知用户
    const error = await updateResponse.json();
    throw new Error(`Update conflict: ${error.message}`);
    }

    if (!updateResponse.ok) {
    throw new Error(`Update failed: ${updateResponse.status}`);
    }

    return updateResponse.json();
    }

    async safeUpdateWithRetry(resourceId, updates, maxRetries = 3) {
    for (let attempt = 0; attempt < maxRetries; attempt++) {
    try {
    return await this.updateResource(resourceId, updates);
    } catch (error) {
    if (error.message.includes('Update conflict') && attempt < maxRetries – 1) {
    // 冲突错误,等待后重试
    console.log(`Update conflict, retrying (attempt ${attempt + 1})…`);
    await this.delay(100 * Math.pow(2, attempt)); // 指数退避
    continue;
    }
    throw error;
    }
    }
    }

    delay(ms) {
    return new Promise(resolve => setTimeout(resolve, ms));
    }
    }

    23.21.5 最佳实践

    • 为所有非安全方法要求条件请求

    • 提供清晰的错误信息和解决步骤

    • 实现乐观锁避免更新冲突

    • 考虑支持弱ETag用于性能优化

    23.22 429 Too Many Requests(请求过多)

    注:虽然标题中未列出429,但由于它常与其他4xx状态码一起讨论,且非常重要,此处简要涵盖。

    23.22.1 定义与语义

    429状态码表示用户在给定时间内发送了太多请求("限流")。用于防止滥用和保证服务可用性。

    23.22.2 限流头信息

    http

    HTTP/1.1 429 Too Many Requests
    Content-Type: application/json
    Retry-After: 3600
    X-RateLimit-Limit: 100
    X-RateLimit-Remaining: 0
    X-RateLimit-Reset: 1673780400

    {
    "error": "rate_limit_exceeded",
    "message": "Too many requests, please try again later",
    "retry_after": 3600,
    "limit": 100,
    "period": "hour",
    "documentation": "https://api.example.com/docs/rate-limiting"
    }

    23.22.3 限流策略

  • 令牌桶算法:平滑限流,允许突发

  • 漏桶算法:恒定速率处理

  • 固定窗口计数器:简单计数,但可能允许瞬时超限

  • 滑动窗口日志:精确但内存消耗大

  • 分布式限流:用于微服务架构

  • 23.23 431 Request Header Fields Too Large(请求头字段太大)

    23.23.1 定义与语义

    431状态码表示服务器不愿意处理请求,因为单个头字段或所有头字段的总大小超过限制。

    23.23.2 常见限制

    服务器默认限制配置项
    Nginx 4KB/8KB(取决于系统) large_client_header_buffers
    Apache 8KB LimitRequestFieldSize
    IIS 16KB maxRequestLength
    Node.js 16KB maxHeaderSize

    23.23.3 问题诊断

    python

    # 诊断头大小问题
    def diagnose_header_size(request):
    total_size = 0
    header_sizes = {}

    for key, value in request.headers.items():
    size = len(key) + len(value) + 2 # +2 for ": "
    header_sizes[key] = size
    total_size += size

    return {
    'total_size': total_size,
    'header_sizes': header_sizes,
    'largest_headers': sorted(
    header_sizes.items(),
    key=lambda x: x[1],
    reverse=True
    )[:5]
    }

    # 中间件检查头大小
    from django.http import JsonResponse

    class HeaderSizeMiddleware:
    def __init__(self, get_response):
    self.get_response = get_response
    self.max_header_size = 8 * 1024 # 8KB

    def __call__(self, request):
    # 计算请求头大小
    header_size = sum(
    len(key) + len(value) + 2
    for key, value in request.headers.items()
    )

    if header_size > self.max_header_size:
    return JsonResponse({
    'error': 'request_header_fields_too_large',
    'message': 'Request headers exceed size limit',
    'max_size': self.max_header_size,
    'actual_size': header_size,
    'largest_headers': self.get_largest_headers(request)
    }, status=431)

    return self.get_response(request)

    def get_largest_headers(self, request):
    headers = []
    for key, value in request.headers.items():
    size = len(key) + len(value) + 2
    headers.append({'name': key, 'size': size})

    return sorted(headers, key=lambda x: x['size'], reverse=True)[:3]

    23.23.4 客户端优化

    javascript

    // 优化请求头大小
    class HeaderOptimizer {
    constructor() {
    this.essentialHeaders = new Set([
    'authorization',
    'content-type',
    'accept',
    'user-agent'
    ]);
    }

    optimizeHeaders(headers) {
    const optimized = {};
    let totalSize = 0;

    for (const [key, value] of Object.entries(headers)) {
    const lowerKey = key.toLowerCase();

    // 只保留必要头
    if (this.essentialHeaders.has(lowerKey) ||
    lowerKey.startsWith('x-') ||
    lowerKey.startsWith('sec-')) {

    optimized[key] = value;
    totalSize += key.length + value.length + 2;
    }
    }

    // 检查是否超过限制
    if (totalSize > 8 * 1024) { // 8KB
    console.warn(`Request headers are large: ${totalSize} bytes`);

    // 尝试压缩大值头
    return this.compressHeaders(optimized);
    }

    return optimized;
    }

    compressHeaders(headers) {
    const compressed = { …headers };

    // 对于大值的自定义头,可以考虑压缩
    for (const [key, value] of Object.entries(headers)) {
    if (key.startsWith('X-') && value.length > 1024) {
    // 使用Base64编码或其他压缩
    compressed[key] = this.compressValue(value);
    }
    }

    return compressed;
    }

    compressValue(value) {
    // 简单压缩示例
    if (typeof value === 'object') {
    // 如果是对象,转为紧凑JSON
    return JSON.stringify(value);
    }

    return value;
    }

    async makeRequest(url, options) {
    const optimizedHeaders = this.optimizeHeaders(options.headers || {});

    const optimizedOptions = {
    …options,
    headers: optimizedHeaders
    };

    const response = await fetch(url, optimizedOptions);

    if (response.status === 431) {
    // 头仍然太大,需要进一步优化
    console.error('Headers still too large after optimization');

    // 获取诊断信息
    const error = await response.json();
    console.error('Large headers:', error.largest_headers);

    throw new Error('Request headers too large');
    }

    return response;
    }
    }

    23.23.5 服务器配置

    nginx

    # Nginx头大小配置
    http {
    # 客户端头缓冲区大小
    client_header_buffer_size 1k;

    # 大客户端头缓冲区
    large_client_header_buffers 4 8k;

    # 请求行最大大小
    client_max_body_size 10m;

    # 特定位置更严格的限制
    location /api/ {
    # 更严格的头大小限制
    large_client_header_buffers 2 4k;

    # 自定义错误处理
    error_page 431 /431.json;

    location = /431.json {
    internal;
    default_type application/json;
    return 200 '{"error":"headers_too_large","message":"Request headers exceed 4KB limit"}';
    }
    }
    }

    23.23.6 最佳实践

    • 监控头大小分布以设置合理限制

    • 提供清晰的错误信息指出哪些头太大

    • 考虑支持头压缩(如HTTP/2 HPACK)

    • 避免在头中存储大量数据

    23.24 451 Unavailable For Legal Reasons(因法律原因不可用)

    23.24.1 定义与语义

    451状态码表示服务器由于法律原因无法提供请求的资源。引用自雷·布拉德伯里的《华氏451》,该状态码专门用于内容审查场景。

    23.24.2 使用场景

  • 政府审查:政府要求屏蔽的内容

  • 版权问题:侵权内容移除

  • 法院命令:根据法律命令屏蔽

  • 地区限制:内容地理封锁

  • GDPR合规:数据保护要求移除

  • 23.24.3 响应格式

    http

    HTTP/1.1 451 Unavailable For Legal Reasons
    Content-Type: application/json
    X-Censorship-Reason: government-order
    X-Blocking-Authority: Ministry of Truth
    X-Appeal-URL: https://example.com/appeal
    Link: <https://eff.org/censorship>; rel="related"

    {
    "error": "unavailable_for_legal_reasons",
    "message": "This resource is not available in your country due to legal restrictions",
    "reference": "Government Order 2024-01",
    "reason": "violates_local_laws",
    "country": "DE",
    "blocking_authority": "Federal Agency",
    "block_date": "2024-01-15",
    "appeal_process": {
    "url": "https://example.com/appeal",
    "email": "legal@example.com",
    "deadline": "2024-02-15"
    },
    "alternatives": [
    {
    "description": "Similar content available in other regions",
    "url": "https://global.example.com/content/123"
    }
    ]
    }

    23.24.4 实现示例

    python

    # Django地理封锁和内容审查
    from django.http import JsonResponse
    from django.views import View
    from django.utils.decorators import method_decorator
    from django.views.decorators.vary import vary_on_headers
    import geoip2.database
    import json

    class LegalComplianceMiddleware:
    """法律合规中间件"""

    def __init__(self, get_response):
    self.get_response = get_response
    self.geoip_reader = geoip2.database.Reader('/path/to/GeoLite2-Country.mmdb')
    self.blocked_content = self.load_blocked_content()

    def __call__(self, request):
    response = self.get_response(request)

    # 检查是否需要地理封锁
    if self.is_blocked_in_country(request):
    return self.create_451_response(request)

    return response

    def is_blocked_in_country(self, request):
    """检查内容是否在用户所在国家被封锁"""
    try:
    # 获取用户国家
    country_code = self.get_user_country(request)

    # 获取请求的路径
    path = request.path

    # 检查是否在封锁列表中
    if path in self.blocked_content:
    blocked_in = self.blocked_content[path].get('blocked_countries', [])
    return country_code in blocked_in

    except Exception as e:
    print(f"Error checking geo-block: {e}")

    return False

    def get_user_country(self, request):
    """从IP获取用户国家"""
    # 从X-Forwarded-For或直接IP获取
    ip = self.get_client_ip(request)

    try:
    response = self.geoip_reader.country(ip)
    return response.country.iso_code
    except:
    # 默认国家或基于其他头判断
    return request.META.get('HTTP_CF_IPCOUNTRY', 'US')

    def get_client_ip(self, request):
    """获取客户端IP"""
    x_forwarded_for = request.META.get('HTTP_X_FORWARDED_FOR')
    if x_forwarded_for:
    ip = x_forwarded_for.split(',')[0]
    else:
    ip = request.META.get('REMOTE_ADDR')
    return ip

    def create_451_response(self, request):
    """创建451响应"""
    path = request.path
    blocking_info = self.blocked_content.get(path, {})

    response = JsonResponse({
    'error': 'unavailable_for_legal_reasons',
    'message': blocking_info.get('message', 'Content not available in your region'),
    'reason': blocking_info.get('reason', 'legal_restriction'),
    'reference': blocking_info.get('reference'),
    'country': self.get_user_country(request),
    'blocking_authority': blocking_info.get('authority'),
    'block_date': blocking_info.get('block_date'),
    'appeal': blocking_info.get('appeal_url'),
    'alternatives': blocking_info.get('alternatives', [])
    })
    response.status_code = 451

    # 添加信息头
    response['X-Censorship-Reason'] = blocking_info.get('reason', 'unknown')
    if blocking_info.get('authority'):
    response['X-Blocking-Authority'] = blocking_info['authority']
    if blocking_info.get('appeal_url'):
    response['X-Appeal-URL'] = blocking_info['appeal_url']

    return response

    def load_blocked_content(self):
    """加载封锁内容列表(从数据库或文件)"""
    # 示例数据
    return {
    '/content/restricted-article': {
    'blocked_countries': ['CN', 'RU', 'IR'],
    'reason': 'government_order',
    'authority': 'Ministry of Culture',
    'reference': 'GO-2024-001',
    'block_date': '2024-01-01',
    'appeal_url': 'https://example.com/appeal/restricted-article',
    'alternatives': [
    {
    'url': '/content/similar-article',
    'description': 'Similar topic available'
    }
    ]
    }
    }

    class ContentDetailView(View):
    """内容详情视图(可能返回451)"""

    @method_decorator(vary_on_headers('X-Country-Code'))
    def get(self, request, content_id):
    content = self.get_content(content_id)

    if content.get('blocked', False):
    # 检查用户国家
    user_country = request.META.get('HTTP_X_COUNTRY_CODE', 'US')

    if user_country in content.get('blocked_countries', []):
    return JsonResponse({
    'error': 'unavailable_for_legal_reasons',
    'message': f'This content is not available in {user_country}',
    'content_id': content_id,
    'country': user_country
    }, status=451)

    return JsonResponse(content)

    def get_content(self, content_id):
    """获取内容(简化实现)"""
    return {
    'id': content_id,
    'title': 'Sample Content',
    'blocked': True,
    'blocked_countries': ['CN', 'RU'],
    'available_countries': ['US', 'UK', 'DE']
    }

    23.24.5 透明度报告

    json

    {
    "transparency_report": {
    "period": "2024-Q1",
    "total_requests": 1000000,
    "451_responses": 1250,
    "blocking_reasons": {
    "government_order": 800,
    "copyright": 300,
    "court_order": 100,
    "terms_violation": 50
    },
    "countries_affected": [
    {"country": "CN", "blocks": 500},
    {"country": "RU", "blocks": 300},
    {"country": "IR", "blocks": 200},
    {"country": "DE", "blocks": 50}
    ],
    "appeals": {
    "received": 200,
    "granted": 50,
    "denied": 100,
    "pending": 50
    },
    "documentation": "https://example.com/transparency/2024-Q1"
    }
    }

    23.24.6 最佳实践

    • 提供详细的封锁原因和法律依据

    • 实现透明的上诉流程

    • 发布透明度报告

    • 考虑用户隐私(避免过度追踪)

    • 提供替代内容选项

    总结

    本章详细探讨了23个"其他"4xx HTTP状态码,每个状态码都有其特定的使用场景和最佳实践。从支付系统(402)到法律合规(451),这些状态码为现代Web开发和API设计提供了丰富的语义工具。

    关键要点:

  • 精确语义:使用正确的状态码可以提供更清晰的错误信息

  • 用户体验:良好的错误响应可以指导用户解决问题

  • 安全性:适当的状态码有助于防止滥用和攻击

  • 合规性:特定状态码(如451)支持法律合规要求

  • 互操作性:标准状态码促进系统间的互操作性

  • 实施建议:

  • 渐进采用:从最重要的状态码开始,逐步实现更多

  • 文档化:为API消费者提供状态码的详细文档

  • 监控:跟踪状态码分布以识别问题

  • 测试:为各种错误场景编写测试用例

  • 客户端处理:确保客户端能优雅处理所有状态码

  • 赞(0)
    未经允许不得转载:网硕互联帮助中心 » HTTP 状态码:客户端与服务器的通信语言——第四部分:客户端错误状态码(4xx)深度解读(三)
    分享到: 更多 (0)

    评论 抢沙发

    评论前必须登录!