赞
踩
阻塞队列就是一种支持阻塞的插入和移除操作的特殊容器。
在多线程中,所谓阻塞:在某些情况下会挂起线程(阻塞),一旦条件满足,被挂起的线程又回自动被唤醒。
为什么需要BlockingQueue:好处:我们不需要关心什么时候需要阻塞线程,什么时候需要唤醒线程,因为这一切BlockingQueue都给你一手包办了。
阻塞的插入:当队列满时,向队列中插入元素的线程会被阻塞,直到队列中有元素被移除,即队列不满时,阻塞的线程才能继续向队列中插入元素;
阻塞的移除:当队列中没有元素时,即队列为空时,从队列中移除元素的线程就会被阻塞,直到队列中有新的元素被添加,即队列中有元素时,阻塞的线程才能继续从队列中移除元素;
阻塞队列的常见操作如下:
抛出异常:只要超过其边界就会报异常
。当队列满时,如果再往队列中插入元素,会抛出IllegalStateException(“Queue full”)异常。当队列空时,从队列里获取元素会抛出NoSuchElementException异常。
返回特殊值:超过边界时返回false(成功为true,失败为false)
。当往队列插入元素时,会返回元素是否插入成功,成功返回true。如果是移除方法,则是从队列里取出一个元素,如果没有则返回null。
一直阻塞:当阻塞队列满时,如果生产者线程往队列里put元素,队列会一直阻塞生产者线程,直到队列可用或者响应中断退出。当队列空时,如果消费者线程从队列里take元素,队列会阻塞消费者线程,直到队列不为空。
超时退出:当阻塞队列满时,如果生产者线程往队列里面插入元素,队列会阻塞生产者线程一段时间,如果超过了指定时间,生产者线程就会退出。
常见的几种阻塞队列:
BlockingQueue是一个接口,主要有下面7种实现类:
ArrayBlockingQueue:基于数组的阻塞队列实现,在其内部,维护了一个定长数组,以便缓存队列中的数据对象,其内部没实现读写分离,也就意味着生产和消费不能完全并行,长度是需要自己定义的,可以指定先进先出或者先进后出,也被称为“有界队列”
LinkedBlockingQueue:基于链表的阻塞队列,跟ArrayBlockingQueue类似,其内部也维持着一个数据缓冲队列(该队列由一个链表构成),LinkedBlockingQueue之所以能够高效的处理并发数据,是因为其内部实现采用分离锁(读写分离两个锁),从而实现生产者和消费者操作完全并发执行,也是一个“无界队列”
PriorityBlockingQueue:基于优先级的阻塞队列(优先级的判断通过构造函数传入的Compator(比较器)对象决定,也就是说传入队列的对象必须实现Comparable接口),在实现PriorityBlockingQueue时,内部控制线程同步的锁采用的是公平锁,是一个“无界队列”(PriorityBlockingQueue调用take后需要重新排序,调一次重新排一次)
DelayQueue:带有延迟时间的无界阻塞Queue,其中的元素只有当指定的延迟时间到了,才能够从队列中获取该元素。
DelayQueue中的元素必须实现Delayed接口,DelayQueue是一个没有大小限制的队列,应用场景比较多,比如对缓存超时的数据进行移除,任务超时处理,空间连接的关闭等等
SynchronousQueue:不存储任何元素的队列,生产者产生的数据直接会被消费者获取并消费
,即每一个put操作必须等待一个take操作,否则不能继续添加元素,或者你可以理解为是只能存储一个元素的队列,存一个就满了,该元素必须被移除掉,才能继续添加。
LinkedTransferQueue:一个由链表结构组成的无界阻塞传输队列,主要体现在LinkedTransferQueue多2个方法:
LinkedBlockingDeque:由链表结构组成的双向阻塞队列,即可以从队列的两端插入和移出元素。
阻塞队列的底层主要使用的还是之前我们介绍过得**等待通知机制来实现的**,等待通知机制在阻塞队列中具体体现为如下思想: :当生产者往一个满队列中添加元素时,生产者会被阻塞,当消费者从该队列中消费了一个元素后,会通知阻塞的插入操作的生产者线程,告诉它当前队列不满,可以继续执行添加操作。我们通过下面源码可以更好的理解这一点:
final ReentrantLock lock; /** Condition for waiting takes */ private final Condition notEmpty; /** Condition for waiting puts */ private final Condition notFull; public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; lock = new ReentrantLock(fair); // 使用condition模式等待通知 notEmpty = lock.newCondition(); notFull = lock.newCondition(); } public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == items.length) // 当队列满时,阻塞 notFull.await(); // 否则继续添加元素 insert(e); } finally { lock.unlock(); } } private void insert(E x) { items[putIndex] = x; putIndex = inc(putIndex); ++count; // 添加元素时会唤醒等待移出数据的take线程 notEmpty.signal(); } public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) // 当队列为空时,获取数据的线程等待 notEmpty.await(); // 否则就取出元素,并且唤醒等待的put线程 return extract(); } finally { lock.unlock(); } } private E extract() { final Object[] items = this.items; E x = this.<E>cast(items[takeIndex]); items[takeIndex] = null; takeIndex = inc(takeIndex); --count; // 唤醒等待的put线程 notFull.signal(); return x; }
感谢并参考:
https://www.javazhiyin.com/50848.html#m
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。