赞
踩
线程池的基类是 concurrent.futures 模块中的 Executor,Executor 提供了两个子类,即 ThreadPoolExecutor 和 ProcessPoolExecutor,其中 ThreadPoolExecutor 用于创建线程池,而 ProcessPoolExecutor 用于创建进程池。
如果使用线程池/进程池来管理并发编程,那么只要将相应的 task 函数提交给线程池/进程池,剩下的事情就由线程池/进程池来搞定。
Exectuor 提供了如下常用方法:
程序将 task 函数提交(submit)给线程池后,submit 方法会返回一个 Future 对象,Future 类主要用于获取线程任务函数的返回值。由于线程任务会在新线程中以异步方式执行,因此,线程执行的函数相当于一个“将来完成”的任务,所以 Python 使用 Future 来代表。
Future 提供了如下方法:
在用完一个线程池后,应该调用该线程池的 shutdown() 方法,该方法将启动线程池的关闭序列。调用 shutdown() 方法后的线程池不再接收新任务,但会将以前所有的已提交任务执行完成。当线程池中的所有任务都执行完成后,该线程池中的所有线程都会死亡。
使用线程池来执行线程任务的步骤如下:
- def test(value1, value2=None):
- print("%s threading is printed %s, %s"%(threading.current_thread().name, value1, value2))
- time.sleep(2)
- return 'finished'
-
- def test_result(future):
- print(future.result())
-
- if __name__ == "__main__":
- import numpy as np
- from concurrent.futures import ThreadPoolExecutor
- threadPool = ThreadPoolExecutor(max_workers=4, thread_name_prefix="test_")
- for i in range(0,10):
- future = threadPool.submit(test, i,i+1)
-
- threadPool.shutdown(wait=True)

结果:
test__0 threading is printed 0, 1
test__1 threading is printed 1, 2
test__2 threading is printed 2, 3
test__3 threading is printed 3, 4
test__1 threading is printed 4, 5
test__0 threading is printed 5, 6
test__3 threading is printed 6, 7
前面程序调用了 Future 的 result() 方法来获取线程任务的运回值,但该方法会阻塞当前主线程,只有等到钱程任务完成后,result() 方法的阻塞才会被解除。
如果程序不希望直接调用 result() 方法阻塞线程,则可通过 Future 的 add_done_callback() 方法来添加回调函数,该回调函数形如 fn(future)。当线程任务完成后,程序会自动触发该回调函数,并将对应的 Future 对象作为参数传给该回调函数。
直接调用result函数结果
- def test(value1, value2=None):
- print("%s threading is printed %s, %s"%(threading.current_thread().name, value1, value2))
- time.sleep(2)
- return 'finished'
-
- def test_result(future):
- print(future.result())
-
- if __name__ == "__main__":
- import numpy as np
- from concurrent.futures import ThreadPoolExecutor
- threadPool = ThreadPoolExecutor(max_workers=4, thread_name_prefix="test_")
- for i in range(0,10):
- future = threadPool.submit(test, i,i+1)
- # future.add_done_callback(test_result)
- print(future.result())
-
- threadPool.shutdown(wait=True)
- print('main finished')

结果:
test__0 threading is printed 0, 1
finished
test__0 threading is printed 1, 2
finished
test__1 threading is printed 2, 3
finished
去掉上面注释部分,调用future.add_done_callback函数,注释掉第16行( print(future.result()) )
test__0 threading is printed 0, 1
test__1 threading is printed 1, 2
test__2 threading is printed 2, 3
test__3 threading is printed 3, 4
finished
finished
finished
test__1 threading is printed 4, 5
test__0 threading is printed 5, 6
finished
另外,由于线程池实现了上下文管理协议(Context Manage Protocol),因此,程序可以使用 with 语句来管理线程池,这样即可避免手动关闭线程池,如上面的程序所示。
此外,Exectuor 还提供了一个 map(func, *iterables, timeout=None, chunksize=1)
方法,该方法的功能类似于全局函数 map(),区别在于线程池的 map() 方法会为 iterables 的每个元素启动一个线程,以并发方式来执行 func 函数。这种方式相当于启动 len(iterables) 个线程,井收集每个线程的执行结果。
例如,如下程序使用 Executor 的 map() 方法来启动线程,并收集线程任务的返回值:
示例换成多参数的:
- def test(value1, value2=None):
- print("%s threading is printed %s, %s"%(threading.current_thread().name, value1, value2))
- # time.sleep(2)
-
-
- if __name__ == "__main__":
- import numpy as np
- from concurrent.futures import ThreadPoolExecutor
- threadPool = ThreadPoolExecutor(max_workers=4, thread_name_prefix="test_")
- for i in range(0,10):
- # test(str(i), str(i+1))
- threadPool.map(test, [i],[i+1]) # 这是运行一次test的参数,众所周知map可以让test执行多次,即一个[]代表一个参数,一个参数赋予不同的值即增加[]的长度如从[1]到[1,2,3]
- threadPool.shutdown(wait=True)
上面程序使用 map() 方法来启动 4个线程(该程序的线程池包含 4 个线程,如果继续使用只包含两个线程的线程池,此时将有一个任务处于等待状态,必须等其中一个任务完成,线程空闲出来才会获得执行的机会),map() 方法的返回值将会收集每个线程任务的返回结果。
通过上面程序可以看出,使用 map() 方法来启动线程,并收集线程的执行结果,不仅具有代码简单的优点,而且虽然程序会以并发方式来执行 test() 函数,但最后收集的 test() 函数的执行结果,依然与传入参数的结果保持一致。
Reference:
[1] http://c.biancheng.net/view/2627.html
[2] https://www.cnblogs.com/gongxijun/p/6862333.html
另外附上一些其他自己的研究:
如何在回调函数中添加参数:用functools.partial封装函数
- def test(value1, value2=None):
- print("%s threading is printed %s, %s"%(threading.current_thread().name, value1, value2))
- time.sleep(2)
- return 'finished'
-
- def test_result(future, n):
- print(future.result(), n)
-
- if __name__ == "__main__":
- import numpy as np
- from concurrent.futures import ThreadPoolExecutor
- threadPool = ThreadPoolExecutor(max_workers=4, thread_name_prefix="test_")
- for i in range(0,10):
- future = threadPool.submit(test, i,i+1)
- future.add_done_callback(functools.partial(test_result, n=i))
-
- threadPool.shutdown(wait=True)
- print('main finished')

Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。