第39章:状态码的性能优化
39.1 状态码响应性能分析
39.1.1 响应时间分解
python
# 状态码响应时间分析工具
from typing import Dict, List, Tuple
import time
from dataclasses import dataclass
from enum import Enum
import statistics
class ResponseTimeComponent(Enum):
"""响应时间组件"""
NETWORK = "network" # 网络传输时间
DNS = "dns" # DNS解析时间
SSL = "ssl" # TLS握手时间
QUEUEING = "queueing" # 请求排队时间
PROCESSING = "processing" # 服务器处理时间
DATABASE = "database" # 数据库查询时间
CACHE = "cache" # 缓存访问时间
EXTERNAL_API = "external_api" # 外部API调用时间
SERIALIZATION = "serialization" # 数据序列化时间
UNKNOWN = "unknown" # 未知时间
@dataclass
class TimingMeasurement:
"""时间测量"""
component: ResponseTimeComponent
start_time: float
end_time: float
@property
def duration_ms(self) -> float:
return (self.end_time – self.start_time) * 1000
@dataclass
class RequestProfile:
"""请求性能剖析"""
request_id: str
status_code: int
total_time_ms: float
timings: List[TimingMeasurement]
endpoint: str
method: str
def get_component_time(self, component: ResponseTimeComponent) -> float:
"""获取组件时间"""
component_timings = [
t for t in self.timings
if t.component == component
]
return sum(t.duration_ms for t in component_timings)
def get_time_breakdown(self) -> Dict[ResponseTimeComponent, float]:
"""获取时间分解"""
breakdown = {}
for component in ResponseTimeComponent:
time_ms = self.get_component_time(component)
if time_ms > 0:
breakdown[component] = time_ms
return breakdown
def get_percentage_breakdown(self) -> Dict[ResponseTimeComponent, float]:
"""获取百分比分解"""
breakdown = self.get_time_breakdown()
if not breakdown or self.total_time_ms == 0:
return {}
return {
component: (time_ms / self.total_time_ms) * 100
for component, time_ms in breakdown.items()
}
class PerformanceAnalyzer:
"""性能分析器"""
def __init__(self):
self.profiles: Dict[str, RequestProfile] = {}
self.status_code_stats: Dict[int, List[RequestProfile]] = {}
self.endpoint_stats: Dict[str, List[RequestProfile]] = {}
def add_profile(self, profile: RequestProfile):
"""添加性能剖析"""
self.profiles[profile.request_id] = profile
# 按状态码统计
if profile.status_code not in self.status_code_stats:
self.status_code_stats[profile.status_code] = []
self.status_code_stats[profile.status_code].append(profile)
# 按端点统计
if profile.endpoint not in self.endpoint_stats:
self.endpoint_stats[profile.endpoint] = []
self.endpoint_stats[profile.endpoint].append(profile)
def analyze_by_status_code(self, status_code: int) -> Dict:
"""按状态码分析性能"""
if status_code not in self.status_code_stats:
return {}
profiles = self.status_code_stats[status_code]
return self._analyze_profiles(profiles, f"status_code_{status_code}")
def analyze_by_endpoint(self, endpoint: str) -> Dict:
"""按端点分析性能"""
if endpoint not in self.endpoint_stats:
return {}
profiles = self.endpoint_stats[endpoint]
return self._analyze_profiles(profiles, f"endpoint_{endpoint}")
def _analyze_profiles(self, profiles: List[RequestProfile], group_name: str) -> Dict:
"""分析性能剖析组"""
if not profiles:
return {}
total_times = [p.total_time_ms for p in profiles]
# 时间组件分析
component_times = self._analyze_components(profiles)
# 识别瓶颈
bottlenecks = self._identify_bottlenecks(component_times)
# 趋势分析
trends = self._analyze_trends(profiles)
return {
'group': group_name,
'count': len(profiles),
'total_time_stats': {
'mean': statistics.mean(total_times),
'median': statistics.median(total_times),
'p95': self._calculate_percentile(total_times, 95),
'p99': self._calculate_percentile(total_times, 99),
'std_dev': statistics.stdev(total_times) if len(total_times) > 1 else 0,
'min': min(total_times),
'max': max(total_times)
},
'component_analysis': component_times,
'bottlenecks': bottlenecks,
'trends': trends,
'recommendations': self._generate_recommendations(bottlenecks, component_times)
}
def _analyze_components(self, profiles: List[RequestProfile]) -> Dict:
"""分析时间组件"""
component_data = {}
for component in ResponseTimeComponent:
times = []
percentages = []
for profile in profiles:
time_ms = profile.get_component_time(component)
if time_ms > 0:
times.append(time_ms)
percentages.append(
(time_ms / profile.total_time_ms) * 100
if profile.total_time_ms > 0 else 0
)
if times:
component_data[component.value] = {
'count': len(times),
'time_stats': {
'mean': statistics.mean(times),
'median': statistics.median(times),
'p95': self._calculate_percentile(times, 95),
'p99': self._calculate_percentile(times, 99)
},
'percentage_stats': {
'mean': statistics.mean(percentages),
'median': statistics.median(percentages),
'p95': self._calculate_percentile(percentages, 95)
}
}
return component_data
def _identify_bottlenecks(self, component_analysis: Dict) -> List[Dict]:
"""识别瓶颈"""
bottlenecks = []
for component, data in component_analysis.items():
time_stats = data['time_stats']
percentage_stats = data['percentage_stats']
# 检查绝对时间瓶颈
if time_stats['p95'] > 1000: # 超过1秒
bottlenecks.append({
'component': component,
'type': 'ABSOLUTE_TIME',
'value': time_stats['p95'],
'threshold': 1000,
'severity': 'HIGH' if time_stats['p95'] > 5000 else 'MEDIUM'
})
# 检查相对时间瓶颈
if percentage_stats['mean'] > 50: # 超过50%的时间
bottlenecks.append({
'component': component,
'type': 'RELATIVE_TIME',
'value': percentage_stats['mean'],
'threshold': 50,
'severity': 'HIGH' if percentage_stats['mean'] > 80 else 'MEDIUM'
})
# 检查高方差(不稳定性)
time_variance = time_stats['p99'] – time_stats['mean']
if time_variance > time_stats['mean'] * 2: # 方差超过均值的2倍
bottlenecks.append({
'component': component,
'type': 'HIGH_VARIANCE',
'value': time_variance,
'threshold': time_stats['mean'] * 2,
'severity': 'MEDIUM'
})
# 按严重程度排序
severity_order = {'HIGH': 0, 'MEDIUM': 1, 'LOW': 2}
bottlenecks.sort(key=lambda x: severity_order[x['severity']])
return bottlenecks
def _analyze_trends(self, profiles: List[RequestProfile]) -> Dict:
"""分析趋势"""
if len(profiles) < 10:
return {}
# 按时间排序
sorted_profiles = sorted(profiles, key=lambda p: p.timings[0].start_time)
# 计算移动平均
window_size = min(10, len(sorted_profiles) // 5)
moving_averages = []
for i in range(len(sorted_profiles) – window_size + 1):
window = sorted_profiles[i:i + window_size]
avg_time = statistics.mean(p.total_time_ms for p in window)
moving_averages.append(avg_time)
# 检测趋势
trend = self._detect_trend(moving_averages)
return {
'sample_size': len(sorted_profiles),
'moving_average_window': window_size,
'trend': trend,
'latest_moving_average': moving_averages[-1] if moving_averages else 0
}
def _detect_trend(self, values: List[float]) -> str:
"""检测趋势"""
if len(values) < 2:
return "INSUFFICIENT_DATA"
# 使用线性回归简单判断趋势
from scipy import stats
x = list(range(len(values)))
slope, intercept, r_value, p_value, std_err = stats.linregress(x, values)
if abs(slope) < 0.1: # 变化很小
return "STABLE"
elif slope > 0.5: # 明显上升
return "INCREASING"
elif slope < -0.5: # 明显下降
return "DECREASING"
else:
return "SLIGHT_CHANGE"
def _generate_recommendations(self,
bottlenecks: List[Dict],
component_analysis: Dict) -> List[str]:
"""生成优化建议"""
recommendations = []
for bottleneck in bottlenecks:
component = bottleneck['component']
if component == 'database':
recommendations.append(
f"Database queries are a bottleneck ({bottleneck['value']:.0f}ms). "
f"Consider: 1) Adding indexes, 2) Query optimization, "
f"3) Implementing caching, 4) Database connection pooling."
)
elif component == 'external_api':
recommendations.append(
f"External API calls are slow ({bottleneck['value']:.0f}ms). "
f"Consider: 1) Implementing timeout and retry logic, "
f"2) Adding circuit breaker, 3) Caching responses, "
f"4) Async processing if possible."
)
elif component == 'network':
recommendations.append(
f"Network latency is high ({bottleneck['value']:.0f}ms). "
f"Consider: 1) Using CDN, 2) Optimizing payload size, "
f"3) HTTP/2 or HTTP/3, 4) Edge computing."
)
elif component == 'serialization':
recommendations.append(
f"Data serialization is slow ({bottleneck['value']:.0f}ms). "
f"Consider: 1) Using binary formats (Protocol Buffers, MessagePack), "
f"2) Reducing nested structures, 3) Lazy loading."
)
elif component == 'processing':
recommendations.append(
f"Server processing time is high ({bottleneck['value']:.0f}ms). "
f"Consider: 1) Code profiling and optimization, "
f"2) Using more efficient algorithms, "
f"3) Parallel processing, 4) Scaling horizontally."
)
# 基于组件分析的通用建议
if 'cache' in component_analysis:
cache_stats = component_analysis['cache']['percentage_stats']
if cache_stats['mean'] < 10: # 缓存使用率低
recommendations.append(
"Cache utilization is low. Consider increasing cache hit rate "
"through better cache strategies and cache warming."
)
return recommendations
def _calculate_percentile(self, values: List[float], percentile: float) -> float:
"""计算百分位数"""
if not values:
return 0
sorted_values = sorted(values)
index = (percentile / 100) * (len(sorted_values) – 1)
if index.is_integer():
return sorted_values[int(index)]
else:
lower = sorted_values[int(index)]
upper = sorted_values[int(index) + 1]
return lower + (upper – lower) * (index – int(index))
def generate_overall_report(self) -> Dict:
"""生成总体报告"""
all_profiles = list(self.profiles.values())
# 分析所有状态码
status_code_analysis = {}
for code in self.status_code_stats.keys():
analysis = self.analyze_by_status_code(code)
if analysis:
status_code_analysis[code] = analysis
# 分析所有端点
endpoint_analysis = {}
for endpoint in self.endpoint_stats.keys():
analysis = self.analyze_by_endpoint(endpoint)
if analysis:
endpoint_analysis[endpoint] = analysis
# 识别最慢的状态码
slowest_codes = sorted(
status_code_analysis.items(),
key=lambda x: x[1]['total_time_stats']['p95'],
reverse=True
)[:5]
# 识别最慢的端点
slowest_endpoints = sorted(
endpoint_analysis.items(),
key=lambda x: x[1]['total_time_stats']['p95'],
reverse=True
)[:5]
return {
'summary': {
'total_requests': len(all_profiles),
'unique_status_codes': len(status_code_analysis),
'unique_endpoints': len(endpoint_analysis),
'avg_response_time': statistics.mean([p.total_time_ms for p in all_profiles])
if all_profiles else 0
},
'slowest_status_codes': [
{
'status_code': code,
'p95_response_time': analysis['total_time_stats']['p95'],
'request_count': analysis['count']
}
for code, analysis in slowest_codes
],
'slowest_endpoints': [
{
'endpoint': endpoint,
'p95_response_time': analysis['total_time_stats']['p95'],
'request_count': analysis['count']
}
for endpoint, analysis in slowest_endpoints
],
'common_bottlenecks': self._identify_common_bottlenecks(status_code_analysis),
'performance_trends': self._analyze_overall_trends(all_profiles),
'optimization_priority': self._calculate_optimization_priority(
slowest_codes, slowest_endpoints
)
}
def _identify_common_bottlenecks(self, status_code_analysis: Dict) -> List[Dict]:
"""识别常见瓶颈"""
bottleneck_counter = {}
for code, analysis in status_code_analysis.items():
bottlenecks = analysis.get('bottlenecks', [])
for bottleneck in bottlenecks:
component = bottleneck['component']
if component not in bottleneck_counter:
bottleneck_counter[component] = 0
bottleneck_counter[component] += 1
# 转换为百分比
total_codes = len(status_code_analysis)
common_bottlenecks = [
{
'component': component,
'frequency': count,
'percentage': (count / total_codes) * 100
}
for component, count in bottleneck_counter.items()
]
# 按频率排序
common_bottlenecks.sort(key=lambda x: x['frequency'], reverse=True)
return common_bottlenecks
def _analyze_overall_trends(self, profiles: List[RequestProfile]) -> Dict:
"""分析总体趋势"""
if len(profiles) < 20:
return {}
# 按状态码分组趋势
trends_by_code = {}
for code in self.status_code_stats.keys():
code_profiles = self.status_code_stats[code]
if len(code_profiles) >= 10:
trends = self._analyze_trends(code_profiles)
if trends:
trends_by_code[code] = trends
return {
'trends_by_status_code': trends_by_code,
'overall_trend': self._detect_trend(
[p.total_time_ms for p in sorted(profiles, key=lambda p: p.timings[0].start_time)]
)
}
def _calculate_optimization_priority(self,
slowest_codes: List,
slowest_endpoints: List) -> List[Dict]:
"""计算优化优先级"""
priorities = []
# 基于响应时间和请求量计算优先级分数
for code_info in slowest_codes:
code = code_info['status_code']
p95_time = code_info['p95_response_time']
count = code_info['request_count']
# 优先级分数 = 响应时间 * log(请求量)
priority_score = p95_time * (1 + (0.1 * count ** 0.5))
priorities.append({
'type': 'status_code',
'id': code,
'p95_response_time': p95_time,
'request_count': count,
'priority_score': priority_score,
'priority_level': self._get_priority_level(priority_score)
})
for endpoint_info in slowest_endpoints:
endpoint = endpoint_info['endpoint']
p95_time = endpoint_info['p95_response_time']
count = endpoint_info['request_count']
priority_score = p95_time * (1 + (0.1 * count ** 0.5))
priorities.append({
'type': 'endpoint',
'id': endpoint,
'p95_response_time': p95_time,
'request_count': count,
'priority_score': priority_score,
'priority_level': self._get_priority_level(priority_score)
})
# 按优先级分数排序
priorities.sort(key=lambda x: x['priority_score'], reverse=True)
return priorities
def _get_priority_level(self, score: float) -> str:
"""获取优先级级别"""
if score > 10000:
return 'CRITICAL'
elif score > 5000:
return 'HIGH'
elif score > 1000:
return 'MEDIUM'
else:
return 'LOW'
# 使用示例
analyzer = PerformanceAnalyzer()
# 模拟一些性能数据
import random
from datetime import datetime, timedelta
for i in range(100):
request_id = f"req_{i}"
status_code = random.choice([200, 400, 404, 500])
total_time = random.uniform(50, 5000)
# 创建时间测量
timings = []
start = time.time()
# 模拟各个组件的时间
components = [
(ResponseTimeComponent.NETWORK, 10, 100),
(ResponseTimeComponent.PROCESSING, 20, 1000),
(ResponseTimeComponent.DATABASE, 5, 500),
(ResponseTimeComponent.CACHE, 1, 50),
(ResponseTimeComponent.SERIALIZATION, 2, 100)
]
current_time = start
for component, min_time, max_time in components:
component_time = random.uniform(min_time, max_time) / 1000 # 转换为秒
timings.append(TimingMeasurement(
component=component,
start_time=current_time,
end_time=current_time + component_time
))
current_time += component_time
profile = RequestProfile(
request_id=request_id,
status_code=status_code,
total_time_ms=total_time,
timings=timings,
endpoint=f"/api/resource/{i % 5}",
method=random.choice(['GET', 'POST', 'PUT', 'DELETE'])
)
analyzer.add_profile(profile)
# 分析200状态码的性能
analysis_200 = analyzer.analyze_by_status_code(200)
print(f"200响应分析: {analysis_200['count']} 个请求")
print(f"平均响应时间: {analysis_200['total_time_stats']['mean']:.0f}ms")
print(f"P95响应时间: {analysis_200['total_time_stats']['p95']:.0f}ms")
# 生成总体报告
overall_report = analyzer.generate_overall_report()
print(f"\\n最慢的状态码:")
for code_info in overall_report['slowest_status_codes']:
print(f" {code_info['status_code']}: P95={code_info['p95_response_time']:.0f}ms "
f"({code_info['request_count']} 请求)")
print(f"\\n优化优先级:")
for priority in overall_report['optimization_priority'][:3]:
print(f" {priority['type']} {priority['id']}: {priority['priority_level']} "
f"(分数: {priority['priority_score']:.0f})")
39.1.2 状态码缓存优化
python
# 状态码响应的缓存优化策略
from typing import Dict, Any, Optional, Tuple
from dataclasses import dataclass
from datetime import datetime, timedelta
import hashlib
import json
class CacheableResponse:
"""可缓存的响应"""
def __init__(self,
status_code: int,
headers: Dict[str, str],
body: Any,
cache_key: str,
ttl: timedelta = timedelta(minutes=5)):
self.status_code = status_code
self.headers = headers
self.body = body
self.cache_key = cache_key
self.created_at = datetime.now()
self.expires_at = self.created_at + ttl
self.hit_count = 0
@property
def is_expired(self) -> bool:
return datetime.now() > self.expires_at
@property
def age(self) -> timedelta:
return datetime.now() – self.created_at
def record_hit(self):
"""记录命中"""
self.hit_count += 1
def to_dict(self) -> Dict[str, Any]:
"""转换为字典"""
return {
'status_code': self.status_code,
'headers': self.headers,
'body': self.body,
'cache_key': self.cache_key,
'created_at': self.created_at.isoformat(),
'expires_at': self.expires_at.isoformat(),
'hit_count': self.hit_count,
'age_seconds': self.age.total_seconds()
}
class StatusCodeCacheStrategy:
"""状态码缓存策略"""
# 不同状态码的默认缓存策略
DEFAULT_CACHE_TTL = {
# 成功响应
200: timedelta(minutes=5), # 默认5分钟
201: timedelta(seconds=0), # POST创建,通常不缓存
204: timedelta(minutes=1), # 无内容,短时间缓存
# 重定向
301: timedelta(days=30), # 永久重定向,长时间缓存
302: timedelta(minutes=5), # 临时重定向
304: timedelta(minutes=5), # 未修改,使用缓存
# 客户端错误
400: timedelta(seconds=0), # 错误响应不缓存
401: timedelta(seconds=0),
403: timedelta(seconds=0),
404: timedelta(minutes=1), # 404可短暂缓存,避免重复查询
# 服务器错误
500: timedelta(seconds=0), # 错误不缓存
502: timedelta(seconds=0),
503: timedelta(seconds=30), # 服务不可用,短暂缓存避免雪崩
504: timedelta(seconds=0),
# 自定义状态码
460: timedelta(minutes=1), # 缺货状态,短暂缓存
461: timedelta(minutes=1), # 库存不足
520: timedelta(seconds=30), # 熔断器开启,短暂缓存
}
# 可缓存的状态码
CACHEABLE_STATUS_CODES = {
200, 201, 202, 203, 204, 205, 206,
300, 301, 302, 303, 304, 305, 306, 307,
460, 461, 520 # 自定义状态码
}
@classmethod
def is_cacheable(cls, status_code: int, method: str) -> bool:
"""检查响应是否可缓存"""
if method.upper() != 'GET':
return False
return status_code in cls.CACHEABLE_STATUS_CODES
@classmethod
def get_ttl_for_status(cls,
status_code: int,
endpoint: str,
headers: Dict[str, str]) -> timedelta:
"""获取状态码的TTL"""
# 首先检查响应头中的缓存控制
cache_control = headers.get('Cache-Control', '')
if 'max-age' in cache_control:
# 解析max-age
import re
match = re.search(r'max-age=(\\d+)', cache_control)
if match:
return timedelta(seconds=int(match.group(1)))
# 检查Expires头部
expires = headers.get('Expires')
if expires:
try:
expires_dt = datetime.fromisoformat(expires.replace('Z', '+00:00'))
return expires_dt – datetime.now()
except:
pass
# 使用基于状态码的默认TTL
ttl = cls.DEFAULT_CACHE_TTL.get(status_code, timedelta(seconds=0))
# 根据端点调整TTL
ttl = cls._adjust_ttl_by_endpoint(ttl, endpoint)
return ttl
@classmethod
def _adjust_ttl_by_endpoint(cls, ttl: timedelta, endpoint: str) -> timedelta:
"""根据端点调整TTL"""
# 静态资源长时间缓存
static_extensions = ['.css', '.js', '.png', '.jpg', '.gif', '.ico', '.svg']
if any(endpoint.endswith(ext) for ext in static_extensions):
return timedelta(days=30)
# API端点根据路径调整
if '/api/' in endpoint:
# 公共API数据可缓存更久
if '/public/' in endpoint or '/catalog/' in endpoint:
return max(ttl, timedelta(minutes=30))
# 用户相关数据缓存较短
if '/user/' in endpoint or '/profile/' in endpoint:
return min(ttl, timedelta(minutes=1))
return ttl
@classmethod
def generate_cache_key(cls,
method: str,
url: str,
headers: Dict[str, str],
body: Optional[Any] = None) -> str:
"""生成缓存键"""
# 基于请求特征生成唯一键
components = [
method.upper(),
url,
# 规范化头部(只包含影响响应的头部)
cls._normalize_headers(headers),
]
if body:
if isinstance(body, (dict, list)):
body_str = json.dumps(body, sort_keys=True)
else:
body_str = str(body)
components.append(body_str)
# 创建哈希
key_string = '|'.join(str(c) for c in components)
return hashlib.sha256(key_string.encode()).hexdigest()
@classmethod
def _normalize_headers(cls, headers: Dict[str, str]) -> str:
"""规范化头部"""
# 只选择影响响应的头部
relevant_headers = [
'Authorization',
'Accept',
'Accept-Language',
'Accept-Encoding',
'User-Agent',
'If-None-Match',
'If-Modified-Since'
]
normalized = {}
for key in relevant_headers:
if key in headers:
normalized[key.lower()] = headers[key]
# 排序以确保一致性
sorted_items = sorted(normalized.items())
return json.dumps(sorted_items)
class ResponseCache:
"""响应缓存"""
def __init__(self, max_size: int = 10000):
self.max_size = max_size
self.cache: Dict[str, CacheableResponse] = {}
self.hit_count = 0
self.miss_count = 0
self.stats_by_status_code: Dict[int, Dict[str, int]] = {}
def get(self, cache_key: str) -> Optional[CacheableResponse]:
"""获取缓存响应"""
if cache_key in self.cache:
cached = self.cache[cache_key]
# 检查是否过期
if cached.is_expired:
self._remove(cache_key)
self.miss_count += 1
return None
# 记录命中
cached.record_hit()
self.hit_count += 1
# 更新统计
self._update_stats(cached.status_code, 'hit')
return cached
self.miss_count += 1
return None
def set(self, response: CacheableResponse):
"""设置缓存响应"""
# 检查缓存大小,必要时清理
if len(self.cache) >= self.max_size:
self._evict()
# 添加缓存
self.cache[response.cache_key] = response
# 更新统计
self._update_stats(response.status_code, 'store')
def _evict(self):
"""清理缓存"""
# 使用LRU策略
# 首先移除过期的
expired_keys = [
key for key, response in self.cache.items()
if response.is_expired
]
for key in expired_keys:
self._remove(key)
# 如果仍然需要空间,移除最久未使用的
if len(self.cache) >= self.max_size:
# 按最后访问时间排序(简化实现)
# 在实际中,这里需要维护访问顺序
keys_to_remove = list(self.cache.keys())[:self.max_size // 10]
for key in keys_to_remove:
self._remove(key)
def _remove(self, cache_key: str):
"""移除缓存项"""
if cache_key in self.cache:
response = self.cache[cache_key]
# 更新统计
self._update_stats(response.status_code, 'evict')
del self.cache[cache_key]
def _update_stats(self, status_code: int, action: str):
"""更新统计"""
if status_code not in self.stats_by_status_code:
self.stats_by_status_code[status_code] = {
'hit': 0,
'store': 0,
'evict': 0
}
self.stats_by_status_code[status_code][action] += 1
def get_stats(self) -> Dict[str, Any]:
"""获取缓存统计"""
total_requests = self.hit_count + self.miss_count
hit_rate = self.hit_count / total_requests if total_requests > 0 else 0
# 计算缓存效率
efficiency_by_status = {}
for code, stats in self.stats_by_status_code.items():
stores = stats.get('store', 0)
hits = stats.get('hit', 0)
efficiency = hits / stores if stores > 0 else 0
efficiency_by_status[code] = efficiency
return {
'total_items': len(self.cache),
'max_size': self.max_size,
'hit_count': self.hit_count,
'miss_count': self.miss_count,
'hit_rate': hit_rate,
'stats_by_status_code': self.stats_by_status_code,
'efficiency_by_status': efficiency_by_status,
'oldest_item': min(
(r.age for r in self.cache.values()),
default=timedelta(0)
).total_seconds(),
'newest_item': max(
(r.age for r in self.cache.values()),
default=timedelta(0)
).total_seconds()
}
def optimize_cache_strategy(self) -> Dict[str, Any]:
"""优化缓存策略"""
suggestions = []
# 分析缓存效率
stats = self.get_stats()
efficiency_by_status = stats['efficiency_by_status']
for code, efficiency in efficiency_by_status.items():
if efficiency < 0.1: # 效率低于10%
suggestions.append({
'status_code': code,
'efficiency': efficiency,
'suggestion': 'Consider reducing TTL or disabling caching'
})
elif efficiency > 0.8: # 效率高于80%
suggestions.append({
'status_code': code,
'efficiency': efficiency,
'suggestion': 'Consider increasing TTL'
})
# 分析缓存命中率
if stats['hit_rate'] < 0.3:
suggestions.append({
'type': 'overall',
'metric': 'hit_rate',
'value': stats['hit_rate'],
'suggestion': 'Overall cache hit rate is low. Consider adjusting cache keys or increasing cache size.'
})
# 分析缓存大小
utilization = len(self.cache) / self.max_size
if utilization > 0.9:
suggestions.append({
'type': 'size',
'utilization': utilization,
'suggestion': 'Cache is nearly full. Consider increasing max_size or improving eviction strategy.'
})
return {
'current_stats': stats,
'suggestions': suggestions,
'recommended_actions': self._generate_recommended_actions(suggestions)
}
def _generate_recommended_actions(self, suggestions: List[Dict]) -> List[str]:
"""生成推荐操作"""
actions = []
for suggestion in suggestions:
if suggestion.get('type') == 'overall':
actions.append("Review cache key generation strategy")
actions.append("Consider implementing multi-level caching")
elif 'status_code' in suggestion:
code = suggestion['status_code']
if suggestion['efficiency'] < 0.1:
actions.append(f"Reduce TTL for status code {code}")
elif suggestion['efficiency'] > 0.8:
actions.append(f"Increase TTL for status code {code}")
return list(set(actions)) # 去重
# 缓存优化中间件
class CacheOptimizationMiddleware:
"""缓存优化中间件"""
def __init__(self, cache: ResponseCache):
self.cache = cache
self.strategy = StatusCodeCacheStrategy()
async def process_request(self, request: Dict) -> Optional[Dict]:
"""处理请求(缓存查找)"""
# 生成缓存键
cache_key = self.strategy.generate_cache_key(
method=request['method'],
url=request['url'],
headers=request.get('headers', {}),
body=request.get('body')
)
# 检查缓存
cached_response = self.cache.get(cache_key)
if cached_response:
# 检查是否需要验证(ETag/If-None-Match)
if self._should_validate(request, cached_response):
# 添加验证头
request['headers']['If-None-Match'] = self._generate_etag(cached_response)
# 返回缓存响应
return {
'from_cache': True,
'cache_key': cache_key,
'response': cached_response
}
return None
async def process_response(self,
request: Dict,
response: Dict) -> Dict:
"""处理响应(缓存存储)"""
status_code = response['status_code']
method = request['method']
# 检查是否可缓存
if not self.strategy.is_cacheable(status_code, method):
return response
# 生成缓存键
cache_key = self.strategy.generate_cache_key(
method=method,
url=request['url'],
headers=request.get('headers', {}),
body=request.get('body')
)
# 计算TTL
ttl = self.strategy.get_ttl_for_status(
status_code=status_code,
endpoint=request['url'],
headers=response.get('headers', {})
)
if ttl.total_seconds() > 0:
# 创建可缓存响应
cacheable_response = CacheableResponse(
status_code=status_code,
headers=response.get('headers', {}),
body=response.get('body'),
cache_key=cache_key,
ttl=ttl
)
# 存储到缓存
self.cache.set(cacheable_response)
# 添加缓存头到响应
response['headers']['X-Cache'] = 'MISS'
response['headers']['X-Cache-Key'] = cache_key
response['headers']['X-Cache-TTL'] = str(int(ttl.total_seconds()))
return response
def _should_validate(self, request: Dict, cached_response: CacheableResponse) -> bool:
"""检查是否需要验证缓存"""
# 如果有ETag,通常需要验证
if 'etag' in cached_response.headers:
return True
# 根据状态码决定
status_code = cached_response.status_code
if status_code in [200, 203, 206]:
return True
return False
def _generate_etag(self, response: CacheableResponse) -> str:
"""生成ETag"""
# 基于响应内容生成弱ETag
content_hash = hashlib.md5(
json.dumps(response.body).encode()
).hexdigest()
return f'W/"{content_hash}"'
def get_optimization_report(self) -> Dict[str, Any]:
"""获取优化报告"""
cache_stats = self.cache.get_stats()
optimization_suggestions = self.cache.optimize_cache_strategy()
# 计算性能改进
hit_rate = cache_stats['hit_rate']
estimated_savings = self._estimate_performance_savings(hit_rate)
return {
'cache_statistics': cache_stats,
'optimization_suggestions': optimization_suggestions,
'performance_impact': estimated_savings,
'recommended_configuration': self._generate_recommended_configuration()
}
def _estimate_performance_savings(self, hit_rate: float) -> Dict[str, float]:
"""估计性能节省"""
# 简化估算
avg_cache_hit_time = 10 # ms – 缓存命中时间
avg_miss_time = 200 # ms – 缓存未命中时间(需要完整处理)
total_time_without_cache = avg_miss_time
total_time_with_cache = (hit_rate * avg_cache_hit_time +
(1 – hit_rate) * avg_miss_time)
time_savings = total_time_without_cache – total_time_with_cache
percentage_savings = (time_savings / total_time_without_cache) * 100
return {
'avg_response_time_without_cache_ms': avg_miss_time,
'avg_response_time_with_cache_ms': total_time_with_cache,
'time_savings_ms': time_savings,
'percentage_savings': percentage_savings,
'estimated_requests_per_second_improvement':
(1 / total_time_with_cache * 1000) – (1 / avg_miss_time * 1000)
}
def _generate_recommended_configuration(self) -> Dict[str, Any]:
"""生成推荐配置"""
stats = self.cache.get_stats()
# 基于命中率调整缓存大小
hit_rate = stats['hit_rate']
if hit_rate > 0.8:
recommended_size = int(self.cache.max_size * 1.5)
elif hit_rate < 0.3:
recommended_size = int(self.cache.max_size * 0.7)
else:
recommended_size = self.cache.max_size
# 调整TTL配置
ttl_adjustments = {}
for code, efficiency in stats['efficiency_by_status'].items():
if efficiency < 0.1:
current_ttl = StatusCodeCacheStrategy.DEFAULT_CACHE_TTL.get(
code, timedelta(seconds=0)
)
ttl_adjustments[code] = {
'current': current_ttl.total_seconds(),
'recommended': max(current_ttl.total_seconds() * 0.5, 1)
}
elif efficiency > 0.8:
current_ttl = StatusCodeCacheStrategy.DEFAULT_CACHE_TTL.get(
code, timedelta(seconds=0)
)
ttl_adjustments[code] = {
'current': current_ttl.total_seconds(),
'recommended': current_ttl.total_seconds() * 2
}
return {
'cache_size': {
'current': self.cache.max_size,
'recommended': recommended_size
},
'ttl_adjustments': ttl_adjustments,
'suggested_cacheable_codes': self._suggest_additional_cacheable_codes(stats)
}
def _suggest_additional_cacheable_codes(self, stats: Dict) -> List[int]:
"""建议额外的可缓存状态码"""
suggestions = []
# 分析哪些非缓存状态码频繁出现
# 在实际实现中,这里需要访问历史数据
# 示例:如果404频繁出现且内容相同,建议缓存
if '404' in stats.get('frequent_errors', []):
suggestions.append(404)
return suggestions
# 使用示例
cache = ResponseCache(max_size=1000)
middleware = CacheOptimizationMiddleware(cache)
# 模拟处理请求
requests = [
{
'method': 'GET',
'url': '/api/products/123',
'headers': {'Accept': 'application/json'},
'body': None
}
]
for i in range(100):
request = {
'method': 'GET',
'url': f'/api/products/{i % 10}',
'headers': {'Accept': 'application/json'},
'body': None
}
# 检查缓存
cached = middleware.process_request(request)
if cached:
print(f"Cache hit for {request['url']}")
response = cached['response']
else:
# 处理请求
response = {
'status_code': 200 if i % 20 != 0 else 404,
'headers': {'Content-Type': 'application/json'},
'body': {'id': i % 10, 'name': f'Product {i % 10}'}
}
# 处理响应(可能缓存)
response = middleware.process_response(request, response)
# 每20个请求显示统计
if i % 20 == 0 and i > 0:
stats = cache.get_stats()
print(f"\\nAfter {i} requests:")
print(f" Cache hits: {stats['hit_count']}")
print(f" Cache misses: {stats['miss_count']}")
print(f" Hit rate: {stats['hit_rate']:.2%}")
# 生成优化报告
report = middleware.get_optimization_report()
print(f"\\n=== Optimization Report ===")
print(f"Cache hit rate: {report['cache_statistics']['hit_rate']:.2%}")
print(f"Estimated time savings: {report['performance_impact']['percentage_savings']:.1f}%")
print(f"\\nSuggestions:")
for suggestion in report['optimization_suggestions']['suggestions'][:3]:
print(f" – {suggestion['suggestion']}")
39.2 状态码传输优化
39.2.1 HTTP/2头部压缩优化
python
# HTTP/2头部压缩优化
from typing import Dict, List, Tuple
import zlib
import brotli
class HeaderCompressionOptimizer:
"""头部压缩优化器"""
def __init__(self):
self.static_table = self._load_static_table()
self.dynamic_table = []
self.dynamic_table_size = 4096 # 默认大小
self.max_dynamic_table_size = 4096
def _load_static_table(self) -> List[Tuple[str, str]]:
"""加载静态表(HPACK)"""
# HPACK静态表定义
return [
(":authority", ""),
(":method", "GET"),
(":method", "POST"),
(":path", "/"),
(":path", "/index.html"),
(":scheme", "http"),
(":scheme", "https"),
(":status", "200"),
(":status", "204"),
(":status", "206"),
(":status", "304"),
(":status", "400"),
(":status", "404"),
(":status", "500"),
("accept-charset", ""),
("accept-encoding", "gzip, deflate"),
("accept-language", ""),
("accept-ranges", ""),
("accept", ""),
("access-control-allow-origin", ""),
("age", ""),
("allow", ""),
("authorization", ""),
("cache-control", ""),
("content-disposition", ""),
("content-encoding", ""),
("content-language", ""),
("content-length", ""),
("content-location", ""),
("content-range", ""),
("content-type", ""),
("cookie", ""),
("date", ""),
("etag", ""),
("expect", ""),
("expires", ""),
("from", ""),
("host", ""),
("if-match", ""),
("if-modified-since", ""),
("if-none-match", ""),
("if-range", ""),
("if-unmodified-since", ""),
("last-modified", ""),
("link", ""),
("location", ""),
("max-forwards", ""),
("proxy-authenticate", ""),
("proxy-authorization", ""),
("range", ""),
("referer", ""),
("refresh", ""),
("retry-after", ""),
("server", ""),
("set-cookie", ""),
("strict-transport-security", ""),
("transfer-encoding", ""),
("user-agent", ""),
("vary", ""),
("via", ""),
("www-authenticate", "")
]
def compress_headers(self,
headers: Dict[str, str],
use_huffman: bool = True) -> bytes:
"""压缩头部"""
encoded_headers = []
for name, value in headers.items():
# 尝试在静态表中查找
static_index = self._find_in_static_table(name, value)
if static_index is not None:
# 使用静态表索引
encoded_headers.append(self._encode_indexed_header(static_index))
else:
# 尝试在动态表中查找
dynamic_index = self._find_in_dynamic_table(name, value)
if dynamic_index is not None:
# 使用动态表索引
encoded_headers.append(
self._encode_indexed_header(
dynamic_index + len(self.static_table)
)
)
else:
# 字面编码
encoded_headers.append(
self._encode_literal_header(name, value, use_huffman)
)
# 添加到动态表
self._add_to_dynamic_table(name, value)
return b''.join(encoded_headers)
def _find_in_static_table(self, name: str, value: str) -> Optional[int]:
"""在静态表中查找"""
for i, (n, v) in enumerate(self.static_table):
if n == name and v == value:
return i + 1 # HPACK索引从1开始
return None
def _find_in_dynamic_table(self, name: str, value: str) -> Optional[int]:
"""在动态表中查找"""
for i, (n, v) in enumerate(self.dynamic_table):
if n == name and v == value:
return i
return None
def _encode_indexed_header(self, index: int) -> bytes:
"""编码索引头部"""
# HPACK索引头部格式:1xxxxxxx
return bytes([0x80 | (index & 0x7F)])
def _encode_literal_header(self,
name: str,
value: str,
use_huffman: bool) -> bytes:
"""编码字面头部"""
# 简化的实现
# 在实际中,这里需要实现完整的HPACK编码
if use_huffman:
name_encoded = self._huffman_encode(name)
value_encoded = self._huffman_encode(value)
else:
name_encoded = name.encode('utf-8')
value_encoded = value.encode('utf-8')
# 编码头部
encoded = bytearray()
# 名称长度
encoded.append(len(name_encoded))
encoded.extend(name_encoded)
# 值长度
encoded.append(len(value_encoded))
encoded.extend(value_encoded)
return bytes(encoded)
def _huffman_encode(self, text: str) -> bytes:
"""Huffman编码(简化)"""
# 在实际中,这里需要使用HPACK的Huffman表
return text.encode('utf-8')
def _add_to_dynamic_table(self, name: str, value: str):
"""添加到动态表"""
entry_size = len(name) + len(value) + 32 # 估算大小
# 检查空间
while (self._calculate_dynamic_table_size() + entry_size >
self.max_dynamic_table_size):
if self.dynamic_table:
self.dynamic_table.pop(0)
else:
break
# 添加新条目
self.dynamic_table.append((name, value))
def _calculate_dynamic_table_size(self) -> int:
"""计算动态表大小"""
total = 0
for name, value in self.dynamic_table:
total += len(name) + len(value) + 32 # 估算的开销
return total
def analyze_header_patterns(self,
request_logs: List[Dict]) -> Dict[str, any]:
"""分析头部模式"""
header_frequency = {}
header_value_patterns = {}
for log in request_logs:
headers = log.get('headers', {})
for name, value in headers.items():
# 统计头部频率
if name not in header_frequency:
header_frequency[name] = 0
header_frequency[name] += 1
# 分析值模式
if name not in header_value_patterns:
header_value_patterns[name] = {}
if value not in header_value_patterns[name]:
header_value_patterns[name][value] = 0
header_value_patterns[name][value] += 1
# 找出最常用的头部
common_headers = sorted(
header_frequency.items(),
key=lambda x: x[1],
reverse=True
)[:20]
# 分析值分布
value_distribution = {}
for name, patterns in header_value_patterns.items():
total = sum(patterns.values())
if total > 10: # 足够样本
most_common = max(patterns.items(), key=lambda x: x[1])
value_distribution[name] = {
'total_values': len(patterns),
'most_common_value': most_common[0],
'most_common_percentage': most_common[1] / total,
'entropy': self._calculate_entropy(patterns.values())
}
return {
'common_headers': common_headers,
'value_distribution': value_distribution,
'compression_opportunities': self._identify_compression_opportunities(
common_headers, value_distribution
)
}
def _calculate_entropy(self, frequencies: List[int]) -> float:
"""计算熵"""
import math
total = sum(frequencies)
if total == 0:
return 0
entropy = 0
for freq in frequencies:
probability = freq / total
if probability > 0:
entropy -= probability * math.log2(probability)
return entropy
def _identify_compression_opportunities(self,
common_headers: List[Tuple[str, int]],
value_distribution: Dict) -> List[Dict]:
"""识别压缩机会"""
opportunities = []
for name, frequency in common_headers:
if name in value_distribution:
dist = value_distribution[name]
# 高频率且低熵的值是好的压缩候选
if (frequency > 100 and
dist['most_common_percentage'] > 0.8 and
dist['entropy'] < 1.0):
opportunities.append({
'header': name,
'frequency': frequency,
'most_common_value': dist['most_common_value'],
'value_consistency': dist['most_common_percentage'],
'entropy': dist['entropy'],
'compression_potential': self._estimate_compression_potential(
name, dist['most_common_value'], frequency
)
})
return sorted(opportunities,
key=lambda x: x['compression_potential'],
reverse=True)
def _estimate_compression_potential(self,
name: str,
value: str,
frequency: int) -> float:
"""估计压缩潜力"""
# 原始大小
original_size = (len(name) + len(value)) * frequency
# 压缩后大小(估计)
# 使用静态表:1字节
# 使用动态表:~5字节
# 字面编码:~10+字节
if self._find_in_static_table(name, value):
compressed_size = frequency * 1
elif frequency > 10:
compressed_size = frequency * 5 # 动态表
else:
compressed_size = frequency * (len(name) + len(value) + 2)
savings = original_size – compressed_size
return savings
def optimize_compression_strategy(self,
analysis: Dict) -> Dict[str, any]:
"""优化压缩策略"""
recommendations = []
# 基于常见头部调整静态表
common_headers = analysis['common_headers'][:10]
static_table_candidates = [
{'header': name, 'frequency': freq}
for name, freq in common_headers
if freq > 1000 # 非常常见的头部
]
if static_table_candidates:
recommendations.append({
'type': 'static_table_extension',
'candidates': static_table_candidates,
'estimated_savings': sum(
c['frequency'] * 10 # 每个头部节省约10字节
for c in static_table_candidates
)
})
# 调整动态表大小
current_size = self.max_dynamic_table_size
suggested_size = self._calculate_optimal_table_size(analysis)
if suggested_size != current_size:
recommendations.append({
'type': 'dynamic_table_size',
'current': current_size,
'suggested': suggested_size,
'reason': 'Based on header pattern analysis'
})
# Huffman编码优化
low_entropy_headers = [
opp for opp in analysis['compression_opportunities']
if opp['entropy'] < 0.5
]
if low_entropy_headers:
recommendations.append({
'type': 'huffman_optimization',
'headers': low_entropy_headers[:5],
'suggestion': 'These headers have low entropy values, '
'Huffman coding would be highly effective'
})
return {
'current_configuration': {
'static_table_size': len(self.static_table),
'dynamic_table_size': self.max_dynamic_table_size,
'use_huffman': True
},
'recommendations': recommendations,
'estimated_overall_savings': sum(
r.get('estimated_savings', 0) for r in recommendations
)
}
def _calculate_optimal_table_size(self, analysis: Dict) -> int:
"""计算最优表大小"""
# 基于头部频率和值分布
total_headers = sum(freq for _, freq in analysis['common_headers'])
if total_headers > 10000:
return 8192 # 8KB
elif total_headers > 1000:
return 4096 # 4KB
else:
return 2048 # 2KB
# HTTP/3 QPACK优化
class QPACKOptimizer:
"""QPACK优化器(HTTP/3)"""
def __init__(self):
self.static_table = self._load_qpack_static_table()
self.dynamic_table = []
self.max_table_capacity = 0
self.blocked_streams_allowed = True
def _load_qpack_static_table(self) -> List[Tuple[str, str]]:
"""加载QPACK静态表"""
# QPACK静态表与HPACK类似,但有些扩展
table = []
# 状态码条目
for code in range(100, 600):
table.append((":status", str(code)))
# 常见头部
common_headers = [
("content-type", "application/json"),
("content-type", "text/html"),
("content-type", "application/javascript"),
("cache-control", "max-age=3600"),
("cache-control", "no-cache"),
("authorization", "Bearer"),
("user-agent", "Mozilla/5.0"),
("accept-encoding", "gzip, br"),
]
table.extend(common_headers)
return table
def optimize_for_stream(self,
stream_id: int,
headers: Dict[str, str]) -> Dict[str, any]:
"""为特定流优化"""
# QPACK支持每个流独立的动态表引用
stream_optimization = {
'stream_id': stream_id,
'suggested_encodings': [],
'table_references': [],
'estimated_size': 0
}
for name, value in headers.items():
encoding = self._suggest_encoding(name, value, stream_id)
stream_optimization['suggested_encodings'].append(encoding)
stream_optimization['estimated_size'] += encoding['estimated_size']
return stream_optimization
def _suggest_encoding(self,
name: str,
value: str,
stream_id: int) -> Dict[str, any]:
"""建议编码策略"""
# 检查静态表
static_index = self._find_in_qpack_static_table(name, value)
if static_index is not None:
return {
'type': 'static_reference',
'index': static_index,
'estimated_size': 1, # 1字节
'efficiency': 'high'
}
# 检查动态表
dynamic_index = self._find_in_dynamic_table(name, value)
if dynamic_index is not None:
return {
'type': 'dynamic_reference',
'index': dynamic_index,
'estimated_size': 2, # 约2字节
'efficiency': 'medium'
}
# 检查名称引用
name_index = self._find_name_in_tables(name)
if name_index is not None:
return {
'type': 'name_reference',
'index': name_index,
'value': value,
'estimated_size': len(value) + 2, # 值长度+开销
'efficiency': 'low'
}
# 字面编码
return {
'type': 'literal',
'name': name,
'value': value,
'estimated_size': len(name) + len(value) + 4, # 名称+值+开销
'efficiency': 'very_low'
}
def _find_in_qpack_static_table(self, name: str, value: str) -> Optional[int]:
"""在QPACK静态表中查找"""
for i, (n, v) in enumerate(self.static_table):
if n == name and v == value:
return i
return None
def _find_in_dynamic_table(self, name: str, value: str) -> Optional[int]:
"""在动态表中查找"""
for i, (n, v) in enumerate(self.dynamic_table):
if n == name and v == value:
return i
return None
def _find_name_in_tables(self, name: str) -> Optional[int]:
"""在表中查找名称"""
# 检查静态表
for i, (n, _) in enumerate(self.static_table):
if n == name:
return i
# 检查动态表
for i, (n, _) in enumerate(self.dynamic_table):
if n == name:
return i + len(self.static_table)
return None
# 综合优化器
class HTTPHeaderOptimizer:
"""HTTP头部综合优化器"""
def __init__(self):
self.hpack_optimizer = HeaderCompressionOptimizer()
self.qpack_optimizer = QPACKOptimizer()
self.protocol_analytics = {}
def analyze_protocol_performance(self,
http_version: str,
request_logs: List[Dict]) -> Dict[str, any]:
"""分析协议性能"""
# 分析头部压缩效率
header_analysis = self.hpack_optimizer.analyze_header_patterns(request_logs)
# 计算理论压缩比
total_original_size = 0
total_compressed_size = 0
for log in request_logs:
headers = log.get('headers', {})
# 原始大小
for name, value in headers.items():
total_original_size += len(name) + len(value)
# 压缩大小(估算)
compressed = self.hpack_optimizer.compress_headers(headers)
total_compressed_size += len(compressed)
compression_ratio = total_compressed_size / total_original_size if total_original_size > 0 else 0
# 协议特定建议
protocol_suggestions = self._generate_protocol_suggestions(
http_version, header_analysis, compression_ratio
)
return {
'http_version': http_version,
'compression_ratio': compression_ratio,
'estimated_bandwidth_savings': total_original_size – total_compressed_size,
'header_analysis': header_analysis,
'protocol_suggestions': protocol_suggestions,
'migration_recommendation': self._evaluate_protocol_migration(
http_version, compression_ratio
)
}
def _generate_protocol_suggestions(self,
http_version: str,
header_analysis: Dict,
compression_ratio: float) -> List[Dict]:
"""生成协议特定建议"""
suggestions = []
if http_version == 'HTTP/1.1':
if compression_ratio > 0.7: # 压缩率低
suggestions.append({
'type': 'protocol_upgrade',
'suggestion': 'Consider upgrading to HTTP/2 for better header compression',
'priority': 'high'
})
# 头部合并建议
suggestions.append({
'type': 'header_consolidation',
'suggestion': 'Consolidate multiple Set-Cookie headers into one',
'priority': 'medium'
})
elif http_version == 'HTTP/2':
if compression_ratio > 0.5: # 仍有改进空间
suggestions.append({
'type': 'hpack_optimization',
'suggestion': 'Optimize HPACK dynamic table size based on header patterns',
'priority': 'medium'
})
# 服务器推送优化
suggestions.append({
'type': 'server_push_optimization',
'suggestion': 'Use server push for common resources referenced in responses',
'priority': 'low'
})
elif http_version == 'HTTP/3':
if compression_ratio > 0.4:
suggestions.append({
'type': 'qpack_optimization',
'suggestion': 'Optimize QPACK configuration for better stream independence',
'priority': 'medium'
})
# 通用优化
opportunities = header_analysis.get('compression_opportunities', [])
if opportunities:
suggestions.append({
'type': 'header_value_optimization',
'suggestion': 'Standardize header values to improve compression',
'examples': [opp['header'] for opp in opportunities[:3]],
'priority': 'high'
})
return suggestions
def _evaluate_protocol_migration(self,
current_version: str,
compression_ratio: float) -> Optional[Dict]:
"""评估协议迁移建议"""
if current_version == 'HTTP/1.1' and compression_ratio > 0.6:
return {
'recommended_protocol': 'HTTP/2',
'expected_improvement': f"{(1 – 0.3/compression_ratio)*100:.1f}%", # 估算
'complexity': 'medium',
'prerequisites': ['TLS 1.2+', 'Modern client support']
}
elif current_version == 'HTTP/2' and compression_ratio > 0.4:
return {
'recommended_protocol': 'HTTP/3',
'expected_improvement': f"{(1 – 0.2/compression_ratio)*100:.1f}%", # 估算
'complexity': 'high',
'prerequisites': ['QUIC support', 'Updated infrastructure']
}
return None
def generate_optimization_plan(self,
analysis_results: Dict) -> Dict[str, any]:
"""生成优化计划"""
plan = {
'immediate_actions': [],
'short_term_actions': [],
'long_term_actions': [],
'expected_benefits': {},
'implementation_complexity': {}
}
# 分类建议
for suggestion in analysis_results.get('protocol_suggestions', []):
priority = suggestion.get('priority', 'medium')
if priority == 'high':
plan['immediate_actions'].append(suggestion)
elif priority == 'medium':
plan['short_term_actions'].append(suggestion)
else:
plan['long_term_actions'].append(suggestion)
# 迁移建议
migration = analysis_results.get('migration_recommendation')
if migration:
plan['long_term_actions'].append({
'type': 'protocol_migration',
'suggestion': f"Migrate to {migration['recommended_protocol']}",
'details': migration
})
# 计算预期收益
plan['expected_benefits'] = self._calculate_expected_benefits(plan, analysis_results)
# 评估实现复杂度
plan['implementation_complexity'] = self._assess_complexity(plan)
return plan
def _calculate_expected_benefits(self,
plan: Dict,
analysis: Dict) -> Dict[str, float]:
"""计算预期收益"""
benefits = {
'bandwidth_reduction_percentage': 0,
'latency_reduction_ms': 0,
'throughput_increase_percentage': 0
}
# 基于压缩比估算
compression_ratio = analysis.get('compression_ratio', 1)
# 头部压缩收益
if compression_ratio < 1:
bandwidth_reduction = (1 – compression_ratio) * 100
# 应用优化乘数
optimization_multiplier = 1.0
if plan['immediate_actions']:
optimization_multiplier *= 0.8 # 20%额外改进
if plan['short_term_actions']:
optimization_multiplier *= 0.9 # 10%额外改进
benefits['bandwidth_reduction_percentage'] = bandwidth_reduction * optimization_multiplier
# 延迟收益(估算)
avg_header_size = 500 # 字节
network_speed = 10 # Mbps
latency_reduction = (avg_header_size * (1 – compression_ratio) * 8) / (network_speed * 1000)
benefits['latency_reduction_ms'] = latency_reduction * 1000 # 转换为毫秒
# 协议迁移收益
migration = analysis.get('migration_recommendation')
if migration:
expected_improvement = migration.get('expected_improvement', '0%')
improvement = float(expected_improvement.strip('%'))
benefits['throughput_increase_percentage'] += improvement
return benefits
def _assess_complexity(self, plan: Dict) -> Dict[str, str]:
"""评估实现复杂度"""
complexity = {
'overall': 'low',
'immediate_actions': 'low',
'short_term_actions': 'medium',
'long_term_actions': 'high'
}
# 基于行动类型调整
for category in ['immediate_actions', 'short_term_actions', 'long_term_actions']:
actions = plan[category]
if actions:
# 检查是否有复杂行动
complex_actions = [a for a in actions if a.get('type') in
['protocol_migration', 'protocol_upgrade']]
if complex_actions:
complexity[category] = 'high'
# 总体复杂度
if complexity['long_term_actions'] == 'high':
complexity['overall'] = 'medium'
if complexity['short_term_actions'] == 'high':
complexity['overall'] = 'high'
return complexity
# 使用示例
optimizer = HTTPHeaderOptimizer()
# 模拟请求日志
request_logs = []
for i in range(100):
request_logs.append({
'headers': {
'user-agent': 'Mozilla/5.0',
'accept': 'application/json',
'content-type': 'application/json',
'authorization': 'Bearer token123',
'cache-control': 'no-cache'
}
})
# 分析性能
analysis = optimizer.analyze_protocol_performance('HTTP/1.1', request_logs)
print(f"Compression ratio: {analysis['compression_ratio']:.2%}")
print(f"Bandwidth savings: {analysis['estimated_bandwidth_savings']:,} bytes")
# 生成优化计划
plan = optimizer.generate_optimization_plan(analysis)
print("\\n=== Optimization Plan ===")
print(f"Immediate actions ({len(plan['immediate_actions'])}):")
for action in plan['immediate_actions']:
print(f" – {action['suggestion']}")
print(f"\\nExpected benefits:")
for benefit, value in plan['expected_benefits'].items():
print(f" {benefit}: {value}")
39.2.2 状态码的CDN优化
python
# CDN状态码优化策略
from typing import Dict, List, Optional, Set
from dataclasses import dataclass
from datetime import datetime, timedelta
import hashlib
@dataclass
class CDNEdgeConfig:
"""CDN边缘节点配置"""
location: str
cache_ttl: Dict[int, int] # 状态码 -> TTL(秒)
stale_while_revalidate: bool
stale_if_error: bool
compression_enabled: bool
brotli_enabled: bool
@classmethod
def default_config(cls) -> 'CDNEdgeConfig':
"""默认配置"""
return cls(
location='global',
cache_ttl={
200: 3600, # 1小时
201: 0, # 不缓存
204: 300, # 5分钟
301: 2592000, # 30天
302: 300, # 5分钟
304: 300, # 5分钟
404: 60, # 1分钟
500: 0, # 不缓存
502: 0,
503: 30, # 30秒(服务不可用)
504: 0,
460: 60, # 1分钟(缺货)
520: 30 # 30秒(熔断)
},
stale_while_revalidate=True,
stale_if_error=True,
compression_enabled=True,
brotli_enabled=True
)
@dataclass
class CDNCacheKey:
"""CDN缓存键"""
url: str
status_code: int
vary_headers: Dict[str, str]
geo_location: Optional[str]
device_type: Optional[str]
def generate_key(self) -> str:
"""生成缓存键"""
components = [
self.url,
str(self.status_code),
# 规范化vary头部
self._normalize_vary_headers(),
self.geo_location or '',
self.device_type or ''
]
key_string = '|'.join(components)
return hashlib.md5(key_string.encode()).hexdigest()
def _normalize_vary_headers(self) -> str:
"""规范化Vary头部"""
sorted_items = sorted(self.vary_headers.items())
return ';'.join(f"{k}:{v}" for k, v in sorted_items)
class CDNOptimizer:
"""CDN优化器"""
def __init__(self, edge_config: CDNEdgeConfig):
self.edge_config = edge_config
self.cache_analytics = {}
self.hit_rate_by_status: Dict[int, Dict[str, float]] = {}
def generate_cache_headers(self,
status_code: int,
content_type: str,
is_public: bool = True) -> Dict[str, str]:
"""生成缓存头部"""
headers = {}
# Cache-Control
cache_control = []
ttl = self.edge_config.cache_ttl.get(status_code, 0)
if ttl > 0:
if is_public:
cache_control.append(f"public")
cache_control.append(f"max-age={ttl}")
else:
cache_control.append(f"private")
cache_control.append(f"max-age={ttl}")
# Stale-while-revalidate
if self.edge_config.stale_while_revalidate:
cache_control.append(f"stale-while-revalidate={ttl//2}")
# Stale-if-error
if self.edge_config.stale_if_error:
cache_control.append(f"stale-if-error={ttl*2}")
else:
cache_control.append("no-cache")
cache_control.append("must-revalidate")
headers['Cache-Control'] = ', '.join(cache_control)
# Expires(向后兼容)
if ttl > 0:
expires = datetime.utcnow() + timedelta(seconds=ttl)
headers['Expires'] = expires.strftime('%a, %d %b %Y %H:%M:%S GMT')
# Vary头部
vary_headers = ['Accept-Encoding']
if content_type.startswith('text/html'):
vary_headers.append('User-Agent')
headers['Vary'] = ', '.join(vary_headers)
# CDN特定头部
headers['X-CDN-Cacheable'] = 'YES' if ttl > 0 else 'NO'
headers['X-CDN-TTL'] = str(ttl)
return headers
def analyze_cache_performance(self,
access_logs: List[Dict]) -> Dict[str, any]:
"""分析缓存性能"""
# 初始化统计
total_by_status = {}
hit_by_status = {}
miss_by_status = {}
for log in access_logs:
status = log.get('status_code')
cache_status = log.get('cache_status', 'MISS')
if status not in total_by_status:
total_by_status[status] = 0
hit_by_status[status] = 0
miss_by_status[status] = 0
total_by_status[status] += 1
if cache_status == 'HIT':
hit_by_status[status] += 1
else:
miss_by_status[status] += 1
# 计算命中率
hit_rate_by_status = {}
for status in total_by_status:
total = total_by_status[status]
hits = hit_by_status.get(status, 0)
hit_rate = hits / total if total > 0 else 0
hit_rate_by_status[status] = hit_rate
# 识别优化机会
optimization_opportunities = self._identify_optimization_opportunities(
hit_rate_by_status, total_by_status
)
# 计算潜在收益
potential_savings = self._calculate_potential_savings(
total_by_status, hit_rate_by_status, optimization_opportunities
)
return {
'total_requests_by_status': total_by_status,
'hit_rate_by_status': hit_rate_by_status,
'optimization_opportunities': optimization_opportunities,
'potential_savings': potential_savings,
'current_configuration': {
'cache_ttl': self.edge_config.cache_ttl,
'stale_while_revalidate': self.edge_config.stale_while_revalidate,
'stale_if_error': self.edge_config.stale_if_error
}
}
def _identify_optimization_opportunities(self,
hit_rate_by_status: Dict[int, float],
total_by_status: Dict[int, int]) -> List[Dict]:
"""识别优化机会"""
opportunities = []
for status, hit_rate in hit_rate_by_status.items():
total = total_by_status[status]
current_ttl = self.edge_config.cache_ttl.get(status, 0)
# 低命中率但高频率
if hit_rate < 0.3 and total > 100:
opportunities.append({
'status_code': status,
'current_hit_rate': hit_rate,
'request_count': total,
'current_ttl': current_ttl,
'issue': 'Low cache hit rate despite high traffic',
'suggestion': 'Consider increasing TTL or reviewing cache keys'
})
# 高命中率但短TTL
elif hit_rate > 0.8 and current_ttl < 300 and total > 50:
opportunities.append({
'status_code': status,
'current_hit_rate': hit_rate,
'request_count': total,
'current_ttl': current_ttl,
'issue': 'High hit rate with short TTL',
'suggestion': f"Increase TTL from {current_ttl}s to {current_ttl * 4}s"
})
# 可缓存状态码但当前不缓存
elif current_ttl == 0 and self._should_be_cacheable(status, total):
opportunities.append({
'status_code': status,
'current_hit_rate': hit_rate,
'request_count': total,
'current_ttl': current_ttl,
'issue': 'Cacheable status code not being cached',
'suggestion': f'Enable caching with TTL={self._suggest_ttl(status, total)}s'
})
return sorted(opportunities,
key=lambda x: x['request_count'],
reverse=True)
def _should_be_cacheable(self, status_code: int, request_count: int) -> bool:
"""检查是否应该缓存"""
# 404错误如果频繁出现且内容相同,可以缓存
if status_code == 404 and request_count > 100:
return True
# 自定义状态码
if 450 <= status_code <= 499 or 520 <= status_code <= 599:
return request_count > 50
return False
def _suggest_ttl(self, status_code: int, request_count: int) -> int:
"""建议TTL"""
if status_code == 404:
return 60 # 1分钟
elif 450 <= status_code <= 499:
return 300 # 5分钟
elif 520 <= status_code <= 599:
return 30 # 30秒
else:
return 300 # 默认5分钟
def _calculate_potential_savings(self,
total_by_status: Dict[int, int],
hit_rate_by_status: Dict[int, float],
opportunities: List[Dict]) -> Dict[str, float]:
"""计算潜在节省"""
# 估算平均响应大小(字节)
avg_size_by_status = {
200: 50000, # 50KB
201: 1000, # 1KB
204: 100, # 100B
301: 500, # 500B
302: 500,
304: 100,
404: 2000, # 2KB
500: 5000, # 5KB
460: 1000, # 1KB
520: 500 # 500B
}
current_bandwidth = 0
optimized_bandwidth = 0
for status, total in total_by_status.items():
hit_rate = hit_rate_by_status.get(status, 0)
avg_size = avg_size_by_status.get(status, 1000)
# 当前带宽使用
misses = total * (1 – hit_rate)
current_bandwidth += misses * avg_size
# 优化后带宽使用(假设命中率提高)
optimized_hit_rate = hit_rate
# 应用优化机会
for opp in opportunities:
if opp['status_code'] == status:
if 'increase' in opp['suggestion'].lower():
# 假设TTL增加会使命中率提高
optimized_hit_rate = min(hit_rate + 0.3, 0.95)
elif 'enable' in opp['suggestion'].lower():
# 新启用缓存
optimized_hit_rate = 0.7
break
optimized_misses = total * (1 – optimized_hit_rate)
optimized_bandwidth += optimized_misses * avg_size
bandwidth_savings = current_bandwidth – optimized_bandwidth
percentage_savings = (bandwidth_savings / current_bandwidth * 100
if current_bandwidth > 0 else 0)
# 延迟节省估算
avg_origin_latency = 200 # ms
latency_savings = (current_bandwidth – optimized_bandwidth) / 1000 * avg_origin_latency
return {
'bandwidth_savings_bytes': bandwidth_savings,
'bandwidth_savings_percentage': percentage_savings,
'estimated_latency_reduction_ms': latency_savings,
'estimated_cost_savings': self._estimate_cost_savings(bandwidth_savings)
}
def _estimate_cost_savings(self, bandwidth_savings: float) -> float:
"""估算成本节省"""
# 假设 $0.085 per GB (AWS CloudFront价格示例)
cost_per_gb = 0.085
savings_gb = bandwidth_savings / (1024 ** 3)
return savings_gb * cost_per_gb
def optimize_edge_configuration(self,
analysis: Dict) -> CDNEdgeConfig:
"""优化边缘配置"""
optimized_ttl = self.edge_config.cache_ttl.copy()
# 根据分析调整TTL
for opp in analysis['optimization_opportunities']:
status = opp['status_code']
suggestion = opp['suggestion']
if 'Increase TTL' in suggestion:
# 解析建议的新TTL
import re
match = re.search(r'to (\\d+)s', suggestion)
if match:
new_ttl = int(match.group(1))
optimized_ttl[status] = new_ttl
elif 'Enable caching' in suggestion:
# 启用缓存
match = re.search(r'TTL=(\\d+)', suggestion)
if match:
new_ttl = int(match.group(1))
optimized_ttl[status] = new_ttl
# 创建优化后的配置
optimized_config = CDNEdgeConfig(
location=self.edge_config.location,
cache_ttl=optimized_ttl,
stale_while_revalidate=self.edge_config.stale_while_revalidate,
stale_if_error=self.edge_config.stale_if_error,
compression_enabled=self.edge_config.compression_enabled,
brotli_enabled=self.edge_config.brotli_enabled
)
return optimized_config
def generate_purge_strategy(self,
status_code_changes: Dict[int, str],
content_types: Set[str]) -> Dict[str, any]:
"""生成清除策略"""
# 不同状态码变化的清除策略
purge_strategies = []
for status_code, change_type in status_code_changes.items():
if change_type == 'error_to_success':
# 例如 404 -> 200
strategy = {
'status_code': status_code,
'change': change_type,
'purge_scope': 'specific_urls',
'purge_method': 'immediate',
'priority': 'high',
'estimated_impact': 'Users will see updated content immediately'
}
elif change_type == 'success_to_error':
# 例如 200 -> 404
strategy = {
'status_code': status_code,
'change': change_type,
'purge_scope': 'specific_urls',
'purge_method': 'immediate',
'priority': 'critical',
'estimated_impact': 'Prevent users from accessing deleted content'
}
elif change_type == 'custom_code_change':
# 例如 460 -> 200(商品重新上架)
strategy = {
'status_code': status_code,
'change': change_type,
'purge_scope': 'specific_urls',
'purge_method': 'immediate',
'priority': 'medium',
'estimated_impact': 'Update business status immediately'
}
else:
strategy = {
'status_code': status_code,
'change': change_type,
'purge_scope': 'pattern',
'purge_method': 'scheduled',
'priority': 'low'
}
purge_strategies.append(strategy)
# 基于内容类型的清除建议
content_type_purge = {}
for content_type in content_types:
if content_type.startswith('text/html'):
content_type_purge[content_type] = {
'cache_duration': 'short',
'purge_frequency': 'high',
'recommendation': 'Use surrogate keys for granular purging'
}
elif 'image' in content_type:
content_type_purge[content_type] = {
'cache_duration': 'long',
'purge_frequency': 'low',
'recommendation': 'Use versioned URLs for cache busting'
}
elif 'application/json' in content_type:
content_type_purge[content_type] = {
'cache_duration': 'medium',
'purge_frequency': 'medium',
'recommendation': 'Implement cache revalidation with ETags'
}
return {
'purge_strategies': purge_strategies,
'content_type_recommendations': content_type_purge,
'best_practices': self._get_purge_best_practices()
}
def _get_purge_best_practices(self) -> List[str]:
"""获取清除最佳实践"""
return [
"Use surrogate keys for granular cache purging",
"Implement soft purge (stale-while-revalidate) when possible",
"Schedule bulk purges during low-traffic periods",
"Monitor purge queue depth to avoid delays",
"Use cache tags for related content invalidation",
"Implement purge ACLs to prevent accidental purges"
]
# CDN性能监控
class CDNPerformanceMonitor:
"""CDN性能监控器"""
def __init__(self, optimizer: CDNOptimizer):
self.optimizer = optimizer
self.metrics_history = []
self.alert_thresholds = {
'hit_rate': 0.3, # 低于30%告警
'origin_load': 0.8, # 源站负载超过80%告警
'error_rate': 0.05, # 错误率超过5%告警
'latency_p95': 1000 # P95延迟超过1秒告警
}
def record_metrics(self, metrics: Dict[str, any]):
"""记录指标"""
self.metrics_history.append({
'timestamp': datetime.now(),
'metrics': metrics
})
# 保持历史数据大小
if len(self.metrics_history) > 1000:
self.metrics_history = self.metrics_history[-1000:]
# 检查告警
alerts = self._check_alerts(metrics)
if alerts:
self._trigger_alerts(alerts)
def _check_alerts(self, metrics: Dict) -> List[Dict]:
"""检查告警"""
alerts = []
# 检查命中率
hit_rate = metrics.get('overall_hit_rate', 0)
if hit_rate < self.alert_thresholds['hit_rate']:
alerts.append({
'type': 'LOW_HIT_RATE',
'value': hit_rate,
'threshold': self.alert_thresholds['hit_rate'],
'severity': 'WARNING'
})
# 检查源站负载
origin_load = metrics.get('origin_load_percentage', 0)
if origin_load > self.alert_thresholds['origin_load']:
alerts.append({
'type': 'HIGH_ORIGIN_LOAD',
'value': origin_load,
'threshold': self.alert_thresholds['origin_load'],
'severity': 'CRITICAL'
})
# 检查错误率
error_rate = metrics.get('error_rate', 0)
if error_rate > self.alert_thresholds['error_rate']:
alerts.append({
'type': 'HIGH_ERROR_RATE',
'value': error_rate,
'threshold': self.alert_thresholds['error_rate'],
'severity': 'ERROR'
})
# 检查延迟
latency_p95 = metrics.get('latency_p95_ms', 0)
if latency_p95 > self.alert_thresholds['latency_p95']:
alerts.append({
'type': 'HIGH_LATENCY',
'value': latency_p95,
'threshold': self.alert_thresholds['latency_p95'],
'severity': 'WARNING'
})
return alerts
def _trigger_alerts(self, alerts: List[Dict]):
"""触发告警"""
for alert in alerts:
print(f"CDN ALERT [{alert['severity']}]: {alert['type']} = {alert['value']}")
def generate_performance_report(self,
start_time: datetime,
end_time: datetime) -> Dict[str, any]:
"""生成性能报告"""
# 过滤时间范围内的指标
relevant_metrics = [
m for m in self.metrics_history
if start_time <= m['timestamp'] <= end_time
]
if not relevant_metrics:
return {}
# 计算统计
hit_rates = [m['metrics'].get('overall_hit_rate', 0) for m in relevant_metrics]
error_rates = [m['metrics'].get('error_rate', 0) for m in relevant_metrics]
latencies = [m['metrics'].get('latency_p95_ms', 0) for m in relevant_metrics]
import statistics
report = {
'time_period': {
'start': start_time,
'end': end_time
},
'summary': {
'avg_hit_rate': statistics.mean(hit_rates),
'avg_error_rate': statistics.mean(error_rates),
'avg_latency_p95': statistics.mean(latencies),
'total_alerts': sum(len(m.get('alerts', [])) for m in relevant_metrics)
},
'trends': self._analyze_trends(relevant_metrics),
'recommendations': self._generate_recommendations(relevant_metrics)
}
return report
def _analyze_trends(self, metrics_data: List[Dict]) -> Dict[str, any]:
"""分析趋势"""
# 简化实现
return {
'hit_rate_trend': 'stable',
'latency_trend': 'stable',
'error_rate_trend': 'stable'
}
def _generate_recommendations(self, metrics_data: List[Dict]) -> List[Dict]:
"""生成推荐"""
recommendations = []
# 分析平均命中率
avg_hit_rate = statistics.mean(
[m['metrics'].get('overall_hit_rate', 0) for m in metrics_data]
)
if avg_hit_rate < 0.5:
recommendations.append({
'type': 'CACHE_OPTIMIZATION',
'priority': 'HIGH',
'description': f'Low cache hit rate ({avg_hit_rate:.1%}). Consider optimizing cache TTLs and keys.',
'action': 'Review and adjust CDN cache configuration'
})
# 分析错误模式
error_details = []
for data in metrics_data:
errors = data['metrics'].get('errors_by_status', {})
error_details.extend(errors.items())
# 找出最常见的错误状态码
from collections import Counter
error_counter = Counter()
for status, count in error_details:
error_counter[status] += count
common_errors = error_counter.most_common(3)
for status, count in common_errors:
if status >= 500:
recommendations.append({
'type': 'ERROR_RESOLUTION',
'priority': 'CRITICAL',
'description': f'High server errors ({status}: {count} occurrences). Investigate backend issues.',
'action': f'Debug status code {status} errors'
})
return recommendations
# 使用示例
edge_config = CDNEdgeConfig.default_config()
cdn_optimizer = CDNOptimizer(edge_config)
monitor = CDNPerformanceMonitor(cdn_optimizer)
# 模拟CDN访问日志
access_logs = []
for i in range(1000):
status_code = 200 if i % 10 != 0 else 404
cache_status = 'HIT' if i % 3 != 0 else 'MISS'
access_logs.append({
'status_code': status_code,
'cache_status': cache_status,
'url': f'/api/products/{i % 100}',
'response_size': 5000 if status_code == 200 else 1000
})
# 分析缓存性能
analysis = cdn_optimizer.analyze_cache_performance(access_logs)
print("=== CDN Cache Analysis ===")
print(f"Overall hit rate: {analysis['hit_rate_by_status'].get(200, 0):.1%}")
print(f"404 hit rate: {analysis['hit_rate_by_status'].get(404, 0):.1%}")
print("\\nOptimization opportunities:")
for opp in analysis['optimization_opportunities'][:3]:
print(f" Status {opp['status_code']}: {opp['issue']}")
print(f"\\nPotential bandwidth savings: {analysis['potential_savings']['bandwidth_savings_percentage']:.1f}%")
print(f"Estimated cost savings: ${analysis['potential_savings']['estimated_cost_savings']:.2f}")
# 生成优化配置
optimized_config = cdn_optimizer.optimize_edge_configuration(analysis)
print(f"\\nOptimized 404 TTL: {optimized_config.cache_ttl.get(404)}s (was: {edge_config.cache_ttl.get(404)}s)")
# 监控性能
monitor.record_metrics({
'overall_hit_rate': 0.65,
'origin_load_percentage': 0.75,
'error_rate': 0.02,
'latency_p95_ms': 850,
'errors_by_status': {404: 10, 500: 2}
})
# 生成报告
report = monitor.generate_performance_report(
start_time=datetime.now() – timedelta(hours=1),
end_time=datetime.now()
)
print(f"\\nPerformance report:")
print(f"Average hit rate: {report['summary']['avg_hit_rate']:.1%}")
print(f"Recommendations: {len(report['recommendations'])}")
39.3 智能状态码处理
39.3.1 自适应状态码响应
python
# 自适应状态码响应系统
from typing import Dict, Any, Optional, Callable
from dataclasses import dataclass
from datetime import datetime, timedelta
import time
import random
class AdaptiveResponseStrategy:
"""自适应响应策略"""
def __init__(self):
self.strategies = self._initialize_strategies()
self.performance_history = []
self.learning_rate = 0.1
def _initialize_strategies(self) -> Dict[str, Dict[str, Any]]:
"""初始化策略"""
return {
'aggressive_caching': {
'description': 'Aggressive caching for high traffic',
'conditions': ['high_traffic', 'low_error_rate'],
'actions': {
'increase_cache_ttl': 2.0, # 双倍TTL
'enable_stale_while_revalidate': True,
'prefetch_related': True
},
'performance_impact': 0.0 # 待学习
},
'conservative_fallback': {
'description': 'Conservative fallback during instability',
'conditions': ['high_error_rate', 'increasing_latency'],
'actions': {
'reduce_cache_ttl': 0.5, # 一半TTL
'disable_complex_features': True,
'enable_circuit_breaker': True
},
'performance_impact': 0.0
},
'graceful_degradation': {
'description': 'Graceful degradation for overload',
'conditions': ['high_load', 'resource_constrained'],
'actions': {
'simplify_responses': True,
'return_partial_data': True,
'suggest_retry_later': True
},
'performance_impact': 0.0
},
'predictive_optimization': {
'description': 'Predictive optimization based on patterns',
'conditions': ['stable_pattern', 'predictable_load'],
'actions': {
'pre_warm_cache': True,
'schedule_maintenance': True,
'optimize_anticipatory': True
},
'performance_impact': 0.0
}
}
def analyze_context(self,
metrics: Dict[str, Any],
request_context: Dict[str, Any]) -> List[str]:
"""分析上下文,确定适用策略"""
applicable_strategies = []
# 检查各项条件
conditions = self._evaluate_conditions(metrics, request_context)
for strategy_name, strategy in self.strategies.items():
required_conditions = strategy['conditions']
# 检查是否满足所有条件
if all(cond in conditions for cond in required_conditions):
applicable_strategies.append(strategy_name)
return applicable_strategies
def _evaluate_conditions(self,
metrics: Dict[str, Any],
context: Dict[str, Any]) -> List[str]:
"""评估条件"""
conditions = []
# 流量条件
request_rate = metrics.get('requests_per_second', 0)
if request_rate > 100:
conditions.append('high_traffic')
elif request_rate < 10:
conditions.append('low_traffic')
# 错误率条件
error_rate = metrics.get('error_rate', 0)
if error_rate > 0.1:
conditions.append('high_error_rate')
elif error_rate < 0.01:
conditions.append('low_error_rate')
# 延迟条件
latency_trend = metrics.get('latency_trend', 'stable')
if latency_trend == 'increasing':
conditions.append('increasing_latency')
# 负载条件
system_load = metrics.get('system_load', 0)
if system_load > 0.8:
conditions.append('high_load')
# 资源条件
memory_usage = metrics.get('memory_usage', 0)
if memory_usage > 0.9:
conditions.append('resource_constrained')
# 模式条件
if context.get('pattern_stability', 0) > 0.8:
conditions.append('stable_pattern')
if context.get('load_predictability', 0) > 0.7:
conditions.append('predictable_load')
return conditions
def select_best_strategy(self,
applicable_strategies: List[str],
historical_performance: Dict[str, float]) -> str:
"""选择最佳策略"""
if not applicable_strategies:
return 'default'
# 使用UCB(Upper Confidence Bound)算法平衡探索和利用
strategies_with_score = []
for strategy in applicable_strategies:
if strategy in historical_performance:
# 利用:使用历史性能
avg_performance = historical_performance[strategy]['average']
count = historical_performance[strategy]['count']
# 探索项:鼓励尝试次数少的策略
exploration_bonus = (2 * math.log(sum(
h['count'] for h in historical_performance.values()
)) / count) ** 0.5 if count > 0 else float('inf')
score = avg_performance + exploration_bonus
else:
# 新策略,给高分鼓励尝试
score = float('inf')
strategies_with_score.append((strategy, score))
# 选择最高分策略
return max(strategies_with_score, key=lambda x: x[0])[0]
def adapt_response(self,
original_status_code: int,
selected_strategy: str,
context: Dict[str, Any]) -> Dict[str, Any]:
"""根据策略调整响应"""
if selected_strategy == 'default':
return {
'status_code': original_status_code,
'headers': {},
'body': {},
'strategy_applied': 'none'
}
strategy = self.strategies[selected_strategy]
actions = strategy['actions']
adapted_response = {
'status_code': original_status_code,
'headers': {},
'body': {},
'strategy_applied': selected_strategy,
'actions_taken': []
}
# 应用各个动作
if 'increase_cache_ttl' in actions:
multiplier = actions['increase_cache_ttl']
adapted_response['headers']['Cache-Control'] = \\
f"public, max-age={3600 * multiplier}" # 示例
if 'reduce_cache_ttl' in actions:
multiplier = actions['reduce_cache_ttl']
adapted_response['headers']['Cache-Control'] = \\
f"public, max-age={300 * multiplier}"
if 'enable_stale_while_revalidate' in actions:
adapted_response['headers']['Cache-Control'] += \\
", stale-while-revalidate=300"
if 'simplify_responses' in actions and original_status_code == 200:
# 简化响应体
adapted_response['body'] = {
'simplified': True,
'essential_data_only': True
}
if 'return_partial_data' in actions and original_status_code == 200:
# 返回部分数据
adapted_response['status_code'] = 206 # Partial Content
adapted_response['headers']['Content-Range'] = 'bytes 0-499/*'
if 'suggest_retry_later' in actions:
# 建议稍后重试
retry_after = context.get('estimated_recovery_seconds', 30)
adapted_response['headers']['Retry-After'] = str(retry_after)
# 对于成功状态码,添加警告
if 200 <= original_status_code < 300:
adapted_response['headers']['Warning'] = \\
'199 – "Service experiencing high load"'
return adapted_response
def record_performance(self,
strategy: str,
performance_metrics: Dict[str, float]):
"""记录策略性能"""
self.performance_history.append({
'timestamp': datetime.now(),
'strategy': strategy,
'metrics': performance_metrics
})
# 更新策略性能
if strategy in self.strategies:
# 计算性能分数
performance_score = self._calculate_performance_score(performance_metrics)
# 更新学习
if 'performance_impact' in self.strategies[strategy]:
current = self.strategies[strategy]['performance_impact']
updated = current + self.learning_rate * (performance_score – current)
self.strategies[strategy]['performance_impact'] = updated
def _calculate_performance_score(self, metrics: Dict[str, float]) -> float:
"""计算性能分数"""
# 组合多个指标
score = 0.0
# 响应时间(越低越好)
response_time = metrics.get('response_time_ms', 1000)
response_time_score = max(0, 1 – (response_time / 5000)) # 5秒为阈值
score += response_time_score * 0.4
# 错误率(越低越好)
error_rate = metrics.get('error_rate', 0)
error_rate_score = max(0, 1 – (error_rate / 0.5)) # 50%为阈值
score += error_rate_score * 0.3
# 缓存命中率(越高越好)
cache_hit_rate = metrics.get('cache_hit_rate', 0)
score += cache_hit_rate * 0.2
# 资源使用(越低越好)
resource_usage = metrics.get('resource_usage', 0)
resource_score = max(0, 1 – resource_usage)
score += resource_score * 0.1
return score
def get_learning_report(self) -> Dict[str, Any]:
"""获取学习报告"""
# 计算策略性能统计
strategy_stats = {}
for strategy_name in self.strategies:
strategy_performances = [
entry for entry in self.performance_history
if entry['strategy'] == strategy_name
]
if strategy_performances:
scores = [
self._calculate_performance_score(p['metrics'])
for p in strategy_performances
]
import statistics
strategy_stats[strategy_name] = {
'count': len(strategy_performances),
'avg_score': statistics.mean(scores) if scores else 0,
'std_score': statistics.stdev(scores) if len(scores) > 1 else 0,
'last_used': max(
p['timestamp'] for p in strategy_performances
) if strategy_performances else None
}
# 学习趋势
learning_trend = self._analyze_learning_trend()
# 推荐策略
recommended_strategies = self._recommend_strategies(strategy_stats)
return {
'strategy_statistics': strategy_stats,
'learning_trend': learning_trend,
'recommended_strategies': recommended_strategies,
'learning_efficiency': self._calculate_learning_efficiency()
}
def _analyze_learning_trend(self) -> Dict[str, Any]:
"""分析学习趋势"""
if len(self.performance_history) < 10:
return {'status': 'INSUFFICIENT_DATA'}
# 计算性能改进
recent_performance = self.performance_history[-10:]
older_performance = self.performance_history[:10]
recent_scores = [
self._calculate_performance_score(p['metrics'])
for p in recent_performance
]
older_scores = [
self._calculate_performance_score(p['metrics'])
for p in older_performance
]
import statistics
improvement = statistics.mean(recent_scores) – statistics.mean(older_scores)
if improvement > 0.1:
trend = 'IMPROVING'
elif improvement < -0.1:
trend = 'DECLINING'
else:
trend = 'STABLE'
return {
'status': trend,
'improvement': improvement,
'confidence': min(1.0, len(self.performance_history) / 100)
}
def _recommend_strategies(self,
strategy_stats: Dict[str, Dict]) -> List[Dict[str, Any]]:
"""推荐策略"""
recommendations = []
for strategy_name, stats in strategy_stats.items():
avg_score = stats['avg_score']
if avg_score > 0.8:
recommendations.append({
'strategy': strategy_name,
'recommendation': 'HIGHLY_EFFECTIVE',
'confidence': 'HIGH',
'suggested_usage': 'Increase application frequency'
})
elif avg_score < 0.3:
recommendations.append({
'strategy': strategy_name,
'recommendation': 'INEFFECTIVE',
'confidence': 'MEDIUM',
'suggested_usage': 'Review and potentially deprecate'
})
return sorted(recommendations,
key=lambda x: x.get('confidence', 'LOW'),
reverse=True)
def _calculate_learning_efficiency(self) -> float:
"""计算学习效率"""
if len(self.performance_history) < 20:
return 0.0
# 计算性能方差减少
early_scores = [
self._calculate_performance_score(p['metrics'])
for p in self.performance_history[:10]
]
late_scores = [
self._calculate_performance_score(p['metrics'])
for p in self.performance_history[-10:]
]
import statistics
early_variance = statistics.variance(early_scores) if len(early_scores) > 1 else 1
late_variance = statistics.variance(late_scores) if len(late_scores) > 1 else 1
# 方差减少表示学习效率
variance_reduction = (early_variance – late_variance) / early_variance
return max(0, min(1, variance_reduction))
# 自适应中间件
class AdaptiveStatusCodeMiddleware:
"""自适应状态码中间件"""
def __init__(self, strategy_engine: AdaptiveResponseStrategy):
self.strategy_engine = strategy_engine
self.metrics_collector = MetricsCollector()
self.context_analyzer = ContextAnalyzer()
async def process_request(self, request: Dict[str, Any]) -> Dict[str, Any]:
"""处理请求"""
# 收集指标
current_metrics = self.metrics_collector.get_current_metrics()
# 分析上下文
request_context = self.context_analyzer.analyze(request)
# 选择策略
applicable_strategies = self.strategy_engine.analyze_context(
current_metrics, request_context
)
historical_performance = self._get_historical_performance()
selected_strategy = self.strategy_engine.select_best_strategy(
applicable_strategies, historical_performance
)
# 存储策略决策
request['adaptive_strategy'] = selected_strategy
request['request_context'] = request_context
return request
async def process_response(self,
request: Dict[str, Any],
response: Dict[str, Any]) -> Dict[str, Any]:
"""处理响应"""
selected_strategy = request.get('adaptive_strategy', 'default')
request_context = request.get('request_context', {})
# 自适应调整响应
adapted_response = self.strategy_engine.adapt_response(
response['status_code'],
selected_strategy,
request_context
)
# 合并响应
final_response = response.copy()
final_response.update({
'headers': {**response.get('headers', {}),
**adapted_response.get('headers', {})},
'body': adapted_response.get('body') or response.get('body'),
'adaptive_metadata': {
'strategy_applied': adapted_response.get('strategy_applied'),
'actions_taken': adapted_response.get('actions_taken', [])
}
})
# 记录性能
self._record_performance(selected_strategy, request, final_response)
return final_response
def _get_historical_performance(self) -> Dict[str, Dict[str, float]]:
"""获取历史性能"""
# 简化实现
return {}
def _record_performance(self,
strategy: str,
request: Dict[str, Any],
response: Dict[str, Any]):
"""记录性能"""
performance_metrics = {
'response_time_ms': response.get('response_time', 0),
'error_rate': 1 if response['status_code'] >= 400 else 0,
'cache_hit_rate': 1 if response.get('from_cache', False) else 0,
'resource_usage': request.get('resource_usage', 0)
}
self.strategy_engine.record_performance(strategy, performance_metrics)
def get_adaptation_report(self) -> Dict[str, Any]:
"""获取适应报告"""
learning_report = self.strategy_engine.get_learning_report()
current_metrics = self.metrics_collector.get_current_metrics()
return {
'learning_report': learning_report,
'current_metrics': current_metrics,
'adaptation_efficiency': self._calculate_adaptation_efficiency(),
'recommended_adjustments': self._generate_adjustment_recommendations(
learning_report, current_metrics
)
}
def _calculate_adaptation_efficiency(self) -> float:
"""计算适应效率"""
# 基于学习效率和当前性能
learning_report = self.strategy_engine.get_learning_report()
learning_efficiency = learning_report.get('learning_efficiency', 0)
current_metrics = self.metrics_collector.get_current_metrics()
current_performance = self.strategy_engine._calculate_performance_score(
current_metrics
)
# 组合指标
efficiency = (learning_efficiency * 0.3 + current_performance * 0.7)
return efficiency
def _generate_adjustment_recommendations(self,
learning_report: Dict[str, Any],
current_metrics: Dict[str, Any]) -> List[Dict[str, Any]]:
"""生成调整建议"""
recommendations = []
# 基于学习报告的建议
for rec in learning_report.get('recommended_strategies', []):
if rec['recommendation'] == 'INEFFECTIVE':
recommendations.append({
'type': 'STRATEGY_ADJUSTMENT',
'priority': 'MEDIUM',
'description': f"Strategy '{rec['strategy']}' appears ineffective",
'action': f"Review and adjust {rec['strategy']} strategy parameters"
})
# 基于当前指标的建议
if current_metrics.get('error_rate', 0) > 0.1:
recommendations.append({
'type': 'ERROR_HANDLING',
'priority': 'HIGH',
'description': 'High error rate detected',
'action': 'Enable conservative fallback strategies more aggressively'
})
if current_metrics.get('requests_per_second', 0) > 500:
recommendations.append({
'type': 'SCALING_ADJUSTMENT',
'priority': 'MEDIUM',
'description': 'High traffic load',
'action': 'Increase caching aggressiveness and enable prefetching'
})
return recommendations
# 支持类
class MetricsCollector:
"""指标收集器"""
def get_current_metrics(self) -> Dict[str, float]:
"""获取当前指标"""
# 简化实现
return {
'requests_per_second': random.uniform(50, 500),
'error_rate': random.uniform(0, 0.2),
'latency_trend': random.choice(['stable', 'increasing', 'decreasing']),
'system_load': random.uniform(0.3, 0.9),
'memory_usage': random.uniform(0.4, 0.95)
}
class ContextAnalyzer:
"""上下文分析器"""
def analyze(self, request: Dict[str, Any]) -> Dict[str, Any]:
"""分析请求上下文"""
return {
'pattern_stability': random.uniform(0.5, 1.0),
'load_predictability': random.uniform(0.3, 0.9),
'estimated_recovery_seconds': random.randint(10, 300),
'resource_usage': random.uniform(0.2, 0.8)
}
# 使用示例
strategy_engine = AdaptiveResponseStrategy()
middleware = AdaptiveStatusCodeMiddleware(strategy_engine)
# 模拟请求处理
for i in range(100):
request = {
'method': 'GET',
'url': f'/api/data/{i}',
'headers': {},
'resource_usage': random.uniform(0.2, 0.8)
}
# 处理请求
processed_request = middleware.process_request(request)
# 模拟响应
response = {
'status_code': 200 if i % 20 != 0 else 503,
'headers': {},
'body': {'data': f'response_{i}'},
'response_time': random.uniform(50, 2000),
'from_cache': i % 3 == 0
}
# 处理响应
adapted_response = middleware.process_response(processed_request, response)
# 每20个请求显示报告
if i % 20 == 0 and i > 0:
report = middleware.get_adaptation_report()
print(f"\\n=== Adaptation Report after {i} requests ===")
print(f"Learning efficiency: {report['adaptation_efficiency']:.2%}")
for rec in report['recommended_adjustments'][:2]:
print(f" {rec['priority']}: {rec['description']}")
39.4 总结
状态码性能优化是一个多层次、多维度的系统工程:
核心优化领域:
响应时间优化 – 分解和分析各个组件的时间消耗
缓存策略优化 – 智能的缓存决策和管理
传输优化 – HTTP/2/3头部压缩,CDN优化
自适应处理 – 基于上下文的状态码响应调整
关键技术:
性能剖析 – 详细的时间分解和瓶颈识别
智能缓存 – 基于状态码和上下文的动态缓存策略
协议优化 – 充分利用HTTP/2/3的特性
机器学习 – 自适应策略学习和优化
最佳实践:
分层优化 – 从边缘到后端的全方位优化
数据驱动 – 基于监控数据的优化决策
渐进式改进 – 小步快跑,持续优化
自动化调整 – 基于规则的自动优化
未来趋势:
AI驱动的优化 – 更智能的自适应系统
边缘计算 – 更接近用户的处理
实时优化 – 基于实时数据的即时调整
预测性优化 – 提前预测和预防性能问题
通过综合应用这些优化策略,可以显著提升系统的性能、可靠性和用户体验。
网硕互联帮助中心



![[工作经验]服务器CPU爆表救火记:从99%负载到问题根治的完整实战-网硕互联帮助中心](https://www.wsisp.com/helps/wp-content/uploads/2026/01/20260114064139-69673aa3904fa-220x150.png)

评论前必须登录!
注册