云计算百科
云计算领域专业知识百科平台

Python实例题:分布式任务调度系统

目录

Python实例题

题目

问题描述

调度中心(Server):

工作节点(Worker):

客户端(Client):

系统架构

关键实现思路

核心代码框架(调度中心部分)

难点分析

扩展方向

Python实例题

题目

分布式任务调度系统

问题描述

设计一个简化版分布式任务调度系统,包含以下组件:

  • 调度中心(Server):

    • 接收客户端任务提交
    • 维护任务队列和工作节点状态
    • 分配任务到空闲节点
    • 存储任务执行结果和日志
  • 工作节点(Worker):

    • 连接调度中心获取任务
    • 执行计算密集型任务(如数学建模、数据处理)
    • 返回任务结果
  • 客户端(Client):

    • 提交任务到调度中心
    • 查询任务状态和结果

系统架构

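+----------------+     +----------------+     +----------------+
|     客户端     |<--->|    调度中心    |<--->|   工作节点1    |
+----------------+     +----------------+     +----------------+
                               ^
                               |
+----------------+     +----------------+     +----------------+
|     客户端     |<--->|    调度中心    |<--->|   工作节点2    |
+----------------+     +----------------+     +----------------+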

关键实现思路

  • 使用 Socket 实现网络通信(或选用 ZeroMQ 等更专业的消息通信库)
  • 用SQLite/PostgreSQL存储任务信息和结果
  • 采用multiprocessing或concurrent.futures处理并发任务
  • 设计自定义通信协议(如 JSON 格式消息)
  • 实现心跳机制检测工作节点存活状态
  • 支持任务优先级和超时处理

核心代码框架(调度中心部分)

import socket
import json
import threading
import time
import sqlite3
import queue
from typing import Dict, List, Tuple, Optional

# Wire protocol definition
class Protocol:
    """JSON wire protocol used between the scheduler and its peers.

    Every message is a UTF-8 encoded JSON object with three keys:
    ``type`` (one of the ``TYPE_*`` constants or a reply name), ``data``
    (the payload) and ``timestamp`` (``time.time()`` at packing time).
    """

    # Message types
    TYPE_TASK_SUBMIT = "TASK_SUBMIT"  # submit a task
    TYPE_TASK_ASSIGN = "TASK_ASSIGN"  # assign a task
    TYPE_TASK_RESULT = "TASK_RESULT"  # return a result
    TYPE_WORKER_HEARTBEAT = "HEARTBEAT"  # worker heartbeat
    TYPE_WORKER_REGISTER = "REGISTER"  # worker registration

    @staticmethod
    def pack_message(msg_type: str, data: dict) -> bytes:
        """Serialize a message to JSON bytes.

        Args:
            msg_type: Message type string placed under ``"type"``.
            data: JSON-serializable payload placed under ``"data"``.

        Returns:
            The encoded JSON object, including a ``"timestamp"`` field.
        """
        message = {"type": msg_type, "data": data, "timestamp": time.time()}
        return json.dumps(message).encode()

    @staticmethod
    def unpack_message(msg: bytes) -> dict:
        """Parse a JSON message, never raising on malformed input.

        Args:
            msg: Raw bytes received from the peer.

        Returns:
            The decoded message dict. If the input cannot be decoded, is
            not valid JSON, or is valid JSON but not an object, an error
            message ``{"type": "ERROR", "data": "Invalid message format"}``
            is returned instead so callers can always use ``.get``.
        """
        error = {"type": "ERROR", "data": "Invalid message format"}
        try:
            message = json.loads(msg.decode())
        except (ValueError, AttributeError, RecursionError):
            # ValueError covers both UnicodeDecodeError and JSONDecodeError;
            # AttributeError covers input without .decode(); RecursionError
            # covers pathologically nested JSON.
            return error
        # A bare JSON list/number/string is not a protocol message.
        if not isinstance(message, dict):
            return error
        return message

# Task manager
class TaskManager:
    """Priority task queue and worker registry, mirrored to SQLite.

    In-memory state:
        task_queue: ``queue.PriorityQueue`` of
            ``(priority, sequence, task_id, name, data)``; a smaller
            priority number is served first and the sequence number keeps
            equal-priority tasks in submission order.
        task_status: ``task_id -> {"status", "worker", "result", ...}``.
        worker_nodes: ``worker_id -> {"address", "last_heartbeat", "busy"}``.

    The scheduler server calls into this object from one thread per
    connection, so every public method runs under a single re-entrant lock.
    """

    # A worker is eligible for new tasks only if it sent a heartbeat within
    # this many seconds.
    HEARTBEAT_TIMEOUT = 10

    def __init__(self, db_path="task_system.db"):
        """Create empty in-memory state and ensure the ``tasks`` table exists.

        Args:
            db_path: Path of the SQLite database file.
        """
        self.task_queue = queue.PriorityQueue()  # lower number = higher priority
        self.task_status = {}  # task_id: {"status": "", "worker": "", "result": ""}
        self.worker_nodes = {}  # worker_id: {"address": "", "last_heartbeat": 0, "busy": False}
        self.db_path = db_path
        self._lock = threading.RLock()
        self._sequence = 0  # tie-breaker for equal priorities (FIFO)
        self._init_database()

    def _execute(self, sql: str, params: tuple = ()) -> None:
        """Run one write statement on a fresh connection and commit it.

        The connection is closed even when the statement raises.
        """
        conn = sqlite3.connect(self.db_path)
        try:
            conn.execute(sql, params)
            conn.commit()
        finally:
            conn.close()

    def _init_database(self):
        """Create the ``tasks`` table if it does not exist yet."""
        self._execute('''
            CREATE TABLE IF NOT EXISTS tasks (
                task_id TEXT PRIMARY KEY,
                name TEXT,
                priority INTEGER,
                status TEXT,
                input_data TEXT,
                result TEXT,
                worker_id TEXT,
                create_time REAL,
                update_time REAL
            )
        ''')

    def add_task(self, task_id: str, name: str, priority: int, data: dict) -> bool:
        """Queue a new task and persist it with status ``PENDING``.

        Args:
            task_id: Unique task identifier.
            name: Human-readable task name.
            priority: Scheduling priority; smaller numbers run first.
            data: JSON-serializable task input.

        Returns:
            True if the task was queued; False if ``task_id`` is already
            known, either in memory or in the database.
        """
        with self._lock:
            if task_id in self.task_status:
                return False

            now = time.time()
            # Persist first: if the id already exists in the database (for
            # example from an earlier run), nothing in memory has changed yet.
            try:
                self._execute(
                    "INSERT INTO tasks (task_id, name, priority, status, input_data, create_time, update_time) "
                    "VALUES (?, ?, ?, ?, ?, ?, ?)",
                    (task_id, name, priority, "PENDING", json.dumps(data), now, now),
                )
            except sqlite3.IntegrityError:
                return False

            self.task_status[task_id] = {
                "status": "PENDING",
                "worker": "",
                "result": None,
                "create_time": now,
            }
            self._sequence += 1
            self.task_queue.put((priority, self._sequence, task_id, name, data))
            return True

    def get_available_worker(self) -> Optional[str]:
        """Return the first idle worker with a recent heartbeat, or None.

        A worker qualifies when it is not busy and its last heartbeat is
        less than ``HEARTBEAT_TIMEOUT`` seconds old. Workers are considered
        in registration order.
        """
        with self._lock:
            now = time.time()
            for worker_id, info in self.worker_nodes.items():
                if not info["busy"] and now - info["last_heartbeat"] < self.HEARTBEAT_TIMEOUT:
                    return worker_id
            return None

    def assign_task(self) -> Optional[Tuple[str, str, dict]]:
        """Hand the highest-priority pending task to an available worker.

        Returns:
            ``(worker_id, task_id, task_data)`` for the assignment, or None
            when there is no available worker or no pending task.

        Raises:
            sqlite3.Error: If the status update cannot be stored; the task
                is put back on the queue before re-raising.
        """
        with self._lock:
            worker_id = self.get_available_worker()
            if not worker_id:
                # Leave the queue untouched until a worker can take a task.
                return None

            # Pop entries until one is still PENDING; entries whose status
            # changed after queuing are dropped.
            while True:
                try:
                    entry = self.task_queue.get_nowait()
                except queue.Empty:
                    return None
                task_id, task_data = entry[2], entry[4]
                record = self.task_status.get(task_id)
                if record is not None and record["status"] == "PENDING":
                    break

            now = time.time()
            try:
                self._execute(
                    "UPDATE tasks SET status = ?, worker_id = ?, update_time = ? WHERE task_id = ?",
                    ("RUNNING", worker_id, now, task_id),
                )
            except sqlite3.Error:
                self.task_queue.put(entry)  # do not lose the task
                raise

            self.task_status[task_id] = {
                **record,
                "status": "RUNNING",
                "worker": worker_id,
                "update_time": now,
            }
            self.worker_nodes[worker_id]["busy"] = True
            return (worker_id, task_id, task_data)

    def process_result(self, task_id: str, result: dict, worker_id: str) -> bool:
        """Record a task result and free the reporting worker.

        Args:
            task_id: Task the result belongs to.
            result: JSON-serializable result payload.
            worker_id: Worker that reported the result; it is marked idle
                if it is registered.

        Returns:
            True if the task is known and was marked ``COMPLETED``,
            otherwise False.
        """
        with self._lock:
            if task_id not in self.task_status:
                return False

            now = time.time()
            self.task_status[task_id] = {
                **self.task_status[task_id],
                "status": "COMPLETED",
                "result": result,
                "update_time": now,
            }

            if worker_id in self.worker_nodes:
                self.worker_nodes[worker_id]["busy"] = False

            # The worker_id column is cleared on completion; the in-memory
            # record keeps the worker that ran the task.
            self._execute(
                "UPDATE tasks SET status = ?, result = ?, worker_id = ?, update_time = ? WHERE task_id = ?",
                ("COMPLETED", json.dumps(result), "", now, task_id),
            )
            return True

    def register_worker(self, worker_id: str, address: str) -> bool:
        """Register a new idle worker.

        Returns:
            True if the worker was added; False if ``worker_id`` is
            already registered.
        """
        with self._lock:
            if worker_id in self.worker_nodes:
                return False

            self.worker_nodes[worker_id] = {
                "address": address,
                "last_heartbeat": time.time(),
                "busy": False,
            }
            return True

    def update_heartbeat(self, worker_id: str) -> bool:
        """Refresh a worker's heartbeat timestamp.

        Returns:
            True if the worker is registered, otherwise False.
        """
        with self._lock:
            info = self.worker_nodes.get(worker_id)
            if info is None:
                return False
            info["last_heartbeat"] = time.time()
            return True

# Scheduler server
class SchedulerServer:
    """TCP front end of the scheduler: one thread per connection.

    Each connection may belong to a client (submitting tasks) or to a
    worker (registering, sending heartbeats, returning results). Requests
    are decoded with ``Protocol`` and answered with exactly one reply.

    NOTE(review): each ``recv`` is treated as one complete JSON message.
    TCP does not guarantee that; a framed protocol (length prefix or
    delimiter) would be needed for large or back-to-back messages, but that
    requires changing the peers as well.
    """

    def __init__(self, host="0.0.0.0", port=9999):
        """Create the listening socket (not yet bound) and the task manager.

        Args:
            host: Interface to bind to.
            port: TCP port to listen on.
        """
        self.host = host
        self.port = port
        self.task_manager = TaskManager()
        self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.running = False
        self.clients = {}  # client_id: socket
        # Serializes every operation that changes which worker is available,
        # so "check availability, then assign" is atomic across connections.
        self._dispatch_lock = threading.Lock()

    def start(self):
        """Bind, listen and serve connections until ``stop()`` is called.

        Blocks the calling thread. A daemon thread runs the heartbeat
        check, and each accepted connection gets its own daemon thread.
        """
        self.server_socket.bind((self.host, self.port))
        self.server_socket.listen(10)
        self.running = True
        print(f"调度中心启动成功,监听地址: {self.host}:{self.port}")

        # Start the heartbeat-check thread
        heartbeat_thread = threading.Thread(target=self._heartbeat_check, daemon=True)
        heartbeat_thread.start()

        # Accept connections
        while self.running:
            try:
                client_socket, address = self.server_socket.accept()
                print(f"新连接: {address}")
                client_thread = threading.Thread(
                    target=self._handle_client,
                    args=(client_socket, address),
                    daemon=True,
                )
                client_thread.start()
            except Exception as e:
                if not self.running:
                    # stop() closed the listening socket; this is a normal
                    # shutdown, not an accept failure.
                    break
                print(f"接受连接失败: {e}")
                time.sleep(1)

    @staticmethod
    def _is_valid_id(value) -> bool:
        """Return True for a non-empty string or an integer identifier."""
        if isinstance(value, bool):
            return False
        return isinstance(value, (str, int)) and value != ""

    def _on_register(self, msg_data: dict, client_id: str) -> bytes:
        """Handle REGISTER: add the worker under this connection's address."""
        worker_id = msg_data.get("worker_id")
        if not self._is_valid_id(worker_id):
            return Protocol.pack_message(
                "REGISTER_FAILED",
                {"message": "Invalid worker ID"},
            )
        if self.task_manager.register_worker(worker_id, client_id):
            return Protocol.pack_message(
                "REGISTER_SUCCESS",
                {"message": "Worker registered successfully"},
            )
        return Protocol.pack_message(
            "REGISTER_FAILED",
            {"message": "Worker ID already exists"},
        )

    def _on_heartbeat(self, msg_data: dict, client_id: str) -> bytes:
        """Handle HEARTBEAT: refresh the worker and offer it a task."""
        worker_id = msg_data.get("worker_id")
        task = None
        with self._dispatch_lock:
            if not self._is_valid_id(worker_id) or not self.task_manager.update_heartbeat(worker_id):
                return Protocol.pack_message(
                    "HEARTBEAT_FAILED",
                    {"message": "Worker not registered"},
                )
            # assign_task() gives the task to whichever worker
            # get_available_worker() returns, but the reply below can only
            # reach the worker on this connection. Assign only when the two
            # are the same; otherwise the chosen worker picks the task up on
            # its own next heartbeat.
            if self.task_manager.get_available_worker() == worker_id:
                task = self.task_manager.assign_task()

        if task:
            _assigned_worker, task_id, task_data = task
            return Protocol.pack_message(
                Protocol.TYPE_TASK_ASSIGN,
                {"task_id": task_id, "data": task_data},
            )
        return Protocol.pack_message(
            "NO_TASK_ASSIGN",
            {"message": "No task available"},
        )

    def _on_submit(self, msg_data: dict, client_id: str) -> bytes:
        """Handle TASK_SUBMIT: validate and queue a new task."""
        task_id = msg_data.get("task_id")
        task_name = msg_data.get("task_name", "Unknown Task")
        priority = msg_data.get("priority", 5)  # medium priority by default
        task_data = msg_data.get("data", {})

        # Priorities are compared inside a priority queue, so they must be
        # plain numbers; ids are used as dict keys and database keys.
        priority_ok = isinstance(priority, (int, float)) and not isinstance(priority, bool)
        if not self._is_valid_id(task_id) or not priority_ok:
            return Protocol.pack_message(
                "TASK_SUBMIT_FAILED",
                {"message": "Invalid task ID or priority"},
            )

        if self.task_manager.add_task(task_id, task_name, priority, task_data):
            return Protocol.pack_message(
                "TASK_SUBMIT_SUCCESS",
                {"task_id": task_id, "message": "Task submitted"},
            )
        return Protocol.pack_message(
            "TASK_SUBMIT_FAILED",
            {"message": "Task ID already exists"},
        )

    def _on_result(self, msg_data: dict, client_id: str) -> bytes:
        """Handle TASK_RESULT: store the result and free the worker."""
        task_id = msg_data.get("task_id")
        result = msg_data.get("result", {})
        worker_id = msg_data.get("worker_id")
        if not self._is_valid_id(worker_id):
            worker_id = None  # unknown reporter: no worker is freed

        accepted = False
        if self._is_valid_id(task_id):
            with self._dispatch_lock:
                accepted = self.task_manager.process_result(task_id, result, worker_id)

        if accepted:
            return Protocol.pack_message(
                "RESULT_RECEIVED",
                {"task_id": task_id, "message": "Result received"},
            )
        return Protocol.pack_message(
            "RESULT_FAILED",
            {"message": "Invalid task ID"},
        )

    def _handle_client(self, client_socket: socket.socket, address: tuple):
        """Serve one connection until the peer disconnects or an error occurs.

        Args:
            client_socket: Connected socket returned by ``accept``.
            address: ``(host, port)`` of the peer; used to build the
                connection id ``"host:port"``.
        """
        client_id = f"{address[0]}:{address[1]}"
        self.clients[client_id] = client_socket

        handlers = {
            Protocol.TYPE_WORKER_REGISTER: self._on_register,
            Protocol.TYPE_WORKER_HEARTBEAT: self._on_heartbeat,
            Protocol.TYPE_TASK_SUBMIT: self._on_submit,
            Protocol.TYPE_TASK_RESULT: self._on_result,
        }

        try:
            while True:
                # Receive a message
                data = client_socket.recv(4096)
                if not data:
                    break

                # Parse it; tolerate payloads that are not JSON objects.
                message = Protocol.unpack_message(data)
                if not isinstance(message, dict):
                    message = {}
                msg_type = message.get("type", "UNKNOWN")
                msg_data = message.get("data", {})
                if not isinstance(msg_data, dict):
                    msg_data = {}
                print(f"收到消息 [{client_id}]: {msg_type}")

                # Dispatch on message type
                handler = handlers.get(msg_type) if isinstance(msg_type, str) else None
                if handler is not None:
                    response = handler(msg_data, client_id)
                else:
                    response = Protocol.pack_message(
                        "UNKNOWN_MESSAGE",
                        {"message": "Unknown message type"},
                    )
                # sendall: send() may transmit only part of the reply.
                client_socket.sendall(response)

        except Exception as e:
            print(f"客户端处理错误 [{client_id}]: {e}")
        finally:
            self.clients.pop(client_id, None)
            client_socket.close()
            print(f"连接关闭: {client_id}")

    def _heartbeat_check(self):
        """Every 10 seconds, clear the busy flag of workers silent for 30 s.

        NOTE(review): the task such a worker was running stays ``RUNNING``;
        nothing here puts it back on the queue.
        """
        while self.running:
            with self._dispatch_lock:
                current_time = time.time()
                # The worker table lives on the task manager, not on the server.
                for worker_id, info in list(self.task_manager.worker_nodes.items()):
                    if current_time - info["last_heartbeat"] > 30:
                        # Worker timed out; mark it offline
                        print(f"工作节点 {worker_id} 超时,标记为离线")
                        info["busy"] = False
            time.sleep(10)  # check every 10 seconds

    def stop(self):
        """Stop accepting connections and close the listening socket."""
        self.running = False
        if self.server_socket:
            self.server_socket.close()
        print("调度中心已停止")

# Example usage
def main():
    """Run a scheduler on port 9999 until interrupted, then shut it down."""
    scheduler = SchedulerServer(port=9999)
    try:
        scheduler.start()
    except KeyboardInterrupt:
        print("接收到停止信号")
    finally:
        # Always release the listening socket, whatever ended start().
        scheduler.stop()


if __name__ == "__main__":
    main()

难点分析

  • 分布式系统一致性:多个工作节点和客户端同时操作时确保数据一致性
  • 网络通信可靠性:处理网络延迟、断开重连、消息丢失等问题
  • 任务优先级调度:实现公平且高效的任务分配算法
  • 系统监控与容错:设计完善的监控机制和故障恢复策略
  • 性能优化:处理大量任务时的并发性能瓶颈(如数据库连接池、异步 IO)

扩展方向

  • 添加 Web 管理界面(使用 Flask/Django)
  • 集成 Redis 作为分布式消息队列
  • 支持任务依赖关系(DAG 任务流)
  • 实现资源监控(CPU / 内存 / 磁盘)
  • 增加任务重试和失败转移机制
赞(0)
未经允许不得转载:网硕互联帮助中心 » Python实例题:分布式任务调度系统
分享到: 更多 (0)

评论 抢沙发

评论前必须登录!