当前位置:   article > 正文

并发编程之美(第6章—1)_park(object blocker)

park(object blocker)

第6章 Java并发包中锁原理剖析

6.1 LockSupport工具类

        JDK中的rt.jar包里面的LockSupport是个工具类,它的主要作用是挂起和唤醒线程该工具类是创建锁和其他同步类的基础
        LockSupport类与每个使用它的线程都会关联一个许可证,在默认情况下调用LockSupport类的方法的线程是不持有许可证的。LockSupport是使用Unsafe类实现的,下面介绍LockSupport中的几个主要函数。
1.void park()方法
        如果调用park方法的线程已经拿到了与LockSupport关联的许可证,则调用LockSupport.park()时会马上返回,否则调用线程会被禁止参与线程的调度,也就是会被阻塞挂起。
        如下代码直接在main函数里面调用park方法,最终只会输出begin park!,然后当前线程被挂起,这是因为在默认情况下调用线程是不持有许可证的。

  1. public static void main( String[] args ){
  2.     System.out.println( "begin park! " );
  3.     LockSupport.park();
  4.     System.out.println( "end park! " );
  5. }

        在其他线程调用unpark(Thread thread)方法并且将当前线程作为参数时,调用park方法而被阻塞的线程会返回。另外,如果其他线程调用了阻塞线程的interrupt()方法,设置了中断标志或者线程被虚假唤醒,则阻塞线程也会返回。所以在调用park方法时最好也使用循环条件判断方式
        需要注意的是,因调用park()方法而被阻塞的线程被其他线程中断而返回时并不会抛出InterruptedException异常
2.void unpark(Thread thread)方法
        当一个线程调用unpark时,如果参数thread线程没有持有thread与LockSupport类关联的许可证,则让thread线程持有。如果thread之前因调用park()而被挂起,则调用unpark后,该线程会被唤醒。如果thread之前没有调用park,则调用unpark方法后,再调用park方法,其会立刻返回。修改代码如下。

  1. public static void main( String[] args ){
  2.       System.out.println( "begin park! " );
  3.       //使当前线程获取到许可证
  4.       LockSupport.unpark(Thread.currentThread());
  5.       //再次调用park方法
  6.       LockSupport.park();
  7.       System.out.println( "end park! " );
  8. }
  9. 该代码会输出
  10. begin park!
  11. end park!

下面再来看一个例子以加深对park和unpark的理解。

  1. public static void main(String[] args) throws InterruptedException {
  2.         Thread thread = new Thread(new Runnable() {
  3.             @Override
  4.             public void run() {
  5.                 System.out.println("child thread begin park! ");
  6.                 // 调用park方法,挂起自己
  7.                 LockSupport.park();
  8.                 System.out.println("child thread unpark! ");
  9.             }
  10.         });
  11.         //启动子线程
  12.         thread.start();
  13.         //主线程休眠1s
  14.         Thread.sleep(1000);
  15.         System.out.println("main thread begin unpark! ");
  16.         //调用unpark方法让thread线程持有许可证,然后park方法返回
  17.         LockSupport.unpark(thread);
  18.     }
  19. 输出结果为
  20. child thread begin park!
  21. main thread begin unpark!
  22. child thread unpark!

        上面代码首先创建了一个子线程thread,子线程启动后调用park方法,由于在默认情况下子线程没有持有许可证,因而它会把自己挂起。
        主线程休眠1s是为了让主线程调用unpark方法前让子线程输出child thread begin park!并阻塞。
        主线程然后执行unpark方法,参数为子线程,这样做的目的是让子线程持有许可证,然后子线程调用的park方法就返回了
        park方法返回时不会告诉你因何种原因返回,所以调用者需要根据之前调用park方法的原因,再次检查条件是否满足,如果不满足则还需要再次调用park方法。
       例如,根据调用前后中断状态的对比就可以判断是不是因为被中断才返回的。
       为了说明调用park方法后的线程被中断后会返回,我们修改上面的例子代码,删除LockSupport.unpark(thread);,然后添加thread.interrupt();,具体代码如下。

  1.     public static void main(String[] args) throws InterruptedException {
  2.         Thread thread = new Thread(new Runnable() {
  3.             @Override
  4.             public void run() {
  5.                 System.out.println("child thread begin park! ");
  6.                 // 调用park方法,挂起自己,只有被中断才会退出循环
  7.                 while (! Thread.currentThread().isInterrupted()) {
  8.                     LockSupport.park();
  9.                 }
  10.                 System.out.println("child thread unpark! ");
  11.             }
  12.         });
  13.         // 启动子线程
  14.         thread.start();
  15.         // 主线程休眠1s
  16.         Thread.sleep(1000);
  17.         System.out.println("main thread begin unpark! ");
  18.         // 中断子线程
  19.         thread.interrupt();
  20.     }
  21. 输出结果为
  22. child thread begin park!
  23. main thread begin unpark!
  24. child thread unpark!

        在如上代码中,只有中断子线程,子线程才会运行结束,如果子线程不被中断,即使你调用unpark(thread)方法子线程也不会结束。
3.void parkNanos(long nanos)方法
         和park方法类似,如果调用park方法的线程已经拿到了与LockSupport关联的许可证,则调用LockSupport.parkNanos(long nanos)方法后会马上返回。该方法的不同在于,如果没有拿到许可证,则调用线程会被挂起nanos时间后修改为自动返回
         另外park方法还支持带有blocker参数的方法void park(Object blocker)方法,当线程在没有持有许可证的情况下调用park方法而被阻塞挂起时,这个blocker对象会被记录到该线程内部
         使用诊断工具可以观察线程被阻塞的原因,诊断工具是通过调用getBlocker(Thread)方法来获取blocker对象的,所以JDK推荐我们使用带有blocker参数的park方法,并且blocker被设置为this,这样当在打印线程堆栈排查问题时就能知道是哪个类被阻塞了。

例如下面的代码。

  1. public class TestPark {
  2.     public  void testPark(){
  3.       LockSupport.park(); //(1)
  4.     }
  5.     public static void main(String[] args) {
  6.         TestPark testPark = new TestPark();
  7.         testPark.testPark();
  8.     }
  9. }

运行代码后,使用jstack pid命令查看线程堆栈时可以看到如下输出结果。

修改代码(1)为LockSupport.park(this)后运行代码,则jstack pid的输出结果为

使用带blocker参数的park方法,线程堆栈可以提供更多有关阻塞对象的信息。
4.park(Object blocker)方法

  1. public static void park(Object blocker) {
  2.   //获取调用线程
  3.     Thread t = Thread.currentThread();
  4.   //设置该线程的blocker变量
  5.     setBlocker(t, blocker);
  6.     //挂起线程
  7.     UNSAFE.park(false, 0L);
  8.   //线程被激活后清除blocker变量,因为一般都是在线程阻塞时才分析原因
  9.     setBlocker(t, null);
  10. }

        Thread类里面有个变量volatile Object parkBlocker,用来存放park方法传递的blocker对象,也就是把blocker变量存放到了调用park方法的线程的成员变量里面。
5.void parkNanos(Object blocker, long nanos)方法
      相比park(Object blocker)方法多了个超时时间。
6.void parkUntil(Object blocker, long deadline)方法
      它的代码如下

  1. public static void parkUntil(Object blocker, long deadline) {
  2.         Thread t = Thread.currentThread();
  3.         setBlocker(t, blocker);
  4.       //isAbsolute=true, time=deadline;表示到deadline时间后返回
  5.         UNSAFE.park(true, deadline);
  6.         setBlocker(t, null);
  7.     }

        其中参数deadline的时间单位为ms,该时间是从1970年到现在某一个时间点的毫秒值。这个方法和parkNanos(Object blocker, long nanos)方法的区别是,后者是从当前算等待nanos秒时间,而前者是指定一个时间点,比如需要等到2017.12.11日12:00:00,则把这个时间点转换为从1970年到这个时间点的总毫秒数。
最后再看一个例子。

  1. class FIFOMutex {
  2.     private final AtomicBoolean locked = new AtomicBoolean(false);
  3.     private final Queue<Thread> waiters = new ConcurrentLinkedQueue<Thread>();
  4.     public void lock() {
  5.         boolean wasInterrupted = false;
  6.         Thread current = Thread.currentThread();
  7.         waiters.add(current);
  8.         // 只有队首的线程可以获取锁(1)
  9.         while (waiters.peek() ! = current || ! locked.compareAndSet(false, true)) {
  10.             LockSupport.park(this);
  11.             if (Thread.interrupted()) //(2)
  12.                 wasInterrupted = true;
  13.         }
  14.         waiters.remove();
  15.         if (wasInterrupted) //(3)
  16.             current.interrupt();
  17.     }
  18.     public void unlock() {
  19.         locked.set(false);
  20.         LockSupport.unpark(waiters.peek());
  21.     }
  22. }

        这是一个先进先出的锁,也就是只有队列的首元素可以获取锁。在代码(1)处,如果当前线程不是队首或者当前锁已经被其他线程获取,则调用park方法挂起自己。
        然后在代码(2)处判断,如果park方法是因为被中断而返回,则忽略中断,并且重置中断标志,做个标记,然后再次判断当前线程是不是队首元素或者当前锁是否已经被其他线程获取,如果是则继续调用park方法挂起自己
        然后在代码(3)中,判断标记,如果标记为true则中断该线程,这个怎么理解呢?其实就是其他线程中断了该线程,虽然我对中断信号不感兴趣,忽略它,但是不代表其他线程对该标志不感兴趣,所以要恢复下

6.2 抽象同步队列AQS概述

6.2.1 AQS——锁的底层支持

        AbstractQueuedSynchronizer抽象同步队列简称AQS,它是实现同步器的基础组件,并发包中锁的底层就是使用AQS实现的。另外,大多数开发者可能永远不会直接使用AQS,但是知道其原理对于架构设计还是很有帮助的。下面看下AQS的类图结构,如图6-1所示。

                                                                                                              图6-1

        由该图可以看到,AQS是一个FIFO的双向队列,其内部通过节点head和tail记录队首和队尾元素,队列元素的类型为Node其中Node中的thread变量用来存放进入AQS队列里面的线程;Node节点内部的SHARED用来标记该线程是获取共享资源时被阻塞挂起后放入AQS队列的EXCLUSIVE用来标记线程是获取独占资源时被挂起后放入AQS队列的waitStatus记录当前线程等待状态,可以为CANCELLED(线程被取消了)、SIGNAL(线程需要被唤醒)、CONDITION(线程在条件队列里面等待)、PROPAGATE(释放共享资源时需要通知其他节点); prev记录当前节点的前驱节点,next记录当前节点的后继节点
        在AQS中维持了一个单一的状态信息state,可以通过getState、setState、compareAndSetState函数修改其值。对于ReentrantLock的实现来说,state可以用来表示当前线程获取锁的可重入次数;对于读写锁ReentrantReadWriteLock来说,state的高16位表示读状态,也就是获取该读锁的次数,低16位表示获取到写锁的线程的可重入次数;对于semaphore来说,state用来表示当前可用信号的个数;对于CountDownlatch来说,state用来表示计数器当前的值。
        AQS有个内部类ConditionObject,用来结合锁实现线程同步。ConditionObject可以直接访问AQS对象内部的变量,比如state状态值和AQS队列ConditionObject是条件变量,每个条件变量对应一个条件队列(单向链表队列),其用来存放调用条件变量的await方法后被阻塞的线程,如类图所示,这个条件队列的头、尾元素分别为firstWaiter和lastWaiter
        对于AQS来说,线程同步的关键是对状态值state进行操作。根据state是否属于一个线程,操作state的方式分为独占方式和共享方式。在独占方式下获取和释放资源使用的方法为: void acquire(int arg)void acquireInterruptibly(int arg)boolean release(int arg)。
        在共享方式下获取和释放资源的方法为: void acquireShared(int arg)void acquireSharedInterruptibly(int arg)boolean releaseShared(int arg)。
        使用独占方式获取的资源是与具体线程绑定的,就是说如果一个线程获取到了资源,就会标记是这个线程获取到了,其他线程再尝试操作state获取资源时会发现当前该资源不是自己持有的,就会在获取失败后被阻塞。比如独占锁ReentrantLock的实现,当一个线程获取了ReentrantLock的锁后,在AQS内部会首先使用CAS操作把state状态值从0变为1,然后设置当前锁的持有者为当前线程,当该线程再次获取锁时发现它就是锁的持有者,则会把状态值从1变为2,也就是设置可重入次数,而当另外一个线程获取锁时发现自己并不是该锁的持有者就会被放入AQS阻塞队列后挂起。
        对应共享方式的资源与具体线程是不相关的,当多个线程去请求资源时通过CAS方式竞争获取资源,当一个线程获取到了资源后,另外一个线程再次去获取时如果当前资源还能满足它的需要,则当前线程只需要使用CAS方式进行获取即可。比如Semaphore信号量,当一个线程通过acquire()方法获取信号量时,会首先看当前信号量个数是否满足需要,不满足则把当前线程放入阻塞队列,如果满足则通过自旋CAS获取信号量。

在独占方式下,获取与释放资源的流程如下
(1)当一个线程调用acquire(int arg)方法获取独占资源时,会首先使用tryAcquire方法尝试获取资源,具体是设置状态变量state的值,成功则直接返回,失败则将当前线程封装为类型为Node.EXCLUSIVE的Node节点后插入到AQS阻塞队列的尾部,并调用LockSupport.park(this)方法挂起自己。

  1.   public final void acquire(int arg) {
  2.       if (! tryAcquire(arg) &&
  3.           acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
  4.           selfInterrupt();
  5.    }

(2)当一个线程调用release(int arg)方法时会尝试使用tryRelease操作释放资源,这里是设置状态变量state的值,然后调用LockSupport.unpark(thread)方法激活AQS队列里面被阻塞的一个线程(thread)。被激活的线程则使用tryAcquire尝试,看当前状态变量state的值是否能满足自己的需要,满足则该线程被激活,然后继续向下运行,否则还是会被放入AQS队列并被挂起

  1.     public final boolean release(int arg) {
  2.         if (tryRelease(arg)) {
  3.             Node h = head;
  4.             if (h ! = null && h.waitStatus ! = 0)
  5.                 unparkSuccessor(h);
  6.             return true;
  7.         }
  8.         return false;
  9.     }

        需要注意的是,AQS类并没有提供可用的tryAcquire和tryRelease方法,正如AQS是锁阻塞和同步器的基础框架一样,tryAcquire和tryRelease需要由具体的子类来实现。子类在实现tryAcquire和tryRelease时要根据具体场景使用CAS算法尝试修改state状态值,成功则返回true,否则返回false。子类还需要定义,在调用acquire和release方法时state状态值的增减代表什么含义。
        比如继承自AQS实现的独占锁ReentrantLock,定义当status为0时表示锁空闲,为1时表示锁已经被占用。在重写tryAcquire时,在内部需要使用CAS算法查看当前state是否为0,如果为0则使用CAS设置为1,并设置当前锁的持有者为当前线程,而后返回true,如果CAS失败则返回false。
        比如继承自AQS实现的独占锁在实现tryRelease时,在内部需要使用CAS算法把当前state的值从1修改为0,并设置当前锁的持有者为null,然后返回true,如果CAS失败则返回false。
在共享方式下,获取与释放资源的流程如下
(1)当线程调用acquireShared(int arg)获取共享资源时,会首先使用tryAcquireShared尝试获取资源,具体是设置状态变量state的值,成功则直接返回,失败则将当前线程封装为类型为Node.SHARED的Node节点后插入到AQS阻塞队列的尾部,并使用LockSupport.park(this)方法挂起自己。

  1.     public final void acquireShared(int arg) {
  2.         if (tryAcquireShared(arg) < 0)
  3.             doAcquireShared(arg);
  4.     }

(2)当一个线程调用releaseShared(int arg)时会尝试使用tryReleaseShared操作释放资源,这里是设置状态变量state的值,然后使用LockSupport.unpark(thread)激活AQS队列里面被阻塞的一个线程(thread)。被激活的线程则使用tryReleaseShared查看当前状态变量state的值是否能满足自己的需要,满足则该线程被激活,然后继续向下运行,否则还是会被放入AQS队列并被挂起。

  1.     public final boolean releaseShared(int arg) {
  2.           if (tryReleaseShared(arg)) {
  3.               doReleaseShared();
  4.               return true;
  5.           }
  6.           return false;
  7.     }

        同样需要注意的是,AQS类并没有提供可用的tryAcquireShared和tryReleaseShared方法,正如AQS是锁阻塞和同步器的基础框架一样,tryAcquireShared和tryReleaseShared需要由具体的子类来实现子类在实现tryAcquireShared和tryReleaseShared时要根据具体场景使用CAS算法尝试修改state状态值,成功则返回true,否则返回false
        比如继承自AQS实现的读写锁ReentrantReadWriteLock里面的读锁在重写tryAcquireShared时,首先查看写锁是否被其他线程持有,如果是则直接返回false,否则使用CAS递增state的高16位(在ReentrantReadWriteLock中,state的高16位为获取读锁的次数)。
        比如继承自AQS实现的读写锁ReentrantReadWriteLock里面的读锁在重写tryReleaseShared时,在内部需要使用CAS算法把当前state值的高16位减1,然后返回true,如果CAS失败则返回false。
        基于AQS实现的锁除了需要重写上面介绍的方法外,还需要重写isHeldExclusively方法,来判断锁是被当前线程独占还是被共享
        另外,也许你会好奇,独占方式下的void acquire(int arg)和void acquireInterruptibly(int arg),与共享方式下的void acquireShared(int arg)和void acquireSharedInterruptibly(int arg),这两套函数中都有一个带有Interruptibly关键字的函数,那么带这个关键字和不带有什么区别呢?我们来讲讲。
        其实不带Interruptibly关键字的方法的意思是不对中断进行响应,也就是线程在调用不带Interruptibly关键字的方法获取资源时或者获取资源失败被挂起时,其他线程中断了该线程,那么该线程不会因为被中断而抛出异常,它还是继续获取资源或者被挂起,也就是说不对中断进行响应,忽略中断
        而带Interruptibly关键字的方法要对中断进行响应,也就是线程在调用带Interruptibly关键字的方法获取资源时或者获取资源失败被挂起时,其他线程中断了该线程,那么该线程会抛出InterruptedException异常而返回。
        最后,我们来看看如何维护AQS提供的队列,主要看入队操作。
● 入队操作:当一个线程获取锁失败后该线程会被转换为Node节点,然后就会使用enq(final Node node)方法将该节点插入到AQS的阻塞队列。

  1. private Node enq(final Node node) {
  2.         for (; ; ) {
  3.             Node t = tail; //(1)
  4.             if (t == null) { // Must initialize
  5.                 if (compareAndSetHead(new Node()))//(2)
  6.                     tail = head;
  7.             } else {
  8.                 node.prev = t; //(3)
  9.                 if (compareAndSetTail(t, node)) {//(4)
  10.                     t.next = node;
  11.                     return t;
  12.                 }
  13.             }
  14.         }
  15.     }

        下面结合代码和节点图(见图6-2)来讲解入队的过程。如上代码在第一次循环中,当要在AQS队列尾部插入元素时,AQS队列状态如图6-2中(default)所示。也就是队列头、尾节点都指向null;当执行代码(1)后节点t指向了尾部节点,这时候队列状态如图6-2中(I)所示。
        这时候t为null,故执行代码(2),使用CAS算法设置一个哨兵节点为头节点,如果CAS设置成功,则让尾部节点也指向哨兵节点,这时候队列状态如图6-2中(II)所示。
        到现在为止只插入了一个哨兵节点,还需要插入node节点,所以在第二次循环后执行到代码(1),这时候队列状态如图6-2(III)所示;然后执行代码(3)设置node的前驱节点为尾部节点,这时候队列状态如图6-2中(IV)所示;然后通过CAS算法设置node节点为尾部节点,CAS成功后队列状态如图6-2中(V)所示;CAS成功后再设置原来的尾部节点的后驱节点为node,这时候就完成了双向链表的插入,此时队列状态如图6-2中(VI)所示。

                                                                                                             图6-2

6.2.2 AQS——条件变量的支持

        正如在基础篇中讲解的,notify和wait,是配合synchronized内置锁实现线程间同步的基础设施一样,条件变量的signal和await方法也是用来配合锁(使用AQS实现的锁)实现线程间同步的基础设施。
        它们的不同在于,synchronized同时只能与一个共享变量的notify或wait方法实现同步,而AQS的一个锁可以对应多个条件变量
        在基础篇中讲解了,在调用共享变量的notify和wait方法前必须先获取该共享变量的内置锁,同理,在调用条件变量的signal和await方法前也必须先获取条件变量对应的锁。
        那么,到底什么是条件变量呢?如何使用呢?不急,下面看一个例子。

  1. ReentrantLock lock = new ReentrantLock(); //(1)
  2. Condition condition = lock.newCondition(); //(2)
  3. lock.lock(); //(3)
  4. try {
  5.     System.out.println("begin wait");
  6.     condition.await(); //(4)
  7.     System.out.println("end wait");
  8. } catch (Exception e) {
  9.     e.printStackTrace();
  10. } finally {
  11.     lock.unlock(); //(5)
  12. }
  13. lock.lock(); //(6)
  14. try {
  15.     System.out.println("begin signal");
  16.     condition.signal(); //(7)
  17.     System.out.println("end signal");
  18. } catch (Exception e) {
  19.     e.printStackTrace();
  20. } finally {
  21.     lock.unlock(); //(8)
  22. }

        代码(1)创建了一个独占锁ReentrantLock对象,ReentrantLock是基于AQS实现的锁。
        代码(2)使用创建的Lock对象的newCondition()方法创建了一个ConditionObject变量,这个变量就是Lock锁对应的一个条件变量。需要注意的是,一个Lock对象可以创建多个条件变量
        代码(3)首先获取了独占锁,代码(4)则调用了条件变量的await()方法阻塞挂起了当前线程。当其他线程调用条件变量的signal方法时,被阻塞的线程才会从await处返回。需要注意的是,和调用Object的wait方法一样,如果在没有获取到锁前调用了条件变量的await方法则会抛出java.lang.IllegalMonitorStateException异常。
        代码(5)则释放了获取的锁。
        其实这里的Lock对象等价于synchronized加上共享变量,调用lock.lock()方法就相当于进入了synchronized块(获取了共享变量的内置锁),调用lock.unLock()方法就相当于退出synchronized块。调用条件变量的await()方法就相当于调用共享变量的wait()方法,调用条件变量的signal方法就相当于调用共享变量的notify()方法。调用条件变量的signalAll()方法就相当于调用共享变量的notifyAll()方法。
        经过上面解释,相信大家已经知道条件变量是什么,它是用来做什么的了。
        在上面代码中,lock.newCondition()的作用其实是new了一个在AQS内部声明的ConditionObject对象,ConditionObject是AQS的内部类,可以访问AQS内部的变量(例如状态变量state)和方法。在每个条件变量内部都维护了一个条件队列,用来存放调用条件变量的await()方法时被阻塞的线程。注意这个条件队列和AQS队列不是一回事
        在如下代码中,当线程调用条件变量的await()方法时(必须先调用锁的lock()方法获取锁),在内部会构造一个类型为Node.CONDITION的node节点,然后将该节点插入条件队列末尾,之后当前线程会释放获取的锁(也就是会操作锁对应的state变量的值),并被阻塞挂起。这时候如果有其他线程调用lock.lock()尝试获取锁,就会有一个线程获取到锁,如果获取到锁的线程调用了条件变量的await()方法,则该线程也会被放入条件变量的阻塞队列,然后释放获取到的锁,在await()方法处阻塞。

  1. public final void await() throws InterruptedException {
  2.           if (Thread.interrupted())
  3.               throw new InterruptedException();
  4.             //创建新的node节点,并插入到条件队列末尾(9)
  5.           Node node = addConditionWaiter();
  6.           //释放当前线程获取的锁(10)
  7.           int savedState = fullyRelease(node);
  8.           int interruptMode = 0;
  9.           //调用park方法阻塞挂起当前线程(11)
  10.           while (! isOnSyncQueue(node)) {
  11.               LockSupport.park(this);
  12.               if ((interruptMode = checkInterruptWhileWaiting(node)) ! = 0)
  13.                   break;
  14.           }
  15.           ...
  16.     }

        在如下代码中,当另外一个线程调用条件变量的signal方法时(必须先调用锁的lock()方法获取锁),在内部会把条件队列里面队头的一个线程节点从条件队列里面移除并放入AQS的阻塞队列里面,然后激活这个线程。

  1. public final void signal() {
  2.       if (! isHeldExclusively())
  3.           throw new IllegalMonitorStateException();
  4.       Node first = firstWaiter;
  5.       if (first ! = null)
  6.         //将条件队列头元素移动到AQS队列
  7.           doSignal(first);
  8.   }

        需要注意的是,AQS只提供了ConditionObject的实现,并没有提供newCondition函数,该函数用来new一个ConditionObject对象。需要由AQS的子类来提供newCondition函数
        下面来看当一个线程调用条件变量的await()方法而被阻塞后,如何将其放入条件队列。

  1.     private Node addConditionWaiter() {
  2.       Node t = lastWaiter;
  3.       ...
  4.       //(1)
  5.       Node node = new Node(Thread.currentThread(), Node.CONDITION);
  6.       //(2)
  7.       if (t == null)
  8.           firstWaiter = node;
  9.       else
  10.           t.nextWaiter = node; //(3)
  11.           lastWaiter = node; //(4)
  12.       return node;
  13. }

       代码(1)首先根据当前线程创建一个类型为Node.CONDITION的节点,然后通过代码(2)(3)(4)在单向条件队列尾部插入一个元素。
       注意:当多个线程同时调用lock.lock()方法获取锁时,只有一个线程获取到了锁,其他线程会被转换为Node节点插入到lock锁对应的AQS阻塞队列里面,并做自旋CAS尝试获取锁
       如果获取到锁的线程又调用了对应的条件变量的await()方法,则该线程会释放获取到的锁,并被转换为Node节点插入到条件变量对应的条件队列里面。
       这时候因为调用lock.lock()方法被阻塞到AQS队列里面的一个线程会获取到被释放的锁,如果该线程也调用了条件变量的await()方法则该线程也会被放入条件变量的条件队列里面。
       当另外一个线程调用条件变量的signal()或者signalAll()方法时,会把条件队列里面的一个或者全部Node节点移动到AQS的阻塞队列里面,等待时机获取锁。
       最后使用一个图(见图6-3)总结如下:一个锁对应一个AQS阻塞队列,对应多个条件变量,每个条件变量有自己的一个条件队列

                                                                                                           图6-3

6.2.3 基于AQS实现自定义同步器

       本节我们基于AQS实现一个不可重入的独占锁,正如前文所讲的,自定义AQS需要重写一系列函数,还需要定义原子变量state的含义。这里我们定义,state为0表示目前锁没有被线程持有,state为1表示锁已经被某一个线程持有,由于是不可重入锁,所以不需要记录持有锁的线程获取锁的次数。另外,我们自定义的锁支持条件变量。
1.代码实现
如下代码是基于AQS实现的不可重入的独占锁。

  1. class NonReentrantLock implements Lock, java.io.Serializable {
  2.         // 内部帮助类
  3.         private static class Sync extends AbstractQueuedSynchronizer {
  4.             // 是否锁已经被持有
  5.             protected boolean isHeldExclusively() {
  6.                 return getState() == 1;
  7.             }
  8.             //如果state为0 则尝试获取锁
  9.             public boolean tryAcquire(int acquires) {
  10.                 assert acquires == 1//
  11.                 if (compareAndSetState(01)) {
  12.                     setExclusiveOwnerThread(Thread.currentThread());
  13.                     return true;
  14.                 }
  15.                 return false;
  16.             }
  17.             // 尝试释放锁,设置state为0
  18.             protected boolean tryRelease(int releases) {
  19.                 assert releases == 1//
  20.                 if (getState() == 0)
  21.                     throw new IllegalMonitorStateException();
  22.                 setExclusiveOwnerThread(null);
  23.                 setState(0);
  24.                 return true;
  25.             }
  26.             // 提供条件变量接口
  27.             Condition newCondition() {
  28.                 return new ConditionObject();
  29.             }
  30.         }
  31.         //创建一个Sync来做具体的工作
  32.         private final Sync sync = new Sync();
  33.         public void lock() {
  34.             sync.acquire(1);
  35.         }
  36.         public boolean tryLock() {
  37.             return sync.tryAcquire(1);
  38.         }
  39.         public void unlock() {
  40.             sync.release(1);
  41.         }
  42.         public Condition newCondition() {
  43.             return sync.newCondition();
  44.         }
  45.         public boolean isLocked() {
  46.             return sync.isHeldExclusively();
  47.         }
  48.         public void lockInterruptibly() throws InterruptedException {
  49.             sync.acquireInterruptibly(1);
  50.         }
  51.         public boolean tryLock(long timeout, TimeUnit unit) throws
  52.           InterruptedException {
  53.             return sync.tryAcquireNanos(1, unit.toNanos(timeout));
  54.         }
  55.     }

        在如上代码中,NonReentrantLock定义了一个内部类Sync用来实现具体的锁的操作,Sync则继承了AQS。由于我们实现的是独占模式的锁,所以Sync重写了tryAcquire、tryRelease和isHeldExclusively 3个方法。另外,Sync提供了newCondition这个方法用来支持条件变量。

2.使用自定义锁实现生产—消费模型
下面我们使用上节自定义的锁实现一个简单的生产—消费模型,代码如下。

  1.     final static NonReentrantLock lock = new NonReentrantLock();
  2.     final static Condition notFull = lock.newCondition();
  3.     final static Condition notEmpty = lock.newCondition();
  4.     final static Queue<String> queue = new LinkedBlockingQueue<String>();
  5.     final static  int queueSize = 10;
  6.     public static void main(String[] args) {
  7.         Thread producer = new Thread(new  Runnable() {
  8.             public void run() {
  9.                 //获取独占锁
  10.                 lock.lock();
  11.                 try{
  12.                     //(1)如果队列满了,则等待
  13.                     while(queue.size() == queueSize){
  14.                         notEmpty.await();
  15.                     }
  16.                     //(2)添加元素到队列
  17.                     queue.add("ele");
  18.                     //(3)唤醒消费线程
  19.                     notFull.signalAll();
  20.                 }catch(Exception e){
  21.                     e.printStackTrace();
  22.                 }finally {
  23.                     //释放锁
  24.                     lock.unlock();
  25.                 }
  26.             }
  27.         });
  28.         Thread consumer = new Thread(new  Runnable() {
  29.             public void run() {
  30.                 //获取独占锁
  31.                 lock.lock();
  32.                 try{
  33.                     //队列空,则等待
  34.                     while(0 == queue.size() ){
  35.                         notFull.await(); ;
  36.                     }
  37.                     //消费一个元素
  38.                     String ele = queue.poll();
  39.                     //唤醒生产线程
  40.                     notEmpty.signalAll();
  41.                 }catch(Exception e){
  42.                     e.printStackTrace();
  43.                 }finally {
  44.                     //释放锁
  45.                     lock.unlock();
  46.                 }
  47.             }
  48.         });
  49.         //启动线程
  50.         producer.start();
  51.         consumer.start();
  52. }

        如上代码首先创建了NonReentrantLock的一个对象lock,然后调用lock.newCondition创建了两个条件变量,用来进行生产者和消费者线程之间的同步。
        在main函数里面,首先创建了producer生产线程,在线程内部首先调用lock.lock()获取独占锁,然后判断当前队列是否已经满了,如果满了则调用notEmpty.await()阻塞挂起当前线程。需要注意的是,这里使用while而不是if是为了避免虚假唤醒。如果队列不满则直接向队列里面添加元素,然后调用notFull.signalAll()唤醒所有因为消费元素而被阻塞的消费线程,最后释放获取的锁。
        然后在main函数里面创建了consumer生产线程,在线程内部首先调用lock.lock()获取独占锁,然后判断当前队列里面是不是有元素,如果队列为空则调用notFull.await()阻塞挂起当前线程。需要注意的是,这里使用while而不是if是为了避免虚假唤醒。如果队列不为空则直接从队列里面获取并移除元素,然后唤醒因为队列满而被阻塞的生产线程,最后释放获取的锁。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/寸_铁/article/detail/882080
推荐阅读
相关标签
  

闽ICP备14008679号