1、并发容器及安全共享策略总结,并发容器J.U.C(即java.util.concurrent)。J.U.C同步器AQS。
1.1、相比于ArrayList,这个线程不安全的。CopyOnWriteArrayList,写操作的时候复制,当有新元素添加到CopyOnWriteArrayList的时候,先从原有数组里面拷贝一份出来,然后在新的数组做写操作,最后将原有的数组指向新的数组,是线程安全的,整个add操作都是在锁的机制下操作的,避免在多线程并发做add操作的时候复制出多个出来把数据搞乱了,缺点一是由于写操作需要拷贝数组,消耗内存,缺点二,不适合实施读的场景。CopyOnWriteArrayList适合读多写少的场景。CopyOnWriteArrayList特点是读写分离、最终一致性、使用时另外开辟空间,解决并发冲突。CopyOnWriteArrayList的读操作都是在原数组读取的,不需要加锁的,写操作是需要加锁的,避免在多线程并发做add操作的时候复制出多个出来把数据搞乱了。
1 package com.bie.concurrency.example.concurrent; 2 3 import java.util.Collections; 4 import java.util.List; 5 import java.util.concurrent.CopyOnWriteArrayList; 6 import java.util.concurrent.CountDownLatch; 7 import java.util.concurrent.ExecutorService; 8 import java.util.concurrent.Executors; 9 import java.util.concurrent.Semaphore; 10 11 import com.bie.concurrency.annoations.ThreadSafe; 12 import com.google.common.collect.Lists; 13 14 import lombok.extern.slf4j.Slf4j; 15 16 /** 17 * 18 * 19 * @Title: CollectionsExample1.java 20 * @Package com.bie.concurrency.example.syncContainer 21 * @Description: TODO 22 * @author biehl 23 * @date 2020年1月13日 24 * @version V1.0 25 * 26 */ 27 @Slf4j 28 @ThreadSafe // 由于每次结果一致,所以是线程安全的类。可以使用此程序进行并发测试。 29 public class CopyOnWriteArrayListExample1 { 30 31 public static int clientTotal = 5000;// 5000个请求,请求总数 32 33 public static int threadTotal = 200;// 允许同时并发执行的线程数目 34 35 // 相比于ArrayList,CopyOnWriteArrayList 36 private static List<Integer> list = new CopyOnWriteArrayList<>(); 37 38 private static void update(int count) { 39 list.add(count); 40 } 41 42 public static void main(String[] args) { 43 // 定义线程池 44 ExecutorService executorService = Executors.newCachedThreadPool(); 45 // 定义信号量,信号量里面需要定义允许并发的数量 46 final Semaphore semaphore = new Semaphore(threadTotal); 47 // 定义计数器闭锁,希望所有请求完以后统计计数结果,将计数结果放入 48 final CountDownLatch countDownLatch = new CountDownLatch(clientTotal); 49 // 放入请求操作 50 for (int i = 0; i < clientTotal; i++) { 51 final int count = i; 52 // 所有请求放入到线程池结果中 53 executorService.execute(() -> { 54 // 在线程池执行的时候引入了信号量,信号量每次做acquire()操作的时候就是判断当前进程是否允许被执行。 55 // 如果达到了一定并发数的时候,add方法可能会临时被阻塞掉。当acquire()可以返回值的时候,add方法可以被执行。 56 // add方法执行完毕以后,释放当前进程,此时信号量就已经引入完毕了。 57 // 在引入信号量的基础上引入闭锁机制。countDownLatch 58 try { 59 // 执行核心执行方法之前引入信号量,信号量每次允许执行之前需要调用方法acquire()。 60 semaphore.acquire(); 61 // 核心执行方法。 62 update(count); 63 // 核心执行方法执行完成以后,需要释放当前进程,释放信号量。 64 semaphore.release(); 65 } catch (InterruptedException e) { 66 e.printStackTrace(); 67 } 68 // try-catch是一次执行系统的操作,执行完毕以后调用一下闭锁。 69 // 每次执行完毕以后countDownLatch里面对应的计算值减一。 70 // 执行countDown()方法计数器减一。 71 countDownLatch.countDown(); 72 }); 73 } 74 // 这个方法可以保证之前的countDownLatch必须减为0,减为0的前提就是所有的进程必须执行完毕。 75 try { 76 // 调用await()方法当前进程进入等待状态。 77 countDownLatch.await(); 78 } catch (InterruptedException e) { 79 e.printStackTrace(); 80 } 81 // 通常,线程池执行完毕以后,线程池不再使用,记得关闭线程池 82 executorService.shutdown(); 83 // 如果我们希望在所有线程执行完毕以后打印当前计数的值。只需要log.info之前执行上一步即可countDownLatch.await();。 84 log.info("size:{}", list.size()); 85 86 } 87 }
1.2、相比于HashSet。CopyOnWriteArraySet,是线程安全的,底层实现是使用了CopyOnWriteArrayList,也适用于大小通常是很小的一个Set集合,只读操作远大于可变的操作,因为通常需要复制整个基础数组,所以对于可变的操作,包括add、set、remove等等,开销相当于大一些,迭代器不支持可变的remove操作,使用迭代器进行遍历的时候,速度很快,而且不会与其他线程发生冲突。
相比于TreeSet。ConcurrentSkipListSet,是线程安全的,是jdk6新增的类,支持自然排序的,并且可以在构造的时候可以定义比较器,和其他Set集合一样,ConcurrentSkipListSet是基于Map集合的,在多线程环境下,ConcurrentSkipListSet的add、remove、contains是线程安全的,多个线程可以并发的执行插入、移除操作,对于批量操作,比如addAll、removeAll不能保证原子操作执行,做批量操作最好手动使用锁进行同步操作哦,ConcurrentSkipListSet不支持使用空元素null。
1 package com.bie.concurrency.example.concurrent; 2 3 import java.util.Set; 4 import java.util.concurrent.CopyOnWriteArraySet; 5 import java.util.concurrent.CountDownLatch; 6 import java.util.concurrent.ExecutorService; 7 import java.util.concurrent.Executors; 8 import java.util.concurrent.Semaphore; 9 10 import com.bie.concurrency.annoations.ThreadSafe; 11 12 import lombok.extern.slf4j.Slf4j; 13 14 /** 15 * 16 * 17 * @Title: CollectionsExample1.java 18 * @Package com.bie.concurrency.example.syncContainer 19 * @Description: TODO 20 * @author biehl 21 * @date 2020年1月13日 22 * @version V1.0 23 * 24 */ 25 @Slf4j 26 @ThreadSafe // 由于每次结果一致,所以是线程安全的类。可以使用此程序进行并发测试。 27 public class CopyOnWriteArraySetExample2 { 28 29 public static int clientTotal = 5000;// 5000个请求,请求总数 30 31 public static int threadTotal = 200;// 允许同时并发执行的线程数目 32 33 // 相比于HashSet,CopyOnWriteArraySet 34 private static Set<Integer> set = new CopyOnWriteArraySet<Integer>(); 35 36 private static void update(int count) { 37 set.add(count); 38 } 39 40 public static void main(String[] args) { 41 // 定义线程池 42 ExecutorService executorService = Executors.newCachedThreadPool(); 43 // 定义信号量,信号量里面需要定义允许并发的数量 44 final Semaphore semaphore = new Semaphore(threadTotal); 45 // 定义计数器闭锁,希望所有请求完以后统计计数结果,将计数结果放入 46 final CountDownLatch countDownLatch = new CountDownLatch(clientTotal); 47 // 放入请求操作 48 for (int i = 0; i < clientTotal; i++) { 49 final int count = i; 50 // 所有请求放入到线程池结果中 51 executorService.execute(() -> { 52 // 在线程池执行的时候引入了信号量,信号量每次做acquire()操作的时候就是判断当前进程是否允许被执行。 53 // 如果达到了一定并发数的时候,add方法可能会临时被阻塞掉。当acquire()可以返回值的时候,add方法可以被执行。 54 // add方法执行完毕以后,释放当前进程,此时信号量就已经引入完毕了。 55 // 在引入信号量的基础上引入闭锁机制。countDownLatch 56 try { 57 // 执行核心执行方法之前引入信号量,信号量每次允许执行之前需要调用方法acquire()。 58 semaphore.acquire(); 59 // 核心执行方法。 60 update(count); 61 // 核心执行方法执行完成以后,需要释放当前进程,释放信号量。 62 semaphore.release(); 63 } catch (InterruptedException e) { 64 e.printStackTrace(); 65 } 66 // try-catch是一次执行系统的操作,执行完毕以后调用一下闭锁。 67 // 每次执行完毕以后countDownLatch里面对应的计算值减一。 68 // 执行countDown()方法计数器减一。 69 countDownLatch.countDown(); 70 }); 71 } 72 // 这个方法可以保证之前的countDownLatch必须减为0,减为0的前提就是所有的进程必须执行完毕。 73 try { 74 // 调用await()方法当前进程进入等待状态。 75 countDownLatch.await(); 76 } catch (InterruptedException e) { 77 e.printStackTrace(); 78 } 79 // 通常,线程池执行完毕以后,线程池不再使用,记得关闭线程池 80 executorService.shutdown(); 81 // 如果我们希望在所有线程执行完毕以后打印当前计数的值。只需要log.info之前执行上一步即可countDownLatch.await();。 82 log.info("size:{}", set.size()); 83 84 } 85 }
1 package com.bie.concurrency.example.concurrent; 2 3 import java.util.Set; 4 import java.util.concurrent.ConcurrentSkipListSet; 5 import java.util.concurrent.CountDownLatch; 6 import java.util.concurrent.ExecutorService; 7 import java.util.concurrent.Executors; 8 import java.util.concurrent.Semaphore; 9 10 import com.bie.concurrency.annoations.ThreadSafe; 11 12 import lombok.extern.slf4j.Slf4j; 13 14 /** 15 * 16 * 17 * @Title: CollectionsExample1.java 18 * @Package com.bie.concurrency.example.syncContainer 19 * @Description: TODO 20 * @author biehl 21 * @date 2020年1月13日 22 * @version V1.0 23 * 24 */ 25 @Slf4j 26 @ThreadSafe // 由于每次结果一致,所以是线程安全的类。可以使用此程序进行并发测试。 27 public class ConcurrentSkipListSetExample5 { 28 29 public static int clientTotal = 5000;// 5000个请求,请求总数 30 31 public static int threadTotal = 200;// 允许同时并发执行的线程数目 32 33 // 相比于TreeSet,ConcurrentSkipListSet 34 private static Set<Integer> set = new ConcurrentSkipListSet<>(); 35 36 private static void update(int count) { 37 set.add(count); 38 } 39 40 public static void main(String[] args) { 41 // 定义线程池 42 ExecutorService executorService = Executors.newCachedThreadPool(); 43 // 定义信号量,信号量里面需要定义允许并发的数量 44 final Semaphore semaphore = new Semaphore(threadTotal); 45 // 定义计数器闭锁,希望所有请求完以后统计计数结果,将计数结果放入 46 final CountDownLatch countDownLatch = new CountDownLatch(clientTotal); 47 // 放入请求操作 48 for (int i = 0; i < clientTotal; i++) { 49 final int count = i; 50 // 所有请求放入到线程池结果中 51 executorService.execute(() -> { 52 // 在线程池执行的时候引入了信号量,信号量每次做acquire()操作的时候就是判断当前进程是否允许被执行。 53 // 如果达到了一定并发数的时候,add方法可能会临时被阻塞掉。当acquire()可以返回值的时候,add方法可以被执行。 54 // add方法执行完毕以后,释放当前进程,此时信号量就已经引入完毕了。 55 // 在引入信号量的基础上引入闭锁机制。countDownLatch 56 try { 57 // 执行核心执行方法之前引入信号量,信号量每次允许执行之前需要调用方法acquire()。 58 semaphore.acquire(); 59 // 核心执行方法。 60 update(count); 61 // 核心执行方法执行完成以后,需要释放当前进程,释放信号量。 62 semaphore.release(); 63 } catch (InterruptedException e) { 64 e.printStackTrace(); 65 } 66 // try-catch是一次执行系统的操作,执行完毕以后调用一下闭锁。 67 // 每次执行完毕以后countDownLatch里面对应的计算值减一。 68 // 执行countDown()方法计数器减一。 69 countDownLatch.countDown(); 70 }); 71 } 72 // 这个方法可以保证之前的countDownLatch必须减为0,减为0的前提就是所有的进程必须执行完毕。 73 try { 74 // 调用await()方法当前进程进入等待状态。 75 countDownLatch.await(); 76 } catch (InterruptedException e) { 77 e.printStackTrace(); 78 } 79 // 通常,线程池执行完毕以后,线程池不再使用,记得关闭线程池 80 executorService.shutdown(); 81 // 如果我们希望在所有线程执行完毕以后打印当前计数的值。只需要log.info之前执行上一步即可countDownLatch.await();。 82 log.info("size:{}", set.size()); 83 84 } 85 }
1.3、相比于HashMap。ConcurrentHashMap,是线程安全的,不允许空值null,在实际应用中,除了少数的插入操作,删除操作外,绝大部分我们使用Map都是读取操作,而且读取操作大部分都是成功的,基于这个前置,ConcurrentHashMap基于读操作做了大量的优化,因此这个类具有很高的并发性,高并发场景下具有很好的表现。
相比于TreeMap。ConcurrentSkipListMap,是线程安全的,内部结构是使用SkipList跳表的结构实现的,优点是key有序,支持更高的并发,存取时间和线程数基本是没有关系的,在数据量一定的情况下,线程数越多,越能提现ConcurrentSkipListMap的优势,在非多线程的情况下,尽可能使用TreeMap。
1 package com.bie.concurrency.example.concurrent; 2 3 import java.util.Map; 4 import java.util.concurrent.ConcurrentHashMap; 5 import java.util.concurrent.CountDownLatch; 6 import java.util.concurrent.ExecutorService; 7 import java.util.concurrent.Executors; 8 import java.util.concurrent.Semaphore; 9 10 import com.bie.concurrency.annoations.ThreadSafe; 11 12 import lombok.extern.slf4j.Slf4j; 13 14 /** 15 * 16 * 17 * @Title: CollectionsExample1.java 18 * @Package com.bie.concurrency.example.syncContainer 19 * @Description: TODO 20 * @author biehl 21 * @date 2020年1月13日 22 * @version V1.0 23 * 24 */ 25 @Slf4j 26 @ThreadSafe // 由于每次结果一致,所以是线程安全的类。可以使用此程序进行并发测试。 27 public class ConcurrentHashMapExample3 { 28 29 public static int clientTotal = 5000;// 5000个请求,请求总数 30 31 public static int threadTotal = 200;// 允许同时并发执行的线程数目 32 33 // ConcurrentHashMap 34 private static Map<Integer, Integer> map = new ConcurrentHashMap<Integer, Integer>(); 35 36 private static void update(int count) { 37 map.put(count, count); 38 } 39 40 public static void main(String[] args) { 41 // 定义线程池 42 ExecutorService executorService = Executors.newCachedThreadPool(); 43 // 定义信号量,信号量里面需要定义允许并发的数量 44 final Semaphore semaphore = new Semaphore(threadTotal); 45 // 定义计数器闭锁,希望所有请求完以后统计计数结果,将计数结果放入 46 final CountDownLatch countDownLatch = new CountDownLatch(clientTotal); 47 // 放入请求操作 48 for (int i = 0; i < clientTotal; i++) { 49 final int count = i; 50 // 所有请求放入到线程池结果中 51 executorService.execute(() -> { 52 // 在线程池执行的时候引入了信号量,信号量每次做acquire()操作的时候就是判断当前进程是否允许被执行。 53 // 如果达到了一定并发数的时候,add方法可能会临时被阻塞掉。当acquire()可以返回值的时候,add方法可以被执行。 54 // add方法执行完毕以后,释放当前进程,此时信号量就已经引入完毕了。 55 // 在引入信号量的基础上引入闭锁机制。countDownLatch 56 try { 57 // 执行核心执行方法之前引入信号量,信号量每次允许执行之前需要调用方法acquire()。 58 semaphore.acquire(); 59 // 核心执行方法。 60 update(count); 61 // 核心执行方法执行完成以后,需要释放当前进程,释放信号量。 62 semaphore.release(); 63 } catch (InterruptedException e) { 64 e.printStackTrace(); 65 } 66 // try-catch是一次执行系统的操作,执行完毕以后调用一下闭锁。 67 // 每次执行完毕以后countDownLatch里面对应的计算值减一。 68 // 执行countDown()方法计数器减一。 69 countDownLatch.countDown(); 70 }); 71 } 72 // 这个方法可以保证之前的countDownLatch必须减为0,减为0的前提就是所有的进程必须执行完毕。 73 try { 74 // 调用await()方法当前进程进入等待状态。 75 countDownLatch.await(); 76 } catch (InterruptedException e) { 77 e.printStackTrace(); 78 } 79 // 通常,线程池执行完毕以后,线程池不再使用,记得关闭线程池 80 executorService.shutdown(); 81 // 如果我们希望在所有线程执行完毕以后打印当前计数的值。只需要log.info之前执行上一步即可countDownLatch.await();。 82 log.info("size:{}", map.size()); 83 84 } 85 }
1 package com.bie.concurrency.example.concurrent; 2 3 import java.util.Map; 4 import java.util.concurrent.ConcurrentSkipListMap; 5 import java.util.concurrent.CountDownLatch; 6 import java.util.concurrent.ExecutorService; 7 import java.util.concurrent.Executors; 8 import java.util.concurrent.Semaphore; 9 10 import com.bie.concurrency.annoations.ThreadSafe; 11 12 import lombok.extern.slf4j.Slf4j; 13 14 /** 15 * 16 * 17 * @Title: CollectionsExample1.java 18 * @Package com.bie.concurrency.example.syncContainer 19 * @Description: TODO 20 * @author biehl 21 * @date 2020年1月13日 22 * @version V1.0 23 * 24 */ 25 @Slf4j 26 @ThreadSafe // 由于每次结果一致,所以是线程安全的类。可以使用此程序进行并发测试。 27 public class ConcurrentSkipListMapExample4 { 28 29 public static int clientTotal = 5000;// 5000个请求,请求总数 30 31 public static int threadTotal = 200;// 允许同时并发执行的线程数目 32 33 // ConcurrentSkipListMap 34 private static Map<Integer, Integer> map = new ConcurrentSkipListMap<Integer, Integer>(); 35 36 private static void update(int count) { 37 map.put(count, count); 38 } 39 40 public static void main(String[] args) { 41 // 定义线程池 42 ExecutorService executorService = Executors.newCachedThreadPool(); 43 // 定义信号量,信号量里面需要定义允许并发的数量 44 final Semaphore semaphore = new Semaphore(threadTotal); 45 // 定义计数器闭锁,希望所有请求完以后统计计数结果,将计数结果放入 46 final CountDownLatch countDownLatch = new CountDownLatch(clientTotal); 47 // 放入请求操作 48 for (int i = 0; i < clientTotal; i++) { 49 final int count = i; 50 // 所有请求放入到线程池结果中 51 executorService.execute(() -> { 52 // 在线程池执行的时候引入了信号量,信号量每次做acquire()操作的时候就是判断当前进程是否允许被执行。 53 // 如果达到了一定并发数的时候,add方法可能会临时被阻塞掉。当acquire()可以返回值的时候,add方法可以被执行。 54 // add方法执行完毕以后,释放当前进程,此时信号量就已经引入完毕了。 55 // 在引入信号量的基础上引入闭锁机制。countDownLatch 56 try { 57 // 执行核心执行方法之前引入信号量,信号量每次允许执行之前需要调用方法acquire()。 58 semaphore.acquire(); 59 // 核心执行方法。 60 update(count); 61 // 核心执行方法执行完成以后,需要释放当前进程,释放信号量。 62 semaphore.release(); 63 } catch (InterruptedException e) { 64 e.printStackTrace(); 65 } 66 // try-catch是一次执行系统的操作,执行完毕以后调用一下闭锁。 67 // 每次执行完毕以后countDownLatch里面对应的计算值减一。 68 // 执行countDown()方法计数器减一。 69 countDownLatch.countDown(); 70 }); 71 } 72 // 这个方法可以保证之前的countDownLatch必须减为0,减为0的前提就是所有的进程必须执行完毕。 73 try { 74 // 调用await()方法当前进程进入等待状态。 75 countDownLatch.await(); 76 } catch (InterruptedException e) { 77 e.printStackTrace(); 78 } 79 // 通常,线程池执行完毕以后,线程池不再使用,记得关闭线程池 80 executorService.shutdown(); 81 // 如果我们希望在所有线程执行完毕以后打印当前计数的值。只需要log.info之前执行上一步即可countDownLatch.await();。 82 log.info("size:{}", map.size()); 83 84 } 85 }
2、J.U.C同步器AQS(AbstractQueuedSynchronizer)。JUC大大提高了java程序的并发性能,AQS是JUC的核心,AQS重点学习哦,底层使用了双向链表是队列的一种实现。
AQS的设计特点:
1)、使用Node实现FIFO队列,可以用于构建锁或者其他同步装置的基础框架。
2)、利用了一个int类型表示状态,private volatile int state;这个state成员变量,基于AQS有一个同步组件ReentrantLock,在这个组件中,state表示获取锁的线程数,假如state等于0表示没有线程获取锁,state等于1表示有线程获取了锁,state大于1表示重入锁 的数量。
3)、使用方法是继承,AQS的设计是基于模板设计的,使用者需要继承AbstractQueuedSynchronizer,并复写方法。
4)、子类通过继承并通过实现它的方法管理其状态(acquire和release)的方法操纵状态。
5)、AQS可以同时实现排它锁和共享锁模式(站在使用者的角度观察,AQS功能主要分为两类,独占功能、共享功能),所有子类中,要么实现并使用了独占功能api,要么使用了共享功能api,而不会同时使用两套api,即便是最有名的子类ReentrantReadWriteLock,也是通过两个内部类,读锁和写锁,分别实现了两套api来实现的。
3、J.U.C同步器AQS的具体实现的大致思路,AQS内部维护了一个CLH队列,来管理锁,线程会首先尝试获取锁,如果失败,就将当前线程以等待状态的信息包装成一个Node节点加入到同步队列SyncQueue里面,接着会循环尝试获取锁,条件是当前节点为head直接后继才会尝试,如果失败就会阻塞自己,直到自己被唤醒,而当持久锁的线程释放锁的时候会唤醒队列中的后继线程。基于这些设计AQS提高了很多子类,如CountDownLatch(闭锁,通过计数来保证线程是否需要一直需要阻塞)、Semaphore(控制同一时间并发线程的数目)、CyclicBarrier(和CountDownLatch类似,都可以阻塞进程)、ReentrantLock、Condition、FutureTask。
3.1、CountDownLatch(闭锁,通过计数来保证线程是否需要一直需要阻塞),是同步辅助类,通过它可以完成类似于阻塞当前线程的功能,换句话说就是一个线程或者多个线程一直等待,直到其他线程执行的操作完成。CountDownLatch使用了一个给定的计数器进行初始化,这个计数器的操作是原子操作,就是同时只能有一个线程操作该计数器。调用该类的await()方法的线程会一直处于阻塞状态,直到其他线程调用countDown()方法使当前计数器的值变成0,每次调用countDown()的时候,计数器的值会减1,当计数器的值减为0的时候,所有因调用await()方法而处于等待状态的线程就会继续执行向下执行的,这个操作只会出现一次,因为这个计数器是不能被重置的,如果业务需要重置计数次数的版本可以使用CyclicBarrier。使用场景,在某些业务场景中,程序执行需要等待某个条件完成后才可以执行后续操作,典型应用并行计算,当某个处理的运算量很大的时候,可以将该运算任务拆分成多个子任务,等待所有的子任务都完成后,父任务再拿到所有子任务的运算结果进行汇总。
1 package com.bie.concurrency.example.aqs; 2 3 import java.util.concurrent.CountDownLatch; 4 import java.util.concurrent.ExecutorService; 5 import java.util.concurrent.Executors; 6 7 import lombok.extern.slf4j.Slf4j; 8 9 /** 10 * 11 * 12 * @Title: CountDownLatchExample1.java 13 * @Package com.bie.concurrency.example.aqs 14 * @Description: TODO 15 * @author biehl 16 * @date 2020年1月15日 17 * @version V1.0 18 * 19 */ 20 @Slf4j 21 public class CountDownLatchExample1 { 22 23 public static int threadTotal = 200;// 允许同时并发执行的线程数目 24 25 private static void update(int threadNum) throws Exception { 26 Thread.sleep(100); 27 log.info("{}", threadNum); 28 Thread.sleep(100); 29 } 30 31 public static void main(String[] args) { 32 // 定义线程池 33 ExecutorService executorService = Executors.newCachedThreadPool(); 34 // 定义计数器闭锁,希望所有请求完以后统计计数结果,将计数结果放入 35 final CountDownLatch countDownLatch = new CountDownLatch(threadTotal); 36 // 放入请求操作 37 for (int i = 0; i < threadTotal; i++) { 38 final int threadNum = i; 39 // 所有请求放入到线程池结果中 40 executorService.execute(() -> { 41 try { 42 // 业务逻辑操作 43 update(threadNum); 44 } catch (Exception e) { 45 log.error("exception", e); 46 } finally { 47 countDownLatch.countDown(); 48 } 49 }); 50 } 51 // 这个方法可以保证之前的countDownLatch必须减为0,减为0的前提就是所有的进程必须执行完毕。 52 try { 53 // 调用await()方法当前进程进入等待状态。 54 countDownLatch.await(); 55 } catch (InterruptedException e) { 56 e.printStackTrace(); 57 } 58 // 如果我们希望在所有线程执行完毕以后打印当前计数的值。只需要log.info之前执行上一步即可countDownLatch.await();。 59 log.info("finish结束了......"); 60 // 通常,线程池执行完毕以后,线程池不再使用,记得关闭线程池。 61 executorService.shutdown(); 62 } 63 64 }
1 package com.bie.concurrency.example.aqs; 2 3 import java.util.concurrent.CountDownLatch; 4 import java.util.concurrent.ExecutorService; 5 import java.util.concurrent.Executors; 6 import java.util.concurrent.TimeUnit; 7 8 import lombok.extern.slf4j.Slf4j; 9 10 /** 11 * 12 * 13 * @Title: CountDownLatchExample1.java 14 * @Package com.bie.concurrency.example.aqs 15 * @Description: TODO 16 * @author biehl 17 * @date 2020年1月15日 18 * @version V1.0 19 * 20 */ 21 @Slf4j 22 public class CountDownLatchExample2 { 23 24 public static int threadTotal = 200;// 允许同时并发执行的线程数目 25 26 private static void update(int threadNum) throws Exception { 27 Thread.sleep(100); 28 log.info("{}", threadNum); 29 // Thread.sleep(100); 30 } 31 32 public static void main(String[] args) throws InterruptedException { 33 // 定义线程池 34 ExecutorService executorService = Executors.newCachedThreadPool(); 35 // 定义计数器闭锁,希望所有请求完以后统计计数结果,将计数结果放入 36 final CountDownLatch countDownLatch = new CountDownLatch(threadTotal); 37 // 放入请求操作 38 for (int i = 0; i < threadTotal; i++) { 39 final int threadNum = i; 40 // 所有请求放入到线程池结果中 41 executorService.execute(() -> { 42 try { 43 // 业务逻辑操作 44 update(threadNum); 45 } catch (Exception e) { 46 log.error("exception", e); 47 } finally { 48 countDownLatch.countDown(); 49 } 50 }); 51 } 52 // 这个方法可以保证之前的countDownLatch必须减为0,减为0的前提就是所有的进程必须执行完毕。 53 try { 54 // 调用await()方法当前进程进入等待状态。 55 // countDownLatch.await(); 56 57 // countDownLatch等待指定的时间,超过这个时间不管了,不管做完不做完。 58 // 参数1是timeout等待的时间,参数2是unit数值的单位。 59 countDownLatch.await(10, TimeUnit.MILLISECONDS); 60 } catch (InterruptedException e) { 61 e.printStackTrace(); 62 } 63 // 如果我们希望在所有线程执行完毕以后打印当前计数的值。只需要log.info之前执行上一步即可countDownLatch.await();。 64 log.info("finish结束了......"); 65 // 通常,线程池执行完毕以后,线程池不再使用,记得关闭线程池。 66 executorService.shutdown(); 67 } 68 69 }
3.2、AQS的同步组件Semaphore(称作信号量),可以控制并发访问线程的个数(即并发访问控制)。操作系统里面信号量是一个很重要的概念,在进程控制方面有很重要的应用,java并发库里面的Semaphore可以很轻松的完成类似操作系统信号量的控制,信号量可以控制某个资源被同时访问的个数,和CountDownLatch使用有些类型,提高了两个方法,acquire方法是获取许可,release方法是操作完成后释放一个许可。Semaphore维护了当前访问的个数,通过来提供同步机制来控制同时访问的个数。Semaphore可以实现有限大小的列表。使用场景,常用于仅能提供有限访问的资源,如项目中使用数据库的连接数。
1 package com.bie.concurrency.example.aqs; 2 3 import java.util.concurrent.ExecutorService; 4 import java.util.concurrent.Executors; 5 import java.util.concurrent.Semaphore; 6 7 import lombok.extern.slf4j.Slf4j; 8 9 /** 10 * 11 * 12 * @Title: CountDownLatchExample1.java 13 * @Package com.bie.concurrency.example.aqs 14 * @Description: TODO 15 * @author biehl 16 * @date 2020年1月15日 17 * @version V1.0 18 * 19 */ 20 @Slf4j 21 public class SemaphoreExample1 { 22 23 public static int threadTotal = 20;// 允许同时并发执行的线程数目 24 25 private static void update(int threadNum) throws Exception { 26 log.info("{}", threadNum); 27 Thread.sleep(1000); 28 } 29 30 public static void main(String[] args) { 31 // 定义线程池 32 ExecutorService executorService = Executors.newCachedThreadPool(); 33 // 定义信号量,信号量里面需要定义允许并发的数量。 34 // 线程池调度的时候加上并发控制。给定一个值代表运行的并发数目。 35 // 3代表并发的数目,控制台打印一次执行三次。 36 final Semaphore semaphore = new Semaphore(3); 37 // 放入请求操作 38 for (int i = 0; i < threadTotal; i++) { 39 final int threadNum = i; 40 // 所有请求放入到线程池结果中 41 executorService.execute(() -> { 42 try { 43 semaphore.acquire(); // 获取一个许可 44 // 对指定的代码做并发控制 45 // 对需要进行并发控制地进行获取一个许可,操作完毕以后释放一个许可 46 update(threadNum); 47 semaphore.release(); // 释放一个许可 48 } catch (Exception e) { 49 log.error("exception", e); 50 } 51 }); 52 } 53 // 如果我们希望在所有线程执行完毕以后打印当前计数的值。只需要log.info之前执行上一步即可countDownLatch.await();。 54 log.info("finish结束了......"); 55 // 通常,线程池执行完毕以后,线程池不再使用,记得关闭线程池。 56 executorService.shutdown(); 57 } 58 59 }
并发数目是3的话,一次又获取3个许可,那么同一时间只能执行一次update方法,原因是因为一次性获取了3个许可,同一秒钟没有多余的许可可以释放出来了。
1 package com.bie.concurrency.example.aqs; 2 3 import java.util.concurrent.ExecutorService; 4 import java.util.concurrent.Executors; 5 import java.util.concurrent.Semaphore; 6 7 import lombok.extern.slf4j.Slf4j; 8 9 /** 10 * 11 * 12 * @Title: CountDownLatchExample1.java 13 * @Package com.bie.concurrency.example.aqs 14 * @Description: TODO 15 * @author biehl 16 * @date 2020年1月15日 17 * @version V1.0 18 * 19 */ 20 @Slf4j 21 public class SemaphoreExample2 { 22 23 public static int threadTotal = 20;// 允许同时并发执行的线程数目 24 25 private static void update(int threadNum) throws Exception { 26 log.info("{}", threadNum); 27 Thread.sleep(1000); 28 } 29 30 public static void main(String[] args) { 31 // 定义线程池 32 ExecutorService executorService = Executors.newCachedThreadPool(); 33 // 定义信号量,信号量里面需要定义允许并发的数量。 34 // 线程池调度的时候加上并发控制。给定一个值代表运行的并发数目。 35 // 3代表并发的数目,一次获取3个许可,控制台每次打印1个。 36 // final Semaphore semaphore = new Semaphore(3); 37 // 6代表并发的数目,一次获取3个许可,控制台每次打印2个。 38 final Semaphore semaphore = new Semaphore(6); 39 // 放入请求操作 40 for (int i = 0; i < threadTotal; i++) { 41 final int threadNum = i; 42 // 所有请求放入到线程池结果中 43 executorService.execute(() -> { 44 try { 45 // 并发数目是3的话,一次又获取3个许可,那么同一时间只能执行一次update方法,原因是因为一次性获取了3个许可,同一秒钟没有多余的许可可以释放出来了。 46 semaphore.acquire(3); // 获取多个许可 47 // 对指定的代码做并发控制 48 // 对需要进行并发控制地进行获取多个许可,操作完毕以后释放多个许可 49 update(threadNum); 50 semaphore.release(3); // 释放多个许可 51 } catch (Exception e) { 52 log.error("exception", e); 53 } 54 }); 55 } 56 // 如果我们希望在所有线程执行完毕以后打印当前计数的值。只需要log.info之前执行上一步即可countDownLatch.await();。 57 log.info("finish结束了......"); 58 // 通常,线程池执行完毕以后,线程池不再使用,记得关闭线程池。 59 executorService.shutdown(); 60 } 61 62 }
如果当前可以获取到许可就做,获取不到许可不想做就直接进行丢弃即可。尝试获取许可,由于并发线程数目是3,只有三个线程获取到了许可,尝试获取许可的时候等待一定的时间。
1 package com.bie.concurrency.example.aqs; 2 3 import java.util.concurrent.ExecutorService; 4 import java.util.concurrent.Executors; 5 import java.util.concurrent.Semaphore; 6 7 import lombok.extern.slf4j.Slf4j; 8 9 /** 10 * 11 * 12 * @Title: CountDownLatchExample1.java 13 * @Package com.bie.concurrency.example.aqs 14 * @Description: TODO 15 * @author biehl 16 * @date 2020年1月15日 17 * @version V1.0 18 * 19 * 当前运行的并发数目是3,超过3个就进行丢弃。 20 */ 21 @Slf4j 22 public class SemaphoreExample3 { 23 24 public static int threadTotal = 20;// 允许同时并发执行的线程数目 25 26 private static void update(int threadNum) throws Exception { 27 log.info("{}", threadNum); 28 Thread.sleep(1000); 29 } 30 31 public static void main(String[] args) { 32 // 定义线程池 33 ExecutorService executorService = Executors.newCachedThreadPool(); 34 // 定义信号量,信号量里面需要定义允许并发的数量。 35 // 线程池调度的时候加上并发控制。给定一个值代表运行的并发数目。 36 // 3代表并发的数目,一次获取3个许可,控制台每次打印1个。 37 final Semaphore semaphore = new Semaphore(3); 38 // 放入请求操作 39 for (int i = 0; i < threadTotal; i++) { 40 final int threadNum = i; 41 // 所有请求放入到线程池结果中 42 // 向线程池放入20个线程,20个请求同一时间内 都会尝试去执行,执行的时候,信号量会尝试去获取许可,但是同一时间并发数目是3 43 // 相当于同一时间只有三个线程获取到了许可,此时,相当于有三个线程执行了,因为休眠了1秒钟。因此其余线程都m没有获取到许可,直接就结束了。 44 executorService.execute(() -> { 45 try { 46 // 如果当前可以获取到许可就做,获取不到许可不想做就直接进行丢弃即可。 47 // 尝试获取许可,由于并发线程数目是3,只有三个线程获取到了许可, 48 // 尝试获取许可的时候等待一定的时间。 49 if (semaphore.tryAcquire()) { // 尝试获取一个许可 50 update(threadNum); 51 semaphore.release(); // 释放一个许可 52 } 53 } catch (Exception e) { 54 log.error("exception", e); 55 } 56 }); 57 } 58 // 如果我们希望在所有线程执行完毕以后打印当前计数的值。只需要log.info之前执行上一步即可countDownLatch.await();。 59 log.info("finish结束了......"); 60 // 通常,线程池执行完毕以后,线程池不再使用,记得关闭线程池。 61 executorService.shutdown(); 62 } 63 64 }
如果当前可以获取到许可就做,获取不到许可不想做就直接进行丢弃即可。尝试获取许可,由于并发线程数目是3,只有三个线程获取到了许可。
1 package com.bie.concurrency.example.aqs; 2 3 import java.util.concurrent.ExecutorService; 4 import java.util.concurrent.Executors; 5 import java.util.concurrent.Semaphore; 6 import java.util.concurrent.TimeUnit; 7 8 import lombok.extern.slf4j.Slf4j; 9 10 /** 11 * 12 * 13 * @Title: CountDownLatchExample1.java 14 * @Package com.bie.concurrency.example.aqs 15 * @Description: TODO 16 * @author biehl 17 * @date 2020年1月15日 18 * @version V1.0 19 * 20 * 当前运行的并发数目是3,超过3个就进行丢弃。 21 */ 22 @Slf4j 23 public class SemaphoreExample4 { 24 25 public static int threadTotal = 20;// 允许同时并发执行的线程数目 26 27 private static void update(int threadNum) throws Exception { 28 log.info("{}", threadNum); 29 Thread.sleep(1000); 30 } 31 32 public static void main(String[] args) { 33 // 定义线程池 34 ExecutorService executorService = Executors.newCachedThreadPool(); 35 // 定义信号量,信号量里面需要定义允许并发的数量。 36 // 线程池调度的时候加上并发控制。给定一个值代表运行的并发数目。 37 // 3代表并发的数目,一次获取3个许可,控制台每次打印1个。 38 final Semaphore semaphore = new Semaphore(3); 39 // 放入请求操作 40 for (int i = 0; i < threadTotal; i++) { 41 final int threadNum = i; 42 // 所有请求放入到线程池结果中 43 // 向线程池放入20个线程,20个请求同一时间内 都会尝试去执行,执行的时候,信号量会尝试去获取许可,但是同一时间并发数目是3 44 // 相当于同一时间只有三个线程获取到了许可,此时,相当于有三个线程执行了,因为休眠了1秒钟。因此其余线程都m没有获取到许可,直接就结束了。 45 executorService.execute(() -> { 46 try { 47 // 如果当前可以获取到许可就做,获取不到许可不想做就直接进行丢弃即可。 48 // 尝试获取许可,由于并发线程数目是3,只有三个线程获取到了许可, 49 // 尝试获取许可的时候等待一定的时间。 50 if (semaphore.tryAcquire(5000, TimeUnit.MILLISECONDS)) { // 尝试获取一个许可// 尝试获取一个许可 51 update(threadNum); 52 semaphore.release(); // 释放一个许可 53 } 54 } catch (Exception e) { 55 log.error("exception", e); 56 } 57 }); 58 } 59 // 如果我们希望在所有线程执行完毕以后打印当前计数的值。只需要log.info之前执行上一步即可countDownLatch.await();。 60 log.info("finish结束了......"); 61 // 通常,线程池执行完毕以后,线程池不再使用,记得关闭线程池。 62 executorService.shutdown(); 63 } 64 65 }
3.3、AQS的同步组件,CyclicBarrier,也是一个同步辅助类,允许一组线程相互等待,直到到达某个公共屏障点(即common barrier point),通过它可以完成多个线程之间相互等待,只有当每个线程都准备就绪后,才能各自继续往下执行后面的操作,和CountDownLatch有相似的地方,都是通过计数器进行实现的,当某个线程调用await方法的时候,该线程就进入了等待状态,而且计数器执行加1操作,当计数器的值达到初始值的时候呢,因为进入await方法的线程会被唤醒,继续执行他们后续的操作。由于CyclicBarrier在释放等待线程后可以重用,所以又被称为循环屏障,可以被一直循环使用。使用场景,可以用于多线程计算数据,最后合并计算结果的应用场景。CyclicBarrier和CountDownLatch的区别,CountDownLatch的计数器只能使用一次,CyclicBarrier的计数器可以使用reset方法进行重置,可以进行循环使用。CountDownLatch主要实现一个或者N个线程需要等待其他线程完成某项操作以后才能继续往下执行,描述的是一个或者多个线程等待其他线程的关系,CyclicBarrier是多个线程之间相互等待,直到所有的线程都m满足了条件以后才能继续执行后续的操作,描述的是各个线程内部相互等待的关系。CyclicBarrier可以执行更加复杂的业务逻辑的。
1 package com.bie.concurrency.example.aqs; 2 3 import java.util.concurrent.CyclicBarrier; 4 import java.util.concurrent.ExecutorService; 5 import java.util.concurrent.Executors; 6 import java.util.concurrent.TimeUnit; 7 8 import lombok.extern.slf4j.Slf4j; 9 10 /** 11 * 12 * 13 * @Title: CyclicBarrierExample1.java 14 * @Package com.bie.concurrency.example.aqs 15 * @Description: TODO 16 * @author biehl 17 * @date 2020年1月15日 18 * @version V1.0 19 * 20 */ 21 @Slf4j 22 public class CyclicBarrierExample1 { 23 24 // 允许一组线程相互等待,这里停止的5个线程。 25 // 给定一个初始化值,告诉当前有多少个线程一起药进行同步等待 26 private static CyclicBarrier cyclicBarrier = new CyclicBarrier(5); 27 28 private static void race(int threadNum) throws Exception { 29 Thread.sleep(1000); 30 log.info("{} is ready 准备好了: ", threadNum); 31 // 调用CyclicBarrier等待,调用await告诉当前这个线程ok了,当达到上面定义的初始化数目以后,await后面的操作就可以执行了 32 // cyclicBarrier.await(); 33 34 try { 35 // 如果过了超时时间,不想影响后续的执行,将这句话进行try-catch捕获掉即可。 36 cyclicBarrier.await(2000, TimeUnit.MILLISECONDS); 37 } catch (Exception e) { 38 log.warn("BarrierException", e); 39 } 40 41 // 允许执行了,继续向下执行 42 log.info("{} continue 继续向下:", threadNum); 43 } 44 45 public static void main(String[] args) throws InterruptedException { 46 // 定义线程池 47 ExecutorService executorService = Executors.newCachedThreadPool(); 48 49 // 放入请求操作 50 for (int i = 0; i < 10; i++) { 51 final int threadNum = i; 52 Thread.sleep(1000); 53 // 所有请求放入到线程池结果中 54 executorService.execute(() -> { 55 try { 56 // 业务逻辑操作 57 race(threadNum); 58 } catch (Exception e) { 59 log.error("exception", e); 60 } 61 }); 62 } 63 64 // 如果我们希望在所有线程执行完毕以后打印当前计数的值。只需要log.info之前执行上一步即可countDownLatch.await();。 65 log.info("finish结束了......"); 66 // 通常,线程池执行完毕以后,线程池不再使用,记得关闭线程池。 67 executorService.shutdown(); 68 } 69 70 }
1 package com.bie.concurrency.example.aqs; 2 3 import java.util.concurrent.CyclicBarrier; 4 import java.util.concurrent.ExecutorService; 5 import java.util.concurrent.Executors; 6 7 import lombok.extern.slf4j.Slf4j; 8 9 /** 10 * 11 * 12 * @Title: CyclicBarrierExample1.java 13 * @Package com.bie.concurrency.example.aqs 14 * @Description: TODO 15 * @author biehl 16 * @date 2020年1月15日 17 * @version V1.0 18 * 19 */ 20 @Slf4j 21 public class CyclicBarrierExample2 { 22 23 // 允许一组线程相互等待,这里停止的5个线程。 24 // 给定一个初始化值,告诉当前有多少个线程一起药进行同步等待 25 // 线程到达屏障的时候,优先执行这里的方法 26 private static CyclicBarrier cyclicBarrier = new CyclicBarrier(5, () -> { 27 log.info("callback is running"); 28 }); 29 30 private static void race(int threadNum) throws Exception { 31 Thread.sleep(1000); 32 log.info("{} is ready 准备好了: ", threadNum); 33 // 调用CyclicBarrier等待,调用await告诉当前这个线程ok了,当达到上面定义的初始化数目以后,await后面的操作就可以执行了 34 cyclicBarrier.await(); 35 36 // 允许执行了,继续向下执行 37 log.info("{} continue 继续向下:", threadNum); 38 } 39 40 public static void main(String[] args) throws InterruptedException { 41 // 定义线程池 42 ExecutorService executorService = Executors.newCachedThreadPool(); 43 44 // 放入请求操作 45 for (int i = 0; i < 10; i++) { 46 final int threadNum = i; 47 Thread.sleep(1000); 48 // 所有请求放入到线程池结果中 49 executorService.execute(() -> { 50 try { 51 // 业务逻辑操作 52 race(threadNum); 53 } catch (Exception e) { 54 log.error("exception", e); 55 } 56 }); 57 } 58 59 // 如果我们希望在所有线程执行完毕以后打印当前计数的值。只需要log.info之前执行上一步即可countDownLatch.await();。 60 log.info("finish结束了......"); 61 // 通常,线程池执行完毕以后,线程池不再使用,记得关闭线程池。 62 executorService.shutdown(); 63 } 64 65 }
3.4、同步组件,ReentrantLock,以及一起使用的Condition,java主要分两种锁,一种是synchronized关键字修饰的锁,另外一种就是JUC提供的锁,JUC里面提供的核心的锁就是ReentrantLock,核心也是lock方法和unlock方法。
1)、ReentrantLock(可重入锁)和synchronized的区别。
第一点,可重入性(即再进入锁)。其实和synchronized使用的锁也是可以重入的,两者区别不大。他们都是同一个线程进入一次锁的计数器就自增1,所以要等到锁的计数器下降到0的时候才能释放锁。
第二点,锁的实现,synchronized关键字是依赖于JVM实现的,而ReentrantLock是JDK实现的,两者的区别就是类似于操作系统控制实现和用户自己写程序实现的区别。synchronized的实现是依赖于JVM的,很难看到源代码,但是ReentrantLock的实现可以看到源代码的,可以进行阅读。
第三点,性能的区别,synchronized关键字引入了偏向锁和轻量级锁(自旋锁)以后,和ReentrantLock的性能差不多了,在两者都可以用的情况下,官方推荐使用synchronized(写法容易)。
第四点,功能区别,synchronized关键字的便利性,使用起来比较方便,并且是由编译器保证加锁和释放的,ReentrantLock需要手工声明加锁和释放锁,为了避免忘记手工释放锁造成死锁,最好在finally里面声明释放锁。锁的细粒度和灵活度ReentrantLock优于synchronized关键字。
2)、ReentrantLock拥有独有的功能。
功能一,可以指定是公平锁还是非公平锁,synchronized关键字只能是非公平锁,所谓公平锁就是先等待的线程就先获得锁。
功能二,提供了一个Condition类,可以实现分组唤醒需要唤醒的线程。
功能三,提供能够中断等待锁的线程的机制,lock.lockInterruptibly()实现这个机制,ReentrantLock实现是一种自旋锁,通过循环调用CAS操作来实现加锁,性能比较好,因为避免了使线程进入内核态的阻塞状态,想尽办法避免使线程进入内核态的阻塞状态,是我们去分析和理解锁设计的关键钥匙。
那么,什么情况下适合使用ReentrantLock锁呢,如果要实现ReentrantLock拥有独有的三个功能的适合,那么必须使用ReentrantLock的,其他情况下,可以根据性能或者业务场景选择使用synchronized关键字还是ReentrantLock,synchronized关键字可以做的事情,ReentrantLock都可以实现的,而且ReentrantLock比synchronized关键字能做的功能更多的。synchronized关键字优势是使用的时候不可能忘记释放锁,在退出synchronized块的时候,JVM会为帮你释放锁的。另外一个原因是当JVM用synchronized关键字管理锁请求和释放的时候,JVM在生成线程转储的时候能够包括锁定信息,这些对调试非常有价值的,因为他们可以标识死锁或者异常行为的来源。而Lock类只是普通的类,JVM不知道具体那个线程拥有Lock对象,而且几乎每个开发人员都熟悉synchronized关键字哦,可以在所有版本种工作,大部分都可以使用synchronized关键字进行加锁处理的哦。
1 package com.bie.concurrency.example.lock; 2 3 import java.util.concurrent.CountDownLatch; 4 import java.util.concurrent.ExecutorService; 5 import java.util.concurrent.Executors; 6 import java.util.concurrent.Semaphore; 7 8 import com.bie.concurrency.annoations.ThreadSafe; 9 10 import lombok.extern.slf4j.Slf4j; 11 12 /** 13 * 14 * 15 * @Title: LockExample1.java 16 * @Package com.bie.concurrency.example.lock 17 * @Description: TODO 18 * @author biehl 19 * @date 2020年1月15日 20 * @version V1.0 21 * 22 */ 23 @Slf4j 24 @ThreadSafe 25 public class LockExample1 { 26 27 public static int clientTotal = 5000;// 5000个请求,请求总数 28 29 public static int threadTotal = 200;// 允许同时并发执行的线程数目 30 31 public static int count = 0; 32 33 private synchronized static void add() { 34 count++; 35 } 36 37 public static void main(String[] args) { 38 // 定义线程池 39 ExecutorService executorService = Executors.newCachedThreadPool(); 40 // 定义信号量,信号量里面需要定义允许并发的数量 41 final Semaphore semaphore = new Semaphore(threadTotal); 42 // 定义计数器闭锁,希望所有请求完以后统计计数结果,将计数结果放入 43 final CountDownLatch countDownLatch = new CountDownLatch(clientTotal); 44 // 放入请求操作 45 for (int i = 0; i < clientTotal; i++) { 46 // 所有请求放入到线程池结果中 47 executorService.execute(() -> { 48 // 在线程池执行的时候引入了信号量,信号量每次做acquire()操作的时候就是判断当前进程是否允许被执行。 49 // 如果达到了一定并发数的时候,add方法可能会临时被阻塞掉。当acquire()可以返回值的时候,add方法可以被执行。 50 // add方法执行完毕以后,释放当前进程,此时信号量就已经引入完毕了。 51 // 在引入信号量的基础上引入闭锁机制。countDownLatch 52 try { 53 // 执行核心执行方法之前引入信号量,信号量每次允许执行之前需要调用方法acquire()。 54 semaphore.acquire(); 55 // 核心执行方法。 56 add(); 57 // 核心执行方法执行完成以后,需要释放当前进程,释放信号量。 58 semaphore.release(); 59 } catch (InterruptedException e) { 60 e.printStackTrace(); 61 } 62 // try-catch是一次执行系统的操作,执行完毕以后调用一下闭锁。 63 // 每次执行完毕以后countDownLatch里面对应的计算值减一。 64 // 执行countDown()方法计数器减一。 65 countDownLatch.countDown(); 66 }); 67 } 68 // 这个方法可以保证之前的countDownLatch必须减为0,减为0的前提就是所有的进程必须执行完毕。 69 try { 70 // 调用await()方法当前进程进入等待状态。 71 countDownLatch.await(); 72 } catch (InterruptedException e) { 73 e.printStackTrace(); 74 } 75 // 通常,线程池执行完毕以后,线程池不再使用,记得关闭线程池 76 executorService.shutdown(); 77 // 如果我们希望在所有线程执行完毕以后打印当前计数的值。只需要log.info之前执行上一步即可countDownLatch.await();。 78 log.info("count:{}", count); 79 80 } 81 }
1 package com.bie.concurrency.example.lock; 2 3 import java.util.concurrent.CountDownLatch; 4 import java.util.concurrent.ExecutorService; 5 import java.util.concurrent.Executors; 6 import java.util.concurrent.Semaphore; 7 import java.util.concurrent.TimeUnit; 8 import java.util.concurrent.locks.Lock; 9 import java.util.concurrent.locks.ReentrantLock; 10 11 import com.bie.concurrency.annoations.ThreadSafe; 12 13 import lombok.extern.slf4j.Slf4j; 14 15 /** 16 * 17 * 18 * @Title: LockExample1.java 19 * @Package com.bie.concurrency.example.lock 20 * @Description: TODO 21 * @author biehl 22 * @date 2020年1月15日 23 * @version V1.0 24 * 25 */ 26 @Slf4j 27 @ThreadSafe 28 public class LockExample2 { 29 30 public static int clientTotal = 5000;// 5000个请求,请求总数 31 32 public static int threadTotal = 200;// 允许同时并发执行的线程数目 33 34 public static int count = 0; 35 36 // ReentrantLock锁机制 37 // tryLock()方法,仅在调用时锁定未被另一个线程保持的情况下才获取锁定 38 // tryLock(long timeout, TimeUnitunit) 39 // 参数代表超时时间的,代表如果锁定在给定的等待时间内没有被另一个线程保持且当前线程没有被中断,则获取这个锁定。 40 private final static Lock lock = new ReentrantLock(); 41 42 private static void add() { 43 // 使用ReentrantLock进行加锁。 44 lock.lock(); 45 try { 46 count++; 47 } finally { 48 // 使用ReentrantLock进行释放锁。 49 lock.unlock(); 50 } 51 } 52 53 public static void main(String[] args) { 54 // 定义线程池 55 ExecutorService executorService = Executors.newCachedThreadPool(); 56 // 定义信号量,信号量里面需要定义允许并发的数量 57 final Semaphore semaphore = new Semaphore(threadTotal); 58 // 定义计数器闭锁,希望所有请求完以后统计计数结果,将计数结果放入 59 final CountDownLatch countDownLatch = new CountDownLatch(clientTotal); 60 // 放入请求操作 61 for (int i = 0; i < clientTotal; i++) { 62 // 所有请求放入到线程池结果中 63 executorService.execute(() -> { 64 // 在线程池执行的时候引入了信号量,信号量每次做acquire()操作的时候就是判断当前进程是否允许被执行。 65 // 如果达到了一定并发数的时候,add方法可能会临时被阻塞掉。当acquire()可以返回值的时候,add方法可以被执行。 66 // add方法执行完毕以后,释放当前进程,此时信号量就已经引入完毕了。 67 // 在引入信号量的基础上引入闭锁机制。countDownLatch 68 try { 69 // 执行核心执行方法之前引入信号量,信号量每次允许执行之前需要调用方法acquire()。 70 semaphore.acquire(); 71 // 核心执行方法。 72 add(); 73 // 核心执行方法执行完成以后,需要释放当前进程,释放信号量。 74 semaphore.release(); 75 } catch (InterruptedException e) { 76 e.printStackTrace(); 77 } 78 // try-catch是一次执行系统的操作,执行完毕以后调用一下闭锁。 79 // 每次执行完毕以后countDownLatch里面对应的计算值减一。 80 // 执行countDown()方法计数器减一。 81 countDownLatch.countDown(); 82 }); 83 } 84 // 这个方法可以保证之前的countDownLatch必须减为0,减为0的前提就是所有的进程必须执行完毕。 85 try { 86 // 调用await()方法当前进程进入等待状态。 87 countDownLatch.await(); 88 } catch (InterruptedException e) { 89 e.printStackTrace(); 90 } 91 // 通常,线程池执行完毕以后,线程池不再使用,记得关闭线程池 92 executorService.shutdown(); 93 // 如果我们希望在所有线程执行完毕以后打印当前计数的值。只需要log.info之前执行上一步即可countDownLatch.await();。 94 log.info("count:{}", count); 95 96 } 97 }
3.5、ReentrantReadWriteLock,这个是在没有任何读写锁的时候才可以取得写入锁,里面存在两个锁,一个是读锁,一个是写锁。注意点是在没有任何读写锁的时候才可以取得写入锁。可以用于实现悲观读取,即我们执行中进行读取时,j经常可能有另一个执行要写入的需求,为了保持同步,ReentrantReadWriteLock的读取锁定就可以排上用场了,然而呢,如果读取执行情况很多,写入很少的情况下,使用ReentrantReadWriteLock就会造成写入线程造成饥饿,就是写入线程迟迟无法进行到锁定状态,一直处于等待状态。
1 package com.bie.concurrency.example.lock; 2 3 import java.util.Map; 4 import java.util.Set; 5 import java.util.TreeMap; 6 import java.util.concurrent.locks.Lock; 7 import java.util.concurrent.locks.ReentrantReadWriteLock; 8 9 import lombok.extern.slf4j.Slf4j; 10 11 /** 12 * 13 * 14 * @Title: LockExample1.java 15 * @Package com.bie.concurrency.example.lock 16 * @Description: TODO 17 * @author biehl 18 * @date 2020年1月15日 19 * @version V1.0 20 * 21 */ 22 @Slf4j 23 public class LockExample3 { 24 25 // 创建一个Map集合,封装了一个内部Map集合,不希望所有方法的暴漏出来,暴漏想提供的方法 26 private final Map<String, Data> map = new TreeMap<String, Data>(); 27 28 // 创建一个读写锁ReentrantReadWriteLock 29 private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); 30 31 // 创建一个读锁 32 private final Lock readLock = lock.readLock(); 33 34 // 创建一个写锁 35 private final Lock writeLock = lock.writeLock(); 36 37 // 获取到一个值,暴漏的方法考虑到并发问题,读操作和写操作都加了锁。 38 public Data get(String key) { 39 readLock.lock(); 40 try { 41 return map.get(key); 42 } finally { 43 readLock.unlock(); 44 } 45 } 46 47 // 获取到所有的key值 48 public Set<String> getAllKeys() { 49 readLock.lock(); 50 try { 51 return map.keySet(); 52 } finally { 53 readLock.unlock(); 54 } 55 } 56 57 // 设置一个值 58 // 使用ReentrantReadWriteLock在没有任何读写锁操作的时候才可以进行写入操作。对数据同步做到更多一些 59 // 使用的是悲观读取。如果在获得写入锁的时候,坚决不允许任何读锁保持着,保证了写入的时候其实所有能做的操作都已经做完了。 60 // 存在的问题就是读取情况很多的时候,而写入很少的时候,时候该类ReentrantReadWriteLock会使线程遭遇饥饿。 61 // 什么是饥饿呢,就是写锁一直想执行,但是读锁一直在执行,导致写锁一直无法执行。 62 public Data put(String key, Data value) { 63 writeLock.lock(); 64 try { 65 return map.put(key, value); 66 } finally { 67 writeLock.unlock(); 68 } 69 } 70 71 class Data { 72 73 } 74 75 }
3.6、StampedLock,控制锁有三种模式,分别是写、读、乐观读。StampedLock是由版本和模式两个部分组成的,锁获取方法返回的是一个数字作为票据即Stamped,用相应的锁状态来表示并控制相关的访问,数字0表示没有写锁被首先访问,在读锁上分别悲观锁和乐观锁。所谓乐观读其实也就是如果读的操作很多,写的操作很少的情况下,我们可以乐观的认为写入与读取同时发生的机率很少,因此,不悲观的使用完全的读取锁定,程序可以查看查取读取操作以后,是否遭受写入执行的变更,再采取后续的措施,这一个小小的改进可以大幅度提高程序的吞吐量。
加锁总结2条:
第一条,当只有少量竞争者的时候,可以使用synchronized关键字是很好的通用锁实现。
第二条,竞争者不少,但是线程增长趋势是可以预估的,这个时候ReentrantLock是一个很好的通用锁实现。我们在使用锁的时候呢,不是看那个锁高级使用那个的。切记,synchronized关键字不会引发死锁的,JVM会自动解锁的,而其他锁,使用不当可能会引起死锁的。
1 package com.bie.concurrency.example.lock; 2 3 import java.util.concurrent.locks.StampedLock; 4 5 /** 6 * 7 * 8 * @Title: LockExample4.java 9 * @Package com.bie.concurrency.example.lock 10 * @Description: TODO 11 * @author biehl 12 * @date 2020年1月16日 13 * @version V1.0 14 * 15 * StampedLock 16 */ 17 public class LockExample4 { 18 19 class Point { 20 21 private double x, y; 22 private final StampedLock sl = new StampedLock(); 23 24 void move(double deltaX, double deltaY) { // an exclusively locked method 25 long stamp = sl.writeLock(); 26 try { 27 x += deltaX; 28 y += deltaY; 29 } finally { 30 sl.unlockWrite(stamp); 31 } 32 } 33 34 // 下面看看乐观读锁案例 35 double distanceFromOrigin() { // A read-only method 36 long stamp = sl.tryOptimisticRead(); // 获得一个乐观读锁 37 double currentX = x, currentY = y; // 将两个字段读入本地局部变量 38 if (!sl.validate(stamp)) { // 检查发出乐观读锁后同时是否有其他写锁发生? 39 stamp = sl.readLock(); // 如果没有,我们再次获得一个读悲观锁 40 try { 41 currentX = x; // 将两个字段读入本地局部变量 42 currentY = y; // 将两个字段读入本地局部变量 43 } finally { 44 sl.unlockRead(stamp); 45 } 46 } 47 return Math.sqrt(currentX * currentX + currentY * currentY); 48 } 49 50 // 下面是悲观读锁案例 51 void moveIfAtOrigin(double newX, double newY) { // upgrade 52 // Could instead start with optimistic, not read mode 53 long stamp = sl.readLock(); 54 try { 55 while (x == 0.0 && y == 0.0) { // 循环,检查当前状态是否符合 56 long ws = sl.tryConvertToWriteLock(stamp); // 将读锁转为写锁 57 if (ws != 0L) { // 这是确认转为写锁是否成功 58 stamp = ws; // 如果成功 替换票据 59 x = newX; // 进行状态改变 60 y = newY; // 进行状态改变 61 break; 62 } else { // 如果不能成功转换为写锁 63 sl.unlockRead(stamp); // 我们显式释放读锁 64 stamp = sl.writeLock(); // 显式直接进行写锁 然后再通过循环再试 65 } 66 } 67 } finally { 68 sl.unlock(stamp); // 释放读锁或写锁 69 } 70 } 71 } 72 }
1 package com.bie.concurrency.example.lock; 2 3 import java.util.concurrent.CountDownLatch; 4 import java.util.concurrent.ExecutorService; 5 import java.util.concurrent.Executors; 6 import java.util.concurrent.Semaphore; 7 import java.util.concurrent.locks.StampedLock; 8 9 import lombok.extern.slf4j.Slf4j; 10 11 /** 12 * 13 * 14 * @Title: LockExample1.java 15 * @Package com.bie.concurrency.example.lock 16 * @Description: TODO 17 * @author biehl 18 * @date 2020年1月15日 19 * @version V1.0 20 * 21 */ 22 @Slf4j 23 public class LockExample5 { 24 25 public static int clientTotal = 5000;// 5000个请求,请求总数 26 27 public static int threadTotal = 200;// 允许同时并发执行的线程数目 28 29 public static int count = 0; 30 31 private final static StampedLock lock = new StampedLock(); 32 33 private static void add() { 34 long stamp = lock.writeLock(); 35 try { 36 count++; 37 } finally { 38 lock.unlock(stamp); 39 } 40 } 41 42 public static void main(String[] args) { 43 // 定义线程池 44 ExecutorService executorService = Executors.newCachedThreadPool(); 45 // 定义信号量,信号量里面需要定义允许并发的数量 46 final Semaphore semaphore = new Semaphore(threadTotal); 47 // 定义计数器闭锁,希望所有请求完以后统计计数结果,将计数结果放入 48 final CountDownLatch countDownLatch = new CountDownLatch(clientTotal); 49 // 放入请求操作 50 for (int i = 0; i < clientTotal; i++) { 51 // 所有请求放入到线程池结果中 52 executorService.execute(() -> { 53 // 在线程池执行的时候引入了信号量,信号量每次做acquire()操作的时候就是判断当前进程是否允许被执行。 54 // 如果达到了一定并发数的时候,add方法可能会临时被阻塞掉。当acquire()可以返回值的时候,add方法可以被执行。 55 // add方法执行完毕以后,释放当前进程,此时信号量就已经引入完毕了。 56 // 在引入信号量的基础上引入闭锁机制。countDownLatch 57 try { 58 // 执行核心执行方法之前引入信号量,信号量每次允许执行之前需要调用方法acquire()。 59 semaphore.acquire(); 60 // 核心执行方法。 61 add(); 62 // 核心执行方法执行完成以后,需要释放当前进程,释放信号量。 63 semaphore.release(); 64 } catch (InterruptedException e) { 65 e.printStackTrace(); 66 } 67 // try-catch是一次执行系统的操作,执行完毕以后调用一下闭锁。 68 // 每次执行完毕以后countDownLatch里面对应的计算值减一。 69 // 执行countDown()方法计数器减一。 70 countDownLatch.countDown(); 71 }); 72 } 73 // 这个方法可以保证之前的countDownLatch必须减为0,减为0的前提就是所有的进程必须执行完毕。 74 try { 75 // 调用await()方法当前进程进入等待状态。 76 countDownLatch.await(); 77 } catch (InterruptedException e) { 78 e.printStackTrace(); 79 } 80 // 通常,线程池执行完毕以后,线程池不再使用,记得关闭线程池 81 executorService.shutdown(); 82 // 如果我们希望在所有线程执行完毕以后打印当前计数的值。只需要log.info之前执行上一步即可countDownLatch.await();。 83 log.info("count:{}", count); 84 85 } 86 87 }
整个协作过程是AQS的等待队列和Condition的等待队列来实现的。Condition作为一个条件类,很好的维护了等待信号的队列,并在适合的的时候将节点加入到AQS等待队列中,实现唤醒操作。
1 package com.bie.concurrency.example.lock; 2 3 import java.util.concurrent.locks.Condition; 4 import java.util.concurrent.locks.ReentrantLock; 5 6 import lombok.extern.slf4j.Slf4j; 7 8 /** 9 * 10 * 11 * @Title: LockExample6.java 12 * @Package com.bie.concurrency.example.lock 13 * @Description: TODO 14 * @author biehl 15 * @date 2020年1月16日 16 * @version V1.0 17 * 18 * 整个协作过程是AQS的等待队列和Condition的等待队列来实现的。 19 * 20 * Condition作为一个条件类,很好的维护了等待信号的队列,并在适合的的时候将节点加入到AQS等待队列中,实现唤醒操作。 21 * 22 */ 23 @Slf4j 24 public class LockExample6 { 25 26 public static void main(String[] args) { 27 28 // 创建一个ReentrantLock锁。 29 ReentrantLock reentrantLock = new ReentrantLock(); 30 // 使用reentrantLock锁创建Condition 31 // Condition是多线程之间协调通信的工作类,使得某个或者某个线程一起等待某个条件。 32 Condition condition = reentrantLock.newCondition(); 33 34 // 线程1 35 new Thread(() -> { 36 try { 37 // 线程1调用了reentrantLock的lock方法,线程1加入到AQS的等待队列里面去。 38 reentrantLock.lock(); 39 // 输出等待信号 40 log.info("wait signal"); // 1、等待信号 41 // 调用condition的await方法,从AQS的等待队列移除出去。对应的操作是锁的释放。 42 // 紧接着,加入到condition等待队列里面去。等待着该线程需要一个信号 43 condition.await(); 44 } catch (InterruptedException e) { 45 e.printStackTrace(); 46 } 47 // 输出得到信号 48 log.info("get signal"); // 4、得到信号 49 // 调用reentrantLock的unlock方法,释放reentrantLock的锁。释放锁执行完毕。 50 reentrantLock.unlock(); 51 }).start(); 52 53 // 线程2 54 new Thread(() -> { 55 // 线程2调用了reentrantLock的lock方法,线程2加入到AQS的等待队列里面去。 56 // 因为线程1释放锁的关系,线程2被唤醒。并判断是否获取获取到锁。 57 reentrantLock.lock(); 58 // 输出获取到锁 59 log.info("get lock"); // 2、获取锁 60 try { 61 // 等待3秒钟 62 Thread.sleep(3000); 63 } catch (InterruptedException e) { 64 e.printStackTrace(); 65 } 66 // 线程2执行了发送信号这个方法。condition等待队列里面有一个我们线程1的等待节点,于是被取出来了,加入到AQS的等待队列里面。 67 // 此时线程1未被唤醒,只是放到了队列里面了。 68 condition.signalAll(); 69 // 输出发送信号 70 log.info("send signal ~ "); // 3、发送信号 71 // 线程2发送信号执行完毕以后,执行了unlock,释放锁。 72 // 释放锁以后呢,AQS中m目前只剩下线程1了,于是AQS释放锁按照从头到尾的顺序唤醒线程,线程1就被唤醒了。 73 // 于是线程1执行开始执行,执行执行得到了第四步输出。 74 reentrantLock.unlock(); 75 }).start(); 76 77 } 78 79 }
3.7、FutureTask是JUC里面的,但是不是AQS的子类,但是这个类对线程结果的处理很好。创建线程的方法有实现Runnable接口和继承Thread类,这两种方式有一个共同的缺陷是无法获取执行结果,jdk1.5以后提供了Future和Callable接口,通过他们可以在任务执行完毕以后得到任务执行的结果。
1)、Callable与Runnable接口进行对比,Runnable接口代码非常简单,只有一个方法就是run方法,创建一个类,实现该接口,重写run方法,使用一个线程执行该Runnable实现类,就可以实现多线程了。Callable的代码也十分简单,不同的是泛型的接口,里面有一个call方法,call方法的返回类型就是我们创建Callable传进去的V类型。Callable与Runnable的功能大致相似,类比学习。Callable功能更加强大,主要是它被线程执行以后k可以有返回值,并且能够抛出异常。
2)、Future接口,对于我们具体的Callable与Runnable的任务,它可以进行取消,查询任务是否被取消,任务是否已经完成,以及获取结果等等。通常线程都是属于异步计算模型的,所以通常不可能直接从其他线程中得到方法的返回值,此时Future接口就出场了,Future接口可以监视目标线程调用Callable的情况,当你调用Future接口的get方法的时候,就可以获取他的结果,此时线程可能不会直接完成,当前线程就开始阻塞,直到Callable接口里面的call方法结束返回结果,线程才继续执行,总结一句话,Future接口可以得到别的线程任务方法的返回值。
3)、FutureTask类,父类是RunnableFuture接口,父类是RunnableFuture接口是继承了Runnable, Future<V>,FutureTask类最终也是执行Callable类型的任务,如果构造函数参数是Runnable的话,它会转换成Runnable类型的,FutureTask类实现了两个接口Runnable, Future<V>,所以FutureTask类既可以作为Runnable被线程执行,也可以作为Future得到Callable的返回值。这个组合使用的好处是,如果有一个很费时逻辑需要计算并且需要返回这个值,同时这个值又不是马上需要,那么就可以使用这个组合,用另外一个线程计算返回值,而当前线程在使用这个返回值之前可以做其他的操作,等到需要这个返回值的时候才通过Future得到。
1 package com.bie.concurrency.example.aqs; 2 3 import java.util.concurrent.Callable; 4 import java.util.concurrent.ExecutionException; 5 import java.util.concurrent.ExecutorService; 6 import java.util.concurrent.Executors; 7 import java.util.concurrent.Future; 8 9 import lombok.extern.slf4j.Slf4j; 10 11 /** 12 * 13 * 14 * @Title: FutureExample.java 15 * @Package com.bie.concurrency.example.aqs 16 * @Description: TODO 17 * @author biehl 18 * @date 2020年1月17日 19 * @version V1.0 20 * 21 */ 22 @Slf4j 23 public class FutureExample { 24 25 // Callable的代码也十分简单,不同的是泛型的接口,里面有一个call方法,call方法的返回类型就是我们创建Callable传进去的V类型 26 static class MyCallable implements Callable<String> { 27 28 @Override 29 public String call() throws Exception { 30 log.info("do something in callable......"); 31 // 休眠5秒钟 32 Thread.sleep(5000); 33 return "Just Do it!!!"; 34 } 35 36 } 37 38 public static void main(String[] args) throws InterruptedException, ExecutionException { 39 // 创建线程池 40 ExecutorService executorService = Executors.newCachedThreadPool(); 41 // 线程池提交任务,之后通过Future接收返回结果。通过Future接收另一个线程任务计算的结果。 42 Future<String> future = executorService.submit(new MyCallable()); 43 log.info("do something in main!!!"); 44 // 线程休眠1秒钟 45 Thread.sleep(1000); 46 // Future接收Callable的返回结果。查看调用任务返回结果。 47 String result = future.get(); 48 log.info("result : {}", result); 49 // 关闭线程池 50 executorService.shutdown(); 51 } 52 53 }
1 package com.bie.concurrency.example.aqs; 2 3 import java.util.concurrent.Callable; 4 import java.util.concurrent.ExecutionException; 5 import java.util.concurrent.FutureTask; 6 7 import lombok.extern.slf4j.Slf4j; 8 9 /** 10 * 11 * 12 * @Title: FutureTaskExample.java 13 * @Package com.bie.concurrency.example.aqs 14 * @Description: TODO 15 * @author biehl 16 * @date 2020年1月17日 17 * @version V1.0 18 * 19 */ 20 @Slf4j 21 public class FutureTaskExample { 22 23 public static void main(String[] args) throws InterruptedException, ExecutionException { 24 // FutureTask类最终也是执行Callable类型的任务,如果构造函数参数是Runnable的话,它会转换成Runnable类型的,FutureTask类实现了两个接口Runnable, 25 // Future<V>,所以FutureTask类既可以作为Runnable被线程执行,也可以作为Future得到Callable的返回值。 26 FutureTask<String> futureTask = new FutureTask<>(new Callable<String>() { 27 28 @Override 29 public String call() throws Exception { 30 log.info("do something in callable"); 31 Thread.sleep(5000); 32 return "Just Do it!!!"; 33 } 34 35 }); 36 37 // 这个组合使用的好处是,如果有一个很费时逻辑需要计算并且需要返回这个值,同时这个值又不是马上需要, 38 // 那么就可以使用这个组合,用另外一个线程计算返回值,而当前线程在使用这个返回值之前可以做其他的操作, 39 // 等到需要这个返回值的时候才通过Future得到。 40 new Thread(futureTask).start(); 41 log.info("do something in main!!!"); 42 Thread.sleep(1000); 43 String result = futureTask.get(); 44 log.info("result: {} ", result); 45 } 46 47 }
3.8、JUC里面的Fork/Join框架是jdk1.7提供的用于并行执行任务的框架,它是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架,类似与MapReduce的思想。Fork就是将若干个小任务切分成若干个小任务并行执行,Join合并这些子任务的执行结果,最终得到这个大任务的结果。主要采取的是工作窃取算法,工作窃取算法是从某个线程窃取任务来执行。为什么需要工作窃取算法呢,如果做一个比较大的任务,可以将这个任务分割成若干个互补依赖的子任务,为了减少线程之间的竞争,于是将这些子任务分别放到不同的队列里面,为每一个队列创建一个单独的线程来执行队列里面的任务,线程和队列一一对应,比如A线程负责处理A队列里面的任务,但是有的线程会先把自己队列里面的任务做完,而其他线程对应的队列里面还有任务等待要处理,干完活的线程去其他队列里面窃取一个任务进行执行,此时,他们会访问同一个队列,所以,为了减少窃取任务线程和被窃取任务线程之间的竞争,通常我们会使用的是双端队列,被窃取任务的线程,永远从双端队列的头部拿任务执行,而窃取任务的线程永远从双端队列的尾部窃取任务执行,这个窃取算法的优点充分利用线程进行并行计算,并减少了线程之间的竞争,缺点是在某些情况下,还是存在竞争。比如,双端队列里面只有一个任务的时候,同时,这样还消耗了各个系统的资源。比如,创建了多个线程和多个双端队列,对于Fork/Join框架而言,当一个任务正在等待他使用Join操作创建的子任务结束的时候,执行这个任务的工作线程查找其他未被执行的任务并开始他的执行,通过这种方式呢,线程充分利用他们的运行时间来提高应用程序的性能,为了实现这个目标,Fork/Join框架执行的任务有一些局限性,任务只能使用Fork或者Join操作,来作为同步机制,如果使用了其他同步机制,那么他们在同步操作的时候,工作线程就不能执行其他任务了。比如在Fork/Join框架中,你使任务进入了睡眠,那么这个睡眠期间内呢,正在执行这个任务的工作线程将不会执行其他任务,第二个局限性我们所拆分的任务,不应该去执行IO操作,如读或者写数据文件。第三个局限性,任务不能抛出检查异常,必须通过必要的代码来处理他们。Fork/Join框架的核心是两个类ForkJoinPool,ForkJoinTask。ForkJoinPool负责来做实现,管理工作线程,提供关于任务的状态,以及他们的执行信息,ForkJoinTask主要提供在任务中执行Fork和Join操作的机制。
1 package com.bie.concurrency.example.aqs; 2 3 import java.util.concurrent.ForkJoinPool; 4 import java.util.concurrent.Future; 5 import java.util.concurrent.RecursiveTask; 6 7 import lombok.extern.slf4j.Slf4j; 8 9 /** 10 * 11 * 12 * @Title: ForkJoinTaskExample.java 13 * @Package com.bie.concurrency.example.aqs 14 * @Description: TODO 15 * @author biehl 16 * @date 2020年1月18日 17 * @version V1.0 18 * 19 */ 20 @Slf4j 21 public class ForkJoinTaskExample extends RecursiveTask<Integer> { 22 23 /** 24 * 25 */ 26 private static final long serialVersionUID = 1L; 27 28 public static final int threshold = 2; 29 private int start; 30 private int end; 31 32 /** 33 * 构造方法 34 * 35 * @param start 36 * @param end 37 */ 38 public ForkJoinTaskExample(int start, int end) { 39 this.start = start; 40 this.end = end; 41 } 42 43 @Override 44 protected Integer compute() { 45 int sum = 0; 46 47 // 如果任务足够小就计算任务 48 boolean canCompute = (end - start) <= threshold; 49 if (canCompute) { 50 for (int i = start; i <= end; i++) { 51 sum += i; 52 } 53 } else { 54 // 如果任务大于阈值,就分裂成两个子任务计算 55 int middle = (start + end) / 2; 56 ForkJoinTaskExample leftTask = new ForkJoinTaskExample(start, middle); 57 ForkJoinTaskExample rightTask = new ForkJoinTaskExample(middle + 1, end); 58 59 // 拆分,执行子任务 60 leftTask.fork(); 61 rightTask.fork(); 62 63 // 等待任务执行结束合并其结果 64 int leftResult = leftTask.join(); 65 int rightResult = rightTask.join(); 66 67 // 合并子任务 68 sum = leftResult + rightResult; 69 } 70 return sum; 71 } 72 73 public static void main(String[] args) { 74 75 ForkJoinPool forkjoinPool = new ForkJoinPool(); 76 77 // 生成一个计算任务,计算1+2+3+4+ ...... + 100 78 ForkJoinTaskExample task = new ForkJoinTaskExample(1, 100); 79 80 // 执行一个任务,提交这个任务task,执行重写的compute方法 81 Future<Integer> result = forkjoinPool.submit(task); 82 83 try { 84 log.info("result:{}", result.get()); 85 } catch (Exception e) { 86 log.error("exception", e); 87 } 88 } 89 90 }
作者:别先生
博客园:IT虾米网
如果您想及时得到个人撰写文章以及著作的消息推送,可以扫描上方二维码,关注个人公众号哦。