Day9 多线程进阶 5 读写锁 阻塞队列 同步队列 线程池
uwupu 啦啦啦啦啦

读写锁 ReentrantReadWriteLock

ReentrantReadWriteLock维护一对关联的Locks,一个用于只读操作,一个用于写入操作。

读操作可以由多个线程进行,写操作只能由一个线程操作。

当写操作由一个线程执行时,其他线程不能进行读操作,也不能进行写操作。

当读操作被执行时,其他线程不能进行写操作。

读锁和写锁互斥。

API

方法名 说明
writeLock().lock() 锁住写锁。若写锁读锁本来已被锁住,则阻塞,直到写锁读锁被释放。阻塞结束后,锁住写锁,然后继续运行。
writeLock().unlock() 释放写锁。
readLock().lock() 锁住读锁。若写锁被锁住,则该方法阻塞,直到写锁被释放。阻塞结束后,锁住读锁,然后继续运行。
readLock().unlock() 释放读锁。

代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
/**
* ReadWriteLock
*/
public class Demo16_ReadWriteLock {
public static void main(String[] args) {
MyCache myCache = new MyCache();
//写入
for (int i = 1; i <= 5; i++) {
final int temp = i;
new Thread(()->{
myCache.put(String.valueOf(temp),temp+"");
},String.valueOf(i)).start();
}
//读取
for (int i = 1; i <= 5; i++) {
final int temp = i;
new Thread(()->{
myCache.get(String.valueOf(temp));
},String.valueOf(i)).start();
}
}
}
//自定义缓存
class MyCache{
private volatile Map<String,Object> map = new HashMap<>();
private ReadWriteLock lock = new ReentrantReadWriteLock();
//存 写入的时候,只希望有一个线程写
public void put(String key,Object value){
lock.writeLock().lock();
try {
System.out.println(Thread.currentThread().getName()+" 写入开始");
map.put(key,value);
System.out.println(Thread.currentThread().getName()+" 写入结束");
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.writeLock().unlock();
}
}
//取 读,可以有多个线程读
public Object get(String key){
lock.readLock().lock();
Object o = null;
try {
System.out.println(Thread.currentThread().getName()+" 读取开始");
o = map.get(key);
System.out.println(Thread.currentThread().getName()+" 读取结束");
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.readLock().unlock();
}
return o;
}
}

解释

  • 写入时,用lock.writeLock().lock()锁住写锁;写入结束后,用lock.writeLock().unlock()释放写锁。
  • 读取时,用lock.readLock().lock()锁住读锁;读取结束后,用lock.readLock().unlock()释放读锁。

阻塞队列 BlockingQueue

引入

Collection结构 和 队列类型介绍

image

image

Queue队列分三种:

  • AbstractQueue非阻塞队列
  • Deque双端队列
  • BlockingQueue阻塞队列

队列

FIFO 先进先出

  • 写入:若队列满了,就必须等待消费;
  • 取:若队列是空的,就必须等待生产;

分类

image

阻塞队列分两种:LinkedBlockingQueue和ArrayBlockingQueue。

使用场景

多线程并发处理,线程池。

API

分为四组:抛出异常,不抛出异常有返回值,阻塞等待,超时等待。

方式 抛出异常的API 有返回值不抛出异常的API 阻塞等待的API 超时等待的API
添加 add(对象)
无返回值,失败则抛出异常
offer(对象)
有返回值,boolean
put(对象) offer(对象,超时时间, TimeUnit);
例:
offer(“a”,2,TimeUnit.SECOND)
即:添加”a”,超时2秒。
移除 remove()
无返回值,失败则抛出异常
poll()
有返回值,null
take() poll(超时时间,TimeUnit)
例:
offer(2,TimeUnit.SECOND)
即:移除并返回,若队列空,则阻塞,超时等待2秒。
返回/检测 队首元素 element() peek() / /

代码

抛出异常

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
    public static void test1_Exception(){
ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(2);//容量为2的阻塞队列

//添加
System.out.println(queue.add("a"));
System.out.println(queue.add("b"));
// System.out.println(queue.add("c"));//让阻塞队列超出容量
//异常 java.lang.IllegalStateException
/**
* 输出:
* Exception in thread "main" java.lang.IllegalStateException: Queue full // 队列满
* at java.util.AbstractQueue.add(AbstractQueue.java:98)
* at java.util.concurrent.ArrayBlockingQueue.add(ArrayBlockingQueue.java:312)
* at com.yn.Demo17_BlockingQueue.test1_Exception(Demo17_BlockingQueue.java:19)
* at com.yn.Demo17_BlockingQueue.main(Demo17_BlockingQueue.java:9)
* true
* true
*/

System.out.println(queue.remove());
System.out.println(queue.remove());
System.out.println(queue.remove());//若队列为空
//异常:java.util.NoSuchElementException 没有元素异常
/**
* 输出内容:
* a
* b
* Exception in thread "main" java.util.NoSuchElementException
* at java.util.AbstractQueue.remove(AbstractQueue.java:117)
* at com.yn.Demo17_BlockingQueue.test1_Exception(Demo17_BlockingQueue.java:35)
* at com.yn.Demo17_BlockingQueue.main(Demo17_BlockingQueue.java:9)
*
* Process finished with exit code 1
*/
}

有返回值,不抛出异常

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public static void test2_NoException(){
ArrayBlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(2);

//offer()
System.out.println(blockingQueue.offer("a"));
System.out.println(blockingQueue.offer("b"));
System.out.println(blockingQueue.offer("c"));//返回boolean,不抛出异常
/**
* true
* true
* false
*/
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());//若队列为空,则返回null
/**
* a
* b
* null
*/
}

等待 阻塞

1
2
3
4
5
6
7
8
9
10
11
12
13
/**
* 等待 阻塞
*/
public static void test3_blocking() throws InterruptedException {
ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(2);
queue.put("a");//进入队列
queue.put("b");
queue.put("c");//队列已满,阻塞线程

System.out.println(queue.take());//从队列取出
System.out.println(queue.take());
System.out.println(queue.take());//没有元素 阻塞
}

等待阻塞 超时

1
2
3
4
5
6
7
8
9
10
11
12
13
14
/**
* 等待 超时阻塞
*/
public static void test4_blocking() throws InterruptedException {
ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(2);

queue.offer("a");
queue.offer("b");
queue.offer("c",2, TimeUnit.SECONDS);//队列满,阻塞,超时等待2s。

queue.poll();
queue.poll();
queue.poll(2,TimeUnit.SECONDS);//同理
}

同步队列 SynchronousQueue

容量为1。

API

API 说明
put() 添加元素。若队列中有元素,则阻塞,直到队列为空。
take() 取出元素。若队列中没有元素,则阻塞,直到队列不为空。

代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
/**
* 同步队列
* SynchronousQueue
*/
public class Demo18_SynchronousQueue {
public static void main(String[] args) {
BlockingQueue<String> blockingQueue = new SynchronousQueue<>();// 同步队列
new Thread(()->{
try {
System.out.println(Thread.currentThread().getName()+" put 1");
blockingQueue.put("1");
System.out.println(Thread.currentThread().getName()+" put 2");
blockingQueue.put("2");
System.out.println(Thread.currentThread().getName()+" put 3");
blockingQueue.put("3");
} catch (InterruptedException e) {
e.printStackTrace();
}
},"T1").start();
new Thread(()->{
try {
TimeUnit.SECONDS.sleep(3);
System.out.println(Thread.currentThread().getName()+" "+blockingQueue.take());
TimeUnit.SECONDS.sleep(3);
System.out.println(Thread.currentThread().getName()+" "+blockingQueue.take());
TimeUnit.SECONDS.sleep(3);
System.out.println(Thread.currentThread().getName()+" "+blockingQueue.take());
} catch (Exception e) {
e.printStackTrace();
}

},"T2").start();
/**
* T1 put 1
* T2 1
* T1 put 2
* T2 2
* T1 put 3
* T2 3
*/
}
}

线程池

池化技术。线程池、连接池、内存池、对象池……………………..

三大方法、7大参数、4种拒绝策略

好处

  1. 降低资源的消耗
  2. 提高响应的速度
  3. 方便管理

作用

  1. 线程复用
  2. 可以控制最大并发数
  3. 管理线程

三大方法

Executors工具类有三个方法创建线程池。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// Executors 工具类、3大方法
public class Demo19_ThreadPool {
public static void main(String[] args) {
// ExecutorService threadPool = Executors.newSingleThreadExecutor();//创建一个只有一个线程的线程池
// ExecutorService threadPool = Executors.newFixedThreadPool(5);//创建一个有5个线程的线程池
ExecutorService threadPool = Executors.newCachedThreadPool();//创建一个可伸缩的线程池

try {
for (int i = 0; i < 10; i++) {
threadPool.execute(()->{
System.out.println(Thread.currentThread().getName()+" ok");
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
threadPool.shutdown();
}
}
}

Exectors的部分API

API 介绍
newSingleThreadExecutor() 创建一个有一个线程的线程池。
newFixedThreadPool(int) 创建一个指定数量的线程的线程池。
newCachedThreadPool() 创建一个可伸缩的线程池。

七大参数

引入

三个方法的源码分析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1, //核心线程数和最大线程数为1
0L, TimeUnit.MILLISECONDS, //
new LinkedBlockingQueue<Runnable>()));
}

public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,//核心线程数和最大线程数为nThreads
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}

public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,//核心线程数为0,和最大线程数为Intger.MAX_VALUE
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}

分析得知:都来源于ThreadPoolExecutor的构造方法。

ThreadPoolExecutor的构造方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public ThreadPoolExecutor(int corePoolSize, //核心线程池大小
int maximumPoolSize,//最大线程池大小
long keepAliveTime,//超时释放时间
TimeUnit unit,//超时释放时间单位
BlockingQueue<Runnable> workQueue,//阻塞队列
ThreadFactory threadFactory,//线程工厂
RejectedExecutionHandler handler//拒绝策略
) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}

构造方法的7个参数,即:七大参数

ThreadPoolExecutor

构造方法的七个参数

参数名 介绍
int corePoolSize 核心线程数量
int maximumPoolSize 最大线程数量
long keepAliveTime 超时释放时间
TimeUnit unit 超时释放时间单位
BlockingQueue<Runnable> workQueue 阻塞队列,若线程耗尽,则将新进入的任务放进阻塞队列。线程恢复后,依次为队列里的任务分配线程。
ThreadFactory threadFactory 线程工厂
RejectedExecutionHandler handler 拒绝策略

执行策略

  1. 若任务进入,则先分配到核心线程;
  2. 核心线程耗尽则分配到阻塞队列
  3. 阻塞队列已满,则创建额外线程,将新来的任务分配给新创建的线程。(而不是将阻塞队列的任务分配给新创建的线程)
  4. 线程最大数量,且阻塞队列已满,则对新来任务进行拒绝策略
  5. 额外线程空闲时间达到 keepAliveTime x unit,则释放额外线程。

代码

代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public class Demo20_ThreadPoolExecutor {
public static void main(String[] args) {
//自定义线程池
ExecutorService threadPool = new ThreadPoolExecutor(
2,//corePoolSize核心线程数
3,//maximumPoolSize最大线程数
3,//keepAliveTime
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(2), //队列
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy()
);

try {
for (int i = 1; i <= 10; i++) {
final int id = i;
System.out.println(Thread.currentThread().getName()+" "+id+" processing...");
threadPool.execute(()->{
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+" "+id+" ok");

});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
threadPool.shutdown();
}
}
}

输出

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
main 1 processing...
main 2 processing...
main 3 processing...
main 4 processing...
main 5 processing...
main 6 processing...
java.util.concurrent.RejectedExecutionException: Task com.yn.Demo20_ThreadPoolExecutor$$Lambda$1/1096979270@7cca494b rejected from java.util.concurrent.ThreadPoolExecutor@7ba4f24f[Running, pool size = 3, active threads = 3, queued tasks = 2, completed tasks = 0]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
at com.yn.Demo20_ThreadPoolExecutor.main(Demo20_ThreadPoolExecutor.java:22)
pool-1-thread-2 2 ok
pool-1-thread-1 1 ok
pool-1-thread-3 5 ok
pool-1-thread-2 3 ok
pool-1-thread-3 4 ok

说明

任务1和任务2分配给线程1和线程2;

到任务3和任务4的时候,核心线程已有任务,将任务3和任务4分配到阻塞队列;

到任务5,由于核心线程和阻塞队列都耗尽,开始创建额外线程,并将任务5分配给新创建的线程;

到任务6,由于线程数已达最大,进行拒绝策略:抛出异常(报错),中断主线程。

一些计算题

  1. 线程池的最大承载数量 = 最大线程数 + 队列长度

四个拒绝策略

ThreadPoolExecutor类的四个内部类

类名 介绍(若线程池已达最大承载,在执行execute()方法的线程会发生)
ThreadPoolExecutor.AbortPolicy 抛出异常java.util.concurrent.RejectedExecutionException
ThreadPoolExecutor.CallerRunsPolicy 任务直接在主线程执行。“你哪来的?回去!”
ThreadPoolExecutor.DiscardPolicy 不抛出异常,丢弃任务。“马什么梅?”
ThreadPoolExecutor.DiscardOldestPolicy 将等待队列里的最早来的任务丢弃,然后将新来的任务放进等待队列里。

代码

AbortPolicy

在上面

CallerRunsPolicy

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class Demo20_ThreadPoolExecutor {
public static void main(String[] args) {
//自定义线程池
ExecutorService threadPool = new ThreadPoolExecutor(
2,//corePoolSize核心线程数
3,//maximumPoolSize最大线程数
3,//keepAliveTime
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(2), //队列
Executors.defaultThreadFactory(),
// new ThreadPoolExecutor.AbortPolicy()
new ThreadPoolExecutor.CallerRunsPolicy()
);

try {
for (int i = 1; i <= 6; i++) {
final int id = i;
// System.out.println(Thread.currentThread().getName()+" "+id+" processing...");
threadPool.execute(()->{
System.out.println(Thread.currentThread().getName()+" "+id+" ok");
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
threadPool.shutdown();
}
}
}
1
2
3
4
5
6
pool-1-thread-1 1 ok
pool-1-thread-3 5 ok
main 6 ok
pool-1-thread-2 2 ok
pool-1-thread-1 3 ok
pool-1-thread-3 4 ok

任务6直接从main线程执行,未进入线程池。

DiscardPolicy

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class Demo20_ThreadPoolExecutor {
public static void main(String[] args) {
//自定义线程池
ExecutorService threadPool = new ThreadPoolExecutor(
2,//corePoolSize核心线程数
3,//maximumPoolSize最大线程数
3,//keepAliveTime
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(2), //队列
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.DiscardPolicy() // 队列满了,丢掉任务,不会抛出异常
);

try {
for (int i = 1; i <= 10; i++) {
final int id = i;
// System.out.println(Thread.currentThread().getName()+" "+id+" processing...");
threadPool.execute(()->{
System.out.println(Thread.currentThread().getName()+" "+id+" ok");
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
threadPool.shutdown();
}
}
}
1
2
3
4
5
pool-1-thread-1 1 ok
pool-1-thread-3 5 ok
pool-1-thread-1 3 ok
pool-1-thread-2 2 ok
pool-1-thread-3 4 ok

创建了10个任务,实际运行了前5个,另外5个被抛弃。

DiscardOldestPolicy

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class Demo20_ThreadPoolExecutor {
public static void main(String[] args) {
//自定义线程池
ExecutorService threadPool = new ThreadPoolExecutor(
2,//corePoolSize核心线程数
3,//maximumPoolSize最大线程数
3,//keepAliveTime
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(2), //队列
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.DiscardOldestPolicy() // 队列满了,丢弃等待队列里最早的任务,将新来的任务加入到队列中
);

try {
for (int i = 1; i <= 10; i++) {
final int id = i;
// System.out.println(Thread.currentThread().getName()+" "+id+" processing...");
threadPool.execute(()->{
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+" "+id+" ok");
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
threadPool.shutdown();
}
}
}
1
2
3
4
5
pool-1-thread-1 1 ok
pool-1-thread-3 5 ok
pool-1-thread-2 2 ok
pool-1-thread-1 9 ok
pool-1-thread-3 10 ok

任务1,2,5,9,10正常执行。

最大线程数在理论上如何定义

CPU密集型。

CPU密集型:CPU密集型也叫计算密集型,指的是系统的硬盘,内存性能要比CPU好很多,大部分情况是CPU满载,I/O占用率不高。

CPU密集:任务需要大量的运算没有阻塞,CPU一直全速运行。

这类多出现在在开发中的一些业务复杂计算逻辑处理过程中

CPU密集任务只有在真正的多核CPU上才能得到加速,而在单核CPU上,不论开几个模拟的线程,任务都不可能得到加速。

CPU密集时,线程数一般只需要设置为CPU核心数的线程个数就可。计算密集型任务的特点是要进行大量的计算,消耗CPU资源。这种计算密集型任务虽然也可以用多任务完成,但是任务越多,花在任务切换的时间就越多,CPU执行任务的效率就越低,所以,要最高效地利用CPU,计算密集型任务同时进行的数量应当等于CPU的核心数,避免线程或进程的切换。

Java获取CPU核心数:Runtime().getRuntime().availableProcessors()

IO密集型。

IO密集型指的是系统的CPU性能相对硬盘,内存要好很多。此时,系统运作大部分的状况是,CPU在等I/O(硬盘/内存)的读/写操作,CPU负载并不高。

CPU使用率较低,程序中会存在大量的I/O操作占用时间,导致线程的空余时间很多,通常就需要开CPU核心数数倍的线程。

计算公式:IO密集型核心线程数 = CPU核数 / ( 1 - 阻塞系数 )

阻塞系数 = 阻塞时间 / (阻塞时间 + 计算时间) 一般取0.8或0.9

当线程进行I/O操作CPU空闲时,启动其他线程继续使用CPU,以提高CPU的利用率。

在实际业务中的线程数选择

实际业务往往与理论值有所偏差,依据实际情况选择线程数。

“粗调,微调”

粗调

IO密集型两个公式

至2022.9.2为止,目前的我了解到的线程数选择方式有两个公式,两个选一个就好,我认为后者更有道理,

  1. 线程数 = 2 x 核心数 + 1

  2. 核心线程数 = CPU核数 / ( 1 - 阻塞系数 )

    阻塞系数 = 阻塞时间 / (阻塞时间 + 计算时间) 一般取0.8或0.9

CPU密集型

线程数 = 核心数 + 1

微调

公式面向的是普遍情况,但业务总是千奇百态

先粗调获得基本线程数,然后使用数据分析(Java可以用jstack),然后根据实际情况调整线程数。

运行程序,采集各种情况下的压力数据,统计分析得到结果。

若CPU多数线程处于等待状态,则说明线程数够用;

若CPU多数线程处于运行状态,可以适当调高线程数量。

一个新奇的想法: 动态线程数,依据情况自动调整线程数。

参考资料

https://www.bilibili.com/video/BV1B7411L7tE

https://zhuanlan.zhihu.com/p/433310450

https://blog.csdn.net/youanyyou/article/details/78990156

https://www.cnblogs.com/ming-blogs/p/10897242.html

https://blog.csdn.net/GmwEnterprise/article/details/103839295

 评论