赞
踩
开发智能截图系统的时候将系统设计为取帧模块、分析模块、评估模块、截图模块和显示模块的线性串联。每个模块都是独立的进程或进程池,像流水线一样处理画面并转交下一模块,模块之间以队列连接。将这种多个生产者/消费者串联起来协同工作的思想进一步扩展便成了一个项目框架。
import multiprocessing as mp
from threading import Thread
from math import tanh
from time import time, sleep, perf_counter as pc
from queue import Empty, Full
try:
# linux下采用fork方式启动虽然更快但会导致子进程的CUDA初始化失败,不用CUDA也可以用fork启动
mp.set_start_method('spawn')
except RuntimeError:
pass
在生产者消费者之间传递的对象,通过继承该类来扩展更多属性和方法。
class Product:
def __init__(self, number):
self.number = number # 产品序号
self.time_stamp = time() # 生成时间
@property
def duration(self): # 生存时间
return time() - self.time_stamp
所有生产者/消费者的基类,通过继承该类并实现prepare(self)(加工机启动时的准备工作,可以返回一个工具箱tools用于后续工作)、process(self, product, tools)(加工机对产品的具体处理,对于纯生产者来说则是生成一个产品,tools来自prepare()的返回值)和finish(self, tools)(加工机结束时的收尾工作,同时负责处理tools,比如tools是一个需要保存的列表则可以在此保存)方法来具体化加工机的功能。
class Processor: # 参数:进程数、是否是生产者、是否是消费者、读取队列最长等待时长 def __init__(self, processes=1, is_producer=False, is_consumer=False, get_timeout=1): self.__is_producer = is_producer self.__is_consumer = is_consumer self.__timeout = get_timeout self.__console = None # 控制台,负责流水线和加工机的交互 self.__previous = None # 前置加工机 self.__next = None # 后继加工机 self.__outque = None # 输出队列 self.__inque = None # 输入队列 # 根据生产者消费者类型选择调度程序 if is_producer and is_consumer: self.__scheduler = self._scheduler_of_both elif is_producer: self.__scheduler = self._scheduler_of_producer elif is_consumer: self.__scheduler = self._scheduler_of_consumer else: self.__scheduler = self._scheduler_of_neither # 进程列表 self.__p_list = [mp.Process(target=self.__scheduler) for _ in range(processes)] @property def p_num(self): return len(self.__p_list) @property def is_producer(self): return self.__is_producer @property def is_consumer(self): return self.__is_consumer @property def timeout(self): return self.__timeout @property def console(self): return self.__console @console.setter def console(self, csl): if self.__console: raise RuntimeError('This console has been set already!') else: self.__console = csl @property def previous(self): return self.__previous @previous.setter def previous(self, processor): if self.__previous: raise RuntimeError('This previous has been set already!') else: self.__previous = processor @property def next(self): return self.__next @next.setter def next(self, processor): if self.__next: raise RuntimeError('This next has been set already!') else: self.__next = processor @property def input_queue(self): return self.__inque @input_queue.setter def input_queue(self, queue): if self.is_consumer: if self.__inque: raise RuntimeError('This queue has been set already!') else: self.__inque = queue else: raise TypeError('Non consumer have no input!') @property def output_queue(self): return self.__outque @output_queue.setter def output_queue(self, queue): if self.is_producer: if self.__outque: raise RuntimeError('This queue has been set already!') else: self.__outque = queue else: raise TypeError('Non producer have no output!') @property def input_not_empty(self): if self.input_queue: return not self.input_queue.empty() else: return False @property def output_qsize(self): if self.output_queue: return self.output_queue.qsize() else: return 0 @property def not_finished(self): return self.console['run_permit'] or self.console['is_running'] @property def previous_not_finished(self): if self.previous: return self.previous.not_finished else: return False @property def next_p_num(self): if self.next: return self.next.p_num else: return 1 @property def next_spend_time(self): if self.next: return self.next.console['spend_time'] else: return 0 @property def keep_running(self): return self.console['run_permit'] or self.input_not_empty or self.previous_not_finished def run(self): # 启动加工机 if not self.console: self.__console = mp.Manager().dict({'run_permit': True, 'is_running': False, 'done': False, 'sleep_time': 0.0, 'spend_time': 0.0}) self.console['run_permit'] = True for p in self.__p_list: p.start() self.console['is_running'] = True print(str(type(self))[8: -2] + ' is running!') def shutdown(self): # 关闭加工机 self.console['run_permit'] = False for p in self.__p_list: p.join() self.console['is_running'] = False def shutdown_line(self): # 关闭整条流水线,需要流水线支持内部关闭 self.console['done'] = True def prepare(self): # 实现开启的准备工作,可以返回需要的对象 return None def process(self, product, tools): # 实现加工机对产品的具体处理,tools来自prepare()的返回值 return product def finish(self, tools): # 实现关闭的收尾工作,tools来自prepare()的返回值(经过process) pass def _scheduler_of_producer(self): tools = self.prepare() while self.keep_running: t = pc() product = self.process(None, tools) if product: try: self.output_queue.put(product, block=False) except Full: pass sleep(self.console['sleep_time']) self.console['spend_time'] = pc() - t self.finish(tools) def _scheduler_of_consumer(self): tools = self.prepare() while self.keep_running: t = pc() try: product = self.input_queue.get(timeout=self.timeout) except Empty: product = None self.process(product, tools) sleep(self.console['sleep_time']) self.console['spend_time'] = pc() - t self.finish(tools) def _scheduler_of_both(self): tools = self.prepare() while self.keep_running: t = pc() try: product = self.input_queue.get(timeout=self.timeout) except Empty: product = None product = self.process(product, tools) if product: try: self.output_queue.put(product, block=False) except Full: pass sleep(self.console['sleep_time']) self.console['spend_time'] = pc() - t self.finish(tools) def _scheduler_of_neither(self): tools = self.prepare() while self.keep_running: t = pc() self.process(None, tools) sleep(self.console['sleep_time']) self.console['spend_time'] = pc() - t self.finish(tools)
继承自加工机,用于桥接两条流水线或者流水线外部的加工机。通过桥接器能实现流水线的合流或分流,即整合两条流水线生产的产品到一条线或者复制一条线的产品产生分支流水线。
class Bridge(Processor): # 参数:是否有内部输入,是否有内部输出,是否有外部输入,是否有外部输出 def __init__(self, internal_input=False, internal_output=False, external_input=False, external_output=False): if not ((internal_input or external_input) and (internal_output or external_output)): raise TypeError('This Bridge have no input or output!') Processor.__init__(self, processes=1, is_producer=internal_output, is_consumer=internal_input, get_timeout=0) self.__external_input = external_input self.__external_output = external_output self.__external_previous = None self.__external_next = None self.__extoutque = None self.__extinque = None @property def external_input(self): return self.__external_input @property def external_output(self): return self.__external_output @property def external_previous(self): return self.__external_previous @external_previous.setter def external_previous(self, processor): if self.__external_previous: raise RuntimeError('This previous has been set already!') else: self.__external_previous = processor @property def external_next(self): return self.__external_next @external_next.setter def external_next(self, processor): if self.__external_next: raise RuntimeError('This next has been set already!') else: self.__external_next = processor @property def external_input_queue(self): return self.__extinque @external_input_queue.setter def external_input_queue(self, queue): if self.external_input: if self.__extinque: raise RuntimeError('This queue has been set already!') else: self.__extinque = queue else: raise TypeError('This Bridge have no external input!') @property def external_output_queue(self): return self.__extoutque @external_output_queue.setter def external_output_queue(self, queue): if self.external_output: if self.__extoutque: raise RuntimeError('This queue has been set already!') else: self.__extoutque = queue else: raise TypeError('This Bridge have no external output!') @property def input_not_empty(self): if self.input_queue and self.external_input_queue: return not (self.input_queue.empty() and self.external_input_queue.empty()) elif self.input_queue: return not self.input_queue.empty() elif self.external_input_queue: return not self.external_input_queue.empty() else: return False @property def output_qsize(self): if self.output_queue and self.external_output_queue: return max(self.output_queue.qsize(), self.external_output_queue.qsize()) elif self.output_queue: return self.output_queue.qsize() elif self.external_output_queue: return self.external_output_queue.qsize() else: return 0 @property def previous_not_finished(self): if self.previous and self.external_previous: return self.previous.not_finished or self.external_previous.not_finished elif self.previous: return self.previous.not_finished elif self.external_previous: return self.external_previous.not_finished else: return False @property def next_p_num(self): if self.next and self.external_next: return min(self.next.p_num, self.external_next.p_num) elif self.next: return self.next.p_num elif self.external_next: return self.external_next.p_num else: return 1 @property def next_spend_time(self): if self.next and self.external_next: return max(self.next.console['spend_time'], self.external_next.console['spend_time']) elif self.next: return self.next.console['spend_time'] elif self.external_next: return self.external_next.console['spend_time'] else: return 0 def process(self, product, tools): if not self.previous_not_finished: self.console['done'] = True external_product = None if self.external_input: try: external_product = self.external_input_queue.get(block=False) except Empty: pass if external_product and self.is_producer: try: self.output_queue.put(external_product, block=False) except Full: pass if self.external_output: if product: try: self.external_output_queue.put(product, block=False) except Full: pass if external_product: try: self.external_output_queue.put(external_product, block=False) except Full: pass return product
用于串联和管理加工机的类。通过连接桥接器还可以进行外部输入输出。
class AssemblyLine: # 参数:监控时间间隔,更新耗时的权重,是否允许内部加工机关闭流水线,队列最大长度 def __init__(self, monitor_interval=0.01, new_spend_time_weight=0.25, allow_inside_shutdown=True, max_qsize=100): self.__manager = mp.Manager() self.__processors = [] self.__queues = [] self.__external_queues = [] self.__consoles = [] self.__p_monitor = None self.__p_checker = None self.__interval = monitor_interval self.__weight = new_spend_time_weight self.__allow_inside_shutdown = allow_inside_shutdown self.__max_qsize = max_qsize @property def is_running(self): for p in self.__processors: if p.not_finished: return True return False @property def last_processor(self): return self.__processors[-1] def connect(self, processor: Processor): # 添加并连接一个加工机到流水线末尾 if self.__processors: self.__processors[-1].next = processor processor.previous = self.__processors[-1] queue = self.__manager.Queue(self.__max_qsize) self.__queues.append(queue) self.__processors[-1].output_queue = queue processor.input_queue = queue console = self.__manager.dict({'run_permit': True, 'is_running': False, 'done': False, 'sleep_time': 0.0, 'spend_time': 0.0}) self.__consoles.append(console) processor.console = console self.__processors.append(processor) def external_connect(self, bridge: Bridge): # 流水线末尾是桥接器时为其连接一个外部桥接器用于外部输出,该外部桥接器需要允许外部输入 if self.__processors and isinstance(self.__processors[-1], Bridge): self.__processors[-1].external_next = bridge bridge.external_previous = self.__processors[-1] queue = self.__manager.Queue(self.__max_qsize) self.__external_queues.append(queue) self.__processors[-1].external_output_queue = queue bridge.external_input_queue = queue else: raise TypeError('Only Bridge can use external connection!') def run(self, async=False): # 启动流水线,async为True时不会占用主线程 if self.__processors[-1].is_producer: raise TypeError('AssemblyLine cannot run when the end is producer!') self.__processors.reverse() for p in self.__processors: p.run() self.__processors.reverse() if async: self.__p_monitor = Thread(target=self.monitor) self.__p_monitor.start() else: self.monitor() def shutdown(self): # 关闭流水线 for p in self.__processors: p.shutdown() if self.__p_monitor: self.__p_monitor.join() if self.__p_checker: self.__p_checker.join() def checker(self): # 子线程检查内部关闭信号 while self.is_running: for c in self.__consoles: if c['done']: self.shutdown() return sleep(self.__interval) def monitor(self): # 监视器线程,协调各加工机的工作速度,防止产品堆积在队列 if self.__allow_inside_shutdown: self.__p_checker = Thread(target=self.checker).start() next_spend_time = [p.next_spend_time for p in self.__processors] while self.is_running: for i, p in enumerate(self.__processors): p.console['sleep_time'] = 2 * next_spend_time[i] * p.p_num / p.next_p_num * tanh(0.05 * p.output_qsize) next_spend_time[i] = self.__weight * p.next_spend_time + (1 - self.__weight) * next_spend_time[i] # print([p.output_qsize for p in self.__processors]) # print([p.console['sleep_time'] for p in self.__processors]) # print([p.next_spend_time for p in self.__processors]) sleep(self.__interval)
将以上代码写入factory.py备用。下面的示例代码定义了奇数生成器、偶数生成器、平方器和显示器,通过如图方式串联在一起并工作。
from factory import * from time import sleep class Number(Product): def __init__(self, value, num): Product.__init__(self, num) self.value = value def introduce(self): print('value:{0}, duration:{1}'.format(self.value, self.duration)) class OddProducer(Processor): def __init__(self): Processor.__init__(self, processes=1, is_producer=True) self.num = None def prepare(self): self.num = -1 return [1, 3, 5, 7, 9] def process(self, number, num_list): sleep(0.5) if self.num >= 20: # 生成20个之后关闭所在流水线,注意关闭动作有延迟,实际会多生成几个数字 self.shutdown_line() self.num += 1 return Number(num_list[self.num % len(num_list)], self.num) class EvenProducer(Processor): def __init__(self): Processor.__init__(self, processes=1, is_producer=True) self.num = None def prepare(self): self.num = -1 return [0, 2, 4, 6, 8, 10] def process(self, number, num_list): sleep(0.2) if self.num >= 15: # 生成15个之后关闭所在流水线,注意关闭动作有延迟,实际会多生成几个数字 self.shutdown_line() self.num += 1 return Number(num_list[self.num % len(num_list)], self.num) class SquareMachine(Processor): def __init__(self): Processor.__init__(self, processes=1, is_producer=True, is_consumer=True) def process(self, number, tools): if number: number.value = number.value ** 2 return number class Displayer(Processor): def __init__(self): Processor.__init__(self, processes=1, is_consumer=True) self.num = 0 def process(self, number, tools): self.num += 1 if number: number.introduce() if __name__ == '__main__': al1 = AssemblyLine() al2 = AssemblyLine() al1.connect(OddProducer()) al1.connect(Bridge(internal_input=True, external_output=True)) al2.connect(EvenProducer()) al2.connect(Bridge(internal_input=True, internal_output=True, external_input=True)) al1.external_connect(al2.last_processor) al2.connect(SquareMachine()) al2.connect(Displayer()) al1.run(async=True) al2.run()
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。