目录
Python实例题
题目
问题描述
调度中心(Server):
工作节点(Worker):
客户端(Client):
系统架构
关键实现思路
核心代码框架(调度中心部分)
难点分析
扩展方向
Python实例题
题目
分布式任务调度系统
问题描述
设计一个简化版分布式任务调度系统,包含以下组件:
- 调度中心(Server):
  - 接收客户端任务提交
  - 维护任务队列和工作节点状态
  - 分配任务到空闲节点
  - 存储任务执行结果和日志
- 工作节点(Worker):
  - 连接调度中心获取任务
  - 执行计算密集型任务(如数学建模、数据处理)
  - 返回任务结果
- 客户端(Client):
  - 提交任务到调度中心
  - 查询任务状态和结果
系统架构
+----------------+     +----------------+     +----------------+
|     客户端     |<--->|    调度中心    |<--->|   工作节点1    |
+----------------+     +----------------+     +----------------+
                               ^
                               |
+----------------+     +----------------+     +----------------+
|     客户端     |<--->|    调度中心    |<--->|   工作节点2    |
+----------------+     +----------------+     +----------------+
关键实现思路
- 使用 Socket 实现网络通信(或选择 ZeroMQ 等更专业的消息队列)
- 用 SQLite/PostgreSQL 存储任务信息和结果
- 采用 multiprocessing 或 concurrent.futures 处理并发任务
- 设计自定义通信协议(如 JSON 格式消息)
- 实现心跳机制检测工作节点存活状态
- 支持任务优先级和超时处理
核心代码框架(调度中心部分)
import socket
import json
import threading
import time
import sqlite3
import queue
from typing import Dict, List, Tuple, Optional# 通信协议定义
class Protocol:
    """Wire format shared by the scheduler, workers and clients.

    Every message is a single JSON object encoded as UTF-8 bytes with three
    keys: ``type`` (one of the ``TYPE_*`` constants or a reply code),
    ``data`` (the payload) and ``timestamp`` (``time.time()`` at packing).
    """

    # Message types
    TYPE_TASK_SUBMIT = "TASK_SUBMIT"  # client submits a task
    TYPE_TASK_ASSIGN = "TASK_ASSIGN"  # scheduler assigns a task to a worker
    TYPE_TASK_RESULT = "TASK_RESULT"  # worker returns a result
    TYPE_WORKER_HEARTBEAT = "HEARTBEAT"  # worker heartbeat
    TYPE_WORKER_REGISTER = "REGISTER"  # worker registration

    @staticmethod
    def pack_message(msg_type: str, data: dict) -> bytes:
        """Serialize a message to JSON bytes.

        Args:
            msg_type: Message type string placed under the ``type`` key.
            data: JSON-serializable payload placed under the ``data`` key.

        Returns:
            The encoded message, stamped with the current time.
        """
        message = {"type": msg_type, "data": data, "timestamp": time.time()}
        return json.dumps(message).encode()

    @staticmethod
    def unpack_message(msg: bytes) -> dict:
        """Parse JSON bytes into a message dict.

        Malformed input never raises: undecodable bytes, invalid JSON, a
        non-bytes argument, or valid JSON that is not an object all yield
        ``{"type": "ERROR", "data": "Invalid message format"}`` so callers
        can always use ``.get`` on the result.
        """
        invalid = {"type": "ERROR", "data": "Invalid message format"}
        try:
            message = json.loads(msg.decode())
        except (AttributeError, TypeError, ValueError, RecursionError):
            # ValueError covers both JSONDecodeError and UnicodeDecodeError;
            # RecursionError guards against pathologically nested input.
            return invalid
        if not isinstance(message, dict):
            # e.g. b"[1, 2]" or b"42": valid JSON but not a message object.
            return invalid
        return message


# Task management class
class TaskManager:
    """In-memory task queue and worker registry, mirrored to a SQLite table.

    The scheduler serves every connection on its own thread, so all public
    methods serialize on one re-entrant lock. Each database write opens a
    short-lived connection that is closed even when the statement fails.

    Attributes:
        task_queue: Priority queue of pending tasks; a smaller priority
            number is served first, ties are served in submission order.
        task_status: ``task_id -> {"status", "worker", "result", ...}``.
        worker_nodes: ``worker_id -> {"address", "last_heartbeat", "busy"}``.
        db_path: Path of the SQLite database file.
    """

    # A worker is considered alive only if it sent a heartbeat this recently.
    HEARTBEAT_FRESH_SECONDS = 10

    def __init__(self, db_path="task_system.db"):
        # Entries are (priority, sequence, task_id, name, data).
        self.task_queue = queue.PriorityQueue()
        self.task_status = {}
        self.worker_nodes = {}
        self.db_path = db_path
        self._lock = threading.RLock()
        # Monotonic tie-breaker: keeps equal-priority tasks FIFO and stops
        # the heap from ever comparing task ids or payload dicts.
        self._sequence = 0
        self._init_database()

    def _execute(self, sql: str, params: tuple = ()) -> None:
        """Run one statement in its own connection and commit it."""
        conn = sqlite3.connect(self.db_path)
        try:
            conn.execute(sql, params)
            conn.commit()
        finally:
            conn.close()

    def _init_database(self):
        """Create the ``tasks`` table if it does not exist yet."""
        self._execute('''CREATE TABLE IF NOT EXISTS tasks (
            task_id TEXT PRIMARY KEY,
            name TEXT,
            priority INTEGER,
            status TEXT,
            input_data TEXT,
            result TEXT,
            worker_id TEXT,
            create_time REAL,
            update_time REAL
        )''')

    def _is_available(self, worker_id, now: float) -> bool:
        """Return True if the worker is registered, idle and recently seen."""
        info = self.worker_nodes.get(worker_id)
        return (
            info is not None
            and not info["busy"]
            and now - info["last_heartbeat"] < self.HEARTBEAT_FRESH_SECONDS
        )

    def add_task(self, task_id: str, name: str, priority: int, data: dict) -> bool:
        """Queue a new task and persist it.

        Returns:
            True if the task was accepted; False if ``task_id`` is already
            known, either in memory or in the database (e.g. from a previous
            run of the scheduler).

        Raises:
            TypeError: If ``priority`` is not a number. Raised before any
                state is touched so a bad value cannot corrupt the heap.
        """
        if not isinstance(priority, (int, float)):
            raise TypeError("priority must be a number")
        with self._lock:
            if task_id in self.task_status:
                return False
            now = time.time()
            # Persist first: if the row already exists nothing has been
            # queued in memory yet, so the two views cannot diverge.
            try:
                self._execute(
                    "INSERT INTO tasks (task_id, name, priority, status, input_data, create_time, update_time) "
                    "VALUES (?, ?, ?, ?, ?, ?, ?)",
                    (task_id, name, priority, "PENDING", json.dumps(data), now, now),
                )
            except sqlite3.IntegrityError:
                return False
            self.task_status[task_id] = {
                "status": "PENDING",
                "worker": "",
                "result": None,
                "create_time": now,
            }
            self._sequence += 1
            self.task_queue.put((priority, self._sequence, task_id, name, data))
            return True

    def get_available_worker(self) -> Optional[str]:
        """Return the first idle worker with a fresh heartbeat, or None."""
        with self._lock:
            now = time.time()
            return next(
                (wid for wid in self.worker_nodes if self._is_available(wid, now)),
                None,
            )

    def assign_task(self, worker_id: Optional[str] = None) -> Optional[Tuple[str, str, dict]]:
        """Hand the highest-priority pending task to a worker.

        Args:
            worker_id: When given, the task is assigned to exactly this
                worker, and only if it is idle and alive. This is what a
                pull-style caller needs: the task must be booked against the
                worker that will actually receive it. When omitted, any
                available worker is chosen.

        Returns:
            ``(worker_id, task_id, task_data)``, or None if there is no
            suitable worker or no pending task.
        """
        with self._lock:
            now = time.time()
            if worker_id is None:
                target = self.get_available_worker()
            else:
                target = worker_id if self._is_available(worker_id, now) else None
            if target is None:
                # Checked before popping, so the queue is left untouched.
                return None

            # Skip queue entries whose task is no longer PENDING.
            while True:
                try:
                    entry = self.task_queue.get_nowait()
                except queue.Empty:
                    return None
                task_id, task_data = entry[2], entry[4]
                status = self.task_status.get(task_id, {}).get("status")
                if status == "PENDING":
                    break

            try:
                self._execute(
                    "UPDATE tasks SET status = ?, worker_id = ?, update_time = ? WHERE task_id = ?",
                    ("RUNNING", target, now, task_id),
                )
            except sqlite3.Error:
                # Do not lose the task if persistence fails.
                self.task_queue.put(entry)
                raise

            self.task_status[task_id] = {
                **self.task_status[task_id],
                "status": "RUNNING",
                "worker": target,
                "update_time": now,
            }
            self.worker_nodes[target]["busy"] = True
            return (target, task_id, task_data)

    def process_result(self, task_id: str, result: dict, worker_id: str) -> bool:
        """Record a task result and free the reporting worker.

        Returns:
            True if the result was stored, False if ``task_id`` is unknown.
        """
        with self._lock:
            if task_id not in self.task_status:
                return False
            now = time.time()
            # The stored worker_id column is cleared on completion, as before.
            self._execute(
                "UPDATE tasks SET status = ?, result = ?, worker_id = ?, update_time = ? WHERE task_id = ?",
                ("COMPLETED", json.dumps(result), "", now, task_id),
            )
            self.task_status[task_id] = {
                **self.task_status[task_id],
                "status": "COMPLETED",
                "result": result,
                "update_time": now,
            }
            if worker_id in self.worker_nodes:
                self.worker_nodes[worker_id]["busy"] = False
            return True

    def register_worker(self, worker_id: str, address: str) -> bool:
        """Register a worker; return False if the id is already taken."""
        with self._lock:
            if worker_id in self.worker_nodes:
                return False
            self.worker_nodes[worker_id] = {
                "address": address,
                "last_heartbeat": time.time(),
                "busy": False,
            }
            return True

    def update_heartbeat(self, worker_id: str) -> bool:
        """Refresh a worker's heartbeat; return False if it is unknown."""
        with self._lock:
            info = self.worker_nodes.get(worker_id)
            if info is None:
                return False
            info["last_heartbeat"] = time.time()
            return True

    def expire_workers(self, timeout: float) -> List[str]:
        """Clear the busy flag of workers silent for more than ``timeout`` s.

        Returns:
            The ids of all workers currently past the timeout.
        """
        with self._lock:
            now = time.time()
            stale = [
                wid
                for wid, info in self.worker_nodes.items()
                if now - info["last_heartbeat"] > timeout
            ]
            for wid in stale:
                self.worker_nodes[wid]["busy"] = False
            return stale


# Scheduler server class
class SchedulerServer:
    """TCP front end that accepts workers and clients, one thread each.

    NOTE: messages are not framed. Each ``recv`` of up to 4096 bytes is
    parsed as one complete JSON message, so larger or coalesced messages are
    rejected as malformed. Changing this would alter the wire protocol that
    workers and clients rely on, so it is left as is.
    """

    def __init__(self, host="0.0.0.0", port=9999):
        self.host = host
        self.port = port
        self.task_manager = TaskManager()
        self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.running = False
        self.clients = {}  # client_id ("ip:port") -> connected socket

    def start(self):
        """Bind, listen and serve connections until ``stop()`` is called."""
        self.server_socket.bind((self.host, self.port))
        self.server_socket.listen(10)
        self.running = True
        print(f"调度中心启动成功,监听地址: {self.host}:{self.port}")

        heartbeat_thread = threading.Thread(target=self._heartbeat_check, daemon=True)
        heartbeat_thread.start()

        while self.running:
            try:
                client_socket, address = self.server_socket.accept()
                print(f"新连接: {address}")
                client_thread = threading.Thread(
                    target=self._handle_client,
                    args=(client_socket, address),
                    daemon=True,
                )
                client_thread.start()
            except Exception as e:
                if not self.running:
                    # stop() closed the listening socket; not a failure.
                    break
                print(f"接受连接失败: {e}")
                time.sleep(1)

    def _handle_client(self, client_socket: socket.socket, address: tuple):
        """Serve one connection: read a message, dispatch it, send the reply."""
        client_id = f"{address[0]}:{address[1]}"
        self.clients[client_id] = client_socket
        try:
            while True:
                data = client_socket.recv(4096)
                if not data:
                    break
                message = Protocol.unpack_message(data)
                if not isinstance(message, dict):
                    message = {}
                msg_type = message.get("type", "UNKNOWN")
                msg_data = message.get("data", {})
                print(f"收到消息 [{client_id}]: {msg_type}")
                # sendall: plain send() may transmit only part of the reply.
                client_socket.sendall(self._dispatch(msg_type, msg_data, client_id))
        except Exception as e:
            print(f"客户端处理错误 [{client_id}]: {e}")
        finally:
            self.clients.pop(client_id, None)
            client_socket.close()
            print(f"连接关闭: {client_id}")

    def _dispatch(self, msg_type, msg_data, client_id: str) -> bytes:
        """Route one decoded message to its handler and return the reply."""
        if not isinstance(msg_data, dict):
            # Error messages carry a string payload; handlers expect a dict.
            msg_data = {}
        handlers = {
            Protocol.TYPE_WORKER_REGISTER: self._on_register,
            Protocol.TYPE_WORKER_HEARTBEAT: self._on_heartbeat,
            Protocol.TYPE_TASK_SUBMIT: self._on_submit,
            Protocol.TYPE_TASK_RESULT: self._on_result,
        }
        handler = handlers.get(msg_type) if isinstance(msg_type, str) else None
        if handler is None:
            return Protocol.pack_message("UNKNOWN_MESSAGE", {"message": "Unknown message type"})
        return handler(msg_data, client_id)

    @staticmethod
    def _valid_id(value) -> bool:
        """Return True for ids usable as dict keys and database values."""
        return isinstance(value, (str, int)) and not isinstance(value, bool)

    def _on_register(self, msg_data: dict, client_id: str) -> bytes:
        """Handle REGISTER: add the worker to the registry."""
        worker_id = msg_data.get("worker_id")
        if not self._valid_id(worker_id):
            return Protocol.pack_message("REGISTER_FAILED", {"message": "Missing or invalid worker_id"})
        if self.task_manager.register_worker(worker_id, client_id):
            return Protocol.pack_message("REGISTER_SUCCESS", {"message": "Worker registered successfully"})
        return Protocol.pack_message("REGISTER_FAILED", {"message": "Worker ID already exists"})

    def _on_heartbeat(self, msg_data: dict, client_id: str) -> bytes:
        """Handle HEARTBEAT: refresh liveness and offer the worker a task."""
        worker_id = msg_data.get("worker_id")
        if not self._valid_id(worker_id) or not self.task_manager.update_heartbeat(worker_id):
            return Protocol.pack_message("HEARTBEAT_FAILED", {"message": "Worker not registered"})
        # The reply goes to this worker's socket, so the task must be booked
        # against this worker (and only if it is idle), not an arbitrary one.
        task = self.task_manager.assign_task(worker_id)
        if task is None:
            return Protocol.pack_message("NO_TASK_ASSIGN", {"message": "No task available"})
        _, task_id, task_data = task
        return Protocol.pack_message(Protocol.TYPE_TASK_ASSIGN, {"task_id": task_id, "data": task_data})

    def _on_submit(self, msg_data: dict, client_id: str) -> bytes:
        """Handle TASK_SUBMIT: validate and queue a client's task."""
        task_id = msg_data.get("task_id")
        task_name = msg_data.get("task_name", "Unknown Task")
        priority = msg_data.get("priority", 5)  # medium priority by default
        task_data = msg_data.get("data", {})
        if not self._valid_id(task_id):
            return Protocol.pack_message("TASK_SUBMIT_FAILED", {"message": "Missing or invalid task_id"})
        if isinstance(priority, bool) or not isinstance(priority, (int, float)):
            return Protocol.pack_message("TASK_SUBMIT_FAILED", {"message": "Priority must be a number"})
        if self.task_manager.add_task(task_id, task_name, priority, task_data):
            return Protocol.pack_message("TASK_SUBMIT_SUCCESS", {"task_id": task_id, "message": "Task submitted"})
        return Protocol.pack_message("TASK_SUBMIT_FAILED", {"message": "Task ID already exists"})

    def _on_result(self, msg_data: dict, client_id: str) -> bytes:
        """Handle TASK_RESULT: store a worker's result."""
        task_id = msg_data.get("task_id")
        result = msg_data.get("result", {})
        worker_id = msg_data.get("worker_id")
        if not self._valid_id(worker_id):
            worker_id = None  # unknown reporter: store the result, free nobody
        if self._valid_id(task_id) and self.task_manager.process_result(task_id, result, worker_id):
            return Protocol.pack_message("RESULT_RECEIVED", {"task_id": task_id, "message": "Result received"})
        return Protocol.pack_message("RESULT_FAILED", {"message": "Invalid task ID"})

    def _heartbeat_check(self):
        """Every 10 s, flag workers silent for more than 30 s as offline."""
        while self.running:
            # The registry lives on the task manager, not on the server.
            for worker_id in self.task_manager.expire_workers(30):
                print(f"工作节点 {worker_id} 超时,标记为离线")
            time.sleep(10)

    def stop(self):
        """Stop accepting connections and close the listening socket."""
        self.running = False
        if self.server_socket:
            self.server_socket.close()
        print("调度中心已停止")


# Usage example
if __name__ == "__main__":
    # Script entry point: serve on port 9999 until interrupted with Ctrl+C,
    # and always shut the listening socket down on the way out.
    scheduler = SchedulerServer(port=9999)
    try:
        scheduler.start()
    except KeyboardInterrupt:
        print("接收到停止信号")
    finally:
        scheduler.stop()
难点分析
- 分布式系统一致性:多个工作节点和客户端同时操作时确保数据一致性
- 网络通信可靠性:处理网络延迟、断开重连、消息丢失等问题
- 任务优先级调度:实现公平且高效的任务分配算法
- 系统监控与容错:设计完善的监控机制和故障恢复策略
- 性能优化:处理大量任务时的并发性能瓶颈(如数据库连接池、异步 IO)
扩展方向
- 添加 Web 管理界面(使用 Flask/Django)
- 集成 Redis 作为分布式消息队列
- 支持任务依赖关系(DAG 任务流)
- 实现资源监控(CPU / 内存 / 磁盘)
- 增加任务重试和失败转移机制
