云计算百科
云计算领域专业知识百科平台

AI应用对话生成的安全性优化

流量保护

AI对话接口是重要功能成本最高,使用Redisson分布式限流来防止恶意攻击和资源滥用。

首先引入Redisson依赖:

<!– Spring Session + Redis –>
<dependency>
<groupId>org.springframework.session</groupId>
<artifactId>spring-session-data-redis</artifactId>
</dependency>
<!– Redisson –>
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.50.0</version>
</dependency>

配置application:

# redis
spring:
data:
redis:
host: localhost
port: 6379
ttl: 3600
database: 0
password:

编写Redisson配置:

@Configuration
public class RedissonConfig {
@Value("${spring.data.redis.host}")
private String redisHost;

@Value("${spring.data.redis.port}")
private int redisPort;

@Value("${spring.data.redis.password}")
private String redisPassword;

@Value("${spring.data.redis.database}")
private int redisDataBase;

@Bean
public RedissonClient redissonClient() {
Config config = new Config();
String address = "redis://" + redisHost + ":" + redisPort;
SingleServerConfig singleServerConfig = config.useSingleServer()
.setAddress(address)
.setDatabase(redisDataBase)
.setConnectionMinimumIdleSize(1)
.setConnectionPoolSize(10)
.setConnectTimeout(5000)
.setIdleConnectionTimeout(30000)
.setTimeout(3000)
.setRetryAttempts(3)
.setRetryDelay(new ConstantDelay(Duration.ofMillis(1500)));
if (redisPassword != null && !redisPassword.isEmpty()) {
singleServerConfig.setPassword(redisPassword);
}
return Redisson.create(config);
}
}

创建限流的枚举类:

public enum RateLimitType {
/**
* 接口级别限流
*/
API,

/**
* 用户级别限流
*/
USER,

/**
* IP级别限流
*/
IP
}

限流是个可以抽取出来的公共逻辑,因此我们可以使用AOP,创建注解的形式限流提供灵活的配置选项:

@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface RateLimit {
/**
* 限流key前缀
*/
String key() default "";

/**
* 每个时间窗口的请求速率(允许的请求数)
*/
int rate() default 10;

/**
* 时间窗口大小(单位:秒)
*/
int rateInterval() default 1;

/**
* 限流类型
*/
RateLimitType limitType() default RateLimitType.USER;

/**
* 限流提示信息
*/
String message() default "请求过于频繁,请稍后再试";
}

实现限流切面:

@Aspect
@Component
@Slf4j
public class RateLimitAspect {
@Resource
private RedissonClient redissonClient;

@Resource
private UserService userService;

@Before("@annotation(rateLimit)")
public void doBefore(JoinPoint point, RateLimit rateLimit){
String key = generateRateLimitKey(point, rateLimit);
// 使用Redisson的分布式限流器
RRateLimiter rateLimiter = redissonClient.getRateLimiter(key);
rateLimiter.expire(Duration.ofHours(1));
// 设置限流器参数,每个时间窗口允许的请求数和时间窗口
rateLimiter.trySetRate(RateType.OVERALL, rateLimit.rate(), rateLimit.rateInterval(), RateIntervalUnit.SECONDS);
// 尝试获取令牌,如果获取失败则限流
if (!rateLimiter.tryAcquire(1)) {
throw new BusinessException(ErrorCode.TOO_MANY_REQUEST_ERROR, rateLimit.message());
}
}

private String generateRateLimitKey(JoinPoint point, RateLimit rateLimit) {
StringBuilder keyBuilder = new StringBuilder();
keyBuilder.append("rate_limit:");
// 添加自定义前缀
if (!rateLimit.key().isEmpty()) {
keyBuilder.append(rateLimit.key()).append(":");
}
// 根据限流类型生成不同的key
switch (rateLimit.limitType()) {
case API:
// 接口级别:方法名
MethodSignature signature = (MethodSignature) point.getSignature();
Method method = signature.getMethod();
keyBuilder.append("api:").append(method.getDeclaringClass().getSimpleName())
.append(".").append(method.getName());
break;
case USER:
// 用户级别:用户ID
try {
ServletRequestAttributes attributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();
if (attributes != null) {
HttpServletRequest request = attributes.getRequest();
User loginUser = userService.getLoginUser(request);
keyBuilder.append("user:").append(loginUser.getId());
} else {
// 无法获取请求上下文,使用IP限流
keyBuilder.append("ip:").append(getClientIP());
}
} catch (BusinessException e) {
// 未登录用户使用IP限流
keyBuilder.append("ip:").append(getClientIP());
}
break;
case IP:
// IP级别:客户端IP
keyBuilder.append("ip:").append(getClientIP());
break;
default:
throw new BusinessException(ErrorCode.SYSTEM_ERROR, "不支持的限流类型");
}
return keyBuilder.toString();
}
private String getClientIP() {
ServletRequestAttributes attributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();
if (attributes == null) {
return "unknown";
}
HttpServletRequest request = attributes.getRequest();
String ip = request.getHeader("X-Forwarded-For");
if (ip == null || ip.isEmpty() || "unknown".equalsIgnoreCase(ip)) {
ip = request.getHeader("X-Real-IP");
}
if (ip == null || ip.isEmpty() || "unknown".equalsIgnoreCase(ip)) {
ip = request.getRemoteAddr();
}
// 处理多级代理的情况
if (ip != null && ip.contains(",")) {
ip = ip.split(",")[0].trim();
}
return ip != null ? ip : "unknown";
}

}

在AI对话接口中应用限流注解:

/**
* 应用聊天生成代码(流式 SSE)
*
* @param appId 应用 ID
* @param message 用户消息
* @param request 请求对象
* @return 生成结果流
*/
@GetMapping(value = "/chat/gen/code", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
@RateLimit(rate = 5, rateInterval = 60, limitType = RateLimitType.USER, message = "AI 对话请求过于频繁,请稍后再试")
public Flux<ServerSentEvent<String>> chatToGenCode(@RequestParam Long appId,
@RequestParam String message,
HttpServletRequest request) {

优化 SSE 错误处理

在测试过程中,我们发现了一个问题:当限流触发时,前端无法正确显示后端返回的错误信息。这是因为限流异常在进入 SSE 接口之前就被抛出了,没有通过流式返回,需要特殊处理。思路是将限流异常消息也作为 SSE 返回给前端。

SSE 是一种服务器向客户端推送数据的技术,它基于 HTTP 协议,使用 text/event-stream 媒体类型。SSE 消息的标准格式是:

event: 事件类型
data: 数据内容

优化全局异常处理器,来实现能够正确处理SSE请求的异常:

@Hidden
@RestControllerAdvice
@Slf4j
public class GlobalExceptionHandler {

@ExceptionHandler(BusinessException.class)
public BaseResponse<?> businessExceptionHandler(BusinessException e) {
log.error("BusinessException", e);
// 尝试处理 SSE 请求
if (handleSseError(e.getCode(), e.getMessage())) {
return null;
}
// 对于普通请求,返回标准 JSON 响应
return ResultUtils.error(e.getCode(), e.getMessage());
}

@ExceptionHandler(RuntimeException.class)
public BaseResponse<?> runtimeExceptionHandler(RuntimeException e) {
log.error("RuntimeException", e);
// 尝试处理 SSE 请求
if (handleSseError(ErrorCode.SYSTEM_ERROR.getCode(), "系统错误")) {
return null;
}
return ResultUtils.error(ErrorCode.SYSTEM_ERROR, "系统错误");
}

/**
* 处理SSE请求的错误响应
*
* @param errorCode 错误码
* @param errorMessage 错误信息
* @return true表示是SSE请求并已处理,false表示不是SSE请求
*/
private boolean handleSseError(int errorCode, String errorMessage) {
ServletRequestAttributes attributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();
if (attributes == null) {
return false;
}
HttpServletRequest request = attributes.getRequest();
HttpServletResponse response = attributes.getResponse();
// 判断是否是SSE请求(通过Accept头或URL路径)
String accept = request.getHeader("Accept");
String uri = request.getRequestURI();
if ((accept != null && accept.contains("text/event-stream")) ||
uri.contains("/chat/gen/code")) {
try {
// 设置SSE响应头
response.setContentType("text/event-stream");
response.setCharacterEncoding("UTF-8");
response.setHeader("Cache-Control", "no-cache");
response.setHeader("Connection", "keep-alive");
// 构造错误消息的SSE格式
Map<String, Object> errorData = Map.of(
"error", true,
"code", errorCode,
"message", errorMessage
);
String errorJson = JSONUtil.toJsonStr(errorData);
// 发送业务错误事件(避免与标准error事件冲突)
String sseData = "event: business-error\\ndata: " + errorJson + "\\n\\n";
response.getWriter().write(sseData);
response.getWriter().flush();
// 发送结束事件
response.getWriter().write("event: done\\ndata: {}\\n\\n");
response.getWriter().flush();
// 表示已处理SSE请求
return true;
} catch (IOException ioException) {
log.error("Failed to write SSE error response", ioException);
// 即使写入失败,也表示这是SSE请求
return true;
}
}
return false;
}
}

同样,前端也需要添加对自定义错误事件的处理:

// 处理business-error事件(后端限流等错误)
eventSource.addEventListener('business-error', function (event: MessageEvent) {
if (streamCompleted) return

try {
const errorData = JSON.parse(event.data)
console.error('SSE业务错误事件:', errorData)

// 显示具体的错误信息
const errorMessage = errorData.message || '生成过程中出现错误'
messages.value[aiMessageIndex].content = `❌ ${errorMessage}`
messages.value[aiMessageIndex].loading = false
message.error(errorMessage)

streamCompleted = true
isGenerating.value = false
eventSource?.close()
} catch (parseError) {
console.error('解析错误事件失败:', parseError, '原始数据:', event.data)
handleError(new Error('服务器返回错误'), aiMessageIndex)
}
})

测试效果如图,一分钟内快速点击生成应用,超过5次触发限流,符合预期:

Prompt安全审查(Guardrails护轨机制)

除了流量保护,我们还需要防范恶意输入和 Prompt 注入攻击的防护的策略。

比如在将 prompt 交给 AI执行之前,会进行一次审核:

基于LangChain4j的护轨功能,我们可以轻松实现调用 AI 前的审查能力。

什么是护轨 Guardrails?

护轨是 AI 应用中的安全机制,类似于道路上的护栏,用于防止恶意的 Prompt 输入、防止 AI模型产生不当或有害的内容。

其实我们把它理解为拦截器就好了,护轨分为两种:

1、输入护轨(Input Guardrails):在用户输入传递给 AI 模型之前进行检查和过滤

2、输出护轨(Output Guardrails):在 AI模型生成内容后进行检查和过滤

除了输入 Prompt 和 AI 输出结果的安全校验外,你还可以利用护轨进行权限校验、日志记录等。下面我们来利用输入护轨实现 Prompt 安全审核,防止一些非法 Prompt,比如:

1、拒绝过长的 Prompt

2、拒绝包含敏感词的 Prompt

3、拒绝包含注入攻击的 Prompt

开发实现护轨

这里实现基础检测,实际生产环境中可能需要更复杂的检测逻辑,包括使用 AI 模型、或者一些平台专门安全服务的敏感词内容检测审核服务来检测更复杂的攻击模式。

public class PromptSafetyInputGuardrail implements InputGuardrail {

// 敏感词列表
private static final List<String> SENSITIVE_WORDS = Arrays.asList(
"忽略之前的指令", "ignore previous instructions", "ignore above",
"破解", "hack", "绕过", "bypass", "越狱", "jailbreak"
);

// 注入攻击模式
private static final List<Pattern> INJECTION_PATTERNS = Arrays.asList(
Pattern.compile("(?i)ignore\\\\s+(?:previous|above|all)\\\\s+(?:instructions?|commands?|prompts?)"),
Pattern.compile("(?i)(?:forget|disregard)\\\\s+(?:everything|all)\\\\s+(?:above|before)"),
Pattern.compile("(?i)(?:pretend|act|behave)\\\\s+(?:as|like)\\\\s+(?:if|you\\\\s+are)"),
Pattern.compile("(?i)system\\\\s*:\\\\s*you\\\\s+are"),
Pattern.compile("(?i)new\\\\s+(?:instructions?|commands?|prompts?)\\\\s*:")
);

@Override
public InputGuardrailResult validate(UserMessage userMessage) {
String input = userMessage.singleText();
// 检查输入长度
if (input.length() > 1000) {
return fatal("输入内容过长,不要超过 1000 字");
}
// 检查是否为空
if (input.trim().isEmpty()) {
return fatal("输入内容不能为空");
}
// 检查敏感词
String lowerInput = input.toLowerCase();
for (String sensitiveWord : SENSITIVE_WORDS) {
if (lowerInput.contains(sensitiveWord.toLowerCase())) {
return fatal("输入包含不当内容,请修改后重试");
}
}
// 检查注入攻击模式
for (Pattern pattern : INJECTION_PATTERNS) {
if (pattern.matcher(input).find()) {
return fatal("检测到恶意输入,请求被拒绝");
}
}
return success();
}
}

集成护轨机制到 AI 服务工厂中:

yield AiServices.builder(AiCodeGeneratorService.class)
.streamingChatModel(reasoningStreamingChatModel)
.chatMemoryProvider(memoryId -> chatMemory)
.tools(toolManager.getAllTools())
.inputGuardrails(new PromptSafetyInputGuardrail()) // 添加输入护轨
.build();

如果是只给某个方法添加护轨,可以使用注解的形式写法:

public interface Assistant {
@InputGuardrails({ FirstInputGuardrail.class, SecondInputGuardrail.class })
String chat(String question);

String doSomethingElse(String question);
}

测试效果如图:

赞(0)
未经允许不得转载:网硕互联帮助中心 » AI应用对话生成的安全性优化
分享到: 更多 (0)

评论 抢沙发

评论前必须登录!