java并发性能(六)之牛逼的AQS(下)

小孩子4919 我们都是小青蛙 2018-06-11

点击蓝字,关注我们


关注

本人水平有限,如果在文中发现任何问题或疑惑,请留言,相互学习,相互进步哈~ 这篇文章是上一篇 java并发性能(五)之牛逼的AQS(上)的姊妹篇,如果没看过上一篇的话强烈建议回过头看一遍。本篇重点关注对于自定义同步工具中wait/notify机制的自定义实现。如果想真的学点东西,请勿用碎片化时间阅读,找个时间,最好在电脑上安安静静的看一遍(实在不行把手机字体设置到最小),毕竟工资水平和努力程度还是成正比的。下边是正文:


ReentrantLock的内部实现

看完了AQS中的底层同步机制,我们来简单分析一下之前介绍过的ReentrantLock的实现原理。先回顾一下这个显式锁的典型使用方式:

Lock lock = new ReentrantLock();
lock.lock();
try {
   加锁后的代码
} finally {
   lock.unlock();    
}

ReentrantLock首先是一个显式锁,它实现了Lock接口。可能你已经忘记了Lock接口长啥样了,我们再回顾一遍:

public interface Lock {
   void lock();

   void lockInterruptibly() throws InterruptedException;

   boolean tryLock();

   boolean tryLock(long time, TimeUnit unit) throws InterruptedException;

   void unlock();

   Condition newCondition();
}

其实ReentrantLock内部定义了一个AQS的子类来辅助它实现锁的功能,由于ReentrantLock是工作在独占模式下的,所以它的lock方法其实是调用AQS对象的aquire方法去获取同步状态,unlock方法其实是调用AQS对象的release方法去释放同步状态,这些大家已经很熟了,就不再赘述了,我们大致看一下ReentrantLock的代码:

public class ReentrantLock implements Lock {

   private final Sync sync;    //AQS子类对象

   abstract static class Sync extends AbstractQueuedSynchronizer {
       // ... 为节省篇幅,省略其他内容
   }

   // ... 为节省篇幅,省略其他内容
}

所以如果我们简简单单写下下边这行代码:

Lock lock = new ReentrantLock();

就意味着在内存里创建了一个ReentrantLock对象,一个AQS对象,在AQS对象里维护着同步队列head节点和tail节点,不过初始状态下由于没有线程去竞争锁,所以同步队列是空的,画成图就是这样:

Condition的提出

我们前边唠叨线程间通信的时候提到过内置锁的wait/notify机制,等待线程的典型的代码如下:

synchronized (对象) {
   处理逻辑(可选)
   while(条件不满足) {
       对象.wait();
   }
   处理逻辑(可选)
}

通知线程的典型的代码如下:

synchronized (对象) {
   完成条件
   对象.notifyAll();、
}

也就是当一个线程因为某个条件不能满足时就可以在持有锁的情况下调用该锁对象的wait方法,之后该线程会释放锁并进入到与该锁对象关联的等待队列中等待;如果某个线程完成了该等待条件,那么在持有相同锁的情况下调用该锁的notify或者notifyAll方法唤醒在与该锁对象关联的等待队列中等待的线程。

显式锁的本质其实是通过AQS对象获取和释放同步状态,而内置锁的实现是被封装在java虚拟机里的,我们并没有唠叨过,这两者的实现是不一样的。而wait/notify机制只适用于内置锁,在显式锁里需要另外定义一套类似的机制,在我们定义这个机制的时候需要整清楚:在获取锁的线程因为某个条件不满足时,应该进入哪个等待队列,在什么时候释放锁,如果某个线程完成了该等待条件,那么在持有相同锁的情况下怎么从相应的等待队列中将等待的线程从队列中移出

为了定义这个等待队列,设计java的大叔们在AQS中添加了一个名叫ConditionObject的成员内部类:

public abstract class AbstractQueuedSynchronizer {

   public class ConditionObject implements Condition, java.io.Serializable {
       private transient Node firstWaiter;
       private transient Node lastWaiter;

       // ... 为省略篇幅,省略其他方法
   }
}

很显然,这个ConditionObject维护了一个队列,firstWaiter是队列的头节点引用,lastWaiter是队列的尾节点引用。但是节点类是Node?对,你没看错,就是我们前边分析的同步队列里用到的AQS的静态内部类Node,怕你忘了,再把这个Node节点类的主要内容写一遍:

static final class Node {
   volatile int waitStatus;
   volatile Node prev;
   volatile Node next;
   volatile Thread thread;
   Node nextWaiter;
   static final int CANCELLED =  1;
   static final int SIGNAL    = -1;
   static final int CONDITION = -2;
   static final int PROPAGATE = -3;
}

也就是说:AQS中的同步队列和自定义的等待队列使用的节点类是同一个

又由于在等待队列中的线程被唤醒的时候需要重新获取锁,也就是重新获取同步状态,所以该等待队列必须知道线程是在持有哪个锁的时候开始等待的。设计java的大叔们在Lock接口中提供了这么一个通过锁来获取等待队列的方法:

Condition newCondition();

我们上边介绍的ConditionObject就实现了Condition接口,看一下ReentrantLock锁是怎么获取与它相关的等待队列的:

public class ReentrantLock implements Lock {

   private final Sync sync;

   abstract static class Sync extends AbstractQueuedSynchronizer {
       final ConditionObject newCondition() {
           return new ConditionObject();
       }
       // ... 为节省篇幅,省略其他方法
   }

   public Condition newCondition() {
       return sync.newCondition();
   }

   // ... 为节省篇幅,省略其他方法
}

可以看到,其实就是简单创建了一个ConditionObject对象而已~ 由于 ConditionObject 是AQS的成员内部类,所以在创建的 ConditionObject 对象中持有 AQS 对象的引用,所以通过 ConditionObject 对象访问到 同步队列,也就是可以重新获取同步状态,也就是重新获取锁 。用文字描述还是有些绕,我们先通过锁来创建一个Condition对象:

Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();

由于在初始状态下,没有线程去竞争锁,所以同步队列是空的,也没有线程因某个条件不成立而进入等待队列,所以等待队列也是空的,ReentrantLock对象、AQS对象以及等待队列在内存中的表示就如图:

当然,这个newCondition方法可以反复调用,从而可以通过一个锁来生成多个等待队列

Lock lock = new ReentrantLock();
Condition condition1 = lock.newCondition();
Condition condition2 = lock.newCondition();

那接下来需要考虑怎么把线程包装成Node节点放到等待队列的以及怎么从等待队列中移出了。ConditionObject成员内部类实现了一个Condition的接口,这个接口提供了下边这些方法:

public interface Condition {
   void await() throws InterruptedException;

   long awaitNanos(long nanosTimeout) throws InterruptedException;

   boolean await(long time, TimeUnit unit) throws InterruptedException;

   boolean awaitUntil(Date deadline) throws InterruptedException;

   void awaitUninterruptibly();

   void signal();

   void signalAll();
}

来看一下这些方法的具体意思:

可以看到,Condition中的await方法和内置锁对象的wait方法的作用是一样的,都会使当前线程进入等待状态,signal方法和内置锁对象的notify方法的作用是一样的,都会唤醒在等待队列中的线程。

像调用内置锁的wait/notify方法时,线程需要首先获取该锁一样,调用Condition对象的await/siganl方法的线程需要首先获得产生该Condition对象的显式锁。它的基本使用方式就是:通过显式锁的 newCondition 方法产生Condition对象,线程在持有该显式锁的情况下可以调用生成的Condition对象的 await/signal 方法,一般用法如下:

Lock lock = new ReentrantLock();

Condition condition = lock.newCondition();

//等待线程的典型模式
public void conditionAWait() throws InterruptedException {
   lock.lock();    //获取锁
   try {
       while (条件不满足) {
           condition.await();  //使线程处于等待状态
       }
       条件满足后执行的代码;
   } finally {
       lock.unlock();    //释放锁
   }
}

//通知线程的典型模式
public void conditionSignal() throws InterruptedException {
   lock.lock();    //获取锁
   try {
       完成条件;
       condition.signalAll();  //唤醒处于等待状态的线程
   } finally {
       lock.unlock();    //释放锁
   }
}

假设现在有一个锁和两个等待队列:

Lock lock = new ReentrantLock();
Condition condition1 = lock.newCondition();
Condition condition2 = lock.newCondition();

画图表示出来就是:

有3个线程maint1t2同时调用ReentrantLock对象的lock方法去竞争锁的话,只有线程main获取到了锁,所以会把线程t1t2包装成Node节点插入同步队列,所以ReentrantLock对象、AQS对象和同步队列的示意图就是这样的:

因为此时main线程是获取到锁处于运行中状态,但是因为某个条件不满足,所以它选择执行下边的代码来进入condition1等待队列:

lock.lock();
try {
   contition1.await();
} finally {
   lock.unlock();
}

具体的await代码我们就不分析了,太长了,我怕你看的发困,这里只看这个await方法做了什么事情:

  1. condition1等待队列中创建一个Node节点,这个节点的thread值就是main线程,而且waitStatus-2,也就是静态变量Node.CONDITION,表示表示节点在等待队列中,由于这个节点是代表线程main的,所以就把它叫做main节点把,新创建的节点长这样:

  2. 将该节点插入condition1等待队列中:

  3. 因为main线程还持有者锁,所以需要释放锁之后通知后边等待获取锁的线程t,所以同步队列里的0号节点被删除,线程t获取锁,节点1称为head节点,并且把thread字段设置为null:

至此,main线程的等待操作就做完了,假如现在获得锁的t1线程也执行下边的代码:

lock.lock();
try {
   contition1.await();
} finally {
   lock.unlock();
}

还是会执行上边的过程,把t1线程包装成Node节点插入到condition1等待队列中去,由于原来在等待队列中的节点1会被删除,我们把这个新插入等待队列代表线程t1的节点称为新节点1吧:

这里需要特别注意的是:同步队列是一个双向链表,prev表示前一个节点,next表示后一个节点,而等待队列是一个单向链表,使用nextWaiter表示下一个节点,这是它们不同的地方

现在获取到锁的线程是t2,大家一起出来混的,前两个都进去,只剩下t2多不好呀,不过这次不放在condition1队列后头了,换成condition2队列吧:

lock.lock();
try {
   contition2.await();
} finally {
   lock.unlock();
}

效果就是:

大家发现,虽然现在没有线程获取锁,也没有线程在锁上等待,但是同步队列里仍旧有一个节点,是的,同步队列只有初始时无任何线程因为锁而阻塞的时候才为空,只要曾经有线程因为获取不到锁而阻塞,这个队列就不为空了

至此,maint1t2这三个线程都进入到等待状态了,都进去了谁把它们弄出来呢???额~ 好吧,再弄一个别的线程去获取同一个锁,比方说线程t3去把condition2条件队列的线程去唤醒,可以调用这个signal方法:

lock.lock();
try {
   contition2.signal();
} finally {
   lock.unlock();
}

因为在condition2等待队列的线程只有t2,所以t2会被唤醒,这个过程分两步进行:

  1. 将在condition2等待队列的代表线程t2新节点2,从等待队列中移出。

  2. 将移出的节点2放在同步队列中等待获取锁,同时更改该节点的waitStauts0

这个过程的图示如下:

如果线程t3继续调用signalAllcondition1等待队列中的线程给唤醒也是差不多的意思,只不过会把condition1上的两个节点同时都移动到同步队列里:

lock.lock();
try {
   contition1.signalAll();
} finally {
   lock.unlock();
}

效果如图:

这样全部线程都从等待状态中恢复了过来,可以重新竞争锁进行下一步操作了。

以上就是Condition机制的原理和用法,它其实是内置锁的wait/notify机制在显式锁中的另一种实现,不过原来的一个内置锁对象只能对应一个等待队列,现在一个显式锁可以产生若干个等待队列,我们可以根据线程的不同等待条件来把线程放到不同的等待队列上去Condition机制的用途可以参考wait/notify机制,我们接下来把之前用内置锁和wait/notify机制编写的同步队列BlockedQueue显式锁 + Condition的方式来该写一下:

import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class ConditionBlockedQueue<E> {

   private Lock lock = new ReentrantLock();

   private Condition notEmptyCondition = lock.newCondition();

   private Condition notFullCondition = lock.newCondition();

   private Queue queue = new LinkedList<>();

   private int limit;

   public ConditionBlockedQueue(int limit) {
       this.limit = limit;
   }

   public int size() {
       lock.lock();
       try {
           return queue.size();
       } finally {
           lock.unlock();
       }
   }

   public boolean add(E e) throws InterruptedException {
       lock.lock();
       try {
           while (size() >= limit) {
               notFullCondition.await();
           }

           boolean result = queue.add(e);
           notEmptyCondition.signal();
           return result;
       } finally {
           lock.unlock();
       }
   }

   public E remove() throws InterruptedException{
       lock.lock();
       try {
           while (size() == 0) {
               notEmptyCondition.await();
           }
           E e = queue.remove();
           notFullCondition.signalAll();
           return e;
       } finally {
           lock.unlock();
       }
   }
}

在这个队列里边我们用了一个ReentrantLock锁,通过这个锁生成了两个Condition对象,notFullCondition表示队列未满的条件,notEmptyCondition表示队列未空的条件。当队列已满的时候,线程会在notFullCondition上等待,每插入一个元素,会通知在notEmptyCondition条件上等待的线程;当队列已空的时候,线程会在notEmptyCondition上等待,每移除一个元素,会通知在notFullCondition条件上等待的线程。这样语义就变得很明显了。如果你有更多的等待条件,你可以通过显式锁生成更多的Condition对象。而每个内置锁对象都只能有一个相关联的等待队列,这也是显式锁对内置锁的优势之一

我们总结一下上边的用法:每个显式锁对象又可以产生若干个Condition对象,每个Condition对象都会对应一个等待队列,所以就起到了一个显式锁对应多个等待队列的效果

AQS中其他针对等待队列的重要方法

除了Condition对象的awaitsignal方法,AQS还提供了许多直接访问这个队列的方法,它们由都是public final修饰的:

public abstract class AbstractQueuedSynchronizer {
   public final boolean owns(ConditionObject condition) { ... }
    public final boolean hasWaiters(ConditionObject condition) { ... }
    public final int getWaitQueueLength(ConditionObject condition) { ... }
    public final Collection getWaitingThreads(ConditionObject condition) {}
}

具体各个方法的解释如下:

如果有需要的话,可以在我们自定义的同步工具中使用它们。

题外话

兄弟姐妹们,这篇文章和上篇文章 java并发性能(五)之牛逼的AQS(上)都设置了打赏,如果觉得有帮助赏一波,毕竟接收方是我女盆友,直接影响到她对我创作的认可度。画图挺累的,点个赞吧啦啦啦~

    觉得不错,分享给更多人看到
    我们都是小青蛙 热门文章:

    java并发编程之原子性操作    阅读/点赞 : 0/0

    我们都是小青蛙,呱呱呱呱呱    阅读/点赞 : 0/0

    活跃性(死锁、饥饿、活锁)    阅读/点赞 : 0/0

    指令重排序    阅读/点赞 : 0/0

    InnoDB的Buffer Pool简介    阅读/点赞 : 0/0

    一些比较重要的数字电路模块    阅读/点赞 : 0/0

    抽象的艺术    阅读/点赞 : 0/0

    内存可见性    阅读/点赞 : 0/0

    存储程序(二)之存储函数简介    阅读/点赞 : 0/0

    线程间通信(下)    阅读/点赞 : 0/0

    我们都是小青蛙 微信二维码

    我们都是小青蛙 微信二维码