一、前言
实盘交易中,系统监控和告警至关重要。及时发现异常、账户风险、网络问题等,可以避免重大损失。本文将介绍如何构建完整的监控告警系统。
本文将介绍:
- 系统状态监控
- 账户风险监控
- 异常告警机制
- 监控数据记录与存储
二、为什么选择天勤量化(TqSdk)
TqSdk监控支持:
| 功能 | 说明 |
| --- | --- |
| 账户信息 | 实时账户数据 |
| 持仓监控 | 持仓状态跟踪 |
| 订单状态 | 订单执行监控 |
| 连接状态 | 网络连接检测 |
安装方法:
pip install tqsdk
三、系统状态监控
3.1 基础监控框架
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
功能:系统监控框架
说明:本代码仅供学习参考
"""
import psutil
import time
import logging
from datetime import datetime
from threading import Thread
logger = logging.getLogger(__name__)
class SystemMonitor:
    """Background monitor for host CPU, memory and disk usage.

    A daemon thread calls :meth:`check_system_health` once per
    ``check_interval`` seconds and records an alert whenever a usage figure
    exceeds its threshold.

    Args:
        check_interval: Seconds to wait between two health checks.
        cpu_threshold: CPU usage in percent above which an alert is raised.
        memory_threshold: Memory usage in percent above which an alert is
            raised.
        disk_threshold: Disk usage in percent above which an alert is raised.
        disk_path: Path whose disk usage is inspected.
    """

    def __init__(self, check_interval=10, cpu_threshold=90,
                 memory_threshold=90, disk_threshold=90, disk_path='/'):
        # Imported locally so the class does not rely on a file-level import
        # of Event (only Thread is imported at the top of the script).
        from threading import Event

        self.check_interval = check_interval
        self.cpu_threshold = cpu_threshold
        self.memory_threshold = memory_threshold
        self.disk_threshold = disk_threshold
        self.disk_path = disk_path
        self.running = False
        self.monitor_thread = None
        self.alerts = []
        # Set by stop() so the loop's wait returns immediately instead of
        # sleeping out the rest of check_interval.
        self._stop_event = Event()

    def start(self):
        """Start the monitoring thread; a no-op if it is already running."""
        if self.monitor_thread is not None and self.monitor_thread.is_alive():
            return
        self._stop_event.clear()
        self.running = True
        self.monitor_thread = Thread(target=self._monitor_loop, daemon=True)
        self.monitor_thread.start()
        logger.info("系统监控已启动")

    def stop(self):
        """Ask the monitoring thread to exit and wait up to 2 seconds for it."""
        self.running = False
        self._stop_event.set()
        if self.monitor_thread:
            self.monitor_thread.join(timeout=2)
        logger.info("系统监控已停止")

    def _monitor_loop(self):
        """Run health checks until ``running`` is cleared."""
        while self.running:
            try:
                self.check_system_health()
            except Exception as e:
                # Keep the monitor alive: one failed check must not kill it.
                logger.error(f"监控异常: {e}")
            # Wait outside the try block so a check that keeps failing
            # cannot turn the loop into a busy spin.
            self._stop_event.wait(self.check_interval)

    def check_system_health(self):
        """Sample CPU, memory and disk usage and alert on any above threshold."""
        # CPU usage, sampled over a one-second window.
        cpu_percent = psutil.cpu_percent(interval=1)
        if cpu_percent > self.cpu_threshold:
            self.alert("CPU使用率过高", f"CPU: {cpu_percent:.1f}%")

        # Memory usage.
        memory = psutil.virtual_memory()
        if memory.percent > self.memory_threshold:
            self.alert("内存使用率过高", f"内存: {memory.percent:.1f}%")

        # Disk usage.
        disk = psutil.disk_usage(self.disk_path)
        if disk.percent > self.disk_threshold:
            self.alert("磁盘空间不足", f"磁盘: {disk.percent:.1f}%")

    def alert(self, title, message):
        """Record a WARNING-level alert and write it to the log.

        Args:
            title: Short alert title.
            message: Detail text for the alert.
        """
        alert = {
            'time': datetime.now(),
            'title': title,
            'message': message,
            'level': 'WARNING',
        }
        self.alerts.append(alert)
        logger.warning(f"[告警] {title}: {message}")

    def get_status(self):
        """Return a snapshot of current usage figures.

        Returns:
            dict with ``cpu_percent``, ``memory_percent``, ``memory_used_gb``,
            ``disk_percent`` and ``alerts_count``.
        """
        cpu = psutil.cpu_percent(interval=0.1)
        memory = psutil.virtual_memory()
        disk = psutil.disk_usage(self.disk_path)
        return {
            'cpu_percent': cpu,
            'memory_percent': memory.percent,
            'memory_used_gb': memory.used / (1024**3),
            'disk_percent': disk.percent,
            'alerts_count': len(self.alerts),
        }
# Usage example: run the monitor for 15 seconds, then print a status snapshot.
monitor = SystemMonitor(check_interval=5)
monitor.start()
try:
    print("=" * 50)
    print("系统监控运行中…")
    print("=" * 50)
    time.sleep(15)
    status = monitor.get_status()
    # A real newline escape; the doubled backslash printed a literal "\n".
    print("\n系统状态:")
    print(f"CPU: {status['cpu_percent']:.1f}%")
    print(f"内存: {status['memory_percent']:.1f}%")
    print(f"告警数量: {status['alerts_count']}")
finally:
    # Stop the background thread even if the example above fails.
    monitor.stop()
3.2 网络连接监控
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
功能:网络连接监控
说明:本代码仅供学习参考
"""
import socket
import time
import logging
from datetime import datetime
logger = logging.getLogger(__name__)
class NetworkMonitor:
    """TCP reachability and latency monitor for a list of hosts.

    Args:
        hosts: List of ``(host, port)`` pairs to probe. ``None`` (the
            default) probes ``("www.shinnytech.com", 80)``.
        timeout: Connection timeout in seconds for each probe.
        max_history: Maximum number of latency samples kept for averaging.

    Attributes:
        latency_history: Latencies in milliseconds of the most recent
            successful probes, oldest first.
        failure_count: Number of failed probes since the last successful one.
    """

    DEFAULT_HOSTS = (("www.shinnytech.com", 80),)

    def __init__(self, hosts=None, timeout=3, max_history=100):
        # Build a fresh list per instance; a list literal as the default
        # argument would be shared (and mutable) across all instances.
        self.hosts = list(self.DEFAULT_HOSTS) if hosts is None else hosts
        self.timeout = timeout
        self.max_history = max_history
        self.latency_history = []
        self.failure_count = 0

    def check_connectivity(self):
        """Probe every configured host once.

        Returns:
            One dict per host: ``{'host', 'status': 'OK', 'latency_ms'}`` on
            success or ``{'host', 'status': 'FAIL', 'error'}`` on failure.
        """
        results = []
        for host, port in self.hosts:
            # perf_counter is monotonic, so a wall-clock adjustment during
            # the probe cannot distort the measured latency.
            start = time.perf_counter()
            try:
                # The context manager closes the socket on every path.
                with socket.create_connection((host, port), timeout=self.timeout):
                    pass
            except Exception as e:
                # Broad on purpose: a bad host entry must not crash the monitor.
                self.failure_count += 1
                results.append({
                    'host': host,
                    'status': 'FAIL',
                    'error': str(e),
                })
                logger.error(f"网络连接失败 {host}:{port} – {e}")
                continue

            latency = (time.perf_counter() - start) * 1000
            results.append({
                'host': host,
                'status': 'OK',
                'latency_ms': latency,
            })
            self.latency_history.append(latency)
            # Drop the oldest samples once the history exceeds its cap.
            overflow = len(self.latency_history) - self.max_history
            if overflow > 0:
                del self.latency_history[:overflow]
            self.failure_count = 0
        return results

    def get_avg_latency(self):
        """Return the mean recorded latency in ms, or ``None`` without samples."""
        if self.latency_history:
            return sum(self.latency_history) / len(self.latency_history)
        return None

    def is_healthy(self):
        """Return ``True`` while fewer than 3 failures have accumulated."""
        return self.failure_count < 3
# Usage example: probe the default host five times, two seconds apart.
network_monitor = NetworkMonitor()
print("=" * 50)
print("网络连接监控")
print("=" * 50)
for _ in range(5):
    results = network_monitor.check_connectivity()
    for result in results:
        if result['status'] == 'OK':
            print(f"{result['host']}: 延迟 {result['latency_ms']:.1f}ms")
        else:
            print(f"{result['host']}: 连接失败")
    avg_latency = network_monitor.get_avg_latency()
    # Compare against None so an average of 0.0 ms is still printed.
    if avg_latency is not None:
        print(f"平均延迟: {avg_latency:.1f}ms")
    # A real newline escape; the doubled backslash printed a literal "\n".
    print(f"网络健康: {network_monitor.is_healthy()}\n")
    time.sleep(2)
四、账户风险监控
4.1 账户实时监控
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
功能:账户风险监控
说明:本代码仅供学习参考
"""
from tqsdk import TqApi, TqAuth, TqSim
import logging
from datetime import datetime
logger = logging.getLogger(__name__)
class AccountMonitor:
    """Risk monitor for a trading account.

    Tracks the peak balance seen so far and raises CRITICAL alerts on
    excessive drawdown, a high risk ratio, or low available funds.

    Args:
        api: API object providing ``get_account()`` and ``wait_update()``.
        init_balance: Starting balance; seeds the peak balance and scales
            the available-funds check.
        max_drawdown: Maximum tolerated drawdown from the peak, as a fraction.
        max_loss_per_trade: Stored for callers; not used by the checks here.
        max_risk_ratio: Risk ratio above which an alert is raised.
        min_available_ratio: Alert when available funds fall below this
            fraction of ``init_balance``.
    """

    def __init__(self, api, init_balance, max_drawdown=0.15,
                 max_loss_per_trade=0.02, max_risk_ratio=0.8,
                 min_available_ratio=0.1):
        self.api = api
        self.account = api.get_account()
        self.init_balance = init_balance
        self.max_drawdown = max_drawdown
        self.max_loss_per_trade = max_loss_per_trade
        self.max_risk_ratio = max_risk_ratio
        self.min_available_ratio = min_available_ratio
        self.highest_balance = init_balance
        self.alerts = []

    def check_account_risk(self):
        """Wait for one data update, then evaluate all account risk checks.

        Returns:
            dict with ``balance``, ``available``, ``drawdown``,
            ``risk_ratio`` and ``float_profit``.
        """
        self.api.wait_update()
        current_balance = self.account.balance

        # Track the peak balance.
        if current_balance > self.highest_balance:
            self.highest_balance = current_balance

        # Drawdown from the peak; a non-positive peak would otherwise
        # divide by zero, so report no drawdown in that case.
        if self.highest_balance > 0:
            drawdown = (self.highest_balance - current_balance) / self.highest_balance
        else:
            drawdown = 0.0

        # Maximum drawdown check.
        if drawdown > self.max_drawdown:
            self.alert("最大回撤超限",
                       f"当前回撤: {drawdown:.2%}, 限制: {self.max_drawdown:.2%}")

        # Risk ratio check (skipped while the ratio is empty or zero).
        if self.account.risk_ratio and self.account.risk_ratio > self.max_risk_ratio:
            self.alert("账户风险度过高",
                       f"风险度: {self.account.risk_ratio:.2%}")

        # Available funds check.
        if self.account.available < self.init_balance * self.min_available_ratio:
            self.alert("可用资金不足",
                       f"可用资金: {self.account.available:.2f}")

        return {
            'balance': current_balance,
            'available': self.account.available,
            'drawdown': drawdown,
            'risk_ratio': self.account.risk_ratio or 0,
            'float_profit': self.account.float_profit,
        }

    def alert(self, title, message):
        """Record a CRITICAL-level alert and write it to the log.

        Args:
            title: Short alert title.
            message: Detail text for the alert.
        """
        alert = {
            'time': datetime.now(),
            'title': title,
            'message': message,
            'level': 'CRITICAL',
        }
        self.alerts.append(alert)
        logger.critical(f"[账户告警] {title}: {message}")
# Usage example: poll the simulated account ten times, one second apart.
import time  # the loop below sleeps, but this script never imported time

api = TqApi(TqSim(init_balance=200000), auth=TqAuth("快期账户", "快期密码"))
try:
    monitor = AccountMonitor(api, 200000, max_drawdown=0.15)
    print("=" * 50)
    print("账户风险监控")
    print("=" * 50)
    for _ in range(10):
        status = monitor.check_account_risk()
        # Real newline escapes; the doubled backslash printed a literal "\n".
        print("\n账户状态:")
        print(f" 权益: {status['balance']:.2f}")
        print(f" 可用: {status['available']:.2f}")
        print(f" 回撤: {status['drawdown']:.2%}")
        print(f" 风险度: {status['risk_ratio']:.2%}")
        if monitor.alerts:
            print(f"\n告警数量: {len(monitor.alerts)}")
            for alert in monitor.alerts[-3:]:  # show the three most recent
                print(f" [{alert['time'].strftime('%H:%M:%S')}] {alert['title']}: {alert['message']}")
        time.sleep(1)
finally:
    # Release the API connection even if a check fails.
    api.close()
4.2 持仓风险监控
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
功能:持仓风险监控
说明:本代码仅供学习参考
"""
from tqsdk import TqApi, TqAuth, TqSim
import logging
logger = logging.getLogger(__name__)
class PositionMonitor:
    """Risk monitor for open positions.

    Args:
        api: API object providing ``get_account()``, ``get_position()`` and
            ``wait_update()``.
        max_position_ratio: Maximum total margin as a fraction of the
            account balance.
        max_loss_per_position: Maximum floating loss of one position as a
            fraction of that position's margin.
    """

    def __init__(self, api, max_position_ratio=0.3, max_loss_per_position=0.05):
        self.api = api
        self.account = api.get_account()
        self.max_position_ratio = max_position_ratio
        self.max_loss_per_position = max_loss_per_position
        self.alerts = []

    def check_positions(self):
        """Wait for one data update, then evaluate all position risk checks.

        Returns:
            dict with ``total_margin``, ``position_ratio``,
            ``total_float_loss`` (sum of negative floating profits, so <= 0)
            and ``positions_count`` (number of non-flat positions).
        """
        self.api.wait_update()
        positions = self.api.get_position()
        account_balance = self.account.balance

        total_margin = 0
        total_float_loss = 0
        open_count = 0
        for symbol, position in positions.items():
            # Skip flat entries: nothing is held in either direction.
            if position.pos_long == 0 and position.pos_short == 0:
                continue
            open_count += 1

            margin = position.margin
            total_margin += margin

            # Only losses count; profitable positions contribute 0.
            loss = min(0, position.float_profit)
            total_float_loss += loss

            # Per-position loss check relative to its own margin.
            if margin > 0:
                loss_ratio = abs(loss) / margin
                if loss_ratio > self.max_loss_per_position:
                    self.alert("单持仓亏损超限",
                               f"{symbol}: 亏损比例 {loss_ratio:.2%}")

        # Total margin ratio check; stays 0 when the balance is not positive.
        position_ratio = 0
        if account_balance > 0:
            position_ratio = total_margin / account_balance
            if position_ratio > self.max_position_ratio:
                self.alert("持仓比例超限",
                           f"持仓比例: {position_ratio:.2%}, 限制: {self.max_position_ratio:.2%}")

        return {
            'total_margin': total_margin,
            'position_ratio': position_ratio,
            'total_float_loss': total_float_loss,
            'positions_count': open_count,
        }

    def alert(self, title, message):
        """Record a WARNING-level alert and write it to the log.

        Args:
            title: Short alert title.
            message: Detail text for the alert.
        """
        # Imported locally: this script's import list has no datetime.
        from datetime import datetime

        logger.warning(f"[持仓告警] {title}: {message}")
        # 'time' and 'level' match the record shape of the other monitors.
        self.alerts.append({
            'time': datetime.now(),
            'title': title,
            'message': message,
            'level': 'WARNING',
        })
# Usage example: run one position check against a simulated account.
api = TqApi(TqSim(init_balance=200000), auth=TqAuth("快期账户", "快期密码"))
try:
    monitor = PositionMonitor(api, max_position_ratio=0.3)
    print("=" * 50)
    print("持仓风险监控")
    print("=" * 50)
    status = monitor.check_positions()
    print(f"总保证金: {status['total_margin']:.2f}")
    print(f"持仓比例: {status['position_ratio']:.2%}")
    print(f"持仓数量: {status['positions_count']}")
finally:
    # Release the API connection even if the check fails.
    api.close()
五、告警机制
5.1 多级告警系统
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
功能:多级告警系统
说明:本代码仅供学习参考
"""
import logging
from datetime import datetime
from enum import Enum
class AlertLevel(Enum):
    """Alert severity levels, in ascending order of severity."""

    INFO = 1
    WARNING = 2
    ERROR = 3
    CRITICAL = 4
class AlertManager:
    """Collects alerts and dispatches each one to a per-level handler.

    Attributes:
        alerts: Every alert raised so far, oldest first.
        alert_handlers: Mapping from alert level to the callable that
            handles alerts of that level.
    """

    def __init__(self):
        self.alerts = []
        self.alert_handlers = {
            AlertLevel.INFO: self._handle_info,
            AlertLevel.WARNING: self._handle_warning,
            AlertLevel.ERROR: self._handle_error,
            AlertLevel.CRITICAL: self._handle_critical,
        }

    def alert(self, level, title, message, data=None):
        """Record an alert and run the handler registered for its level.

        Args:
            level: Alert level; used as the key into ``alert_handlers``.
            title: Short alert title.
            message: Detail text for the alert.
            data: Optional extra payload; stored as ``{}`` when empty.

        Returns:
            The alert record (dict) that was stored.
        """
        alert = {
            'time': datetime.now(),
            'level': level,
            'title': title,
            'message': message,
            'data': data or {},
        }
        self.alerts.append(alert)

        # Dispatch to the handler for this level, if one is registered.
        handler = self.alert_handlers.get(level)
        if handler:
            handler(alert)
        return alert

    def _handle_info(self, alert):
        """Log an INFO-level alert."""
        logging.info(f"[告警] {alert['title']}: {alert['message']}")

    def _handle_warning(self, alert):
        """Log a WARNING-level alert."""
        logging.warning(f"[告警] {alert['title']}: {alert['message']}")
        # Extension point: send an email, SMS, etc.

    def _handle_error(self, alert):
        """Log an ERROR-level alert."""
        logging.error(f"[告警] {alert['title']}: {alert['message']}")
        # Extension point: send an urgent notification.

    def _handle_critical(self, alert):
        """Log a CRITICAL-level alert."""
        logging.critical(f"[告警] {alert['title']}: {alert['message']}")
        # Extension point: notify immediately; trading may need to stop.

    def get_recent_alerts(self, level=None, count=10):
        """Return the most recent alerts, oldest first.

        Args:
            level: When not ``None``, only alerts of this level are returned.
            count: Maximum number of alerts to return.

        Returns:
            A list of at most ``count`` alert records; empty when
            ``count`` is zero or negative.
        """
        alerts = self.alerts
        if level is not None:
            alerts = [a for a in alerts if a['level'] == level]
        # alerts[-0:] is the whole list, so handle non-positive counts here.
        if count <= 0:
            return []
        return alerts[-count:]
# Usage example: raise one alert per level, then list the most recent ones.
alert_manager = AlertManager()
print("=" * 50)
print("多级告警系统")
print("=" * 50)
alert_manager.alert(AlertLevel.INFO, "系统启动", "交易系统已启动")
alert_manager.alert(AlertLevel.WARNING, "CPU使用率高", "CPU使用率达到85%")
alert_manager.alert(AlertLevel.ERROR, "网络连接失败", "无法连接到交易服务器")
alert_manager.alert(AlertLevel.CRITICAL, "账户回撤超限", "账户回撤达到20%",
                    {'drawdown': 0.20, 'balance': 160000})
recent = alert_manager.get_recent_alerts(count=5)
# A real newline escape; the doubled backslash printed a literal "\n".
print(f"\n最近告警 ({len(recent)}条):")
for alert in recent:
    print(f" [{alert['level'].name}] {alert['title']}: {alert['message']}")
5.2 告警通知(邮件/微信)
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
功能:告警通知
说明:本代码仅供学习参考
"""
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from datetime import datetime
class EmailNotifier:
    """Sends alert emails through an SMTP server using STARTTLS.

    Args:
        smtp_server: SMTP server host name.
        smtp_port: SMTP server port.
        username: Login name; also used as the From address.
        password: Login password.
        to_emails: One recipient address, or a list/tuple/set of addresses.
        timeout: Socket timeout in seconds for the SMTP connection.
    """

    def __init__(self, smtp_server, smtp_port, username, password, to_emails,
                 timeout=10):
        self.smtp_server = smtp_server
        self.smtp_port = smtp_port
        self.username = username
        self.password = password
        self.timeout = timeout
        if isinstance(to_emails, list):
            self.to_emails = to_emails
        elif isinstance(to_emails, (tuple, set, frozenset)):
            # Expand other collections instead of wrapping them as a single
            # element, which would break the ', '.join() in send_alert.
            self.to_emails = list(to_emails)
        else:
            self.to_emails = [to_emails]

    def send_alert(self, title, message, level='WARNING'):
        """Send one alert email to all recipients.

        Args:
            title: Alert title; also used in the subject line.
            message: Alert body text.
            level: Level label shown in the subject and body.

        Returns:
            ``True`` if the message was handed to the server, ``False`` if
            any step failed (the error is printed, not raised).
        """
        try:
            msg = MIMEMultipart()
            msg['From'] = self.username
            msg['To'] = ', '.join(self.to_emails)
            msg['Subject'] = f"[{level}] {title}"
            body = (
                "\n"
                f"告警时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n"
                f"告警级别: {level}\n"
                f"告警标题: {title}\n"
                f"告警内容: {message}\n"
            )
            msg.attach(MIMEText(body, 'plain', 'utf-8'))
            # The context manager closes the connection on every path, and
            # the timeout keeps an unreachable server from blocking forever.
            with smtplib.SMTP(self.smtp_server, self.smtp_port,
                              timeout=self.timeout) as server:
                server.starttls()
                server.login(self.username, self.password)
                server.send_message(msg)
            return True
        except Exception as e:
            # Best effort: a failed notification must not crash the caller.
            print(f"发送邮件失败: {e}")
            return False
# 使用示例(需要配置SMTP信息)
# notifier = EmailNotifier(
# smtp_server="smtp.qq.com",
# smtp_port=587,
# username="your_email@qq.com",
# password="your_password",
# to_emails=["recipient@example.com"]
# )
#
# notifier.send_alert("账户回撤超限", "账户回撤达到15%", "CRITICAL")
六、监控数据记录
6.1 监控数据存储
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
功能:监控数据存储
说明:本代码仅供学习参考
"""
import sqlite3
import json
from datetime import datetime
class MonitorDatabase:
    """SQLite store for system samples, account samples and alerts.

    Every operation opens its own connection to ``db_file`` and closes it
    before returning.

    Args:
        db_file: Path of the SQLite database file.
    """

    def __init__(self, db_file="monitor.db"):
        self.db_file = db_file
        self.init_database()

    def _execute(self, statements):
        """Run ``(sql, params)`` pairs in one transaction, then close."""
        conn = sqlite3.connect(self.db_file)
        try:
            # Commits on success and rolls back on error; it does not close
            # the connection, hence the surrounding try/finally.
            with conn:
                for sql, params in statements:
                    conn.execute(sql, params)
        finally:
            conn.close()

    def init_database(self):
        """Create the three monitoring tables if they do not exist yet."""
        self._execute([
            # System monitoring samples.
            ('''
                CREATE TABLE IF NOT EXISTS system_monitor (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    timestamp TEXT,
                    cpu_percent REAL,
                    memory_percent REAL,
                    disk_percent REAL
                )
            ''', ()),
            # Account monitoring samples.
            ('''
                CREATE TABLE IF NOT EXISTS account_monitor (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    timestamp TEXT,
                    balance REAL,
                    available REAL,
                    drawdown REAL,
                    risk_ratio REAL
                )
            ''', ()),
            # Alerts.
            ('''
                CREATE TABLE IF NOT EXISTS alerts (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    timestamp TEXT,
                    level TEXT,
                    title TEXT,
                    message TEXT,
                    data TEXT
                )
            ''', ()),
        ])

    def save_system_status(self, status):
        """Insert one system sample stamped with the current time.

        Args:
            status: dict with ``cpu_percent``, ``memory_percent`` and
                ``disk_percent``; missing keys are stored as 0.
        """
        self._execute([(
            '''
                INSERT INTO system_monitor (timestamp, cpu_percent, memory_percent, disk_percent)
                VALUES (?, ?, ?, ?)
            ''',
            (
                datetime.now().isoformat(),
                status.get('cpu_percent', 0),
                status.get('memory_percent', 0),
                status.get('disk_percent', 0),
            ),
        )])

    def save_account_status(self, status):
        """Insert one account sample stamped with the current time.

        Args:
            status: dict with ``balance``, ``available``, ``drawdown`` and
                ``risk_ratio``; missing keys are stored as 0.
        """
        self._execute([(
            '''
                INSERT INTO account_monitor (timestamp, balance, available, drawdown, risk_ratio)
                VALUES (?, ?, ?, ?, ?)
            ''',
            (
                datetime.now().isoformat(),
                status.get('balance', 0),
                status.get('available', 0),
                status.get('drawdown', 0),
                status.get('risk_ratio', 0),
            ),
        )])

    def save_alert(self, alert):
        """Insert one alert record.

        Args:
            alert: dict with ``title`` and ``message``, plus optional
                ``time`` (defaults to now), ``level`` (an object with a
                ``name`` attribute or a plain value; defaults to
                ``'WARNING'``) and ``data`` (stored as JSON, default ``{}``).
        """
        # Alert records without 'time' or 'level' are accepted instead of
        # raising KeyError.
        when = alert.get('time') or datetime.now()
        level = alert.get('level', 'WARNING')
        self._execute([(
            '''
                INSERT INTO alerts (timestamp, level, title, message, data)
                VALUES (?, ?, ?, ?, ?)
            ''',
            (
                when.isoformat() if hasattr(when, 'isoformat') else str(when),
                level.name if hasattr(level, 'name') else str(level),
                alert['title'],
                alert['message'],
                json.dumps(alert.get('data', {})),
            ),
        )])
# Usage example: open (or create) the default database and store two samples.
db = MonitorDatabase()

# Store one system-status sample.
db.save_system_status(dict(
    cpu_percent=45.2,
    memory_percent=60.5,
    disk_percent=30.0,
))

# Store one account-status sample.
db.save_account_status(dict(
    balance=200000,
    available=150000,
    drawdown=0.05,
    risk_ratio=0.25,
))

print("监控数据已保存")
七、总结
| 监控类型 | 监控内容 |
| --- | --- |
| 系统监控 | CPU、内存、磁盘 |
| 网络监控 | 连接状态、延迟 |
| 账户监控 | 回撤、风险度、可用资金 |
| 持仓监控 | 持仓比例、单持仓亏损 |
监控告警速查
# System monitoring: start the background health-check thread.
monitor = SystemMonitor()
monitor.start()
# Account monitoring: assumes `api` and `init_balance` are already defined.
account_monitor = AccountMonitor(api, init_balance)
status = account_monitor.check_account_risk()
# Alerting: assumes `alert_manager` is an existing AlertManager instance.
alert_manager.alert(AlertLevel.CRITICAL, "标题", "内容")
免责声明:本文仅供学习交流使用,不构成任何投资建议。期货交易有风险,入市需谨慎。
更多资源:
- 天勤量化官网:https://www.shinnytech.com
- GitHub开源地址:https://github.com/shinnytech/tqsdk-python
- 官方文档:https://doc.shinnytech.com/tqsdk/latest
网硕互联帮助中心






评论前必须登录!
注册