1. 问题定义
流水车间调度问题(Flow Shop Scheduling Problem, FSP) 是研究n个工件在m台机器上的流水加工过程,其核心特征为:
- 工件集合:n个独立工件{J₁, J₂, ..., Jₙ},每个工件需依次经过m道工序加工
- 机器配置:m台机器按固定顺序排列{M₁, M₂, ..., Mₘ},每台机器仅负责一道工序
- 工艺路线:所有工件遵循相同的加工顺序(M₁→M₂→...→Mₘ),即"置换流水车间调度问题(PFSP)"
2. 约束条件
该问题需满足以下基本约束:
- 加工连续性:工件一旦开始加工不可中断
- 机器独占性:每台机器同一时刻只能加工一个工件
- 工艺顺序性:所有工序必须按机器顺序执行,前道工序未完成不得进入下一工序
- 零等待约束:工件在机器间的转移允许无限缓冲等待(非零空闲问题)
- 时间约束:所有工件在零时刻可加工,加工时间已知且非负
3. 目标函数
以最小化最大完工时间(Makespan) 为优化目标:
- 数学表达:C_max = max{C(m,πₙ)},其中C(i,j)表示工件j在机器i的完工时间
- 计算逻辑:通过前向递归公式计算时间轴,具体包括:C(1,j)=Σk=1jp(1,πk)C(i,j)=maxC(i,j−1),C(i−1,j)+p(i,πj)(i≥2,j≥2)
4. 问题特性
该问题属于NP-hard组合优化问题
,具有以下特点:
- 解空间复杂度:O(n!),仅考虑置换排序情况(PFSP)
- 最优解特性:当m≤3时存在全局最优置换排序,m≥4时可能需非置换排序
- 计算挑战:标准算例Car1(11工件×5机器)的最优解验证需复杂搜索
5. 算法适配性
上述代码针对该问题的以下特性设计:
- 编码方式:采用置换排列编码(π = [π₁,...,πₙ])
- 邻域结构:通过OX交叉和交换变异保持合法解空间
- 目标适配:前向计算法精确求解makespan,适应度函数取倒数优化方向
import numpy as np
import matplotlib.pyplot as pltclass FlowShopGA:def __init__(self, processing_times, pop_size=100, max_gen=500,crossover_rate=0.8, mutation_rate=0.2, elite_ratio=0.1):"""改进的流水车间遗传算法:param processing_times: m×n矩阵,m机器数,n工件数(需确保每行长度一致)"""# 输入验证(新增)if not all(len(row) == len(processing_times[0]) for row in processing_times):raise ValueError("加工时间矩阵每行长度必须相同")self.processing_times = np.array(processing_times, dtype=int)self.n_jobs = self.processing_times.shape[1]self.n_machines = self.processing_times.shape[0]self.pop_size = pop_sizeself.max_gen = max_genself.cr = crossover_rateself.mr = mutation_rateself.elite_size = int(pop_size * elite_ratio)def init_population(self):"""初始化种群:生成工件排列的随机排列(整数编码)"""return [np.random.permutation(self.n_jobs).astype(int)for _ in range(self.pop_size)]def calculate_makespan(self, schedule):"""前向计算法求解最大完工时间(增强鲁棒性)"""# 类型验证(关键修复点)schedule = np.array(schedule, dtype=int)if len(np.unique(schedule)) != self.n_jobs:raise ValueError("调度序列包含重复或缺失的工件编号")timeline = np.zeros((self.n_machines, self.n_jobs), dtype=int)# 第一台机器(优化计算方式)timeline[0] = np.cumsum(self.processing_times[0][schedule])# 后续机器(优化计算逻辑)for i in range(1, self.n_machines):timeline[i, 0] = timeline[i - 1, 0] + self.processing_times[i][schedule[0]]for j in range(1, self.n_jobs):timeline[i, j] = max(timeline[i, j - 1], timeline[i - 1, j]) + self.processing_times[i][schedule[j]]return timeline[-1, -1]def fitness(self, population):"""适应度计算(倒数转换+防零除)"""return [1 / (self.calculate_makespan(ind) + 1e-6) for ind in population]def tournament_selection(self, population, fitness, k=3):"""锦标赛选择(k=3)"""selected = []for _ in range(self.pop_size):candidates = np.random.choice(len(population), k, replace=False)winner = candidates[np.argmax([fitness[c] for c in candidates])]selected.append(population[winner])return selecteddef ox_crossover(self, parent1, parent2):"""改进的顺序交叉(OX)(修复基因重复问题)"""size = len(parent1)cx1, cx2 = sorted(np.random.choice(size, 2, replace=False))# 中间段保持整数child1_part = parent1[cx1:cx2].copy()child2_part = parent2[cx1:cx2].copy()# 生成子代(确保唯一性)child1_remain = [g for g in parent2 if g not in child1_part]child1 = np.concatenate([child1_remain[:cx1], child1_part, child1_remain[cx1:]])child2_remain = [g for g in parent1 if g not in child2_part]child2 = np.concatenate([child2_remain[:cx1], child2_part, child2_remain[cx1:]])return child1.astype(int), child2.astype(int)def swap_mutation(self, individual):"""交换变异(增强类型控制)"""individual = individual.astype(int)if np.random.rand() < self.mr:i, j = np.random.choice(len(individual), 2, replace=False)individual[i], individual[j] = individual[j], individual[i]return individualdef run(self):"""算法主流程(增加收敛控制)"""population = self.init_population()best_history = []for gen in range(self.max_gen):# 计算适应度fitness = self.fitness(population)# 精英保留(改进保留策略)elite_indices = np.argsort(fitness)[-self.elite_size:]elites = [population[i].copy() for i in elite_indices]# 选择(加入轮盘赌备用策略)try:selected = self.tournament_selection(population, fitness)except:selected = population # 防止选择失败# 交叉(加入概率控制)children = []for i in range(0, len(selected) - 1, 2):if np.random.rand() < self.cr:child1, child2 = self.ox_crossover(selected[i], selected[i + 1])else:child1, child2 = selected[i].copy(), selected[i + 1].copy()children.extend([child1, child2])# 变异(增强变异控制)mutated = [self.swap_mutation(ind.copy()) for ind in children]# 生成新一代(保持种群规模)population = elites + mutated[:self.pop_size - self.elite_size]# 记录最优解best_idx = np.argmax(fitness)best_makespan = 1 / fitness[best_idx]best_history.append(best_makespan)# 动态输出(每50代显示进度)if gen % 50 == 0:print(f"Generation {gen}: Makespan = {best_makespan:.2f}")# 可视化收敛曲线plt.figure(figsize=(10, 6))plt.plot(best_history, 'b-', linewidth=2)plt.title("Makespan Convergence", fontsize=14)plt.xlabel("Generation", fontsize=12)plt.ylabel("Makespan", fontsize=12)plt.grid(True)plt.show()# 最终验证final_fitness = [self.calculate_makespan(ind) for ind in population]best_solution = population[np.argmin(final_fitness)]return best_solution, self.calculate_makespan(best_solution)if __name__ == "__main__":# 测试案例(Car1标准算例)processing_times = [[375, 632, 12, 460, 528, 796, 532, 14, 257, 896, 532],[12, 452, 876, 542, 101, 245, 230, 124, 527, 896, 302],[142, 758, 124, 523, 789, 632, 543, 214, 753, 214, 501],[245, 278, 534, 120, 124, 375, 896, 543, 210, 258, 765],[412, 398, 765, 499, 999, 123, 452, 785, 463, 259, 988]]try:ga = FlowShopGA(processing_times, pop_size=100, max_gen=500)solution, makespan = ga.run()print("\n最优调度顺序:", solution)print("最小完工时间:", makespan)except Exception as e:print("运行错误:", str(e))
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.pyplot import cmclass FlowShopGA:def __init__(self, processing_times, pop_size=100, max_gen=500,crossover_rate=0.8, mutation_rate=0.2, elite_ratio=0.1):# 输入验证与初始化(网页1的算法框架)if not all(len(row) == len(processing_times[0]) for row in processing_times):raise ValueError("加工时间矩阵每行长度必须相同")self.processing_times = np.array(processing_times, dtype=int)self.n_jobs = self.processing_times.shape[1]self.n_machines = self.processing_times.shape[0]self.pop_size = pop_sizeself.max_gen = max_genself.cr = crossover_rateself.mr = mutation_rateself.elite_size = int(pop_size * elite_ratio)def init_population(self):"""初始化种群(网页1的编码方式)"""return [np.random.permutation(self.n_jobs).astype(int)for _ in range(self.pop_size)]def calculate_makespan(self, schedule):"""前向递归计算最大完工时间(网页1的数学模型)"""schedule = np.array(schedule, dtype=int)if len(np.unique(schedule)) != self.n_jobs:raise ValueError("调度序列包含重复或缺失的工件编号")timeline = np.zeros((self.n_machines, self.n_jobs), dtype=int)timeline[0] = np.cumsum(self.processing_times[0][schedule])for i in range(1, self.n_machines):timeline[i, 0] = timeline[i - 1, 0] + self.processing_times[i][schedule[0]]for j in range(1, self.n_jobs):timeline[i, j] = max(timeline[i, j - 1], timeline[i - 1, j]) + self.processing_times[i][schedule[j]]return timeline[-1, -1]def plot_gantt(self, schedule, makespan):"""甘特图可视化(网页1的时间轴计算+网页3的绘图规范)"""schedule = np.array(schedule, dtype=int)timeline = np.zeros((self.n_machines, self.n_jobs), dtype=int)start_times = np.zeros_like(timeline)# 计算开始时间矩阵timeline[0] = np.cumsum(self.processing_times[0][schedule])start_times[0] = np.insert(timeline[0][:-1], 0, 0)for i in range(1, self.n_machines):timeline[i, 0] = timeline[i - 1, 0] + self.processing_times[i][schedule[0]]start_times[i, 0] = timeline[i - 1, 0]for j in range(1, self.n_jobs):start = max(timeline[i, j - 1], timeline[i - 1, j])start_times[i, j] = starttimeline[i, j] = start + self.processing_times[i][schedule[j]]# 绘制甘特图fig, ax = plt.subplots(figsize=(12, 6))colors = cm.get_cmap('tab20')(np.linspace(0, 1, self.n_jobs))for machine in range(self.n_machines):for job_idx, job in enumerate(schedule):ax.barh(y=machine,width=self.processing_times[machine][job],left=start_times[machine][job_idx],color=colors[job],edgecolor='black',label=f'Job {job + 1}' if machine == 0 and job_idx == 0 else "")ax.text(x=start_times[machine][job_idx] + self.processing_times[machine][job] / 2,y=machine,s=f'J{job + 1}\n({self.processing_times[machine][job]})',va='center',ha='center',color='white',fontweight='bold')# 图表装饰(网页3的可视化规范)ax.set_yticks(range(self.n_machines))ax.set_yticklabels([f'Machine {i + 1}' for i in range(self.n_machines)])ax.set_xlabel('Time Units')ax.set_title(f'Optimal Gantt Chart (Makespan: {makespan})')ax.grid(axis='x', linestyle='--', alpha=0.7)ax.invert_yaxis()handles, labels = ax.get_legend_handles_labels()unique_labels = dict(zip(labels, handles))ax.legend(unique_labels.values(),[f'Job {i + 1}' for i in schedule],title='Job Sequence',bbox_to_anchor=(1.05, 1),loc='upper left')plt.tight_layout()plt.show()# 以下为遗传算法核心组件(网页1的算法流程)def fitness(self, population):return [1 / (self.calculate_makespan(ind) + 1e-6) for ind in population]def tournament_selection(self, population, fitness, k=3):selected = []for _ in range(self.pop_size):candidates = np.random.choice(len(population), k, replace=False)winner = candidates[np.argmax([fitness[c] for c in candidates])]selected.append(population[winner])return selecteddef ox_crossover(self, parent1, parent2):size = len(parent1)cx1, cx2 = sorted(np.random.choice(size, 2, replace=False))child1_part = parent1[cx1:cx2].copy()child1_remain = [g for g in parent2 if g not in child1_part]child1 = np.concatenate([child1_remain[:cx1], child1_part, child1_remain[cx1:]])return child1.astype(int), Nonedef swap_mutation(self, individual):if np.random.rand() < self.mr:i, j = np.random.choice(len(individual), 2, replace=False)individual[i], individual[j] = individual[j], individual[i]return individualdef run(self):"""优化主流程(网页1的进化策略)"""population = self.init_population()best_history = []for gen in range(self.max_gen):fitness = self.fitness(population)elite_indices = np.argsort(fitness)[-self.elite_size:]elites = [population[i].copy() for i in elite_indices]try:selected = self.tournament_selection(population, fitness)except:selected = populationchildren = []for i in range(0, len(selected) - 1, 2):if np.random.rand() < self.cr:child1, _ = self.ox_crossover(selected[i], selected[i + 1])children.append(child1)else:children.append(selected[i].copy())mutated = [self.swap_mutation(ind.copy()) for ind in children]population = elites + mutated[:self.pop_size - self.elite_size]best_idx = np.argmax(fitness)best_makespan = 1 / fitness[best_idx]best_history.append(best_makespan)if gen % 50 == 0:print(f"Generation {gen}: Makespan = {best_makespan:.2f}")# 可视化与结果输出(网页3的配置建议)plt.figure(figsize=(10, 6))plt.plot(best_history, 'b-', linewidth=2)plt.title("Makespan Convergence")plt.xlabel("Generation")plt.ylabel("Makespan")plt.grid(True)plt.show()final_fitness = [self.calculate_makespan(ind) for ind in population]best_solution = population[np.argmin(final_fitness)]makespan = self.calculate_makespan(best_solution)self.plot_gantt(best_solution, makespan)return best_solution, makespanif __name__ == "__main__":# 测试案例(网页1的标准算例)processing_times = [[31, 19, 23, 13, 33],[41, 55, 42, 22, 5],[25, 3, 27, 14, 57],[30, 34, 6, 13, 19]]try:ga = FlowShopGA(processing_times,pop_size=50,max_gen=200,crossover_rate=0.7,mutation_rate=0.3)solution, makespan = ga.run()print("\n最优调度顺序:", solution)print("最小完工时间:", makespan)except Exception as e:print("运行错误:", str(e))