读写锁 ReentrantReadWriteLock ReentrantReadWriteLock维护一对关联的Locks,一个用于只读操作,一个用于写入操作。
读操作可以由多个线程进行,写操作只能由一个线程操作。
当写操作由一个线程执行时,其他线程不能进行读操作,也不能进行写操作。
当读操作被执行时,其他线程不能进行写操作。
读锁和写锁互斥。
API
方法名
说明
writeLock().lock()
锁住写锁。若写锁或 读锁本来已被锁住,则阻塞,直到写锁和 读锁被释放。阻塞结束后,锁住写锁,然后继续运行。
writeLock().unlock()
释放写锁。
readLock().lock()
锁住读锁。若写锁被锁住,则该方法阻塞,直到写锁被释放。阻塞结束后,锁住读锁,然后继续运行。
readLock().unlock()
释放读锁。
…
…
代码 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 public class Demo16_ReadWriteLock { public static void main (String[] args) { MyCache myCache = new MyCache (); for (int i = 1 ; i <= 5 ; i++) { final int temp = i; new Thread (()->{ myCache.put(String.valueOf(temp),temp+"" ); },String.valueOf(i)).start(); } for (int i = 1 ; i <= 5 ; i++) { final int temp = i; new Thread (()->{ myCache.get(String.valueOf(temp)); },String.valueOf(i)).start(); } } } class MyCache { private volatile Map<String,Object> map = new HashMap <>(); private ReadWriteLock lock = new ReentrantReadWriteLock (); public void put (String key,Object value) { lock.writeLock().lock(); try { System.out.println(Thread.currentThread().getName()+" 写入开始" ); map.put(key,value); System.out.println(Thread.currentThread().getName()+" 写入结束" ); } catch (Exception e) { e.printStackTrace(); } finally { lock.writeLock().unlock(); } } public Object get (String key) { lock.readLock().lock(); Object o = null ; try { System.out.println(Thread.currentThread().getName()+" 读取开始" ); o = map.get(key); System.out.println(Thread.currentThread().getName()+" 读取结束" ); } catch (Exception e) { e.printStackTrace(); } finally { lock.readLock().unlock(); } return o; } }
解释
写入时,用lock.writeLock().lock()
锁住写锁;写入结束后,用lock.writeLock().unlock()
释放写锁。
读取时,用lock.readLock().lock()
锁住读锁;读取结束后,用lock.readLock().unlock()
释放读锁。
阻塞队列 BlockingQueue 引入 Collection结构 和 队列类型介绍
Queue队列分三种:
AbstractQueue非阻塞队列
Deque双端队列
BlockingQueue阻塞队列
队列 FIFO 先进先出
写入:若队列满了,就必须等待消费;
取:若队列是空的,就必须等待生产;
分类
阻塞队列分两种:LinkedBlockingQueue和ArrayBlockingQueue。
使用场景 多线程并发处理,线程池。
API 分为四组:抛出异常,不抛出异常有返回值,阻塞等待,超时等待。
方式
抛出异常的API
有返回值不抛出异常的API
阻塞等待的API
超时等待的API
添加
add(对象) 无返回值,失败则抛出异常
offer(对象) 有返回值,boolean
put(对象)
offer(对象,超时时间, TimeUnit); 例: offer(“a”,2,TimeUnit.SECOND) 即:添加”a”,超时2秒。
移除
remove() 无返回值,失败则抛出异常
poll() 有返回值,null
take()
poll(超时时间,TimeUnit) 例: offer(2,TimeUnit.SECOND) 即:移除并返回,若队列空,则阻塞,超时等待2秒。
返回/检测 队首元素
element()
peek()
/
/
代码 抛出异常 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 public static void test1_Exception () { ArrayBlockingQueue<String> queue = new ArrayBlockingQueue <>(2 ); System.out.println(queue.add("a" )); System.out.println(queue.add("b" )); System.out.println(queue.remove()); System.out.println(queue.remove()); System.out.println(queue.remove()); }
有返回值,不抛出异常 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 public static void test2_NoException () { ArrayBlockingQueue<String> blockingQueue = new ArrayBlockingQueue <>(2 ); System.out.println(blockingQueue.offer("a" )); System.out.println(blockingQueue.offer("b" )); System.out.println(blockingQueue.offer("c" )); System.out.println(blockingQueue.poll()); System.out.println(blockingQueue.poll()); System.out.println(blockingQueue.poll()); }
等待 阻塞 1 2 3 4 5 6 7 8 9 10 11 12 13 public static void test3_blocking () throws InterruptedException { ArrayBlockingQueue<String> queue = new ArrayBlockingQueue <>(2 ); queue.put("a" ); queue.put("b" ); queue.put("c" ); System.out.println(queue.take()); System.out.println(queue.take()); System.out.println(queue.take()); }
等待阻塞 超时 1 2 3 4 5 6 7 8 9 10 11 12 13 14 public static void test4_blocking () throws InterruptedException { ArrayBlockingQueue<String> queue = new ArrayBlockingQueue <>(2 ); queue.offer("a" ); queue.offer("b" ); queue.offer("c" ,2 , TimeUnit.SECONDS); queue.poll(); queue.poll(); queue.poll(2 ,TimeUnit.SECONDS); }
同步队列 SynchronousQueue 容量为1。
API
API
说明
put()
添加元素。若队列中有元素,则阻塞,直到队列为空。
take()
取出元素。若队列中没有元素,则阻塞,直到队列不为空。
代码 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 public class Demo18_SynchronousQueue { public static void main (String[] args) { BlockingQueue<String> blockingQueue = new SynchronousQueue <>(); new Thread (()->{ try { System.out.println(Thread.currentThread().getName()+" put 1" ); blockingQueue.put("1" ); System.out.println(Thread.currentThread().getName()+" put 2" ); blockingQueue.put("2" ); System.out.println(Thread.currentThread().getName()+" put 3" ); blockingQueue.put("3" ); } catch (InterruptedException e) { e.printStackTrace(); } },"T1" ).start(); new Thread (()->{ try { TimeUnit.SECONDS.sleep(3 ); System.out.println(Thread.currentThread().getName()+" " +blockingQueue.take()); TimeUnit.SECONDS.sleep(3 ); System.out.println(Thread.currentThread().getName()+" " +blockingQueue.take()); TimeUnit.SECONDS.sleep(3 ); System.out.println(Thread.currentThread().getName()+" " +blockingQueue.take()); } catch (Exception e) { e.printStackTrace(); } },"T2" ).start(); } }
线程池 池化技术。线程池、连接池、内存池、对象池……………………..
三大方法、7大参数、4种拒绝策略
好处
降低资源的消耗
提高响应的速度
方便管理
作用
线程复用
可以控制最大并发数
管理线程
三大方法 Executors工具类有三个方法创建线程池。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 public class Demo19_ThreadPool { public static void main (String[] args) { ExecutorService threadPool = Executors.newCachedThreadPool(); try { for (int i = 0 ; i < 10 ; i++) { threadPool.execute(()->{ System.out.println(Thread.currentThread().getName()+" ok" ); }); } } catch (Exception e) { e.printStackTrace(); } finally { threadPool.shutdown(); } } }
Exectors的部分API
API
介绍
newSingleThreadExecutor()
创建一个有一个线程的线程池。
newFixedThreadPool(int)
创建一个指定数量的线程的线程池。
newCachedThreadPool()
创建一个可伸缩的线程池。
…
…
七大参数 引入
三个方法的源码分析
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 public static ExecutorService newSingleThreadExecutor () { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor (1 , 1 , 0L , TimeUnit.MILLISECONDS, new LinkedBlockingQueue <Runnable>())); } public static ExecutorService newFixedThreadPool (int nThreads) { return new ThreadPoolExecutor (nThreads, nThreads, 0L , TimeUnit.MILLISECONDS, new LinkedBlockingQueue <Runnable>()); } public static ExecutorService newCachedThreadPool () { return new ThreadPoolExecutor (0 , Integer.MAX_VALUE, 60L , TimeUnit.SECONDS, new SynchronousQueue <Runnable>()); }
分析得知:都来源于ThreadPoolExecutor 的构造方法。
ThreadPoolExecutor的构造方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 public ThreadPoolExecutor (int corePoolSize, //核心线程池大小 int maximumPoolSize,//最大线程池大小 long keepAliveTime,//超时释放时间 TimeUnit unit,//超时释放时间单位 BlockingQueue<Runnable> workQueue,//阻塞队列 ThreadFactory threadFactory,//线程工厂 RejectedExecutionHandler handler//拒绝策略 ) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0 ) throw new IllegalArgumentException (); if (workQueue == null || threadFactory == null || handler == null ) throw new NullPointerException (); this .acc = System.getSecurityManager() == null ? null : AccessController.getContext(); this .corePoolSize = corePoolSize; this .maximumPoolSize = maximumPoolSize; this .workQueue = workQueue; this .keepAliveTime = unit.toNanos(keepAliveTime); this .threadFactory = threadFactory; this .handler = handler; }
构造方法的7个参数,即:七大参数 。
ThreadPoolExecutor 构造方法的七个参数
参数名
介绍
int corePoolSize
核心线程数量
int maximumPoolSize
最大线程数量
long keepAliveTime
超时释放时间
TimeUnit unit
超时释放时间单位
BlockingQueue<Runnable> workQueue
阻塞队列,若线程耗尽,则将新进入的任务放进阻塞队列。线程恢复后,依次为队列里的任务分配线程。
ThreadFactory threadFactory
线程工厂
RejectedExecutionHandler handler
拒绝策略
执行策略
若任务进入,则先分配到核心线程;
若核心线程耗尽 ,则分配到阻塞队列 ;
若阻塞队列已满 ,则创建额外线程 ,将新来的任务 分配给新创建的线程 。(而不是将阻塞队列的任务分配给新创建的线程)
若线程 达最大数量 ,且阻塞队列已满 ,则对新来任务 进行拒绝策略 。
若额外线程空闲时间 达到 keepAliveTime x unit ,则释放额外线程。
代码
代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 public class Demo20_ThreadPoolExecutor { public static void main (String[] args) { ExecutorService threadPool = new ThreadPoolExecutor ( 2 , 3 , 3 , TimeUnit.SECONDS, new LinkedBlockingQueue <>(2 ), Executors.defaultThreadFactory(), new ThreadPoolExecutor .AbortPolicy() ); try { for (int i = 1 ; i <= 10 ; i++) { final int id = i; System.out.println(Thread.currentThread().getName()+" " +id+" processing..." ); threadPool.execute(()->{ try { TimeUnit.SECONDS.sleep(2 ); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName()+" " +id+" ok" ); }); } } catch (Exception e) { e.printStackTrace(); } finally { threadPool.shutdown(); } } }
输出
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 main 1 processing... main 2 processing... main 3 processing... main 4 processing... main 5 processing... main 6 processing... java.util.concurrent.RejectedExecutionException: Task com.yn.Demo20_ThreadPoolExecutor$$Lambda$1/1096979270@7cca494b rejected from java.util.concurrent.ThreadPoolExecutor@7ba4f24f[Running, pool size = 3, active threads = 3, queued tasks = 2, completed tasks = 0] at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063) at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830) at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379) at com.yn.Demo20_ThreadPoolExecutor.main(Demo20_ThreadPoolExecutor.java:22) pool-1-thread-2 2 ok pool-1-thread-1 1 ok pool-1-thread-3 5 ok pool-1-thread-2 3 ok pool-1-thread-3 4 ok
说明
任务1和任务2分配给线程1和线程2;
到任务3和任务4的时候,核心线程已有任务,将任务3和任务4分配到阻塞队列;
到任务5,由于核心线程和阻塞队列都耗尽,开始创建额外线程,并将任务5分配给新创建的线程;
到任务6,由于线程数已达最大,进行拒绝策略:抛出异常(报错),中断主线程。
一些计算题
线程池的最大承载数量 = 最大线程数 + 队列长度
四个拒绝策略 ThreadPoolExecutor类的四个内部类
类名
介绍(若线程池已达最大承载,在执行execute()方法的线程会发生)
ThreadPoolExecutor.AbortPolicy
抛出异常java.util.concurrent.RejectedExecutionException
ThreadPoolExecutor.CallerRunsPolicy
任务直接在主线程执行。“你哪来的?回去!”
ThreadPoolExecutor.DiscardPolicy
不抛出异常,丢弃任务。“马什么梅?”
ThreadPoolExecutor.DiscardOldestPolicy
将等待队列里的最早来的任务丢弃,然后将新来的任务放进等待队列里。
代码
AbortPolicy
在上面
CallerRunsPolicy
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 public class Demo20_ThreadPoolExecutor { public static void main (String[] args) { ExecutorService threadPool = new ThreadPoolExecutor ( 2 , 3 , 3 , TimeUnit.SECONDS, new LinkedBlockingQueue <>(2 ), Executors.defaultThreadFactory(), new ThreadPoolExecutor .CallerRunsPolicy() ); try { for (int i = 1 ; i <= 6 ; i++) { final int id = i; threadPool.execute(()->{ System.out.println(Thread.currentThread().getName()+" " +id+" ok" ); }); } } catch (Exception e) { e.printStackTrace(); } finally { threadPool.shutdown(); } } }
1 2 3 4 5 6 pool-1-thread-1 1 ok pool-1-thread-3 5 ok main 6 ok pool-1-thread-2 2 ok pool-1-thread-1 3 ok pool-1-thread-3 4 ok
任务6直接从main线程执行,未进入线程池。
DiscardPolicy
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 public class Demo20_ThreadPoolExecutor { public static void main (String[] args) { ExecutorService threadPool = new ThreadPoolExecutor ( 2 , 3 , 3 , TimeUnit.SECONDS, new LinkedBlockingQueue <>(2 ), Executors.defaultThreadFactory(), new ThreadPoolExecutor .DiscardPolicy() ); try { for (int i = 1 ; i <= 10 ; i++) { final int id = i; threadPool.execute(()->{ System.out.println(Thread.currentThread().getName()+" " +id+" ok" ); }); } } catch (Exception e) { e.printStackTrace(); } finally { threadPool.shutdown(); } } }
1 2 3 4 5 pool-1-thread-1 1 ok pool-1-thread-3 5 ok pool-1-thread-1 3 ok pool-1-thread-2 2 ok pool-1-thread-3 4 ok
创建了10个任务,实际运行了前5个,另外5个被抛弃。
DiscardOldestPolicy
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 public class Demo20_ThreadPoolExecutor { public static void main (String[] args) { ExecutorService threadPool = new ThreadPoolExecutor ( 2 , 3 , 3 , TimeUnit.SECONDS, new LinkedBlockingQueue <>(2 ), Executors.defaultThreadFactory(), new ThreadPoolExecutor .DiscardOldestPolicy() ); try { for (int i = 1 ; i <= 10 ; i++) { final int id = i; threadPool.execute(()->{ try { TimeUnit.SECONDS.sleep(1 ); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName()+" " +id+" ok" ); }); } } catch (Exception e) { e.printStackTrace(); } finally { threadPool.shutdown(); } } }
1 2 3 4 5 pool-1-thread-1 1 ok pool-1-thread-3 5 ok pool-1-thread-2 2 ok pool-1-thread-1 9 ok pool-1-thread-3 10 ok
任务1,2,5,9,10正常执行。
最大线程数在理论上如何定义
CPU密集型。
CPU密集型: CPU密集型也叫计算密集型 ,指的是系统的硬盘,内存性能要比CPU好很多,大部分情况是CPU满载,I/O占用率不高。
CPU密集 :任务需要大量的运算 ,没有阻塞 ,CPU一直全速运行。
这类多出现在在开发中的一些业务复杂计算 和逻辑处理过程中 。
CPU密集任务只有在真正的多核CPU上才能得到加速,而在单核CPU上,不论开几个模拟的线程,任务都不可能得到加速。
CPU密集时,线程数一般只需要设置为CPU核心数的线程个数就可。 计算密集型任务的特点是要进行大量的计算,消耗CPU资源。这种计算密集型任务虽然也可以用多任务完成,但是任务越多,花在任务切换的时间就越多,CPU执行任务的效率就越低,所以,要最高效地利用CPU,计算密集型任务同时进行的数量应当等于CPU的核心数,避免线程或进程的切换。
Java获取CPU核心数:Runtime().getRuntime().availableProcessors()
IO密集型。
IO密集型 指的是系统的CPU性能相对硬盘,内存要好很多 。此时,系统运作大部分的状况是,CPU在等I/O(硬盘/内存)的读/写操作,CPU负载并不高。
CPU使用率较低,程序中会存在大量的I/O操作占用时间,导致线程的空余时间很多,通常就需要开CPU核心数数倍的线程。
计算公式:IO密集型核心线程数 = CPU核数 / ( 1 - 阻塞系数 )
阻塞系数 = 阻塞时间 / (阻塞时间 + 计算时间) 一般取0.8或0.9
当线程进行I/O操作CPU空闲时,启动其他线程继续使用CPU,以提高CPU的利用率。
在实际业务中的线程数选择 实际业务往往与理论值有所偏差,依据实际情况选择线程数。
“粗调,微调”
粗调
IO密集型两个公式
至2022.9.2为止,目前的我了解到的线程数选择方式有两个公式,两个选一个就好,我认为后者更有道理,
线程数 = 2 x 核心数 + 1
核心线程数 = CPU核数 / ( 1 - 阻塞系数 )
阻塞系数 = 阻塞时间 / (阻塞时间 + 计算时间) 一般取0.8或0.9
CPU密集型
线程数 = 核心数 + 1
微调
公式面向的是普遍情况,但业务总是千奇百态 。
先粗调获得基本线程数,然后使用数据分析(Java可以用jstack),然后根据实际情况调整线程数。
运行程序,采集各种情况下的压力数据,统计分析得到结果。
若CPU多数线程处于等待状态,则说明线程数够用;
若CPU多数线程处于运行状态,可以适当调高线程数量。
一个新奇的想法: 动态线程数,依据情况自动调整线程数。
参考资料 https://www.bilibili.com/video/BV1B7411L7tE
https://zhuanlan.zhihu.com/p/433310450
https://blog.csdn.net/youanyyou/article/details/78990156
https://www.cnblogs.com/ming-blogs/p/10897242.html
https://blog.csdn.net/GmwEnterprise/article/details/103839295