赞
踩
最近实现复杂问句解析时,需要用到将问句转化成由一系列的小的动作组成的有向图结构。该结构在执行的过程中 会动态生成一个自顶向下执行的逻辑图。 这就用到了Pipeline 模式了。
对于Pipeline 根据侧重点的不同,有两种实现方式用于加速多线程任务的pipeline
用于加速多线程任务的 Pipeline 主要强调 任务的顺序执行, 转移之间不涉及过于复杂的逻辑。所以 每个pipe 通过自身调用 next pipe。整体上强调 后向连续性。
2. 用于控制流程的pipeline
用于流程控制的piepline, 强调任务的 逻辑性, 由外部 manager 来控制 pipeline 的执行方向。整体上强调前向的依赖性, 使用拓扑排序确定执行顺序。
下面首先来看 用于加速多线程任务的pipeline的实现方法:
# !/usr/bin/python
# -*- coding: utf-8 -*-
# ------------------------------------------------------------------------
# @Time : 2020/9/28
# @Author : xiaoshan.zhang
# @Emial : zxssdu@yeah.net
# @File : pipeline.py
# @Software : PyCharm
# @desc : 并发型pipeline , 强调pipe 的并发执行效率
# java 实现的参考网址:
# https://blog.csdn.net/tjy451592402/article/details/79459013
# ------------------------------------------------------------------------
"""Pipe: 处理阶段的抽象, 负责对输入输出进行处理, 并将输出作为下一个阶段的输入。pipe 可以理解为 (输入、处理、输出) 三元组init: 初始化当前处理阶段对外提供的服务。shutdown: 关闭当前处理阶段,对外提供的服务。setNextPipe: 设置当前处理阶段的下一个处理阶段。ThreadPoolPipeDecorator: 基于线程池的Pipe 实现类, 主要作用是实现线程池去执行对各个输入元素的处理。AbstractPipe: Pipe 的抽象实现类。process: 接收前一阶段的处理结果,作为输入, 并调用子类的doProcess 方法对元素进行处理,相应的处理结果会提交给下一个阶段进行处理do_process: 留给子类实现的抽象方法PipeContext: 对各个处理阶段的计算环境的抽象, 主要用于异常处理Pipeline: 对 复合pipe的抽象, 一个Pipeline 实例可以包含多个pipe 实例。addPipe: 向该Pipeline 实例中添加一个Pipe实例SimplePipeline: 基于AbstractPipe 的 Pipeline 接口实现的一个简单类PipelineBuilder : pipeline 构造器, 用于从配置文件中加载构建 pieplinePipelineMananger: 管理多个Pipeline 的构建、销毁、执行"""
import time
import random
import Queue
import threading
from abc import ABCMeta, abstractmethod
from threading import Condition, Thread
from concurrent import futures
from concurrent.futures import ThreadPoolExecutor
class CountDownLatch:
"""任务同步,用于同步异步任务,当注册了该同步锁的异步任务都执行完成后才释放锁。"""
def __init__(self, count):
self.count = count
self.condition = Condition()
def await(self):
try:
self.condition.acquire()
while self.count > 0:
self.condition.wait()
finally:
self.condition.release()
def countDown(self):
try:
self.condition.acquire()
self.count -= 1
self.condition.notifyAll()
finally:
self.condition.release()
class AbstractPipe(object):
def __init__(self, pipe_name=None, pipe_context=None):
self.pipe_name = pipe_name
self.next_pipe = None
self.pipe_context = pipe_context
def set_next(self, next_pipe):
self.next_pipe = next_pipe
def init(self, pipe_context):
self.pipe_context = pipe_context
def shut_down(self, timeout, time_unit):
"""关闭 pipe 执行的任务:param timeout::param time_unit::return:"""
def process(self, input):
# try:
out = self.do_process(input)
if 'results' in self.pipe_context:
self.pipe_context['results'][self.pipe_name] = out
# 如果正确输出,并且当前正确定义了下一个pipe,调用下一个pipeline
if out and self.next_pipe:
print("当前 结果不为空, 下一个 pipe 不为 None: {}, 主动调用 下一个 pipe: {}".format(self.next_pipe, out))
self.next_pipe.process(out)
def do_process(self, input):
raise NotImplementedError("Please implement do_process in inherit pipe class!")
class Function():
__metaclass__ = ABCMeta
def __init__(self, params={}, result={}, nlu_template=None, nlg_template=None ):
self.params = {}
self.result = {}
self.nlu_template = nlu_template
self.nlg_tempalte = nlg_template
def process(self, input):
raise NotImplementedError("Please implement Function`s process logical")
def gen_nlu_pattern(self):
return self.nlu_template
def gen_nlg_pattern(self):
return self.nlg_tempalte
def __call__(self, input):
self.process(input)
class FunctionPipe(AbstractPipe):
"""Pipe 函数修饰类。调用内部封装的 函数类执行具体的逻辑"""
__metaclass__ = ABCMeta
def __init__(self, pipe_name, function):
super(FunctionPipe, self).__init__(pipe_name=pipe_name)
self.function = function
@abstractmethod
def do_process(self, inputs):
""":param inputs::return:"""
# 根据函数定义的参数列表,从context 中取出参数对应的值,
kwargs = dict([(param_name, self.pipe_context[param_name]) \
for param_name in self.function.params])
# 传入 exec函数中
result = self.function.execute(**kwargs)
# 根据函数定义的返回参数列表,将处理结果放在 context 中
for res_name in self.function.res_names:
self.pipe_context[res_name] = result[res_name]
# 返回 std_nlu 和 nlg语句
std_nlu = None
nlg = None
return std_nlu , nlg
class Constraint(Function):
"""约束基类, 也是函数的一种直接解析往往比较困难,而且会不可避免地造成程序和语言的分歧。数据流的存在给了我们另一种选择:根据字面意思把引用解释成某种约束(Constraint),再调用「解析」函数把符合约束的程序从数据流中找出来。"""
__metaclass__ = ABCMeta
def __init__(self,type_):
self.type_ = type_
def do_process(self, input):
self.fit(input)
@abstractmethod
def fit(self,input):
raise NotImplementedError("Please implement in inherit class!")
class ThreadPipeDecorator(AbstractPipe):
"""Pipe 的线程修饰类, 它不会维持一直存在的worker,而是任务到来时启动一个thread,这样, 内存压力会比较少,是标准的做法, 但是有线程切换开销。"""
def __init__(self, delegate_pipe, pool_executor):
""":param delegate_pipe::param pool_executor:"""
self.delegate_pipe = delegate_pipe
self.thread_pool = pool_executor
def init(self, pipe_context):
"""为业务对象 pipe 设置上下文:param pipe_context::return:"""
self.delegate_pipe.init(pipe_context)
def process(self, input):
"""注意 线程装饰器 的 process 函数不需要 调用 下一个 pipe, 由业务对象 pipe自己去调用:param input::return:"""
print("当前 pipe thread recive input: {}".format(input))
task = lambda input: self.delegate_pipe.process(input)
self.thread_pool.submit(task, input)
# 使用单线程 提交任务
# thread = threading.Thread(target=task, args=[input,])
# thread.setDaemon(True)
# thread.start()
def set_next(self, next_pipe):
"""为业务对象设置上下文:param next_pipe::return:"""
self.deleg
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。