Skip to content

Commit 9e6b408

Browse files
异步,多任务。汇总返回值
1 parent 99522ae commit 9e6b408

File tree

2 files changed

+170
-0
lines changed

2 files changed

+170
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
package com.java8.completefuture;
2+
3+
import org.apache.log4j.LogManager;
4+
import org.apache.log4j.Logger;
5+
6+
import java.util.ArrayList;
7+
import java.util.Collections;
8+
import java.util.List;
9+
import java.util.concurrent.*;
10+
11+
public class OfAllGetDemo {
12+
13+
private static Logger logger= LogManager.getLogger(OfAllGetDemo .class);
14+
15+
public static void main(String[] args) {
16+
allOfGet();
17+
}
18+
19+
20+
/**
21+
* 异步,多任务。汇总返回值
22+
*/
23+
public static void allOfGet() {
24+
//该线程池仅用于示例,实际建议使用自定义的线程池
25+
ExecutorService executorService = Executors.newCachedThreadPool();
26+
String word1 = "word1";
27+
String word2 = "word2";
28+
29+
//线程安全的list,适合写多读少的场景
30+
List<String> strList = Collections.synchronizedList(new ArrayList<>(50));
31+
CompletableFuture<String> completableFuture1 = CompletableFuture.supplyAsync(
32+
() -> getResult(word1, 1000), executorService)
33+
.whenComplete((result, throwable) -> {
34+
//任务完成时执行
35+
if (result != null) {
36+
strList.add(result);
37+
}
38+
if (throwable != null) {
39+
logger.error("completableFuture1 error:{}", throwable);
40+
}
41+
});
42+
43+
CompletableFuture<String> completableFuture2 = CompletableFuture.supplyAsync(
44+
() -> getResult(word2, 1500), executorService)
45+
.whenComplete((result, throwable) ->{
46+
if (result != null) {
47+
strList.add(result);
48+
}
49+
if (throwable != null) {
50+
logger.error("completableFuture2 error:{}", throwable);
51+
}
52+
53+
});
54+
55+
List<CompletableFuture<String>> futureList = new ArrayList<>();
56+
futureList.add(completableFuture1);
57+
futureList.add(completableFuture2);
58+
59+
try {
60+
//多个任务,耗时不超时2秒
61+
CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0]))
62+
.get(2, TimeUnit.SECONDS);
63+
} catch (InterruptedException | ExecutionException e) {
64+
logger.error("CompletableFuture.allOf InterruptedException error.", e);
65+
} catch (TimeoutException e) {
66+
logger.error("CompletableFuture.allOf TimeoutException error.", e);
67+
}
68+
List<String> resultList = new ArrayList<>(strList);
69+
70+
resultList.forEach(System.out::println);
71+
}
72+
73+
74+
private static String getResult(String result, int millis) {
75+
try {
76+
//任务耗时。可以分别设置1000和3000,看未超时和超时的不同结果。
77+
Thread.sleep(millis);
78+
} catch (InterruptedException e) {
79+
logger.error("supplyAsyncGet error.");
80+
}
81+
return result;
82+
}
83+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
package com.java8.completefuture;
2+
3+
import org.apache.log4j.LogManager;
4+
import org.apache.log4j.Logger;
5+
6+
import java.util.ArrayList;
7+
import java.util.Collections;
8+
import java.util.List;
9+
import java.util.concurrent.*;
10+
11+
public class WhenCompleteDemo {
12+
13+
14+
private static Logger logger= LogManager.getLogger(WhenCompleteDemo.class);
15+
16+
public static void main(String[] args) {
17+
whenComplete();
18+
}
19+
20+
21+
public static void whenComplete() {
22+
//该线程池仅用于示例,实际建议使用自定义的线程池
23+
ExecutorService executorService = Executors.newCachedThreadPool();
24+
String word1 = "word1";
25+
String word2 = "word2";
26+
27+
CompletableFuture<String> completableFuture1 = CompletableFuture.supplyAsync(
28+
() -> getResult(word1, 1000), executorService)
29+
.whenComplete((result, throwable) ->{
30+
if (throwable != null) {
31+
logger.error("completableFuture1 error:{}", throwable);
32+
}
33+
});
34+
35+
CompletableFuture<String> completableFuture2 = CompletableFuture.supplyAsync(
36+
() -> getResult(word2, 1500), executorService)
37+
.whenComplete((result, throwable) ->{
38+
if (throwable != null) {
39+
logger.error("completableFuture2 error:{}", throwable);
40+
}
41+
});
42+
43+
List<CompletableFuture<String>> futureList = new ArrayList<>();
44+
futureList.add(completableFuture1);
45+
futureList.add(completableFuture2);
46+
47+
List<String> resultList = new ArrayList<>();
48+
//allOf()聚合所有任务,join阻塞
49+
CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0])).join();
50+
futureList.forEach(f -> resultList.add(getFutureResult(f,2000)));
51+
52+
53+
//以下这段代码,拿不到返回值,值得思考
54+
// CompletableFuture<Void> allFutures = CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0]));
55+
// allFutures.thenRun(() ->
56+
// futureList.forEach(f -> resultList.add( getFutureResult(f))));
57+
58+
resultList.forEach(System.out::println);
59+
}
60+
61+
private static String getResult(String result, int millis) {
62+
try {
63+
//任务耗时。可以分别设置1000和3000,看未超时和超时的不同结果。
64+
Thread.sleep(millis);
65+
} catch (InterruptedException e) {
66+
logger.error("supplyAsyncGet error.");
67+
}
68+
return result;
69+
}
70+
71+
private static String getFutureResult(CompletableFuture<String> completableFuture, int timeout) {
72+
String result = null;
73+
try {
74+
//获取返回值,2秒超时
75+
result = completableFuture.get(timeout, TimeUnit.SECONDS);
76+
} catch (InterruptedException e) {
77+
logger.error("InterruptedException error.", e);
78+
} catch (ExecutionException e) {
79+
logger.error("ExecutionException error.", e);
80+
} catch (TimeoutException e) {
81+
logger.error("TimeoutException error.", e);
82+
}
83+
return result;
84+
}
85+
86+
87+
}

0 commit comments

Comments
 (0)