赞
踩
这篇文章总结multiprocessing.Pool的使用
To be continued
Last Modified Date: 2022/1/2
创建进程池可以形象的理解为创建了一个能够并行的流水线,只消耗一次创建流水线的成本(进程的创建会消耗大量的计算机资源,进程的创建远远大于创建线程所占用的资源),处理接收到的的任务。相对的,如果不使用进程池,每个要求并行的任务都会新建一次进程,浪费时间。
编程中本来没有进程池的概念的,除了 python,其他的语言都是使用线程池(而进程是执行分隔开的任务)。python 因为 GIL 的原因(仅限 Cython),线程无法并行,所以把线程池的概念迁移到了进程,命名为进程池。
当需要创建的子进程数量不多时,可以直接利用multiprocessing
中的 Process
动态成生多个进程。
但如果是上百甚至上千个目标,手动的去创建进程的工作量巨大,此时就可以用到multiprocessing
模块提供的Pool
方法。
Pool
类表示一个工作进程池,可以提供指定数量的进程供用户调用,初始化Pool
时可以指定一个最大进程数。当有新的请求提交到Pool中时:
它具有允许以几种不同方式将任务分配到工作进程的方法。进程池的方法只能由创建它的进程使用。
方法 | 含义 |
---|---|
apply() | 该函数用于传递不定参数,主进程会被阻塞直到函数执行结束,函数原型 apply(func, args=(), kwds={}) |
map() | Pool 类中的map 方法,与内置的map 函数用法基本一致,它会使进程阻塞直到结果返回,函数原型 map(func, iterable, chunksize=None) 注意:虽然第二个参数是一个迭代器,但在实际使用中,必须在整个队列都就绪后,程序才会运行子进程。 map_async() 与map 用法一致,但是它是非阻塞的,函数原型map_async(func, iterable, chunksize, callback) |
close() | 阻止后续任务提交到进程池,但已经接受的进程还是会继续执行。当所有任务执行完成后,工作进程会退出。 |
terminal() | 不必等待未完成的任务,立即停止工作进程。当进程池对象被垃圾回收时,会立即调用 terminate()。 |
join() | 主进程堵塞(不执行join下面的语句),等待工作进程结束。join 方法要在close 或terminate 之后使用。 |
pool.get(timeout) | 如果没有设置timeout,将会一直等待结果。 如果设置了timeout,超过timeout将引发multiprocessing.TimeoutError异常。 |
pool.ready() | 如果调用完成,返回True。 |
pool.successful() | 如果调用完成并且没有引发异常,返回True,如果在结果就绪之前调用,将引发AssertionError异常。 |
pool.wait(timeout) | 等待结果变为可用,timeout为等待时间。 |
pool = Pool(numprocess,initializer,initargs)
True
;如果不是alive,返回False
示例1: 使用进程池(非阻塞)
import multiprocessing import time import datetime def func(msg): print(datetime.datetime.now(), multiprocessing.current_process().name, multiprocessing.current_process().is_alive()) print("in:", msg) time.sleep(3) print( "out:", msg) if __name__ == "__main__": print(multiprocessing.current_process().name, 'Alive', multiprocessing.current_process().is_alive()) pool = multiprocessing.Pool(processes=3) PROX = ['processes A', 'processes B', 'processes C', 'processes D', 'processes E'] for PRO in PROX: msg = PRO # 维持执行的进程总数为processes,当一个进程执行完毕后会添加新的进程进去 pool.apply(func, args=(msg,)) print("*"*10) # 调用join之前,先调用close函数,否则会出错。执行完close后不会有新的进程加入到pool。 pool.close() print('job counter', multiprocessing.pool.job_counter) # join函数等待所有子进程结束 pool.join() print(multiprocessing.current_process().name, 'Alive', multiprocessing.current_process().is_alive())
示例1: 结果
MainProcess Alive True 2021-12-30 22:38:50.145227 SpawnPoolWorker-1 True in: processes A out: processes A 2021-12-30 22:38:53.151329 SpawnPoolWorker-2 True in: processes B out: processes B 2021-12-30 22:38:56.154720 SpawnPoolWorker-3 True in: processes C out: processes C 2021-12-30 22:38:59.159238 SpawnPoolWorker-1 True in: processes D out: processes D 2021-12-30 22:39:02.162764 SpawnPoolWorker-2 True in: processes E out: processes E ********** job counter count(5) MainProcess Alive True Process finished with exit code 0
apply(func[, args[, kwds]])
是阻塞的。
创建1个进程池pool,并设定进程的数量为3,for循环会相继产生5个进程对象,5个对象被提交到pool中。
因pool指定进程数为3,所以0、1、2会直接送到进程中执行,当其中一个执行完毕后才空出一个进程处理对象3。
因为为阻塞,主函数会等进程池中的进程们执行结束后再执行自个的,所以运行完for循环后直接输出**********
,主程序在pool.join()
处等待各个进程的结束。
示例2: 使用进程池(阻塞)
import multiprocessing import time import datetime def func(msg): print(datetime.datetime.now(), multiprocessing.current_process().name, multiprocessing.current_process().is_alive()) print("in:", msg) time.sleep(3) print( "out:", msg) if __name__ == "__main__": print(multiprocessing.current_process().name, 'Alive', multiprocessing.current_process().is_alive()) pool = multiprocessing.Pool(processes=3) PROX = ['processes A', 'processes B', 'processes C', 'processes D', 'processes E'] for PRO in PROX: msg = PRO # 维持执行的进程总数为processes,当一个进程执行完毕后会添加新的进程进去 pool.apply_async(func, args=(msg,)) print("*"*10) # 调用join之前,先调用close函数,否则会出错。执行完close后不会有新的进程加入到pool。 pool.close() print('job counter', multiprocessing.pool.job_counter) # join函数等待所有子进程结束 pool.join() print(multiprocessing.current_process().name, 'Alive', multiprocessing.current_process().is_alive())
示例2: 结果
MainProcess Alive True ********** job counter count(5) 2021-12-30 22:43:47.150006 SpawnPoolWorker-1 True in: processes A 2021-12-30 22:43:47.150494 SpawnPoolWorker-2 True in: processes B 2021-12-30 22:43:47.150526 SpawnPoolWorker-3 True in: processes C out: processes A 2021-12-30 22:43:50.152419 SpawnPoolWorker-1 True in: processes D out: processes B 2021-12-30 22:43:50.153061 SpawnPoolWorker-2 True in: processes E out: processes C out: processes D out: processes E MainProcess Alive True Process finished with exit code 0
apply_async(func[, args[, kwds[, callback]]]) 它是非阻塞
是阻塞的。
创建1个进程池pool,并设定进程的数量为3,for循环会相继产生5个进程对象,5个对象被提交到pool中。
因pool指定进程数为3,所以0、1、2会直接送到进程中执行,当指定的3个进程执行完毕后才空出其他的进程处理对象3、4。
因为为非阻塞,主函数会自己执行自个的,不搭理进程的执行,所以运行for循环之前就直接输出**********
,主程序在pool.join()
处等待各个进程的结束。
示例2.1: 使用进程池(阻塞)并关注结果
import multiprocessing import time import datetime def func(msg): print(datetime.datetime.now(), multiprocessing.current_process().name, multiprocessing.current_process().is_alive()) print("in:", msg) time.sleep(3) print( "out:", msg) return 'Done' if __name__ == "__main__": print(multiprocessing.current_process().name, 'Alive', multiprocessing.current_process().is_alive()) pool = multiprocessing.Pool(processes=3) PROX = ['processes A', 'processes B', 'processes C', 'processes D', 'processes E'] result = [] for PRO in PROX: msg = PRO result.append(pool.apply_async(func, args=(msg, ))) print("*"*10) pool.close() print('job counter', multiprocessing.pool.job_counter) pool.join() for res in result: print(":::", res.get()) print(multiprocessing.current_process().name, 'Alive', multiprocessing.current_process().is_alive())
示例2.1: 结果
MainProcess Alive True ********** job counter count(5) 2021-12-30 23:19:40.006049 SpawnPoolWorker-1 True in: processes A 2021-12-30 23:19:40.008816 SpawnPoolWorker-2 True in: processes B 2021-12-30 23:19:40.025276 SpawnPoolWorker-3 True in: processes C out: processes B 2021-12-30 23:19:43.010269 SpawnPoolWorker-2 True in: processes D out: processes A 2021-12-30 23:19:43.011538 SpawnPoolWorker-1 True in: processes E out: processes C out: processes E out: processes D ::: Done ::: Done ::: Done ::: Done ::: Done MainProcess Alive True Process finished with exit code 0
示例3: 使用多个进程池
import multiprocessing, time, datetime, os, random import time import datetime import os import random def func1(msg=1): print(datetime.datetime.now(), multiprocessing.current_process().name, multiprocessing.current_process().is_alive()) print("in:", msg) time.sleep(3) print("out:", msg) def func2(msg=2): print(datetime.datetime.now(), multiprocessing.current_process().name, multiprocessing.current_process().is_alive()) print("in:", msg) time.sleep(3) print("out:", msg) def func3(msg=3): print(datetime.datetime.now(), multiprocessing.current_process().name, multiprocessing.current_process().is_alive()) print("in:", msg) time.sleep(3) print("out:", msg) def func4(msg=4): print(datetime.datetime.now(), multiprocessing.current_process().name, multiprocessing.current_process().is_alive()) print("in:", msg) time.sleep(3) print("out:", msg) def func5(msg=5): print(datetime.datetime.now(), multiprocessing.current_process().name, multiprocessing.current_process().is_alive()) print("in:", msg) time.sleep(3) print("out:", msg) if __name__ == "__main__": print(multiprocessing.current_process().name, 'Alive', multiprocessing.current_process().is_alive()) pool = multiprocessing.Pool(processes=3) funcX = [func1, func2, func3, func4, func5] for func in funcX: pool.apply_async(func) print("*"*10) pool.close() print('job counter', multiprocessing.pool.job_counter) pool.join() print(multiprocessing.current_process().name, 'Alive', multiprocessing.current_process().is_alive())
示例3: 结果
MainProcess Alive True ********** job counter count(5) 2021-12-30 23:30:48.195175 SpawnPoolWorker-1 True in: 1 2021-12-30 23:30:48.198286 SpawnPoolWorker-2 True in: 2 2021-12-30 23:30:48.198334 SpawnPoolWorker-3 True in: 3 out: 1 2021-12-30 23:30:51.200511 SpawnPoolWorker-1 True in: 4 out: 2 out: 3 2021-12-30 23:30:51.203784 SpawnPoolWorker-2 True in: 5 out: 4 out: 5 MainProcess Alive True Process finished with exit code 0
with
语句管理上下文示例4: 使用with
语句来管理进程池,这意味着无需手动调用close()
方法来关闭进城池
import multiprocessing import time import datetime, psutil def func(msg): print(datetime.datetime.now(), multiprocessing.current_process().name, multiprocessing.current_process().is_alive()) print("in:", msg) time.sleep(3) print( "out:", msg) return 'pid count: ', len(psutil.pids()) if __name__ == "__main__": print(multiprocessing.current_process().name, 'pid count: ', len(psutil.pids())) with multiprocessing.Pool(processes=3) as pl: PROX = ['processes A', 'processes B', 'processes C', 'processes D', 'processes E'] for PRO in PROX: msg = PRO result = pl.apply(func, args=(msg,)) print(result) print("*"*10) print(multiprocessing.current_process().name, 'pid count: ', len(psutil.pids())) ######################################################### MainProcess pid count: 414 2022-01-02 15:21:03.470344 SpawnPoolWorker-1 True in: processes A out: processes A ('pid count: ', 417) 2022-01-02 15:21:06.478701 SpawnPoolWorker-2 True in: processes B out: processes B ('pid count: ', 417) 2022-01-02 15:21:09.483914 SpawnPoolWorker-3 True in: processes C out: processes C ('pid count: ', 417) 2022-01-02 15:21:12.492025 SpawnPoolWorker-1 True in: processes D out: processes D ('pid count: ', 417) 2022-01-02 15:21:15.500283 SpawnPoolWorker-2 True in: processes E out: processes E ('pid count: ', 417) ********** MainProcess pid count: 414 Process finished with exit code 0
并发与并行的区别是什么?
multiprocessing — 基于进程的并行
【Python】独特的进程池概念
Python进程专题4:进程池Pool
python进程池:multiprocessing.pool
Python使用进程池管理进程
psutil的Process python获取进程信息
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。