TokenStream 与流式响应异常处理详解
一、TokenStream 深度解析
1.1 TokenStream 的概念与设计
/**
 * Core concepts of a TokenStream.
 *
 * <p>In LLM streaming responses a TokenStream represents an <b>asynchronous stream of
 * character sequences</b>. It resembles Java's Stream API but is designed for AI responses.
 */
public class TokenStreamConcept {

    /**
     * Marker interface (no members) documenting the core characteristics of a TokenStream:
     * <ol>
     *   <li>Asynchrony: a non-blocking data flow.</li>
     *   <li>Incrementality: produced and consumed step by step.</li>
     *   <li>Continuity: semantic coherence is preserved.</li>
     *   <li>Interruptibility: may be cancelled midway.</li>
     *   <li>Error propagation: exceptions are passed along the stream.</li>
     * </ol>
     */
    public interface TokenStreamCharacteristics {
    }

    /**
     * Basic structure of a TokenStream: a {@link Publisher} source plus a token processor.
     * Every subscriber is wrapped in a {@code SafeSubscriber} together with the processor.
     *
     * @param <T> token type
     */
    public static class TokenStream<T> {
        private final Publisher<T> source;
        private final TokenProcessor<T> processor;
        // NOTE(review): created here but not consulted anywhere in this class.
        private final FlowControl flowControl;

        /**
         * @param source upstream publisher of tokens; must not be null
         * @throws NullPointerException if {@code source} is null
         */
        public TokenStream(Publisher<T> source) {
            // Fail fast here instead of with an NPE later inside subscribe().
            this.source = java.util.Objects.requireNonNull(source, "source");
            this.processor = new DefaultTokenProcessor<>();
            this.flowControl = new RateBasedFlowControl();
        }

        /** Subscribes {@code subscriber} to the source through a {@code SafeSubscriber} wrapper. */
        public void subscribe(Subscriber<T> subscriber) {
            source.subscribe(new SafeSubscriber<>(subscriber, processor));
        }
    }

    /**
     * An immutable token emitted by a TokenStream.
     *
     * <p>The original declaration had only {@code final} fields and no constructor, so it could
     * not compile; the constructor and accessors below make it usable.
     */
    public static class Token {
        private final String content;
        private final int index;
        private final boolean isComplete;
        private final TokenType type;
        private final Map<String, Object> metadata;

        /** Kinds of token a stream can carry. */
        public enum TokenType {
            TEXT,          // plain text token
            FUNCTION_CALL, // function call
            TOOL_RESULT,   // tool result
            CONTROL,       // control token (start, end, pause)
            ERROR          // error token
        }

        /**
         * @param content    token text
         * @param index      position of the token in its stream
         * @param isComplete whether this token completes the response
         * @param type       token kind
         * @param metadata   extra attributes; copied defensively, {@code null} means none
         */
        public Token(String content, int index, boolean isComplete, TokenType type,
                     Map<String, Object> metadata) {
            this.content = content;
            this.index = index;
            this.isComplete = isComplete;
            this.type = type;
            // Read-only snapshot keeps the token immutable even if the caller mutates its map.
            this.metadata = metadata == null
                    ? java.util.Collections.<String, Object>emptyMap()
                    : java.util.Collections.unmodifiableMap(new java.util.HashMap<>(metadata));
        }

        public String getContent() {
            return content;
        }

        public int getIndex() {
            return index;
        }

        public boolean isComplete() {
            return isComplete;
        }

        public TokenType getType() {
            return type;
        }

        /** @return an unmodifiable view of the metadata (never null) */
        public Map<String, Object> getMetadata() {
            return metadata;
        }
    }
}
1.2 TokenStream 的实现模式
/**
 * Several implementation patterns for a TokenStream.
 */
public class TokenStreamImplementations {

    /**
     * Pattern 1: Reactive Streams publisher.
     *
     * <p>Tokens are produced synchronously inside {@link Subscription#request(long)} via
     * {@code generateNextToken()}, which returns {@code null} once the stream is exhausted.
     */
    public static class ReactiveTokenStream<T> implements Publisher<T> {
        private final List<Subscriber<? super T>> subscribers =
                new CopyOnWriteArrayList<>();
        // NOTE(review): nothing in this class sets this flag; presumably done by code not shown.
        private volatile boolean cancelled = false;

        @Override
        public void subscribe(Subscriber<? super T> subscriber) {
            // Reactive Streams rule 1.9: a null subscriber must raise NullPointerException.
            java.util.Objects.requireNonNull(subscriber, "subscriber");
            if (cancelled) {
                subscriber.onError(new IllegalStateException("Stream cancelled"));
                return;
            }
            subscribers.add(subscriber);
            subscriber.onSubscribe(new TokenSubscription(subscriber));
        }

        /** Per-subscriber subscription that emits up to {@code n} tokens per request. */
        private class TokenSubscription implements Subscription {
            private final Subscriber<? super T> subscriber;
            private volatile boolean active = true;

            TokenSubscription(Subscriber<? super T> subscriber) {
                this.subscriber = subscriber;
            }

            @Override
            public void request(long n) {
                if (!active || cancelled) {
                    return;
                }
                if (n <= 0) {
                    // Rule 3.9: non-positive demand is a protocol error signalled via onError.
                    terminate();
                    subscriber.onError(new IllegalArgumentException("Request must be positive"));
                    return;
                }
                for (long i = 0; i < n && active; i++) {
                    T token = generateNextToken();
                    if (token == null) {
                        // Deactivate BEFORE signalling so a later request() cannot produce a
                        // second onComplete (rule 1.7: at most one terminal signal).
                        terminate();
                        subscriber.onComplete();
                        return;
                    }
                    subscriber.onNext(token);
                }
            }

            @Override
            public void cancel() {
                terminate();
            }

            /** Stops emission and forgets the subscriber. */
            private void terminate() {
                active = false;
                subscribers.remove(subscriber);
            }
        }
    }

    /**
     * Pattern 2: callback-based stream.
     *
     * <p>A single generation loop runs on {@code executor} and pushes each token to all
     * registered callbacks. When the generator returns {@code null} the stream completes and
     * may be started again.
     */
    public static class CallbackTokenStream<T> {
        private final TokenGenerator<T> generator;
        private final ExecutorService executor;
        // Guarded by synchronizing on the list itself.
        private final List<TokenCallback<T>> callbacks = new ArrayList<>();
        // Atomic so two concurrent startStreaming() calls cannot both pass the "in progress" check.
        private final java.util.concurrent.atomic.AtomicBoolean isStreaming =
                new java.util.concurrent.atomic.AtomicBoolean(false);

        /** Receives tokens and terminal signals from the stream. */
        @FunctionalInterface
        public interface TokenCallback<T> {
            void onToken(T token);

            default void onError(Throwable error) {}

            default void onComplete() {}
        }

        /**
         * @param generator source of tokens; {@code null} from it means end of stream
         * @param executor  runs the generation loop (shut down by {@link #stopStreaming()})
         */
        public CallbackTokenStream(TokenGenerator<T> generator, ExecutorService executor) {
            this.generator = java.util.Objects.requireNonNull(generator, "generator");
            this.executor = java.util.Objects.requireNonNull(executor, "executor");
        }

        /**
         * Starts the generation loop and registers {@code callback}.
         *
         * @throws IllegalStateException if a stream is already in progress
         * @throws java.util.concurrent.RejectedExecutionException if the executor was shut down
         */
        public void startStreaming(TokenCallback<T> callback) {
            java.util.Objects.requireNonNull(callback, "callback");
            if (!isStreaming.compareAndSet(false, true)) {
                throw new IllegalStateException("Stream already in progress");
            }
            synchronized (callbacks) {
                callbacks.add(callback);
            }
            try {
                executor.submit(() -> {
                    try {
                        while (isStreaming.get()) {
                            T token = generator.generate();
                            if (token == null) {
                                complete();
                                break;
                            }
                            // Notify all callbacks synchronously, in registration order.
                            synchronized (callbacks) {
                                for (TokenCallback<T> cb : callbacks) {
                                    cb.onToken(token);
                                }
                            }
                        }
                    } catch (Exception e) {
                        error(e);
                    }
                });
            } catch (java.util.concurrent.RejectedExecutionException e) {
                // No loop will run; undo the state change so the stream is not stuck "in progress".
                isStreaming.set(false);
                synchronized (callbacks) {
                    callbacks.remove(callback);
                }
                throw e;
            }
        }

        /** Stops the loop after the current token and shuts the executor down. */
        public void stopStreaming() {
            isStreaming.set(false);
            executor.shutdown();
        }

        /** Signals completion to every callback and resets the stream so it can be restarted. */
        private void complete() {
            isStreaming.set(false);
            synchronized (callbacks) {
                for (TokenCallback<T> cb : callbacks) {
                    cb.onComplete();
                }
                callbacks.clear();
            }
        }

        /** Signals {@code e} to every callback and resets the stream so it can be restarted. */
        private void error(Throwable e) {
            isStreaming.set(false);
            synchronized (callbacks) {
                for (TokenCallback<T> cb : callbacks) {
                    cb.onError(e);
                }
                callbacks.clear();
            }
        }
    }

    /**
     * Pattern 3: iterator-based stream with lazy look-ahead.
     *
     * <p>The next token is fetched on demand in {@link #hasNext()}. A failing fetch therefore
     * surfaces from {@code hasNext()}/{@code next()} without losing an already-fetched token. It
     * also never surfaces from the constructor, where the source could not be closed.
     */
    public static class IteratorTokenStream<T> implements Iterator<T>, AutoCloseable {
        private final TokenSource<T> source;
        // Look-ahead buffer; null when nothing is buffered.
        private T nextToken;
        // Set once the source returned null or failed.
        private boolean exhausted = false;
        private volatile boolean closed = false;

        public IteratorTokenStream(TokenSource<T> source) {
            this.source = java.util.Objects.requireNonNull(source, "source");
        }

        /** Reads one token from the source into the look-ahead buffer. */
        private void fetchNext() {
            try {
                nextToken = source.next();
            } catch (Exception e) {
                exhausted = true;
                throw new TokenStreamException("Failed to fetch next token", e);
            }
            if (nextToken == null) {
                exhausted = true;
            }
        }

        /**
         * @throws TokenStreamException if fetching the next token fails
         */
        @Override
        public boolean hasNext() {
            if (closed || exhausted) {
                return false;
            }
            if (nextToken == null) {
                fetchNext();
            }
            return nextToken != null;
        }

        @Override
        public T next() {
            if (!hasNext()) {
                throw new NoSuchElementException();
            }
            T current = nextToken;
            nextToken = null;
            return current;
        }

        /** Closes the underlying source; calling it more than once has no further effect. */
        @Override
        public void close() {
            if (closed) {
                return;
            }
            closed = true;
            nextToken = null;
            source.close();
        }
    }
}
二、流式响应异常处理架构
2.1 异常分类与处理策略
/**
 * Classification hierarchy for streaming-response exceptions.
 *
 * <p>Every concrete exception derives {@link StreamException#isRecoverable()} from its
 * {@link StreamException#getRecoveryStrategy()}. An error is recoverable exactly when its
 * strategy is not {@link RecoveryStrategy#FAIL_FAST}, so handlers that check
 * {@code isRecoverable()} first never skip a strategy the exception itself recommends.
 */
public class StreamExceptionHierarchy {

    /**
     * Base class of all stream exceptions. It records the error category, the creation time and
     * the id of the stream the error belongs to.
     */
    public abstract static class StreamException extends RuntimeException {
        private final ErrorCategory category;
        private final Instant timestamp;
        private final String streamId;

        /**
         * Creates an exception without a cause.
         *
         * <p>The cause is left uninitialised on purpose, so subclasses may still attach one
         * later with {@link #initCause(Throwable)}.
         */
        public StreamException(String message, ErrorCategory category,
                               String streamId) {
            super(message);
            this.category = category;
            this.timestamp = Instant.now();
            this.streamId = streamId;
        }

        /** Creates an exception that wraps {@code cause}. */
        public StreamException(String message, Throwable cause, ErrorCategory category,
                               String streamId) {
            super(message, cause);
            this.category = category;
            this.timestamp = Instant.now();
            this.streamId = streamId;
        }

        /** @return whether the stream can be recovered after this error */
        public abstract boolean isRecoverable();

        /** @return the recovery strategy suggested for this error */
        public abstract RecoveryStrategy getRecoveryStrategy();

        public ErrorCategory getCategory() {
            return category;
        }

        public Instant getTimestamp() {
            return timestamp;
        }

        public String getStreamId() {
            return streamId;
        }
    }

    /** Broad error categories. */
    public enum ErrorCategory {
        NETWORK,      // network error
        RATE_LIMIT,   // rate limit
        TIMEOUT,      // timeout
        MODEL,        // model error
        CLIENT,       // client error
        SERVER,       // server error
        VALIDATION,   // validation error
        INTERRUPTION  // interruption
    }

    /** Recovery strategies. */
    public enum RecoveryStrategy {
        RETRY,      // retry the request
        RESUME,     // resume where the stream stopped
        RESTART,    // start the stream over
        FALLBACK,   // degrade to a fallback
        FAIL_FAST,  // give up immediately
        CONTINUE    // ignore and continue
    }

    /** HTTP-level failure talking to an endpoint. */
    public static class NetworkException extends StreamException {
        private final String endpoint;
        private final int statusCode;

        public NetworkException(String endpoint, int statusCode,
                                String streamId) {
            super(String.format("Network error connecting to %s (status: %d)",
                            endpoint, statusCode),
                    ErrorCategory.NETWORK, streamId);
            this.endpoint = endpoint;
            this.statusCode = statusCode;
        }

        /** 429 and 5xx are recoverable; other statuses (notably 4xx) are not. */
        @Override
        public boolean isRecoverable() {
            return getRecoveryStrategy() != RecoveryStrategy.FAIL_FAST;
        }

        @Override
        public RecoveryStrategy getRecoveryStrategy() {
            if (statusCode == 429) {
                // Too Many Requests: back off and retry.
                return RecoveryStrategy.RETRY;
            }
            if (statusCode >= 500 && statusCode < 600) {
                // Server-side failure: try to resume.
                return RecoveryStrategy.RESUME;
            }
            return RecoveryStrategy.FAIL_FAST;
        }

        public String getEndpoint() {
            return endpoint;
        }

        public int getStatusCode() {
            return statusCode;
        }
    }

    /** The provider rejected the call because a rate limit was exceeded. */
    public static class RateLimitException extends StreamException {
        private final Duration retryAfter;
        private final int limit;
        private final int remaining;

        /**
         * @param retryAfter server-suggested wait before retrying; may be {@code null}
         */
        public RateLimitException(Duration retryAfter, int limit,
                                  int remaining, String streamId) {
            super(String.format("Rate limit exceeded. Limit: %d, Remaining: %d",
                            limit, remaining),
                    ErrorCategory.RATE_LIMIT, streamId);
            this.retryAfter = retryAfter;
            this.limit = limit;
            this.remaining = remaining;
        }

        @Override
        public boolean isRecoverable() {
            return true; // rate limits are always recoverable by waiting
        }

        @Override
        public RecoveryStrategy getRecoveryStrategy() {
            return RecoveryStrategy.RETRY;
        }

        /** @return the suggested wait before retrying, or {@code null} if none was given */
        public Duration getRetryAfter() {
            return retryAfter;
        }

        public int getLimit() {
            return limit;
        }

        public int getRemaining() {
            return remaining;
        }
    }

    /** The stream was interrupted before it finished. */
    public static class StreamInterruptedException extends StreamException {
        private final InterruptionSource source;
        private final boolean userInitiated;

        /** What interrupted the stream. */
        public enum InterruptionSource {
            USER_CANCEL,      // user cancelled
            TIMEOUT,          // timed out
            RESOURCE_LIMIT,   // resource limit hit
            SYSTEM_SHUTDOWN,  // system shutting down
            CONNECTION_LOST   // connection lost
        }

        public StreamInterruptedException(InterruptionSource source,
                                          boolean userInitiated,
                                          String streamId) {
            super(String.format("Stream interrupted by %s", source),
                    ErrorCategory.INTERRUPTION, streamId);
            this.source = source;
            this.userInitiated = userInitiated;
        }

        /**
         * TIMEOUT (resume) and CONNECTION_LOST (restart) are recoverable. Previously TIMEOUT
         * advertised RESUME yet reported itself non-recoverable, so its strategy was dead.
         */
        @Override
        public boolean isRecoverable() {
            return getRecoveryStrategy() != RecoveryStrategy.FAIL_FAST;
        }

        @Override
        public RecoveryStrategy getRecoveryStrategy() {
            if (source == null) {
                return RecoveryStrategy.FAIL_FAST;
            }
            switch (source) {
                case TIMEOUT:
                    return RecoveryStrategy.RESUME;
                case CONNECTION_LOST:
                    return RecoveryStrategy.RESTART;
                case USER_CANCEL:
                default:
                    return RecoveryStrategy.FAIL_FAST;
            }
        }

        public InterruptionSource getSource() {
            return source;
        }

        public boolean isUserInitiated() {
            return userInitiated;
        }
    }
}
2.2 异常处理框架实现
/**
 * Exception-handling framework for token streams: handler interface, handling result,
 * per-stream context, a registry that dispatches by exception type, and concrete handlers.
 */
public class StreamExceptionHandler {

    /**
     * Handles one kind of {@link StreamException}.
     *
     * @param <T> the exception type this handler accepts
     */
    @FunctionalInterface
    public interface ExceptionHandler<T extends StreamException> {
        HandlingResult handle(T exception, StreamContext context);
    }

    /** Outcome of handling an exception: whether to keep streaming and which recovery to run. */
    public static class HandlingResult {
        private final boolean shouldContinue;
        private final RecoveryAction recoveryAction;
        private final String message;
        // Back-off requested by the handler; null when none applies.
        private final Duration delay;

        /** Recovery actions a stream driver may perform. */
        public enum RecoveryAction {
            NONE,
            RETRY_IMMEDIATELY,
            RETRY_AFTER_DELAY,
            SWITCH_ENDPOINT,
            USE_FALLBACK,
            RECONNECT_STREAM,
            /** Start the stream over; returned by {@link InterruptionHandler}. */
            RESTART
        }

        public HandlingResult(boolean shouldContinue,
                              RecoveryAction recoveryAction,
                              String message) {
            this(shouldContinue, recoveryAction, message, null);
        }

        /**
         * @param delay back-off before the recovery action, or {@code null} if none
         */
        public HandlingResult(boolean shouldContinue,
                              RecoveryAction recoveryAction,
                              String message,
                              Duration delay) {
            this.shouldContinue = shouldContinue;
            this.recoveryAction = recoveryAction;
            this.message = message;
            this.delay = delay;
        }

        public boolean shouldContinue() {
            return shouldContinue;
        }

        public RecoveryAction getRecoveryAction() {
            return recoveryAction;
        }

        public String getMessage() {
            return message;
        }

        /** @return the back-off chosen by the handler, if it supplied one */
        public java.util.Optional<Duration> getDelay() {
            return java.util.Optional.ofNullable(delay);
        }
    }

    /** Mutable per-stream bookkeeping shared between the stream driver and handlers. */
    public static class StreamContext {
        private final String streamId;
        private final Instant startTime;
        private volatile Instant lastTokenTime;
        private final AtomicInteger tokenCount;
        private final Map<String, Object> metadata;

        public StreamContext(String streamId) {
            this.streamId = streamId;
            this.startTime = Instant.now();
            this.lastTokenTime = startTime;
            this.tokenCount = new AtomicInteger(0);
            this.metadata = new ConcurrentHashMap<>();
        }

        /** Counts one delivered token and updates the last-token timestamp. */
        public void recordToken() {
            tokenCount.incrementAndGet();
            lastTokenTime = Instant.now();
        }

        public String getStreamId() {
            return streamId;
        }

        public Instant getStartTime() {
            return startTime;
        }

        public Instant getLastTokenTime() {
            return lastTokenTime;
        }

        public int getTokenCount() {
            return tokenCount.get();
        }

        /** @return the live, thread-safe metadata map (handlers read and write it) */
        public Map<String, Object> getMetadata() {
            return metadata;
        }
    }

    /** Dispatches an exception to the handler registered for its most specific type. */
    public static class ExceptionHandlerRegistry {
        private final Map<Class<? extends StreamException>,
                ExceptionHandler<?>> handlers = new ConcurrentHashMap<>();
        private final ExceptionHandler<StreamException> defaultHandler;

        public ExceptionHandlerRegistry() {
            this.defaultHandler = (ex, ctx) ->
                    new HandlingResult(false,
                            HandlingResult.RecoveryAction.NONE,
                            "Unhandled exception: " + ex.getMessage());
        }

        public <T extends StreamException> void registerHandler(
                Class<T> exceptionType,
                ExceptionHandler<T> handler) {
            java.util.Objects.requireNonNull(exceptionType, "exceptionType");
            java.util.Objects.requireNonNull(handler, "handler");
            handlers.put(exceptionType, handler);
        }

        /**
         * Walks from the exception's concrete class up to {@link StreamException} and uses the
         * first registered handler. The closest (most specific) registration therefore wins
         * deterministically; the previous scan over the hash map could pick any matching
         * superclass handler. Falls back to a handler that stops the stream.
         */
        public HandlingResult handle(StreamException exception,
                                     StreamContext context) {
            java.util.Objects.requireNonNull(exception, "exception");
            for (Class<?> type = exception.getClass();
                 type != null && StreamException.class.isAssignableFrom(type);
                 type = type.getSuperclass()) {
                ExceptionHandler<?> candidate = handlers.get(type);
                if (candidate != null) {
                    // Safe: registerHandler pairs each key with a handler for that exact type,
                    // and `exception` is an instance of it.
                    @SuppressWarnings("unchecked")
                    ExceptionHandler<StreamException> handler =
                            (ExceptionHandler<StreamException>) candidate;
                    return handler.handle(exception, context);
                }
            }
            return defaultHandler.handle(exception, context);
        }
    }

    /** Retries recoverable network errors, switching endpoints when advised. */
    public static class NetworkExceptionHandler
            implements ExceptionHandler<NetworkException> {
        private final RetryPolicy retryPolicy;
        private final CircuitBreaker circuitBreaker;
        private final EndpointManager endpointManager;

        public NetworkExceptionHandler(RetryPolicy retryPolicy,
                                       CircuitBreaker circuitBreaker,
                                       EndpointManager endpointManager) {
            this.retryPolicy = retryPolicy;
            this.circuitBreaker = circuitBreaker;
            this.endpointManager = endpointManager;
        }

        @Override
        public HandlingResult handle(NetworkException exception,
                                     StreamContext context) {
            // Count every failure against the endpoint. Previously only non-recoverable (4xx)
            // errors were recorded, so repeated 5xx failures could never trip the breaker.
            circuitBreaker.recordFailure(exception.getEndpoint());
            if (!exception.isRecoverable()) {
                return new HandlingResult(false,
                        HandlingResult.RecoveryAction.NONE,
                        "Non-recoverable network error");
            }
            if (retryPolicy.shouldRetry(exception, context)) {
                Duration delay = retryPolicy.getNextRetryDelay();
                if (endpointManager.shouldSwitchEndpoint(
                        exception.getEndpoint(), exception.getStatusCode())) {
                    String newEndpoint = endpointManager.getNextEndpoint();
                    return new HandlingResult(true,
                            HandlingResult.RecoveryAction.SWITCH_ENDPOINT,
                            String.format("Switching to endpoint: %s after delay: %s",
                                    newEndpoint, delay),
                            delay);
                }
                return new HandlingResult(true,
                        HandlingResult.RecoveryAction.RETRY_AFTER_DELAY,
                        String.format("Will retry after %s", delay),
                        delay);
            }
            return new HandlingResult(false,
                    HandlingResult.RecoveryAction.USE_FALLBACK,
                    "Retry limit exceeded, using fallback");
        }
    }

    /**
     * Retries after the server-supplied Retry-After or, failing that, after an exponential
     * back-off of {@code 1s * 2^attempt}, capped at one minute.
     */
    public static class RateLimitExceptionHandler
            implements ExceptionHandler<RateLimitException> {
        private static final String ATTEMPTS_KEY = "rateLimitAttempts";
        private static final long BASE_DELAY_MILLIS = 1000L;
        private static final long MAX_DELAY_MILLIS = 60000L; // at most 1 minute
        // 1000 << 16 already exceeds the cap; bounding the shift also prevents overflow.
        private static final int MAX_EXPONENT = 16;

        @Override
        public HandlingResult handle(RateLimitException exception,
                                     StreamContext context) {
            Duration retryAfter = exception.getRetryAfter();
            if (retryAfter == null || retryAfter.isNegative()) {
                retryAfter = calculateExponentialBackoff(context);
            }
            return new HandlingResult(true,
                    HandlingResult.RecoveryAction.RETRY_AFTER_DELAY,
                    String.format("Rate limited, retrying after %s", retryAfter),
                    retryAfter);
        }

        private Duration calculateExponentialBackoff(StreamContext context) {
            int attemptCount = nextAttempt(context);
            long delayMillis = Math.min(
                    BASE_DELAY_MILLIS << Math.min(attemptCount, MAX_EXPONENT),
                    MAX_DELAY_MILLIS);
            return Duration.ofMillis(delayMillis);
        }

        /**
         * Returns the attempt number (0 on the first call) and atomically increments the counter
         * stored in the context. Previously the counter was only read and never written, so the
         * back-off never grew beyond one second.
         */
        private int nextAttempt(StreamContext context) {
            Object updated = context.getMetadata().merge(ATTEMPTS_KEY, 1,
                    (previous, one) -> ((Number) previous).intValue() + 1);
            return ((Number) updated).intValue() - 1;
        }
    }

    /** Tries to resume an interrupted stream, then to restart it, then falls back. */
    public static class InterruptionHandler
            implements ExceptionHandler<StreamInterruptedException> {
        private final StreamRecoveryManager recoveryManager;
        private final boolean allowResume;

        public InterruptionHandler(StreamRecoveryManager recoveryManager,
                                   boolean allowResume) {
            this.recoveryManager = recoveryManager;
            this.allowResume = allowResume;
        }

        @Override
        public HandlingResult handle(StreamInterruptedException exception,
                                     StreamContext context) {
            if (!exception.isRecoverable()) {
                return new HandlingResult(false,
                        HandlingResult.RecoveryAction.NONE,
                        "Non-recoverable interruption");
            }
            if (allowResume && recoveryManager.canResume(context)) {
                boolean resumed = recoveryManager.resumeStream(context);
                if (resumed) {
                    return new HandlingResult(true,
                            HandlingResult.RecoveryAction.RECONNECT_STREAM,
                            "Stream resumed successfully");
                }
            }
            // Resuming was not possible: try a restart.
            boolean restarted = recoveryManager.restartStream(context);
            if (restarted) {
                return new HandlingResult(true,
                        HandlingResult.RecoveryAction.RESTART,
                        "Stream restarted");
            }
            return new HandlingResult(false,
                    HandlingResult.RecoveryAction.USE_FALLBACK,
                    "Unable to recover stream, using fallback");
        }
    }
}
三、流式响应完整实现
3.1 安全的 TokenStream 实现
/**
 * TokenStream implementation with full exception handling.
 *
 * <p>Each subscriber gets one worker task on {@code executor}. The worker:
 * <ul>
 *   <li>emits only as many tokens as the subscriber has requested;</li>
 *   <li>applies rate limiting and a circuit breaker;</li>
 *   <li>routes failures through the {@link ExceptionHandlerRegistry}.</li>
 * </ul>
 * All signals to one subscriber, including the shutdown error sent on {@link #close()},
 * come from that single worker whenever a worker exists.
 *
 * @param <T> token type
 */
public class SafeTokenStream<T> implements Publisher<T>, AutoCloseable {
    // Back-off used when a handling result's message carries no parseable delay.
    private static final long DEFAULT_RETRY_DELAY_MILLIS = 1000L;
    // ISO-8601 duration text as produced by Duration.toString(), e.g. "PT1.5S".
    private static final java.util.regex.Pattern DURATION_TEXT =
            java.util.regex.Pattern.compile("\\bPT[-0-9.HMS]+");

    private final TokenSource<T> tokenSource;
    protected final ExecutorService executor;
    private final ExceptionHandlerRegistry exceptionHandler;
    private final StreamRecoveryManager recoveryManager;
    private final CircuitBreaker circuitBreaker;
    private final RateLimiter rateLimiter;
    protected final List<Subscriber<? super T>> subscribers =
            new CopyOnWriteArrayList<>();
    protected final Map<Subscriber<? super T>, StreamContext> contexts =
            new ConcurrentHashMap<>();
    // Subscriber -> its subscription. Filled in by the SafeSubscription constructor; replaces the
    // former cast of a Subscriber to SafeSubscription, which always threw ClassCastException.
    private final Map<Subscriber<? super T>, SafeSubscription> subscriptions =
            new ConcurrentHashMap<>();
    private volatile boolean isClosed = false;
    private final AtomicInteger activeStreams = new AtomicInteger(0);

    public SafeTokenStream(TokenSource<T> tokenSource,
                           ExecutorService executor,
                           ExceptionHandlerRegistry exceptionHandler,
                           StreamRecoveryManager recoveryManager) {
        this.tokenSource = tokenSource;
        this.executor = executor;
        this.exceptionHandler = exceptionHandler;
        this.recoveryManager = recoveryManager;
        this.circuitBreaker = new CircuitBreaker(5, Duration.ofMinutes(1));
        this.rateLimiter = RateLimiter.create(100.0); // 100 tokens per second
    }

    /** @return the token source, for subclasses that need to checkpoint/restore it */
    protected TokenSource<T> getTokenSource() {
        return tokenSource;
    }

    @Override
    public void subscribe(Subscriber<? super T> subscriber) {
        // Reactive Streams rule 1.9: null subscribers are rejected with NullPointerException.
        java.util.Objects.requireNonNull(subscriber, "subscriber");
        if (isClosed) {
            subscriber.onError(new IllegalStateException("Stream is closed"));
            return;
        }
        if (subscribers.contains(subscriber)) {
            subscriber.onError(new IllegalStateException("Already subscribed"));
            return;
        }
        subscribers.add(subscriber);
        StreamContext context = new StreamContext(generateStreamId());
        contexts.put(subscriber, context);
        subscriber.onSubscribe(new SafeSubscription(subscriber, context));
        try {
            // Drive the stream asynchronously.
            executor.submit(() -> processStream(subscriber, context));
        } catch (java.util.concurrent.RejectedExecutionException e) {
            // No worker will ever run for this subscriber; terminate it instead of leaving it hanging.
            cleanupSubscriber(subscriber);
            subscriber.onError(new IllegalStateException("Stream executor rejected subscription", e));
        }
    }

    /** Demand-tracking subscription; the worker parks on this object while demand is zero. */
    protected class SafeSubscription implements Subscription {
        protected final Subscriber<? super T> subscriber;
        private final StreamContext context;
        private volatile boolean cancelled = false;
        private final AtomicLong requested = new AtomicLong(0);

        public SafeSubscription(Subscriber<? super T> subscriber,
                                StreamContext context) {
            this.subscriber = subscriber;
            this.context = context;
            // Self-register so processStream can find it. This also covers subclass
            // subscriptions that are created outside subscribe().
            subscriptions.put(subscriber, this);
        }

        @Override
        public void request(long n) {
            if (n <= 0) {
                // Rule 3.9: signal IllegalArgumentException. The subscription is then void, so
                // the worker must not emit anything after this terminal signal.
                cancelled = true;
                wakeUp();
                subscriber.onError(new IllegalArgumentException(
                        "Request must be positive"));
                return;
            }
            // Saturating add: Long.MAX_VALUE means unbounded demand (rule 3.17) and must not wrap.
            requested.accumulateAndGet(n, (current, extra) -> {
                long sum = current + extra;
                return sum < 0 ? Long.MAX_VALUE : sum;
            });
            wakeUp();
        }

        @Override
        public void cancel() {
            cancelled = true;
            cleanupSubscriber(subscriber);
            wakeUp();
        }

        public boolean isCancelled() {
            return cancelled;
        }

        public long getRequested() {
            return requested.get();
        }

        /** Consumes one unit of demand; unbounded demand is left untouched. */
        public void consumeRequest() {
            requested.getAndUpdate(current -> current == Long.MAX_VALUE ? current : current - 1);
        }

        /** Wakes the worker parked in awaitDemand. */
        private void wakeUp() {
            synchronized (this) {
                notifyAll();
            }
        }
    }

    /**
     * Worker loop for one subscriber. It runs until the source ends, an unrecoverable error
     * occurs, the subscriber cancels, or the stream is closed. Each exit path sends at most
     * one terminal signal.
     */
    protected void processStream(Subscriber<? super T> subscriber,
                                 StreamContext context) {
        activeStreams.incrementAndGet();
        try {
            SafeSubscription subscription = subscriptions.get(subscriber);
            if (subscription == null) {
                return; // cancelled before the worker started
            }
            while (!subscription.isCancelled()) {
                if (isClosed) {
                    // Signal shutdown from the worker so it is serialised with any onNext.
                    signalShutdown(subscriber, context);
                    break;
                }
                if (subscription.getRequested() <= 0) {
                    if (!awaitDemand(subscription)) {
                        signalShutdown(subscriber, context);
                        break;
                    }
                    continue;
                }
                rateLimiter.acquire();
                if (!circuitBreaker.allowRequest()) {
                    handleException(new CircuitBreakerOpenException(
                                    "Circuit breaker is open", context.getStreamId()),
                            subscriber, context);
                    break;
                }
                try {
                    T token = tokenSource.nextToken();
                    if (token == null) {
                        subscriber.onComplete(); // end of stream
                        break;
                    }
                    subscriber.onNext(token);
                    subscription.consumeRequest();
                    context.recordToken();
                } catch (StreamException e) {
                    HandlingResult result = exceptionHandler.handle(e, context);
                    if (!result.shouldContinue()) {
                        handleException(e, subscriber, context);
                        break;
                    }
                    // null = recovery ended the stream; otherwise continue (possibly with a new context).
                    StreamContext next = handleRecovery(result, subscriber, context);
                    if (next == null) {
                        break;
                    }
                    context = next;
                } catch (Exception e) {
                    StreamException wrapped = new GenericStreamException(
                            "Unexpected error", e, context.getStreamId());
                    handleException(wrapped, subscriber, context);
                    break;
                }
            }
        } finally {
            cleanupSubscriber(subscriber);
            activeStreams.decrementAndGet();
        }
    }

    /**
     * Parks until demand arrives, the subscription is cancelled or the stream closes; each call
     * waits at most one second.
     *
     * @return {@code false} if the worker thread was interrupted
     */
    private boolean awaitDemand(SafeSubscription subscription) {
        synchronized (subscription) {
            // Re-check under the lock so a concurrent request()/wakeUp() cannot be missed.
            if (subscription.getRequested() <= 0 && !subscription.isCancelled() && !isClosed) {
                try {
                    subscription.wait(1000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return false;
                }
            }
        }
        return true;
    }

    /** Terminates the subscriber with a SYSTEM_SHUTDOWN interruption. */
    private void signalShutdown(Subscriber<? super T> subscriber, StreamContext context) {
        handleException(new StreamInterruptedException(
                        StreamInterruptedException.InterruptionSource.SYSTEM_SHUTDOWN,
                        false,
                        context.getStreamId()),
                subscriber, context);
    }

    /** Records the error in the context, signals onError and releases the subscriber. */
    private void handleException(StreamException exception,
                                 Subscriber<? super T> subscriber,
                                 StreamContext context) {
        context.getMetadata().put("lastError", exception);
        subscriber.onError(exception);
        cleanupSubscriber(subscriber);
    }

    /**
     * Performs the recovery action chosen by a handler.
     *
     * @return the context to keep streaming with, or {@code null} if the stream was terminated
     */
    private StreamContext handleRecovery(HandlingResult result,
                                         Subscriber<? super T> subscriber,
                                         StreamContext context) {
        switch (result.getRecoveryAction()) {
            case RETRY_AFTER_DELAY:
                try {
                    Thread.sleep(extractDelay(result));
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    signalShutdown(subscriber, context);
                    return null;
                }
                return context;
            case SWITCH_ENDPOINT:
                tokenSource.switchEndpoint();
                return context;
            case RECONNECT_STREAM:
                recoveryManager.reconnect(context);
                return context;
            case USE_FALLBACK:
                // useFallback sends a terminal signal, so the loop must stop afterwards.
                useFallback(subscriber, context);
                return null;
            case RESTART:
                return restartStream(subscriber, context);
            case RETRY_IMMEDIATELY:
            default:
                return context;
        }
    }

    /** Emits the fallback token (if any) and completes the subscriber. */
    private void useFallback(Subscriber<? super T> subscriber,
                             StreamContext context) {
        try {
            T fallbackToken = getFallbackToken(context);
            if (fallbackToken != null) {
                subscriber.onNext(fallbackToken);
            }
            subscriber.onComplete();
        } catch (Exception e) {
            subscriber.onError(e);
        }
    }

    /**
     * Replaces the subscriber's context with a fresh one; the calling worker keeps running.
     * Previously a second worker was submitted while the first kept going, so two threads
     * signalled the same subscriber concurrently.
     */
    private StreamContext restartStream(Subscriber<? super T> subscriber,
                                        StreamContext context) {
        StreamContext newContext = new StreamContext(
                context.getStreamId() + "-restarted");
        contexts.put(subscriber, newContext);
        return newContext;
    }

    /**
     * Extracts the retry delay from the handler message. The handlers format a
     * {@link Duration} with {@code %s}, i.e. ISO-8601 text such as {@code PT2S}. Falls back to
     * one second when no valid duration is found; negative values become zero.
     */
    private long extractDelay(HandlingResult result) {
        String message = result.getMessage();
        if (message != null) {
            java.util.regex.Matcher matcher = DURATION_TEXT.matcher(message);
            if (matcher.find()) {
                try {
                    return Math.max(0L, Duration.parse(matcher.group()).toMillis());
                } catch (java.time.format.DateTimeParseException | ArithmeticException ignored) {
                    // Not a usable duration; use the default below.
                }
            }
        }
        return DEFAULT_RETRY_DELAY_MILLIS;
    }

    /** Forgets every piece of per-subscriber state. */
    private void cleanupSubscriber(Subscriber<? super T> subscriber) {
        subscribers.remove(subscriber);
        contexts.remove(subscriber);
        subscriptions.remove(subscriber);
    }

    /**
     * Closes the stream:
     * <ol>
     *   <li>Wakes parked workers so they signal shutdown to their own subscribers.</li>
     *   <li>Waits for the workers to finish, then shuts down the executor.</li>
     *   <li>Notifies any subscriber still registered, i.e. one whose worker never ran to completion.</li>
     * </ol>
     */
    @Override
    public void close() {
        isClosed = true;
        for (SafeSubscription subscription : subscriptions.values()) {
            subscription.wakeUp();
        }
        awaitTermination();
        executor.shutdown();
        try {
            if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
                executor.shutdownNow();
            }
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt();
        }
        for (Subscriber<? super T> subscriber : subscribers) {
            StreamContext context = contexts.get(subscriber);
            subscriber.onError(new StreamInterruptedException(
                    StreamInterruptedException.InterruptionSource.SYSTEM_SHUTDOWN,
                    false,
                    context == null ? null : context.getStreamId()));
        }
        subscribers.clear();
        contexts.clear();
        subscriptions.clear();
    }

    /** Polls once per second, for up to 30 seconds, until no worker is active. */
    private void awaitTermination() {
        int maxWaitTime = 30; // seconds
        for (int i = 0; i < maxWaitTime && activeStreams.get() > 0; i++) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
    }

    private String generateStreamId() {
        return "stream-" + UUID.randomUUID().toString().substring(0, 8);
    }

    /** Wraps an unexpected, non-stream exception; never recoverable. */
    private static class GenericStreamException extends StreamException {
        public GenericStreamException(String message, Throwable cause,
                                      String streamId) {
            super(message, ErrorCategory.CLIENT, streamId);
            initCause(cause);
        }

        @Override
        public boolean isRecoverable() {
            return false;
        }

        @Override
        public RecoveryStrategy getRecoveryStrategy() {
            return RecoveryStrategy.FAIL_FAST;
        }
    }

    /** Raised when the circuit breaker rejects a request. */
    private static class CircuitBreakerOpenException extends StreamException {
        public CircuitBreakerOpenException(String message, String streamId) {
            super(message, ErrorCategory.CLIENT, streamId);
        }

        @Override
        public boolean isRecoverable() {
            return true; // the breaker eventually closes again
        }

        @Override
        public RecoveryStrategy getRecoveryStrategy() {
            // RecoveryStrategy has no RETRY_AFTER_DELAY constant (that name belongs to
            // HandlingResult.RecoveryAction); RETRY is the matching strategy.
            return RecoveryStrategy.RETRY;
        }
    }
}
3.2 高级特性:断点续传与状态恢复
/**
* 支持断点续传的 TokenStream
*/
public class ResumableTokenStream<T> extends SafeTokenStream<T> {
private final StreamStateStorage stateStorage;
private final CheckpointStrategy checkpointStrategy;
public ResumableTokenStream(TokenSource<T> tokenSource,
ExecutorService executor,
ExceptionHandlerRegistry exceptionHandler,
StreamRecoveryManager recoveryManager,
StreamStateStorage stateStorage,
CheckpointStrategy checkpointStrategy) {
super(tokenSource, executor, exceptionHandler, recoveryManager);
this.stateStorage = stateStorage;
this.checkpointStrategy = checkpointStrategy;
}
@Override
public void subscribe(Subscriber<? super T> subscriber) {
// 检查是否有保存的状态
StreamState savedState = stateStorage.loadState(
getStreamIdFromSubscriber(subscriber));
if (savedState != null && savedState.canResume()) {
// 恢复订阅
resumeFromState(subscriber, savedState);
} else {
// 新的订阅
super.subscribe(subscriber);
}
}
private void resumeFromState(Subscriber<? super T> subscriber,
StreamState state) {
StreamContext context = createContextFromState(state);
// 恢复token源到检查点位置
getTokenSource().restoreFromCheckpoint(state.getCheckpoint());
// 重新订阅
subscribers.add(subscriber);
contexts.put(subscriber, context);
subscriber.onSubscribe(new ResumableSubscription(
subscriber, context, state.getLastProcessedToken()));
// 继续处理
executor.submit(() -> processResumedStream(
subscriber, context, state));
}
private class ResumableSubscription extends SafeSubscription {
private final T lastProcessedToken;
private boolean hasDeliveredResumeToken = false;
public ResumableSubscription(Subscriber<? super T> subscriber,
StreamContext context,
T lastProcessedToken) {
super(subscriber, context);
this.lastProcessedToken = lastProcessedToken;
}
@Override
public void request(long n) {
if (!hasDeliveredResumeToken && lastProcessedToken != null) {
// 首先发送恢复标记
subscriber.onNext(createResumeToken());
hasDeliveredResumeToken = true;
n—; // 减少一个请求计数
}
if (n > 0) {
super.request(n);
}
}
private T createResumeToken() {
// 创建特殊的恢复token
return (T) new ResumeToken<>(lastProcessedToken);
}
}
private void processResumedStream(Subscriber<? super T> subscriber,
StreamContext context,
StreamState state) {
// 跳过已经处理过的token
skipProcessedTokens(state.getProcessedCount());
// 继续正常处理
super.processStream(subscriber, context);
}
private void skipProcessedTokens(int count) {
for (int i = 0; i < count; i++) {
try {
getTokenSource().nextToken(); // 跳过
} catch (Exception e) {
// 记录但继续
logger.warn("Failed to skip token {}", i, e);
}
}
}
// 定期保存状态
private void createCheckpoint(StreamContext context,
T lastToken,
int processedCount) {
if (checkpointStrategy.shouldCheckpoint(context, lastToken)) {
StreamState state = new StreamState(
context.getStreamId(),
getTokenSource().createCheckpoint(),
lastToken,
processedCount,
Instant.now()
);
stateStorage.saveState(state);
}
}
// 流状态类
public static class StreamState {
private final String streamId;
private final Checkpoint checkpoint;
private final Object lastProcessedToken;
private final int processedCount;
private final Instant checkpointTime;
public StreamState(String streamId, Checkpoint checkpoint,
Object lastProcessedToken, int processedCount,
Instant checkpointTime) {
this.streamId = streamId;
this.checkpoint = checkpoint;
this.lastProcessedToken = lastProcessedToken;
this.processedCount = processedCount;
this.checkpointTime = checkpointTime;
}
public boolean canResume() {
return checkpoint != null &&
checkpointTime.isAfter(Instant.now().minus(Duration.ofHours(1)));
}
}
// 恢复token标记
public static class ResumeToken<T> {
private final T previousToken;
private final Instant resumeTime;
public ResumeToken(T previousToken) {
this.previousToken = previousToken;
this.resumeTime = Instant.now();
}
}
}
四、最佳实践与模式总结
4.1 异常处理最佳实践
/**
 * Best practices for TokenStream exception handling.
 */
public class TokenStreamBestPractices {

    /** Practice 1: handle failures at the layer where they occur. */
    public static class LayeredExceptionHandling {
        public void processStreamWithLayers() {
            try {
                // Outer layer: connection and initialisation failures.
                TokenStream<String> stream = createTokenStream();
                stream.subscribe(new Subscriber<String>() {
                    private Subscription subscription;

                    @Override
                    public void onSubscribe(Subscription s) {
                        // Middle layer: subscription and request failures.
                        subscription = s;
                        requestNext();
                    }

                    @Override
                    public void onNext(String token) {
                        // Inner layer: per-token processing failures do not end the stream.
                        try {
                            processToken(token);
                        } catch (ProcessingException e) {
                            handleTokenError(e, token);
                        }
                        // Pull the next token; requesting only once in onSubscribe stalled the
                        // stream after the first token.
                        requestNext();
                    }

                    @Override
                    public void onError(Throwable t) {
                        // Error-handling layer.
                        handleStreamError(t);
                    }

                    @Override
                    public void onComplete() {
                        handleCompletion();
                    }

                    private void requestNext() {
                        try {
                            subscription.request(1);
                        } catch (IllegalArgumentException e) {
                            handleRequestError(e);
                        }
                    }
                });
            } catch (StreamCreationException e) {
                handleCreationError(e);
            }
        }
    }

    /** Practice 2: switch to a fallback stream when the primary fails. */
    public static class GracefulDegradation {
        private final TokenStream<String> primaryStream;
        private final TokenStream<String> fallbackStream;
        private final DegradationPolicy degradationPolicy;

        public GracefulDegradation(TokenStream<String> primaryStream,
                                   TokenStream<String> fallbackStream,
                                   DegradationPolicy degradationPolicy) {
            this.primaryStream = java.util.Objects.requireNonNull(primaryStream, "primaryStream");
            this.fallbackStream = java.util.Objects.requireNonNull(fallbackStream, "fallbackStream");
            this.degradationPolicy = java.util.Objects.requireNonNull(degradationPolicy, "degradationPolicy");
        }

        /**
         * Streams from the primary and, on an error the policy accepts, continues from the
         * fallback (at most once).
         *
         * <p>The subscriber sees a single arbitrating Subscription for its whole lifetime. It
         * forwards requests to whichever upstream is current and replays unmet demand onto the
         * fallback. Passing the fallback's own Subscription to {@code onSubscribe} a second time
         * would violate Reactive Streams rule 2.5, and a compliant subscriber would cancel it.
         */
        public void startStreamWithFallback(Subscriber<String> subscriber) {
            AtomicBoolean fallbackActive = new AtomicBoolean(false);
            AtomicBoolean cancelled = new AtomicBoolean(false);
            // Upstream currently feeding the subscriber: primary first, then fallback.
            java.util.concurrent.atomic.AtomicReference<Subscription> upstream =
                    new java.util.concurrent.atomic.AtomicReference<>();
            // Demand requested downstream but not yet delivered.
            java.util.concurrent.atomic.AtomicLong outstanding =
                    new java.util.concurrent.atomic.AtomicLong();

            Subscription arbiter = new Subscription() {
                @Override
                public void request(long n) {
                    if (n > 0) {
                        outstanding.accumulateAndGet(n, (current, extra) -> {
                            long sum = current + extra;
                            return sum < 0 ? Long.MAX_VALUE : sum;
                        });
                    }
                    upstream.get().request(n);
                }

                @Override
                public void cancel() {
                    cancelled.set(true);
                    upstream.get().cancel();
                }
            };

            Subscriber<String> wrappedSubscriber = new Subscriber<String>() {
                @Override
                public void onSubscribe(Subscription s) {
                    if (upstream.getAndSet(s) == null) {
                        subscriber.onSubscribe(arbiter);
                    } else if (cancelled.get()) {
                        s.cancel();
                    } else {
                        long pending = outstanding.get();
                        if (pending > 0) {
                            s.request(pending);
                        }
                    }
                }

                @Override
                public void onNext(String token) {
                    outstanding.getAndUpdate(
                            current -> current == Long.MAX_VALUE || current == 0 ? current : current - 1);
                    subscriber.onNext(token);
                }

                @Override
                public void onError(Throwable t) {
                    if (!fallbackActive.get() &&
                            degradationPolicy.shouldUseFallback(t)) {
                        fallbackActive.set(true);
                        logger.warn("Switching to fallback stream", t);
                        fallbackStream.subscribe(this);
                    } else {
                        subscriber.onError(t);
                    }
                }

                @Override
                public void onComplete() {
                    subscriber.onComplete();
                }
            };
            primaryStream.subscribe(wrappedSubscriber);
        }
    }

    /** Practice 3: metrics and tracing around a stream. */
    public static class MonitoringAndMetrics {
        private final MeterRegistry meterRegistry;
        private final Tracer tracer;

        public MonitoringAndMetrics(MeterRegistry meterRegistry, Tracer tracer) {
            this.meterRegistry = java.util.Objects.requireNonNull(meterRegistry, "meterRegistry");
            this.tracer = java.util.Objects.requireNonNull(tracer, "tracer");
        }

        /**
         * Wraps {@code delegate} so that each subscription:
         * <ul>
         *   <li>is timed;</li>
         *   <li>counts tokens and errors;</li>
         *   <li>records token sizes;</li>
         *   <li>is traced in a span that is finished exactly once, on error, completion or cancel.</li>
         * </ul>
         */
        public TokenStream<String> createMonitoredStream(
                TokenStream<String> delegate) {
            return new TokenStream<String>() {
                private final Timer streamTimer = Timer.builder("token.stream.duration")
                        .register(meterRegistry);
                private final Counter tokenCounter = Counter.builder("token.count")
                        .register(meterRegistry);
                private final Counter errorCounter = Counter.builder("token.stream.errors")
                        .tag("type", "stream")
                        .register(meterRegistry);
                // Registered once instead of being looked up again for every token.
                private final DistributionSummary tokenSizeSummary =
                        DistributionSummary.builder("token.size").register(meterRegistry);

                @Override
                public void subscribe(Subscriber<String> subscriber) {
                    Timer.Sample sample = Timer.start();
                    delegate.subscribe(new Subscriber<String>() {
                        private Span span;
                        private final AtomicBoolean finished = new AtomicBoolean(false);

                        @Override
                        public void onSubscribe(Subscription s) {
                            // OpenTracing's SpanBuilder starts a span with start().
                            span = tracer.buildSpan("token.stream").start();
                            subscriber.onSubscribe(new Subscription() {
                                @Override
                                public void request(long n) {
                                    s.request(n);
                                }

                                @Override
                                public void cancel() {
                                    // A cancelled stream never reaches onError/onComplete.
                                    finishSpan(null);
                                    s.cancel();
                                }
                            });
                        }

                        @Override
                        public void onNext(String token) {
                            tokenCounter.increment();
                            tokenSizeSummary.record(token.length());
                            subscriber.onNext(token);
                        }

                        @Override
                        public void onError(Throwable t) {
                            errorCounter.increment();
                            finishSpan(t);
                            subscriber.onError(t);
                        }

                        @Override
                        public void onComplete() {
                            finishSpan(null);
                            subscriber.onComplete();
                        }

                        /** Tags (on error), finishes the span and stops the timer, once only. */
                        private void finishSpan(Throwable error) {
                            if (!finished.compareAndSet(false, true)) {
                                return;
                            }
                            if (error != null) {
                                span.setTag("error", true);
                                // Map.of rejects null values, and getMessage() may be null.
                                span.log(Map.of(
                                        "error.message", String.valueOf(error.getMessage()),
                                        "error.type", error.getClass().getName()
                                ));
                            }
                            span.finish();
                            sample.stop(streamTimer);
                        }
                    });
                }
            };
        }
    }

    /** Practice 4: track resources and close them in reverse registration order. */
    public static class ResourceCleanupPattern implements AutoCloseable {
        // Guarded by synchronizing on the list itself (shared with the cleanup thread).
        private final List<AutoCloseable> resources = new ArrayList<>();
        private final ScheduledExecutorService cleanupExecutor;
        private final AtomicBoolean closed = new AtomicBoolean(false);
        private final Thread shutdownHook;

        public ResourceCleanupPattern() {
            this.cleanupExecutor = Executors.newSingleThreadScheduledExecutor(r -> {
                Thread thread = new Thread(r, "stream-cleanup");
                // A non-daemon thread would keep the JVM alive on its own, so a normal exit
                // (and the shutdown hook below) would never happen without close().
                thread.setDaemon(true);
                return thread;
            });
            // Periodically report tracked resources.
            cleanupExecutor.scheduleAtFixedRate(
                    this::checkForLeaks,
                    5, 60, TimeUnit.SECONDS);
            this.shutdownHook = new Thread(this::safeClose);
            Runtime.getRuntime().addShutdownHook(shutdownHook);
        }

        /**
         * Registers {@code resource} for closing.
         *
         * @throws IllegalStateException if this tracker is already closed (it would otherwise leak)
         */
        public <T extends AutoCloseable> T trackResource(T resource) {
            java.util.Objects.requireNonNull(resource, "resource");
            synchronized (resources) {
                if (closed.get()) {
                    throw new IllegalStateException("Resource tracker already closed");
                }
                resources.add(resource);
            }
            return resource;
        }

        private void checkForLeaks() {
            // A real implementation needs more elaborate leak detection.
            int count;
            synchronized (resources) {
                count = resources.size();
            }
            logger.debug("Active resources: {}", count);
        }

        @Override
        public void close() {
            safeClose();
        }

        /** Idempotent close: resources are closed in reverse order (dependents first). */
        private void safeClose() {
            if (!closed.compareAndSet(false, true)) {
                return; // already closed
            }
            cleanupExecutor.shutdown();
            List<AutoCloseable> snapshot;
            synchronized (resources) {
                snapshot = new ArrayList<>(resources);
                resources.clear();
            }
            ListIterator<AutoCloseable> iterator =
                    snapshot.listIterator(snapshot.size());
            while (iterator.hasPrevious()) {
                AutoCloseable resource = iterator.previous();
                try {
                    resource.close();
                } catch (Exception e) {
                    logger.warn("Failed to close resource", e);
                }
            }
            // After an explicit close, drop the hook so the JVM no longer retains this object.
            if (Thread.currentThread() != shutdownHook) {
                try {
                    Runtime.getRuntime().removeShutdownHook(shutdownHook);
                } catch (IllegalStateException ignored) {
                    // JVM shutdown already in progress; the hook will run (and return early).
                }
            }
        }
    }
}
4.2 面试要点总结
Q1: 什么是 TokenStream?
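A: TokenStream 是处理大型语言模型流式响应的核心抽象,具有以下特点:
- 异步性:非阻塞的数据流,生产者与消费者互不阻塞
- 增量性:token 逐步生成、逐步消费,无需等待完整响应
- 连续性:按顺序交付,保持语义连贯
- 可中断性:允许在中途取消(例如调用 Subscription.cancel())
- 错误传播:异常通过 onError 等通道传递给消费者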
Q2: 如何处理流式响应中的异常?
A: 采用分层异常处理策略:
异常分类:
// 网络异常:可恢复,需要重试策略
// 业务异常:可能需要降级处理
// 中断异常:需要清理资源并通知用户
处理策略:
- 重试机制:指数退避 + 最大重试次数
- 熔断器模式:防止级联故障
- 优雅降级:主服务失败时切换备用服务
- 错误隔离:防止单个流异常影响整体系统
实现要点:
// 1. 使用专门的状态对象跟踪流状态
// 2. 为不同异常类型注册处理器
// 3. 实现恢复策略(重试、恢复、重启)
// 4. 确保资源正确清理
Q3: 如何处理中断?
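A: 中断处理的关键原则:
- 及时响应:在处理循环中定期检查线程中断状态或取消标志
- 不吞掉中断:捕获 InterruptedException 后调用 Thread.currentThread().interrupt() 恢复中断标志
- 保证清理:资源释放放在 finally 中,确保任何退出路径都会执行
- 保存状态:需要断点续传时记录检查点
- 通知相关方:向订阅者发送终止信号(onError 或 onComplete),避免其无限等待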
处理流程:
public void handleInterruption() {
// 1. 设置中断标志
volatile boolean interrupted = false;
// 2. 定期检查中断
while (!interrupted) {
if (Thread.currentThread().isInterrupted()) {
interrupted = true;
break;
}
// 处理逻辑…
}
// 3. 清理资源
cleanupResources();
// 4. 保存状态(支持恢复)
saveCheckpoint();
// 5. 通知相关方
notifyInterruption();
}
Q4: 如何设计高性能的 TokenStream?
A: 性能优化策略:
// Performance optimisation example
/**
 * Sketch of the collaborators a high-throughput TokenStream might use. The final fields were
 * never assigned, which does not compile; the constructor injects them.
 */
public class OptimizedTokenStream {
    // 1. Object pool to reduce GC pressure.
    private final ObjectPool<Token> tokenPool;
    // 2. Batching for throughput.
    private final BatchingProcessor batchProcessor;
    // 3. Direct buffers for zero-copy I/O.
    private final DirectBufferAllocator bufferAllocator;
    // 4. Asynchronous I/O for concurrency.
    private final AsyncHttpClient httpClient;

    public OptimizedTokenStream(ObjectPool<Token> tokenPool,
                                BatchingProcessor batchProcessor,
                                DirectBufferAllocator bufferAllocator,
                                AsyncHttpClient httpClient) {
        this.tokenPool = java.util.Objects.requireNonNull(tokenPool, "tokenPool");
        this.batchProcessor = java.util.Objects.requireNonNull(batchProcessor, "batchProcessor");
        this.bufferAllocator = java.util.Objects.requireNonNull(bufferAllocator, "bufferAllocator");
        this.httpClient = java.util.Objects.requireNonNull(httpClient, "httpClient");
    }
}
通过这样的设计,TokenStream 能够在大规模生产环境中稳定运行,提供可靠的流式响应处理能力。
网硕互联帮助中心



评论前必须登录!
注册