Day10 多线程进阶 6 四大函数式接口 Stream流式计算 ForkJoin分支合并 CompletableFuture异步回调
四大函数式接口
函数式接口
只有一个方法的接口。
1 2 3 4 5 6 7 8 9 10 public interface Callable <V> { V call () throws Exception; }
Function函数式接口 Function接口可以将一个函数变成一个变量。
Function可以用来写工具类,泛型参数里,第一个为传入类型,第二个为返回类型。
源码
使用
基本用法(不常用的用法):
1 2 3 4 5 6 7 8 9 10 11 12 13 public class Demo21_Function { public static void main (String[] args) { Function<Integer,Integer> function_plus1 = new Function <Integer,Integer>() { @Override public Integer apply (Integer o) { return o+1 ; } }; System.out.println(function_plus1.apply(123 )); } }
常用用法:
1 2 3 4 5 6 7 8 public class Demo21_Function { public static void main (String[] args) { Function<Integer,Integer> function_plus1 = (in)->{return (int )in+1 ;}; System.out.println(function_plus1.apply(123 )); } }
创建一个Function的引用,然后将一个lambda表达式的函数传值给引用;
调用使用方法apply(参数)
,返回值为lambda表达式的返回值。
Predicate断定型接口 不同于Function,Predicate定义的函数只能返回boolean
值,输入参数不限。
代码
1 2 3 4 5 6 7 8 9 public class Demo22_Predicate { public static void main (String[] args) { Predicate<String> predicate_isEmpty = (string)->{return string.isEmpty();}; System.out.println(predicate_isEmpty.test("asdqwe" )); } }
1 2 3 false Process finished with exit code 0
Consumer消费型接口 功能上与函数式接口基本一致。
特点:只有输入,没有返回值。
代码
1 2 3 4 5 6 7 8 public class Demo23_Consumer { public static void main (String[] args) { Consumer<String> consumer = (str)->{System.out.println(str);}; consumer.accept("张三李四" ); } }
使用方法accept(参数)
使用该接口的实现对象。
Supplier供给型接口 “功能上与函数式接口基本一致。”
特点:没有输入,只有返回值。
源码
1 2 3 4 5 6 7 8 9 10 @FunctionalInterface public interface Supplier <T> { T get () ; }
代码/使用
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 public class Demo24_Supplies { public static void main (String[] args) { Supplier<Integer> supplier = new Supplier <Integer>() { @Override public Integer get () { System.out.println("get!" ); return 1024 ; } }; System.out.println(supplier.get()); } }
使用get()
方法执行函数。
代码 简化
1 2 3 4 5 6 7 public class Demo24_Supplies { public static void main (String[] args) { Supplier<Integer> supplier = ()->{return 1024 ;}; System.out.println(supplier.get()); } }
Stream流式计算 java.util.stream
大数据时代,无非就是:存储 + 计算
比如:MySQL、Java集合等本质是用来存储东西的。
Stream流式计算 就是用来计算 。
一个题目
用一行代码实现:
现在有5个用户,要求筛选出:
ID必须是偶数
年龄必须大于23岁
用户名转为大写字母
用户名字母倒着排序
只输出一个用户
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 public class Demo26_Stream 流式计算引入_一道题 { public static void main (String[] args) { User u1 = new User (1 ,"a" ,21 ); User u2 = new User (2 ,"b" ,22 ); User u3 = new User (3 ,"c" ,23 ); User u4 = new User (4 ,"d" ,24 ); User u5 = new User (6 ,"e" ,25 ); List<User> userList = Arrays.asList(u1,u2,u3,u4,u5); userList.stream() .filter(u->{return u.getId()%2 ==0 ;}) .filter(u->{return u.getAge()>23 ;}) .map(u->{return u.getName().toUpperCase();}) .sorted((uu1,uu2)->{return uu2.compareTo(uu1);}) .limit(1 ) .forEach(System.out::println); } } class User { int id; String name; int age; public User (int id, String name, int age) { this .id = id; this .name = name; this .age = age; } public int getId () { return id; } public void setId (int id) { this .id = id; } public String getName () { return name; } public void setName (String name) { this .name = name; } public int getAge () { return age; } public void setAge (int age) { this .age = age; } @Override public String toString () { return "User{" + "id=" + id + ", name='" + name + '\'' + ", age=" + age + '}' ; } }
好像没有说明Stream流式计算是什么?!
dig hole…
ForkJoin 分支合并 起源于JDK1.7
介绍
是一个分而治之 的任务框架。
使用场景:一个任务需要多线程执行,分割成很多块计算的时候,使用ForkJoin方法。
ForkJoin的作用:对于大数据量计算,用于提高效率。
特点
动态规范 :和分而治之不同的是,任务分割的每个小任务之间互相联系。
工作密取(工作窃取): 分割了每个任务之后,若某个线程提前完成了任务,就会去其他线程偷取任务来完成,加快执行效率。同时,第一个分配的线程从队列的头部拿取任务,当完成任务的线程后去其他队列拿任务时候是从尾部拿任务。
使用
Hole…
异步回调 类似ajax,将Java的线程创建过程包装,使创建线程更加方便。缺点是远离了底层,减少了手动优化的机会。
CompletableFuture类 API
API
说明
static runAsync(Runnable runnable)
创建一个无返回值的异步回调。
static supplyAsync(Supplier supplier)
创建一个有返回值的异步回调
whenComplete(BiConsumer<? super T, ? super Throwable> action)
为CompletableFuture对象的异步回调添加运行结束 时要执行的操作。T为运行成功返回的结果,运行失败为null。Throwable为运行失败的错误信息,成功为null(运行失败指抛出异常或其他错误)。action为BiConsumer对象,用于使用参数和添加回调方法。返回值为CompletableFuture对象,可以使用链式编程。
exceptionally(Function<Throwable, ? extends T> fn)
为CompletableFuture对象的异步回调添加出现异常 时要执行的操作。T为抛出的异常。fn为要执行的操作。
get()
使用runAsync()或supplyAsync()方法后,会返回一个CompletableFuture对象,可以使用get()方法获取异步回调的返回值。若线程未运行结束,则会阻塞当前线程等待运行结束。runAsync()的返回值为Void(无返回值),supplyAsync()返回值由输入指定。
…
…
代码
runAsync()和supplyAsyn()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 public class Demo28_Future { public static void main (String[] args) throws ExecutionException, InterruptedException { CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(()->{ try { TimeUnit.SECONDS.sleep(2 ); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName()+" ok" ); }); System.out.println("等待runAsync异步请求的结果...." ); completableFuture.get(); CompletableFuture<String> completableFuture2 = CompletableFuture.supplyAsync(()->{ System.out.println(Thread.currentThread().getName()+" supplyAsync异步回调" ); return "今天吃Obsidian" ; }); System.out.println("等待supplyAsync异步请求的结果....." ); System.out.println("supplyAsync异步请求的结果:" +completableFuture2.get()); } }
supplyAsync()下的whenComplete()和exceptionally()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 public class Demo28_Future2 { public static void main (String[] args) throws ExecutionException, InterruptedException { CompletableFuture<String> completableFuture2 = CompletableFuture.supplyAsync(()->{ System.out.println(Thread.currentThread().getName()+" supplyAsync异步回调" ); int t = 10 /0 ; return "今天吃Obsidian" ; }); completableFuture2.whenComplete((t,u)->{ System.out.println( "事件完成:" + t+ " " +u ); }).exceptionally((e)->{ System.out.println(e); return "今天不吃Obsidian." ; }); System.out.println("等待supplyAsync异步请求的结果....." ); System.out.println("supplyAsync异步请求的结果:" +completableFuture2.get()); } }
其他 forEach()
使用代码:
1 2 3 4 List<Integer> list = Arrays.asList(1 ,2 ,3 ,4 ,5 ,6 ); list.forEach((d)->{ System.out.print(d); });
输出:
lambda代码简化 原本:
1 2 3 4 5 6 Consumer<String> consumer = new Consumer <String>() { @Override public void accept (String s) { System.out.println(s); } };
简化1:
1 Consumer<String> consumer = (str)->{System.out.println(str);};
简化2:
1 Consumer<String> consumer = System.out::println;
当lambda代码块中只有一个方法 ,且方法的参数和lambda表达式的参数一致 ,才可以使用。
枚举enum 1 2 3 4 5 6 7 8 9 public class Demo25_ 枚举 { public static void main (String[] args) { Animal animal = Animal.cat; System.out.println(animal); } } enum Animal { pig,cat,dog }
三种方法的求和计算
代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 public class Demo27_ 求和计算三种方法 { public static void main (String[] args) throws Exception { System.out.println("sum=" +ProgramTime.get(()->{ long sum = 0L ; for (long i = 1 ; i <= 9_0000_0000 ; i++) { sum += i; } return sum; })); System.out.println("===================" ); TimeUnit.SECONDS.sleep(1 ); System.out.println("sum=" +ProgramTime.get(()->{ ForkJoinPool forkJoinPool = new ForkJoinPool (); ForkJoinTask<Long> task = new ForkJoinDemo (0l ,9_0000_0000l ); ForkJoinTask<Long> sumbit = forkJoinPool.submit(task); return sumbit.get(); })); System.out.println("===================" ); TimeUnit.SECONDS.sleep(1 ); System.out.println("sum=" +ProgramTime.get(()->{ return LongStream.rangeClosed(0l ,9_0000_0000l ) .parallel() .reduce(0 ,Long::sum); })); System.out.println("===================" ); } } class ProgramTime <T>{ public static long get (Callable<Long> fun) throws Exception { long rs; long start = System.currentTimeMillis(); rs = fun.call(); long end = System.currentTimeMillis(); System.out.println("执行了时间:" +(end-start)); return rs; } } class ForkJoinDemo extends RecursiveTask <Long> { private Long start; private Long end; private Long temp = 10000L ; public ForkJoinDemo (Long start, Long end) { this .start = start; this .end = end; } @Override protected Long compute () { if (end - start < temp) { long sum = 0L ; for (long i = start; i <= end; i++) { sum += i; } return sum; } else { long mid = (start + end) / 2 ; ForkJoinDemo task1 = new ForkJoinDemo (start, mid); task1.fork(); ForkJoinDemo task2 = new ForkJoinDemo ((mid + 1 ), end); task2.fork(); return task1.join() + task2.join(); } } }
输出
1 2 3 4 5 6 7 8 9 10 11 12 执行了时间:434 sum=405000000450000000 =================== 执行了时间:253 sum=405000000450000000 =================== 执行了时间:128 sum=405000000450000000 =================== Process finished with exit code 0