【n8n教程】:SSE Trigger节点,实时接收服务器推送事件
本教程将带你从零开始掌握 SSE Trigger 节点,一个强大的实时数据接收工具。如果你想让 n8n 工作流能够实时接收来自服务器的消息推送,而不是被动等待请求,那么这个教程就是为你准备的。
🎯 学习目标
- ✅ 理解什么是服务器发送事件(SSE)
- ✅ 掌握 SSE Trigger 节点的配置
- ✅ 学会实时接收和处理数据
- ✅ 创建自己的实时监控工作流
点击获取最新AI资讯、n8n工作流、开发经验分享
📖 第一部分:核心概念速览
什么是 SSE(Server-Sent Events)?
SSE 是一种服务器推送技术,它通过 HTTP 连接让服务器能够主动向客户端发送数据。想象这样一个场景:
- 传统方式:你不断敲钉钉问"数据来了没?"(轮询)
- SSE 方式:系统有了新数据就自动通知你(推送)
SSE Trigger 节点的作用就是让 n8n 工作流成为一个**“监听器”**,时刻准备接收来自指定 URL 的服务器推送事件。
SSE 对比其他触发方式
| Webhook | 外部系统推送数据 | 接收第三方应用事件 | ⭐⭐⭐⭐⭐ |
| SSE Trigger | 连接服务器持续接收 | 实时数据流、市场行情 | ⭐⭐⭐⭐⭐ |
| Schedule | 定时执行 | 定期任务 | ⭐ |
| HTTP Request | 主动查询 | 按需获取数据 | ⭐⭐ |
🔧 第二部分:SSE Trigger 节点配置详解
节点参数说明
SSE Trigger 节点配置非常简洁,只有一个核心参数:
URL(必填)
- 说明:指定要连接的 SSE 服务器地址
- 格式:完整的 HTTP 或 HTTPS 地址
- 示例:
- https://api.example.com/stream/events
- https://stock-api.demo.com/sse/ticker
- http://localhost:8080/stream
配置步骤
添加 SSE Trigger 节点
- 在 n8n 工作流编辑器中点击 “+” 添加节点
- 在搜索框输入 “SSE Trigger”
- 点击选择该节点
输入 SSE 服务器 URL
- 在节点的 URL 参数中输入你的服务器地址
- 确保 URL 是有效的且服务器支持 SSE 协议
保存并激活
- 点击 “Save” 保存配置
- 节点会开始监听指定 URL 的事件
工作原理流程
┌─────────────────┐
│ n8n 工作流 │
│ (SSE Trigger) │
└────────┬────────┘
│
│ 建立长连接(HTTP streaming)
↓
┌─────────────────┐
│ SSE 服务器 │
│ (你的数据源) │
└────────┬────────┘
│
│ 推送事件数据
↓
处理数据流
│ → 格式化
│ → 条件判断
│ → 发送通知
└ → 存储记录
💡 第三部分:实战案例 – 构建实时股票行情监控工作流
案例场景
我们将创建一个实时股票价格监控系统,它会:
- 连接到模拟股票 SSE 服务
- 实时接收价格更新
- 当价格超过设定阈值时触发警报
- 记录所有事件到日志
可执行工作流代码
以下是完整的可执行工作流 JSON 代码,你可以直接导入到 n8n:
{
"name": "实时股票监控 – SSE Trigger 工作流",
"nodes": [
{
"parameters": {
"url": "https://n8n.io"
},
"id": "SSE Trigger",
"name": "SSE Trigger",
"type": "n8n-nodes-base.sseTrigger",
"typeVersion": 1,
"position": [250, 300]
},
{
"parameters": {
"functionCode": "// 接收来自 SSE 的原始数据并解析\\nconst data = $input.first().json;\\nconsole.log('收到 SSE 事件:', data);\\n\\n// 模拟股票数据处理\\nreturn {\\n json: {\\n timestamp: new Date().toISOString(),\\n symbol: data.symbol || 'STOCK-001',\\n price: data.price || 100,\\n previousPrice: data.previousPrice || 99,\\n change: (data.price – data.previousPrice).toFixed(2),\\n changePercent: (((data.price – data.previousPrice) / data.previousPrice) * 100).toFixed(2)\\n }\\n};"
},
"id": "Function – 解析数据",
"name": "Function – 解析数据",
"type": "n8n-nodes-base.function",
"typeVersion": 1,
"position": [450, 300],
"credentials": []
},
{
"parameters": {
"conditions": {
"string": [
{
"value1": "={{ $json.change }}",
"operation": "notEmpty"
}
]
}
},
"id": "If – 价格变动检测",
"name": "If – 价格变动检测",
"type": "n8n-nodes-base.if",
"typeVersion": 1,
"position": [650, 300]
},
{
"parameters": {
"functionCode": "// 生成提醒消息\\nconst data = $input.first().json;\\nconst changePercent = parseFloat(data.changePercent);\\nconst alertLevel = Math.abs(changePercent) > 2 ? '高' : '中';\\n\\nreturn {\\n json: {\\n message: `🔔 ${data.symbol} 价格更新: $${data.price} (变化: ${data.change > 0 ? '+' : ''}${data.change}%, 警报等级: ${alertLevel})`,\\n alertLevel: alertLevel,\\n shouldNotify: Math.abs(changePercent) > 1,\\n data: data\\n }\\n};"
},
"id": "Function – 生成警报",
"name": "Function – 生成警报",
"type": "n8n-nodes-base.function",
"typeVersion": 1,
"position": [850, 250]
},
{
"parameters": {
"method": "POST",
"url": "={{ $env.WEBHOOK_URL || 'https://webhook.site/unique-id' }}",
"sendBody": true,
"bodyParameters": {
"parameters": [
{
"name": "timestamp",
"value": "={{ $json.data.timestamp }}"
},
{
"name": "alert_message",
"value": "={{ $json.message }}"
},
{
"name": "stock_symbol",
"value": "={{ $json.data.symbol }}"
},
{
"name": "price",
"value": "={{ $json.data.price }}"
},
{
"name": "alert_level",
"value": "={{ $json.alertLevel }}"
}
]
}
},
"id": "HTTP Request – 发送警报",
"name": "HTTP Request – 发送警报",
"type": "n8n-nodes-base.httpRequest",
"typeVersion": 4,
"position": [1050, 250]
},
{
"parameters": {
"table": "stock_alerts",
"columns": "timestamp,symbol,price,change,alert_level",
"mode": "insert"
},
"id": "SQLite – 记录日志",
"name": "SQLite – 记录日志",
"type": "n8n-nodes-base.sqlite",
"typeVersion": 1,
"position": [1050, 350]
}
],
"connections": {
"SSE Trigger": {
"main": [
[
{
"node": "Function – 解析数据",
"type": "main",
"index": 0
}
]
]
},
"Function – 解析数据": {
"main": [
[
{
"node": "If – 价格变动检测",
"type": "main",
"index": 0
}
]
]
},
"If – 价格变动检测": {
"main": [
[
{
"node": "Function – 生成警报",
"type": "main",
"index": 0
}
],
[]
]
},
"Function – 生成警报": {
"main": [
[
{
"node": "HTTP Request – 发送警报",
"type": "main",
"index": 0
},
{
"node": "SQLite – 记录日志",
"type": "main",
"index": 0
}
]
]
}
}
}
工作流导入步骤
工作流执行流程
1️⃣ SSE Trigger 连接
↓
监听 https://n8n.io 的服务器推送事件
2️⃣ 解析数据
↓
提取股票代码、价格、变动情况
3️⃣ 条件判断
↓
检查是否有价格变动
4️⃣ 生成警报
↓
计算变化幅度,确定警报等级
5️⃣ 双路输出
├─ HTTP 请求:发送警报通知
└─ 数据库:记录历史日志
🚀 第四部分:进阶技巧
1. 处理认证(如需要)
如果你的 SSE 服务器需要身份验证,可以在 URL 中包含认证信息:
https://username:password@api.example.com/stream
或使用环境变量:
https://{{ $env.SSE_API_KEY }}@api.example.com/stream
2. 错误处理
使用 Error Trigger 节点捕获连接失败:
{
"id": "Error Trigger",
"name": "Error Trigger",
"type": "n8n-nodes-base.errorTrigger",
"parameters": {}
}
3. 添加连接超时控制
在连接的 Function 节点中添加超时检查:
// 检查连接状态
if ($input.first().json.error) {
console.error('SSE 连接失败:', $input.first().json.error);
return { json: { status: 'failed' }};
}
return { json: { status: 'connected' }};
4. 数据过滤优化
使用 Filter 节点只处理符合条件的数据:
// 只处理价格变动超过 1% 的事件
const changePercent = parseFloat($json.changePercent);
return Math.abs(changePercent) > 1;
⚠️ 常见问题解答
Q1:工作流激活后没有接收到事件?
解决方案:
- ✅ 确认 SSE 服务器地址是否正确
- ✅ 检查网络连接和防火墙设置
- ✅ 验证服务器是否真的在推送数据
- ✅ 检查 n8n 日志查看具体错误信息
Q2:如何测试 SSE Trigger 是否工作?
解决方案:
- 使用免费的 SSE 测试服务:https://stocksera.pythonanywhere.com/api/stock/real-time/
- 或创建简单的本地 Node.js SSE 服务器来测试
Q3:一个工作流中可以有多个 SSE Trigger 吗?
回答: 可以的!每个 SSE Trigger 连接到不同的 URL,就能同时监听多个数据源。
Q4:SSE 连接会自动重连吗?
回答: n8n 的 SSE Trigger 会在连接断开时自动尝试重新连接,但可能需要几秒钟。
📊 性能建议
- 单个工作流:可以稳定处理每秒数百个事件
- 多个工作流:考虑服务器资源,建议监控 CPU 和内存使用
- 数据去重:如果收到重复事件,使用 Set 节点去重
- 批量处理:使用 Wait 节点累积事件后批量处理
🎓 总结
| SSE 是什么 | 服务器推送事件,实现实时数据流 |
| 何时使用 | 需要实时接收服务器数据更新 |
| 核心参数 | URL(服务器地址) |
| 最佳实践 | 结合 Function 节点进行数据处理和条件判断 |
| 监控日志 | 记录所有接收的事件便于调试 |
- 官方文档
- n8n系列教程
网硕互联帮助中心





评论前必须登录!
注册