赞
踩
前言:
阻塞队列的典型使用场景就是生产者/消费者模式。本文会介绍Java7种阻塞队列。
阻塞队列,关键字是阻塞,先理解阻塞的含义,在阻塞队列中,线程阻塞有这样的两种情况:
1.当队列中没有数据的情况下,消费者端的所有线程都会被自动阻塞(挂起),直到有数据放入队列。
2.当队列中填满数据的情况下,生产者端的所有线程都会被自动阻塞(挂起),直到队列中有空的位置,线程被自动唤醒。
阻塞队列提供了四种处理方法:
方法\处理方式 | 抛出异常 | 返回特殊值 | 一直阻塞 | 超时退出 |
---|---|---|---|---|
插入方法 | add(e) | offer(e) | put(e) | offer(e,time,unit) |
移除方法 | remove() | poll() | take() | poll(time,unit) |
检查方法 | element() | peek() | 不可用 | 不可用 |
异常:是指当阻塞队列满时候,再往队列里插入元素,会抛出IllegalStateException("Queue full")异常。当队列为空时,从队列里获取元素时会抛出NoSuchElementException异常 。
添加数据操作:
1:publicabstract boolean add(E paramE):将指定元素插入此队列中(如果立即可行且不会违反容量限制),成功时返回true,如果当前没有可用的空间,则抛出IllegalStateException。如果该元素是NULL,则会抛出NullPointerException异常。
2:public abstract boolean offer(E paramE):将指定元素插入此队列中(如果立即可行且不会违反容量限制),成功时返回true,如果当前没有可用的空间,则返回false。
3:public abstract void put(E paramE) throws InterruptedException:将指定元素插入此队列中,将等待可用的空间(如果有必要)
4:offer(E o, long timeout, TimeUnit unit):可以设定等待的时间,如果在指定的时间内,还不能往队列中加入BlockingQueue,则返回失败。
获取数据操作:
1:poll(time):取走BlockingQueue里排在首位的对象,若不能立即取出,则可以等time参数规定的时间,取不到时返回null;
2:poll(long timeout, TimeUnit unit):从BlockingQueue取出一个队首的对象,如果在指定时间内,队列一旦有数据可取,则立即返回队列中的数据。否则直到时间超时还没有数据可取,返回失败。
3:take():取走BlockingQueue里排在首位的对象,若BlockingQueue为空,阻断进入等待状态直到BlockingQueue有新的数据被加入。
4.drainTo():一次性从BlockingQueue获取所有可用的数据对象(还可以指定获取数据的个数),通过该方法,可以提升获取数据效率;不需要多次分批加锁或释放锁。
1.缓存系统的设计:可以用DelayQueue保存缓存元素的有效期,使用一个线程循环查询DelayQueue,一旦能从DelayQueue中获取元素时,表示缓存有效期到了。
2.定时任务调度:使用DelayQueue保存当天将会执行的任务和执行时间,一旦从DelayQueue中获取到任务就开始执行,从比如TimerQueue就是使用DelayQueue实现的。
1.transfer方法:如果当前有消费者正在等待接收元素(消费者使用take()方法或带时间限制的poll()方法时),transfer方法可以把生产者传入的元素立刻transfer(传输)给消费者。如果没有消费者在等待接收元素,transfer方法会将元素存放在队列的tail节点,并等到该元素被消费者消费了才返回。
2.tryTransfer方法。则是用来试探下生产者传入的元素是否能直接传给消费者。如果没有消费者等待接收元素,则返回false。和transfer方法的区别是tryTransfer方法无论消费者是否接收,方法立即返回。而transfer方法是必须等到消费者消费了才返回。对于带有时间限制的tryTransfer(E e, long timeout, TimeUnit unit)方法,则是试图把生产者传入的元素直接传给消费者,但是如果没有消费者消费该元素则等待指定的时间再返回,如果超时还没消费元素,则返回false,如果在超时时间内消费了元素,则返回true。
- //生产者
- public class Producer implements Runnable{
- private final BlockingQueue<Integer> queue;
-
- public Producer(BlockingQueue q){
- this.queue=q;
- }
-
- @Override
- public void run() {
- try {
- while (true){
- Thread.sleep(1000);//模拟耗时
- queue.put(produce());
- }
- }catch (InterruptedException e){
-
- }
- }
-
- private int produce() {
- int n=new Random().nextInt(10000);
- System.out.println("Thread:" + Thread.currentThread().getId() + " produce:" + n);
- return n;
- }
- }
- //消费者
- public class Consumer implements Runnable {
- private final BlockingQueue<Integer> queue;
-
- public Consumer(BlockingQueue q){
- this.queue=q;
- }
-
- @Override
- public void run() {
- while (true){
- try {
- Thread.sleep(2000);//模拟耗时
- consume(queue.take());
- }catch (InterruptedException e){
-
- }
-
- }
- }
-
- private void consume(Integer n) {
- System.out.println("Thread:" + Thread.currentThread().getId() + " consume:" + n);
-
- }
- }
- //测试
- public class Main {
-
- public static void main(String[] args) {
- BlockingQueue<Integer> queue=new ArrayBlockingQueue<Integer>(100);
- Producer p=new Producer(queue);
- Consumer c1=new Consumer(queue);
- Consumer c2=new Consumer(queue);
-
- new Thread(p).start();
- new Thread(c1).start();
- new Thread(c2).start();
- }
- }

由于阻塞队列本身是线程安全的,队列可以安全地从一个线程向另外一个线程传递数据,所以我们的生产者/消费者直接使用线程安全的队列就可以,而不需要自己去考虑更多的线程安全问题。这也就意味着,考虑锁等线程安全问题的重任从你
转移到了 队列
上,降低了我们开发的难度和工作量。
队列还能起到一个隔离的作用。比如说我们开发一个银行转账的程序,那么生产者线程不需要关心具体的转账逻辑,只需要把转账任务,如账户和金额等信息放到(put)队列中就可以,而不需要去关心银行这个类如何实现具体的转账业务。而作为银行这个类来讲,它会去从队列里(take)取出来将要执行的具体的任务,再去通过自己的各种方法来完成本次转账。这样就实现了具体任务与执行任务类之间的解耦,任务被放在了阻塞队列中,而负责放任务的线程是无法直接访问到我们银行具体实现转账操作的对象的,实现了隔离,提高了安全性。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。