云计算百科
云计算领域专业知识百科平台

高级java每日一道面试题-2025年6月26日-基础篇[LangChain4j]-什么是 TokenStream?如何正确处理流式响应中的异常和中断?

TokenStream 与流式响应异常处理详解

一、TokenStream 深度解析

1.1 TokenStream 的概念与设计

/**
 * Core concepts of a TokenStream.
 *
 * <p>In LLM streaming, a TokenStream is an <b>asynchronous stream of tokens</b> (text fragments
 * produced by the model). It is similar in spirit to Java's Stream API, but elements are delivered
 * incrementally as the model generates them.
 */
public class TokenStreamConcept {

    /**
     * Documents the core characteristics of a TokenStream (the interface declares no methods).
     *
     * <ol>
     *   <li>Asynchronous: a non-blocking data stream.</li>
     *   <li>Incremental: produced and consumed step by step.</li>
     *   <li>Continuous: preserves semantic continuity across tokens.</li>
     *   <li>Interruptible: can be cancelled mid-stream.</li>
     *   <li>Error propagation: failures are delivered through the stream.</li>
     * </ol>
     */
    public interface TokenStreamCharacteristics {
    }

    /** Basic structure: a source publisher, a per-token processor and a flow-control policy. */
    public static class TokenStream<T> {
        private final Publisher<T> source;
        private final TokenProcessor<T> processor;
        // NOTE(review): assigned but never read in this class.
        private final FlowControl flowControl;

        public TokenStream(Publisher<T> source) {
            this.source = source;
            this.processor = new DefaultTokenProcessor<>();
            this.flowControl = new RateBasedFlowControl();
        }

        /**
         * Subscribes {@code subscriber} to the source, wrapped in a {@code SafeSubscriber} that is
         * given this stream's processor.
         */
        public void subscribe(Subscriber<T> subscriber) {
            source.subscribe(new SafeSubscriber<>(subscriber, processor));
        }
    }

    /**
     * One unit of a streamed response.
     *
     * <p>All fields are final and the metadata map is an unmodifiable copy, so instances are immutable.
     * The original declaration had final fields with no constructor assigning them, which does not compile.
     */
    public static class Token {
        private final String content;
        private final int index;
        private final boolean isComplete;
        private final TokenType type;
        private final Map<String, Object> metadata;

        public enum TokenType {
            TEXT,          // plain text token
            FUNCTION_CALL, // function call
            TOOL_RESULT,   // tool result
            CONTROL,       // control token (start, end, pause)
            ERROR          // error token
        }

        /**
         * @param content    token text
         * @param index      position of the token in the stream
         * @param isComplete whether this token completes the response
         * @param type       token kind
         * @param metadata   extra attributes; copied defensively, {@code null} is treated as empty
         */
        public Token(String content, int index, boolean isComplete, TokenType type,
                     Map<String, Object> metadata) {
            this.content = content;
            this.index = index;
            this.isComplete = isComplete;
            this.type = type;
            this.metadata = metadata == null
                    ? Map.of()
                    : java.util.Collections.unmodifiableMap(new java.util.HashMap<>(metadata));
        }

        public String getContent() {
            return content;
        }

        public int getIndex() {
            return index;
        }

        public boolean isComplete() {
            return isComplete;
        }

        public TokenType getType() {
            return type;
        }

        /** @return an unmodifiable view of the token metadata (never {@code null}) */
        public Map<String, Object> getMetadata() {
            return metadata;
        }
    }
}

1.2 TokenStream 的实现模式

/**
 * Several ways to implement a TokenStream: reactive publisher, callbacks, and a pull-based iterator.
 */
public class TokenStreamImplementations {

    /** Pattern 1: a Reactive Streams style publisher that generates tokens on demand. */
    public static class ReactiveTokenStream<T> implements Publisher<T> {
        private final List<Subscriber<? super T>> subscribers =
                new CopyOnWriteArrayList<>();
        private volatile boolean cancelled = false;

        @Override
        public void subscribe(Subscriber<? super T> subscriber) {
            if (cancelled) {
                // Reactive Streams §1.9: onSubscribe must precede any other signal, even when rejecting.
                subscriber.onSubscribe(new Subscription() {
                    @Override
                    public void request(long n) {
                    }

                    @Override
                    public void cancel() {
                    }
                });
                subscriber.onError(new IllegalStateException("Stream cancelled"));
                return;
            }

            subscribers.add(subscriber);
            subscriber.onSubscribe(new TokenSubscription(subscriber));
        }

        /**
         * Produces the next token, or {@code null} when the stream is exhausted.
         * The default produces nothing; override to generate tokens.
         */
        protected T generateNextToken() {
            return null;
        }

        private class TokenSubscription implements Subscription {
            private final Subscriber<? super T> subscriber;
            private volatile boolean active = true;

            TokenSubscription(Subscriber<? super T> subscriber) {
                this.subscriber = subscriber;
            }

            /**
             * Emits up to {@code n} tokens synchronously on the caller's thread.
             * NOTE(review): a subscriber that calls request() from onNext recurses here (see §3.3).
             */
            @Override
            public void request(long n) {
                if (!active || cancelled) {
                    return;
                }
                if (n <= 0) {
                    // §3.9: non-positive demand is reported via onError and ends the subscription.
                    terminate();
                    subscriber.onError(new IllegalArgumentException("Request must be positive"));
                    return;
                }

                for (long i = 0; i < n && active; i++) {
                    T token = generateNextToken();
                    if (token == null) {
                        // §1.7: no further signals after onComplete, so deactivate first.
                        terminate();
                        subscriber.onComplete();
                        return;
                    }
                    subscriber.onNext(token);
                }
            }

            @Override
            public void cancel() {
                terminate();
            }

            private void terminate() {
                active = false;
                subscribers.remove(subscriber);
            }
        }
    }

    /** Pattern 2: callback-based streaming driven by a background task. */
    public static class CallbackTokenStream<T> {
        private final TokenGenerator<T> generator;
        private final ExecutorService executor;
        // Copy-on-write: iterated on the worker thread while startStreaming may add on another thread.
        private final List<TokenCallback<T>> callbacks = new CopyOnWriteArrayList<>();
        private volatile boolean isStreaming = false;

        @FunctionalInterface
        public interface TokenCallback<T> {
            void onToken(T token);

            default void onError(Throwable error) {
            }

            default void onComplete() {
            }
        }

        /**
         * @param generator produces tokens; returns {@code null} when the stream is exhausted
         * @param executor  runs the streaming loop; shut down by {@link #stopStreaming()}
         */
        public CallbackTokenStream(TokenGenerator<T> generator, ExecutorService executor) {
            this.generator = generator;
            this.executor = executor;
        }

        /**
         * Starts streaming to {@code callback} on the executor.
         *
         * @throws IllegalStateException if a stream is already in progress
         */
        public synchronized void startStreaming(TokenCallback<T> callback) {
            // synchronized: the check-and-set of isStreaming must be atomic.
            if (isStreaming) {
                throw new IllegalStateException("Stream already in progress");
            }

            isStreaming = true;
            callbacks.add(callback);
            executor.submit(this::runStreamingLoop);
        }

        /** Pulls tokens until exhausted, stopped, or failed. Completion is signalled only on exhaustion. */
        private void runStreamingLoop() {
            boolean exhausted = false;
            try {
                while (isStreaming) {
                    T token = generator.generate();
                    if (token == null) {
                        exhausted = true;
                        break;
                    }
                    for (TokenCallback<T> cb : callbacks) {
                        cb.onToken(token);
                    }
                }
            } catch (Exception e) {
                error(e);
                return;
            }
            if (exhausted) {
                complete();
            }
        }

        private void complete() {
            isStreaming = false;
            for (TokenCallback<T> cb : callbacks) {
                cb.onComplete();
            }
        }

        private void error(Throwable e) {
            isStreaming = false;
            for (TokenCallback<T> cb : callbacks) {
                cb.onError(e);
            }
        }

        /** Stops the loop (no completion callback is sent) and shuts the executor down. */
        public void stopStreaming() {
            isStreaming = false;
            executor.shutdown();
        }
    }

    /**
     * Pattern 3: a pull-based iterator over a token source.
     *
     * <p>Tokens are prefetched lazily. A failure while fetching the next token therefore never
     * discards a token that was already fetched; the original prefetched eagerly inside next(),
     * so such a failure lost the token about to be returned.
     */
    public static class IteratorTokenStream<T> implements Iterator<T>, AutoCloseable {
        private final TokenSource<T> source;
        private T nextToken;
        private boolean fetched = false;   // nextToken holds a prefetched value
        private boolean exhausted = false; // source returned null or failed
        private volatile boolean closed = false;

        public IteratorTokenStream(TokenSource<T> source) {
            this.source = source;
        }

        private void fetchIfNeeded() {
            if (fetched || exhausted || closed) {
                return;
            }
            try {
                nextToken = source.next();
            } catch (Exception e) {
                exhausted = true;
                throw new TokenStreamException("Failed to fetch next token", e);
            }
            if (nextToken == null) {
                exhausted = true;
            } else {
                fetched = true;
            }
        }

        /**
         * @throws TokenStreamException if fetching the next token from the source fails
         */
        @Override
        public boolean hasNext() {
            if (closed) {
                return false;
            }
            fetchIfNeeded();
            return fetched;
        }

        @Override
        public T next() {
            if (!hasNext()) {
                throw new NoSuchElementException();
            }
            T current = nextToken;
            nextToken = null;
            fetched = false;
            return current;
        }

        /** Closes the underlying source once; later calls are no-ops. */
        @Override
        public void close() {
            if (closed) {
                return;
            }
            closed = true;
            nextToken = null;
            source.close();
        }
    }
}

二、流式响应异常处理架构

2.1 异常分类与处理策略

/**
 * Classification of the exceptions a streaming response can raise.
 *
 * <p>Each concrete exception reports a {@link RecoveryStrategy}. {@code isRecoverable()} is derived
 * from that strategy (anything other than {@link RecoveryStrategy#FAIL_FAST} is recoverable), so
 * the two can no longer contradict each other.
 */
public class StreamExceptionHierarchy {

    /** Base class for stream failures: records category, creation time and owning stream id. */
    public abstract static class StreamException extends RuntimeException {
        private final ErrorCategory category;
        private final Instant timestamp;
        private final String streamId;

        /**
         * Creates an exception without a cause.
         *
         * <p>The cause is left uninitialized, so {@code initCause} can still be called later.
         */
        public StreamException(String message, ErrorCategory category,
                               String streamId) {
            super(message);
            this.category = category;
            this.timestamp = Instant.now();
            this.streamId = streamId;
        }

        /** Creates an exception that preserves the underlying {@code cause}. */
        public StreamException(String message, Throwable cause, ErrorCategory category,
                               String streamId) {
            super(message, cause);
            this.category = category;
            this.timestamp = Instant.now();
            this.streamId = streamId;
        }

        public abstract boolean isRecoverable();

        public abstract RecoveryStrategy getRecoveryStrategy();

        public ErrorCategory getCategory() {
            return category;
        }

        public Instant getTimestamp() {
            return timestamp;
        }

        public String getStreamId() {
            return streamId;
        }
    }

    /** Error categories. */
    public enum ErrorCategory {
        NETWORK,      // network error
        RATE_LIMIT,   // rate limited
        TIMEOUT,      // timed out
        MODEL,        // model error
        CLIENT,       // client-side error
        SERVER,       // server-side error
        VALIDATION,   // validation error
        INTERRUPTION  // stream interrupted
    }

    /** Recovery strategies. */
    public enum RecoveryStrategy {
        RETRY,      // retry the request
        RESUME,     // resume where the stream stopped
        RESTART,    // restart the stream
        FALLBACK,   // degrade to a fallback
        FAIL_FAST,  // give up immediately
        CONTINUE    // ignore and continue
    }

    /** HTTP-level failure talking to {@code endpoint}. */
    public static class NetworkException extends StreamException {
        private final String endpoint;
        private final int statusCode;

        public NetworkException(String endpoint, int statusCode,
                                String streamId) {
            super(String.format("Network error connecting to %s (status: %d)",
                            endpoint, statusCode),
                    ErrorCategory.NETWORK, streamId);
            this.endpoint = endpoint;
            this.statusCode = statusCode;
        }

        /**
         * 429 and 5xx are transient and recoverable; every other status is not.
         * (Previously 429 was reported non-recoverable while also being assigned RETRY.)
         */
        @Override
        public boolean isRecoverable() {
            return getRecoveryStrategy() != RecoveryStrategy.FAIL_FAST;
        }

        /** 429 → RETRY, 5xx → RESUME, anything else → FAIL_FAST. */
        @Override
        public RecoveryStrategy getRecoveryStrategy() {
            if (statusCode == 429) {
                return RecoveryStrategy.RETRY;
            }
            if (statusCode >= 500 && statusCode < 600) {
                return RecoveryStrategy.RESUME;
            }
            return RecoveryStrategy.FAIL_FAST;
        }

        public String getEndpoint() {
            return endpoint;
        }

        public int getStatusCode() {
            return statusCode;
        }
    }

    /** The provider rejected the request because a rate limit was hit. */
    public static class RateLimitException extends StreamException {
        private final Duration retryAfter;
        private final int limit;
        private final int remaining;

        /**
         * @param retryAfter server-suggested wait before retrying; may be {@code null} if none was given
         */
        public RateLimitException(Duration retryAfter, int limit,
                                  int remaining, String streamId) {
            super(String.format("Rate limit exceeded. Limit: %d, Remaining: %d",
                            limit, remaining),
                    ErrorCategory.RATE_LIMIT, streamId);
            this.retryAfter = retryAfter;
            this.limit = limit;
            this.remaining = remaining;
        }

        @Override
        public boolean isRecoverable() {
            return getRecoveryStrategy() != RecoveryStrategy.FAIL_FAST;
        }

        /** Rate limits are always retried. */
        @Override
        public RecoveryStrategy getRecoveryStrategy() {
            return RecoveryStrategy.RETRY;
        }

        public Duration getRetryAfter() {
            return retryAfter;
        }

        public int getLimit() {
            return limit;
        }

        public int getRemaining() {
            return remaining;
        }
    }

    /** The stream was interrupted before completing. */
    public static class StreamInterruptedException extends StreamException {
        private final InterruptionSource source;
        private final boolean userInitiated;

        public enum InterruptionSource {
            USER_CANCEL,      // cancelled by the user
            TIMEOUT,          // timed out
            RESOURCE_LIMIT,   // resource limit reached
            SYSTEM_SHUTDOWN,  // system shutting down
            CONNECTION_LOST   // connection lost
        }

        public StreamInterruptedException(InterruptionSource source,
                                          boolean userInitiated,
                                          String streamId) {
            super(String.format("Stream interrupted by %s", source),
                    ErrorCategory.INTERRUPTION, streamId);
            this.source = source;
            this.userInitiated = userInitiated;
        }

        /**
         * TIMEOUT (RESUME) and CONNECTION_LOST (RESTART) are recoverable. Previously TIMEOUT was
         * assigned RESUME yet reported non-recoverable.
         */
        @Override
        public boolean isRecoverable() {
            return getRecoveryStrategy() != RecoveryStrategy.FAIL_FAST;
        }

        @Override
        public RecoveryStrategy getRecoveryStrategy() {
            switch (source) {
                case TIMEOUT:
                    return RecoveryStrategy.RESUME;
                case CONNECTION_LOST:
                    return RecoveryStrategy.RESTART;
                case USER_CANCEL:
                default:
                    return RecoveryStrategy.FAIL_FAST;
            }
        }

        public InterruptionSource getSource() {
            return source;
        }

        public boolean isUserInitiated() {
            return userInitiated;
        }
    }
}

2.2 异常处理框架实现

/**
 * Exception-handling framework for streaming responses.
 *
 * <p>It provides a per-type handler registry plus handlers for network, rate-limit and interruption
 * failures.
 */
public class StreamExceptionHandler {

    /** Handles one family of stream exceptions and decides how the stream should proceed. */
    @FunctionalInterface
    public interface ExceptionHandler<T extends StreamException> {
        HandlingResult handle(T exception, StreamContext context);
    }

    /** Outcome of handling an exception: continue or not, which recovery action, and a message. */
    public static class HandlingResult {
        private final boolean shouldContinue;
        private final RecoveryAction recoveryAction;
        private final String message;

        public enum RecoveryAction {
            NONE,
            RETRY_IMMEDIATELY,
            RETRY_AFTER_DELAY,
            SWITCH_ENDPOINT,
            USE_FALLBACK,
            RECONNECT_STREAM,
            /** Restart the stream from scratch (returned by {@link InterruptionHandler}). */
            RESTART
        }

        public HandlingResult(boolean shouldContinue,
                              RecoveryAction recoveryAction,
                              String message) {
            this.shouldContinue = shouldContinue;
            this.recoveryAction = recoveryAction;
            this.message = message;
        }

        /** @return whether the stream should keep going after the recovery action */
        public boolean shouldContinue() {
            return shouldContinue;
        }

        public RecoveryAction getRecoveryAction() {
            return recoveryAction;
        }

        public String getMessage() {
            return message;
        }
    }

    /** Per-stream bookkeeping: id, timing, token count and free-form metadata. */
    public static class StreamContext {
        private final String streamId;
        private final Instant startTime;
        private volatile Instant lastTokenTime;
        private final AtomicInteger tokenCount;
        private final Map<String, Object> metadata;

        public StreamContext(String streamId) {
            this.streamId = streamId;
            this.startTime = Instant.now();
            this.lastTokenTime = startTime;
            this.tokenCount = new AtomicInteger(0);
            this.metadata = new ConcurrentHashMap<>();
        }

        /** Records that one more token was delivered, and when. */
        public void recordToken() {
            tokenCount.incrementAndGet();
            lastTokenTime = Instant.now();
        }

        public String getStreamId() {
            return streamId;
        }

        public Instant getStartTime() {
            return startTime;
        }

        public Instant getLastTokenTime() {
            return lastTokenTime;
        }

        public int getTokenCount() {
            return tokenCount.get();
        }

        /** @return the live, mutable (concurrent) metadata map */
        public Map<String, Object> getMetadata() {
            return metadata;
        }
    }

    /** Registry mapping exception types to handlers, with a catch-all default. */
    public static class ExceptionHandlerRegistry {
        private final Map<Class<? extends StreamException>,
                ExceptionHandler<?>>
                handlers = new ConcurrentHashMap<>();
        private final ExceptionHandler<StreamException> defaultHandler;

        public ExceptionHandlerRegistry() {
            this.defaultHandler = (ex, ctx) ->
                    new HandlingResult(false,
                            HandlingResult.RecoveryAction.NONE,
                            "Unhandled exception: " + ex.getMessage());
        }

        public <T extends StreamException> void registerHandler(
                Class<T> exceptionType,
                ExceptionHandler<T> handler) {
            handlers.put(exceptionType, handler);
        }

        /**
         * Dispatches to the handler registered for the most specific type in {@code exception}'s class
         * hierarchy. It falls back to the default handler.
         *
         * <p>The class chain is walked upwards. The previous scan over the map's entries picked an
         * arbitrary matching superclass, because {@code ConcurrentHashMap} iteration order is unspecified.
         */
        @SuppressWarnings("unchecked")
        public HandlingResult handle(StreamException exception,
                                     StreamContext context) {
            for (Class<?> type = exception.getClass();
                 type != null && StreamException.class.isAssignableFrom(type);
                 type = type.getSuperclass()) {
                ExceptionHandler<?> candidate = handlers.get(type);
                if (candidate != null) {
                    // Safe: registerHandler pairs each key with a handler for that type, and
                    // exception is an instance of it.
                    return ((ExceptionHandler<StreamException>) candidate).handle(exception, context);
                }
            }
            return defaultHandler.handle(exception, context);
        }
    }

    /** Network failures: retry, switch endpoint, or fall back once retries are exhausted. */
    public static class NetworkExceptionHandler
            implements ExceptionHandler<NetworkException> {

        private final RetryPolicy retryPolicy;
        private final CircuitBreaker circuitBreaker;
        private final EndpointManager endpointManager;

        public NetworkExceptionHandler(RetryPolicy retryPolicy,
                                       CircuitBreaker circuitBreaker,
                                       EndpointManager endpointManager) {
            this.retryPolicy = retryPolicy;
            this.circuitBreaker = circuitBreaker;
            this.endpointManager = endpointManager;
        }

        @Override
        public HandlingResult handle(NetworkException exception,
                                     StreamContext context) {

            if (!exception.isRecoverable()) {
                circuitBreaker.recordFailure(exception.getEndpoint());
                return new HandlingResult(false,
                        HandlingResult.RecoveryAction.NONE,
                        "Non-recoverable network error");
            }

            // Should we retry at all?
            if (retryPolicy.shouldRetry(exception, context)) {
                Duration delay = retryPolicy.getNextRetryDelay();

                // Should the retry go to a different endpoint?
                if (endpointManager.shouldSwitchEndpoint(
                        exception.getEndpoint(), exception.getStatusCode())) {

                    String newEndpoint = endpointManager.getNextEndpoint();
                    return new HandlingResult(true,
                            HandlingResult.RecoveryAction.SWITCH_ENDPOINT,
                            String.format("Switching to endpoint: %s after delay: %s",
                                    newEndpoint, delay));
                }

                return new HandlingResult(true,
                        HandlingResult.RecoveryAction.RETRY_AFTER_DELAY,
                        String.format("Will retry after %s", delay));
            }

            return new HandlingResult(false,
                    HandlingResult.RecoveryAction.USE_FALLBACK,
                    "Retry limit exceeded, using fallback");
        }
    }

    /**
     * Rate-limit failures are always retried after a delay.
     *
     * <p>The delay is the server's retry-after when given; otherwise it is an exponential backoff
     * capped at one minute.
     */
    public static class RateLimitExceptionHandler
            implements ExceptionHandler<RateLimitException> {

        private static final String ATTEMPTS_KEY = "rateLimitAttempts";
        private static final long BASE_DELAY_MILLIS = 1000L;
        private static final long MAX_DELAY_MILLIS = 60000L; // at most one minute
        // 1000 ms << 6 already exceeds the cap. Clamping the exponent prevents the long overflow the
        // old 1000L * (long) Math.pow(2, n) hit for large n, which produced negative delays.
        private static final int MAX_EXPONENT = 6;

        @Override
        public HandlingResult handle(RateLimitException exception,
                                     StreamContext context) {

            Duration retryAfter = exception.getRetryAfter();
            if (retryAfter == null) {
                // No server hint: fall back to exponential backoff.
                retryAfter = calculateExponentialBackoff(context);
            }

            return new HandlingResult(true,
                    HandlingResult.RecoveryAction.RETRY_AFTER_DELAY,
                    String.format("Rate limited, retrying after %s", retryAfter));
        }

        /**
         * Computes 1s, 2s, 4s, … (capped at 60s) from the attempt count in the context metadata.
         * It also records the attempt; previously the counter was never incremented, so the backoff
         * never grew.
         */
        private Duration calculateExponentialBackoff(StreamContext context) {
            int attemptCount = getAttemptCount(context);
            context.getMetadata().put(ATTEMPTS_KEY, attemptCount + 1);

            int exponent = Math.min(Math.max(attemptCount, 0), MAX_EXPONENT);
            long delayMillis = Math.min(BASE_DELAY_MILLIS << exponent, MAX_DELAY_MILLIS);
            return Duration.ofMillis(delayMillis);
        }

        /** Reads the attempt count from context metadata; a missing or non-numeric value counts as 0. */
        private int getAttemptCount(StreamContext context) {
            Object value = context.getMetadata().get(ATTEMPTS_KEY);
            return value instanceof Number ? ((Number) value).intValue() : 0;
        }
    }

    /** Interruptions: try to resume, then restart, then fall back. */
    public static class InterruptionHandler
            implements ExceptionHandler<StreamInterruptedException> {

        private final StreamRecoveryManager recoveryManager;
        private final boolean allowResume;

        public InterruptionHandler(StreamRecoveryManager recoveryManager,
                                   boolean allowResume) {
            this.recoveryManager = recoveryManager;
            this.allowResume = allowResume;
        }

        @Override
        public HandlingResult handle(StreamInterruptedException exception,
                                     StreamContext context) {

            if (!exception.isRecoverable()) {
                return new HandlingResult(false,
                        HandlingResult.RecoveryAction.NONE,
                        "Non-recoverable interruption");
            }

            if (allowResume && recoveryManager.canResume(context)) {
                // Try to resume the existing stream.
                boolean resumed = recoveryManager.resumeStream(context);
                if (resumed) {
                    return new HandlingResult(true,
                            HandlingResult.RecoveryAction.RECONNECT_STREAM,
                            "Stream resumed successfully");
                }
            }

            // Resume not possible: try a full restart.
            boolean restarted = recoveryManager.restartStream(context);
            if (restarted) {
                return new HandlingResult(true,
                        HandlingResult.RecoveryAction.RESTART,
                        "Stream restarted");
            }

            return new HandlingResult(false,
                    HandlingResult.RecoveryAction.USE_FALLBACK,
                    "Unable to recover stream, using fallback");
        }
    }
}

三、流式响应完整实现

3.1 安全的 TokenStream 实现

/**
* 带完整异常处理的 TokenStream 实现
*/

public class SafeTokenStream<T> implements Publisher<T>, AutoCloseable {

private final TokenSource<T> tokenSource;
private final ExecutorService executor;
private final ExceptionHandlerRegistry exceptionHandler;
private final StreamRecoveryManager recoveryManager;
private final CircuitBreaker circuitBreaker;
private final RateLimiter rateLimiter;

private final List<Subscriber<? super T>> subscribers =
new CopyOnWriteArrayList<>();
private final Map<Subscriber<? super T>, StreamContext> contexts =
new ConcurrentHashMap<>();

private volatile boolean isClosed = false;
private final AtomicInteger activeStreams = new AtomicInteger(0);

public SafeTokenStream(TokenSource<T> tokenSource,
ExecutorService executor,
ExceptionHandlerRegistry exceptionHandler,
StreamRecoveryManager recoveryManager) {
this.tokenSource = tokenSource;
this.executor = executor;
this.exceptionHandler = exceptionHandler;
this.recoveryManager = recoveryManager;
this.circuitBreaker = new CircuitBreaker(5, Duration.ofMinutes(1));
this.rateLimiter = RateLimiter.create(100.0); // 每秒100个token
}

@Override
public void subscribe(Subscriber<? super T> subscriber) {
if (isClosed) {
subscriber.onError(new IllegalStateException("Stream is closed"));
return;
}

if (subscribers.contains(subscriber)) {
subscriber.onError(new IllegalStateException("Already subscribed"));
return;
}

subscribers.add(subscriber);
StreamContext context = new StreamContext(generateStreamId());
contexts.put(subscriber, context);

subscriber.onSubscribe(new SafeSubscription(subscriber, context));

// 异步启动流处理
executor.submit(() -> processStream(subscriber, context));
}

private class SafeSubscription implements Subscription {
private final Subscriber<? super T> subscriber;
private final StreamContext context;
private volatile boolean cancelled = false;
private final AtomicLong requested = new AtomicLong(0);

public SafeSubscription(Subscriber<? super T> subscriber,
StreamContext context) {
this.subscriber = subscriber;
this.context = context;
}

@Override
public void request(long n) {
if (n <= 0) {
subscriber.onError(new IllegalArgumentException(
"Request must be positive"));
return;
}

requested.addAndGet(n);
synchronized (this) {
notify(); // 唤醒等待的流处理线程
}
}

@Override
public void cancel() {
cancelled = true;
cleanup();
}

public boolean isCancelled() {
return cancelled;
}

public long getRequested() {
return requested.get();
}

public void consumeRequest() {
requested.decrementAndGet();
}
}

private void processStream(Subscriber<? super T> subscriber,
StreamContext context) {

activeStreams.incrementAndGet();

try {
while (!isClosed && !getSubscription(subscriber).isCancelled()) {

// 检查是否有请求的token
SafeSubscription subscription = getSubscription(subscriber);
if (subscription.getRequested() <= 0) {
synchronized (subscription) {
try {
subscription.wait(1000); // 等待请求
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
continue;
}

// 限流控制
rateLimiter.acquire();

// 断路器检查
if (!circuitBreaker.allowRequest()) {
handleException(new CircuitBreakerOpenException(
"Circuit breaker is open", context.getStreamId()),
subscriber, context);
break;
}

try {
// 生成下一个token
T token = tokenSource.nextToken();

if (token == null) {
// 流结束
subscriber.onComplete();
break;
}

// 发送token
subscriber.onNext(token);
subscription.consumeRequest();
context.recordToken();

} catch (StreamInterruptedException e) {
// 处理中断
HandlingResult result = exceptionHandler.handle(e, context);

if (!result.shouldContinue()) {
handleException(e, subscriber, context);
break;
}

// 根据恢复策略处理
handleRecovery(result, subscriber, context);

} catch (StreamException e) {
// 处理其他流异常
HandlingResult result = exceptionHandler.handle(e, context);

if (!result.shouldContinue()) {
handleException(e, subscriber, context);
break;
}

handleRecovery(result, subscriber, context);

} catch (Exception e) {
// 处理非流异常
StreamException wrapped = new GenericStreamException(
"Unexpected error", e, context.getStreamId());
handleException(wrapped, subscriber, context);
break;
}
}
} finally {
cleanupSubscriber(subscriber);
activeStreams.decrementAndGet();
}
}

private void handleException(StreamException exception,
Subscriber<? super T> subscriber,
StreamContext context) {

// 记录异常
context.getMetadata().put("lastError", exception);

// 通知订阅者
subscriber.onError(exception);

// 清理资源
cleanupSubscriber(subscriber);
}

private void handleRecovery(HandlingResult result,
Subscriber<? super T> subscriber,
StreamContext context) {

switch (result.getRecoveryAction()) {
case RETRY_IMMEDIATELY:
// 立即重试,无需延迟
break;

case RETRY_AFTER_DELAY:
// 延迟后重试
try {
Thread.sleep(extractDelay(result));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
break;

case SWITCH_ENDPOINT:
// 切换端点
tokenSource.switchEndpoint();
break;

case RECONNECT_STREAM:
// 重新连接流
recoveryManager.reconnect(context);
break;

case USE_FALLBACK:
// 使用降级策略
useFallback(subscriber, context);
break;

case RESTART:
// 重启流
restartStream(subscriber, context);
break;
}
}

private void useFallback(Subscriber<? super T> subscriber,
StreamContext context) {
// 实现降级逻辑
try {
T fallbackToken = getFallbackToken(context);
if (fallbackToken != null) {
subscriber.onNext(fallbackToken);
}
subscriber.onComplete();
} catch (Exception e) {
subscriber.onError(e);
}
}

private void restartStream(Subscriber<? super T> subscriber,
StreamContext context) {
// 清理当前订阅
cleanupSubscriber(subscriber);

// 创建新的订阅
StreamContext newContext = new StreamContext(
context.getStreamId() + "-restarted");
contexts.put(subscriber, newContext);

// 重新启动流处理
executor.submit(() -> processStream(subscriber, newContext));
}

private long extractDelay(HandlingResult result) {
// 从消息中提取延迟时间(简化实现)
String message = result.getMessage();
// 实际实现需要更复杂的解析逻辑
return 1000L; // 默认1秒
}

private SafeSubscription getSubscription(Subscriber<? super T> subscriber) {
return (SafeSubscription) subscriber;
}

private void cleanupSubscriber(Subscriber<? super T> subscriber) {
subscribers.remove(subscriber);
contexts.remove(subscriber);
}

@Override
public void close() {
isClosed = true;

// 通知所有订阅者
for (Subscriber<? super T> subscriber : subscribers) {
subscriber.onError(new StreamInterruptedException(
StreamInterruptedException.InterruptionSource.SYSTEM_SHUTDOWN,
false,
contexts.get(subscriber).getStreamId()));
}

// 清理资源
subscribers.clear();
contexts.clear();

// 等待活跃流结束
awaitTermination();

// 关闭执行器
executor.shutdown();
try {
if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
executor.shutdownNow();
}
} catch (InterruptedException e) {
executor.shutdownNow();
Thread.currentThread().interrupt();
}
}

private void awaitTermination() {
int maxWaitTime = 30; // 最多等待30秒
for (int i = 0; i < maxWaitTime && activeStreams.get() > 0; i++) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
}

private String generateStreamId() {
return "stream-" + UUID.randomUUID().toString().substring(0, 8);
}

// 辅助类
private static class GenericStreamException extends StreamException {
public GenericStreamException(String message, Throwable cause,
String streamId) {
super(message, ErrorCategory.CLIENT, streamId);
initCause(cause);
}

@Override
public boolean isRecoverable() {
return false;
}

@Override
public RecoveryStrategy getRecoveryStrategy() {
return RecoveryStrategy.FAIL_FAST;
}
}

private static class CircuitBreakerOpenException extends StreamException {
public CircuitBreakerOpenException(String message, String streamId) {
super(message, ErrorCategory.CLIENT, streamId);
}

@Override
public boolean isRecoverable() {
return true; // 断路器最终会关闭
}

@Override
public RecoveryStrategy getRecoveryStrategy() {
return RecoveryStrategy.RETRY_AFTER_DELAY;
}
}
}

3.2 高级特性:断点续传与状态恢复

/**
* 支持断点续传的 TokenStream
*/

public class ResumableTokenStream<T> extends SafeTokenStream<T> {

private final StreamStateStorage stateStorage;
private final CheckpointStrategy checkpointStrategy;

public ResumableTokenStream(TokenSource<T> tokenSource,
ExecutorService executor,
ExceptionHandlerRegistry exceptionHandler,
StreamRecoveryManager recoveryManager,
StreamStateStorage stateStorage,
CheckpointStrategy checkpointStrategy) {
super(tokenSource, executor, exceptionHandler, recoveryManager);
this.stateStorage = stateStorage;
this.checkpointStrategy = checkpointStrategy;
}

@Override
public void subscribe(Subscriber<? super T> subscriber) {
// 检查是否有保存的状态
StreamState savedState = stateStorage.loadState(
getStreamIdFromSubscriber(subscriber));

if (savedState != null && savedState.canResume()) {
// 恢复订阅
resumeFromState(subscriber, savedState);
} else {
// 新的订阅
super.subscribe(subscriber);
}
}

private void resumeFromState(Subscriber<? super T> subscriber,
StreamState state) {

StreamContext context = createContextFromState(state);

// 恢复token源到检查点位置
getTokenSource().restoreFromCheckpoint(state.getCheckpoint());

// 重新订阅
subscribers.add(subscriber);
contexts.put(subscriber, context);

subscriber.onSubscribe(new ResumableSubscription(
subscriber, context, state.getLastProcessedToken()));

// 继续处理
executor.submit(() -> processResumedStream(
subscriber, context, state));
}

private class ResumableSubscription extends SafeSubscription {
private final T lastProcessedToken;
private boolean hasDeliveredResumeToken = false;

public ResumableSubscription(Subscriber<? super T> subscriber,
StreamContext context,
T lastProcessedToken) {
super(subscriber, context);
this.lastProcessedToken = lastProcessedToken;
}

@Override
public void request(long n) {
if (!hasDeliveredResumeToken && lastProcessedToken != null) {
// 首先发送恢复标记
subscriber.onNext(createResumeToken());
hasDeliveredResumeToken = true;
n; // 减少一个请求计数
}

if (n > 0) {
super.request(n);
}
}

private T createResumeToken() {
// 创建特殊的恢复token
return (T) new ResumeToken<>(lastProcessedToken);
}
}

private void processResumedStream(Subscriber<? super T> subscriber,
StreamContext context,
StreamState state) {

// 跳过已经处理过的token
skipProcessedTokens(state.getProcessedCount());

// 继续正常处理
super.processStream(subscriber, context);
}

private void skipProcessedTokens(int count) {
for (int i = 0; i < count; i++) {
try {
getTokenSource().nextToken(); // 跳过
} catch (Exception e) {
// 记录但继续
logger.warn("Failed to skip token {}", i, e);
}
}
}

// 定期保存状态
private void createCheckpoint(StreamContext context,
T lastToken,
int processedCount) {

if (checkpointStrategy.shouldCheckpoint(context, lastToken)) {
StreamState state = new StreamState(
context.getStreamId(),
getTokenSource().createCheckpoint(),
lastToken,
processedCount,
Instant.now()
);

stateStorage.saveState(state);
}
}

// 流状态类
public static class StreamState {
private final String streamId;
private final Checkpoint checkpoint;
private final Object lastProcessedToken;
private final int processedCount;
private final Instant checkpointTime;

public StreamState(String streamId, Checkpoint checkpoint,
Object lastProcessedToken, int processedCount,
Instant checkpointTime) {
this.streamId = streamId;
this.checkpoint = checkpoint;
this.lastProcessedToken = lastProcessedToken;
this.processedCount = processedCount;
this.checkpointTime = checkpointTime;
}

public boolean canResume() {
return checkpoint != null &&
checkpointTime.isAfter(Instant.now().minus(Duration.ofHours(1)));
}
}

// 恢复token标记
public static class ResumeToken<T> {
private final T previousToken;
private final Instant resumeTime;

public ResumeToken(T previousToken) {
this.previousToken = previousToken;
this.resumeTime = Instant.now();
}
}
}

四、最佳实践与模式总结

4.1 异常处理最佳实践

/**
* TokenStream 异常处理的最佳实践
*/

public class TokenStreamBestPractices {

// 实践1:分层异常处理
public static class LayeredExceptionHandling {

public void processStreamWithLayers() {
try {
// 外层:连接和初始化异常
TokenStream<String> stream = createTokenStream();

stream.subscribe(new Subscriber<String>() {
@Override
public void onSubscribe(Subscription s) {
// 中层:订阅和请求异常
try {
s.request(1);
} catch (IllegalArgumentException e) {
handleRequestError(e);
}
}

@Override
public void onNext(String token) {
// 内层:数据处理异常
try {
processToken(token);
} catch (ProcessingException e) {
handleTokenError(e, token);
}
}

@Override
public void onError(Throwable t) {
// 错误处理层
handleStreamError(t);
}

@Override
public void onComplete() {
handleCompletion();
}
});

} catch (StreamCreationException e) {
handleCreationError(e);
}
}
}

// 实践2:优雅降级
public static class GracefulDegradation {

private final TokenStream<String> primaryStream;
private final TokenStream<String> fallbackStream;
private final DegradationPolicy degradationPolicy;

public void startStreamWithFallback(Subscriber<String> subscriber) {
AtomicBoolean fallbackActive = new AtomicBoolean(false);

Subscriber<String> wrappedSubscriber = new Subscriber<String>() {
@Override
public void onSubscribe(Subscription s) {
subscriber.onSubscribe(s);
}

@Override
public void onNext(String token) {
subscriber.onNext(token);
}

@Override
public void onError(Throwable t) {
if (!fallbackActive.get() &&
degradationPolicy.shouldUseFallback(t)) {

fallbackActive.set(true);
logger.warn("Switching to fallback stream", t);

// 切换到降级流
fallbackStream.subscribe(this);
} else {
subscriber.onError(t);
}
}

@Override
public void onComplete() {
subscriber.onComplete();
}
};

primaryStream.subscribe(wrappedSubscriber);
}
}

// 实践3:监控和指标
public static class MonitoringAndMetrics {

private final MeterRegistry meterRegistry;
private final Tracer tracer;

public TokenStream<String> createMonitoredStream(
TokenStream<String> delegate) {

return new TokenStream<String>() {
private final Timer streamTimer = Timer.builder("token.stream.duration")
.register(meterRegistry);

private final Counter tokenCounter = Counter.builder("token.count")
.register(meterRegistry);

private final Counter errorCounter = Counter.builder("token.stream.errors")
.tag("type", "stream")
.register(meterRegistry);

@Override
public void subscribe(Subscriber<String> subscriber) {
Timer.Sample sample = Timer.start();

delegate.subscribe(new Subscriber<String>() {
private Span span;

@Override
public void onSubscribe(Subscription s) {
// 开始追踪
span = tracer.buildSpan("token.stream")
.startSpan();

subscriber.onSubscribe(s);
}

@Override
public void onNext(String token) {
tokenCounter.increment();

// 记录token大小
DistributionSummary.builder("token.size")
.register(meterRegistry)
.record(token.length());

subscriber.onNext(token);
}

@Override
public void onError(Throwable t) {
errorCounter.increment();

// 记录错误标签
span.setTag("error", true);
span.log(Map.of(
"error.message", t.getMessage(),
"error.type", t.getClass().getName()
));

span.finish();
sample.stop(streamTimer);

subscriber.onError(t);
}

@Override
public void onComplete() {
span.finish();
sample.stop(streamTimer);

subscriber.onComplete();
}
});
}
};
}
}

// 实践4:资源清理
public static class ResourceCleanupPattern implements AutoCloseable {

private final List<AutoCloseable> resources = new ArrayList<>();
private final ScheduledExecutorService cleanupExecutor;
private final AtomicBoolean closed = new AtomicBoolean(false);

public ResourceCleanupPattern() {
this.cleanupExecutor = Executors.newSingleThreadScheduledExecutor(
r -> new Thread(r, "stream-cleanup")
);

// 定期检查泄漏
cleanupExecutor.scheduleAtFixedRate(
this::checkForLeaks,
5, 60, TimeUnit.SECONDS);

// 添加关闭钩子
Runtime.getRuntime().addShutdownHook(new Thread(this::safeClose));
}

public <T extends AutoCloseable> T trackResource(T resource) {
resources.add(resource);
return resource;
}

private void checkForLeaks() {
// 检查未关闭的资源
// 实际实现需要更复杂的检测逻辑
logger.debug("Active resources: {}", resources.size());
}

@Override
public void close() {
safeClose();
}

private void safeClose() {
if (!closed.compareAndSet(false, true)) {
return; // 已经关闭
}

// 关闭清理执行器
cleanupExecutor.shutdown();

// 逆序关闭资源(依赖关系)
ListIterator<AutoCloseable> iterator =
resources.listIterator(resources.size());

while (iterator.hasPrevious()) {
AutoCloseable resource = iterator.previous();
try {
resource.close();
} catch (Exception e) {
logger.warn("Failed to close resource", e);
}
}

resources.clear();
}
}
}

4.2 面试要点总结

Q1: 什么是 TokenStream?

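A: TokenStream 是处理大型语言模型流式响应的核心抽象。在 LangChain4j 中,它是 AI Service 流式方法的返回类型:通过 onPartialResponse(旧版为 onNext)、onCompleteResponse、onError 注册回调后调用 start() 启动,且必须注册 onError(或显式调用 ignoreErrors()),否则 start() 会抛出配置异常;下文代码是为说明原理而自行实现的简化模型。它具有以下特点: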

  • 异步流式处理:支持非阻塞的增量数据生成
  • Token 化增量输出:模型按 token(语义片段)逐段生成文本并增量推送
  • 背压支持:基于响应式流规范,支持流量控制
  • 错误传播:在流中传递异常而不中断整个系统
  • 可恢复性:支持从故障点恢复处理
Q2: 如何处理流式响应中的异常?

    A: 采用分层异常处理策略:

  • 异常分类:

    // 网络异常:可恢复,需要重试策略
    // 业务异常:可能需要降级处理
    // 中断异常:需要清理资源并通知用户

  • 处理策略:

    • 重试机制:指数退避 + 最大重试次数
    • 熔断器模式:防止级联故障
    • 优雅降级:主服务失败时切换备用服务
    • 错误隔离:防止单个流异常影响整体系统
  • 实现要点:

    // 1. 使用专门的状态对象跟踪流状态
    // 2. 为不同异常类型注册处理器
    // 3. 实现恢复策略(重试、恢复、重启)
    // 4. 确保资源正确清理

Q3: 如何处理中断?

    A: 中断处理的关键原则:

  • 快速响应:立即检测中断信号
  • 资源清理:释放所有占用的资源
  • 状态保存:支持断点续传
  • 用户通知:明确告知中断原因
  • 处理流程:

    public void handleInterruption() {
    // 1. 设置中断标志
    volatile boolean interrupted = false;

    // 2. 定期检查中断
    while (!interrupted) {
    if (Thread.currentThread().isInterrupted()) {
    interrupted = true;
    break;
    }
    // 处理逻辑…
    }

    // 3. 清理资源
    cleanupResources();

    // 4. 保存状态(支持恢复)
    saveCheckpoint();

    // 5. 通知相关方
    notifyInterruption();
    }

    Q4: 如何设计高性能的 TokenStream?

    A: 性能优化策略:

  • 缓冲策略:适当缓冲提高吞吐量
  • 并发控制:限制并发流数量
  • 内存管理:防止内存泄漏
  • 连接复用:重用 HTTP 连接
  • 监控指标:实时监控性能数据
  • 性能优化示例:
    public class OptimizedTokenStream {
        // 1. Pool Token objects to reduce GC pressure.
        private final ObjectPool<Token> tokenPool;

        // 2. Batch tokens to amortize per-token overhead.
        private final BatchingProcessor batchProcessor;

        // 3. Presumably allocates direct (off-heap) buffers, which avoid an extra heap-to-native copy
        //    on socket I/O. Confirm against DirectBufferAllocator.
        private final DirectBufferAllocator bufferAllocator;

        // 4. Asynchronous HTTP client for higher concurrency.
        private final AsyncHttpClient httpClient;

        /** The final fields above must be assigned; without a constructor the class does not compile. */
        public OptimizedTokenStream(ObjectPool<Token> tokenPool,
                                    BatchingProcessor batchProcessor,
                                    DirectBufferAllocator bufferAllocator,
                                    AsyncHttpClient httpClient) {
            this.tokenPool = tokenPool;
            this.batchProcessor = batchProcessor;
            this.bufferAllocator = bufferAllocator;
            this.httpClient = httpClient;
        }
    }

    通过这样的设计,TokenStream 能够在大规模生产环境中稳定运行,提供可靠的流式响应处理能力。

    赞(0)
    未经允许不得转载:网硕互联帮助中心 » 高级java每日一道面试题-2025年6月26日-基础篇[LangChain4j]-什么是 TokenStream?如何正确处理流式响应中的异常和中断?
    分享到: 更多 (0)

    评论 抢沙发

    评论前必须登录!