赞
踩
目录
在Java并发编程中,阻塞队列(BlockingQueue)是一个非常有用的工具。它是一个线程安全的队列,支持生产者-消费者模式,可以解决多线程并发访问的问题。
阻塞队列是一种特殊的队列,它具有以下几个特点:
(1)线程安全:阻塞队列是线程安全的,多个线程可以并发访问它而不会发生冲突。
(2)生产者-消费者模式:阻塞队列支持生产者-消费者模式,即生产者向队列中添加元素,消费者从队列中取出元素。
(3)阻塞等待:当队列为空时,消费者会被阻塞等待直到队列中有元素可供消费;当队列已满时,生产者会被阻塞等待直到队列有空闲位置可供添加元素。
阻塞队列的实现原理主要涉及到两个方面:线程安全和阻塞等待。
(1)线程安全实现:阻塞队列的线程安全实现主要依靠锁和同步机制来保证多线程访问的安全。在Java中,常用的锁有ReentrantLock和synchronized,它们可以保证同一时刻只有一个线程可以访问共享资源。
(2)阻塞等待实现:阻塞队列的阻塞等待实现主要依靠条件变量来实现。在Java中,常用的条件变量有Condition和wait/notify机制,它们可以使线程在满足特定条件时挂起等待,直到条件满足时被唤醒。
阻塞队列在Java并发编程中有着广泛的应用场景,主要包括以下几个:
(1)线程池:Java中的线程池使用了阻塞队列来管理任务队列,当线程池中的线程数达到最大值时,新的任务会被放入阻塞队列中等待执行。
(2)生产者-消费者模式:阻塞队列可以非常方便地实现生产者-消费者模式,生产者向队列中添加数据,消费者从队列中取出数据,阻塞队列可以保证生产者和消费者之间的同步和协调。
(3)消息队列:阻塞队列可以用于实现消息队列,例如Java消息服务(JMS)中的队列和主题就是基于阻塞队列实现的。
(4)多线程协作:阻塞队列可以用于多线程之间的协作,例如一个线程生产数据,另一个线程消费数据,它们可以通过阻塞队列来进行数据交换和协作。生产线程向阻塞队列中添加数据,消费线程从队列中取出数据进行处理,如果队列为空则消费线程会阻塞等待,直到有数据被添加到队列中。
总结:
阻塞队列是Java并发编程中非常重要的一个工具类,它可以实现多线程之间的协作,提高程序的效率和可靠性。在使用阻塞队列时需要注意以下几点:
综上所述,阻塞队列是Java并发编程中非常重要的一个工具类,可以实现多线程之间的协作,提高程序的效率和可靠性。在使用阻塞队列时需要注意容量设置、线程安全、阻塞特性等问题,选择合适的实现方式,才能发挥阻塞队列的优势。
BlockingQueue 阻塞队列(生产/消费者队列) ★★★★★
Java中提供了多种阻塞队列的实现,以下是最常用的几个:
1. ArrayBlockingQueue
ArrayBlockingQueue是一个有界的阻塞队列,底层是由数组实现的,当队列满时,新元素将无法添加到队列中,直到队列中有空闲位置为止。当队列为空时,获取元素的操作将会阻塞,直到队列中有元素可用。
2. LinkedBlockingQueue
LinkedBlockingQueue是一个无界的阻塞队列,底层是由链表实现的,可以存储任意数量的元素。当队列满时,新元素将会一直阻塞等待,直到队列中有空闲位置为止。当队列为空时,获取元素的操作将会阻塞,直到队列中有元素可用。
3. PriorityBlockingQueue
PriorityBlockingQueue是一个支持优先级的阻塞队列,底层是由堆实现的,可以根据元素的优先级顺序进行排序。当添加元素时,会根据元素的优先级自动排序,获取元素时会返回当前队列中优先级最高的元素。当队列为空时,获取元素的操作将会阻塞,直到队列中有元素可用。
4. SynchronousQueue
SynchronousQueue是一个特殊的阻塞队列,它并不保存任何元素,每次插入操作必须等待另一个线程的移除操作,每次移除操作必须等待另一个线程的插入操作,因此它可以用于两个线程之间进行数据交换。
以下是各类实现详解
ArrayBlockingQueue在初始化的时候,必须指定当前队列的长度。因为ArrayBlockingQueue是基于数组实现的队列结构,数组长度不可变,必须提前设置数组长度信息。
类上方的英文文档:
A bounded blocking queue backed by an array. This queue orders elements FIFO (first-in-first-out). The head of the queue is that element that has been on the queue the longest time. The tail of the queue is that element that has been on the queue the shortest time. New elements are inserted at the tail of the queue, and the queue retrieval operations obtain elements at the head of the queue.
微软翻译:由数组支持的有界阻塞队列。此队列按 FIFO(先进先出)对元素进行排序。队列的头部是在队列中停留时间最长的元素。队列的尾部是队列中停留时间最短的元素。新元素插入到队列的尾部,队列检索操作获取队列末尾的元素。
This is a classic "bounded buffer", in which a fixed-sized array holds elements inserted by producers and extracted by consumers. Once created, the capacity cannot be changed. Attempts to put an element into a full queue will result in the operation blocking; attempts to take an element from an empty queue will similarly block.这是一个经典的“有界缓冲区”,其中固定大小的数组包含由生产者插入并由消费者提取的元素。一旦创建,就无法更改容量。尝试将元素放入完整队列将导致操作阻塞;尝试从空队列中获取元素同样会阻塞。
This class supports an optional fairness policy for ordering waiting producer and consumer threads. By default, this ordering is not guaranteed. However, a queue constructed with fairness set to true grants threads access in FIFO order. Fairness generally decreases throughput but reduces variability and avoids starvation.此类支持可选的公平性策略,用于对等待的生产者和使用者线程进行排序。默认情况下,不保证此排序。但是,如果将公平性设置为 true 而构造的队列将按 FIFO 顺序授予线程访问权限。公平性通常会降低
This class and its iterator implement all of the optional methods of the Collection and Iterator interfaces.
- import java.util.concurrent.ArrayBlockingQueue;
- import java.util.concurrent.BlockingQueue;
- import java.util.concurrent.TimeUnit;
-
- public class TestArrayBlockingQueue {
- public static void main(String[] args) throws InterruptedException {
- BlockingQueue<String> arrayQueue = new ArrayBlockingQueue<>(4);
- //向队列中添加元素
- arrayQueue.add("张飞");
- arrayQueue.offer("刘备");
- arrayQueue.offer("关羽",2, TimeUnit.SECONDS);
- arrayQueue.put("曹操");
- System.out.println(arrayQueue);//[张飞, 刘备, 关羽, 曹操]
-
- //从队列中取元素
- arrayQueue.remove();
- arrayQueue.remove("刘备");
- arrayQueue.take();
- arrayQueue.poll();
- arrayQueue.poll(3,TimeUnit.SECONDS);
- System.out.println(arrayQueue);
- }
- }

1. boolean offer(E e)
要求存储的数据不允许为null,为null就抛出空指针;
为保证线程安全,会加锁;
如果队列中的元素已经存满了,返回false;队列没满,将元素添加到队列中。
2. boolean add(E e)
add方法本身就是调用了offer方法,如果offer方法返回false,直接抛出异常。
3. boolean offer(E e, long timeout, TimeUnit unit)
生产者在添加数据时,如果队列已经满了,阻塞一会。
阻塞到消费者消费了消息,然后唤醒当前阻塞线程
阻塞到了time时间,再次判断是否可以添加,不能,直接告辞。
- /**
- * Inserts the specified element at the tail of this queue, waiting
- * up to the specified wait time for space to become available if
- * the queue is full.
- *
- * @throws InterruptedException {@inheritDoc}
- * @throws NullPointerException {@inheritDoc}
- */
- // 如果线程在挂起的时候,如果对当前阻塞线程的中断标记位进行设置,此时会抛出异常直接结束
- public boolean offer(E e, long timeout, TimeUnit unit)
- throws InterruptedException {
-
- checkNotNull(e);
- long nanos = unit.toNanos(timeout);
- final ReentrantLock lock = this.lock;
- // 允许线程中断并抛出异常的加锁方式
- lock.lockInterruptibly();
- try {
- // 为什么是while(虚假唤醒)
- while (count == items.length) {
- if (nanos <= 0)
- return false;
- // 挂起等待,会同时释放锁资源(对标sync的wait方法)
- // awaitNanos会挂起线程,并且返回剩余的阻塞时间
- // 恢复执行时,需要重新获取锁资源
- nanos = notFull.awaitNanos(nanos);
- }
- enqueue(e);
- return true;
- } finally {
- lock.unlock();
- }
- }

4. void put(E e)
如果队列是满的, 就一直挂起,直到被唤醒,或者被中断。
- /**
- * Inserts the specified element at the tail of this queue, waiting
- * for space to become available if the queue is full.
- *
- * @throws InterruptedException {@inheritDoc}
- * @throws NullPointerException {@inheritDoc}
- */
- public void put(E e) throws InterruptedException {
- checkNotNull(e);
- final ReentrantLock lock = this.lock;
- lock.lockInterruptibly();
- try {
- while (count == items.length)
- // await方法一直阻塞,直到被唤醒或者中断标记位
- notFull.await();
- enqueue(e);
- } finally {
- lock.unlock();
- }
- }

1. remove()
- /**
- * Retrieves and removes the head of this queue. This method differs
- * from {@link #poll poll} only in that it throws an exception if this
- * queue is empty.
- *
- * <p>This implementation returns the result of <tt>poll</tt>
- * unless the queue is empty.
- *
- * @return the head of this queue
- * @throws NoSuchElementException if this queue is empty
- */
- public E remove() {
- E x = poll();
- if (x != null)
- return x;
- else
- throw new NoSuchElementException();
- }

2. poll()
3. poll(long timeout, TimeUnit unit)
- public E poll(long timeout, TimeUnit unit) throws InterruptedException {
- // 转换时间单位
- long nanos = unit.toNanos(timeout);
- // 竞争锁
- final ReentrantLock lock = this.lock;
- lock.lockInterruptibly();
- try {
- // 如果没有数据
- while (count == 0) {
- if (nanos <= 0)
- // 没数据,也无法阻塞了,返回null
- return null;
- // 没数据,挂起消费者线程
- nanos = notEmpty.awaitNanos(nanos);
- }
- // 取数据
- return dequeue();
- } finally {
- lock.unlock();
- }
- }

4. take()
- public E take() throws InterruptedException {
- final ReentrantLock lock = this.lock;
- lock.lockInterruptibly();
- try {
- // 虚假唤醒
- while (count == 0)
- notEmpty.await();
- return dequeue();
- } finally {
- lock.unlock();
- }
- }
什么是Java虚假唤醒及如何避免虚假唤醒?《多线程学习之十四》_虚假唤醒 java-CSDN博客
- import java.util.concurrent.TimeUnit;
- import java.util.concurrent.locks.Condition;
- import java.util.concurrent.locks.ReentrantLock;
-
- /**
- * 虚假唤醒解决方案:将if循环->while循环
- */
- public class testIfAndWhile {
- private static ReentrantLock lock = new ReentrantLock();
- private static Condition hasApple = lock.newCondition();
- private static volatile int appleNum;
-
- public static void main(String[] args) throws InterruptedException {
- new Thread(()->{
- lock.lock();
- try {
- while (appleNum == 0){
- System.out.println("没苹果了,有了叫我——" + Thread.currentThread().getName());
- hasApple.await(3,TimeUnit.SECONDS);
- }
- System.out.println("现有苹果个数:"+appleNum);
- appleNum--;
- System.out.println("张三吃了1个,还剩的个数:"+appleNum);
- } catch (InterruptedException e) {
- e.printStackTrace();
- } finally {
- lock.unlock();
- }
- },"张三").start();
-
- new Thread(()->{
- lock.lock();
- try {
- while (appleNum == 0){
- System.out.println("没苹果了,有了叫我——" + Thread.currentThread().getName());
- hasApple.await(3,TimeUnit.SECONDS);
- }
- System.out.println("现有苹果个数:"+appleNum);
- appleNum--;
- System.out.println("李四吃了1个,还剩的个数:"+appleNum);
- } catch (InterruptedException e) {
- e.printStackTrace();
- } finally {
- lock.unlock();
- }
- },"李四").start();
-
- Thread.sleep(1000);
- lock.lock();
- try {
- appleNum=1;//主线程休眠两秒后,送来1个苹果
- hasApple.signalAll();//唤醒所有等待的线程
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
- lock.unlock();
- }
- }
- }

类上面的英文介绍:
An optionally-bounded blocking queue based on linked nodes. This queue orders elements FIFO (first-in-first-out). The head of the queue is that element that has been on the queue the longest time. The tail of the queue is that element that has been on the queue the shortest time. New elements are inserted at the tail of the queue, and the queue retrieval operations obtain elements at the head of the queue. Linked queues typically have higher throughput than array-based queues but less predictable performance in most concurrent applications.
微软翻译:基于链接节点的可选边界阻塞队列。此队列按 FIFO(先进先出)对元素进行排序。
The optional capacity bound constructor argument serves as a way to prevent excessive queue expansion. The capacity, if unspecified, is equal to Integer.MAX_VALUE. Linked nodes are dynamically created upon each insertion unless this would bring the queue above capacity.
This class and its iterator implement all of the optional methods of the Collection and Iterator interfaces.
linkedBlockingQueue和arrayBlockingQueue - 简书
认真学习阻塞队列ArrayBlockingQueue与LinkedBlockingQueue_arrayblockingqueue和linkedblockqueue-CSDN博客
集合(八) -- 阻塞队列ArrayBlockingQueue和LinkedBlockingQueue_arrayblockingqueue与linkedblockingqueue-CSDN博客
1、带优先级的无界阻塞队列,每次出队列都返回优先级最高或者最低的元素
2、内部维护最小堆,使用平衡二叉树实现,直接遍历队列元素不保证有序。
3、默认使用对象的compareTo方法比较,支持自定义comparators
PriorityBlockingQueue优先级阻塞队列 - 简书
Queue集合之PriorityBlockingQueue详解_乐乐Java路漫漫的博客-CSDN博客
DelayQueue是一个无界的BlockingQueue,用于放置实现了Delayed接口的对象,其中的对象只能在其到期时才能从队列中取走。这种队列是有序的,即队头对象的延迟到期时间最长。注意:不能将null元素放置到这种队列中。
DelayQueue能做什么?
1. 淘宝订单业务:下单之后如果三十分钟之内没有付款就自动取消订单。
2. 饿了吗订餐通知:下单成功后60s之后给用户发送短信通知。
3.关闭空闲连接。服务器中,有很多客户端的连接,空闲一段时间之后需要关闭之。
4.缓存。缓存中的对象,超过了空闲时间,需要从缓存中移出。
5.任务超时处理。在网络协议滑动窗口请求应答式交互时,处理超时未响应的请求等。
Java高并发专题之35、延迟队列 DelayQueue 详解-CSDN博客
https://www.cnblogs.com/myseries/p/10944211.html
它非常特殊,不存储数据,存的是生产者或者是消费者。
是一个没有数据缓冲的BlockingQueue,容量为0,它不会为队列中元素维护存储空间,它只是多个线程之间数据交换的媒介。
https://www.cnblogs.com/leonandyou/p/15825348.html
https://www.cnblogs.com/chafry/p/16782932.html
两种模式:
应用场景:
SynchronousQueue非常适合传递性场景做交换工作,生产者的线程和消费者的线程同步传递某些信息、事件或者任务。
SynchronousQueue的一个使用场景是在线程池里。如果我们不确定来自生产者请求数量,但是这些请求需要很快的处理掉,那么配合SynchronousQueue为每个生产者请求分配一个消费线程是处理效率最高的办法。Executors.newCachedThreadPool()就使用了SynchronousQueue,这个线程池根据需要(新任务到来时)创建新的线程,如果有空闲线程则会重复使用,线程空闲了60秒后会被回收。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。