欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 教育 > 锐评 > Python实例题:分布式任务调度系统

Python实例题:分布式任务调度系统

2025/11/10 11:47:09 来源:https://blog.csdn.net/shangzhiqi/article/details/148703429  浏览:    关键词:Python实例题:分布式任务调度系统

目录

Python实例题

题目

问题描述

调度中心(Server):

工作节点(Worker):

客户端(Client):

系统架构

关键实现思路

核心代码框架(调度中心部分)

难点分析

扩展方向

Python实例题

题目

分布式任务调度系统

问题描述

设计一个简化版分布式任务调度系统,包含以下组件:

  • 调度中心(Server)

    • 接收客户端任务提交
    • 维护任务队列和工作节点状态
    • 分配任务到空闲节点
    • 存储任务执行结果和日志
  • 工作节点(Worker)

    • 连接调度中心获取任务
    • 执行计算密集型任务(如数学建模、数据处理)
    • 返回任务结果
  • 客户端(Client)

    • 提交任务到调度中心
    • 查询任务状态和结果

系统架构

+----------------+     +----------------+     +----------------+
|    客户端      |<--->|    调度中心    |<--->|    工作节点1    |
+----------------+     +----------------+     +----------------+
                               ^
                               |
+----------------+     +----------------+     +----------------+
|    客户端      |<--->|    调度中心    |<--->|    工作节点2    |
+----------------+     +----------------+     +----------------+

关键实现思路

  • 使用Socket实现网络通信(或选择ZeroMQ等更专业的消息队列)
  • SQLite/PostgreSQL存储任务信息和结果
  • 采用 multiprocessing 或 concurrent.futures 处理并发任务
  • 设计自定义通信协议(如 JSON 格式消息)
  • 实现心跳机制检测工作节点存活状态
  • 支持任务优先级和超时处理

核心代码框架(调度中心部分)

import socket
import json
import threading
import time
import sqlite3
import queue
from typing import Dict, List, Tuple, Optional# 通信协议定义
class Protocol:
    """JSON wire format shared by clients, workers and the scheduler.

    Every message is a JSON object ``{"type": ..., "data": ..., "timestamp": float}``
    encoded as UTF-8 bytes.
    """

    # Message types
    TYPE_TASK_SUBMIT = "TASK_SUBMIT"        # submit a task
    TYPE_TASK_ASSIGN = "TASK_ASSIGN"        # assign a task to a worker
    TYPE_TASK_RESULT = "TASK_RESULT"        # return a task result
    TYPE_WORKER_HEARTBEAT = "HEARTBEAT"     # worker heartbeat
    TYPE_WORKER_REGISTER = "REGISTER"       # worker registration

    @staticmethod
    def pack_message(msg_type: str, data: dict) -> bytes:
        """Serialize a message of type *msg_type* carrying *data* to UTF-8 JSON bytes."""
        message = {"type": msg_type, "data": data, "timestamp": time.time()}
        return json.dumps(message).encode()

    @staticmethod
    def unpack_message(msg: bytes) -> dict:
        """Parse a message produced by ``pack_message``.

        Accepts ``bytes``/``bytearray`` (decoded as UTF-8) or an already-decoded
        ``str``. Never raises for malformed input: if the payload is not valid
        UTF-8, not valid JSON, or not a JSON object, it returns
        ``{"type": "ERROR", "data": "Invalid message format"}`` so callers can
        always use ``.get`` on the result.
        """
        try:
            text = msg.decode() if isinstance(msg, (bytes, bytearray)) else msg
            message = json.loads(text)
        except (TypeError, ValueError):  # ValueError covers UnicodeDecodeError and JSONDecodeError
            message = None
        if not isinstance(message, dict):
            # Valid JSON that is not an object (list, number, ...) is also rejected.
            return {"type": "ERROR", "data": "Invalid message format"}
        return message
class TaskManager:
    """Track queued tasks and worker nodes, mirroring task state into SQLite.

    ``SchedulerServer`` calls these methods from one thread per connection, so
    every method that reads or mutates shared state holds ``self.lock``.
    """

    # A worker counts as alive only if its last heartbeat is younger than this (seconds).
    WORKER_ALIVE_WINDOW = 10

    def __init__(self, db_path="task_system.db"):
        # Queue entries are (priority, seq, task_id, name, data). A smaller
        # priority is served first; the increasing ``seq`` keeps tasks of equal
        # priority in submission order and stops tuple comparison before it
        # reaches the (unorderable) data dict.
        self.task_queue = queue.PriorityQueue()
        self.task_status = {}  # task_id -> {"status", "worker", "result", "create_time", "update_time"?}
        self.worker_nodes = {}  # worker_id -> {"address", "last_heartbeat", "busy"}
        self.db_path = db_path
        self.lock = threading.RLock()  # re-entrant: assign_task calls get_available_worker
        self._seq = 0
        self._init_database()

    def _db_execute(self, sql: str, params: tuple = ()) -> None:
        """Run one statement on a fresh connection; commit on success and always close it."""
        conn = sqlite3.connect(self.db_path)
        try:
            with conn:  # commits, or rolls back if execute() raises
                conn.execute(sql, params)
        finally:
            conn.close()

    def _init_database(self):
        """Create the ``tasks`` table if it does not exist yet."""
        self._db_execute(
            "CREATE TABLE IF NOT EXISTS tasks ("
            "task_id TEXT PRIMARY KEY, name TEXT, priority INTEGER, status TEXT, "
            "input_data TEXT, result TEXT, worker_id TEXT, "
            "create_time REAL, update_time REAL)"
        )

    def add_task(self, task_id: str, name: str, priority: int, data: dict) -> bool:
        """Queue a new PENDING task.

        Returns False (changing nothing) when ``task_id`` is not a non-empty
        string, ``priority`` is not a number, or the ID is already known in
        memory or in the database.
        """
        if not isinstance(task_id, str) or not task_id:
            return False
        if not isinstance(priority, (int, float)):
            return False
        now = time.time()
        with self.lock:
            if task_id in self.task_status:
                return False
            try:
                # Persist first: an ID left in the DB by an earlier run violates
                # the PRIMARY KEY and must be rejected before memory is touched.
                self._db_execute(
                    "INSERT INTO tasks (task_id, name, priority, status, input_data, create_time, update_time) "
                    "VALUES (?, ?, ?, ?, ?, ?, ?)",
                    (task_id, name, priority, "PENDING", json.dumps(data), now, now),
                )
            except sqlite3.IntegrityError:
                return False
            self.task_status[task_id] = {
                "status": "PENDING",
                "worker": "",
                "result": None,
                "create_time": now,
            }
            self._seq += 1
            self.task_queue.put((priority, self._seq, task_id, name, data))
        return True

    def _is_worker_available(self, worker_id, now: float) -> bool:
        """True if *worker_id* is registered, idle and heard from recently (caller holds the lock)."""
        info = self.worker_nodes.get(worker_id)
        return (
            info is not None
            and not info["busy"]
            and now - info["last_heartbeat"] < self.WORKER_ALIVE_WINDOW
        )

    def get_available_worker(self) -> Optional[str]:
        """Return the first registered worker (insertion order) that is idle and alive, or None."""
        now = time.time()
        with self.lock:
            for worker_id in self.worker_nodes:
                if self._is_worker_available(worker_id, now):
                    return worker_id
        return None

    def assign_task(self, worker_id: Optional[str] = None) -> Optional[Tuple[str, str, dict]]:
        """Hand the highest-priority PENDING task to a worker.

        Args:
            worker_id: the worker that will actually run the task, e.g. the one
                whose heartbeat is being answered. If None, the first available
                worker is chosen.

        Returns:
            ``(worker_id, task_id, task_data)``; or None when the requested
            worker is not available, no worker is available, or no PENDING task
            is queued.
        """
        with self.lock:
            if worker_id is None:
                worker_id = self.get_available_worker()
                if worker_id is None:
                    return None
            elif not self._is_worker_available(worker_id, time.time()):
                return None

            # get_nowait: never block a connection thread on an empty queue.
            while True:
                try:
                    _priority, _seq, task_id, _name, task_data = self.task_queue.get_nowait()
                except queue.Empty:
                    return None
                if self.task_status.get(task_id, {}).get("status") == "PENDING":
                    break
                # Any other status means a stale entry: drop it and keep looking.

            now = time.time()
            self.task_status[task_id] = {
                **self.task_status[task_id],
                "status": "RUNNING",
                "worker": worker_id,
                "update_time": now,
            }
            self.worker_nodes[worker_id]["busy"] = True
            self._db_execute(
                "UPDATE tasks SET status = ?, worker_id = ?, update_time = ? WHERE task_id = ?",
                ("RUNNING", worker_id, now, task_id),
            )
        return (worker_id, task_id, task_data)

    def process_result(self, task_id: str, result: dict, worker_id: str) -> bool:
        """Store *result* for *task_id*, mark it COMPLETED and free *worker_id*.

        Returns False if the task ID is unknown.
        """
        now = time.time()
        with self.lock:
            status = self.task_status.get(task_id)
            if status is None:
                return False
            self.task_status[task_id] = {
                **status,
                "status": "COMPLETED",
                "result": result,
                "update_time": now,
            }
            if worker_id in self.worker_nodes:
                self.worker_nodes[worker_id]["busy"] = False
            # Keep the assigned worker in the row instead of clearing it, so the
            # table agrees with task_status[task_id]["worker"].
            self._db_execute(
                "UPDATE tasks SET status = ?, result = ?, worker_id = ?, update_time = ? WHERE task_id = ?",
                ("COMPLETED", json.dumps(result), status.get("worker", ""), now, task_id),
            )
        return True

    def register_worker(self, worker_id: str, address: str) -> bool:
        """Register a worker as idle; return False if the ID is already registered."""
        with self.lock:
            if worker_id in self.worker_nodes:
                return False
            self.worker_nodes[worker_id] = {
                "address": address,
                "last_heartbeat": time.time(),
                "busy": False,
            }
        return True

    def update_heartbeat(self, worker_id: str) -> bool:
        """Refresh a worker's heartbeat timestamp; return False if it is not registered."""
        with self.lock:
            info = self.worker_nodes.get(worker_id)
            if info is None:
                return False
            info["last_heartbeat"] = time.time()
        return True

    def release_stale_workers(self, timeout: float = 30.0) -> List[str]:
        """Mark workers silent for more than *timeout* seconds as not busy; return their IDs.

        NOTE: a task that was RUNNING on such a worker stays RUNNING; it is not
        requeued (retry/failover is not implemented).
        """
        now = time.time()
        stale = []
        with self.lock:
            for worker_id, info in self.worker_nodes.items():
                if now - info["last_heartbeat"] > timeout:
                    info["busy"] = False
                    stale.append(worker_id)
        return stale


class SchedulerServer:
    """TCP scheduler: accepts client/worker connections and routes their messages to a TaskManager."""

    HEARTBEAT_TIMEOUT = 30  # seconds of silence before a worker is reported offline
    HEARTBEAT_CHECK_INTERVAL = 10  # seconds between liveness sweeps

    def __init__(self, host="0.0.0.0", port=9999):
        self.host = host
        self.port = port
        self.task_manager = TaskManager()
        self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.running = False
        self.clients = {}  # client_id ("ip:port") -> socket

    def start(self):
        """Bind, start the liveness thread, then accept connections until stop() is called."""
        self.server_socket.bind((self.host, self.port))
        self.server_socket.listen(10)
        self.running = True
        print(f"调度中心启动成功,监听地址: {self.host}:{self.port}")

        threading.Thread(target=self._heartbeat_check, daemon=True).start()

        # One daemon thread per accepted connection.
        while self.running:
            try:
                client_socket, address = self.server_socket.accept()
                print(f"新连接: {address}")
                threading.Thread(
                    target=self._handle_client,
                    args=(client_socket, address),
                    daemon=True,
                ).start()
            except Exception as e:
                print(f"接受连接失败: {e}")
                if self.running:
                    time.sleep(1)

    def _handle_client(self, client_socket: socket.socket, address: tuple):
        """Serve one connection: read a message, dispatch it, send exactly one reply.

        NOTE: each recv() is treated as one complete JSON message. TCP does not
        preserve message boundaries, so large or back-to-back messages may be
        split or merged; a length prefix or newline delimiter would be needed
        for a robust protocol.
        """
        client_id = f"{address[0]}:{address[1]}"
        self.clients[client_id] = client_socket
        try:
            while True:
                data = client_socket.recv(4096)
                if not data:
                    break
                message = Protocol.unpack_message(data)
                if not isinstance(message, dict):
                    message = {}
                msg_type = message.get("type", "UNKNOWN")
                msg_data = message.get("data", {})
                if not isinstance(msg_data, dict):
                    msg_data = {}  # e.g. the string payload of an ERROR message
                print(f"收到消息 [{client_id}]: {msg_type}")
                # sendall: send() may transmit only part of the buffer.
                client_socket.sendall(self._dispatch(msg_type, msg_data, client_id))
        except Exception as e:
            print(f"客户端处理错误 [{client_id}]: {e}")
        finally:
            self.clients.pop(client_id, None)
            client_socket.close()
            print(f"连接关闭: {client_id}")

    def _dispatch(self, msg_type, msg_data: dict, client_id: str) -> bytes:
        """Route one decoded message to its handler and return the packed reply."""
        if msg_type == Protocol.TYPE_WORKER_REGISTER:
            return self._on_register(msg_data, client_id)
        if msg_type == Protocol.TYPE_WORKER_HEARTBEAT:
            return self._on_heartbeat(msg_data)
        if msg_type == Protocol.TYPE_TASK_SUBMIT:
            return self._on_submit(msg_data)
        if msg_type == Protocol.TYPE_TASK_RESULT:
            return self._on_result(msg_data)
        return Protocol.pack_message("UNKNOWN_MESSAGE", {"message": "Unknown message type"})

    def _on_register(self, msg_data: dict, client_id: str) -> bytes:
        """Handle REGISTER: record the worker under its connection address."""
        worker_id = msg_data.get("worker_id")
        if self.task_manager.register_worker(worker_id, client_id):
            return Protocol.pack_message("REGISTER_SUCCESS", {"message": "Worker registered successfully"})
        return Protocol.pack_message("REGISTER_FAILED", {"message": "Worker ID already exists"})

    def _on_heartbeat(self, msg_data: dict) -> bytes:
        """Handle HEARTBEAT: refresh liveness and, if possible, assign a task to this worker."""
        worker_id = msg_data.get("worker_id")
        if not self.task_manager.update_heartbeat(worker_id):
            return Protocol.pack_message("HEARTBEAT_FAILED", {"message": "Worker not registered"})
        # The reply goes back on this worker's socket, so the task must be
        # recorded against this worker and not whichever one happens to be idle.
        task = self.task_manager.assign_task(worker_id)
        if task:
            _, task_id, task_data = task
            return Protocol.pack_message(
                Protocol.TYPE_TASK_ASSIGN, {"task_id": task_id, "data": task_data}
            )
        return Protocol.pack_message("NO_TASK_ASSIGN", {"message": "No task available"})

    def _on_submit(self, msg_data: dict) -> bytes:
        """Handle TASK_SUBMIT from a client."""
        task_id = msg_data.get("task_id")
        task_name = msg_data.get("task_name", "Unknown Task")
        priority = msg_data.get("priority", 5)  # default: medium priority
        task_data = msg_data.get("data", {})
        # Validate here so the client gets an accurate reason; add_task would
        # also reject these, but only with a bare False.
        if not isinstance(task_id, str) or not task_id:
            return Protocol.pack_message("TASK_SUBMIT_FAILED", {"message": "Invalid task_id"})
        if not isinstance(priority, (int, float)):
            return Protocol.pack_message("TASK_SUBMIT_FAILED", {"message": "Invalid priority"})
        if self.task_manager.add_task(task_id, task_name, priority, task_data):
            return Protocol.pack_message(
                "TASK_SUBMIT_SUCCESS", {"task_id": task_id, "message": "Task submitted"}
            )
        return Protocol.pack_message("TASK_SUBMIT_FAILED", {"message": "Task ID already exists"})

    def _on_result(self, msg_data: dict) -> bytes:
        """Handle TASK_RESULT from a worker."""
        task_id = msg_data.get("task_id")
        result = msg_data.get("result", {})
        worker_id = msg_data.get("worker_id")
        if self.task_manager.process_result(task_id, result, worker_id):
            return Protocol.pack_message(
                "RESULT_RECEIVED", {"task_id": task_id, "message": "Result received"}
            )
        return Protocol.pack_message("RESULT_FAILED", {"message": "Invalid task ID"})

    def _heartbeat_check(self):
        """Periodically report workers whose heartbeat has gone stale and mark them not busy."""
        while self.running:
            # Worker state lives on the TaskManager (this class has no worker_nodes).
            for worker_id in self.task_manager.release_stale_workers(self.HEARTBEAT_TIMEOUT):
                print(f"工作节点 {worker_id} 超时,标记为离线")
            time.sleep(self.HEARTBEAT_CHECK_INTERVAL)

    def stop(self):
        """Stop the accept loop and close the listening socket."""
        self.running = False
        if self.server_socket:
            self.server_socket.close()
        print("调度中心已停止")
# Usage example: run the scheduler on port 9999 until interrupted with Ctrl+C;
# the listening socket is closed on every exit path.
if __name__ == "__main__":
    scheduler = SchedulerServer(port=9999)
    try:
        scheduler.start()
    except KeyboardInterrupt:
        print("接收到停止信号")
    finally:
        scheduler.stop()

难点分析

  • 分布式系统一致性:多个工作节点和客户端同时操作时确保数据一致性
  • 网络通信可靠性:处理网络延迟、断开重连、消息丢失等问题
  • 任务优先级调度:实现公平且高效的任务分配算法
  • 系统监控与容错:设计完善的监控机制和故障恢复策略
  • 性能优化:处理大量任务时的并发性能瓶颈(如数据库连接池、异步 IO)

扩展方向

  • 添加 Web 管理界面(使用 Flask/Django)
  • 集成 Redis 作为分布式消息队列
  • 支持任务依赖关系(DAG 任务流)
  • 实现资源监控(CPU / 内存 / 磁盘)
  • 增加任务重试和失败转移机制

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com

热搜词