Day10 多线程进阶 6 四大函数式接口 Stream流式计算 ForkJoin分支合并 CompletableFuture异步回调
uwupu 啦啦啦啦啦

四大函数式接口

函数式接口

只有一个方法的接口。

1
2
3
4
5
6
7
8
9
10
//例 Callable接口
public interface Callable<V> {
/**
* Computes a result, or throws an exception if unable to do so.
*
* @return computed result
* @throws Exception if unable to compute a result
*/
V call() throws Exception;
}

Function函数式接口

Function接口可以将一个函数变成一个变量。

Function可以用来写工具类,泛型参数里,第一个为传入类型,第二个为返回类型。

源码

image

使用

基本用法(不常用的用法):

1
2
3
4
5
6
7
8
9
10
11
12
13
public class Demo21_Function {
public static void main(String[] args) {
//Function可以用来写工具类,泛型参数里,第一个为传入类型,第二个为返回类型。
//这里写一个工具类用于演示:返回输入整数+1。
Function<Integer,Integer> function_plus1 = new Function<Integer,Integer>() {
@Override
public Integer apply(Integer o) {
return o+1;
}
};
System.out.println(function_plus1.apply(123));
}
}

常用用法:

1
2
3
4
5
6
7
8
public class Demo21_Function {
public static void main(String[] args) {
//Function可以用来写工具类,泛型参数里,第一个为传入类型,第二个为返回类型。
//这里写一个工具类用于演示:返回输入整数+1。
Function<Integer,Integer> function_plus1 = (in)->{return (int)in+1;};
System.out.println(function_plus1.apply(123));
}
}

创建一个Function的引用,然后将一个lambda表达式的函数传值给引用;

调用使用方法apply(参数),返回值为lambda表达式的返回值。

Predicate断定型接口

不同于Function,Predicate定义的函数只能返回boolean值,输入参数不限。

代码

1
2
3
4
5
6
7
8
9
public class Demo22_Predicate {
public static void main(String[] args) {
//Predicate也是一个函数式接口,不同的是:Predicate创建的函数只能返回boolean类型。
//泛型参数为输入参数类型。
//当前代码:判断字符串是否为空。
Predicate<String> predicate_isEmpty = (string)->{return string.isEmpty();};
System.out.println(predicate_isEmpty.test("asdqwe"));
}
}
1
2
3
false

Process finished with exit code 0

Consumer消费型接口

功能上与函数式接口基本一致。

特点:只有输入,没有返回值。

代码

1
2
3
4
5
6
7
8
public class Demo23_Consumer {
public static void main(String[] args) {
//Consumer消费型接口,只有输入没有返回值。
//这里写一个:输出字符串。
Consumer<String> consumer = (str)->{System.out.println(str);};
consumer.accept("张三李四");
}
}

使用方法accept(参数)使用该接口的实现对象。

Supplier供给型接口

“功能上与函数式接口基本一致。”

特点:没有输入,只有返回值。

源码

1
2
3
4
5
6
7
8
9
10
@FunctionalInterface
public interface Supplier<T> {

/**
* Gets a result.
*
* @return a result
*/
T get();//没有参数,只有返回值
}

代码/使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
* Supplier供给型接口:没有参数,只有返回值。
*/
public class Demo24_Supplies {
public static void main(String[] args) {
//这里写一个:只会返回1024的函数。
Supplier<Integer> supplier = new Supplier<Integer>() {
@Override
public Integer get() {
System.out.println("get!");
return 1024;
}
};
System.out.println(supplier.get());

}
}

使用get()方法执行函数。

代码 简化

1
2
3
4
5
6
7
public class Demo24_Supplies {
public static void main(String[] args) {
//这里写一个:只会返回1024的函数。
Supplier<Integer> supplier = ()->{return 1024;};
System.out.println(supplier.get());
}
}

Stream流式计算

java.util.stream

大数据时代,无非就是:存储 + 计算

比如:MySQL、Java集合等本质是用来存储东西的。

Stream流式计算就是用来计算

一个题目

用一行代码实现:

现在有5个用户,要求筛选出:

  1. ID必须是偶数
  2. 年龄必须大于23岁
  3. 用户名转为大写字母
  4. 用户名字母倒着排序
  5. 只输出一个用户
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
public class Demo26_Stream流式计算引入_一道题 {
/**
* 用一行代码实现:
* 现在有5个用户,要求筛选出:
* 1. ID必须是偶数
* 2. 年龄必须大于23岁
* 3. 用户名转为大写字母
* 4. 用户名字母倒着排序
* 5. 只输出一个用户
*/
public static void main(String[] args) {
User u1 = new User(1,"a",21);
User u2 = new User(2,"b",22);
User u3 = new User(3,"c",23);
User u4 = new User(4,"d",24);
User u5 = new User(6,"e",25);

//集合用于存储
List<User> userList = Arrays.asList(u1,u2,u3,u4,u5);

//流用于计算
userList.stream()
.filter(u->{return u.getId()%2==0;})//ID为偶数
.filter(u->{return u.getAge()>23;})//年龄大于23
.map(u->{return u.getName().toUpperCase();})//名字大写
.sorted((uu1,uu2)->{return uu2.compareTo(uu1);})//逆序
.limit(1)//输出1个
.forEach(System.out::println);
//Lambda表达式、链式编程、函数式接口、Stream流式计算
}

}
class User{
int id;
String name;
int age;

public User(int id, String name, int age) {
this.id = id;
this.name = name;
this.age = age;
}

public int getId() {
return id;
}

public void setId(int id) {
this.id = id;
}

public String getName() {
return name;
}

public void setName(String name) {
this.name = name;
}

public int getAge() {
return age;
}

public void setAge(int age) {
this.age = age;
}

@Override
public String toString() {
return "User{" +
"id=" + id +
", name='" + name + '\'' +
", age=" + age +
'}';
}
}

好像没有说明Stream流式计算是什么?!

dig hole…

ForkJoin 分支合并

起源于JDK1.7

介绍

是一个分而治之的任务框架。

使用场景:一个任务需要多线程执行,分割成很多块计算的时候,使用ForkJoin方法。

ForkJoin的作用:对于大数据量计算,用于提高效率。

特点

动态规范:和分而治之不同的是,任务分割的每个小任务之间互相联系。

工作密取(工作窃取):分割了每个任务之后,若某个线程提前完成了任务,就会去其他线程偷取任务来完成,加快执行效率。同时,第一个分配的线程从队列的头部拿取任务,当完成任务的线程后去其他队列拿任务时候是从尾部拿任务。

使用

Hole…

异步回调

类似ajax,将Java的线程创建过程包装,使创建线程更加方便。缺点是远离了底层,减少了手动优化的机会。

CompletableFuture类

API

API 说明
static runAsync(Runnable runnable) 创建一个无返回值的异步回调。
static supplyAsync(Supplier supplier) 创建一个有返回值的异步回调
whenComplete(BiConsumer<? super T, ? super Throwable> action) 为CompletableFuture对象的异步回调添加运行结束时要执行的操作。T为运行成功返回的结果,运行失败为null。Throwable为运行失败的错误信息,成功为null(运行失败指抛出异常或其他错误)。action为BiConsumer对象,用于使用参数和添加回调方法。返回值为CompletableFuture对象,可以使用链式编程。
exceptionally(Function<Throwable, ? extends T> fn) 为CompletableFuture对象的异步回调添加出现异常时要执行的操作。T为抛出的异常。fn为要执行的操作。
get() 使用runAsync()或supplyAsync()方法后,会返回一个CompletableFuture对象,可以使用get()方法获取异步回调的返回值。若线程未运行结束,则会阻塞当前线程等待运行结束。runAsync()的返回值为Void(无返回值),supplyAsync()返回值由输入指定。

代码

runAsync()和supplyAsyn()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
/**
* 异步调用:CompletableFuture
*/
public class Demo28_Future {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 发起一个请求
// runAsync() 没有返回值的异步回调
CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(()->{
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+" ok");
});
System.out.println("等待runAsync异步请求的结果....");
completableFuture.get();//获取
//supplyAsync 有返回值的异步回调
CompletableFuture<String> completableFuture2 = CompletableFuture.supplyAsync(()->{
System.out.println(Thread.currentThread().getName()+" supplyAsync异步回调");
return "今天吃Obsidian";
});
System.out.println("等待supplyAsync异步请求的结果.....");
System.out.println("supplyAsync异步请求的结果:"+completableFuture2.get());
}
}

supplyAsync()下的whenComplete()和exceptionally()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/**
* 异步调用:CompletableFuture
*/
public class Demo28_Future2 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
//supplyAsync 有返回值的异步回调,包括成功、失败的回调。
CompletableFuture<String> completableFuture2 = CompletableFuture.supplyAsync(()->{
System.out.println(Thread.currentThread().getName()+" supplyAsync异步回调");
int t = 10/0;
return "今天吃Obsidian";
});
completableFuture2.whenComplete((t,u)->{
System.out.println(
"事件完成:" +
t+ //成功时,为正确的返回值,失败时为null
" "+u //失败时,为错误信息,成功时为null
);
}).exceptionally((e)->{
System.out.println(e);//异常时,打印异常信息
return "今天不吃Obsidian.";
});
System.out.println("等待supplyAsync异步请求的结果.....");
System.out.println("supplyAsync异步请求的结果:"+completableFuture2.get());
}
}

其他

forEach()使用

代码:

1
2
3
4
List<Integer> list = Arrays.asList(1,2,3,4,5,6);
list.forEach((d)->{
System.out.print(d);
});

输出:

1
123456

lambda代码简化

原本:

1
2
3
4
5
6
Consumer<String> consumer = new Consumer<String>() {
@Override
public void accept(String s) {
System.out.println(s);
}
};

简化1:

1
Consumer<String> consumer = (str)->{System.out.println(str);};

简化2:

1
Consumer<String> consumer = System.out::println;

当lambda代码块中只有一个方法,且方法的参数和lambda表达式的参数一致,才可以使用。

枚举enum

1
2
3
4
5
6
7
8
9
public class Demo25_枚举 {
public static void main(String[] args) {
Animal animal = Animal.cat;
System.out.println(animal);
}
}
enum Animal{
pig,cat,dog
}

三种方法的求和计算

代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
/**
* 任务:求和计算
* 方法:
* 1. 1000000000.for
* 2. ForkJoin
* 3. Stream并行流
*/
public class Demo27_求和计算三种方法 {
public static void main(String[] args) throws Exception {
//基本方法
System.out.println("sum="+ProgramTime.get(()->{
long sum = 0L;
for (long i = 1; i <= 9_0000_0000; i++) {
sum += i;
}
return sum;
}));
System.out.println("===================");
TimeUnit.SECONDS.sleep(1);
//ForkJoin方法
System.out.println("sum="+ProgramTime.get(()->{
ForkJoinPool forkJoinPool = new ForkJoinPool();
ForkJoinTask<Long> task = new ForkJoinDemo(0l,9_0000_0000l);
ForkJoinTask<Long> sumbit = forkJoinPool.submit(task);
return sumbit.get();
}));
System.out.println("===================");
TimeUnit.SECONDS.sleep(1);
//Stream并行流方法
System.out.println("sum="+ProgramTime.get(()->{
return LongStream.rangeClosed(0l,9_0000_0000l)
.parallel()
.reduce(0,Long::sum);
}));
System.out.println("===================");
}
}

class ProgramTime<T>{
public static long get(Callable<Long> fun) throws Exception {
long rs;
long start = System.currentTimeMillis();
rs = fun.call();
long end = System.currentTimeMillis();
System.out.println("执行了时间:"+(end-start));
return rs;
}
}
class ForkJoinDemo extends RecursiveTask<Long> {
private Long start;
private Long end;

private Long temp = 10000L;//临界值

public ForkJoinDemo(Long start, Long end) {
this.start = start;
this.end = end;
}

@Override
protected Long compute() {
if (end - start < temp) {//若计算范围小于10000,就用普通方式计算
long sum = 0L;
for (long i = start; i <= end; i++) {
sum += i;
}
// System.out.println(sum);
return sum;
} else {
//ForkJoin 分支合并计算
long mid = (start + end) / 2;//中间值
ForkJoinDemo task1 = new ForkJoinDemo(start, mid);
task1.fork();//拆分任务,把任务放入线程队列
// System.out.println("qwe");
ForkJoinDemo task2 = new ForkJoinDemo((mid + 1), end);
task2.fork();//后半部分
// System.out.println("asd");
return task1.join() + task2.join();

}
}
}

输出

1
2
3
4
5
6
7
8
9
10
11
12
执行了时间:434
sum=405000000450000000
===================
执行了时间:253
sum=405000000450000000
===================
执行了时间:128
sum=405000000450000000
===================

Process finished with exit code 0

 评论