各种基于AQS的同步工具

大孩子 我们都是小青蛙 2018-06-14

点击蓝字,关注我们


关注


本人水平有限,如果在文中发现任何问题或疑惑,请留言,相互学习,相互进步哈~ 


阅读本篇之前确保你已经读过下边两篇文章:


因为多线程并发编程的坑比较多,而且很容易就出现了,所以设计java的大叔们贴切的为使用java的人提供了好多线程的工具,这些工具都是借助AQS开发的,也就是获取和释放同步状态那一套东西,如果你有兴趣的话可以打开它们的源代码查看一下它们是怎么实现的,我们接下来主要唠叨一下如何使用这些工具。它们能明显加快我们编写正确的并发程序的速度。下边介绍4个常用的并发工具类。

CountDownLatch

前边唠叨过Threadjoin方法,这个方法表示一个线程将等待另一个线程执行完才能继续执行。现实中我们常常将一个大任务拆成好多小任务,让每个线程都去执行一个小任务,待到所有小任务都执行完成之后,汇总个个小任务的执行结果。所以汇总线程就需要等待所有执行小任务的线程完成之后才能继续执行,假设main线程是汇总线程,代码看起来就像是这样:

public class CountDownLatchDemo {

   public static void main(String[] args) {
       Thread[] threads = new Thread[5];
       for (int i = 0; i < threads.length; i++) {

           int num = i;
           Thread t = new Thread(new Runnable() {
               @Override
               public void run() {
                   try {
                       Thread.sleep(1000L);    //模拟耗时操作
                   } catch (InterruptedException e) {
                       throw new RuntimeException(e);
                   }

                   System.out.println("第" + num + "个小任务执行完成");
               }
           });
           threads[i] = t;
           t.start();
       }

       for (int i = 0; i < threads.length; i++) {  //等待所有线程执行完才可以执行main线程
           try {
               threads[i].join();
           } catch (InterruptedException e) {
               throw new RuntimeException(e);
           }
       }

       System.out.println("等待所有线程执行完成之后才执行");
   }
}

由于我们在threads数组中创建的5个线程是并发执行的,所以不能保证它们的执行顺序,但是main线程必须等待这5个线程执行完成后,才能继续它的执行。在我的机器上的一个输出结果是:

第4个小任务执行完成
第2个小任务执行完成
第1个小任务执行完成
第3个小任务执行完成
第0个小任务执行完成
等待所有线程执行完成之后才执行

我们需要对每一个线程调用join方法。现在设计java的大叔们提供了CountDownLatch类,它内部维护了一个类似计数器的东东,看它的构造方法:

看一下它的各个重要的方法:

其中带参数的await方法中的TimeUnit一个枚举类型,指填入的timeout是以什么为单位的,TimeUnit一共有下边这么几种枚举对象:

我们接下来改写一下CountDownLatchDemo类,看一下它怎么使用:

import java.util.concurrent.CountDownLatch;

public class CountDownLatchDemo {

   public static void main(String[] args) {
       Thread[] threads = new Thread[5];
       CountDownLatch countDownLatch = new CountDownLatch(threads.length); //创建CountDownLatch对象

       for (int i = 0; i < threads.length; i++) {

           int num = i;
           Thread t = new Thread(new Runnable() {
               @Override
               public void run() {
                   try {
                       Thread.sleep(1000L);    //模拟耗时操作
                   } catch (InterruptedException e) {
                       throw new RuntimeException(e);
                   }

                   System.out.println("第" + num + "个小任务执行完成");
                   countDownLatch.countDown(); //每个线程在执行完任务后,都调用这个方法
               }
           });
           threads[i] = t;
           t.start();
       }

       try {
           countDownLatch.await(); //在threads中线程都执行完成之前,此方法将阻塞
       } catch (InterruptedException e) {
           throw new RuntimeException(e);
       }

       System.out.println("等待所有线程执行完成之后才执行");
   }
}

我们看到,我们在创建CountDownLatch对象的时候传入了Thread数组的大小作为参数,也就是计数大小为5,然后每次执行完一个任务的时候都调用一下这个CountDownLatch对象的countDown方法,每调用一下countDown方法计数就会减1,最后在main线程中调用这个CountDownLatch对象的await方法,在CountDownLatch对象的计数减为0之前,这个方法会一只等待,直到那5个线程都调用了countDown方法,计数减为0,await方法返回,main线程继续执行。

CountDownLatch与join方法对比

需要注意的是,CountDownLatch代表的是一个计数器,不论是否在同一线程中,不论线程是否执行完成,都可以随时随地调用CountDownLatchcountDown方法,而Thread的成员方法join只能在一个线程中对另一个线程对象调用,而且方法返回的前提是线程已经执行完成。

所以使用CountDownLatch会比join方法更灵活,比如下边这种场景就必须使用CountDownLatch来协调各个任务的执行顺序:

import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.CountDownLatch;

public class CountDownLatchDemo {

   public static void main(String[] args) {
       Queue runnableQueue = new LinkedList<>();
       CountDownLatch countDownLatch = new CountDownLatch(5); //创建CountDownLatch对象

       for (int i = 0; i < 5; i++) {

           int num = i;
           Runnable runnable = new Runnable() {
               @Override
               public void run() {
                   try {
                       Thread.sleep(1000L);    //模拟耗时操作
                   } catch (InterruptedException e) {
                       throw new RuntimeException(e);
                   }

                   System.out.println("第" + num + "个小任务执行完成");
                   countDownLatch.countDown(); //每个任务执行完成之后,都调用这个方法
               }
           };

           runnableQueue.add(runnable);
       }


       for (int i = 0; i < 2; i++) {   //创建两个线程来执行上边的5个任务
           new Thread(new Runnable() {
               @Override
               public void run() {
                   while (true) {
                       Runnable runnable = null;
                       synchronized (CountDownLatchDemo.class) {   //runnableQueue的相关操作需要进行同步处理
                           if (runnableQueue.size() < 1) {
                               break;
                           }

                           runnable = runnableQueue.remove();
                       }
                       runnable.run(); //执行该任务
                   }
               }
           }).start();
       }

       try {
           countDownLatch.await(); //在runnableQueue中的所有任务都执行完成之前,此方法将阻塞
       } catch (InterruptedException e) {
           throw new RuntimeException(e);
       }

       System.out.println("等待所有任务执行完成之后才执行");
   }
}

我们创建了5个Runnable任务,然后又创建了两个线程去执行这5个任务,当5个任务都执行完成之后main线程才能继续执行,也就是说我们统计的是任务有没有被完成,而不是线程有没有执行完,所以这里只能使用CountDownLatch,而不是join方法来实现这个功能。

小贴士:
由于队列`queue`的方法`size``remove`方法需要在多线程中调用,所以需要一个锁来保护这些操作,我们这里用了`CountDownLatchDemo.class`来作为锁~ 大家以后编码过程中必须时时注意有没有共享可变变量被多个线程访问,如果有的化一定要采取措施保护起来。

注意的点

  • CountDownLatch对象不能被重复利用,也就是不能修改计数器的值。

  • CountDownLatch代表的计数器的大小可以为0,意味着在一个线程调用await方法时会立即返回。

  • 如果某些线程中有阻塞操作的话,最好使用带有超时时间的await方法,以免该线程调用await方法之后永远得不到执行。

CyclicBarrier

有个家伙不识好歹抢了狗哥媳妇儿,狗哥叫了加上他一共5个兄弟去打架,相约放学后在学校门口集合,必须5个人全部到齐后才能出发。我们把每一个兄弟都当作一个线程的话,当第一个兄弟到达校门口的时候会等待其他兄弟而不继续往下执行,第二个兄弟也是这样,第三、第四个兄弟也是这样,直到第5个兄弟到达校门口,这5个线程才能同时继续执行,奔向远方开启打架模式。设计java的大叔设计了CyclicBarrier类来解决这种多个线程在某个地方相互等待,直到有规定数量的线程都执行到这个地方才能同时继续往下执行的问题。

从字面上理解,CyclicBarrier的意思就是循环利用的栅栏CyclicBarrier对象内部也维护了一个类似计数器的东东,我们可以通过它的构造方法把计数器的值给传进去。每个线程在调用CyclicBarrier对象的await方法的时候,就相当于到达了多个线程共享的一个栅栏,该线程会在这个栅栏前等待,直到调用await方法的线程数量和计数器的值一样,该栅栏将被移除,因为await方法而等待的线程都恢复执行。

我们写代码描述一下狗哥打架这个场景:

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;


class Fighter extends Thread{

   private CyclicBarrier cyclicBarrier;

   public Fighter(CyclicBarrier cyclicBarrier, String name) {
       super(name);
       this.cyclicBarrier = cyclicBarrier;
   }

   @Override
   public void run() {
       try {
           Thread.sleep(1000L);    //模拟上学中过程
           System.out.println(getName() + "放学了,向学校门跑去");

           cyclicBarrier.await();  //到达校门后等待,直到5个线程都执行到了这里

           System.out.println("人聚齐了,一起打架去喽~");
       } catch (InterruptedException e) {
           throw new RuntimeException(e);
       } catch (BrokenBarrierException e) {
           throw new RuntimeException(e);
       }
   }
}

public class CyclicBarrierDemo {

   public static void main(String[] args) {

       CyclicBarrier cyclicBarrier = new CyclicBarrier(5);

       new Fighter(cyclicBarrier, "狗哥").start();
       new Fighter(cyclicBarrier, "猫爷").start();
       new Fighter(cyclicBarrier, "王尼妹").start();
       new Fighter(cyclicBarrier, "狗剩").start();
       new Fighter(cyclicBarrier, "张大嘴巴").start();
   }
}

我们在main线程中先创建了一个计数器初始值为5的CyclicBarrier对象,然后又创建了5个Fighter对象,它们共享同一个CyclicBarrier对象。每个Fighter类代表一个线程,它维护了一个CyclicBarrier类型的字段。前4个Fighter线程执行到cyclicBarrier.await()方法时都会等待其他线程也执行到该方法,直到第5个Fighter线程执行cyclicBarrier.await()方法时,其他的4个等待的线程也都会恢复执行。执行一下上边的代码,结果是:

狗哥放学了,向学校门跑去
猫爷放学了,向学校门跑去
王尼妹放学了,向学校门跑去
狗剩放学了,向学校门跑去
张大嘴巴放学了,向学校门跑去
人聚齐了,一起打架去喽~
人聚齐了,一起打架去喽~
人聚齐了,一起打架去喽~
人聚齐了,一起打架去喽~
人聚齐了,一起打架去喽~

大家可以试一下把上边CyclicBarrier对象的初始值改为6,也就是说需要6个线程同时到达栅栏处所有线程才能恢复执行,所以这样的话执行结果中就不会输出"人聚齐了,一起打架去喽~"这句话了,而是所有线程都处于等待状态。

然后仔细分析一下CyclicBarrier,先看一下CyclicBarrier的构造方法:

为了让大家理解下第二个构造方法中的barrierAction会优先被执行的含义,我们举个例子:

public class CyclicBarrierDemo {

   public static void main(String[] args) {

       CyclicBarrier cyclicBarrier = new CyclicBarrier(2, new Runnable() {
           @Override
           public void run() {
               System.out.println("这句话会被优先执行");
           }
       });

       new Thread(new Runnable() {
           @Override
           public void run() {
               try {
                   cyclicBarrier.await();
               } catch (InterruptedException e) {
                   throw new RuntimeException(e);
               } catch (BrokenBarrierException e) {
                   throw new RuntimeException(e);
               }
               System.out.println("在线程t中输出一句话");
           }
       }, "t").start();

       try {
           cyclicBarrier.await();
       } catch (InterruptedException e) {
           throw new RuntimeException(e);
       } catch (BrokenBarrierException e) {
           throw new RuntimeException(e);
       }
       System.out.println("在线程main中输出一句话");
   }
}

我们定义了一个计数器值为2的CyclicBarrier对象,并且在它的构造方法里传入了一个Runnable对象。等到线程t和线程main都调用了await方法,也就是说都到达这个栅栏的时候,从CyclicBarrier构造方法传入的的Runnable对象将会在两个线程继续执行之前而被优先执行,但是不能保证执行完该Runnable对象后的线程t和线程main的执行顺序,所以一种可能的执行结果就是:

这句话会被优先执行
在线程t中输出一句话
在线程main中输出一句话

再看一下CyclicBarrier中几个重要的成员方法:

CyclicBarrier和CountDownLatch的区别

初看起来CyclicBarrierCountDownLatch内部都有一个计数器,它们的await操作都会造成线程等待,但是CountDownLatch需要在线程执行过程中使用 countDown 来使计数器递减直到0后所有调用await的线程才会恢复执行,CyclicBarrier await 操作本身就算是一个计数操作,当有足够多的线程调用await方法时,所有在该方法处等待的线程都恢复执行。所以它们一个重要区别就是: CyclicBarrier 用于等待其他线程,而 CountDownLatch 用于等待 countDown 事件的发生

另外一个重要的不同点就是 CountDownLatch 只能使用一次,而 CyclicBarrier 可以多次循环利用,循环利用是指可以通过调用reset方法来移除之前的栅栏,如果有线程在之前的栅栏处等待,则抛出一个BrokenBarrierException异常,我们看看怎么用:

public class CyclicBarrierDemo {

   public static void main(String[] args) {

       CyclicBarrier cyclicBarrier = new CyclicBarrier(2);

       Thread t = new Thread(new Runnable() {
           @Override
           public void run() {
               try {
                   cyclicBarrier.await();
               } catch (InterruptedException e) {
                   throw new RuntimeException(e);
               } catch (BrokenBarrierException e) {
                   System.out.println("原来的栅栏遭到了破坏,抛出了BrokenBarrierException异常");
                   return;
               }
               System.out.println("在线程t中输出一句话");
           }
       }, "t");
       t.start();


       try {
           Thread.sleep(1000L);    //确保线程t已经运行了await方法,实际操作中不鼓励使用sleep方法来控制执行顺序
       } catch (InterruptedException e) {
           throw new RuntimeException(e);
       }

       cyclicBarrier.reset();  //重置cyclicBarrier,弃用原来的栅栏

       new Thread(new Runnable() {
           @Override
           public void run() {
               try {
                   cyclicBarrier.await();  //线程t2调用重置后的cyclicBarrier的await方法
               } catch (Exception e) {
                   throw new RuntimeException(e);
               }
               System.out.println("在线程t2中输出一句话");
           }
       }, "t2").start();

       try {
           cyclicBarrier.await();  //线程main调用重置后的cyclicBarrier的await方法
       } catch (Exception e) {
           throw new RuntimeException(e);
       }
       System.out.println("在线程main中输出一句话");
   }
}

在线程t执行cyclicBarrier.await()方法后,在main线程中调用了cyclicBarrier.reset(),也就是说原先的栅栏无效,在原先栅栏处等待的线程会抛出BrokenBarrierException,然后在线程t2和线程main中可以继续使用重置过的cyclicBarrier。这个程序的执行结果是:

原来的栅栏遭到了破坏,抛出了BrokenBarrierException异常
在线程t2中输出一句话
在线程main中输出一句话

Semaphore

地铁或者火车站都有安检的地方,假设安检的屋子里只能容纳下10个人,当屋子里满了10个人的时候,安检的小姑娘会拿一个牌子把后边排队的人挡住,直到有人安检完从屋子里出来,后边排队的才可以继续跟进,但是无论如何,屋子里的人不能超过10个。假设每个人都是一个线程的话,安检小屋子就起到了一个限制并发执行线程数量的作用,也就是说,虽然有许多线程,但是只有在屋子里的线程才能并发执行,其余的线程只能等待屋子里的线程执行完成后才能进入屋子执行。

在一个线程执行完从屋子里退出的时候,有两种选择下一个线程进入安检屋子的方式,第一种是公平的,也就是谁先排的队,谁先进;第二种是随机的,就像挤公交车那样,谁运气好挤进去了算谁的,这种方式叫非公平的

设计java的大叔们提出了Semaphore,它就像拿牌子的安检小姑娘一样,手里握着可以进入安检屋子的名额,每个进入的线程都需要获取一张许可证,每个退出安检屋子的线程都需要把许可证归还给安检的小姑娘。通过这个许可证的数量,可以限制并发执行线程数量。我们先看一下它的构造方法:

然后再看几个重要的方法:

写个代码看看这个Semaphore怎么用:

public class SemaphoreDemo {

   public static void main(String[] args) {
       Semaphore semaphore = new Semaphore(5);
       for (int i = 0; i < 20; i++) {
           int num = i;
           new Thread(new Runnable() {
               @Override
               public void run() {
                   try {
                       semaphore.acquire();
                       System.out.println("第" + num + "个线程执行任务");
                       Thread.sleep(5000L);    //休眠5秒钟
                       semaphore.release();
                   } catch (InterruptedException e) {
                       throw new RuntimeException(e);
                   }

               }
           }).start();
       }
   }
}

我们创建的Semaphore对象手里只有5张许可证,但是有20个线程需要获取许可证,所以在同一时刻只有5个线程能同时执行指定的代码,执行一下上边的代码会发现每隔5秒都会在黑框框里输出5行字儿~

使用Semaphore的典型场景

有的时候有很多线程会竞争某些可重复利用的资源,但是这些资源是有限的,比如我们上边举的安检屋子只能容下10个人的例子,在多线程获取有限可重复利用的资源的情景下可以考虑
使用一下Semaphore

Exchanger

在一个婚礼上,男方女方通常要交换誓言,必须两个人都和对方说了自己的誓言婚礼才能继续进行下去。如果把男女双方都作为一个线程的话,当男方对女方说了自己的誓言之后,男方线程就会处于等待状态,直到女方也把誓言说了出来,男方女方线程才能继续往下执行。

设计java的大叔们提出了Exchanger来描述这个场景,这是一个带有类型参数的类,我们看一下它的定义:

public class Exchanger<V> {
   //... 省略具体内容
}    

其中的类型参数V就代表交换数据的类型。

Exchanger的构造方法贼简单:

方法也少:

看一个例子哈:

public class ExchangerDemo {

   public static void main(String[] args) {
       Exchanger exchanger = new Exchanger<>();
       new Thread(new Runnable() {

           @Override
           public void run() {
               String manWords = "我爱你,si八婆";
               try {
                   String womanWords = exchanger.exchange(manWords);   //男方的誓言
                   System.out.println("在男方线程中获取到的女方誓言:" + womanWords);
               } catch (InterruptedException e) {
                   throw new RuntimeException(e);
               }
           }
       }, "男方").start();

       new Thread(new Runnable() {

           @Override
           public void run() {
               try {
                   Thread.sleep(5000L);    //女生先墨迹5秒中
                   String womanWords = "去吃屎吧";
                   String manWords = exchanger.exchange(womanWords);   //女方的誓言
                   System.out.println("在女方线程中获取到的男方誓言:" + manWords);
               } catch (InterruptedException e) {
                   throw new RuntimeException(e);
               }
           }
       }, "女方").start();

   }
}

创建了一个男方线程,一个女方线程,男方线程调用exchange方法后会一直阻塞直到女方线程回应,女方比较墨迹,墨迹了5秒中后才交换数据,所以执行程序后发现5秒后才有输出:

在女方线程中获取到的男方誓言:我爱你,si八婆
在男方线程中获取到的女方誓言:去吃屎吧


题外话

写文章挺累的,有时候你觉得阅读挺流畅的,那其实是背后无数次修改的结果。如果你觉得不错请帮忙转发一下,万分感谢~



    本站仅按申请收录文章,版权归原作者所有
    如若侵权,请联系本站删除
    觉得不错,分享给更多人看到
    我们都是小青蛙 热门文章:

    java并发编程之原子性操作    阅读/点赞 : 0/0

    我们都是小青蛙,呱呱呱呱呱    阅读/点赞 : 0/0

    活跃性(死锁、饥饿、活锁)    阅读/点赞 : 0/0

    InnoDB空间文件布局的基础知识    阅读/点赞 : 0/0

    指令重排序    阅读/点赞 : 0/0

    InnoDB的Buffer Pool简介    阅读/点赞 : 0/0

    一些比较重要的数字电路模块    阅读/点赞 : 0/0

    《UNIX环境高级编程》书籍推荐    阅读/点赞 : 0/0

    InnoDB中的B+树索引结构    阅读/点赞 : 0/0

    InnoDB索引页面的物理结构    阅读/点赞 : 0/0

    我们都是小青蛙 微信二维码

    我们都是小青蛙 微信二维码