Day9 多线程进阶 4 Callable 和 常用的并发用三个辅助类
uwupu 啦啦啦啦啦

Callable

Callable创建线程的优势

  1. 可以有返回值
  2. 可以抛出异常

FutureTask简述

image

Runnable、FutureTask、Thread和Callable的关系

1
2
new Thread(new FutureTask<V>(new Callable<V>() {...}))
new Thread(new Runnable(){...})

使用Callable

  1. 实现Callable接口创建类MyThread,并创建类的对象thread;
  2. 使用对象thread创建FutureTask适配类对象futureTask
  3. 使用futureTask对象创建Thread对象并start(),即可使用Callable创建线程;
  4. 创建线程后,使用futureTask.get()并cast可以得到返回值。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class Demo12_Callable {
public static void main(String[] args) throws ExecutionException, InterruptedException {
MyThread thread = new MyThread();
FutureTask futureTask = new FutureTask(thread);//适配类
new Thread(futureTask,"A").start();//运行线程
String rs = (String) futureTask.get();//获取Callback的返回结果
System.out.println(rs);//输出返回值
}
}
class MyThread implements Callable<String>{
@Override
public String call() throws Exception {
System.out.println("call()");
return "这是一个返回值";
}
}

其他需要注意的地方

  1. futureTask.get()方法会阻塞线程,因为要等待Callback的返回结果

    解决方法:异步通信

  2. new Thread(futureTask,"A").start();的运行结果会被缓存,提高效率。

    若同时执行两个这个语句,第二个不会输出。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    public class Demo12_Callable {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
    new Thread().start();
    MyThread thread = new MyThread();
    FutureTask futureTask = new FutureTask(thread);//适配类
    new Thread(futureTask,"A").start();//运行线程
    new Thread(futureTask,"A").start();//
    String rs = (String) futureTask.get();//获取Callback的返回结果
    System.out.println(rs);//输出返回值
    }
    }
    class MyThread implements Callable<String>{
    @Override
    public String call() throws Exception {
    System.out.println("call()");
    return "这是一个返回值";
    }
    }

常用的辅助类 CountDownLatch CyclicBarrier Semaphore

CountDownLatch

一个减法计数器

常用方法

方法名 介绍
CountDownLatch(int) 构造方法,参数为计数器的起点。
countDown() 计数器值-1。
await() 阻塞线程,直到计数器归零。

代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class Demo13_CountDownLatch {
public static void main(String[] args) throws InterruptedException {
// 这里设总数为6
CountDownLatch countDownLatch = new CountDownLatch(6);

for (int i = 0; i < 6; i++) {
new Thread(()->{
System.out.println(Thread.currentThread().getName()+" come on.");
countDownLatch.countDown();
},String.valueOf(i)).start();
}
countDownLatch.await();
System.out.println("程序结束");
}
}

countDownLatch初始值为6。创建6个线程,每个线程都会执行一个countDownLatch.countDown();。然后countDownLatch.await();会阻塞线程,直到countDownLatch归零为止。

CyclicBarrier

CountDownLatch相反,CyclicBarrier是一个加法计数器。

用处

用在协调多个线程同步的场合,所有线程等待完成,然后一起继续下一步。

使用介绍

使用new CyclicBarrier(数量,方法)来创建一个CyclicBarrier对象。

构造方法有两个参数,

  1. 第一个参数为一个整数,表示计时器的总数。
  2. 第二个参数为Callback接口,使用lambda表达式填充。表示计时器到达指定数量后要进行的操作。

await()方法用在需要协调的线程中,执行该方法:

  1. 计数器+1,然后阻塞当前线程。当计数器达到指定值,计数器会唤醒线程,线程继续运行。
  2. 为需要协调的多个线程都添加该方法,可以让多个线程协调工作。

代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class Demo14_CyclicBarrier {
public static void main(String[] args) {
/**
* Java的寻宝者:在Java中收集到10个华丽宝箱。
*/
CyclicBarrier cyclicBarrier = new CyclicBarrier(10,()->{
System.out.println("获得成就:Java的寻宝者");
});

for (int i = 0; i < 10; i++) {
final int temp = i;
new Thread(()->{
System.out.println(Thread.currentThread().getName()+"打开了第"+temp+"个华丽宝箱。");
try {
cyclicBarrier.await();
} catch (Exception e) {
e.printStackTrace();
}
},String.valueOf(i)).start();
}
}
}

Java的寻宝者:在Java中打开10个华丽宝箱

创建10个打开宝箱的线程。线程打开宝箱后,进入await状态,计数器+1,直到计数器计数结束,即所有线程都打开宝箱,然后将进入await的线程全部唤醒,继续运行。

获得成就:Java的寻宝者

image

Semaphore

信号量

一般用来表示资源的可用量。

比如,数据库的最大并发连接数为3。

方法

方法名 介绍
Semaphore(permits) 构造方法。参数为信号量的计数器容量。
acquire() 得到。若Semaphore没有剩余的信号量,则阻塞,等待信号量释放。若有,则信号量减一,继续运行。
release() 释放。释放一个Semaphore的信号量,唤醒等待的线程。

代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class Demo15_Semaphore {
public static void main(String[] args) {
//例如:一个数据库的最高并发连接数为3。
Semaphore semaphore = new Semaphore(3);//最高并发连接数
for (int i = 1; i <= 10; i++) {
new Thread(()->{
try {
semaphore.acquire();//获得
System.out.println(Thread.currentThread().getName()+" 得到一个连接权,连接数据库");
TimeUnit.SECONDS.sleep(1);//模拟数据库操作
System.out.println(Thread.currentThread().getName()+" 操作结束,断开数据库。");
} catch (Exception e) {
e.printStackTrace();
} finally {
semaphore.release();//释放
}
},String.valueOf(i)).start();
}
}
}

使用场景

限流,数据库限制并发数。

多个共享资源互斥的使用!并发限流,控制最大的线程数。

 评论