HTTP Status Codes: The Communication Language Between Client and Server, Part 6: Status Codes in Practice (2)

Chapter 33: Status Code Propagation in Microservice Architecture

33.1 Status Code Challenges in Microservice Architecture

33.1.1 The Complexity of Distributed Systems

A microservice architecture introduces an entirely new set of status code handling challenges:

yaml

# Example microservice call chain
request:
  path: /api/order/123
  flow:
    - API Gateway (nginx/ingress)
    - Authentication Service
    - Order Service
      - calls: User Service
      - calls: Inventory Service
      - calls: Payment Service
      - calls: Notification Service
    - Load Balancer
    - Service Mesh (Istio/Linkerd)

# Every service in the chain may return a different status code
status_codes_chain:
  - API Gateway: 200
  - Authentication Service: 401 (token expired)
  - Order Service: 500 (dependency failed)
  - User Service: 404 (user not found)
  - Inventory Service: 409 (insufficient stock)
  - Payment Service: 422 (payment failed)
  - Notification Service: 503 (service unavailable)

33.1.2 状态码传播原则

java

// Core principles of status code propagation in microservices
public class StatusCodePropagationPrinciples {

    // Principle 1: transparent error propagation
    public static class TransparentPropagation {
        // Errors should propagate upstream through the call chain as
        // transparently as possible, while avoiding leaks of sensitive details.
    }

    // Principle 2: context integrity
    public static class ContextIntegrity {
        // Every error should carry the full call-chain context:
        // service name, call path, timestamp, and so on.
    }

    // Principle 3: graceful degradation
    public static class GracefulDegradation {
        // The failure of one service must not bring down the whole system;
        // a graceful degradation strategy is required.
    }

    // Principle 4: traceability
    public static class Traceability {
        // Every status code should be traceable to a specific service and operation.
    }
}
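
Principles 2 and 4 can be made concrete with a small error-envelope helper. The sketch below is illustrative, not a standard: the ErrorEnvelope name and its field set are assumptions, but the idea of carrying the chain and trace ID with every propagated error follows directly from the principles above.

python

# A minimal sketch of principles 2 and 4: every propagated error carries
# call-chain context and stays traceable. All field names are illustrative.
import time
import uuid
from dataclasses import dataclass, field, asdict
from typing import List

@dataclass
class ErrorEnvelope:
    status_code: int
    message: str                      # safe, client-facing message only
    service: str
    trace_id: str
    chain: List[str] = field(default_factory=list)  # e.g. ["gateway", "order-service"]
    timestamp: float = field(default_factory=time.time)

    def propagate(self, next_service: str) -> "ErrorEnvelope":
        """Return a copy extended with the next hop, keeping the chain intact."""
        return ErrorEnvelope(
            status_code=self.status_code,
            message=self.message,
            service=next_service,
            trace_id=self.trace_id,
            chain=self.chain + [next_service],
            timestamp=self.timestamp,
        )

# Usage: an error born in the inventory service, propagated via the order service.
origin = ErrorEnvelope(409, "insufficient stock", "inventory-service",
                       trace_id=str(uuid.uuid4()), chain=["inventory-service"])
upstream = origin.propagate("order-service")
print(asdict(upstream))  # chain == ["inventory-service", "order-service"]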

33.2 Inter-Service Status Code Propagation Patterns

33.2.1 The Direct Propagation Pattern

go

// Direct status code propagation, implemented in Go
package main

import (
	"fmt"
	"net/http"
	"time"

	"github.com/opentracing/opentracing-go"
	"go.uber.org/zap"
)

// DirectPropagationMiddleware propagates error status codes downstream.
func DirectPropagationMiddleware(next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		// 1. Extract the upstream status code information sent with the request
		upstreamStatusCode := r.Header.Get("X-Upstream-Status-Code")
		upstreamService := r.Header.Get("X-Upstream-Service")

		// 2. Wrap the ResponseWriter so the status code can be captured.
		//    Response headers must be set *before* the header block is flushed,
		//    so the propagation headers are added in a hook that runs inside
		//    WriteHeader rather than after the handler returns.
		rw := &responseWriter{ResponseWriter: w}
		rw.beforeWriteHeader = func(code int) {
			if code >= 400 {
				// Give downstream consumers context about the failure
				w.Header().Set("X-Error-Propagated", "true")
				w.Header().Set("X-Failed-Service", getServiceName())

				// If there is an upstream service, record the call chain
				if upstreamService != "" {
					w.Header().Set("X-Error-Chain",
						fmt.Sprintf("%s->%s:%d",
							upstreamService, getServiceName(), code))
				}
			}
		}

		// 3. Handle the request
		next.ServeHTTP(rw, r)

		// 4. Log the error context once the handler has finished
		if rw.statusCode >= 400 {
			logErrorContext(r, upstreamStatusCode, upstreamService, rw.statusCode)
		}
	})
}

// Response wrapper that records the status code
type responseWriter struct {
	http.ResponseWriter
	statusCode        int
	wroteHeader       bool
	beforeWriteHeader func(code int)
}

func (rw *responseWriter) WriteHeader(code int) {
	if !rw.wroteHeader {
		rw.statusCode = code
		if rw.beforeWriteHeader != nil {
			rw.beforeWriteHeader(code)
		}
		rw.ResponseWriter.WriteHeader(code)
		rw.wroteHeader = true
	}
}

func (rw *responseWriter) Write(b []byte) (int, error) {
	if !rw.wroteHeader {
		rw.WriteHeader(http.StatusOK)
	}
	return rw.ResponseWriter.Write(b)
}

// logErrorContext records the error together with its propagation context.
func logErrorContext(r *http.Request, upstreamCode, upstreamService string, currentCode int) {
	logger := zap.L()

	errorContext := map[string]interface{}{
		"timestamp":        time.Now().UTC(),
		"request_id":       r.Header.Get("X-Request-ID"),
		"trace_id":         r.Header.Get("X-Trace-ID"),
		"current_service":  getServiceName(),
		"current_status":   currentCode,
		"upstream_service": upstreamService,
		"upstream_status":  upstreamCode,
		"endpoint":         r.URL.Path,
		"method":           r.Method,
		"user_agent":       r.UserAgent(),
		"client_ip":        getClientIP(r),
	}

	// Attach distributed tracing information if a span is active
	if span := opentracing.SpanFromContext(r.Context()); span != nil {
		errorContext["span_id"] = fmt.Sprintf("%v", span)
	}

	logger.Error("Service error with propagation context",
		zap.Any("context", errorContext))
}

// getServiceName and getClientIP are small helpers assumed to live elsewhere
// in the real service; minimal stand-ins are shown so the example compiles.
func getServiceName() string { return "order-service" }

func getClientIP(r *http.Request) string {
	if ip := r.Header.Get("X-Forwarded-For"); ip != "" {
		return ip
	}
	return r.RemoteAddr
}

// HTTP client that propagates status code context to its callers
type PropagatingHTTPClient struct {
	client      *http.Client
	serviceName string
}

func NewPropagatingHTTPClient(serviceName string) *PropagatingHTTPClient {
	return &PropagatingHTTPClient{
		client: &http.Client{
			Timeout: 30 * time.Second,
			Transport: &http.Transport{
				MaxIdleConns:        100,
				MaxIdleConnsPerHost: 10,
				IdleConnTimeout:     90 * time.Second,
			},
		},
		serviceName: serviceName,
	}
}

func (c *PropagatingHTTPClient) Do(req *http.Request) (*http.Response, error) {
	// Add propagation headers
	req.Header.Set("X-Caller-Service", c.serviceName)
	req.Header.Set("X-Caller-Request-ID", req.Header.Get("X-Request-ID"))

	// Execute the request
	resp, err := c.client.Do(req)
	if err != nil {
		return nil, fmt.Errorf("HTTP request failed: %w", err)
	}

	// For error responses, extend the recorded propagation chain
	if resp.StatusCode >= 400 {
		errorChain := resp.Header.Get("X-Error-Chain")
		if errorChain == "" {
			resp.Header.Set("X-Error-Chain",
				fmt.Sprintf("%s:%d", c.serviceName, resp.StatusCode))
		} else {
			resp.Header.Set("X-Error-Chain",
				fmt.Sprintf("%s->%s:%d", errorChain, c.serviceName, resp.StatusCode))
		}
	}

	return resp, nil
}
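
For comparison, the same direct-propagation idea fits in a few lines of WSGI middleware. This is a hedged sketch: the header names mirror the Go example above, SERVICE_NAME is a placeholder, and app can be any WSGI application.

python

# Minimal WSGI middleware mirroring the Go direct-propagation example.
SERVICE_NAME = "order-service"  # assumed; set per service

class DirectPropagationMiddleware:
    def __init__(self, app):
        self.app = app

    def __call__(self, environ, start_response):
        upstream_service = environ.get("HTTP_X_UPSTREAM_SERVICE", "")

        def wrapped_start_response(status, headers, exc_info=None):
            code = int(status.split(" ", 1)[0])
            if code >= 400:
                # Headers must be added before the response is started
                headers.append(("X-Error-Propagated", "true"))
                headers.append(("X-Failed-Service", SERVICE_NAME))
                if upstream_service:
                    headers.append(("X-Error-Chain",
                                    f"{upstream_service}->{SERVICE_NAME}:{code}"))
            return start_response(status, headers, exc_info)

        return self.app(environ, wrapped_start_response)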

33.2.2 The Aggregated Propagation Pattern

python

# Aggregated propagation, implemented in Python
from typing import Dict, List, Optional, Any, Tuple
from dataclasses import dataclass, asdict
import time
from enum import Enum
import logging
from concurrent.futures import ThreadPoolExecutor, as_completed
from concurrent.futures import TimeoutError as FuturesTimeoutError

logger = logging.getLogger(__name__)

class ErrorSeverity(Enum):
    CRITICAL = 100  # 5xx errors
    ERROR = 200     # 4xx errors that affect the user
    WARNING = 300   # 4xx errors that do not affect the user
    INFO = 400      # 2xx with warnings

@dataclass
class ServiceError:
    service: str
    status_code: int
    message: str
    endpoint: str
    timestamp: float
    severity: ErrorSeverity
    retryable: bool
    dependencies: Optional[List[str]] = None

    def to_dict(self) -> Dict:
        return {
            **asdict(self),
            'severity': self.severity.value,
            'timestamp': time.strftime('%Y-%m-%d %H:%M:%S',
                                       time.localtime(self.timestamp))
        }

class StatusCodeAggregator:
    def __init__(self, service_name: str):
        self.service_name = service_name
        self.errors: List[ServiceError] = []
        self.successes: List[Dict] = []

    def add_service_result(self, service: str, status_code: int,
                           message: str, endpoint: str,
                           retryable: bool = False):
        """Record the result of one service call."""
        # Classify the severity of the result
        if 500 <= status_code < 600:
            severity = ErrorSeverity.CRITICAL
        elif status_code in (401, 403):
            severity = ErrorSeverity.ERROR
        elif 400 <= status_code < 500:
            severity = ErrorSeverity.WARNING
        elif 200 <= status_code < 300:
            severity = ErrorSeverity.INFO
        else:
            severity = ErrorSeverity.WARNING

        error = ServiceError(
            service=service,
            status_code=status_code,
            message=message,
            endpoint=endpoint,
            timestamp=time.time(),
            severity=severity,
            retryable=retryable
        )

        if status_code >= 400:
            self.errors.append(error)
        else:
            self.successes.append(error.to_dict())

        return error

    def aggregate_status_code(self) -> Tuple[int, Dict[str, Any]]:
        """Aggregate the status codes of several services into one."""
        if not self.errors:
            return 200, {
                'status': 'success',
                'data': {'service_results': self.successes},
                'timestamp': time.time()
            }

        # Sort errors by severity (most severe first)
        self.errors.sort(key=lambda x: x.severity.value)

        # The most severe error drives the aggregated response
        primary_error = self.errors[0]

        # Decide the final status code
        final_status_code = self._determine_final_status_code()

        # Build the response body
        response = {
            'status': 'partial_failure' if self.successes else 'failure',
            'error': {
                'code': f"AGGREGATED_ERROR_{final_status_code}",
                'message': self._generate_aggregated_message(),
                'timestamp': time.time(),
                'details': {
                    'primary_error': primary_error.to_dict(),
                    'all_errors': [e.to_dict() for e in self.errors],
                    'successful_calls': self.successes,
                    'aggregation_strategy': 'highest_severity'
                }
            }
        }

        return final_status_code, response

    def _determine_final_status_code(self) -> int:
        """Pick the final status code according to the aggregation strategy."""
        # Strategy 1: any 5xx error yields a 500
        if any(e.severity == ErrorSeverity.CRITICAL for e in self.errors):
            return 500

        # Strategy 2: for user-facing 4xx errors, return the most relevant one
        user_errors = [e for e in self.errors
                       if e.severity == ErrorSeverity.ERROR]
        if user_errors:
            # Prefer authentication/authorization errors
            for error in user_errors:
                if error.status_code in (401, 403):
                    return error.status_code
            # Otherwise return the first user-facing error
            return user_errors[0].status_code

        # Strategy 3: fall back to the first warning-level error
        return self.errors[0].status_code

    def _generate_aggregated_message(self) -> str:
        """Build a human-readable aggregated error message."""
        if len(self.errors) == 1:
            return f"Service {self.errors[0].service} failed: {self.errors[0].message}"

        error_count = len(self.errors)
        critical_count = sum(1 for e in self.errors
                             if e.severity == ErrorSeverity.CRITICAL)

        if critical_count > 0:
            return f"{critical_count} critical service(s) failed out of {error_count} errors"

        return f"{error_count} service(s) reported errors"

# Usage example: an order-processing service
class OrderProcessingService:
    def __init__(self):
        self.aggregator = StatusCodeAggregator("order-service")

    def process_order(self, order_data: Dict) -> Tuple[int, Dict]:
        """Process an order by calling several microservices in parallel."""
        with ThreadPoolExecutor(max_workers=4) as executor:
            futures = {
                executor.submit(self._validate_user, order_data['user_id']): 'user-service',
                executor.submit(self._check_inventory, order_data['items']): 'inventory-service',
                executor.submit(self._calculate_shipping, order_data['address']): 'shipping-service',
                executor.submit(self._validate_payment, order_data['payment']): 'payment-service'
            }

            for future in as_completed(futures):
                service_name = futures[future]
                try:
                    status_code, message = future.result(timeout=10)
                    self.aggregator.add_service_result(
                        service_name, status_code, message,
                        f"/api/{service_name.replace('-', '/')}"
                    )
                except FuturesTimeoutError:
                    self.aggregator.add_service_result(
                        service_name, 504, "Service timeout",
                        f"/api/{service_name.replace('-', '/')}",
                        retryable=True
                    )
                except Exception as e:
                    self.aggregator.add_service_result(
                        service_name, 500, str(e),
                        f"/api/{service_name.replace('-', '/')}",
                        retryable=True
                    )

        # Aggregate the collected results
        final_status, response = self.aggregator.aggregate_status_code()

        # Only create the order if every required service succeeded
        if final_status == 200:
            order_result = self._create_order(order_data)
            response['data']['order'] = order_result

        return final_status, response

    def _validate_user(self, user_id: str) -> Tuple[int, str]:
        # Call the user service; returns (status_code, message)
        pass

    def _check_inventory(self, items: List) -> Tuple[int, str]:
        # Call the inventory service
        pass

    def _calculate_shipping(self, address: Dict) -> Tuple[int, str]:
        # Call the shipping service
        pass

    def _validate_payment(self, payment: Dict) -> Tuple[int, str]:
        # Call the payment service
        pass

    def _create_order(self, order_data: Dict) -> Dict:
        # Create the order
        pass
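
A quick demonstration of the aggregation logic in isolation; the input values below are made up for illustration:

python

# Feed the aggregator a mix of results and inspect the aggregated outcome.
agg = StatusCodeAggregator("order-service")
agg.add_service_result("user-service", 200, "OK", "/api/user")
agg.add_service_result("inventory-service", 409, "insufficient stock", "/api/inventory")
agg.add_service_result("payment-service", 503, "unavailable", "/api/payment", retryable=True)

status, body = agg.aggregate_status_code()
print(status)                   # 500: the 5xx result dominates
print(body['status'])           # 'partial_failure': one call still succeeded
print(body['error']['message'])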

33.3 Distributed Tracing and Status Codes

33.3.1 OpenTelemetry Integration

typescript

// OpenTelemetry integration in TypeScript
import { NodeTracerProvider } from '@opentelemetry/node';
import { SimpleSpanProcessor } from '@opentelemetry/tracing';
import { JaegerExporter } from '@opentelemetry/exporter-jaeger';
import { ZipkinExporter } from '@opentelemetry/exporter-zipkin';
import { context, trace, SpanStatusCode } from '@opentelemetry/api';
import { HttpInstrumentation } from '@opentelemetry/instrumentation-http';
import { ExpressInstrumentation } from '@opentelemetry/instrumentation-express';
import { Resource } from '@opentelemetry/resources';
import { SemanticResourceAttributes } from '@opentelemetry/semantic-conventions';

// Configure the tracer provider
const provider = new NodeTracerProvider({
  resource: new Resource({
    [SemanticResourceAttributes.SERVICE_NAME]: process.env.SERVICE_NAME || 'unknown-service',
    [SemanticResourceAttributes.DEPLOYMENT_ENVIRONMENT]: process.env.NODE_ENV || 'development',
  }),
});

// Add exporters
const jaegerExporter = new JaegerExporter({
  endpoint: process.env.JAEGER_ENDPOINT || 'http://localhost:14268/api/traces',
});

const zipkinExporter = new ZipkinExporter({
  url: process.env.ZIPKIN_ENDPOINT || 'http://localhost:9411/api/v2/spans',
});

provider.addSpanProcessor(new SimpleSpanProcessor(jaegerExporter));
provider.addSpanProcessor(new SimpleSpanProcessor(zipkinExporter));

// Register the provider
provider.register();

// Instrument HTTP and Express
const httpInstrumentation = new HttpInstrumentation();
const expressInstrumentation = new ExpressInstrumentation();

// Status code tracing middleware
export function statusCodeTracingMiddleware(req: any, res: any, next: Function) {
  const tracer = provider.getTracer('http-server');
  const span = tracer.startSpan(`${req.method} ${req.path}`, {
    attributes: {
      'http.method': req.method,
      'http.url': req.url,
      'http.route': req.path,
      'http.user_agent': req.get('user-agent'),
      'http.client_ip': req.ip,
    },
  });

  // Store the span on the request object for later access
  req.span = span;

  // Intercept the end of the response
  const originalEnd = res.end;
  res.end = function (...args: any[]) {
    // Record the status code
    span.setAttribute('http.status_code', res.statusCode);
    span.setAttribute('http.status_text', res.statusMessage);

    // Set the span status according to the status code
    if (res.statusCode >= 400) {
      span.setStatus({
        code: SpanStatusCode.ERROR,
        message: `HTTP ${res.statusCode}`,
      });

      // Add error attributes
      span.setAttribute('error', true);
      span.setAttribute('error.type', `HTTP_${res.statusCode}`);

      // Record the error details if present
      if (res.locals.error) {
        span.setAttribute('error.message', res.locals.error.message);
        span.recordException(res.locals.error);
      }
    } else {
      span.setStatus({ code: SpanStatusCode.OK });
    }

    // Record the response size
    const contentLength = res.get('content-length');
    if (contentLength) {
      span.setAttribute('http.response_size', parseInt(contentLength));
    }

    // Record the duration
    const startTime = req._startTime || Date.now();
    const duration = Date.now() - startTime;
    span.setAttribute('http.duration_ms', duration);

    // End the span
    span.end();

    // Call the original end method
    return originalEnd.apply(this, args);
  };

  // Record the request start time
  req._startTime = Date.now();

  // Run the rest of the middleware chain with this span active
  context.with(trace.setSpan(context.active(), span), () => next());
}

// Instrumented HTTP client
export class TracedHttpClient {
  private tracer: any;

  constructor(private baseURL: string, serviceName: string) {
    this.tracer = provider.getTracer(serviceName);
  }

  async request<T>(
    method: string,
    path: string,
    data?: any,
    options: any = {}
  ): Promise<{ status: number; data: T; headers: any }> {
    const span = this.tracer.startSpan(`${method} ${path}`, {
      attributes: {
        'http.method': method,
        'http.url': `${this.baseURL}${path}`,
        'peer.service': this.baseURL.replace(/https?:\/\//, ''),
      },
    });

    try {
      const url = `${this.baseURL}${path}`;
      const headers: Record<string, string> = {
        'Content-Type': 'application/json',
        ...options.headers,
      };

      // Propagate the trace context via W3C trace context headers
      const activeSpan = trace.getSpan(context.active());
      if (activeSpan) {
        const traceId = activeSpan.spanContext().traceId;
        headers['traceparent'] = `00-${traceId}-${span.spanContext().spanId}-01`;
        const traceState = activeSpan.spanContext().traceState?.serialize();
        if (traceState) {
          headers['tracestate'] = traceState;
        }
      }

      const response = await fetch(url, {
        method,
        headers,
        body: data ? JSON.stringify(data) : undefined,
        ...options,
      });

      // Record the response information
      span.setAttribute('http.status_code', response.status);
      span.setAttribute('http.status_text', response.statusText);

      const responseData = await response.json().catch(() => null);

      // Set the span status according to the status code
      if (response.status >= 400) {
        span.setStatus({
          code: SpanStatusCode.ERROR,
          message: `HTTP ${response.status}`,
        });

        // Record the error details
        span.setAttribute('error', true);
        span.setAttribute('error.type', `HTTP_${response.status}`);

        if (responseData?.error) {
          span.setAttribute('error.message', responseData.error.message);
        }
      } else {
        span.setStatus({ code: SpanStatusCode.OK });
      }

      // Record tracing information from the response headers
      const serverTraceId = response.headers.get('x-trace-id');
      if (serverTraceId) {
        span.setAttribute('peer.trace_id', serverTraceId);
      }

      return {
        status: response.status,
        data: responseData,
        headers: Object.fromEntries(response.headers.entries()),
      };

    } catch (error: any) {
      // Record the exception
      span.setStatus({
        code: SpanStatusCode.ERROR,
        message: error.message,
      });
      span.recordException(error);
      span.setAttribute('error', true);
      span.setAttribute('error.type', 'NETWORK_ERROR');

      throw error;
    } finally {
      span.end();
    }
  }

  // Traced batch requests
  async batchRequest<T>(
    requests: Array<{ method: string; path: string; data?: any }>
  ): Promise<Array<{ status: number; data: T; error?: any }>> {
    const batchSpan = this.tracer.startSpan('batch_request', {
      attributes: {
        'batch.request_count': requests.length,
        'batch.service': this.baseURL,
      },
    });

    try {
      const results = await Promise.all(
        requests.map(async (req) => {
          const span = this.tracer.startSpan(`${req.method} ${req.path}`, {
            attributes: {
              'http.method': req.method,
              'http.url': `${this.baseURL}${req.path}`,
            },
          });

          try {
            const result = await this.request<T>(req.method, req.path, req.data);

            span.setAttribute('http.status_code', result.status);

            if (result.status >= 400) {
              span.setStatus({ code: SpanStatusCode.ERROR });
            } else {
              span.setStatus({ code: SpanStatusCode.OK });
            }

            return result;
          } catch (error: any) {
            span.setStatus({
              code: SpanStatusCode.ERROR,
              message: error.message,
            });
            span.recordException(error);

            return {
              status: 500,
              data: null as any,
              error: error.message,
            };
          } finally {
            span.end();
          }
        })
      );

      // Summarize the batch outcome
      const successCount = results.filter(r => r.status < 400).length;
      const errorCount = results.length - successCount;

      batchSpan.setAttribute('batch.success_count', successCount);
      batchSpan.setAttribute('batch.error_count', errorCount);

      if (errorCount > 0) {
        batchSpan.setStatus({
          code: SpanStatusCode.ERROR,
          message: `${errorCount} requests failed in batch`,
        });
      } else {
        batchSpan.setStatus({ code: SpanStatusCode.OK });
      }

      return results;
    } finally {
      batchSpan.end();
    }
  }
}

// Status code analytics middleware
export function statusCodeAnalyticsMiddleware(req: any, res: any, next: Function) {
  const tracer = provider.getTracer('analytics');
  const span = tracer.startSpan('status_code_analysis', {
    attributes: {
      'analysis.type': 'status_code_distribution',
      'service.name': process.env.SERVICE_NAME,
    },
  });

  // Collected metrics
  const metrics = {
    request_count: 0,
    status_2xx: 0,
    status_3xx: 0,
    status_4xx: 0,
    status_5xx: 0,
    errors_by_code: {} as Record<string, number>,
    avg_response_time: 0,
  };

  // Intercept the end of the response
  const originalEnd = res.end;
  res.end = function (...args: any[]) {
    metrics.request_count++;

    // Classify the status code
    if (res.statusCode >= 200 && res.statusCode < 300) {
      metrics.status_2xx++;
    } else if (res.statusCode >= 300 && res.statusCode < 400) {
      metrics.status_3xx++;
    } else if (res.statusCode >= 400 && res.statusCode < 500) {
      metrics.status_4xx++;
      metrics.errors_by_code[res.statusCode] =
        (metrics.errors_by_code[res.statusCode] || 0) + 1;
    } else if (res.statusCode >= 500) {
      metrics.status_5xx++;
      metrics.errors_by_code[res.statusCode] =
        (metrics.errors_by_code[res.statusCode] || 0) + 1;
    }

    // Record the response time
    const startTime = req._startTime || Date.now();
    const duration = Date.now() - startTime;

    // Update the running average response time
    metrics.avg_response_time =
      (metrics.avg_response_time * (metrics.request_count - 1) + duration) /
      metrics.request_count;

    // Update the span attributes
    span.setAttributes({
      'analysis.request_count': metrics.request_count,
      'analysis.status_2xx': metrics.status_2xx,
      'analysis.status_3xx': metrics.status_3xx,
      'analysis.status_4xx': metrics.status_4xx,
      'analysis.status_5xx': metrics.status_5xx,
      'analysis.avg_response_time_ms': metrics.avg_response_time,
      ...Object.entries(metrics.errors_by_code).reduce((acc, [code, count]) => ({
        ...acc,
        [`analysis.errors.${code}`]: count,
      }), {}),
    });

    // Flag excessive error rates
    const errorRate = (metrics.status_4xx + metrics.status_5xx) / metrics.request_count;
    if (errorRate > 0.1 && metrics.request_count > 10) {
      span.setStatus({
        code: SpanStatusCode.ERROR,
        message: `High error rate: ${(errorRate * 100).toFixed(1)}%`,
      });
      span.setAttribute('analysis.high_error_rate', true);
      span.setAttribute('analysis.error_rate', errorRate);
    }

    return originalEnd.apply(this, args);
  };

  // Periodically attach a metrics snapshot to the span
  const interval = setInterval(() => {
    if (metrics.request_count > 0) {
      // Span event attributes must be primitives, so serialize the breakdown
      span.addEvent('metrics_snapshot', {
        ...metrics,
        errors_by_code: JSON.stringify(metrics.errors_by_code),
      });

      // Reset the counters (the error distribution is kept)
      metrics.request_count = 0;
      metrics.status_2xx = 0;
      metrics.status_3xx = 0;
      metrics.status_4xx = 0;
      metrics.status_5xx = 0;
      metrics.avg_response_time = 0;
    }
  }, 60000); // flush once per minute

  // Clean up the timer when the request closes
  req.on('close', () => {
    clearInterval(interval);
    span.end();
  });

  next();
}
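
The same status-to-span mapping is just as easy to express with the OpenTelemetry Python API. A minimal, hedged sketch follows; it assumes the opentelemetry-api and opentelemetry-sdk packages are installed, and it exports to the console instead of Jaeger or Zipkin for easy inspection:

python

# Minimal sketch: record an HTTP status code on a span and mark errors,
# mirroring the TypeScript middleware above.
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import SimpleSpanProcessor, ConsoleSpanExporter
from opentelemetry.trace import Status, StatusCode

provider = TracerProvider()
provider.add_span_processor(SimpleSpanProcessor(ConsoleSpanExporter()))
trace.set_tracer_provider(provider)

tracer = trace.get_tracer("http-server")

def record_response(route: str, status_code: int) -> None:
    with tracer.start_as_current_span(route) as span:
        span.set_attribute("http.status_code", status_code)
        if status_code >= 400:
            span.set_status(Status(StatusCode.ERROR, f"HTTP {status_code}"))
            span.set_attribute("error.type", f"HTTP_{status_code}")
        else:
            span.set_status(Status(StatusCode.OK))

record_response("GET /orders/123", 502)  # exported to the console for inspection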

33.3.2 Trace Visualization with Jaeger

yaml

# jaeger-service-map.yaml
# Jaeger service dependency graph configuration
service_dependencies:
  enabled: true
  storage: elasticsearch

  # Tags related to status codes
  tags:
    - "http.status_code"
    - "error"
    - "span.kind"

# Service-level metrics
metrics:
  - name: "error_rate"
    query: 'sum(rate(http_server_requests_seconds_count{status=~"4..|5.."}[5m])) / sum(rate(http_server_requests_seconds_count[5m]))'
    threshold: 0.05
    severity: "warning"

  - name: "p95_latency"
    query: 'histogram_quantile(0.95, rate(http_server_requests_seconds_bucket[5m]))'
    threshold: 1.0
    severity: "warning"

# Status code distribution dashboard
dashboard:
  panels:
    - title: "Status Code Distribution"
      type: "heatmap"
      query: 'sum(rate(http_server_requests_seconds_count[5m])) by (status, service)'

    - title: "Error Propagation Chain"
      type: "graph"
      query: |
        trace_id, span_id, parent_id,
        operationName, serviceName,
        tags.http.status_code as status_code,
        tags.error as error
      filter: "tags.http.status_code >= 400"

    - title: "Service Dependency Health"
      type: "dependency_graph"
      nodes:
        - service: "api-gateway"
          health: 'sum(rate(http_server_requests_seconds_count{service="api-gateway", status=~"2.."}[5m])) / sum(rate(http_server_requests_seconds_count{service="api-gateway"}[5m]))'

        - service: "order-service"
          health: 'sum(rate(http_server_requests_seconds_count{service="order-service", status=~"2.."}[5m])) / sum(rate(http_server_requests_seconds_count{service="order-service"}[5m]))'

        - service: "payment-service"
          health: 'sum(rate(http_server_requests_seconds_count{service="payment-service", status=~"2.."}[5m])) / sum(rate(http_server_requests_seconds_count{service="payment-service"}[5m]))'

33.4 Circuit Breakers and Status Code Handling

33.4.1 The Resilience4j Circuit Breaker

java

// Resilience4j circuit breaker configuration in Java
@Configuration
public class CircuitBreakerConfiguration {

    private final MeterRegistry meterRegistry;
    private final Tracer tracer;

    public CircuitBreakerConfiguration(MeterRegistry meterRegistry,
                                       Tracer tracer) {
        this.meterRegistry = meterRegistry;
        this.tracer = tracer;
    }

    @Bean
    public CircuitBreakerRegistry circuitBreakerRegistry() {
        CircuitBreakerConfig config = CircuitBreakerConfig.custom()
            // Status-code-based failure detection
            .recordExceptions(
                HttpClientErrorException.class,  // 4xx
                HttpServerErrorException.class,  // 5xx
                TimeoutException.class,
                IOException.class
            )
            .ignoreExceptions(
                // Ignore selected 4xx errors, e.g. 404 (resource not found)
                NotFoundException.class
            )
            // Sliding window configuration
            .slidingWindowType(SlidingWindowType.COUNT_BASED)
            .slidingWindowSize(100)
            // Failure rate threshold
            .failureRateThreshold(50)
            // Slow call thresholds
            .slowCallRateThreshold(30)
            .slowCallDurationThreshold(Duration.ofSeconds(2))
            // Half-open state configuration
            .permittedNumberOfCallsInHalfOpenState(10)
            .maxWaitDurationInHalfOpenState(Duration.ofSeconds(10))
            // Automatic transition out of the open state
            .automaticTransitionFromOpenToHalfOpenEnabled(true)
            .waitDurationInOpenState(Duration.ofSeconds(30))
            .build();

        CircuitBreakerRegistry registry = CircuitBreakerRegistry.of(config);

        // Bind metrics
        TaggedCircuitBreakerMetrics.ofCircuitBreakerRegistry(registry)
            .bindTo(meterRegistry);

        return registry;
    }

    @Bean
    public RetryRegistry retryRegistry() {
        RetryConfig config = RetryConfig.custom()
            .maxAttempts(3)
            .waitDuration(Duration.ofMillis(500))
            .intervalFunction(IntervalFunction.ofExponentialBackoff())
            .retryOnException(e -> {
                // Only retry on specific status codes
                if (e instanceof HttpStatusCodeException) {
                    HttpStatusCodeException ex = (HttpStatusCodeException) e;
                    return ex.getStatusCode().is5xxServerError() ||
                           ex.getStatusCode() == HttpStatus.REQUEST_TIMEOUT ||
                           ex.getStatusCode() == HttpStatus.TOO_MANY_REQUESTS;
                }
                return e instanceof IOException ||
                       e instanceof TimeoutException;
            })
            .failAfterMaxAttempts(true)
            .build();

        return RetryRegistry.of(config);
    }

    @Bean
    public BulkheadRegistry bulkheadRegistry() {
        BulkheadConfig config = BulkheadConfig.custom()
            .maxConcurrentCalls(100)
            .maxWaitDuration(Duration.ofMillis(500))
            .build();

        return BulkheadRegistry.of(config);
    }

    // Status-code-aware circuit breaker decoration
    @Bean
    public RestTemplate statusCodeAwareRestTemplate(
            CircuitBreakerRegistry circuitBreakerRegistry,
            RetryRegistry retryRegistry,
            BulkheadRegistry bulkheadRegistry) {

        RestTemplate restTemplate = new RestTemplate();

        // Add an interceptor
        restTemplate.getInterceptors().add((request, body, execution) -> {
            String serviceName = extractServiceName(request.getURI());

            // Get or create the resilience components for this service
            CircuitBreaker circuitBreaker = circuitBreakerRegistry
                .circuitBreaker(serviceName, serviceName);
            Retry retry = retryRegistry.retry(serviceName, serviceName);
            Bulkhead bulkhead = bulkheadRegistry.bulkhead(serviceName, serviceName);

            // Create a tracing span
            Span span = tracer.buildSpan("http_request")
                .withTag("http.method", request.getMethod().name())
                .withTag("http.url", request.getURI().toString())
                .withTag("peer.service", serviceName)
                .start();

            long start = System.nanoTime();
            try (Scope scope = tracer.activateSpan(span)) {
                // Decorate the call with Resilience4j
                Supplier<ClientHttpResponse> supplier = () -> {
                    try {
                        return execution.execute(request, body);
                    } catch (IOException e) {
                        throw new RuntimeException(e);
                    }
                };

                // Compose the Resilience4j decorators
                Supplier<ClientHttpResponse> decoratedSupplier = Decorators.ofSupplier(supplier)
                    .withCircuitBreaker(circuitBreaker)
                    .withRetry(retry)
                    .withBulkhead(bulkhead)
                    .decorate();

                ClientHttpResponse response = decoratedSupplier.get();

                // Record the status code
                span.setTag("http.status_code", response.getRawStatusCode());

                if (response.getRawStatusCode() >= 400) {
                    span.setTag("error", true);
                    span.log(Map.of(
                        "event", "error",
                        "message", "HTTP error response",
                        "status_code", response.getRawStatusCode(),
                        "status_text", response.getStatusText()
                    ));

                    // Decide whether the status code should count against the
                    // breaker. Note: onError/onSuccess take the call duration,
                    // so the elapsed time is passed along with the exception.
                    if (response.getRawStatusCode() >= 500) {
                        // 5xx responses are recorded as failures
                        circuitBreaker.onError(
                            System.nanoTime() - start, TimeUnit.NANOSECONDS,
                            new HttpServerErrorException(
                                response.getStatusCode(),
                                response.getStatusText()
                            )
                        );
                    } else if (response.getRawStatusCode() == 429) {
                        // 429 Too Many Requests: treat as a failure and back off
                        circuitBreaker.onError(
                            System.nanoTime() - start, TimeUnit.NANOSECONDS,
                            new HttpClientErrorException(
                                response.getStatusCode(),
                                response.getStatusText()
                            )
                        );
                    }
                } else {
                    circuitBreaker.onSuccess(
                        System.nanoTime() - start, TimeUnit.NANOSECONDS);
                }

                return response;

            } catch (Exception e) {
                span.setTag("error", true);
                span.log(Map.of(
                    "event", "error",
                    "message", String.valueOf(e.getMessage()),
                    "error.object", e.getClass().getName()
                ));

                // Record the failure on the circuit breaker
                circuitBreaker.onError(
                    System.nanoTime() - start, TimeUnit.NANOSECONDS, e);

                throw e;
            } finally {
                span.finish();
            }
        });

        return restTemplate;
    }

    private String extractServiceName(URI uri) {
        return uri.getHost();
    }
}

// Status-code-aware circuit breaker monitoring
@Component
public class CircuitBreakerMonitor {

    private static final Logger logger =
        LoggerFactory.getLogger(CircuitBreakerMonitor.class);

    private final CircuitBreakerRegistry circuitBreakerRegistry;
    private final MeterRegistry meterRegistry;
    private final Map<String, CircuitBreaker.State> previousStates = new ConcurrentHashMap<>();

    public CircuitBreakerMonitor(CircuitBreakerRegistry circuitBreakerRegistry,
                                 MeterRegistry meterRegistry) {
        this.circuitBreakerRegistry = circuitBreakerRegistry;
        this.meterRegistry = meterRegistry;

        // Poll circuit breaker states periodically
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleAtFixedRate(this::monitorCircuitBreakers,
            0, 10, TimeUnit.SECONDS);
    }

    private void monitorCircuitBreakers() {
        circuitBreakerRegistry.getAllCircuitBreakers().forEach(cb -> {
            String name = cb.getName();
            CircuitBreaker.Metrics metrics = cb.getMetrics();
            CircuitBreaker.State currentState = cb.getState();

            // Record the state as a gauge
            Gauge.builder("circuitbreaker.state", cb,
                    circuitBreaker -> circuitBreaker.getState().getOrder())
                .tag("name", name)
                .register(meterRegistry);

            Counter.builder("circuitbreaker.transitions")
                .tag("name", name)
                .tag("from", previousStates.getOrDefault(name, CircuitBreaker.State.CLOSED).name())
                .tag("to", currentState.name())
                .register(meterRegistry)
                .increment();

            // Record failure statistics broken down by status code
            Map<Integer, Long> statusCodeFailures = getStatusCodeFailures(name);
            statusCodeFailures.forEach((statusCode, count) -> {
                Counter.builder("circuitbreaker.failures_by_status")
                    .tag("name", name)
                    .tag("status_code", String.valueOf(statusCode))
                    .register(meterRegistry)
                    .increment(count);
            });

            // Notify on state changes
            if (previousStates.containsKey(name) &&
                previousStates.get(name) != currentState) {

                logStateChange(name, previousStates.get(name), currentState, metrics);

                // Send alerts
                if (currentState == CircuitBreaker.State.OPEN) {
                    sendAlert(name, "Circuit breaker OPENED", metrics);
                } else if (currentState == CircuitBreaker.State.HALF_OPEN) {
                    sendAlert(name, "Circuit breaker HALF_OPEN", metrics);
                } else if (currentState == CircuitBreaker.State.CLOSED &&
                           previousStates.get(name) == CircuitBreaker.State.OPEN) {
                    sendAlert(name, "Circuit breaker CLOSED", metrics);
                }
            }

            previousStates.put(name, currentState);
        });
    }

    private Map<Integer, Long> getStatusCodeFailures(String circuitBreakerName) {
        // Per-status-code failure statistics should be extracted from the
        // breaker's event stream; a real implementation would register a
        // custom event consumer for this.
        return new HashMap<>();
    }

    private void logStateChange(String name, CircuitBreaker.State from,
                                CircuitBreaker.State to, CircuitBreaker.Metrics metrics) {
        logger.info("Circuit breaker state changed: {} {} -> {} (failureRate: {}%)",
            name, from, to, metrics.getFailureRate());
    }

    private void sendAlert(String circuitBreakerName, String message,
                           CircuitBreaker.Metrics metrics) {
        // Send an alert to the monitoring system
        Map<String, Object> alert = Map.of(
            "circuit_breaker", circuitBreakerName,
            "message", message,
            "failure_rate", metrics.getFailureRate(),
            "slow_call_rate", metrics.getSlowCallRate(),
            "timestamp", Instant.now().toString()
        );

        // monitoringService.sendAlert(alert);
    }
}
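
The decision logic itself is compact. Below is a hedged, dependency-free Python sketch of a status-code-aware breaker; the window size, failure threshold, and cool-down period are arbitrary illustration values, not recommendations:

python

# A tiny status-code-aware circuit breaker: 5xx and 429 count as failures,
# everything else as success. Not production-grade; for illustration only.
import time
from collections import deque

class StatusCodeCircuitBreaker:
    def __init__(self, window=20, failure_threshold=0.5, open_seconds=30.0):
        self.window = deque(maxlen=window)   # sliding window of failure flags
        self.failure_threshold = failure_threshold
        self.open_seconds = open_seconds
        self.opened_at = None                # None while closed

    def allow_request(self) -> bool:
        if self.opened_at is None:
            return True
        # Half-open: allow a probe request after the cool-down period
        return time.monotonic() - self.opened_at >= self.open_seconds

    def record(self, status_code: int) -> None:
        failed = status_code >= 500 or status_code == 429
        self.window.append(failed)
        if failed and len(self.window) == self.window.maxlen:
            failure_rate = sum(self.window) / len(self.window)
            if failure_rate >= self.failure_threshold:
                self.opened_at = time.monotonic()
        elif not failed and self.opened_at is not None:
            # A successful probe closes the breaker again
            self.opened_at = None
            self.window.clear()

breaker = StatusCodeCircuitBreaker()
for code in [200, 503, 503, 200, 429]:
    if breaker.allow_request():
        breaker.record(code)
print("requests allowed:", breaker.allow_request())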

33.5 Status Code Handling in API Gateways

33.5.1 Kong Gateway Configuration

yaml

# kong-status-code-handling.yaml
_format_version: "2.1"
_transform: true

# Service definitions
services:
  - name: order-service
    url: http://order-service:8080
    routes:
      - name: order-routes
        paths:
          - /orders
          - /orders/
        strip_path: true

    # Plugin configuration
    plugins:
      # 1. Request transformer plugin
      - name: request-transformer
        config:
          add:
            headers:
              - "X-Request-ID:${request_id}"
              - "X-Client-IP:${real_ip_remote_addr}"
              - "X-Forwarded-For:${proxy_add_x_forwarded_for}"

      # 2. Response transformer plugin
      - name: response-transformer
        config:
          add:
            headers:
              - "X-Service-Name:order-service"
              - "X-Response-Time:${latency}"
          remove:
            headers:
              - "Server"
              - "X-Powered-By"

      # 3. Status code rewrite plugin
      - name: status-code-rewrite
        enabled: true
        config:
          rules:
            # Map specific backend errors to standard gateway errors
            - backend_status: 502
              gateway_status: 503
              message: "Service temporarily unavailable"
              body: '{"error":{"code":"SERVICE_UNAVAILABLE","message":"The service is temporarily unavailable"}}'

            - backend_status: 504
              gateway_status: 503
              message: "Service timeout"
              body: '{"error":{"code":"TIMEOUT","message":"The service did not respond in time"}}'

            # Hide internal error details
            - backend_status: 500
              gateway_status: 500
              message: "Internal server error"
              body: '{"error":{"code":"INTERNAL_ERROR","message":"An internal server error occurred"}}'
              hide_details: true

      # 4. Circuit breaker plugin
      - name: circuit-breaker
        config:
          window_size: 60
          window_type: sliding
          failure_threshold: 5
          unhealthy:
            http_statuses:
              - 500
              - 502
              - 503
              - 504
            tcp_failures: 2
            timeouts: 3
          healthy:
            http_statuses:
              - 200
              - 201
              - 202
            successes: 1
          healthcheck:
            active:
              type: http
              http_path: /health
              timeout: 5
              concurrency: 10
              healthy:
                interval: 30
                http_statuses:
                  - 200
                  - 302
                successes: 2
              unhealthy:
                interval: 30
                http_statuses:
                  - 429
                  - 500
                  - 503
                tcp_failures: 2
                timeouts: 3
                http_failures: 2
            passive:
              type: http
              healthy:
                http_statuses:
                  - 200
                  - 201
                  - 202
                  - 203
                  - 204
                  - 205
                  - 206
                  - 207
                  - 208
                  - 226
                successes: 5
              unhealthy:
                http_statuses:
                  - 500
                  - 502
                  - 503
                  - 504
                tcp_failures: 2
                timeouts: 7
                http_failures: 5

# Global plugins
plugins:
  - name: correlation-id
    config:
      header_name: X-Request-ID
      generator: uuid
      echo_downstream: true

  - name: prometheus
    config:
      status_code_metrics: true
      latency_metrics: true
      bandwidth_metrics: true
      upstream_health_metrics: true

  - name: zipkin
    config:
      http_endpoint: http://zipkin:9411/api/v2/spans
      sample_ratio: 1
      include_credential: true
      traceid_byte_count: 16
      header_type: preserve

  # Global error handler
  - name: error-handler
    config:
      default_response:
        status_code: 500
        content_type: application/json
        body: '{"error":{"code":"INTERNAL_ERROR","message":"An unexpected error occurred"}}'

      custom_responses:
        - status_code: 404
          content_type: application/json
          body: '{"error":{"code":"NOT_FOUND","message":"The requested resource was not found"}}'

        - status_code: 429
          content_type: application/json
          headers:
            Retry-After: "60"
          body: '{"error":{"code":"RATE_LIMITED","message":"Too many requests, please try again later"}}'

        - status_code: 503
          content_type: application/json
          headers:
            Retry-After: "30"
          body: '{"error":{"code":"SERVICE_UNAVAILABLE","message":"Service is temporarily unavailable"}}'

# Upstream health checks
upstreams:
  - name: order-service-upstream
    algorithm: round-robin
    healthchecks:
      active:
        type: http
        http_path: /health
        timeout: 5
        healthy:
          interval: 30
          http_statuses:
            - 200
            - 302
          successes: 2
        unhealthy:
          interval: 30
          http_statuses:
            - 429
            - 500
            - 503
          tcp_failures: 2
          timeouts: 3
          http_failures: 2
      passive:
        healthy:
          http_statuses:
            - 200
            - 201
            - 202
            - 203
            - 204
            - 205
            - 206
            - 207
            - 208
            - 226
          successes: 5
        unhealthy:
          http_statuses:
            - 500
            - 502
            - 503
            - 504
          tcp_failures: 2
          timeouts: 7
          http_failures: 5

    targets:
      - target: order-service-1:8080
        weight: 100
      - target: order-service-2:8080
        weight: 100
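
One practical way to keep such mappings honest is a small contract test against the running gateway. A hedged sketch using the requests library follows; the gateway URL and the expected mapping mirror the configuration above and are assumptions about your deployment:

python

# Contract test: the gateway should surface upstream 5xx failures as a
# sanitized 503 with a Retry-After hint, per the configuration above.
import requests

GATEWAY_URL = "http://localhost:8000"  # assumed local Kong proxy port

def test_unavailable_upstream_is_mapped_to_503():
    resp = requests.get(f"{GATEWAY_URL}/orders/123", timeout=5)
    if resp.status_code == 503:
        body = resp.json()
        assert body["error"]["code"] in ("SERVICE_UNAVAILABLE", "TIMEOUT")
        assert "Retry-After" in resp.headers
        # Internal details must never leak through the gateway
        assert "stack" not in body["error"]
    else:
        # Healthy upstream: the gateway passes 2xx straight through
        assert 200 <= resp.status_code < 300

if __name__ == "__main__":
    test_unavailable_upstream_is_mapped_to_503()
    print("gateway status code contract holds")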

33.5.2 Envoy Proxy Configuration

yaml

# envoy-status-code-config.yaml
static_resources:
  listeners:
    - name: api_listener
      address:
        socket_address:
          address: 0.0.0.0
          port_value: 8080

      filter_chains:
        - filters:
            - name: envoy.filters.network.http_connection_manager
              typed_config:
                "@type": type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager

                stat_prefix: ingress_http
                route_config:
                  name: local_route
                  virtual_hosts:
                    - name: api
                      domains: ["*"]
                      routes:
                        - match:
                            prefix: "/orders"
                          route:
                            cluster: order_service
                            timeout: 30s
                            retry_policy:
                              retry_on: "5xx,gateway-error,connect-failure,retriable-4xx"
                              num_retries: 3
                              per_try_timeout: 10s
                              retry_back_off:
                                base_interval: 0.1s
                                max_interval: 10s

                # Upgrade handling
                upgrade_configs:
                  - upgrade_type: "websocket"

                # HTTP filters
                http_filters:
                  # 1. Status code rewrite filter
                  - name: envoy.filters.http.status_code_rewrite
                    typed_config:
                      "@type": type.googleapis.com/envoy.extensions.filters.http.status_code_rewrite.v3.StatusCodeRewrite
                      rules:
                        - upstream_status: "502"
                          gateway_status: "503"
                          headers_to_add:
                            - header:
                                key: "X-Status-Reason"
                                value: "Bad Gateway converted to Service Unavailable"

                        - upstream_status: "504"
                          gateway_status: "503"
                          headers_to_add:
                            - header:
                                key: "X-Status-Reason"
                                value: "Gateway Timeout converted to Service Unavailable"

                  # 2. Circuit breaker filter
                  - name: envoy.filters.http.circuit_breaker
                    typed_config:
                      "@type": type.googleapis.com/envoy.extensions.filters.http.circuit_breaker.v3.CircuitBreaker
                      max_connections: 1024
                      max_pending_requests: 1024
                      max_requests: 1024
                      max_retries: 3
                      track_remaining: true

                  # 3. Fault injection filter
                  - name: envoy.filters.http.fault
                    typed_config:
                      "@type": type.googleapis.com/envoy.extensions.filters.http.fault.v3.HTTPFault
                      abort:
                        http_status: 503
                        percentage:
                          numerator: 5  # 5% of requests receive a 503
                          denominator: HUNDRED

                      delay:
                        fixed_delay: 1s
                        percentage:
                          numerator: 10  # 10% of requests are delayed by 1s
                          denominator: HUNDRED

                  # 4. External authorization filter
                  - name: envoy.filters.http.ext_authz
                    typed_config:
                      "@type": type.googleapis.com/envoy.extensions.filters.http.ext_authz.v3.ExtAuthz
                      http_service:
                        server_uri:
                          uri: auth-service:9000
                          cluster: auth_service
                          timeout: 0.25s

                        authorization_request:
                          allowed_headers:
                            patterns:
                              - exact: "content-type"
                              - exact: "authorization"
                              - exact: "x-request-id"

                        authorization_response:
                          allowed_upstream_headers:
                            patterns:
                              - exact: "x-user-id"
                              - exact: "x-user-role"

                          allowed_client_headers:
                            patterns:
                              - exact: "x-auth-status"

                  # 5. Rate limit filter
                  - name: envoy.filters.http.ratelimit
                    typed_config:
                      "@type": type.googleapis.com/envoy.extensions.filters.http.ratelimit.v3.RateLimit
                      domain: api-gateway
                      failure_mode_deny: false
                      timeout: 0.05s
                      rate_limit_service:
                        grpc_service:
                          envoy_grpc:
                            cluster_name: rate_limit_service

                  # 6. Router filter (must come last)
                  - name: envoy.filters.http.router
                    typed_config:
                      "@type": type.googleapis.com/envoy.extensions.filters.http.router.v3.Router
                      suppress_envoy_headers: true
                      start_child_span: true

                # Access logging
                access_log:
                  - name: envoy.access_loggers.file
                    typed_config:
                      "@type": type.googleapis.com/envoy.extensions.access_loggers.file.v3.FileAccessLog
                      path: /dev/stdout

                      log_format:
                        json_format:
                          timestamp: "%START_TIME%"
                          request_id: "%REQ(X-REQUEST-ID)%"
                          client_ip: "%DOWNSTREAM_REMOTE_ADDRESS%"
                          method: "%REQ(:METHOD)%"
                          path: "%REQ(X-ENVOY-ORIGINAL-PATH?:PATH)%"
                          protocol: "%PROTOCOL%"
                          response_code: "%RESPONSE_CODE%"
                          response_flags: "%RESPONSE_FLAGS%"
                          bytes_received: "%BYTES_RECEIVED%"
                          bytes_sent: "%BYTES_SENT%"
                          duration: "%DURATION%"
                          upstream_service_time: "%RESP(X-ENVOY-UPSTREAM-SERVICE-TIME)%"
                          upstream_host: "%UPSTREAM_HOST%"
                          upstream_cluster: "%UPSTREAM_CLUSTER%"
                          upstream_local_address: "%UPSTREAM_LOCAL_ADDRESS%"
                          downstream_local_address: "%DOWNSTREAM_LOCAL_ADDRESS%"
                          downstream_remote_address: "%DOWNSTREAM_REMOTE_ADDRESS%"
                          requested_server_name: "%REQUESTED_SERVER_NAME%"
                          route_name: "%ROUTE_NAME%"

                # Tracing configuration
                tracing:
                  provider:
                    name: envoy.tracers.zipkin
                    typed_config:
                      "@type": type.googleapis.com/envoy.config.trace.v3.ZipkinConfig
                      collector_cluster: zipkin
                      collector_endpoint: "/api/v2/spans"
                      collector_endpoint_version: HTTP_JSON
                      shared_span_context: false
                      trace_id_128bit: true

  # Cluster definitions
  clusters:
    - name: order_service
      type: STRICT_DNS
      lb_policy: ROUND_ROBIN
      load_assignment:
        cluster_name: order_service
        endpoints:
          - lb_endpoints:
              - endpoint:
                  address:
                    socket_address:
                      address: order-service
                      port_value: 8080

      circuit_breakers:
        thresholds:
          - priority: DEFAULT
            max_connections: 1024
            max_pending_requests: 1024
            max_requests: 1024
            max_retries: 3
          - priority: HIGH
            max_connections: 2048
            max_pending_requests: 2048
            max_requests: 2048
            max_retries: 3

      outlier_detection:
        consecutive_5xx: 10
        interval: 30s
        base_ejection_time: 30s
        max_ejection_percent: 50

      health_checks:
        - timeout: 5s
          interval: 30s
          unhealthy_threshold: 3
          healthy_threshold: 2
          http_health_check:
            path: /health
            expected_statuses:
              start: 200
              end: 299

    - name: auth_service
      type: STRICT_DNS
      connect_timeout: 0.25s
      lb_policy: ROUND_ROBIN
      load_assignment:
        cluster_name: auth_service
        endpoints:
          - lb_endpoints:
              - endpoint:
                  address:
                    socket_address:
                      address: auth-service
                      port_value: 9000

    - name: rate_limit_service
      type: STRICT_DNS
      connect_timeout: 0.05s
      lb_policy: ROUND_ROBIN
      http2_protocol_options: {}
      load_assignment:
        cluster_name: rate_limit_service
        endpoints:
          - lb_endpoints:
              - endpoint:
                  address:
                    socket_address:
                      address: rate-limit-service
                      port_value: 8081

    - name: zipkin
      type: STRICT_DNS
      lb_policy: ROUND_ROBIN
      load_assignment:
        cluster_name: zipkin
        endpoints:
          - lb_endpoints:
              - endpoint:
                  address:
                    socket_address:
                      address: zipkin
                      port_value: 9411

# Admin interface
admin:
  address:
    socket_address:
      address: 0.0.0.0
      port_value: 9901
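
Since the access log above emits one JSON object per request, a few lines of Python are enough to tally status codes and Envoy response flags from it. This is a hedged sketch; it assumes the log lines match the json_format fields defined above:

python

# Tally response codes and Envoy response flags from the JSON access log.
import json
import sys
from collections import Counter

def summarize(log_lines):
    codes, flags = Counter(), Counter()
    for line in log_lines:
        try:
            entry = json.loads(line)
        except json.JSONDecodeError:
            continue  # skip non-JSON noise
        codes[str(entry.get("response_code"))] += 1
        if entry.get("response_flags") not in (None, "-"):
            flags[entry["response_flags"]] += 1
    return codes, flags

if __name__ == "__main__":
    codes, flags = summarize(sys.stdin)
    print("status codes:", dict(codes))
    print("response flags (UH, UT, UO, ... indicate upstream issues):", dict(flags))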

33.6 Status Code Propagation in the Service Mesh

33.6.1 Istio Service Mesh Configuration

yaml

# istio-status-code-policies.yaml
apiVersion: networking.istio.io/v1beta1
kind: VirtualService
metadata:
  name: order-service
spec:
  hosts:
    - order-service
  http:
    - match:
        - uri:
            prefix: /orders
      route:
        - destination:
            host: order-service
            port:
              number: 8080
          weight: 100

      # Retry policy
      retries:
        attempts: 3
        retryOn: "5xx,gateway-error,connect-failure,retriable-4xx"
        perTryTimeout: 10s

      # Timeout
      timeout: 30s

      # Fault injection
      fault:
        abort:
          percentage:
            value: 0.1  # 0.1% of requests receive a 503
          httpStatus: 503
        delay:
          percentage:
            value: 1    # 1% of requests are delayed by 100ms
          fixedDelay: 100ms

      # Header rewriting
      headers:
        request:
          set:
            X-Request-ID: "%REQ(X-REQUEST-ID)%"
            X-Client-IP: "%DOWNSTREAM_REMOTE_ADDRESS%"
        response:
          set:
            X-Service-Version: "v1.0.0"
            X-Response-Time: "%RESPONSE_DURATION%"
          remove:
            - "Server"

      # CORS policy
      corsPolicy:
        allowOrigin:
          - "*"
        allowMethods:
          - POST
          - GET
          - OPTIONS
          - PUT
          - DELETE
        allowHeaders:
          - content-type
          - authorization
          - x-request-id
        maxAge: "24h"

---
apiVersion: networking.istio.io/v1beta1
kind: DestinationRule
metadata:
  name: order-service
spec:
  host: order-service
  trafficPolicy:
    # Connection pool settings
    connectionPool:
      tcp:
        maxConnections: 100
        connectTimeout: 30ms
      http:
        http1MaxPendingRequests: 1024
        http2MaxRequests: 1024
        maxRequestsPerConnection: 1024
        maxRetries: 3
        idleTimeout: 3600s

    # Load balancing
    loadBalancer:
      simple: ROUND_ROBIN

    # Outlier detection
    outlierDetection:
      consecutive5xxErrors: 10
      interval: 30s
      baseEjectionTime: 30s
      maxEjectionPercent: 50

    # TLS settings
    tls:
      mode: ISTIO_MUTUAL

---
# Status code monitoring configuration
apiVersion: telemetry.istio.io/v1alpha1
kind: Telemetry
metadata:
  name: status-code-metrics
spec:
  metrics:
    - providers:
        - name: prometheus
      overrides:
        # Request count, broken down by status code
        - match:
            metric: REQUEST_COUNT
            mode: CLIENT_AND_SERVER
          tagOverrides:
            response_code:
              value: "%RESPONSE_CODE%"

        # Request duration, broken down by status code
        - match:
            metric: REQUEST_DURATION
            mode: CLIENT_AND_SERVER
          tagOverrides:
            response_code:
              value: "%RESPONSE_CODE%"

        # Request size, broken down by status code
        - match:
            metric: REQUEST_SIZE
            mode: CLIENT_AND_SERVER
          tagOverrides:
            response_code:
              value: "%RESPONSE_CODE%"

        # Response size, broken down by status code
        - match:
            metric: RESPONSE_SIZE
            mode: CLIENT_AND_SERVER
          tagOverrides:
            response_code:
              value: "%RESPONSE_CODE%"

  # Custom metrics
  customMetrics:
    - name: error_rate_by_service
      dimensions:
        destination_service: "string(destination.service)"
        response_code: "string(response.code)"
        source_service: "string(source.service)"
      value: "double(1)"

    - name: latency_by_status_code
      dimensions:
        destination_service: "string(destination.service)"
        response_code: "string(response.code)"
        percentile: "string(percentile)"
      value: "double(response.duration)"

---
# Status code alerting rules
apiVersion: monitoring.coreos.com/v1
kind: PrometheusRule
metadata:
  name: status-code-alerts
spec:
  groups:
    - name: status-code-monitoring
      rules:
        # High 5xx error rate
        - alert: High5xxErrorRate
          expr: |
            sum(rate(istio_requests_total{
              response_code=~"5..",
              reporter="destination"
            }[5m])) by (destination_service)
            /
            sum(rate(istio_requests_total{
              reporter="destination"
            }[5m])) by (destination_service)
            > 0.05
          for: 5m
          labels:
            severity: critical
          annotations:
            summary: "High 5xx error rate for {{ $labels.destination_service }}"
            description: "5xx error rate is {{ $value | humanizePercentage }} for service {{ $labels.destination_service }}"

        # Elevated 4xx error rate
        - alert: High4xxErrorRate
          expr: |
            sum(rate(istio_requests_total{
              response_code=~"4..",
              reporter="destination"
            }[5m])) by (destination_service)
            /
            sum(rate(istio_requests_total{
              reporter="destination"
            }[5m])) by (destination_service)
            > 0.10
          for: 10m
          labels:
            severity: warning
          annotations:
            summary: "High 4xx error rate for {{ $labels.destination_service }}"
            description: "4xx error rate is {{ $value | humanizePercentage }} for service {{ $labels.destination_service }}"

        # Service unavailable
        - alert: ServiceUnavailable
          expr: |
            sum(rate(istio_requests_total{
              response_code=~"5..",
              reporter="destination"
            }[5m])) by (destination_service) == 0
            and
            sum(rate(istio_requests_total{
              reporter="destination"
            }[5m])) by (destination_service) == 0
          for: 2m
          labels:
            severity: critical
          annotations:
            summary: "Service {{ $labels.destination_service }} is unavailable"
            description: "Service {{ $labels.destination_service }} has not received any requests in the last 2 minutes"

        # Slow responses
        - alert: SlowResponses
          expr: |
            histogram_quantile(0.95,
              sum(rate(istio_request_duration_milliseconds_bucket{
                reporter="destination"
              }[5m])) by (le, destination_service)
            ) > 1000
          for: 5m
          labels:
            severity: warning
          annotations:
            summary: "Slow responses for {{ $labels.destination_service }}"
            description: "95th percentile response time is {{ $value }}ms for service {{ $labels.destination_service }}"
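
These PromQL expressions can also be evaluated ad hoc through the Prometheus HTTP API, which is handy when tuning alert thresholds. A hedged sketch follows; the Prometheus URL is an assumption about your environment:

python

# Evaluate the 5xx error-rate expression from the alert above via the
# Prometheus HTTP API and print per-service values.
import requests

PROMETHEUS_URL = "http://localhost:9090"  # assumed in-cluster or port-forwarded

QUERY = (
    'sum(rate(istio_requests_total{response_code=~"5..",reporter="destination"}[5m])) '
    'by (destination_service) / '
    'sum(rate(istio_requests_total{reporter="destination"}[5m])) '
    'by (destination_service)'
)

resp = requests.get(f"{PROMETHEUS_URL}/api/v1/query", params={"query": QUERY}, timeout=10)
resp.raise_for_status()

for sample in resp.json()["data"]["result"]:
    service = sample["metric"].get("destination_service", "unknown")
    error_rate = float(sample["value"][1])
    flag = "ALERT" if error_rate > 0.05 else "ok"
    print(f"{service}: {error_rate:.2%} [{flag}]")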

33.7 Error Recovery and Degradation Strategies

33.7.1 A Multi-Level Degradation Strategy

python

# A multi-level degradation strategy
from enum import Enum
from typing import Dict, Any, Optional, Callable
import time
import logging
from dataclasses import dataclass
from functools import wraps

logger = logging.getLogger(__name__)

class DegradationLevel(Enum):
    NORMAL = 1       # normal operation
    DEGRADED = 2     # degraded mode
    LIMITED = 3      # limited mode
    MAINTENANCE = 4  # maintenance mode
    FAILSAFE = 5     # failsafe mode

@dataclass
class ServiceHealth:
    name: str
    status_code: int
    response_time: float
    error_rate: float
    last_check: float
    degradation_level: DegradationLevel = DegradationLevel.NORMAL

class MultiLevelDegradation:
    def __init__(self):
        self.service_health: Dict[str, ServiceHealth] = {}
        self.degradation_strategies: Dict[DegradationLevel, Callable] = {}
        self._setup_strategies()

    def _setup_strategies(self):
        """Register the strategy for each degradation level."""
        self.degradation_strategies = {
            DegradationLevel.NORMAL: self._normal_strategy,
            DegradationLevel.DEGRADED: self._degraded_strategy,
            DegradationLevel.LIMITED: self._limited_strategy,
            DegradationLevel.MAINTENANCE: self._maintenance_strategy,
            DegradationLevel.FAILSAFE: self._failsafe_strategy,
        }

    def assess_health(self, service_name: str,
                      status_code: int,
                      response_time: float) -> DegradationLevel:
        """Assess service health and decide the degradation level."""
        if service_name not in self.service_health:
            self.service_health[service_name] = ServiceHealth(
                name=service_name,
                status_code=status_code,
                response_time=response_time,
                error_rate=0.0,
                last_check=time.time()
            )

        health = self.service_health[service_name]

        # Update the health indicators
        is_error = status_code >= 400
        error_count = 1 if is_error else 0

        # Sliding-window error rate (simplified exponential approximation)
        window_size = 100
        health.error_rate = (
            health.error_rate * (window_size - 1) + error_count
        ) / window_size

        health.status_code = status_code
        health.response_time = response_time
        health.last_check = time.time()

        # Decide the degradation level
        if status_code == 503:
            return DegradationLevel.MAINTENANCE

        elif status_code >= 500:
            if health.error_rate > 0.5:
                return DegradationLevel.FAILSAFE
            elif health.error_rate > 0.2:
                return DegradationLevel.LIMITED
            else:
                return DegradationLevel.DEGRADED

        elif status_code == 429:  # rate limited
            return DegradationLevel.LIMITED

        elif status_code >= 400:
            if health.error_rate > 0.3:
                return DegradationLevel.DEGRADED
            else:
                return DegradationLevel.NORMAL

        elif response_time > 5.0:  # responses slower than 5 seconds
            return DegradationLevel.DEGRADED

        elif response_time > 2.0:  # responses slower than 2 seconds
            if health.error_rate > 0.1:
                return DegradationLevel.DEGRADED
            else:
                return DegradationLevel.NORMAL

        else:
            return DegradationLevel.NORMAL

    def apply_strategy(self, service_name: str,
                       original_call: Callable,
                       *args, **kwargs) -> Any:
        """Execute a call under the currently selected degradation strategy."""
        health = self.service_health.get(service_name)

        if not health:
            # First call: execute normally, with monitoring
            return self._execute_with_monitoring(service_name, original_call, *args, **kwargs)

        # Look up the strategy for the current degradation level
        strategy = self.degradation_strategies.get(
            health.degradation_level,
            self._normal_strategy
        )

        return strategy(service_name, original_call, *args, **kwargs)

    def _execute_with_monitoring(self, service_name: str,
                                 original_call: Callable,
                                 *args, **kwargs) -> Any:
        """Execute a call and record the outcome."""
        start_time = time.time()

        try:
            result = original_call(*args, **kwargs)
            response_time = time.time() - start_time

            # The result is assumed to expose a status_code attribute
            status_code = getattr(result, 'status_code', 200)

            # Assess the health
            degradation_level = self.assess_health(
                service_name, status_code, response_time
            )

            # Update the degradation level
            if service_name in self.service_health:
                self.service_health[service_name].degradation_level = degradation_level

            return result

        except Exception as e:
            response_time = time.time() - start_time

            # Map the exception type to a status code
            status_code = self._exception_to_status_code(e)

            # Assess the health
            degradation_level = self.assess_health(
                service_name, status_code, response_time
            )

            # Update the degradation level
            if service_name in self.service_health:
                self.service_health[service_name].degradation_level = degradation_level

            raise

    def _exception_to_status_code(self, e: Exception) -> int:
        """Map an exception to an equivalent status code."""
        if hasattr(e, 'status_code'):
            return e.status_code
        elif isinstance(e, TimeoutError):
            return 504
        elif isinstance(e, ConnectionError):
            return 503
        else:
            return 500

    # Strategy implementations per level
    def _normal_strategy(self, service_name: str,
                         original_call: Callable,
                         *args, **kwargs) -> Any:
        """Normal strategy: full functionality, with health monitoring so the
        level can shift when failures start to appear."""
        logger.debug(f"Normal strategy for {service_name}")
        return self._execute_with_monitoring(service_name, original_call, *args, **kwargs)

    def _degraded_strategy(self, service_name: str,
                           original_call: Callable,
                           *args, **kwargs) -> Any:
        """Degraded strategy: core functionality plus a retry mechanism."""
        logger.warning(f"Degraded strategy for {service_name}")

        max_retries = 2
        for attempt in range(max_retries + 1):
            try:
                return original_call(*args, **kwargs)
            except Exception:
                if attempt == max_retries:
                    raise

                # Exponential backoff
                delay = 2 ** attempt
                time.sleep(delay)

                logger.info(f"Retry {attempt + 1} for {service_name} after {delay}s")

    def _limited_strategy(self, service_name: str,
                          original_call: Callable,
                          *args, **kwargs) -> Any:
        """Limited strategy: reduced functionality backed by a cache."""
        logger.warning(f"Limited strategy for {service_name}")

        # Try the cache first
        cache_key = f"{service_name}:{str(args)}:{str(kwargs)}"
        cached_result = self._get_from_cache(cache_key)

        if cached_result:
            logger.info(f"Using cached result for {service_name}")
            return cached_result

        # One attempt, then fall back
        try:
            result = original_call(*args, **kwargs)
            self._save_to_cache(cache_key, result)
            return result
        except Exception:
            # Return a degraded fallback result
            return self._get_fallback_result(service_name, *args, **kwargs)

    def _maintenance_strategy(self, service_name: str,
                              original_call: Callable,
                              *args, **kwargs) -> Any:
        """Maintenance strategy: report maintenance, do not attempt the call."""
        logger.error(f"Maintenance strategy for {service_name}")

        return {
            "status": "maintenance",
            "service": service_name,
            "message": "Service is under maintenance",
            "timestamp": time.time()
        }

    def _failsafe_strategy(self, service_name: str,
                           original_call: Callable,
                           *args, **kwargs) -> Any:
        """Failsafe strategy: return safe defaults to protect the system."""
        logger.critical(f"Failsafe strategy for {service_name}")

        return self._get_failsafe_result(service_name, *args, **kwargs)

    def _get_from_cache(self, key: str) -> Optional[Any]:
        """Fetch a result from the cache (simplified implementation)."""
        # A real implementation would use Redis, Memcached, or similar
        return None

    def _save_to_cache(self, key: str, value: Any) -> None:
        """Store a result in the cache (simplified implementation)."""
        pass

    def _get_fallback_result(self, service_name: str,
                             *args, **kwargs) -> Any:
        """Produce a degraded fallback result per service type."""
        if "user" in service_name:
            return {
                "status": "degraded",
                "user": {"id": "unknown", "name": "Guest"},
                "message": "User service is degraded"
            }
        elif "order" in service_name:
            return {
                "status": "degraded",
                "order": {"status": "processing"},
                "message": "Order service is degraded"
            }
        else:
            return {
                "status": "degraded",
                "message": f"Service {service_name} is degraded"
            }

    def _get_failsafe_result(self, service_name: str,
                             *args, **kwargs) -> Any:
        """Produce a failsafe-mode result."""
        return {
            "status": "failsafe",
            "service": service_name,
            "message": "Service is in failsafe mode",
            "timestamp": time.time()
        }

# Decorator-based usage
def degradation_aware(service_name: str):
    """Decorator that makes a call degradation-aware."""
    degradation_manager = MultiLevelDegradation()

    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            return degradation_manager.apply_strategy(
                service_name, func, *args, **kwargs
            )
        return wrapper
    return decorator

# Usage example
class UserService:
    @degradation_aware("user-service")
    def get_user_profile(self, user_id: str):
        # Call the real user service
        response = self._call_user_service(f"/users/{user_id}")
        return response

    def _call_user_service(self, endpoint: str):
        # Simulated HTTP call; a real implementation would use an HTTP client
        pass

class OrderService:
    @degradation_aware("order-service")
    def create_order(self, order_data: Dict) -> Dict:
        # Call the order service
        response = self._call_order_service("/orders", order_data)
        return response

    @degradation_aware("inventory-service")
    def check_inventory(self, product_id: str) -> Dict:
        # Call the inventory service
        response = self._call_inventory_service(f"/inventory/{product_id}")
        return response

    def _call_order_service(self, endpoint: str, data: Dict):
        pass

    def _call_inventory_service(self, endpoint: str):
        pass
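
A small self-contained run shows the manager reacting to failures. The flaky service below is simulated, and the thresholds are the ones hard-coded in assess_health above:

python

# Drive the degradation manager with a simulated flaky service and
# inspect the recorded health afterwards.
import random

class FakeResponse:
    def __init__(self, status_code: int):
        self.status_code = status_code

manager = MultiLevelDegradation()

def flaky_service():
    # Roughly 30% of calls fail with a 500
    return FakeResponse(500 if random.random() < 0.3 else 200)

random.seed(7)  # deterministic demo
for _ in range(50):
    manager.apply_strategy("demo-service", flaky_service)

health = manager.service_health["demo-service"]
print(f"error_rate={health.error_rate:.2f}, level={health.degradation_level.name}")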

33.8 Monitoring and Alerting

33.8.1 Status Code Monitoring with Prometheus

yaml

# prometheus-status-code-rules.yaml
global:
  scrape_interval: 15s
  evaluation_interval: 15s

scrape_configs:
  - job_name: 'api-gateway'
    static_configs:
      - targets: ['api-gateway:9090']
    metrics_path: '/metrics'

  - job_name: 'order-service'
    static_configs:
      - targets: ['order-service:8080']
    metrics_path: '/actuator/prometheus'

  - job_name: 'user-service'
    static_configs:
      - targets: ['user-service:8080']
    metrics_path: '/actuator/prometheus'

  - job_name: 'payment-service'
    static_configs:
      - targets: ['payment-service:8080']
    metrics_path: '/metrics'

# Recording rules
rule_files:
  - "status-code-rules.yml"

# Alerting configuration
alerting:
  alertmanagers:
    - static_configs:
        - targets: ['alertmanager:9093']

# Rules file (status-code-rules.yml)
groups:
  - name: status_code_recording_rules
    interval: 30s
    rules:
      # Status codes per service
      - record: http_requests_total:rate5m
        expr: |
          sum(rate(http_requests_total[5m])) by (service, status_code)

      # Overall error rate
      - record: http_error_rate:rate5m
        expr: |
          sum(rate(http_requests_total{status_code=~"4..|5.."}[5m])) by (service)
          /
          sum(rate(http_requests_total[5m])) by (service)

      # 5xx error rate
      - record: http_5xx_error_rate:rate5m
        expr: |
          sum(rate(http_requests_total{status_code=~"5.."}[5m])) by (service)
          /
          sum(rate(http_requests_total[5m])) by (service)

      # 4xx error rate
      - record: http_4xx_error_rate:rate5m
        expr: |
          sum(rate(http_requests_total{status_code=~"4.."}[5m])) by (service)
          /
          sum(rate(http_requests_total[5m])) by (service)

      # Response time percentiles, broken down by status code
      - record: http_response_time_percentile:status
        expr: |
          histogram_quantile(0.95,
            sum(rate(http_request_duration_seconds_bucket[5m])) by (le, service, status_code)
          )

      # Service dependency health
      - record: service_dependency_health
        expr: |
          (
            sum(rate(http_requests_total{status_code=~"2.."}[5m])) by (caller, target)
            /
            sum(rate(http_requests_total[5m])) by (caller, target)
          ) * 100

      # Error propagation chains
      - record: error_propagation_chain
        expr: |
          count by (error_chain) (
            http_requests_total{status_code=~"4..|5.."}
          )

  - name: status_code_alerts
    rules:
      # Global error rate alert
      - alert: GlobalHighErrorRate
        expr: |
          sum(rate(http_requests_total{status_code=~"4..|5.."}[5m]))
          /
          sum(rate(http_requests_total[5m]))
          > 0.05
        for: 5m
        labels:
          severity: critical
          team: platform
        annotations:
          summary: "Global error rate is high"
          description: "Global error rate is {{ $value | humanizePercentage }}"

      # Per-service error rate alert
      - alert: ServiceHighErrorRate
        expr: |
          http_error_rate:rate5m > 0.10
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "High error rate for service {{ $labels.service }}"
          description: "Error rate is {{ $value | humanizePercentage }} for service {{ $labels.service }}"

      # 5xx error rate alert
      - alert: ServiceHigh5xxErrorRate
        expr: |
          http_5xx_error_rate:rate5m > 0.05
        for: 3m
        labels:
          severity: critical
        annotations:
          summary: "High 5xx error rate for service {{ $labels.service }}"
          description: "5xx error rate is {{ $value | humanizePercentage }} for service {{ $labels.service }}"

      # Service completely unavailable
      - alert: ServiceCompletelyUnavailable
        expr: |
          up{job=~".*"} == 0
        for: 2m
        labels:
          severity: critical
        annotations:
          summary: "Service {{ $labels.job }} is completely unavailable"
          description: "Service {{ $labels.job }} has been down for more than 2 minutes"

      # Error propagation chain detection
      - alert: ErrorPropagationDetected
        expr: |
          increase(error_propagation_chain[5m]) > 10
        for: 2m
        labels:
          severity: warning
        annotations:
          summary: "Error propagation detected in the system"
          description: "Error chain {{ $labels.error_chain }} has propagated {{ $value }} times in the last 5 minutes"

      # Slow response alert
      - alert: SlowServiceResponses
        expr: |
          http_response_time_percentile:status > 2
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Slow responses for service {{ $labels.service }}"
          description: "95th percentile response time is {{ $value }}s for status code {{ $labels.status_code }} in service {{ $labels.service }}"

      # Degraded dependency health
      - alert: ServiceDependencyDegraded
        expr: |
          service_dependency_health < 95
        for: 10m
        labels:
          severity: warning
        annotations:
          summary: "Service dependency health is degraded"
          description: "Dependency from {{ $labels.caller }} to {{ $labels.target }} has health of {{ $value }}%"

33.9 Summary

33.9.1 Best Practices for Status Code Propagation in Microservices

  • Balance transparency and encapsulation

    • Internal error details should not be exposed directly to clients

    • But enough context must be retained for fault diagnosis

    • Use a standardized error format such as RFC 7807 (see the sketch after this list)

  • Integrate with distributed tracing

    • Every service call should carry a unique trace ID

    • Status codes should be recorded as span tags

    • Error propagation chains should be visible in traces

  • Smart circuit breaker strategies

    • Failure detection based on status codes

    • Different status codes deserve different retry strategies

    • Circuit breaker state should be monitored and alerted on

  • Multi-level degradation strategies

    • Apply different degradation levels depending on error type and frequency

    • Degradation strategies should be configurable

    • Degradation state should be monitored and reported

  • Unified monitoring and alerting

    • All services should expose standardized metrics

    • Status code distributions should be monitored in real time

    • Status-code-based alerts should be timely and accurate
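
RFC 7807 (application/problem+json) is the standardized error format referenced above. A minimal sketch of emitting a compliant body follows; fields beyond the RFC's type, title, status, detail, and instance members are extension members, and the URIs used are illustrative:

python

# Build an RFC 7807 problem+json body for a 409 inventory conflict.
import json

def problem(status: int, title: str, detail: str, type_uri: str = "about:blank",
            instance: str = "", **extensions):
    body = {
        "type": type_uri,        # URI identifying the problem type
        "title": title,          # short, human-readable summary
        "status": status,        # mirrors the HTTP status code
        "detail": detail,        # occurrence-specific explanation
        "instance": instance,    # URI of this specific occurrence
        **extensions,            # extension members, e.g. trace_id
    }
    headers = {"Content-Type": "application/problem+json"}
    return status, headers, json.dumps(body)

status, headers, payload = problem(
    409, "Insufficient stock",
    "Item 42 has only 1 unit left but 3 were requested.",
    type_uri="https://example.com/problems/insufficient-stock",  # illustrative URI
    instance="/orders/123",
    trace_id="example-trace-id"  # extension member, hypothetical value
)
print(status, headers, payload, sep="\n")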

33.9.2 Architecture Pattern Summary

This chapter examined the complexity and the importance of status code propagation in microservice architectures. In a distributed system, a status code is not merely the response of a single service; it reflects the health of the entire call chain. A sound propagation strategy, smart circuit breaking, and a complete monitoring and alerting system are all key building blocks of a reliable microservice architecture.


Chapter 33 key takeaways:

  • Status code propagation in microservices must take call-chain context into account

  • Distributed tracing is the key tool for understanding error propagation

  • Circuit breakers should make decisions intelligently based on status codes

  • API gateways play an important role in status code translation

  • Multi-level degradation strategies improve system resilience

  • A unified monitoring and alerting system is the foundation of operations

Chapter 34: Status Codes and Error Handling Strategies

34.1 Error Handling Philosophy and Principles

34.1.1 Three Paradigms of Error Handling

In software engineering there are three main error handling paradigms, and each implies a different understanding of how status codes should be used:

python

# Examples of the three error handling paradigms
from typing import Generic, Optional, TypeVar, Union
from dataclasses import dataclass

# 1. The return code paradigm (C style)
class ReturnCodeParadigm:
    """Signal success or failure through the return value."""
    def divide(self, a: float, b: float) -> Union[float, int]:
        """Return an error code instead of raising an exception."""
        if b == 0:
            return -1  # error code
        return a / b

    def process(self):
        result = self.divide(10, 0)
        if result == -1:
            print("Division by zero error")
        else:
            print(f"Result: {result}")

# 2. The exception paradigm (Java/Python style)
class ExceptionParadigm:
    """Signal errors by raising exceptions."""
    class DivisionByZeroError(Exception):
        pass

    def divide(self, a: float, b: float) -> float:
        """Raise an exception instead of returning an error code."""
        if b == 0:
            raise self.DivisionByZeroError("Cannot divide by zero")
        return a / b

    def process(self):
        try:
            result = self.divide(10, 0)
            print(f"Result: {result}")
        except self.DivisionByZeroError as e:
            print(f"Error: {e}")

# 3. The result type paradigm (functional style)
T = TypeVar('T')
E = TypeVar('E')

@dataclass
class Result(Generic[T, E]):
    """Container holding either a success value or an error value."""
    value: Optional[T] = None
    error: Optional[E] = None
    is_success: bool = True

    @classmethod
    def success(cls, value: T) -> 'Result[T, E]':
        return cls(value=value, is_success=True)

    @classmethod
    def failure(cls, error: E) -> 'Result[T, E]':
        return cls(error=error, is_success=False)

    def unwrap(self) -> T:
        if not self.is_success:
            raise ValueError(f"Result contains error: {self.error}")
        return self.value

class ResultParadigm:
    """Wrap fallible operations in a Result type."""
    def divide(self, a: float, b: float) -> Result[float, str]:
        """Return a Result holding either the value or the error."""
        if b == 0:
            return Result.failure("Division by zero")
        return Result.success(a / b)

    def process(self):
        result = self.divide(10, 0)
        if result.is_success:
            print(f"Result: {result.unwrap()}")
        else:
            print(f"Error: {result.error}")

34.1.2 Core Error Handling Principles

java

    // 错误处理的核心原则实现
    public class ErrorHandlingPrinciples {

    // 原则1: 快速失败(Fail Fast)
    public static class FailFastPrinciple {
    public User validateAndCreate(String email, String password) {
    // 尽早验证,尽早失败
    if (email == null || email.isEmpty()) {
    throw new ValidationException("Email is required");
    }

    if (!isValidEmail(email)) {
    throw new ValidationException("Invalid email format");
    }

    if (password == null || password.length() < 8) {
    throw new ValidationException("Password must be at least 8 characters");
    }

    // 所有验证通过后才执行业务逻辑
    return new User(email, password);
    }
    }

    // 原则2: 明确错误类型(Be Specific)
    public static class SpecificErrorPrinciple {
    public enum ErrorType {
    USER_NOT_FOUND,
    INSUFFICIENT_PERMISSIONS,
    INVALID_INPUT,
    NETWORK_ERROR,
    DATABASE_ERROR
    }

    @Data
    public static class SpecificError {
    private final ErrorType type;
    private final String message;
    private final Map<String, Object> context;
    private final Instant timestamp;

    public SpecificError(ErrorType type, String message,
    Map<String, Object> context) {
    this.type = type;
    this.message = message;
    this.context = context;
    this.timestamp = Instant.now();
    }

    public HttpStatus toHttpStatus() {
    switch (type) {
    case USER_NOT_FOUND:
    return HttpStatus.NOT_FOUND;
    case INSUFFICIENT_PERMISSIONS:
    return HttpStatus.FORBIDDEN;
    case INVALID_INPUT:
    return HttpStatus.BAD_REQUEST;
    case NETWORK_ERROR:
    case DATABASE_ERROR:
    return HttpStatus.INTERNAL_SERVER_ERROR;
    default:
    return HttpStatus.INTERNAL_SERVER_ERROR;
    }
    }
    }
    }

    // 原则3: 可恢复性设计(Design for Recovery)
    public static class RecoveryDesignPrinciple {
    private final RetryTemplate retryTemplate;
    private final CircuitBreaker circuitBreaker;

    public RecoveryDesignPrinciple() {
    this.retryTemplate = new RetryTemplate();
    this.circuitBreaker = new CircuitBreaker();
    }

    public Result<Order> placeOrderWithRecovery(OrderRequest request) {
    return circuitBreaker.executeSupplier(() -> {
    return retryTemplate.execute(context -> {
    try {
    return Result.success(placeOrder(request));
    } catch (TemporaryFailureException e) {
    // 记录重试
    context.setAttribute("retry_count",
    context.getRetryCount() + 1);
    throw e;
    }
    });
    });
    }

    private Order placeOrder(OrderRequest request) {
    // 模拟可能失败的操作
    return new Order();
    }
    }

    // 原则4: 错误隔离(Error Isolation)
    public static class ErrorIsolationPrinciple {
    private final ExecutorService isolatedExecutor;

    public ErrorIsolationPrinciple() {
    // 创建独立的线程池来隔离可能失败的操作
    this.isolatedExecutor = Executors.newFixedThreadPool(3,
    new ThreadFactoryBuilder()
    .setNameFormat("isolated-task-%d")
    .setUncaughtExceptionHandler((t, e) -> {
    // 隔离的异常处理,不会影响主线程
    logIsolatedError(t.getName(), e);
    })
    .build());
    }

    public CompletableFuture<Void> executeIsolated(Runnable task) {
    return CompletableFuture.runAsync(task, isolatedExecutor)
    .exceptionally(throwable -> {
    // 处理异常,但不会传播到调用者
    logIsolatedError("isolated-task", throwable);
    return null;
    });
    }

    private void logIsolatedError(String taskName, Throwable throwable) {
    System.err.printf("Isolated error in %s: %s%n",
    taskName, throwable.getMessage());
    }
    }

    // 原则5: 优雅降级(Graceful Degradation)
    public static class GracefulDegradationPrinciple {
    private final ProductService productService; // 主服务依赖
    private final CacheService cacheService;
    private final FallbackService fallbackService;

    public GracefulDegradationPrinciple() {
    this.productService = new ProductService();
    this.cacheService = new CacheService();
    this.fallbackService = new FallbackService();
    }

    public Product getProduct(String productId) {
    try {
    // 1. 首先尝试主服务
    return productService.getProduct(productId);
    } catch (ServiceUnavailableException e) {
    // 2. 主服务失败,尝试缓存
    Product cached = cacheService.getProduct(productId);
    if (cached != null) {
    return cached;
    }

    // 3. 缓存也没有,使用降级服务
    return fallbackService.getBasicProductInfo(productId);
    }
    }
    }
    }

    34.2 状态码驱动的错误分类

    34.2.1 错误分类体系

    typescript

    // 基于状态码的错误分类体系
    type ErrorCategory =
    | 'CLIENT_ERROR' // 4xx 错误
    | 'SERVER_ERROR' // 5xx 错误
    | 'NETWORK_ERROR' // 网络相关错误
    | 'VALIDATION_ERROR' // 验证错误
    | 'BUSINESS_ERROR' // 业务逻辑错误
    | 'SECURITY_ERROR' // 安全相关错误
    | 'INTEGRATION_ERROR' // 集成错误
    | 'TIMEOUT_ERROR'; // 超时错误

    interface ErrorClassification {
    category: ErrorCategory;
    severity: 'LOW' | 'MEDIUM' | 'HIGH' | 'CRITICAL';
    retryable: boolean;
    userFacing: boolean;
    logLevel: 'DEBUG' | 'INFO' | 'WARN' | 'ERROR' | 'FATAL';
    suggestedAction: string;
    }

    class StatusCodeClassifier {
    private static readonly CLASSIFICATION_MAP: Map<number, ErrorClassification> = new Map([
    // 4xx 错误分类
    [400, {
    category: 'CLIENT_ERROR',
    severity: 'MEDIUM',
    retryable: false,
    userFacing: true,
    logLevel: 'WARN',
    suggestedAction: 'Fix request parameters and retry'
    }],
    [401, {
    category: 'SECURITY_ERROR',
    severity: 'MEDIUM',
    retryable: true,
    userFacing: true,
    logLevel: 'WARN',
    suggestedAction: 'Authenticate and retry with valid credentials'
    }],
    [403, {
    category: 'SECURITY_ERROR',
    severity: 'MEDIUM',
    retryable: false,
    userFacing: true,
    logLevel: 'WARN',
    suggestedAction: 'Request appropriate permissions'
    }],
    [404, {
    category: 'CLIENT_ERROR',
    severity: 'LOW',
    retryable: false,
    userFacing: true,
    logLevel: 'DEBUG',
    suggestedAction: 'Check resource identifier and retry'
    }],
    [409, {
    category: 'BUSINESS_ERROR',
    severity: 'MEDIUM',
    retryable: true,
    userFacing: true,
    logLevel: 'WARN',
    suggestedAction: 'Resolve conflict and retry'
    }],
    [422, {
    category: 'VALIDATION_ERROR',
    severity: 'MEDIUM',
    retryable: false,
    userFacing: true,
    logLevel: 'WARN',
    suggestedAction: 'Fix validation errors and retry'
    }],
    [429, {
    category: 'CLIENT_ERROR',
    severity: 'MEDIUM',
    retryable: true,
    userFacing: true,
    logLevel: 'WARN',
    suggestedAction: 'Wait and retry after rate limit resets'
    }],

    // 5xx 错误分类
    [500, {
    category: 'SERVER_ERROR',
    severity: 'HIGH',
    retryable: true,
    userFacing: false,
    logLevel: 'ERROR',
    suggestedAction: 'Retry after some time or contact support'
    }],
    [502, {
    category: 'INTEGRATION_ERROR',
    severity: 'HIGH',
    retryable: true,
    userFacing: false,
    logLevel: 'ERROR',
    suggestedAction: 'Retry after upstream service recovers'
    }],
    [503, {
    category: 'SERVER_ERROR',
    severity: 'HIGH',
    retryable: true,
    userFacing: true,
    logLevel: 'ERROR',
    suggestedAction: 'Retry after service maintenance completes'
    }],
    [504, {
    category: 'TIMEOUT_ERROR',
    severity: 'MEDIUM',
    retryable: true,
    userFacing: false,
    logLevel: 'WARN',
    suggestedAction: 'Retry with longer timeout or check network'
    }]
    ]);

    static classify(statusCode: number): ErrorClassification {
    const classification = this.CLASSIFICATION_MAP.get(statusCode);

    if (!classification) {
    // 默认分类
    if (statusCode >= 400 && statusCode < 500) {
    return {
    category: 'CLIENT_ERROR',
    severity: 'MEDIUM',
    retryable: false,
    userFacing: true,
    logLevel: 'WARN',
    suggestedAction: 'Review request and retry'
    };
    } else if (statusCode >= 500) {
    return {
    category: 'SERVER_ERROR',
    severity: 'HIGH',
    retryable: true,
    userFacing: false,
    logLevel: 'ERROR',
    suggestedAction: 'Retry after some time'
    };
    }

    throw new Error(`Unsupported status code: ${statusCode}`);
    }

    return classification;
    }

    static getErrorResponse(statusCode: number, errorDetails?: any): ErrorResponse {
    const classification = this.classify(statusCode);

    return {
    statusCode,
    error: {
    code: `HTTP_${statusCode}`,
    message: this.getDefaultMessage(statusCode),
    category: classification.category,
    severity: classification.severity,
    retryable: classification.retryable,
    timestamp: new Date().toISOString(),
    details: errorDetails,
    suggestedAction: classification.suggestedAction
    }
    };
    }

    static getDefaultMessage(statusCode: number): string {
    const messages: Record<number, string> = {
    400: 'Bad Request',
    401: 'Unauthorized',
    403: 'Forbidden',
    404: 'Not Found',
    409: 'Conflict',
    422: 'Unprocessable Entity',
    429: 'Too Many Requests',
    500: 'Internal Server Error',
    502: 'Bad Gateway',
    503: 'Service Unavailable',
    504: 'Gateway Timeout'
    };

    return messages[statusCode] || 'Unknown Error';
    }

    static shouldRetry(statusCode: number, retryCount: number = 0): boolean {
    const classification = this.classify(statusCode);

    if (!classification.retryable) {
    return false;
    }

    // 检查重试次数限制
    const maxRetries = this.getMaxRetries(statusCode);
    return retryCount < maxRetries;
    }

    private static getMaxRetries(statusCode: number): number {
    const retryConfig: Record<number, number> = {
    401: 1, // 认证错误,重试1次
    429: 3, // 限流错误,最多重试3次
    500: 3, // 服务器错误,最多重试3次
    502: 3, // 网关错误,最多重试3次
    503: 5, // 服务不可用,最多重试5次
    504: 2 // 超时错误,最多重试2次
    };

    return retryConfig[statusCode] || 0;
    }

    static getRetryDelay(statusCode: number, retryCount: number): number {
    // 指数退避算法
    const baseDelay = 1000; // 1秒
    const maxDelay = 30000; // 30秒

    if (statusCode === 429) {
    // 对于限流错误,使用更保守的退避
    return Math.min(baseDelay * Math.pow(1.5, retryCount), maxDelay);
    }

    // 对于其他可重试错误,使用指数退避
    return Math.min(baseDelay * Math.pow(2, retryCount), maxDelay);
    }
    }

    // 使用示例
    const response = await fetch('https://api.example.com/data');
    if (!response.ok) {
    const classification = StatusCodeClassifier.classify(response.status);

    console.log('Error Classification:', {
    category: classification.category,
    severity: classification.severity,
    shouldRetry: classification.retryable,
    suggestedAction: classification.suggestedAction
    });

    // 根据分类决定如何处理错误
    switch (classification.category) {
    case 'CLIENT_ERROR':
    // 客户端错误,通常不需要重试
    throw new UserFacingError(
    StatusCodeClassifier.getDefaultMessage(response.status)
    );

    case 'SERVER_ERROR':
    // 服务器错误,可以重试
    if (classification.retryable) {
    const delay = StatusCodeClassifier.getRetryDelay(
    response.status,
    0 // 第一次重试
    );
    await new Promise(resolve => setTimeout(resolve, delay));
    // 重试逻辑…
    }
    break;

    case 'SECURITY_ERROR':
    // 安全错误,可能需要重新认证
    if (response.status === 401) {
    await refreshAuthentication();
    // 重试请求…
    }
    break;
    }
    }

    34.2.2 分层错误处理架构

    java

    // Java分层错误处理架构
    public class LayeredErrorHandlingArchitecture {

    // 第一层:基础设施层错误
    public static class InfrastructureLayer {
    @Service
    public class DatabaseService {
    private final JdbcTemplate jdbcTemplate;
    private final CircuitBreaker circuitBreaker;

    public DatabaseService(JdbcTemplate jdbcTemplate) {
    this.jdbcTemplate = jdbcTemplate;
    this.circuitBreaker = CircuitBreaker.ofDefaults("database");
    }

    public User findUserById(String userId) {
    return circuitBreaker.executeSupplier(() -> {
    try {
    return jdbcTemplate.queryForObject(
    "SELECT * FROM users WHERE id = ?",
    new UserRowMapper(),
    userId
    );
    } catch (DataAccessException e) {
    // 基础设施错误转换为领域错误
    throw new DatabaseUnavailableException(
    "Database temporarily unavailable",
    e
    );
    }
    });
    }
    }

    @Service
    public class ExternalApiClient {
    private final RestTemplate restTemplate;
    private final RetryTemplate retryTemplate;

    public ExternalApiClient(RestTemplate restTemplate) {
    this.restTemplate = restTemplate;
    this.retryTemplate = new RetryTemplate();
    }

    public ApiResponse callExternalApi(String endpoint) {
    return retryTemplate.execute(context -> {
    try {
    ResponseEntity<ApiResponse> response = restTemplate
    .getForEntity(endpoint, ApiResponse.class);

    if (!response.getStatusCode().is2xxSuccessful()) {
    // 根据状态码决定是否重试
    if (shouldRetry(response.getStatusCode())) {
    throw new RetryableApiException(
    "API returned error: " + response.getStatusCode()
    );
    } else {
    throw new NonRetryableApiException(
    "API returned unrecoverable error"
    );
    }
    }

    return response.getBody();
    } catch (RestClientException e) {
    // 网络错误,可以重试
    throw new RetryableApiException(
    "Network error calling external API",
    e
    );
    }
    });
    }

    private boolean shouldRetry(HttpStatus status) {
    return status.is5xxServerError() ||
    status == HttpStatus.REQUEST_TIMEOUT ||
    status == HttpStatus.TOO_MANY_REQUESTS;
    }
    }
    }

    // 第二层:领域层错误
    public static class DomainLayer {
    public static class InsufficientFundsException
    extends BusinessException {
    public InsufficientFundsException(BigDecimal current,
    BigDecimal required) {
    super(
    "Insufficient funds",
    ErrorCode.INSUFFICIENT_FUNDS,
    Map.of(
    "current_balance", current,
    "required_amount", required
    )
    );
    }
    }

    public static class ProductOutOfStockException
    extends BusinessException {
    public ProductOutOfStockException(String productId,
    int available) {
    super(
    "Product is out of stock",
    ErrorCode.PRODUCT_OUT_OF_STOCK,
    Map.of(
    "product_id", productId,
    "available_quantity", available
    )
    );
    }
    }

    @Service
    public class OrderService {
    private final PaymentService paymentService;
    private final InventoryService inventoryService;

    public OrderService(PaymentService paymentService,
    InventoryService inventoryService) {
    this.paymentService = paymentService;
    this.inventoryService = inventoryService;
    }

    @Transactional
    public Order placeOrder(OrderRequest request) {
    // 验证库存
    if (!inventoryService.isInStock(request.getProductId(),
    request.getQuantity())) {
    throw new ProductOutOfStockException(
    request.getProductId(),
    inventoryService.getAvailableQuantity(
    request.getProductId()
    )
    );
    }

    // 处理支付
    try {
    PaymentResult result = paymentService.processPayment(
    request.getPaymentDetails(),
    request.getTotalAmount()
    );

    if (!result.isSuccess()) {
    throw new PaymentFailedException(
    "Payment processing failed",
    result.getErrorCode()
    );
    }

    // 扣减库存
    inventoryService.reduceStock(
    request.getProductId(),
    request.getQuantity()
    );

    // 创建订单
    return createOrder(request, result);

    } catch (PaymentProcessingException e) {
    // 支付处理异常转换为领域异常
    throw new PaymentFailedException(
    "Payment processing error",
    e.getErrorCode(),
    e
    );
    }
    }
    }
    }

    // 第三层:应用层错误
    public static class ApplicationLayer {
    @RestControllerAdvice
    public class GlobalExceptionHandler {

    // 处理领域层异常
    @ExceptionHandler(BusinessException.class)
    public ResponseEntity<ErrorResponse> handleBusinessException(
    BusinessException ex) {

    ErrorResponse error = ErrorResponse.builder()
    .code(ex.getErrorCode())
    .message(ex.getMessage())
    .details(ex.getContext())
    .timestamp(Instant.now())
    .build();

    // 根据错误类型决定HTTP状态码
    HttpStatus status = mapErrorCodeToHttpStatus(ex.getErrorCode());

    return ResponseEntity
    .status(status)
    .body(error);
    }

    // 处理基础设施层异常
    @ExceptionHandler(DatabaseUnavailableException.class)
    public ResponseEntity<ErrorResponse> handleDatabaseException(
    DatabaseUnavailableException ex) {

    ErrorResponse error = ErrorResponse.builder()
    .code(ErrorCode.DATABASE_UNAVAILABLE)
    .message("Database service is temporarily unavailable")
    .timestamp(Instant.now())
    .build();

    return ResponseEntity
    .status(HttpStatus.SERVICE_UNAVAILABLE)
    .header("Retry-After", "30")
    .body(error);
    }

    // 处理外部API异常
    @ExceptionHandler(ApiException.class)
    public ResponseEntity<ErrorResponse> handleApiException(
    ApiException ex) {

    ErrorResponse error = ErrorResponse.builder()
    .code(ErrorCode.EXTERNAL_SERVICE_ERROR)
    .message("External service error occurred")
    .timestamp(Instant.now())
    .build();

    return ResponseEntity
    .status(HttpStatus.BAD_GATEWAY)
    .body(error);
    }

    // 处理验证异常
    @ExceptionHandler(MethodArgumentNotValidException.class)
    public ResponseEntity<ErrorResponse> handleValidationException(
    MethodArgumentNotValidException ex) {

    List<ValidationError> validationErrors = ex.getBindingResult()
    .getFieldErrors()
    .stream()
    .map(fieldError -> ValidationError.builder()
    .field(fieldError.getField())
    .message(fieldError.getDefaultMessage())
    .rejectedValue(fieldError.getRejectedValue())
    .build())
    .collect(Collectors.toList());

    ErrorResponse error = ErrorResponse.builder()
    .code(ErrorCode.VALIDATION_ERROR)
    .message("Request validation failed")
    .details(Map.of("validation_errors", validationErrors))
    .timestamp(Instant.now())
    .build();

    return ResponseEntity
    .status(HttpStatus.UNPROCESSABLE_ENTITY)
    .body(error);
    }

    // 兜底异常处理
    @ExceptionHandler(Exception.class)
    public ResponseEntity<ErrorResponse> handleGenericException(
    Exception ex) {

    // 记录未知异常
    log.error("Unhandled exception occurred", ex);

    ErrorResponse error = ErrorResponse.builder()
    .code(ErrorCode.INTERNAL_ERROR)
    .message("An unexpected error occurred")
    .timestamp(Instant.now())
    .build();

    return ResponseEntity
    .status(HttpStatus.INTERNAL_SERVER_ERROR)
    .body(error);
    }

    private HttpStatus mapErrorCodeToHttpStatus(ErrorCode errorCode) {
    switch (errorCode) {
    case INSUFFICIENT_FUNDS:
    case PRODUCT_OUT_OF_STOCK:
    return HttpStatus.CONFLICT;
    case VALIDATION_ERROR:
    return HttpStatus.UNPROCESSABLE_ENTITY;
    case AUTHENTICATION_ERROR:
    return HttpStatus.UNAUTHORIZED;
    case AUTHORIZATION_ERROR:
    return HttpStatus.FORBIDDEN;
    case RESOURCE_NOT_FOUND:
    return HttpStatus.NOT_FOUND;
    default:
    return HttpStatus.BAD_REQUEST;
    }
    }
    }

    @Service
    public class ErrorMonitoringService {
    private final ErrorReportingClient errorReportingClient;
    private final MetricsService metricsService;

    public ErrorMonitoringService(ErrorReportingClient errorReportingClient,
    MetricsService metricsService) {
    this.errorReportingClient = errorReportingClient;
    this.metricsService = metricsService;
    }

    public void reportError(Throwable throwable, ErrorContext context) {
    // 1. 记录指标
    metricsService.incrementCounter("errors_total",
    "error_type", throwable.getClass().getSimpleName());

    // 2. 发送到错误报告系统
    ErrorReport report = ErrorReport.builder()
    .error(throwable)
    .context(context)
    .timestamp(Instant.now())
    .build();

    errorReportingClient.report(report);

    // 3. 根据错误类型决定是否需要告警
    if (isCriticalError(throwable)) {
    sendAlert(report);
    }
    }

    private boolean isCriticalError(Throwable throwable) {
    return throwable instanceof OutOfMemoryError ||
    throwable instanceof DatabaseUnavailableException ||
    throwable instanceof CircuitBreakerOpenException;
    }

    private void sendAlert(ErrorReport report) {
    // 发送告警到监控系统
    Alert alert = Alert.builder()
    .severity(AlertSeverity.CRITICAL)
    .message("Critical error detected")
    .details(report)
    .timestamp(Instant.now())
    .build();

    // alertService.send(alert);
    }
    }
    }

    // 错误基类
    public static abstract class BusinessException extends RuntimeException {
    private final ErrorCode errorCode;
    private final Map<String, Object> context;

    public BusinessException(String message, ErrorCode errorCode,
    Map<String, Object> context) {
    super(message);
    this.errorCode = errorCode;
    this.context = context;
    }

    public BusinessException(String message, ErrorCode errorCode,
    Map<String, Object> context, Throwable cause) {
    super(message, cause);
    this.errorCode = errorCode;
    this.context = context;
    }

    public ErrorCode getErrorCode() { return errorCode; }
    public Map<String, Object> getContext() { return context; }
    }
    }

    34.3 智能重试策略

    34.3.1 自适应重试算法

    python

    # 自适应重试策略实现
    import time
    import math
    import random
    from dataclasses import dataclass
    from enum import Enum
    from typing import Optional, Callable, Any, Dict
    from datetime import datetime, timedelta

    class RetryStrategy(Enum):
    """重试策略枚举"""
    EXPONENTIAL_BACKOFF = "exponential_backoff"
    FIBONACCI_BACKOFF = "fibonacci_backoff"
    FIXED_DELAY = "fixed_delay"
    LINEAR_BACKOFF = "linear_backoff"
    RANDOM_JITTER = "random_jitter"

    class ErrorPattern(Enum):
    """错误模式枚举"""
    TRANSIENT = "transient" # 临时错误,可以重试
    PERMANENT = "permanent" # 永久错误,不应重试
    THROTTLING = "throttling" # 限流错误,需要等待
    NETWORK = "network" # 网络错误

    @dataclass
    class RetryContext:
    """重试上下文"""
    attempt_number: int
    max_attempts: int
    last_error: Optional[Exception]
    last_status_code: Optional[int]
    start_time: datetime
    total_duration: timedelta
    custom_data: Dict[str, Any]

    def should_continue(self) -> bool:
    """是否应该继续重试"""
    if self.attempt_number >= self.max_attempts:
    return False

    # 检查总持续时间限制
    if self.total_duration.total_seconds() > 300: # 5分钟
    return False

    return True

    class AdaptiveRetryStrategy:
    """自适应重试策略"""

    def __init__(
    self,
    max_attempts: int = 5,
    base_delay: float = 1.0, # 秒
    max_delay: float = 60.0, # 秒
    strategy: RetryStrategy = RetryStrategy.EXPONENTIAL_BACKOFF,
    jitter: bool = True,
    jitter_factor: float = 0.1
    ):
    self.max_attempts = max_attempts
    self.base_delay = base_delay
    self.max_delay = max_delay
    self.strategy = strategy
    self.jitter = jitter
    self.jitter_factor = jitter_factor

    # 错误模式检测器
    self.error_patterns = self._initialize_error_patterns()

    def _initialize_error_patterns(self) -> Dict[ErrorPattern, Callable[[Exception], bool]]:
    """初始化错误模式检测器"""
    return {
    ErrorPattern.TRANSIENT: self._is_transient_error,
    ErrorPattern.PERMANENT: self._is_permanent_error,
    ErrorPattern.THROTTLING: self._is_throttling_error,
    ErrorPattern.NETWORK: self._is_network_error
    }

    def execute(self, func: Callable[[], Any], context_data: Dict[str, Any] = None) -> Any:
    """执行带重试的函数"""
    context = RetryContext(
    attempt_number=0,
    max_attempts=self.max_attempts,
    last_error=None,
    last_status_code=None,
    start_time=datetime.now(),
    total_duration=timedelta(),
    custom_data=context_data or {}
    )

    while context.should_continue():
    try:
    result = func()

    # 成功,重置重试状态(如果需要的话)
    self._on_success(context)
    return result

    except Exception as e:
    context.last_error = e
    context.last_status_code = getattr(e, 'status_code', None)
    context.attempt_number += 1

    # 分析错误模式
    error_pattern = self._analyze_error(e, context)

    # 根据错误模式决定是否重试
    if not self._should_retry(error_pattern, context):
    raise e

    # 计算等待时间
    wait_time = self._calculate_wait_time(context, error_pattern)

    # 更新总持续时间
    context.total_duration += timedelta(seconds=wait_time)

    # 等待
    self._wait(wait_time, context)

    def _analyze_error(self, error: Exception, context: RetryContext) -> ErrorPattern:
    """分析错误模式"""
    # 检查错误类型
    for pattern, detector in self.error_patterns.items():
    if detector(error):
    return pattern

    # 检查状态码
    if context.last_status_code:
    if context.last_status_code == 429:
    return ErrorPattern.THROTTLING
    elif 500 <= context.last_status_code < 600:
    return ErrorPattern.TRANSIENT
    elif 400 <= context.last_status_code < 500:
    # 某些4xx错误可能是临时性的
    if context.last_status_code in [408, 423, 425, 429]:
    return ErrorPattern.TRANSIENT
    else:
    return ErrorPattern.PERMANENT

    # 默认认为是网络错误
    return ErrorPattern.NETWORK

    def _should_retry(self, error_pattern: ErrorPattern, context: RetryContext) -> bool:
    """根据错误模式决定是否重试"""
    if error_pattern == ErrorPattern.PERMANENT:
    return False

    # 检查重试次数
    if context.attempt_number >= self.max_attempts:
    return False

    # 检查总持续时间
    if context.total_duration.total_seconds() > 300: # 5分钟
    return False

    return True

    def _calculate_wait_time(self, context: RetryContext, error_pattern: ErrorPattern) -> float:
    """计算等待时间"""
    attempt = context.attempt_number

    # 基础延迟计算
    if error_pattern == ErrorPattern.THROTTLING:
    # 限流错误使用更长的等待时间
    base_wait = self._calculate_throttling_wait(context)
    else:
    # 根据策略计算基础等待时间
    base_wait = self._calculate_base_wait(attempt)

    # 添加抖动
    if self.jitter:
    jitter_amount = base_wait * self.jitter_factor
    base_wait += random.uniform(-jitter_amount, jitter_amount)

    # 确保在合理范围内
    return max(0, min(base_wait, self.max_delay))

    def _calculate_base_wait(self, attempt: int) -> float:
    """根据策略计算基础等待时间"""
    if self.strategy == RetryStrategy.EXPONENTIAL_BACKOFF:
    # 指数退避:1, 2, 4, 8, 16, …
    return self.base_delay * (2 ** (attempt - 1))

    elif self.strategy == RetryStrategy.FIBONACCI_BACKOFF:
    # 斐波那契退避:1, 1, 2, 3, 5, 8, …
    fib = self._fibonacci(attempt)
    return self.base_delay * fib

    elif self.strategy == RetryStrategy.FIXED_DELAY:
    # 固定延迟
    return self.base_delay

    elif self.strategy == RetryStrategy.LINEAR_BACKOFF:
    # 线性退避:1, 2, 3, 4, 5, …
    return self.base_delay * attempt

    else:
    # 默认使用指数退避
    return self.base_delay * (2 ** (attempt - 1))

    def _calculate_throttling_wait(self, context: RetryContext) -> float:
    """计算限流等待时间"""
    # 检查错误中是否包含Retry-After头
    if hasattr(context.last_error, 'headers'):
    headers = context.last_error.headers
    if 'Retry-After' in headers:
    retry_after = headers['Retry-After']
    try:
    # 可能是秒数或日期时间
    if retry_after.isdigit():
    return float(retry_after)
    else:
    # 解析日期时间
    retry_time = datetime.fromisoformat(retry_after)
    wait_seconds = (retry_time - datetime.now()).total_seconds()
    return max(0, wait_seconds)
    except (ValueError, TypeError):
    pass

    # 没有Retry-After头,使用指数退避加上额外延迟
    base_wait = self._calculate_base_wait(context.attempt_number)
    return base_wait * 2 # 限流错误等待时间加倍

    def _fibonacci(self, n: int) -> int:
    """计算斐波那契数"""
    if n <= 0:
    return 0
    elif n == 1:
    return 1
    else:
    a, b = 0, 1
    for _ in range(n):
    a, b = b, a + b
    return a

    def _is_transient_error(self, error: Exception) -> bool:
    """判断是否为临时错误"""
    transient_error_types = [
    'TimeoutError',
    'ConnectionError',
    'TemporaryFailure',
    'ServiceUnavailable'
    ]

    error_type = error.__class__.__name__
    return error_type in transient_error_types

    def _is_permanent_error(self, error: Exception) -> bool:
    """判断是否为永久错误"""
    permanent_error_types = [
    'ValidationError',
    'AuthenticationError',
    'AuthorizationError',
    'InvalidRequest'
    ]

    error_type = error.__class__.__name__
    return error_type in permanent_error_types

    def _is_throttling_error(self, error: Exception) -> bool:
    """判断是否为限流错误"""
    if hasattr(error, 'status_code') and error.status_code == 429:
    return True

    error_type = error.__class__.__name__
    return error_type in ['RateLimitError', 'ThrottlingError']

    def _is_network_error(self, error: Exception) -> bool:
    """判断是否为网络错误"""
    network_error_types = [
    'ConnectionError',
    'NetworkError',
    'SocketError',
    'HTTPError'
    ]

    error_type = error.__class__.__name__
    return error_type in network_error_types

    def _wait(self, seconds: float, context: RetryContext):
    """等待指定的时间"""
    # 记录等待开始时间
    wait_start = datetime.now()

    # 实际等待
    time.sleep(seconds)

    # 记录等待结束时间
    wait_end = datetime.now()

    # 可以在这里添加等待日志或监控
    self._log_wait(context, seconds, wait_start, wait_end)

    def _log_wait(self, context: RetryContext, wait_time: float,
    start_time: datetime, end_time: datetime):
    """记录等待信息"""
    log_data = {
    'attempt': context.attempt_number,
    'wait_time_seconds': wait_time,
    'start_time': start_time.isoformat(),
    'end_time': end_time.isoformat(),
    'error_type': context.last_error.__class__.__name__ if context.last_error else None,
    'status_code': context.last_status_code,
    'total_duration_seconds': context.total_duration.total_seconds()
    }

    print(f"Retry wait: {log_data}")

    def _on_success(self, context: RetryContext):
    """成功时的回调"""
    # 可以在这里添加成功日志、重置计数器等
    success_data = {
    'attempts_made': context.attempt_number,
    'total_duration_seconds': context.total_duration.total_seconds(),
    'success_time': datetime.now().isoformat()
    }

    print(f"Operation succeeded: {success_data}")

    # 使用示例
    def test_retry_strategy():
    # 创建自适应重试策略
    retry_strategy = AdaptiveRetryStrategy(
    max_attempts=5,
    base_delay=1.0,
    max_delay=30.0,
    strategy=RetryStrategy.EXPONENTIAL_BACKOFF,
    jitter=True,
    jitter_factor=0.1
    )

    # 模拟一个可能失败的操作
    call_count = 0

    def unreliable_operation():
    nonlocal call_count
    call_count += 1

    if call_count < 3:
    # 模拟临时错误
    class MockError(Exception):
    status_code = 503

    raise MockError("Service temporarily unavailable")
    elif call_count == 3:
    # 模拟限流错误
    class ThrottlingError(Exception):
    status_code = 429
    headers = {'Retry-After': '5'}

    raise ThrottlingError("Too many requests")
    else:
    # 成功
    return {"data": "success"}

    try:
    # 执行带重试的操作
    result = retry_strategy.execute(unreliable_operation)
    print(f"最终结果: {result}")
    except Exception as e:
    print(f"操作最终失败: {e}")

    # 运行测试
    if __name__ == "__main__":
    test_retry_strategy()

    34.3.2 基于机器学习的智能重试

    python

    # 基于机器学习的智能重试系统
    import numpy as np
    import pandas as pd
    from sklearn.ensemble import RandomForestClassifier, RandomForestRegressor
    from sklearn.preprocessing import LabelEncoder, StandardScaler
    from sklearn.model_selection import train_test_split
    import joblib
    from datetime import datetime
    from typing import Dict, List, Any, Optional, Tuple
    import hashlib
    import json

    class MLRetryOptimizer:
    """基于机器学习的重试优化器"""

    def __init__(self, model_path: str = None):
    # 特征工程
    self.feature_columns = [
    'hour_of_day', 'day_of_week', 'response_time_ms',
    'status_code', 'error_type', 'retry_count',
    'service_name', 'endpoint', 'client_ip_prefix',
    'request_size_kb', 'response_size_kb', 'concurrent_requests'
    ]

    # 目标变量
    self.target_column = 'retry_success_probability'

    # 编码器
    self.label_encoders = {}
    self.scaler = StandardScaler()

    # 模型
    self.classification_model = RandomForestClassifier(
    n_estimators=100,
    random_state=42
    )
    self.regression_model = RandomForestRegressor(
    n_estimators=100,
    random_state=42
    )

    # 数据存储
    self.training_data = []
    self.model_path = model_path

    # 加载现有模型
    if model_path:
    self.load_model(model_path)

    def extract_features(self, request_context: Dict[str, Any],
    error_context: Dict[str, Any]) -> Dict[str, Any]:
    """从请求和错误上下文中提取特征"""
    features = {}

    # 时间特征
    timestamp = request_context.get('timestamp', datetime.now())
    features['hour_of_day'] = timestamp.hour
    features['day_of_week'] = timestamp.weekday() # 0=Monday, 6=Sunday

    # 响应特征
    features['response_time_ms'] = error_context.get('response_time_ms', 0)
    features['status_code'] = error_context.get('status_code', 0)
    features['error_type'] = error_context.get('error_type', 'unknown')

    # 请求特征
    features['retry_count'] = error_context.get('retry_count', 0)
    features['service_name'] = request_context.get('service_name', 'unknown')
    features['endpoint'] = request_context.get('endpoint', 'unknown')

    # 客户端特征
    client_ip = request_context.get('client_ip', '0.0.0.0')
    features['client_ip_prefix'] = self._extract_ip_prefix(client_ip)

    # 请求大小
    features['request_size_kb'] = request_context.get('request_size_kb', 0)
    features['response_size_kb'] = error_context.get('response_size_kb', 0)

    # 系统负载特征
    features['concurrent_requests'] = request_context.get(
    'concurrent_requests', 0
    )

    return features

    def predict_retry_success(self, features: Dict[str, Any]) -> Dict[str, Any]:
    """预测重试成功的概率和最佳等待时间"""
    # 转换为DataFrame
    features_df = pd.DataFrame([features])

    # 编码分类特征
    encoded_features = self._encode_features(features_df)

    # 预测
    if hasattr(self, 'classification_model') and self.classification_model:
    # 分类:预测是否会成功
    success_probability = self.classification_model.predict_proba(
    encoded_features
    )[0][1] # 获取正类(成功)的概率

    # 回归:预测最佳等待时间
    if hasattr(self, 'regression_model') and self.regression_model:
    optimal_wait_time = self.regression_model.predict(
    encoded_features
    )[0]
    else:
    optimal_wait_time = self._calculate_default_wait_time(
    features['retry_count']
    )
    else:
    # 没有训练好的模型,使用启发式方法
    success_probability = self._heuristic_success_probability(features)
    optimal_wait_time = self._calculate_default_wait_time(
    features['retry_count']
    )

    return {
    'success_probability': float(success_probability),
    'optimal_wait_time_seconds': float(optimal_wait_time),
    'should_retry': success_probability > 0.5, # 阈值可以调整
    'confidence': min(abs(success_probability - 0.5) * 2, 1.0)
    }

    def record_retry_outcome(self, features: Dict[str, Any],
    outcome: Dict[str, Any]):
    """记录重试结果用于模型训练"""
    # 准备训练数据
    training_example = features.copy()
    training_example['retry_success'] = outcome.get('success', False)
    training_example['actual_wait_time'] = outcome.get('wait_time_seconds', 0)

    # 添加时间戳
    training_example['recorded_at'] = datetime.now().isoformat()

    # 存储
    self.training_data.append(training_example)

    # 定期训练模型
    if len(self.training_data) % 1000 == 0: # 每1000条记录训练一次
    self.train_models()

    def train_models(self):
    """训练机器学习模型"""
    if len(self.training_data) < 100:
    print("Not enough training data")
    return

    # 转换为DataFrame
    df = pd.DataFrame(self.training_data)

    # 准备特征
    X = df[self.feature_columns].copy()

    # 编码分类特征
    X_encoded = self._encode_features(X, fit=True)

    # 准备目标变量
    y_classification = df['retry_success'].astype(int)
    y_regression = df['actual_wait_time']

    # 划分训练集和测试集
    X_train, X_test, y_train_cls, y_test_cls, y_train_reg, y_test_reg = \
    train_test_split(
    X_encoded, y_classification, y_regression,
    test_size=0.2, random_state=42
    )

    # 训练分类模型
    print("Training classification model…")
    self.classification_model.fit(X_train, y_train_cls)

    # 评估分类模型
    train_score = self.classification_model.score(X_train, y_train_cls)
    test_score = self.classification_model.score(X_test, y_test_cls)
    print(f"Classification model – Train score: {train_score:.3f}, "
    f"Test score: {test_score:.3f}")

    # 训练回归模型(只在成功的情况下)
    success_mask = y_classification == 1
    if success_mask.sum() > 10: # 需要有足够的成功样本
    X_success = X_encoded[success_mask]
    y_wait_success = y_regression[success_mask]

    print("Training regression model…")
    self.regression_model.fit(X_success, y_wait_success)

    # 评估回归模型
    y_pred = self.regression_model.predict(X_success)
    mse = np.mean((y_wait_success - y_pred) ** 2)
    print(f"Regression model – MSE: {mse:.3f}")

    # 保存模型
    if self.model_path:
    self.save_model(self.model_path)

    def _encode_features(self, df: pd.DataFrame, fit: bool = False) -> np.ndarray:
    """编码特征"""
    df_encoded = df.copy()

    # 对分类特征进行标签编码
    categorical_columns = ['error_type', 'service_name', 'endpoint',
    'client_ip_prefix']

    for col in categorical_columns:
    if col in df_encoded.columns:
    if fit:
    # 拟合编码器
    le = LabelEncoder()
    df_encoded[col] = le.fit_transform(df_encoded[col].fillna('unknown'))
    self.label_encoders[col] = le
    else:
    # 使用已有的编码器
    if col in self.label_encoders:
    le = self.label_encoders[col]
    # 处理未见过的标签
    unseen_mask = ~df_encoded[col].isin(le.classes_)
    df_encoded.loc[unseen_mask, col] = 'unknown'
    df_encoded[col] = le.transform(df_encoded[col].fillna('unknown'))

    # 标准化数值特征
    numerical_columns = ['hour_of_day', 'day_of_week', 'response_time_ms',
    'status_code', 'retry_count', 'request_size_kb',
    'response_size_kb', 'concurrent_requests']

    numerical_features = df_encoded[numerical_columns].values

    if fit:
    numerical_features = self.scaler.fit_transform(numerical_features)
    else:
    numerical_features = self.scaler.transform(numerical_features)

    # 合并特征
    categorical_features = df_encoded[categorical_columns].values
    features = np.hstack([numerical_features, categorical_features])

    return features

    def _extract_ip_prefix(self, ip: str) -> str:
    """提取IP地址前缀"""
    if '.' in ip: # IPv4
    parts = ip.split('.')
    return '.'.join(parts[:2]) # 取前两个八位组
    elif ':' in ip: # IPv6
    # 简化处理,取前4个字符
    return ip[:4]
    else:
    return 'unknown'

    def _calculate_default_wait_time(self, retry_count: int) -> float:
    """计算默认等待时间(指数退避)"""
    base_delay = 1.0
    max_delay = 60.0
    wait_time = min(base_delay * (2 ** retry_count), max_delay)
    return wait_time

    def _heuristic_success_probability(self, features: Dict[str, Any]) -> float:
    """启发式方法估计成功概率"""
    probability = 0.5 # 基础概率

    # 基于状态码调整
    status_code = features.get('status_code', 0)
    if 500 <= status_code < 600:
    probability -= 0.2 # 服务器错误,成功概率降低
    elif status_code == 429:
    probability -= 0.1 # 限流错误,成功概率降低
    elif 400 <= status_code < 500:
    probability -= 0.3 # 客户端错误,成功概率降低

    # 基于重试次数调整
    retry_count = features.get('retry_count', 0)
    probability -= retry_count * 0.1 # 重试次数越多,成功概率越低

    # 基于响应时间调整
    response_time = features.get('response_time_ms', 0)
    if response_time > 5000: # 5秒以上
    probability -= 0.15 # 响应慢,成功概率降低

    # 确保在0-1范围内
    return max(0.0, min(1.0, probability))

    def save_model(self, path: str):
    """保存模型到文件"""
    model_data = {
    'classification_model': self.classification_model,
    'regression_model': self.regression_model,
    'label_encoders': self.label_encoders,
    'scaler': self.scaler,
    'feature_columns': self.feature_columns
    }

    joblib.dump(model_data, path)
    print(f"Model saved to {path}")

    def load_model(self, path: str):
    """从文件加载模型"""
    try:
    model_data = joblib.load(path)
    self.classification_model = model_data['classification_model']
    self.regression_model = model_data['regression_model']
    self.label_encoders = model_data['label_encoders']
    self.scaler = model_data['scaler']
    self.feature_columns = model_data['feature_columns']
    print(f"Model loaded from {path}")
    except Exception as e:
    print(f"Failed to load model: {e}")

    # 使用示例
    class SmartRetryManager:
    """智能重试管理器"""

    def __init__(self, ml_optimizer: MLRetryOptimizer = None):
    self.ml_optimizer = ml_optimizer or MLRetryOptimizer()
    self.retry_history = {}

    async def execute_with_smart_retry(
    self,
    operation: callable,
    operation_id: str,
    context: Dict[str, Any],
    max_attempts: int = 5
    ) -> Any:
    """使用智能重试执行操作"""

    attempt = 0
    last_error = None

    while attempt < max_attempts:
    try:
    # 执行操作
    result = await operation()

    # 记录成功
    self._record_success(operation_id, attempt, context)
    return result

    except Exception as e:
    attempt += 1
    last_error = e

    # 分析错误
    error_analysis = self._analyze_error(e, attempt)

    # 提取特征
    features = self.ml_optimizer.extract_features(
    context, error_analysis
    )

    # 预测重试成功的概率
    if self.ml_optimizer:
    prediction = self.ml_optimizer.predict_retry_success(features)

    # 根据预测决定是否重试
    if not prediction['should_retry']:
    print(f"Predicted low success probability: "
    f"{prediction['success_probability']:.2f}")
    break

    # 使用预测的最佳等待时间
    wait_time = prediction['optimal_wait_time_seconds']
    else:
    # 使用默认的退避策略
    wait_time = self._calculate_backoff(attempt)

    # 等待
    print(f"Attempt {attempt} failed, waiting {wait_time:.1f}s before retry…")
    await asyncio.sleep(wait_time)

    # 所有重试都失败了
    self._record_failure(operation_id, attempt, context, last_error)
    raise last_error

    def _analyze_error(self, error: Exception, attempt: int) -> Dict[str, Any]:
    """分析错误"""
    analysis = {
    'error_type': error.__class__.__name__,
    'error_message': str(error),
    'retry_count': attempt,
    'timestamp': datetime.now().isoformat()
    }

    # 提取状态码(如果可用)
    if hasattr(error, 'status_code'):
    analysis['status_code'] = error.status_code

    # 提取响应时间(如果可用)
    if hasattr(error, 'response_time_ms'):
    analysis['response_time_ms'] = error.response_time_ms

    return analysis

    def _calculate_backoff(self, attempt: int) -> float:
    """计算退避时间"""
    base_delay = 1.0
    max_delay = 60.0
    return min(base_delay * (2 ** (attempt - 1)), max_delay)

    def _record_success(self, operation_id: str, attempt: int,
    context: Dict[str, Any]):
    """记录成功"""
    if self.ml_optimizer:
    # 为机器学习准备数据
    features = self.ml_optimizer.extract_features(context, {
    'retry_count': attempt
    })

    # 记录成功结果
    self.ml_optimizer.record_retry_outcome(features, {
    'success': True,
    'wait_time_seconds': 0
    })

    # 更新历史记录
    if operation_id not in self.retry_history:
    self.retry_history[operation_id] = []

    self.retry_history[operation_id].append({
    'timestamp': datetime.now().isoformat(),
    'attempt': attempt,
    'success': True
    })

    def _record_failure(self, operation_id: str, attempt: int,
    context: Dict[str, Any], error: Exception):
    """记录失败"""
    if self.ml_optimizer:
    # 为机器学习准备数据
    features = self.ml_optimizer.extract_features(context, {
    'retry_count': attempt,
    'error_type': error.__class__.__name__,
    'status_code': getattr(error, 'status_code', None)
    })

    # 记录失败结果
    self.ml_optimizer.record_retry_outcome(features, {
    'success': False,
    'wait_time_seconds': 0
    })

    # 更新历史记录
    if operation_id not in self.retry_history:
    self.retry_history[operation_id] = []

    self.retry_history[operation_id].append({
    'timestamp': datetime.now().isoformat(),
    'attempt': attempt,
    'success': False,
    'error': str(error)
    })

    # 异步使用示例
    import asyncio

    async def example_usage():
    # 创建智能重试管理器
    retry_manager = SmartRetryManager()

    # 模拟一个可能失败的操作
    class ExternalServiceError(Exception):
    def __init__(self, status_code, message):
    self.status_code = status_code
    self.message = message
    super().__init__(f"{status_code}: {message}")

    async def call_external_service():
    # 模拟外部服务调用
    import random

    # 随机失败
    if random.random() < 0.7: # 70%的概率失败
    raise ExternalServiceError(
    503,
    "Service temporarily unavailable"
    )

    return {"data": "success"}

    # 准备上下文
    context = {
    'service_name': 'external-api',
    'endpoint': '/api/data',
    'client_ip': '192.168.1.100',
    'timestamp': datetime.now(),
    'request_size_kb': 2.5,
    'concurrent_requests': 10
    }

    try:
    # 使用智能重试执行
    result = await retry_manager.execute_with_smart_retry(
    call_external_service,
    operation_id="external_api_call_001",
    context=context,
    max_attempts=5
    )

    print(f"操作成功: {result}")

    except Exception as e:
    print(f"操作最终失败: {e}")

    # 运行示例
    if __name__ == "__main__":
    asyncio.run(example_usage())

    34.4 错误监控与告警

    34.4.1 实时错误监控系统

    typescript

    // TypeScript实时错误监控系统
    import { EventEmitter } from 'events';
    import { createHash } from 'crypto';
    import WebSocket from 'ws';

    interface ErrorEvent {
    id: string;
    timestamp: Date;
    service: string;
    endpoint: string;
    statusCode: number;
    errorType: string;
    errorMessage: string;
    stackTrace?: string;
    context: Record<string, any>;
    severity: 'LOW' | 'MEDIUM' | 'HIGH' | 'CRITICAL';
    environment: 'development' | 'staging' | 'production';
    }

    interface ErrorAggregate {
    errorHash: string;
    firstOccurrence: Date;
    lastOccurrence: Date;
    count: number;
    affectedServices: Set<string>;
    affectedEndpoints: Set<string>;
    statusCodes: Set<number>;
    sampleError: ErrorEvent;
    }

    interface AlertRule {
    id: string;
    name: string;
    description: string;
    condition: (aggregate: ErrorAggregate) => boolean;
    severity: 'LOW' | 'MEDIUM' | 'HIGH' | 'CRITICAL';
    actions: string[];
    }

    class RealTimeErrorMonitor extends EventEmitter {
    private errors: Map<string, ErrorEvent[]> = new Map();
    private aggregates: Map<string, ErrorAggregate> = new Map();
    private alertRules: AlertRule[] = [];
    private webSocketServer: WebSocket.Server;
    private aggregationWindow: number = 60000; // 1分钟

    constructor(port: number = 8080) {
    super();

    // 初始化WebSocket服务器
    this.webSocketServer = new WebSocket.Server({ port });
    this.setupWebSocket();

    // 初始化默认告警规则
    this.initializeDefaultRules();

    // 定期清理旧数据
    setInterval(() => this.cleanupOldData(), 3600000); // 每小时清理一次
    }

    recordError(error: Omit<ErrorEvent, 'id' | 'timestamp'>): string {
    const errorEvent: ErrorEvent = {
    id: this.generateErrorId(),
    timestamp: new Date(),
    ...error
    };

    // 存储原始错误
    const key = this.getStorageKey(errorEvent);
    if (!this.errors.has(key)) {
    this.errors.set(key, []);
    }
    this.errors.get(key)!.push(errorEvent);

    // 聚合错误
    this.aggregateError(errorEvent);

    // 触发事件
    this.emit('errorRecorded', errorEvent);

    // 检查告警规则
    this.checkAlertRules(errorEvent);

    // 广播到WebSocket客户端
    this.broadcastError(errorEvent);

    return errorEvent.id;
    }

    private generateErrorId(): string {
    return createHash('sha256')
    .update(Date.now().toString() + Math.random().toString())
    .digest('hex')
    .substring(0, 16);
    }

    private getStorageKey(error: ErrorEvent): string {
    // 按服务、错误类型和日期分组存储
    const date = error.timestamp.toISOString().split('T')[0];
    return `${error.service}:${error.errorType}:${date}`;
    }

    private aggregateError(error: ErrorEvent): void {
    const errorHash = this.calculateErrorHash(error);

    if (!this.aggregates.has(errorHash)) {
    this.aggregates.set(errorHash, {
    errorHash,
    firstOccurrence: error.timestamp,
    lastOccurrence: error.timestamp,
    count: 1,
    affectedServices: new Set([error.service]),
    affectedEndpoints: new Set([error.endpoint]),
    statusCodes: new Set([error.statusCode]),
    sampleError: error
    });
    } else {
    const aggregate = this.aggregates.get(errorHash)!;
    aggregate.count++;
    aggregate.lastOccurrence = error.timestamp;
    aggregate.affectedServices.add(error.service);
    aggregate.affectedEndpoints.add(error.endpoint);
    aggregate.statusCodes.add(error.statusCode);
    }

    // 触发聚合更新事件
    this.emit('aggregateUpdated', this.aggregates.get(errorHash));
    }

    private calculateErrorHash(error: ErrorEvent): string {
    // 基于错误特征计算哈希
    const errorFingerprint = {
    service: error.service,
    errorType: error.errorType,
    statusCode: error.statusCode,
    // 可以添加更多特征
    };

    return createHash('sha256')
    .update(JSON.stringify(errorFingerprint))
    .digest('hex')
    .substring(0, 16);
    }

    private initializeDefaultRules(): void {
    this.alertRules = [
    {
    id: 'high-error-rate',
    name: '高错误率告警',
    description: '短时间内相同错误发生多次',
    condition: (aggregate) => {
    const timeWindow = 5 * 60 * 1000; // 5分钟
    const timeDiff = aggregate.lastOccurrence.getTime() -
    aggregate.firstOccurrence.getTime();

    if (timeDiff < timeWindow && aggregate.count >= 10) {
    const rate = aggregate.count / (timeDiff / 60000); // 每分钟错误数
    return rate >= 2; // 每分钟2个以上相同错误
    }
    return false;
    },
    severity: 'HIGH',
    actions: ['email', 'slack', 'pagerduty']
    },
    {
    id: 'critical-errors',
    name: '关键错误告警',
    description: '出现严重级别的错误',
    condition: (aggregate) => {
    return aggregate.sampleError.severity === 'CRITICAL';
    },
    severity: 'CRITICAL',
    actions: ['pagerduty', 'sms', 'phone']
    },
    {
    id: 'service-outage',
    name: '服务中断告警',
    description: '同一服务出现大量不同错误',
    condition: (aggregate) => {
    // 检查过去10分钟内同一服务的错误数量
    const tenMinutesAgo = new Date(Date.now() - 10 * 60 * 1000);
    const serviceErrors = Array.from(this.aggregates.values())
    .filter(agg => agg.affectedServices.has(aggregate.sampleError.service))
    .filter(agg => agg.lastOccurrence > tenMinutesAgo);

    const totalErrors = serviceErrors.reduce((sum, agg) => sum + agg.count, 0);
    return totalErrors >= 50;
    },
    severity: 'CRITICAL',
    actions: ['pagerduty', 'sms', 'phone']
    },
    {
    id: 'status-code-5xx',
    name: '5xx错误告警',
    description: '出现服务器错误',
    condition: (aggregate) => {
    return Array.from(aggregate.statusCodes)
    .some(code => code >= 500 && code < 600);
    },
    severity: 'HIGH',
    actions: ['email', 'slack']
    }
    ];
    }

    private checkAlertRules(error: ErrorEvent): void {
    const errorHash = this.calculateErrorHash(error);
    const aggregate = this.aggregates.get(errorHash);

    if (!aggregate) return;

    for (const rule of this.alertRules) {
    if (rule.condition(aggregate)) {
    this.triggerAlert(rule, aggregate);
    }
    }
    }

    private triggerAlert(rule: AlertRule, aggregate: ErrorAggregate): void {
    const alert = {
    id: createHash('sha256')
    .update(rule.id + aggregate.errorHash + Date.now().toString())
    .digest('hex')
    .substring(0, 16),
    rule,
    aggregate,
    timestamp: new Date(),
    acknowledged: false,
    resolved: false
    };

    // 触发告警事件
    this.emit('alertTriggered', alert);

    // 执行告警动作
    this.executeAlertActions(alert);

    // 广播到WebSocket客户端
    this.broadcastAlert(alert);
    }

    private executeAlertActions(alert: any): void {
    for (const action of alert.rule.actions) {
    switch (action) {
    case 'email':
    this.sendEmailAlert(alert);
    break;
    case 'slack':
    this.sendSlackAlert(alert);
    break;
    case 'pagerduty':
    this.sendPagerDutyAlert(alert);
    break;
    case 'sms':
    this.sendSmsAlert(alert);
    break;
    case 'phone':
    this.makePhoneCall(alert);
    break;
    }
    }
    }

    private sendEmailAlert(alert: any): void {
    const emailContent = this.formatEmailAlert(alert);
    console.log('Sending email alert:', emailContent.subject);
    // 实际实现应该调用邮件服务
    }

    private sendSlackAlert(alert: any): void {
    const slackMessage = this.formatSlackAlert(alert);
    console.log('Sending Slack alert:', slackMessage);
    // 实际实现应该调用Slack Webhook
    }

    private sendPagerDutyAlert(alert: any): void {
    // 实际实现应该调用PagerDuty Events API
    console.log('Sending PagerDuty alert:', alert.id);
    }

    private sendSmsAlert(alert: any): void {
    // 实际实现应该调用短信网关
    console.log('Sending SMS alert:', alert.id);
    }

    private makePhoneCall(alert: any): void {
    // 实际实现应该调用语音呼叫服务
    console.log('Initiating phone call for alert:', alert.id);
    }

    private formatEmailAlert(alert: any): { subject: string; body: string } {
    return {
    subject: `[${alert.rule.severity}] ${alert.rule.name} – ${alert.aggregate.sampleError.service}`,
    body: `
    Alert: ${alert.rule.name}
    Description: ${alert.rule.description}

    Error Details:
    – Service: ${alert.aggregate.sampleError.service}
    – Endpoint: ${alert.aggregate.sampleError.endpoint}
    – Error Type: ${alert.aggregate.sampleError.errorType}
    – Status Code: ${alert.aggregate.sampleError.statusCode}
    – Occurrences: ${alert.aggregate.count}
    – First Occurrence: ${alert.aggregate.firstOccurrence.toISOString()}
    – Last Occurrence: ${alert.aggregate.lastOccurrence.toISOString()}

    Sample Error Message: ${alert.aggregate.sampleError.errorMessage}

    Please investigate immediately.
    `
    };
    }

    private formatSlackAlert(alert: any): any {
    const severityColors = {
    'LOW': '#36a64f', // 绿色
    'MEDIUM': '#ffcc00', // 黄色
    'HIGH': '#ff9900', // 橙色
    'CRITICAL': '#ff0000' // 红色
    };

    return {
    attachments: [{
    color: severityColors[alert.rule.severity],
    title: `🚨 ${alert.rule.name}`,
    text: alert.rule.description,
    fields: [
    {
    title: 'Service',
    value: alert.aggregate.sampleError.service,
    short: true
    },
    {
    title: 'Error Count',
    value: alert.aggregate.count.toString(),
    short: true
    },
    {
    title: 'Status Code',
    value: alert.aggregate.sampleError.statusCode.toString(),
    short: true
    },
    {
    title: 'Environment',
    value: alert.aggregate.sampleError.environment,
    short: true
    }
    ],
    footer: `Alert ID: ${alert.id}`,
    ts: Math.floor(alert.timestamp.getTime() / 1000)
    }]
    };
    }

    private setupWebSocket(): void {
    this.webSocketServer.on('connection', (ws) => {
    console.log('New WebSocket connection');

    // 发送当前状态
    ws.send(JSON.stringify({
    type: 'INITIAL_STATE',
    data: {
    activeAlerts: Array.from(this.aggregates.values())
    .filter(agg => this.shouldShowAggregate(agg)),
    recentErrors: this.getRecentErrors(50)
    }
    }));

    ws.on('message', (message) => {
    this.handleWebSocketMessage(ws, message.toString());
    });

    ws.on('close', () => {
    console.log('WebSocket connection closed');
    });
    });
    }

    private handleWebSocketMessage(ws: WebSocket, message: string): void {
    try {
    const data = JSON.parse(message);

    switch (data.type) {
    case 'SUBSCRIBE':
    this.handleSubscribe(ws, data.channel);
    break;
    case 'UNSUBSCRIBE':
    this.handleUnsubscribe(ws, data.channel);
    break;
    case 'ACKNOWLEDGE_ALERT':
    this.handleAcknowledgeAlert(data.alertId);
    break;
    }
    } catch (error) {
    console.error('Error handling WebSocket message:', error);
    }
    }

    private handleSubscribe(ws: WebSocket, channel: string): void {
    // 实际实现应该管理订阅关系
    console.log(`Client subscribed to ${channel}`);
    }

    private handleUnsubscribe(ws: WebSocket, channel: string): void {
    // 实际实现应该移除订阅关系
    console.log(`Client unsubscribed from ${channel}`);
    }

    private handleAcknowledgeAlert(alertId: string): void {
    // 复用公共API确认告警
    this.acknowledgeAlert(alertId);
    }

    private broadcastError(error: ErrorEvent): void {
    const message = JSON.stringify({
    type: 'NEW_ERROR',
    data: error
    });

    this.webSocketServer.clients.forEach((client) => {
    if (client.readyState === WebSocket.OPEN) {
    client.send(message);
    }
    });
    }

    private broadcastAlert(alert: any): void {
    const message = JSON.stringify({
    type: 'NEW_ALERT',
    data: alert
    });

    this.webSocketServer.clients.forEach((client) => {
    if (client.readyState === WebSocket.OPEN) {
    client.send(message);
    }
    });
    }

    private getRecentErrors(count: number): ErrorEvent[] {
    const allErrors: ErrorEvent[] = [];

    for (const errors of this.errors.values()) {
    allErrors.push(...errors);
    }

    return allErrors
    .sort((a, b) => b.timestamp.getTime() - a.timestamp.getTime())
    .slice(0, count);
    }

    private shouldShowAggregate(aggregate: ErrorAggregate): boolean {
    // 只显示最近活跃的聚合
    const fiveMinutesAgo = new Date(Date.now() - 5 * 60 * 1000);
    return aggregate.lastOccurrence > fiveMinutesAgo && aggregate.count >= 5;
    }

    private cleanupOldData(): void {
    const twentyFourHoursAgo = new Date(Date.now() - 24 * 60 * 60 * 1000);

    // 清理旧错误
    for (const [key, errors] of this.errors) {
    const recentErrors = errors.filter(error =>
    error.timestamp > twentyFourHoursAgo
    );

    if (recentErrors.length === 0) {
    this.errors.delete(key);
    } else {
    this.errors.set(key, recentErrors);
    }
    }

    // 清理旧聚合
    for (const [hash, aggregate] of this.aggregates) {
    if (aggregate.lastOccurrence < twentyFourHoursAgo) {
    this.aggregates.delete(hash);
    }
    }

    console.log('Cleanup completed');
    }

    // 公共API方法
    getErrorStatistics(): any {
    const now = new Date();
    const oneHourAgo = new Date(now.getTime() - 60 * 60 * 1000);
    const twentyFourHoursAgo = new Date(now.getTime() - 24 * 60 * 60 * 1000);

    const recentErrors = this.getRecentErrors(1000);
    const hourErrors = recentErrors.filter(e => e.timestamp > oneHourAgo);
    const dayErrors = recentErrors.filter(e => e.timestamp > twentyFourHoursAgo);

    const stats = {
    current: {
    total: hourErrors.length,
    byService: this.groupBy(hourErrors, 'service'),
    byStatusCode: this.groupBy(hourErrors, 'statusCode'),
    bySeverity: this.groupBy(hourErrors, 'severity')
    },
    daily: {
    total: dayErrors.length,
    byService: this.groupBy(dayErrors, 'service'),
    byStatusCode: this.groupBy(dayErrors, 'statusCode'),
    bySeverity: this.groupBy(dayErrors, 'severity')
    },
    aggregates: Array.from(this.aggregates.values())
    .filter(agg => this.shouldShowAggregate(agg))
    .map(agg => ({
    errorHash: agg.errorHash,
    count: agg.count,
    services: Array.from(agg.affectedServices),
    statusCodes: Array.from(agg.statusCodes),
    lastOccurrence: agg.lastOccurrence
    }))
    };

    return stats;
    }

    private groupBy(errors: ErrorEvent[], key: keyof ErrorEvent): Record<string, number> {
    const result: Record<string, number> = {};

    for (const error of errors) {
    const value = String(error[key]);
    result[value] = (result[value] || 0) + 1;
    }

    return result;
    }

    acknowledgeAlert(alertId: string): void {
    this.emit('alertAcknowledged', alertId);
    }

    addAlertRule(rule: AlertRule): void {
    this.alertRules.push(rule);
    }

    removeAlertRule(ruleId: string): void {
    this.alertRules = this.alertRules.filter(rule => rule.id !== ruleId);
    }
    }

// Usage example
    const errorMonitor = new RealTimeErrorMonitor(8080);

// Listen for monitor events
    errorMonitor.on('errorRecorded', (error: ErrorEvent) => {
    console.log('Error recorded:', error.id, error.errorType);
    });

    errorMonitor.on('alertTriggered', (alert) => {
    console.log('Alert triggered:', alert.rule.name);
    });

// Record an error
    errorMonitor.recordError({
    service: 'order-service',
    endpoint: '/api/orders',
    statusCode: 503,
    errorType: 'ServiceUnavailableError',
    errorMessage: 'Database connection failed',
    context: {
    userId: 'user123',
    orderId: 'order456',
    retryCount: 2
    },
    severity: 'HIGH',
    environment: 'production'
    });

// Fetch aggregate statistics
    const stats = errorMonitor.getErrorStatistics();
    console.log('Error statistics:', stats);
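
On the consuming side, a browser dashboard might subscribe to this feed as follows. This is a minimal sketch: the channel name and the alert's `id` field are assumptions for illustration, while the message types (`SUBSCRIBE`, `NEW_ERROR`, `NEW_ALERT`, `ACKNOWLEDGE_ALERT`) mirror the server code above.

typescript

// Sketch of a browser-side consumer for the monitor's WebSocket feed
const socket = new WebSocket('ws://localhost:8080');

socket.addEventListener('open', () => {
  // Channel name is illustrative; the server stub above only logs it
  socket.send(JSON.stringify({ type: 'SUBSCRIBE', channel: 'errors' }));
});

socket.addEventListener('message', (event) => {
  const message = JSON.parse(event.data);

  switch (message.type) {
    case 'NEW_ERROR':
      console.log('Live error:', message.data.service, message.data.statusCode);
      break;
    case 'NEW_ALERT':
      console.log('Live alert:', message.data);
      // Alert id field name is assumed for illustration
      socket.send(JSON.stringify({ type: 'ACKNOWLEDGE_ALERT', alertId: message.data.id }));
      break;
  }
});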

34.5 Error Handling Best Practices Summary

34.5.1 Status Code and Error Handling Checklist

yaml

# Status code and error-handling best-practices checklist
error_handling_checklist:

  # Design phase
  design_phase:
    - A clear error classification scheme is defined
    - Error severity levels are established
    - A layered error-handling architecture is designed
    - An error propagation strategy is decided
    - Degradation and recovery mechanisms are planned

  # Development phase
  development_phase:
    http_status_codes:
      - Semantically correct status codes are used
      - 4xx codes are used for client-side problems
      - 5xx codes are used for server-side problems
      - The right codes are used for special cases (e.g. 422, 429)

    error_responses:
      - Error response format is standardized (RFC 7807)
      - A unique error identifier is included
      - A human-readable error message is provided
      - A machine-readable error code is included
      - Relevant contextual information is provided
      - Helpful suggested actions are included

    client_handling:
      - Clients implement appropriate retry logic
      - Exponential backoff is implemented (see the sketch after this checklist)
      - Network instability is accounted for
      - User-friendly error messages are shown
      - Graceful degradation is implemented

    server_handling:
      - Global exception handling is in place
      - Errors are properly logged and monitored
      - Sensitive information is never leaked
      - The circuit breaker pattern is implemented
      - Timeouts and retry limits are enforced

  # Operations phase
  operations_phase:
    monitoring:
      - Error-rate monitoring is set up
      - Appropriate alert rules are configured
      - Distributed tracing is implemented
      - A real-time error dashboard exists
      - Error logs are structured

    alerting:
      - Alert severity levels are clearly defined
      - Alert recipients are configured correctly
      - Alert thresholds are set sensibly
      - An alert escalation mechanism exists
      - Alerts carry sufficient context

    incident_response:
      - A clear troubleshooting process exists
      - Error IDs can be traced to specific requests
      - Rollback and recovery plans exist
      - Failure drills are run regularly
      - A post-mortem process is in place

  # Continuous improvement
  continuous_improvement:
    - Error patterns are analyzed regularly
    - The system is optimized based on error data
    - Error-handling strategies are kept up to date
    - Monitoring and alerting are improved over time
    - Error-handling experience is shared across teams
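
To make the client_handling items concrete, here is a minimal TypeScript sketch of a retry helper with exponential backoff and jitter. It retries only status codes where a retry can plausibly help; the specific codes, attempt limit, and delays are illustrative assumptions, not fixed rules.

typescript

// Status codes where an automatic retry can plausibly help
const RETRYABLE = new Set([408, 429, 500, 502, 503, 504]);

async function fetchWithRetry(
  url: string,
  maxRetries = 3,
  baseDelayMs = 200
): Promise<Response> {
  for (let attempt = 0; ; attempt++) {
    const response = await fetch(url);

    // Success, or an error a retry cannot fix: hand back immediately
    if (response.ok || !RETRYABLE.has(response.status)) {
      return response;
    }
    if (attempt >= maxRetries) {
      return response; // out of attempts; let the caller handle the status
    }

    // Prefer the server's Retry-After header (in seconds) when it is numeric
    const retryAfter = Number(response.headers.get('Retry-After'));
    const delayMs = Number.isFinite(retryAfter) && retryAfter > 0
      ? retryAfter * 1000
      : baseDelayMs * 2 ** attempt + Math.random() * 100; // backoff + jitter

    await new Promise((resolve) => setTimeout(resolve, delayMs));
  }
}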

34.5.2 Error Handling Strategy Decision Tree
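
In practice the decision reduces to a few questions keyed on the status code class. A plain-text sketch of one reasonable flow follows; it is an illustration, not a prescription.

markdown

# Error-handling decision flow (sketch)

Did the request fail with a 4xx?
├── 401 / 403 → refresh credentials or prompt for auth; do not retry blindly
├── 408 / 429 → retry with backoff (honor Retry-After on 429)
└── any other 4xx → fix the request; retrying it unchanged will not help

Did the request fail with a 5xx?
├── 500 / 502 / 504 → retry with exponential backoff, bounded attempts
├── 503 → wait for Retry-After, then retry; consider the circuit breaker
└── repeated failures → open the circuit breaker and degrade gracefully

In every branch: log with request/trace IDs, record metrics, and alert when
error rates cross their thresholds.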

34.5.3 Error Handling Maturity Model

markdown

# Error Handling Maturity Model

## Level 0: Chaos
- Errors are ignored or simply printed
- No unified error handling
- Status codes are used arbitrarily
- Error messages leak sensitive data
- No monitoring or alerting

## Level 1: Basic
- Basic exception handling
- A unified error response format
- Correct use of HTTP status codes
- Simple logging
- Basic error classification

## Level 2: Standard
- Layered error-handling architecture
- Intelligent retry mechanisms
- Circuit breaker pattern implemented
- Structured logging
- Basic monitoring and alerting

## Level 3: Advanced
- Adaptive error-handling strategies
- Predictive error prevention
- Machine-learning-based optimization
- Real-time error monitoring
- Automated failure recovery

## Level 4: Excellence
- Proactive error prevention
- Self-healing systems
- Predictive maintenance
- Full-chain error tracing
- Continuous optimization and improvement

## Assessment Metrics
1. Error-handling coverage: what proportion of error scenarios is handled
2. Error recovery time: mean time to recover from an error
3. Error prevention capability: how many potential errors are prevented
4. User experience impact: how strongly errors affect users
5. Operational efficiency: how efficiently errors are diagnosed and resolved
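
Metric 2 (error recovery time) is straightforward to compute once incidents are recorded with open and close timestamps. A small TypeScript sketch follows; the Incident shape is an assumption for illustration.

typescript

// Metric 2: mean time to recovery. The Incident shape is illustrative.
interface Incident {
  occurredAt: Date;
  resolvedAt?: Date; // undefined while the incident is still open
}

// Average resolution time in minutes, over resolved incidents only
function meanTimeToRecoveryMinutes(incidents: Incident[]): number {
  const resolved = incidents.filter((i) => i.resolvedAt !== undefined);
  if (resolved.length === 0) return 0;

  const totalMs = resolved.reduce(
    (sum, i) => sum + (i.resolvedAt!.getTime() - i.occurredAt.getTime()),
    0
  );
  return totalMs / resolved.length / 60_000;
}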

34.6 Future Trends

34.6.1 AI-Driven Error Handling

python

# AI-driven intelligent error-handling system
# Note: uses the legacy (pre-1.0) openai ChatCompletion interface
import asyncio
import json
from collections import Counter
from datetime import datetime, timedelta
from typing import Dict, Any, List, Optional

import openai


class AIErrorHandler:
    """AI-driven error handler"""

    def __init__(self, api_key: str):
        openai.api_key = api_key
        self.error_history: List[Dict[str, Any]] = []
        self.solution_cache: Dict[str, str] = {}

    async def analyze_error(self, error: Exception, context: Dict[str, Any]) -> Dict[str, Any]:
        """Analyze an error with AI."""

        # Build the analysis prompt
        prompt = self._create_analysis_prompt(error, context)

        try:
            # Call the AI for analysis
            response = await openai.ChatCompletion.acreate(
                model="gpt-4",
                messages=[
                    {"role": "system", "content": "You are an experienced software engineer who excels at error analysis and debugging."},
                    {"role": "user", "content": prompt}
                ],
                temperature=0.7,
                max_tokens=500
            )

            analysis_text = response.choices[0].message.content

            # Parse the AI response
            analysis = self._parse_ai_response(analysis_text)

            # Record the analysis result
            analysis_record = {
                "error": str(error),
                "error_type": error.__class__.__name__,
                "context": context,
                "analysis": analysis,
                "timestamp": datetime.now().isoformat()
            }

            self.error_history.append(analysis_record)

            return analysis

        except Exception:
            # AI analysis failed; fall back to basic analysis
            return self._basic_error_analysis(error, context)

    def _create_analysis_prompt(self, error: Exception, context: Dict[str, Any]) -> str:
        """Create the AI analysis prompt."""

        prompt = f"""
Please analyze the following error and suggest solutions:

Error type: {error.__class__.__name__}
Error message: {str(error)}

Context:
{json.dumps(context, indent=2, default=str)}

Please structure your analysis as follows:
1. Root cause analysis
2. Impact assessment
3. Immediate solution
4. Long-term prevention
5. Related documentation links (if any)
"""

        return prompt

    def _parse_ai_response(self, response_text: str) -> Dict[str, Any]:
        """Parse the AI response."""

        # More sophisticated parsing could be added here;
        # this simple version splits by line and classifies by section
        lines = response_text.strip().split('\n')

        analysis = {
            "cause": "",
            "impact": "",
            "immediate_solution": "",
            "long_term_prevention": "",
            "documentation": []
        }

        current_section = None

        for line in lines:
            line = line.strip()

            if line.startswith('1.') or 'Root cause' in line:
                current_section = 'cause'
            elif line.startswith('2.') or 'Impact assessment' in line:
                current_section = 'impact'
            elif line.startswith('3.') or 'Immediate solution' in line:
                current_section = 'immediate_solution'
            elif line.startswith('4.') or 'Long-term prevention' in line:
                current_section = 'long_term_prevention'
            elif line.startswith('5.') or 'documentation links' in line:
                current_section = 'documentation'
            elif current_section and line:
                if current_section == 'documentation':
                    analysis['documentation'].append(line)
                else:
                    analysis[current_section] += line + '\n'

        return analysis

    def _basic_error_analysis(self, error: Exception, context: Dict[str, Any]) -> Dict[str, Any]:
        """Basic error analysis (fallback)."""

        error_type = error.__class__.__name__

        # Simple analysis keyed on the error type
        analysis_templates = {
            "ConnectionError": {
                "cause": "Network connection failed; the network may be unstable or the service unavailable",
                "impact": "Users cannot access the affected functionality",
                "immediate_solution": "Check network connectivity and retry the operation",
                "long_term_prevention": "Implement a retry mechanism and add connection timeouts"
            },
            "TimeoutError": {
                "cause": "The operation timed out, possibly due to a slow service or network latency",
                "impact": "Degraded user experience; the operation may fail",
                "immediate_solution": "Increase the timeout and optimize the query",
                "long_term_prevention": "Improve service performance and process work asynchronously"
            },
            "ValueError": {
                "cause": "Invalid parameter value, possibly due to insufficient input validation",
                "impact": "The operation cannot complete and data consistency may be affected",
                "immediate_solution": "Check the input parameters and return a more detailed error message",
                "long_term_prevention": "Strengthen input validation and provide better user guidance"
            }
        }

        if error_type in analysis_templates:
            return analysis_templates[error_type]
        else:
            return {
                "cause": f"Unknown error type: {error_type}",
                "impact": "Requires further analysis",
                "immediate_solution": "Inspect the detailed logs and contact the development team",
                "long_term_prevention": "Add more error monitoring and logging"
            }

    async def suggest_solution(self, error_signature: str) -> Optional[str]:
        """Suggest a solution based on historical records."""

        # Check the cache first
        if error_signature in self.solution_cache:
            return self.solution_cache[error_signature]

        # Look for similar historical errors
        similar_errors = self._find_similar_errors(error_signature)

        if similar_errors:
            # Ask the AI to summarize a solution
            prompt = self._create_solution_prompt(similar_errors)

            try:
                response = await openai.ChatCompletion.acreate(
                    model="gpt-4",
                    messages=[
                        {"role": "system", "content": "You are an experienced software engineer."},
                        {"role": "user", "content": prompt}
                    ],
                    temperature=0.7,
                    max_tokens=300
                )

                solution = response.choices[0].message.content

                # Cache the solution
                self.solution_cache[error_signature] = solution

                return solution

            except Exception:
                return None

        return None

    def _find_similar_errors(self, error_signature: str) -> List[Dict[str, Any]]:
        """Find similar errors."""

        # Simple similarity matching; a real implementation
        # could use a more sophisticated algorithm
        similar = []

        for error_record in self.error_history[-100:]:  # last 100 errors
            error_text = error_record["error"]

            # Compute a simple similarity score
            similarity = self._calculate_similarity(error_signature, error_text)

            if similarity > 0.7:  # similarity threshold
                similar.append(error_record)

        return similar

    def _calculate_similarity(self, text1: str, text2: str) -> float:
        """Compute text similarity (simplified word-overlap / Jaccard ratio)."""

        words1 = set(text1.lower().split())
        words2 = set(text2.lower().split())

        if not words1 or not words2:
            return 0.0

        intersection = words1.intersection(words2)
        union = words1.union(words2)

        return len(intersection) / len(union)

    def _create_solution_prompt(self, similar_errors: List[Dict[str, Any]]) -> str:
        """Create the solution prompt."""

        prompt = "Based on the analyses and solutions of the following similar errors, please summarize a general solution:\n\n"

        for i, error_record in enumerate(similar_errors[:3], 1):  # at most 3 examples
            prompt += f"Error example {i}:\n"
            prompt += f"Error message: {error_record['error']}\n"
            if 'analysis' in error_record:
                prompt += f"Analysis: {json.dumps(error_record['analysis'], ensure_ascii=False)}\n"
            prompt += "\n"

        prompt += "Please provide a concise, effective solution."

        return prompt

    def generate_error_report(self, time_period: str = "daily") -> Dict[str, Any]:
        """Generate an error report."""

        now = datetime.now()

        if time_period == "daily":
            start_time = datetime(now.year, now.month, now.day)
        elif time_period == "weekly":
            start_time = now - timedelta(days=7)
        elif time_period == "monthly":
            start_time = now - timedelta(days=30)  # approximate a month as 30 days
        else:
            start_time = datetime(1970, 1, 1)  # all time

        # Filter errors within the requested period
        recent_errors = [
            error for error in self.error_history
            if datetime.fromisoformat(error["timestamp"]) > start_time
        ]

        # Build the report
        report = {
            "period": time_period,
            "total_errors": len(recent_errors),
            "unique_error_types": len(set(e["error_type"] for e in recent_errors)),
            "most_common_errors": self._get_most_common(recent_errors, "error_type"),
            "error_trend": self._calculate_error_trend(recent_errors),
            "resolution_rate": self._calculate_resolution_rate(recent_errors),
            "recommendations": self._generate_recommendations(recent_errors)
        }

        return report

    def _get_most_common(self, errors: List[Dict[str, Any]], key: str) -> List[Dict[str, Any]]:
        """Return the most common error types."""

        counter = Counter(e[key] for e in errors)

        return [
            {"type": error_type, "count": count}
            for error_type, count in counter.most_common(5)
        ]

    def _calculate_error_trend(self, errors: List[Dict[str, Any]]) -> str:
        """Estimate the error trend."""

        if len(errors) < 2:
            return "insufficient data"

        # Group errors by hour
        hourly_counts: Dict[str, int] = {}

        for error in errors:
            timestamp = datetime.fromisoformat(error["timestamp"])
            hour_key = timestamp.strftime("%Y-%m-%d %H:00")
            hourly_counts[hour_key] = hourly_counts.get(hour_key, 0) + 1

        # Simple trend judgment
        sorted_hours = sorted(hourly_counts.keys())

        if len(sorted_hours) < 2:
            return "stable"

        recent_count = hourly_counts[sorted_hours[-1]]
        previous_count = hourly_counts[sorted_hours[-2]]

        if recent_count > previous_count * 1.5:
            return "rising"
        elif recent_count < previous_count * 0.5:
            return "falling"
        else:
            return "stable"

    def _calculate_resolution_rate(self, errors: List[Dict[str, Any]]) -> float:
        """Compute the resolution rate (simplified implementation)."""

        # A real implementation would use actual resolution status;
        # this version uses a simplifying assumption
        total = len(errors)
        if total == 0:
            return 1.0

        # Assume 80% of errors have been resolved
        return 0.8

    def _generate_recommendations(self, errors: List[Dict[str, Any]]) -> List[str]:
        """Generate improvement recommendations."""

        recommendations = []

        # Analyze error patterns
        error_types = [e["error_type"] for e in errors]
        type_counter = Counter(error_types)

        for error_type, count in type_counter.most_common(3):
            if count > 10:  # frequently occurring errors
                recommendations.append(
                    f"{error_type} occurred frequently ({count} times); recommend prioritizing investigation"
                )

        # Check for new error types
        recent_errors = errors[-20:]  # last 20 errors
        new_types = set(e["error_type"] for e in recent_errors)
        all_types = set(e["error_type"] for e in errors)

        if len(new_types) > len(all_types) * 0.3:  # more than 30% are new types
            recommendations.append(
                "Many new error types detected; recommend reviewing recent code changes"
            )

        return recommendations


# Usage example
async def example_ai_error_handling():
    # Initialize the AI error handler
    ai_handler = AIErrorHandler(api_key="your-openai-api-key")

    # Simulate an error
    try:
        # Simulate an operation that may fail
        raise ConnectionError("Failed to connect to database: Connection refused")

    except Exception as e:
        # Prepare the error context
        context = {
            "service": "order-service",
            "endpoint": "/api/orders",
            "user_id": "user123",
            "request_id": "req-abc-123",
            "timestamp": datetime.now().isoformat(),
            "environment": "production"
        }

        # Analyze the error with AI
        analysis = await ai_handler.analyze_error(e, context)

        print("AI error analysis:")
        print(f"Cause: {analysis.get('cause', 'N/A')}")
        print(f"Impact: {analysis.get('impact', 'N/A')}")
        print(f"Immediate solution: {analysis.get('immediate_solution', 'N/A')}")
        print(f"Long-term prevention: {analysis.get('long_term_prevention', 'N/A')}")

        # Generate an error report
        report = ai_handler.generate_error_report("daily")
        print("\nDaily error report:")
        print(json.dumps(report, indent=2, ensure_ascii=False))


# Run the example
if __name__ == "__main__":
    asyncio.run(example_ai_error_handling())

34.7 Summary

34.7.1 Key Takeaways

• Error handling is systems engineering

  • It is more than an implementation detail; it is an architectural concern

  • It must balance user experience, operational efficiency, and system stability

  • It should be treated as a first-class part of system design

• Status codes are a compass

  • Correct status code usage is the foundation of good API design

  • Status codes should match the error type and severity

  • Status codes should guide client behavior (see the sketch below)

• Intelligent error handling

  • Retry strategies should depend on the error type and its context

  • Machine learning can help optimize error-handling decisions

  • AI can assist with error analysis and solution generation

• Monitoring and continuous improvement

  • Error handling without monitoring is incomplete

  • Error data should drive continuous system improvement

  • Build a feedback loop around error handling
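
As a compact illustration of the second point, a client can encode "status code → behavior" as data. The sketch below uses illustrative policies, not a normative mapping:

typescript

// Sketch: mapping status codes to client behavior (illustrative policies)
type Policy = { retryable: boolean; severity: 'LOW' | 'MEDIUM' | 'HIGH'; action: string };

const STATUS_POLICIES: Record<number, Policy> = {
  400: { retryable: false, severity: 'LOW', action: 'fix the request payload' },
  401: { retryable: false, severity: 'MEDIUM', action: 'refresh credentials, then retry once' },
  404: { retryable: false, severity: 'LOW', action: 'treat the resource as missing' },
  409: { retryable: false, severity: 'MEDIUM', action: 'reload state and resolve the conflict' },
  422: { retryable: false, severity: 'LOW', action: 'surface validation errors to the user' },
  429: { retryable: true, severity: 'MEDIUM', action: 'back off per Retry-After' },
  500: { retryable: true, severity: 'HIGH', action: 'retry with backoff, then degrade' },
  503: { retryable: true, severity: 'HIGH', action: 'retry after Retry-After; consider circuit breaking' },
};

// Unknown codes fall back to their class: 4xx fail fast, 5xx retry cautiously
function policyFor(status: number): Policy {
  return (
    STATUS_POLICIES[status] ??
    (status >= 500
      ? { retryable: true, severity: 'HIGH', action: 'retry with backoff' }
      : { retryable: false, severity: 'MEDIUM', action: 'do not retry' })
  );
}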

34.7.2 Practical Recommendations

• Start small

  • Begin with basic error classification and correct status codes

  • Introduce more advanced error-handling strategies incrementally

  • Choose tools and techniques based on actual needs

• Focus on user experience

  • Error messages should help the user

  • Offer a clear solution or next step

  • Avoid technical jargon and sensitive information

• Build the culture

  • Error handling should be a team-wide commitment

  • Encourage sharing error-handling experience

  • Review and improve error-handling strategies regularly

• Keep it simple

  • More complex does not mean better

  • Prefer simple, effective solutions

  • Avoid over-engineering

34.7.3 Looking Ahead

As technology evolves, error handling will become more intelligent and automated:

• Predictive error prevention

  • Predict and prevent errors before they occur

  • Pattern recognition based on historical data

  • Automated system optimization

• Self-healing systems

  • Systems that detect and repair errors automatically

  • Rule-based automatic recovery

  • Error handling without human intervention

• Full-chain observability

  • End-to-end error tracing

  • Cross-service error propagation analysis

  • AI-based root cause analysis

• Humane interaction

  • More natural ways of communicating errors

  • Context-aware, personalized solutions

  • Augmented-reality-assisted troubleshooting

This chapter should have given you a thorough understanding of status codes and error-handling strategy. Remember: good error handling not only makes systems more stable, it also improves user experience and reduces operational burden. In an ever-changing digital world, strong error-handling capability is the bedrock of system reliability.
