云计算百科
云计算领域专业知识百科平台

【期货量化实战】量化交易系统监控与告警(Python量化)

一、前言

实盘交易中,系统监控和告警至关重要。及时发现异常、账户风险、网络问题等,可以避免重大损失。本文将介绍如何构建完整的监控告警系统。

本文将介绍:

  • 系统状态监控
  • 账户风险监控
  • 异常告警机制
  • 监控数据可视化

二、为什么选择天勤量化(TqSdk)

TqSdk监控支持:

功能 说明
账户信息 实时账户数据
持仓监控 持仓状态跟踪
订单状态 订单执行监控
连接状态 网络连接检测

安装方法:

pip install tqsdk

三、系统状态监控

3.1 基础监控框架

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
功能:系统监控框架
说明:本代码仅供学习参考
"""

import psutil
import time
import logging
from datetime import datetime
from threading import Thread

logger = logging.getLogger(__name__)

class SystemMonitor:
    """Background monitor for CPU, memory and disk usage of the local host.

    After :meth:`start`, a daemon thread calls :meth:`check_system_health`
    repeatedly, pausing ``check_interval`` seconds between checks. Every
    threshold violation is appended to ``self.alerts`` and logged.
    """

    def __init__(self, check_interval=10):
        """Create a stopped monitor.

        Args:
            check_interval: Seconds to wait between two health checks.
        """
        # Local import: the surrounding script only imports ``Thread``.
        from threading import Event

        self.check_interval = check_interval
        self.running = False
        self.monitor_thread = None
        self.alerts = []
        # Set by stop() so the pause between checks ends immediately.
        self._stop_event = Event()

    def start(self):
        """Start the monitoring thread; does nothing if it is already running."""
        thread = self.monitor_thread
        if thread is not None and thread.is_alive():
            if self.running:
                return
            # A previous loop is still winding down; let it finish first so
            # two loops never run at the same time.
            thread.join()

        self._stop_event.clear()
        self.running = True
        self.monitor_thread = Thread(target=self._monitor_loop, daemon=True)
        self.monitor_thread.start()
        logger.info("系统监控已启动")

    def stop(self):
        """Ask the monitoring thread to stop and wait up to 2 seconds for it."""
        self.running = False
        self._stop_event.set()
        if self.monitor_thread:
            self.monitor_thread.join(timeout=2)
        logger.info("系统监控已停止")

    def _monitor_loop(self):
        """Run health checks until :meth:`stop` is called."""
        while self.running:
            try:
                self.check_system_health()
            except Exception as e:
                logger.error(f"监控异常: {e}")
            # The pause is outside the try block so that a failing check is
            # still rate limited; wait() returns True as soon as stop() fires.
            if self._stop_event.wait(self.check_interval):
                break

    def check_system_health(self):
        """Sample CPU, memory and disk usage and alert on any value above 90%."""
        # CPU usage, measured over a blocking 1-second window.
        cpu_percent = psutil.cpu_percent(interval=1)
        if cpu_percent > 90:
            self.alert("CPU使用率过高", f"CPU: {cpu_percent:.1f}%")

        # Memory usage.
        memory = psutil.virtual_memory()
        if memory.percent > 90:
            self.alert("内存使用率过高", f"内存: {memory.percent:.1f}%")

        # Disk usage of the filesystem containing '/'.
        disk = psutil.disk_usage('/')
        if disk.percent > 90:
            self.alert("磁盘空间不足", f"磁盘: {disk.percent:.1f}%")

    def alert(self, title, message):
        """Record a WARNING-level alert in ``self.alerts`` and log it.

        Args:
            title: Short alert title.
            message: Alert detail text.
        """
        alert = {
            'time': datetime.now(),
            'title': title,
            'message': message,
            'level': 'WARNING',
        }
        self.alerts.append(alert)
        logger.warning(f"[告警] {title}: {message}")

    def get_status(self):
        """Return a snapshot of current resource usage.

        Returns:
            dict with ``cpu_percent``, ``memory_percent``, ``memory_used_gb``,
            ``disk_percent`` and ``alerts_count`` (number of recorded alerts).
        """
        cpu = psutil.cpu_percent(interval=0.1)
        memory = psutil.virtual_memory()
        disk = psutil.disk_usage('/')

        return {
            'cpu_percent': cpu,
            'memory_percent': memory.percent,
            'memory_used_gb': memory.used / (1024**3),
            'disk_percent': disk.percent,
            'alerts_count': len(self.alerts),
        }

# Usage example: run the system monitor for 15 seconds and print a snapshot.
monitor = SystemMonitor(check_interval=5)
monitor.start()

try:
    print("=" * 50)
    print("系统监控运行中…")
    print("=" * 50)

    time.sleep(15)
    status = monitor.get_status()
    print("\n系统状态:")
    print(f"CPU: {status['cpu_percent']:.1f}%")
    print(f"内存: {status['memory_percent']:.1f}%")
    print(f"告警数量: {status['alerts_count']}")
finally:
    # Always stop the background thread, even if reporting fails.
    monitor.stop()

3.2 网络连接监控

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
功能:网络连接监控
说明:本代码仅供学习参考
"""

import socket
import time
import logging
from datetime import datetime

logger = logging.getLogger(__name__)

class NetworkMonitor:
    """Checks TCP reachability of a list of hosts and tracks connect latency."""

    def __init__(self, hosts=None, timeout=3):
        """Create a network monitor.

        Args:
            hosts: Iterable of ``(host, port)`` pairs to probe. Defaults to
                ``[("www.shinnytech.com", 80)]``.
            timeout: Connect timeout in seconds for each probe.
        """
        # A fresh list per instance; a list literal as the default value
        # would be shared (and mutable) across all instances.
        if hosts is None:
            hosts = [("www.shinnytech.com", 80)]
        self.hosts = hosts
        self.timeout = timeout
        self.latency_history = []
        self.failure_count = 0

    def check_connectivity(self):
        """Try to open a TCP connection to every configured host.

        A success appends the latency to ``latency_history`` (capped at the
        100 most recent samples) and resets ``failure_count``; a failure
        increments ``failure_count`` and is logged.

        Returns:
            list of dicts, one per host: ``{'host', 'status': 'OK',
            'latency_ms'}`` on success or ``{'host', 'status': 'FAIL',
            'error'}`` on failure.
        """
        results = []

        for host, port in self.hosts:
            start = time.perf_counter()
            try:
                # The context manager closes the socket even if an error
                # occurs after the connection is established.
                with socket.create_connection((host, port), timeout=self.timeout):
                    latency = (time.perf_counter() - start) * 1000
            except Exception as e:
                self.failure_count += 1
                results.append({
                    'host': host,
                    'status': 'FAIL',
                    'error': str(e),
                })
                logger.error(f"网络连接失败 {host}:{port} - {e}")
                continue

            results.append({
                'host': host,
                'status': 'OK',
                'latency_ms': latency,
            })
            self.latency_history.append(latency)
            if len(self.latency_history) > 100:
                self.latency_history.pop(0)
            self.failure_count = 0

        return results

    def get_avg_latency(self):
        """Return the mean of the recorded latencies in ms, or None if empty."""
        if self.latency_history:
            return sum(self.latency_history) / len(self.latency_history)
        return None

    def is_healthy(self):
        """Return True while fewer than 3 failures have accumulated."""
        return self.failure_count < 3

# Usage example: probe the configured hosts five times, two seconds apart.
network_monitor = NetworkMonitor()

print("=" * 50)
print("网络连接监控")
print("=" * 50)

for i in range(5):
    results = network_monitor.check_connectivity()
    for result in results:
        if result['status'] == 'OK':
            print(f"{result['host']}: 延迟 {result['latency_ms']:.1f}ms")
        else:
            print(f"{result['host']}: 连接失败")

    avg_latency = network_monitor.get_avg_latency()
    # Compare against None so that a measured average of 0.0 is still shown.
    if avg_latency is not None:
        print(f"平均延迟: {avg_latency:.1f}ms")

    print(f"网络健康: {network_monitor.is_healthy()}\n")
    time.sleep(2)

四、账户风险监控

4.1 账户实时监控

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
功能:账户风险监控
说明:本代码仅供学习参考
"""

from tqsdk import TqApi, TqAuth, TqSim
import logging
from datetime import datetime

logger = logging.getLogger(__name__)

class AccountMonitor:
    """Watches account equity, drawdown, risk ratio and available funds."""

    def __init__(self, api, init_balance, max_drawdown=0.15, max_loss_per_trade=0.02):
        """Create an account monitor.

        Args:
            api: Object providing ``get_account()`` and ``wait_update()``
                (a ``TqApi`` instance in the usage example).
            init_balance: Starting equity; also the initial equity peak and
                the base for the low-available-funds check.
            max_drawdown: Drawdown fraction above which an alert is raised.
            max_loss_per_trade: Stored on the instance; not used by the
                checks in this class.
        """
        self.api = api
        self.account = api.get_account()
        self.init_balance = init_balance
        self.max_drawdown = max_drawdown
        self.max_loss_per_trade = max_loss_per_trade
        self.highest_balance = init_balance
        self.alerts = []

    def check_account_risk(self):
        """Wait for one API update, then evaluate the account risk rules.

        Alerts are raised when drawdown exceeds ``max_drawdown``, when the
        risk ratio exceeds 0.8, or when available funds fall below 10% of
        ``init_balance``.

        Returns:
            dict with ``balance``, ``available``, ``drawdown``,
            ``risk_ratio`` (0 when falsy) and ``float_profit``.
        """
        self.api.wait_update()

        current_balance = self.account.balance

        # Track the equity peak.
        if current_balance > self.highest_balance:
            self.highest_balance = current_balance

        # Drawdown relative to the peak; guarded so that a zero or negative
        # peak cannot cause a ZeroDivisionError.
        if self.highest_balance > 0:
            drawdown = (self.highest_balance - current_balance) / self.highest_balance
        else:
            drawdown = 0.0

        # Maximum drawdown check.
        if drawdown > self.max_drawdown:
            self.alert("最大回撤超限",
                       f"当前回撤: {drawdown:.2%}, 限制: {self.max_drawdown:.2%}")

        # Risk ratio check.
        if self.account.risk_ratio and self.account.risk_ratio > 0.8:
            self.alert("账户风险度过高",
                       f"风险度: {self.account.risk_ratio:.2%}")

        # Available funds check.
        if self.account.available < self.init_balance * 0.1:
            self.alert("可用资金不足",
                       f"可用资金: {self.account.available:.2f}")

        return {
            'balance': current_balance,
            'available': self.account.available,
            'drawdown': drawdown,
            'risk_ratio': self.account.risk_ratio or 0,
            'float_profit': self.account.float_profit,
        }

    def alert(self, title, message):
        """Record a CRITICAL-level alert in ``self.alerts`` and log it.

        Args:
            title: Short alert title.
            message: Alert detail text.
        """
        alert = {
            'time': datetime.now(),
            'title': title,
            'message': message,
            'level': 'CRITICAL',
        }
        self.alerts.append(alert)
        logger.critical(f"[账户告警] {title}: {message}")

# Usage example: poll the account ten times and print its status.
import time  # this snippet sleeps between polls but does not import time itself

api = TqApi(TqSim(init_balance=200000), auth=TqAuth("快期账户", "快期密码"))

try:
    monitor = AccountMonitor(api, 200000, max_drawdown=0.15)

    print("=" * 50)
    print("账户风险监控")
    print("=" * 50)

    for i in range(10):
        status = monitor.check_account_risk()
        print("\n账户状态:")
        print(f" 权益: {status['balance']:.2f}")
        print(f" 可用: {status['available']:.2f}")
        print(f" 回撤: {status['drawdown']:.2%}")
        print(f" 风险度: {status['risk_ratio']:.2%}")

        if monitor.alerts:
            print(f"\n告警数量: {len(monitor.alerts)}")
            # Show the three most recent alerts (negative slice start).
            for alert in monitor.alerts[-3:]:
                print(f" [{alert['time'].strftime('%H:%M:%S')}] {alert['title']}: {alert['message']}")

        time.sleep(1)
finally:
    # Release the API connection even if a check raises.
    api.close()

4.2 持仓风险监控

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
功能:持仓风险监控
说明:本代码仅供学习参考
"""

from tqsdk import TqApi, TqAuth, TqSim
import logging

logger = logging.getLogger(__name__)

class PositionMonitor:
    """Watches margin usage and floating losses of open positions."""

    def __init__(self, api, max_position_ratio=0.3, max_loss_per_position=0.05):
        """Create a position monitor.

        Args:
            api: Object providing ``get_account()``, ``get_position()`` and
                ``wait_update()`` (a ``TqApi`` instance in the usage example).
            max_position_ratio: Limit for total margin / account balance.
            max_loss_per_position: Limit for a position's floating loss
                divided by its margin.
        """
        self.api = api
        self.account = api.get_account()
        self.max_position_ratio = max_position_ratio
        self.max_loss_per_position = max_loss_per_position
        self.alerts = []

    def check_positions(self):
        """Wait for one API update, then evaluate the position risk rules.

        Returns:
            dict with ``total_margin``, ``position_ratio`` (0 when the
            account balance is not positive), ``total_float_loss`` (sum of
            negative floating profits, so it is <= 0) and
            ``positions_count`` (positions with a non-zero long or short
            volume).
        """
        self.api.wait_update()

        positions = self.api.get_position()
        account_balance = self.account.balance

        total_margin = 0
        total_float_loss = 0
        positions_count = 0

        for symbol, position in positions.items():
            # Skip entries with no open volume on either side.
            if position.pos_long == 0 and position.pos_short == 0:
                continue
            positions_count += 1

            margin = position.margin
            total_margin += margin

            # Only losses contribute; profits are clamped to zero.
            float_loss = min(0, position.float_profit)
            total_float_loss += float_loss

            # Per-position loss check, relative to the margin it ties up.
            if margin > 0:
                loss_ratio = abs(float_loss) / margin
                if loss_ratio > self.max_loss_per_position:
                    self.alert("单持仓亏损超限",
                               f"{symbol}: 亏损比例 {loss_ratio:.2%}")

        # Overall margin usage check.
        position_ratio = 0
        if account_balance > 0:
            position_ratio = total_margin / account_balance
            if position_ratio > self.max_position_ratio:
                self.alert("持仓比例超限",
                           f"持仓比例: {position_ratio:.2%}, 限制: {self.max_position_ratio:.2%}")

        return {
            'total_margin': total_margin,
            'position_ratio': position_ratio,
            'total_float_loss': total_float_loss,
            'positions_count': positions_count,
        }

    def alert(self, title, message):
        """Log a position alert and record it in ``self.alerts``.

        The record carries ``time`` and ``level`` in addition to ``title``
        and ``message`` so that it has the same shape as the alerts produced
        by the other monitors in this file.

        Args:
            title: Short alert title.
            message: Alert detail text.
        """
        # Local import: this snippet does not import datetime at module level.
        from datetime import datetime

        logger.warning(f"[持仓告警] {title}: {message}")
        self.alerts.append({
            'time': datetime.now(),
            'title': title,
            'message': message,
            'level': 'WARNING',
        })

# Usage example: run one position check and print the summary.
api = TqApi(TqSim(init_balance=200000), auth=TqAuth("快期账户", "快期密码"))

try:
    monitor = PositionMonitor(api, max_position_ratio=0.3)

    print("=" * 50)
    print("持仓风险监控")
    print("=" * 50)

    status = monitor.check_positions()
    print(f"总保证金: {status['total_margin']:.2f}")
    print(f"持仓比例: {status['position_ratio']:.2%}")
    print(f"持仓数量: {status['positions_count']}")
finally:
    # Release the API connection even if the check raises.
    api.close()

五、告警机制

5.1 多级告警系统

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
功能:多级告警系统
说明:本代码仅供学习参考
"""

import logging
from datetime import datetime
from enum import Enum

class AlertLevel(Enum):
    """Alert severity levels; a larger value means a more severe alert."""

    INFO = 1
    WARNING = 2
    ERROR = 3
    CRITICAL = 4

class AlertManager:
    """Collects alerts and dispatches each one to a per-level handler."""

    def __init__(self):
        """Create an empty manager with one handler per ``AlertLevel``."""
        self.alerts = []
        self.alert_handlers = {
            AlertLevel.INFO: self._handle_info,
            AlertLevel.WARNING: self._handle_warning,
            AlertLevel.ERROR: self._handle_error,
            AlertLevel.CRITICAL: self._handle_critical,
        }

    def alert(self, level, title, message, data=None):
        """Record an alert and invoke the handler registered for its level.

        Args:
            level: Alert level used to select the handler.
            title: Short alert title.
            message: Alert detail text.
            data: Optional extra payload; stored as ``{}`` when falsy.

        Returns:
            The alert dict that was stored (keys ``time``, ``level``,
            ``title``, ``message``, ``data``).
        """
        alert = {
            'time': datetime.now(),
            'level': level,
            'title': title,
            'message': message,
            'data': data or {},
        }

        self.alerts.append(alert)

        # Levels without a registered handler are stored but not dispatched.
        handler = self.alert_handlers.get(level)
        if handler:
            handler(alert)

        return alert

    def _handle_info(self, alert):
        """Log an INFO-level alert."""
        logging.info(f"[告警] {alert['title']}: {alert['message']}")

    def _handle_warning(self, alert):
        """Log a WARNING-level alert."""
        logging.warning(f"[告警] {alert['title']}: {alert['message']}")
        # Extension point: e-mail or SMS notification could be sent here.

    def _handle_error(self, alert):
        """Log an ERROR-level alert."""
        logging.error(f"[告警] {alert['title']}: {alert['message']}")
        # Extension point: send an urgent notification.

    def _handle_critical(self, alert):
        """Log a CRITICAL-level alert."""
        logging.critical(f"[告警] {alert['title']}: {alert['message']}")
        # Extension point: notify immediately; trading may need to be halted.

    def get_recent_alerts(self, level=None, count=10):
        """Return the most recent alerts, oldest first.

        Args:
            level: When given, only alerts with this level are considered.
            count: Maximum number of alerts to return.

        Returns:
            A new list holding at most ``count`` alerts from the end of the
            history; empty when ``count`` is not positive.
        """
        alerts = self.alerts
        if level is not None:
            alerts = [a for a in alerts if a['level'] == level]
        if count <= 0:
            # ``alerts[-0:]`` would return everything, so handle it here.
            return []
        # Negative start: keep the tail of the list, not everything after
        # the first ``count`` entries.
        return alerts[-count:]

# Usage example: raise one alert per level, then list the most recent ones.
alert_manager = AlertManager()

print("=" * 50)
print("多级告警系统")
print("=" * 50)

alert_manager.alert(AlertLevel.INFO, "系统启动", "交易系统已启动")
alert_manager.alert(AlertLevel.WARNING, "CPU使用率高", "CPU使用率达到85%")
alert_manager.alert(AlertLevel.ERROR, "网络连接失败", "无法连接到交易服务器")
alert_manager.alert(AlertLevel.CRITICAL, "账户回撤超限", "账户回撤达到20%",
                    {'drawdown': 0.20, 'balance': 160000})

recent = alert_manager.get_recent_alerts(count=5)
print(f"\n最近告警 ({len(recent)}条):")
for alert in recent:
    print(f" [{alert['level'].name}] {alert['title']}: {alert['message']}")

5.2 告警通知(邮件/微信)

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
功能:告警通知
说明:本代码仅供学习参考
"""

import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from datetime import datetime

class EmailNotifier:
    """Sends alert e-mails through an SMTP server using STARTTLS."""

    def __init__(self, smtp_server, smtp_port, username, password, to_emails,
                 timeout=30):
        """Store the SMTP settings and the recipient list.

        Args:
            smtp_server: SMTP host name.
            smtp_port: SMTP port.
            username: Login name; also used as the ``From`` address.
            password: Login password.
            to_emails: One address as a string, or an iterable of addresses.
            timeout: Socket timeout in seconds for the SMTP connection, so a
                stalled server cannot block the caller indefinitely.
        """
        self.smtp_server = smtp_server
        self.smtp_port = smtp_port
        self.username = username
        self.password = password
        self.timeout = timeout
        if isinstance(to_emails, list):
            self.to_emails = to_emails
        elif isinstance(to_emails, str):
            self.to_emails = [to_emails]
        else:
            # Tuples, sets and other iterables become a flat list instead of
            # being wrapped as a single element.
            self.to_emails = list(to_emails)

    def send_alert(self, title, message, level='WARNING'):
        """Send one alert e-mail to all recipients.

        Args:
            title: Alert title; placed in the subject and the body.
            message: Alert detail text.
            level: Level label shown in the subject and the body.

        Returns:
            True if the message was handed to the server, False if any
            error occurred (the error is printed, not raised).
        """
        try:
            msg = MIMEMultipart()
            msg['From'] = self.username
            msg['To'] = ', '.join(self.to_emails)
            msg['Subject'] = f"[{level}] {title}"

            timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
            body = (
                "\n"
                f"告警时间:\n{timestamp}\n"
                f"告警级别:\n{level}\n"
                f"告警标题:\n{title}\n"
                f"告警内容:\n{message}\n"
            )

            msg.attach(MIMEText(body, 'plain', 'utf-8'))

            # The context manager closes the connection even when starttls,
            # login or send_message raises.
            with smtplib.SMTP(self.smtp_server, self.smtp_port,
                              timeout=self.timeout) as server:
                server.starttls()
                server.login(self.username, self.password)
                server.send_message(msg)

            return True
        except Exception as e:
            # Best effort: a notification failure must not crash the caller.
            print(f"发送邮件失败: {e}")
            return False

# 使用示例(需要配置SMTP信息)
# notifier = EmailNotifier(
# smtp_server="smtp.qq.com",
# smtp_port=587,
# username="your_email@qq.com",
# password="your_password",
# to_emails=["recipient@example.com"]
# )
#
# notifier.send_alert("账户回撤超限", "账户回撤达到15%", "CRITICAL")

六、监控数据记录

6.1 监控数据存储

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
功能:监控数据存储
说明:本代码仅供学习参考
"""

import sqlite3
import json
from datetime import datetime

class MonitorDatabase:
    """SQLite storage for system samples, account samples and alerts."""

    def __init__(self, db_file="monitor.db"):
        """Remember the database path and create the tables if missing.

        Args:
            db_file: Path of the SQLite database file.
        """
        self.db_file = db_file
        self.init_database()

    def _run(self, statements):
        """Execute ``(sql, params)`` pairs in one transaction on a new connection.

        The transaction is committed when every statement succeeds and
        rolled back otherwise; the connection is closed in both cases.
        """
        conn = sqlite3.connect(self.db_file)
        try:
            # ``with conn`` commits on success and rolls back on an exception.
            with conn:
                for sql, params in statements:
                    conn.execute(sql, params)
        finally:
            conn.close()

    def init_database(self):
        """Create the three monitoring tables if they do not exist yet."""
        self._run([
            # System monitoring table.
            ('''
                CREATE TABLE IF NOT EXISTS system_monitor (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    timestamp TEXT,
                    cpu_percent REAL,
                    memory_percent REAL,
                    disk_percent REAL
                )
            ''', ()),
            # Account monitoring table.
            ('''
                CREATE TABLE IF NOT EXISTS account_monitor (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    timestamp TEXT,
                    balance REAL,
                    available REAL,
                    drawdown REAL,
                    risk_ratio REAL
                )
            ''', ()),
            # Alert table.
            ('''
                CREATE TABLE IF NOT EXISTS alerts (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    timestamp TEXT,
                    level TEXT,
                    title TEXT,
                    message TEXT,
                    data TEXT
                )
            ''', ()),
        ])

    def save_system_status(self, status):
        """Insert one system sample, timestamped with the current time.

        Args:
            status: dict that may contain ``cpu_percent``,
                ``memory_percent`` and ``disk_percent``; missing keys are
                stored as 0.
        """
        self._run([(
            '''
                INSERT INTO system_monitor (timestamp, cpu_percent, memory_percent, disk_percent)
                VALUES (?, ?, ?, ?)
            ''',
            (
                datetime.now().isoformat(),
                status.get('cpu_percent', 0),
                status.get('memory_percent', 0),
                status.get('disk_percent', 0),
            ),
        )])

    def save_account_status(self, status):
        """Insert one account sample, timestamped with the current time.

        Args:
            status: dict that may contain ``balance``, ``available``,
                ``drawdown`` and ``risk_ratio``; missing keys are stored
                as 0.
        """
        self._run([(
            '''
                INSERT INTO account_monitor (timestamp, balance, available, drawdown, risk_ratio)
                VALUES (?, ?, ?, ?, ?)
            ''',
            (
                datetime.now().isoformat(),
                status.get('balance', 0),
                status.get('available', 0),
                status.get('drawdown', 0),
                status.get('risk_ratio', 0),
            ),
        )])

    def save_alert(self, alert):
        """Insert one alert record.

        Args:
            alert: dict with ``time`` (a datetime), ``level``, ``title`` and
                ``message``; optional ``data`` is stored as JSON text.

        Raises:
            KeyError: If a required key is missing from ``alert``.
        """
        level = alert['level']
        # Enum members are stored by name; anything else by its str() form.
        level_text = level.name if hasattr(level, 'name') else str(level)

        self._run([(
            '''
                INSERT INTO alerts (timestamp, level, title, message, data)
                VALUES (?, ?, ?, ?, ?)
            ''',
            (
                alert['time'].isoformat(),
                level_text,
                alert['title'],
                alert['message'],
                json.dumps(alert.get('data', {})),
            ),
        )])

# Usage example: persist one system sample and one account sample.
db = MonitorDatabase()

# Store a system-status sample.
db.save_system_status(
    dict(cpu_percent=45.2, memory_percent=60.5, disk_percent=30.0)
)

# Store an account-status sample.
db.save_account_status(
    dict(balance=200000, available=150000, drawdown=0.05, risk_ratio=0.25)
)

print("监控数据已保存")

七、总结

监控类型 关键指标
系统监控 CPU、内存、磁盘
网络监控 连接状态、延迟
账户监控 回撤、风险度、可用资金
持仓监控 持仓比例、单持仓亏损

监控告警速查

# System monitoring: create the monitor and start its background thread.
monitor = SystemMonitor()
monitor.start()

# Account monitoring.
# NOTE(review): `api` and `init_balance` are not defined in this snippet —
# presumably the TqApi instance and starting equity from section 4.1; confirm.
account_monitor = AccountMonitor(api, init_balance)
status = account_monitor.check_account_risk()

# Alerting: raise a CRITICAL alert through the alert manager.
alert_manager.alert(AlertLevel.CRITICAL, "标题", "内容")


免责声明:本文仅供学习交流使用,不构成任何投资建议。期货交易有风险,入市需谨慎。

更多资源:

  • 天勤量化官网:https://www.shinnytech.com
  • GitHub开源地址:https://github.com/shinnytech/tqsdk-python
  • 官方文档:https://doc.shinnytech.com/tqsdk/latest
赞(0)
未经允许不得转载:网硕互联帮助中心 » 【期货量化实战】量化交易系统监控与告警(Python量化)
分享到: 更多 (0)

评论 抢沙发

评论前必须登录!