目录
- 一、创建进程
- 1. Process 的语法结构
- 2. 进程不共享全局变量
- 二、进程间通信
- 1. 队列通信
- 2. 管道通信
- 三、进程池
- 1. 常用函数
- 2. 进程池中的 Queue
- 四、应用:复制文件夹(多进程版)
- 五、守护进程和进程同步
- 六、注意事项
通过使用 multiprocessing 模块,Python 程序可以在多核处理器上实现并行处理,提高程序的执行效率和响应速度。
一、创建进程
要创建一个新的进程,需要实例化 multiprocessing.Process 类,并调用它的 start() 方法。Process 类的构造函数接受参数 target 作为子进程需要执行的函数,args 和 kwargs 作为传递给 target 函数的参数。
1. Process 的语法结构
① 创建 Process 对象:Process(group , target , name , args , kwargs)
- target :表示调用对象,指定子进程要执行的函数。
- args :传递给 target 函数的参数 (tuple) 。
- kwargs :传递给 target 函数的关键字参数 (dict) 。
- name :子进程名称,可以不设定。
- group :指定进程组,大多数情况下用不到,默认值为 None 。
② Process 创建的实例对象的常用方法:
start()
:启动子进程实例。
注:
- 创建 Process 对象时,实际上是创建了子进程的 “描述对象” ,此时还没有真正启动子进程。
- 子进程真正启动是在调用 p.start() 的时候,操作系统才会分配资源创建子进程,执行对应代码。
- 之后,子进程执行传入的 target 函数或重写的 run() 方法中的代码。
- start() 是非阻塞的,父进程继续执行,而子进程代码则独立运行。
-
is_alive()
:判断子进程是否还活着。 -
join(timeout=None)
:用于阻塞当前(父)进程,直到子进程执行结束。如果不调用该函数,主进程会继续执行,不管子进程是否完成。
timeout(可选):指定等待子进程结束的最长时间,单位是秒。如果在超时时间内子进程结束,join() 返回,父进程继续,否则超时后父进程继续(子进程仍继续执行)。
terminate()
:调用后,操作系统会立即杀死子进程,不管子进程是否完成任务。
terminate() 是个强制退出方法,子进程不会正常执行清理工作(如 finally 代码块、关闭文件等可能不被执行)。一般用于子进程 “无响应” 或超时等情况下,强制结束子进程。
时机 | 行为 |
---|---|
创建 Process 对象 | 创建子进程描述对象,尚未启动 |
调用 start() | 操作系统创建子进程并运行子进程代码 |
子进程执行传入的 target 函数 | 子进程代码开始在子进程空间运行 |
调用 join() | 父进程等待子进程运行结束 |
Python 代码示例:
# !/usr/bin/python
# -*- coding:utf-8 -*-from multiprocessing import Processdef run_proc(name, age, **kwargs):"""子进程要执行的代码"""print(f'My name is {name} and my age is {age}')print(kwargs)# print(1 / 0) # 退出码是 1exit(12)if __name__ == '__main__':p = Process(target=run_proc, args=('Alice', 20), kwargs={'city':'beijing', 'country':'China'})p.start()print(f'1.子进程是否存活:{p.is_alive()}')# p.terminate() # 退出码是 -15p.join()# p.join(0)print(f'2.子进程是否存活:{p.is_alive()}')print(f'子进程的退出码是:{p.exitcode}')print('父进程结束')
③ Process 创建的实例对象的常用属性:
- name :当前进程的别名,默认为 Process-N ,N 为从 1 开始递增的整数
- pid :当前进程的 pid(进程号)
- exitcode :子进程的退出码
④ 其他:
-
孤儿进程 :父进程退出(kill 杀死父进程),子进程变为孤儿。
-
僵尸进程 :子进程退出,父进程在忙碌,没有回收它,要避免僵尸。
-
Python 的
os
模块封装了常见的系统调用,其中就包括:- 创建子进程:
os.fork()
- 获取自身 ID :
os.getpid()
- 获取父进程 ID :
os.getppid()
- 创建子进程:
Python 代码示例:
# !/usr/bin/python
# -*- coding:utf-8 -*-from multiprocessing import Process
import os
import timedef run_proc():"""子进程要执行的代码"""print('运行子进程 : pid = %d , ppid = %d' % (os.getpid(), os.getppid()))print('子进程运行结束')if __name__ == '__main__':print('运行父进程 : pid = %d' % os.getpid()) # os.getpid 获取当前进程的进程号p = Process(target=run_proc)p.start()time.sleep(1)print('父进程运行结束')
2. 进程不共享全局变量
进程不共享全局变量或者列表,主要原因是每个进程拥有独立的内存空间。具体来说:
-
独立的地址空间:每个进程在操作系统中都有自己独立的虚拟地址空间,全局变量和列表都存储在该进程的内存空间内。不同进程的内存空间相互隔离,不能直接访问对方的变量。
-
进程隔离保证安全和稳定:这种隔离防止一个进程意外(或恶意)修改另一个进程的数据,从而保证系统的稳定性和安全性。
-
复制而非共享:当创建新进程(如使用
fork
),父进程的内存会被复制给子进程,这样变量看似 “相同” ,但其实是不同地址的独立副本,修改一个进程的变量不会影响另一个进程。 -
共享数据需显式机制:如果需要进程间通信(IPC)和共享数据,必须使用特殊机制,如共享内存(
shm
)、消息队列、管道(pipe
)、信号量、套接字等,而不能直接共享普通的全局变量或数据结构。
总结:进程间的内存隔离使得全局变量、列表等数据结构不被共享,确保各进程运行互不干扰。
Python 代码示例:
# !/usr/bin/python
# -*- coding:utf-8 -*-import os
import time
from multiprocessing import Processnums = [11, 22]
n = 10def work1():"""子进程要执行的代码"""global nn = 100print("in process1 pid=%d , nums=%s" % (os.getpid(), nums)) # nums=[11, 22]for i in range(3):nums.append(i)time.sleep(1)print("in process1 pid=%d , nums=%s" % (os.getpid(), nums)) # nums=[11, 22, 0, 1, 2]print("in process1 pid=%d , n=%d" % (os.getpid(), n)) # n=100def work2():"""子进程要执行的代码"""print("in process2 pid=%d , nums=%s" % (os.getpid(), nums)) # nums=[11, 22]print("in process2 pid=%d , n=%d" % (os.getpid(), n)) # n=10if __name__ == '__main__':p1 = Process(target=work1)p1.start()p1.join()print('-' * 50)p2 = Process(target=work2)p2.start()
二、进程间通信
进程间的通信通常通过 Queue 或 Pipe 实现。Queue 是一个进程安全的队列,可以用于多个进程之间的数据传递。Pipe 则提供了两个连接对象,通过管道连接,允许两个进程间的双向通信。
1. 队列通信
初始化 Queue() 对象时(例如:q=Queue()
),若括号中没有指定最大可接收的消息数量,或数量为负值,那么就代表可接受的消息数量没有上限(直到内存的尽头)。
-
Queue.qsize()
:返回当前队列包含的消息数量。 -
Queue.empty()
:如果队列为空,返回 True ,反之返回 False 。 -
Queue.full()
:如果队列满了,返回 True ,反之返回 False 。 -
Queue.get([block[, timeout]])
:获取队列中的一条消息,然后将其从列队中移除,block 默认值为 True 。- 如果 block 使用默认值,且没有设置 timeout(单位秒),消息列队如果为空,此时程序将被阻塞(停在读取状态),直到从消息列队读到消息为止,如果设置了 timeout ,则会等待 timeout 秒,若还没读取到任何消息,则抛出 “Queue.Empty” 异常。
- 如果 block 值为 False,消息列队如果为空,则会立刻抛出 “Queue.Empty” 异常。
-
Queue.get_nowait()
:相当于Queue.get(block=False)
。 -
Queue.put(item,[block[, timeout]])
:将 item 消息写入队列,block 默认值为 True 。- 如果 block 使用默认值,且没有设置 timeout(单位秒),消息列队如果已经没有空间可写入,此时程序将被阻塞(停在写入状态),直到从消息列队腾出空间为止,如果设置了 timeout ,则会等待 timeout 秒,若还没空间,则抛出 “Queue.Full” 异常。
- 如果 block 值为 False,消息列队如果没有空间可写入,则会立刻抛出 “Queue.Full” 异常。
-
Queue.put_nowait(item)
:相当于Queue.put(item, False)
。
Python 代码示例:
# !/usr/bin/python
# -*- coding:utf-8 -*-from multiprocessing import Process, Queue
import timedef write(q):for i in range(10):print('Put %d to queue...' % i)q.put(i)time.sleep(0.1)def read(q):while True:if not q.empty():print('Get %d from queue.' % q.get())time.sleep(0.2)else:breakif __name__ == '__main__':q = Queue()p_w = Process(target=write, args=(q,))p_r = Process(target=read, args=(q,))p_w.start()p_r.start()p_w.join()p_r.join()
2. 管道通信
通常情况下,管道有 2 个口,而 Pipe 也常用来实现 2 个进程之间的通信,这 2 个进程分别位于管道的两端,一端用来发送数据,另一端用来接收数据。
使用 Pipe 实现进程通信,首先需要调用 multiprocessing.Pipe() 函数来创建一个管道。该函
数的语法格式为:conn1, conn2 = multiprocessing.Pipe([duplex=True])
。
其中,conn1 和 conn2 分别用来接收 Pipe 函数返回的 2 个端口;duplex 参数默认为 True ,表示该管道是双向的,即位于 2 个端口的进程既可以发送数据,也可以接受数据,而如果将 duplex 值设为 False ,则表示管道是单向的,conn1 只能用来接收数据,而 conn2 只能用来发送数据。另外值得一提的是,conn1 和 conn2 都属于 PipeConnection 对象。
三、进程池
当需要创建的子进程数量不多时,可以直接利用 multiprocessing 中的 Process 动态成生多个进程,但当有大量的任务(上百甚至上千个)需要并行处理时,手动的去创建进程的工作量巨大,此时就可以用到 multiprocessing 模块提供的 Pool 方法。
进程池允许将任务分配给池中的工作进程执行,这样可以有效管理进程的创建和销毁,避免系统资源的浪费。初始化 Pool 时,可以指定一个最大进程数,当有新的请求提交到 Pool 中时:
-
如果池中的进程数没有达到指定的最大值,那么就会创建一个新的进程用来执行该请求;
-
如果池中的进程数已经达到指定的最大值,那么该请求就会等待,直到池中有进程结束,该进程会被用来执行新的任务。
1. 常用函数
multiprocessing.Pool
常用函数解析:
-
apply_async(func[, args[, kwds]])
:使用非阻塞方式调用 func(并行执行,堵塞方式必须等待上一个进程退出才能执行下一个进程),args 为传递给 func 的参数列表,kwds 为传递给 func 的关键字参数列表。 -
close()
:关闭 Pool ,使其不再接受新的任务。 -
terminate()
:不管任务是否完成,立即终止。 -
join()
:主进程阻塞,等待子进程的退出, 必须在 close 或 terminate 之后使用。
Python 代码示例:
# !/usr/bin/python
# -*- coding:utf-8 -*-from multiprocessing.pool import Pool
import os, time, randomdef worker(msg):t_start = time.time()print(f'{os.getpid()} 开始执行任务 {msg}')# random.random()随机生成 0~1 之间的浮点数time.sleep(random.random() * 2)t_stop = time.time()print("任务 %d 执行完毕,%d 释放,耗时%0.2f" % (msg, os.getpid(), t_stop - t_start))if __name__ == '__main__':po = Pool(3) # 定义一个进程池,最大进程数 3(注意这句必须放在 main 下面)for i in range(0, 10):# Pool().apply_async(要调用的目标,(传递给目标的参数元祖,))# 每次循环将会用空闲出来的子进程去调用目标po.apply_async(worker, (i,)) # 这里的 worker 不能是对象方法print("------start------")po.close() # 关闭进程池,关闭后 po 不再接收新的请求po.join() # 等待 po 中所有子进程执行完成,必须放在 close 语句之后print("-------end-------")
2. 进程池中的 Queue
如果要使用 Pool 创建进程,就需要使用 multiprocessing.Manager().Queue()
,而不是 multiprocessing.Queue()
,否则会得到一条如下的错误信息:
RuntimeError: Queue objects should only be shared between processes through inheritance.
Python 代码示例:
# !/usr/bin/python
# -*- coding:utf-8 -*-from multiprocessing import Manager, Pool
import time, osdef reader(q):print("reader 启动 (%s) , 父进程为 (%s)" % (os.getpid(), os.getppid()))for i in range(q.qsize()):print("reader 从 Queue 获取到消息:%s" % q.get())def writer(q):print("writer 启动 (%s) , 父进程为 (%s)" % (os.getpid(), os.getppid()))for i in "hello":q.put(i)time.sleep(1)if __name__ == "__main__":print("(%s) start" % os.getpid())q = Manager().Queue() # 使用 Manager 中的 Queuepo = Pool()po.apply_async(writer, (q,))time.sleep(1) # 先让上面的任务向 Queue 存入数据,然后再让下面的任务开始从中取数据po.apply_async(reader, (q,))po.close()po.join()print("(%s) End" % os.getpid())
四、应用:复制文件夹(多进程版)
# !/usr/bin/python
# -*- coding:utf-8 -*-import multiprocessing
import os
import time
import randomdef copy_file(queue, file_name, source_folder_name, dest_folder_name):"""copy 文件到指定的路径"""f_read = open(source_folder_name + "/" + file_name, "rb")f_write = open(dest_folder_name + "/" + file_name, "wb")while True:time.sleep(random.random())content = f_read.read(1024)if content:f_write.write(content)else:breakf_read.close()f_write.close()# 发送已经拷贝完毕的文件名字queue.put(file_name)def main():# 获取要复制的文件夹source_folder_name = input("请输入要复制文件夹名字:")# 整理目标文件夹dest_folder_name = source_folder_name + "[副本]"# 创建目标文件夹try:os.mkdir(dest_folder_name)except FileExistsError:print('该文件夹已存在') # 如果文件夹已经存在,那么创建会失败# 获取这个文件夹中所有的普通文件名file_names = os.listdir(source_folder_name)# 创建 Queuequeue = multiprocessing.Manager().Queue()# 创建进程池pool = multiprocessing.Pool(3)for file_name in file_names:# 向进程池中添加任务pool.apply_async(copy_file, args=(queue, file_name, source_folder_name, dest_folder_name))# 主进程显示进度pool.close()all_file_num = len(file_names)while True:file_name = queue.get()if file_name in file_names:file_names.remove(file_name)copy_rate = (all_file_num - len(file_names)) * 100 / all_file_numprint("\r%.2f...(%s)" % (copy_rate, file_name) + " "*50, end="")if copy_rate >= 100:breakprint()if __name__ == "__main__":main()
五、守护进程和进程同步
multiprocessing 模块还提供了守护进程(daemon process)的概念,守护进程会在主进程代码执行结束后自动终止。此外,模块中还包含了锁(Lock)和信号量(Semaphore)等同步原语,用于在进程间同步操作。
六、注意事项
-
使用 multiprocessing 时,应避免在多个进程间共享状态。
-
确保传递给 Process 类的所有参数都是可序列化的。
-
在主模块中使用
if __name__ == "__main__":
来保护程序的入口点。 -
尽量不要使用 Process.terminate() 来终止进程,因为这可能会导致共享资源变得不可用。