欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 汽车 > 维修 > Python-多进程编程 (multiprocessing 模块)

Python-多进程编程 (multiprocessing 模块)

2025/5/22 13:31:31 来源:https://blog.csdn.net/weixin_49272453/article/details/147976096  浏览:    关键词:Python-多进程编程 (multiprocessing 模块)

目录

  • 一、创建进程
    • 1. Process 的语法结构
    • 2. 进程不共享全局变量
  • 二、进程间通信
    • 1. 队列通信
    • 2. 管道通信
  • 三、进程池
    • 1. 常用函数
    • 2. 进程池中的 Queue
  • 四、应用:复制文件夹(多进程版)
  • 五、守护进程和进程同步
  • 六、注意事项


通过使用 multiprocessing 模块,Python 程序可以在多核处理器上实现并行处理,提高程序的执行效率和响应速度。

一、创建进程

要创建一个新的进程,需要实例化 multiprocessing.Process 类,并调用它的 start() 方法。Process 类的构造函数接受参数 target 作为子进程需要执行的函数,args 和 kwargs 作为传递给 target 函数的参数。

1. Process 的语法结构

创建 Process 对象:Process(group , target , name , args , kwargs)

  • target :表示调用对象,指定子进程要执行的函数。
  • args :传递给 target 函数的参数 (tuple) 。
  • kwargs :传递给 target 函数的关键字参数 (dict) 。
  • name :子进程名称,可以不设定。
  • group :指定进程组,大多数情况下用不到,默认值为 None 。

Process 创建的实例对象的常用方法

  • start() :启动子进程实例。

注:

  1. 创建 Process 对象时,实际上是创建了子进程的 “描述对象” ,此时还没有真正启动子进程。
  2. 子进程真正启动是在调用 p.start() 的时候,操作系统才会分配资源创建子进程,执行对应代码。
  3. 之后,子进程执行传入的 target 函数或重写的 run() 方法中的代码。
  4. start() 是非阻塞的,父进程继续执行,而子进程代码则独立运行。
  • is_alive() :判断子进程是否还活着。

  • join(timeout=None) :用于阻塞当前(父)进程,直到子进程执行结束。如果不调用该函数,主进程会继续执行,不管子进程是否完成。

timeout(可选):指定等待子进程结束的最长时间,单位是秒。如果在超时时间内子进程结束,join() 返回,父进程继续,否则超时后父进程继续(子进程仍继续执行)。

  • terminate() :调用后,操作系统会立即杀死子进程,不管子进程是否完成任务。

terminate() 是个强制退出方法,子进程不会正常执行清理工作(如 finally 代码块、关闭文件等可能不被执行)。一般用于子进程 “无响应” 或超时等情况下,强制结束子进程。

时机行为
创建 Process 对象创建子进程描述对象,尚未启动
调用 start()操作系统创建子进程并运行子进程代码
子进程执行传入的 target 函数子进程代码开始在子进程空间运行
调用 join()父进程等待子进程运行结束

Python 代码示例:

# !/usr/bin/python
# -*- coding:utf-8 -*-from multiprocessing import Processdef run_proc(name, age, **kwargs):"""子进程要执行的代码"""print(f'My name is {name} and my age is {age}')print(kwargs)# print(1 / 0)  # 退出码是 1exit(12)if __name__ == '__main__':p = Process(target=run_proc, args=('Alice', 20), kwargs={'city':'beijing', 'country':'China'})p.start()print(f'1.子进程是否存活:{p.is_alive()}')# p.terminate()  # 退出码是 -15p.join()# p.join(0)print(f'2.子进程是否存活:{p.is_alive()}')print(f'子进程的退出码是:{p.exitcode}')print('父进程结束')

Process 创建的实例对象的常用属性

  • name :当前进程的别名,默认为 Process-N ,N 为从 1 开始递增的整数
  • pid :当前进程的 pid(进程号)
  • exitcode :子进程的退出码

其他

  • 孤儿进程 :父进程退出(kill 杀死父进程),子进程变为孤儿。

  • 僵尸进程 :子进程退出,父进程在忙碌,没有回收它,要避免僵尸。

  • Python 的 os 模块封装了常见的系统调用,其中就包括:

    • 创建子进程:os.fork()
    • 获取自身 ID :os.getpid()
    • 获取父进程 ID :os.getppid()

Python 代码示例:

# !/usr/bin/python
# -*- coding:utf-8 -*-from multiprocessing import Process
import os
import timedef run_proc():"""子进程要执行的代码"""print('运行子进程 : pid = %d , ppid = %d' % (os.getpid(), os.getppid()))print('子进程运行结束')if __name__ == '__main__':print('运行父进程 : pid = %d' % os.getpid())  # os.getpid 获取当前进程的进程号p = Process(target=run_proc)p.start()time.sleep(1)print('父进程运行结束')

2. 进程不共享全局变量

进程不共享全局变量或者列表,主要原因是每个进程拥有独立的内存空间。具体来说:

  • 独立的地址空间:每个进程在操作系统中都有自己独立的虚拟地址空间,全局变量和列表都存储在该进程的内存空间内。不同进程的内存空间相互隔离,不能直接访问对方的变量。

  • 进程隔离保证安全和稳定:这种隔离防止一个进程意外(或恶意)修改另一个进程的数据,从而保证系统的稳定性和安全性。

  • 复制而非共享:当创建新进程(如使用 fork),父进程的内存会被复制给子进程,这样变量看似 “相同” ,但其实是不同地址的独立副本,修改一个进程的变量不会影响另一个进程。

  • 共享数据需显式机制:如果需要进程间通信(IPC)和共享数据,必须使用特殊机制,如共享内存(shm)、消息队列、管道(pipe)、信号量、套接字等,而不能直接共享普通的全局变量或数据结构。

总结:进程间的内存隔离使得全局变量、列表等数据结构不被共享,确保各进程运行互不干扰。

Python 代码示例:

# !/usr/bin/python
# -*- coding:utf-8 -*-import os
import time
from multiprocessing import Processnums = [11, 22]
n = 10def work1():"""子进程要执行的代码"""global nn = 100print("in process1 pid=%d , nums=%s" % (os.getpid(), nums))  # nums=[11, 22]for i in range(3):nums.append(i)time.sleep(1)print("in process1 pid=%d , nums=%s" % (os.getpid(), nums))  # nums=[11, 22, 0, 1, 2]print("in process1 pid=%d , n=%d" % (os.getpid(), n))  # n=100def work2():"""子进程要执行的代码"""print("in process2 pid=%d , nums=%s" % (os.getpid(), nums))  # nums=[11, 22]print("in process2 pid=%d , n=%d" % (os.getpid(), n))  # n=10if __name__ == '__main__':p1 = Process(target=work1)p1.start()p1.join()print('-' * 50)p2 = Process(target=work2)p2.start()

二、进程间通信

进程间的通信通常通过 Queue 或 Pipe 实现。Queue 是一个进程安全的队列,可以用于多个进程之间的数据传递。Pipe 则提供了两个连接对象,通过管道连接,允许两个进程间的双向通信。

1. 队列通信

初始化 Queue() 对象时(例如:q=Queue()),若括号中没有指定最大可接收的消息数量,或数量为负值,那么就代表可接受的消息数量没有上限(直到内存的尽头)。

  • Queue.qsize() :返回当前队列包含的消息数量。

  • Queue.empty() :如果队列为空,返回 True ,反之返回 False 。

  • Queue.full() :如果队列满了,返回 True ,反之返回 False 。

  • Queue.get([block[, timeout]]) :获取队列中的一条消息,然后将其从列队中移除,block 默认值为 True 。

    • 如果 block 使用默认值,且没有设置 timeout(单位秒),消息列队如果为空,此时程序将被阻塞(停在读取状态),直到从消息列队读到消息为止,如果设置了 timeout ,则会等待 timeout 秒,若还没读取到任何消息,则抛出 “Queue.Empty” 异常。
    • 如果 block 值为 False,消息列队如果为空,则会立刻抛出 “Queue.Empty” 异常。
  • Queue.get_nowait() :相当于 Queue.get(block=False)

  • Queue.put(item,[block[, timeout]]) :将 item 消息写入队列,block 默认值为 True 。

    • 如果 block 使用默认值,且没有设置 timeout(单位秒),消息列队如果已经没有空间可写入,此时程序将被阻塞(停在写入状态),直到从消息列队腾出空间为止,如果设置了 timeout ,则会等待 timeout 秒,若还没空间,则抛出 “Queue.Full” 异常。
    • 如果 block 值为 False,消息列队如果没有空间可写入,则会立刻抛出 “Queue.Full” 异常。
  • Queue.put_nowait(item) :相当于 Queue.put(item, False)

Python 代码示例:

# !/usr/bin/python
# -*- coding:utf-8 -*-from multiprocessing import Process, Queue
import timedef write(q):for i in range(10):print('Put %d to queue...' % i)q.put(i)time.sleep(0.1)def read(q):while True:if not q.empty():print('Get %d from queue.' % q.get())time.sleep(0.2)else:breakif __name__ == '__main__':q = Queue()p_w = Process(target=write, args=(q,))p_r = Process(target=read, args=(q,))p_w.start()p_r.start()p_w.join()p_r.join()

2. 管道通信

通常情况下,管道有 2 个口,而 Pipe 也常用来实现 2 个进程之间的通信,这 2 个进程分别位于管道的两端,一端用来发送数据,另一端用来接收数据。

使用 Pipe 实现进程通信,首先需要调用 multiprocessing.Pipe() 函数来创建一个管道。该函
数的语法格式为:conn1, conn2 = multiprocessing.Pipe([duplex=True])

其中,conn1 和 conn2 分别用来接收 Pipe 函数返回的 2 个端口;duplex 参数默认为 True ,表示该管道是双向的,即位于 2 个端口的进程既可以发送数据,也可以接受数据,而如果将 duplex 值设为 False ,则表示管道是单向的,conn1 只能用来接收数据,而 conn2 只能用来发送数据。另外值得一提的是,conn1 和 conn2 都属于 PipeConnection 对象。

三、进程池

当需要创建的子进程数量不多时,可以直接利用 multiprocessing 中的 Process 动态成生多个进程,但当有大量的任务(上百甚至上千个)需要并行处理时,手动的去创建进程的工作量巨大,此时就可以用到 multiprocessing 模块提供的 Pool 方法。

进程池允许将任务分配给池中的工作进程执行,这样可以有效管理进程的创建和销毁,避免系统资源的浪费。初始化 Pool 时,可以指定一个最大进程数,当有新的请求提交到 Pool 中时:

  • 如果池中的进程数没有达到指定的最大值,那么就会创建一个新的进程用来执行该请求;

  • 如果池中的进程数已经达到指定的最大值,那么该请求就会等待,直到池中有进程结束,该进程会被用来执行新的任务。

1. 常用函数

multiprocessing.Pool 常用函数解析:

  • apply_async(func[, args[, kwds]]) :使用非阻塞方式调用 func(并行执行,堵塞方式必须等待上一个进程退出才能执行下一个进程),args 为传递给 func 的参数列表,kwds 为传递给 func 的关键字参数列表。

  • close() :关闭 Pool ,使其不再接受新的任务。

  • terminate() :不管任务是否完成,立即终止。

  • join() :主进程阻塞,等待子进程的退出, 必须在 close 或 terminate 之后使用。

Python 代码示例:

# !/usr/bin/python
# -*- coding:utf-8 -*-from multiprocessing.pool import Pool
import os, time, randomdef worker(msg):t_start = time.time()print(f'{os.getpid()} 开始执行任务 {msg}')# random.random()随机生成 0~1 之间的浮点数time.sleep(random.random() * 2)t_stop = time.time()print("任务 %d 执行完毕,%d 释放,耗时%0.2f" % (msg, os.getpid(), t_stop - t_start))if __name__ == '__main__':po = Pool(3)  # 定义一个进程池,最大进程数 3(注意这句必须放在 main 下面)for i in range(0, 10):# Pool().apply_async(要调用的目标,(传递给目标的参数元祖,))# 每次循环将会用空闲出来的子进程去调用目标po.apply_async(worker, (i,))  # 这里的 worker 不能是对象方法print("------start------")po.close()  # 关闭进程池,关闭后 po 不再接收新的请求po.join()  # 等待 po 中所有子进程执行完成,必须放在 close 语句之后print("-------end-------")

2. 进程池中的 Queue

如果要使用 Pool 创建进程,就需要使用 multiprocessing.Manager().Queue() ,而不是 multiprocessing.Queue() ,否则会得到一条如下的错误信息:

RuntimeError: Queue objects should only be shared between processes through inheritance.

Python 代码示例:

# !/usr/bin/python
# -*- coding:utf-8 -*-from multiprocessing import Manager, Pool
import time, osdef reader(q):print("reader 启动 (%s) , 父进程为 (%s)" % (os.getpid(), os.getppid()))for i in range(q.qsize()):print("reader 从 Queue 获取到消息:%s" % q.get())def writer(q):print("writer 启动 (%s) , 父进程为 (%s)" % (os.getpid(), os.getppid()))for i in "hello":q.put(i)time.sleep(1)if __name__ == "__main__":print("(%s) start" % os.getpid())q = Manager().Queue()  # 使用 Manager 中的 Queuepo = Pool()po.apply_async(writer, (q,))time.sleep(1)  # 先让上面的任务向 Queue 存入数据,然后再让下面的任务开始从中取数据po.apply_async(reader, (q,))po.close()po.join()print("(%s) End" % os.getpid())

四、应用:复制文件夹(多进程版)

# !/usr/bin/python
# -*- coding:utf-8 -*-import multiprocessing
import os
import time
import randomdef copy_file(queue, file_name, source_folder_name, dest_folder_name):"""copy 文件到指定的路径"""f_read = open(source_folder_name + "/" + file_name, "rb")f_write = open(dest_folder_name + "/" + file_name, "wb")while True:time.sleep(random.random())content = f_read.read(1024)if content:f_write.write(content)else:breakf_read.close()f_write.close()# 发送已经拷贝完毕的文件名字queue.put(file_name)def main():# 获取要复制的文件夹source_folder_name = input("请输入要复制文件夹名字:")# 整理目标文件夹dest_folder_name = source_folder_name + "[副本]"# 创建目标文件夹try:os.mkdir(dest_folder_name)except FileExistsError:print('该文件夹已存在')  # 如果文件夹已经存在,那么创建会失败# 获取这个文件夹中所有的普通文件名file_names = os.listdir(source_folder_name)# 创建 Queuequeue = multiprocessing.Manager().Queue()# 创建进程池pool = multiprocessing.Pool(3)for file_name in file_names:# 向进程池中添加任务pool.apply_async(copy_file, args=(queue, file_name, source_folder_name, dest_folder_name))# 主进程显示进度pool.close()all_file_num = len(file_names)while True:file_name = queue.get()if file_name in file_names:file_names.remove(file_name)copy_rate = (all_file_num - len(file_names)) * 100 / all_file_numprint("\r%.2f...(%s)" % (copy_rate, file_name) + " "*50, end="")if copy_rate >= 100:breakprint()if __name__ == "__main__":main()

五、守护进程和进程同步

multiprocessing 模块还提供了守护进程(daemon process)的概念,守护进程会在主进程代码执行结束后自动终止。此外,模块中还包含了锁(Lock)和信号量(Semaphore)等同步原语,用于在进程间同步操作。

六、注意事项

  • 使用 multiprocessing 时,应避免在多个进程间共享状态。

  • 确保传递给 Process 类的所有参数都是可序列化的。

  • 在主模块中使用 if __name__ == "__main__": 来保护程序的入口点。

  • 尽量不要使用 Process.terminate() 来终止进程,因为这可能会导致共享资源变得不可用。

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com

热搜词