当前位置:   article > 正文

python并发编程之Queue线程、进程、协程通信(五)

python asyncio 协程与进程之间的通讯

单线程、多线程之间、进程之间、协程之间很多时候需要协同完成工作,这个时候它们需要进行通讯。或者说为了解耦,普遍采用Queue,生产消费模式。

系列文章

同步deque和多线程Queue

程序有时需要在列表的端点进行操作,比list更加优化的数据结构有Queue和deque。

deque

deque一般用在定长队列,多余的数据会被丢弃,这个队列是线程非安全的。

  1. from queue import Queue, deque
  2. # 大于会截取后面的一段
  3. q = deque(iterable=[1,2,3,4], maxlen=5)
  4. # 参数iterable可以是任何可迭代对象,maxlen代表定长
  5. # 添加与取出
  6. q.append(3) # 从尾部添加
  7. q.pop() # 从尾部弹出一个
  8. q.appendleft(4) # 从首部添加
  9. q.popleft() # 从首部弹出
  10. q.clear() # 清空队列
  11. q.extend([1, 3, 3]) # 将原来的队列从右侧扩展
  12. q.extendleft() # 将原来的队列从左侧扩展
  13. q.insert(2, 3) # 在索引为2的位置插入3,如果队列已达到最大,抛出异常
  14. # 复制
  15. q1 = q.copy() # 完全符合一份队列
  16. # 统计
  17. n = q.count(3) # 统计某个值的数目
  18. x = q.index(3) # 查找某个值的位置
  19. # 变换
  20. q.reverse() # 将原来的q翻转
  21. q.remove(3) # 删除队列中的所有的3
  22. q.rotate(2) # 向右旋转两步

Queue

Queue提供更加完整成熟的队列操作,相对于deque来说偏重型,他是线程安全的队列。

  • 方法和属性分析
  1. from queue import Queue, deque
  2. q = Queue(maxsize=5) #maxsize<=0,队列长度没有限制,这个Queue是线程安全的,通过锁机制保证
  3. print(q.queue) # 一个deque队列
  4. print(q.mutex) # 队列的线程锁
  5. print(q.not_empty) # 非空通知,用在多线程
  6. print(q.not_full) # 非满通知,用在多线程
  7. print(q.all_tasks_done) # 完成的任务
  8. print(q.maxsize)
  9. print(q.unfinished_tasks) # 队列未完成的任务数量,即队列目前的数目
  10. # 数据存取
  11. q.put(3, block=True, timeout=3) # 向队列左边添加数据,block为True队列满了阻塞等待,block为false则直接抛出异常
  12. q.get(block=True, timeout=3) # 队列取出数据,超时抛出异常,block为false忽略timeout
  13. # q.get_nowait() # 立即获取,没有抛出异常
  14. q.put_nowait(4) # 立即插入,已满抛出异常
  15. # 判断
  16. q.full() # 判断当前队列是否已满,满了返回True
  17. q.empty() # 判断当前队列是否为空,空返回True
  18. # 统计
  19. q.task_done() # 用来通知队列任务完成
  20. q.qsize() # 当前队列的任务数量,不绝对可靠
  21. q.join() # 阻塞直到所有的任务完成,即q.unfinished_tasks降为0
  • 实例
  1. from threading import Thread
  2. from queue import Queue, deque
  3. import time
  4. def get_from_queue(queue:Queue):
  5. while True:
  6. if not queue.empty():
  7. print(queue.get_nowait())
  8. queue.task_done() # 任务完成
  9. def put_to_queue(queue:Queue):
  10. for i in range(100):
  11. if not queue.full():
  12. queue.put_nowait(i)
  13. else:
  14. time.sleep(0.1)
  15. q = Queue(5)
  16. th1 = Thread(target=get_from_queue, args=(q,))
  17. th2 = Thread(target=put_to_queue, args=(q,))
  18. th1.start()
  19. th2.start()

进程间通讯

multiprocessing的Queue对象可以作为进程间通讯的第三者。

  1. from multiprocessing import Queue, Process, Pool
  2. import time
  3. def get_from_queue(queue:Queue):
  4. while True:
  5. if not queue.empty():
  6. print(queue.get_nowait())
  7. def put_to_queue(queue:Queue):
  8. for i in range(100):
  9. if not queue.full():
  10. queue.put_nowait(i)
  11. else:
  12. time.sleep(0.1)
  13. if __name__ == '__main__':
  14. q = Queue(9) # 这个Queue可以在多个进程之间共享
  15. p1 = Process(target=get_from_queue, args=(q,))
  16. p2 = Process(target=put_to_queue, args=(q,))
  17. p1.start()
  18. p2.start()

multiprocessing.Queue对象

Queue对象的大部分方法和Queue.Queue的方法相同,用法也一样,但有几个特殊的方法:

  1. q = Queue(9) # 这个Queue可以在多个进程之间共享
  2. # q.close() # 关闭队列,不再接收数据
  3. # q.cancel_join_thread() # 取消阻塞等待
  4. q.join_thread() # 线程阻塞等待

gevent协程的Queue

gevent.queue.Queue基于协程,Queue在多个协程间共享,Queue实现了迭代器协议,可以使用for循环遍历。

  1. from gevent.queue import Queue
  2. import gevent
  3. import time
  4. def get_from_queue(queue:Queue, n):
  5. i = 0
  6. print('start---get--{}'.format(n))
  7. while True:
  8. print(str(queue.get()) + 'get' + str(n))
  9. i += 1
  10. if i == 100:
  11. break
  12. def put_to_queue(queue:Queue, n):
  13. i = 0
  14. print('start---put--{}'.format(n))
  15. while True:
  16. queue.put(i)
  17. print(str(i) + 'put' + str(n))
  18. i += 1
  19. if i == 100:
  20. break
  21. if __name__ == '__main__':
  22. q = Queue(9) # 这个Queue可以在多个进程之间共享
  23. job1 = [gevent.spawn(put_to_queue, q,i) for i in range(2)]
  24. job2 = [gevent.spawn(get_from_queue, q,i) for i in range(2)]
  25. job1.extend(job2)
  26. gevent.joinall(job1)

协程启动后会按照添加到循环的顺序开始执行,上例在队列未满之前一直执行put操作,直到队列满后阻塞就切换到put2协程,也会立即阻塞,然后切换到get1协程,获取所有的值直到队列为空后阻塞切换。

gevent.queue.Queue对象

其方法基本和Queue.Queue的方法相同,特殊方法如下:

  1. q = Queue(9, items=[1,2,3, StopIteration]) # 实现迭代协议,最后一个必须是StopIteration
  2. # q.copy() #复制一个队列
  3. x = q.next() # 唤醒获取值
  4. q.peek(block=True, timeout=None) # 获取一个值但是不删除它
  5. q.peek_nowait() # 立即获取,忽略timeout
  6. q.put() # 会唤醒多个协程完成添加操作
  7. q.get() # 会挂起多个协程

gevent.queue.JoinableQueue对象扩展了Queue的功能,添加了task_done和join方法。

  1. q = JoinableQueue(9, items=[1,2,3, StopIteration]) # 这个Queue可以在多个进程之间共享
  2. q.task_done() # 通知队列一个任务完成
  3. q.unfinished_tasks # 未完成的任务计数
  4. q.join() # 阻塞等待任务完成,如果unfinished_tasks降为0,则解除
  • 实例
  1. from gevent.queue import Queue, JoinableQueue
  2. import gevent
  3. import time
  4. def get_from_queue(queue:JoinableQueue):
  5. while True:
  6. try:
  7. x = queue.get() # 阻塞时就会切换协程
  8. print(x)
  9. finally:
  10. queue.task_done()
  11. if __name__ == '__main__':
  12. q = JoinableQueue(8)
  13. job1 = [gevent.spawn(get_from_queue, q) for i in range(2)]
  14. for i in range(100):
  15. q.put(i) # 当Put被阻塞时将切换协程,
  16. q.join() # 如果不等待的话,最后一次put后将直接退出

转载于:https://www.cnblogs.com/cwp-bg/p/9605328.html

本文内容由网友自发贡献,转载请注明出处:【wpsshop博客】
推荐阅读
相关标签
  

闽ICP备14008679号