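So Java 8 was released a while back, with a ton of features and changes. All of us Java zealots have been waiting for this for ages, ever since they originally announced all the great features planned for Java 7, which ended up being pulled.
I only recently had the time to start giving it a real look. I updated my home projects to 8, and I have to say I am generally quite happy with what we got. The java.time API "mimicking" JodaTime is a big improvement, the java.util.stream package is going to be useful, and lambdas are going to change our coding style, which might take a bit of getting used to. With those changes, the quote "With great power comes great responsibility" rings true: I sense there may be some interesting times in our future, as it is quite easy to write some hard-to-decipher code. As an example, debugging the code I wrote below would be "fun".
The example files are in my Github blog repo.
What this example does is simple: run a couple of threads, do some work concurrently, then wait for them all to complete. I figured that while I was playing with Java 8, I might as well go for it fully...
Here is what I came up with: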
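For anyone who has not tried these APIs yet, here is a minimal sketch of the three features mentioned above. The class name `Java8Taste`, the helper methods, and the sample data are made up purely for illustration.

```java
import java.time.LocalDate;
import java.time.temporal.ChronoUnit;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

final class Java8Taste {

    // java.time: immutable date values, similar in spirit to JodaTime
    static long daysBetween(final LocalDate from, final LocalDate to) {
        return ChronoUnit.DAYS.between(from, to);
    }

    // java.util.stream + lambdas: filter and transform a list in one pipeline
    static List<String> shortNamesUpperCased(final List<String> names) {
        return names.stream()
                .filter(name -> name.length() <= 4)   // lambda as a predicate
                .map(String::toUpperCase)             // method reference as a mapper
                .collect(Collectors.toList());
    }

    public static void main(final String[] args) {
        System.out.println(daysBetween(LocalDate.of(2014, 3, 18), LocalDate.of(2014, 4, 26)));
        System.out.println(shortNamesUpperCased(Arrays.asList("joda", "stream", "java")));
    }
}
```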
package net.briandupreez.blog.java8.futures;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import java.util.Collection;
import java.util.List;
import java.util.concurrent.*;
import java.util.stream.Collectors;

/**
 * Generified future running and completion
 *
 * @param <T> the result type
 * @param <S> the task input
 */
public class WaitingFuturesRunner<T, S> {

    private transient static final Log logger = LogFactory.getLog(WaitingFuturesRunner.class);

    private final Collection<Task<T, S>> tasks;
    private final long timeOut;
    private final TimeUnit timeUnit;
    private final ExecutorService executor;

    /**
     * Constructor, used to initialise with the required tasks
     *
     * @param tasks    the list of tasks to execute
     * @param timeOut  max length of time to wait
     * @param timeUnit time out timeUnit
     */
    public WaitingFuturesRunner(final Collection<Task<T, S>> tasks, final long timeOut, final TimeUnit timeUnit) {
        this.tasks = tasks;
        this.timeOut = timeOut;
        this.timeUnit = timeUnit;
        this.executor = Executors.newFixedThreadPool(tasks.size());
    }

    /**
     * Go!
     *
     * @param taskInput          The input to the task
     * @param consolidatedResult a container of all the completed results
     */
    public void go(final S taskInput, final ConsolidatedResult<T> consolidatedResult) {
        final CountDownLatch latch = new CountDownLatch(tasks.size());
        final List<CompletableFuture<T>> theFutures = tasks.stream()
                .map(aSearch -> CompletableFuture.supplyAsync(() -> processTask(aSearch, taskInput, latch), executor))
                .collect(Collectors.<CompletableFuture<T>>toList());

        final CompletableFuture<List<T>> allDone = collectTasks(theFutures);
        try {
            latch.await(timeOut, timeUnit);
            logger.debug("complete... adding results");
            allDone.get().forEach(consolidatedResult::addResult);
        } catch (final InterruptedException | ExecutionException e) {
            logger.error("Thread Error", e);
            throw new RuntimeException("Thread Error, could not complete processing", e);
        }
    }

    private <E> CompletableFuture<List<E>> collectTasks(final List<CompletableFuture<E>> futures) {
        final CompletableFuture<Void> allDoneFuture = CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]));
        return allDoneFuture.thenApply(v -> futures.stream()
                .map(CompletableFuture<E>::join)
                .collect(Collectors.<E>toList()));
    }

    private T processTask(final Task<T, S> task, final S searchTerm, final CountDownLatch latch) {
        logger.debug("Starting: " + task);
        T searchResults = null;
        try {
            searchResults = task.process(searchTerm, latch);
        } catch (final Exception e) {
            e.printStackTrace();
        }
        return searchResults;
    }
}
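The core of `collectTasks` is the `CompletableFuture.allOf` plus `join` idiom: `allOf` returns a `CompletableFuture<Void>` that completes only once every input future has completed, so the `join` calls inside `thenApply` return immediately instead of blocking. A stripped-down sketch of just that idiom, without the latch or the `Task` abstraction, might look like the following. The class name `AllOfSketch`, the `sequence` helper, and the squared-number workload are illustrative assumptions, not part of the original project.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

final class AllOfSketch {

    // Turns a list of futures into one future holding the list of results (input order is kept).
    static <E> CompletableFuture<List<E>> sequence(final List<CompletableFuture<E>> futures) {
        final CompletableFuture<Void> allDone =
                CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]));
        // By the time this function runs, every future is complete, so join() does not block.
        return allDone.thenApply(ignored ->
                futures.stream().map(CompletableFuture::join).collect(Collectors.toList()));
    }

    // Hypothetical workload: compute n * n for 1..count, one task per pool thread.
    static List<Integer> squaresInParallel(final int count) {
        final ExecutorService executor = Executors.newFixedThreadPool(count);
        try {
            final List<CompletableFuture<Integer>> futures = new ArrayList<>();
            for (int i = 1; i <= count; i++) {
                final int n = i; // lambdas can only capture effectively final variables
                futures.add(CompletableFuture.supplyAsync(() -> n * n, executor));
            }
            return sequence(futures).join();
        } finally {
            executor.shutdown(); // let the pool threads exit once the work is done
        }
    }

    public static void main(final String[] args) {
        System.out.println(squaresInParallel(5));
    }
}
```

If any of the futures completes exceptionally, the future returned by `allOf` does too, and the `thenApply` step is skipped.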
The test:
package net.briandupreez.blog.java8.futures;

import net.briandupreez.blog.java8.futures.example.StringInputTask;
import net.briandupreez.blog.java8.futures.example.StringResults;
import org.apache.log4j.BasicConfigurator;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

/**
 * Test
 * Created by brian on 4/26/14.
 */
public class CompletableFuturesRunnerTest {

    @BeforeClass
    public static void init() {
        BasicConfigurator.configure();
    }

    /**
     * 5 tasks at 3000ms concurrently should not be more than 3100
     * @throws Exception error
     */
    @Test(timeout = 3100)
    public void testGo() throws Exception {
        final List<Task<String, String>> taskList = setupTasks();
        final WaitingFuturesRunner<String, String> completableFuturesRunner = new WaitingFuturesRunner<>(taskList, 4, TimeUnit.SECONDS);
        final StringResults consolidatedResults = new StringResults();
        completableFuturesRunner.go("Something To Process", consolidatedResults);

        Assert.assertEquals(5, consolidatedResults.getResults().size());
        for (final String s : consolidatedResults.getResults()) {
            Assert.assertTrue(s.contains("complete"));
            Assert.assertTrue(s.contains("Something To Process"));
        }
    }

    private List<Task<String, String>> setupTasks() {
        final List<Task<String, String>> taskList = new ArrayList<>();
        final StringInputTask stringInputTask = new StringInputTask("Task 1");
        final StringInputTask stringInputTask2 = new StringInputTask("Task 2");
        final StringInputTask stringInputTask3 = new StringInputTask("Task 3");
        final StringInputTask stringInputTask4 = new StringInputTask("Task 4");
        final StringInputTask stringInputTask5 = new StringInputTask("Task 5");
        taskList.add(stringInputTask);
        taskList.add(stringInputTask2);
        taskList.add(stringInputTask3);
        taskList.add(stringInputTask4);
        taskList.add(stringInputTask5);
        return taskList;
    }
}
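The test depends on `Task`, `ConsolidatedResult`, `StringInputTask` and `StringResults`, which live in the repository and are not shown in this post. Judging only from the call sites, they could look roughly like the sketch below. Everything in this block is a guess: the method signatures are inferred from how the runner and the test use them, the result text is chosen to satisfy the test's assertions, and the two-argument constructor with a configurable sleep is a hypothetical addition for quick experiments.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;

// Assumed shape: a unit of work that takes an input and counts the latch down when finished.
interface Task<T, S> {
    T process(S input, CountDownLatch latch) throws Exception;
}

// Assumed shape: a container that collects each task's result.
interface ConsolidatedResult<T> {
    void addResult(T result);
}

final class StringInputTask implements Task<String, String> {
    private final String taskName;
    private final long sleepMillis;

    public StringInputTask(final String taskName) {
        this(taskName, 3000L); // assumed from the "3000ms" in the test's Javadoc
    }

    // Hypothetical overload so the simulated work can be shortened.
    public StringInputTask(final String taskName, final long sleepMillis) {
        this.taskName = taskName;
        this.sleepMillis = sleepMillis;
    }

    @Override
    public String process(final String input, final CountDownLatch latch) {
        try {
            Thread.sleep(sleepMillis); // stand-in for real work
            System.out.println("Done: " + taskName);
            return taskName + " complete: " + input;
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            throw new IllegalStateException("Interrupted: " + taskName, e);
        } finally {
            latch.countDown(); // always release the latch, even on failure
        }
    }

    @Override
    public String toString() {
        return "StringInputTask{taskName='" + taskName + "'}";
    }
}

final class StringResults implements ConsolidatedResult<String> {
    // Thread-safe list, in case results are ever added from several threads.
    private final List<String> results = new CopyOnWriteArrayList<>();

    @Override
    public void addResult(final String result) {
        results.add(result);
    }

    public List<String> getResults() {
        return results;
    }
}
```

Counting the latch down in a `finally` block matters here: if a task threw before reaching `countDown()`, the runner's `latch.await` would sit there until the timeout expired.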
Output:
0 [pool-1-thread-1] Starting: StringInputTask{taskName='Task 1'}
0 [pool-1-thread-5] Starting: StringInputTask{taskName='Task 5'}
0 [pool-1-thread-2] Starting: StringInputTask{taskName='Task 2'}
2 [pool-1-thread-4] Starting: StringInputTask{taskName='Task 4'}
2 [pool-1-thread-3] Starting: StringInputTask{taskName='Task 3'}
3003 [pool-1-thread-5] Done: Task 5
3004 [pool-1-thread-3] Done: Task 3
3003 [pool-1-thread-1] Done: Task 1
3003 [pool-1-thread-4] Done: Task 4
3003 [pool-1-thread-2] Done: Task 2
3007 [Thread-0] WaitingFuturesRunner - complete... adding results
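One thing worth noting about `go`: the boolean returned by `latch.await(timeOut, timeUnit)` is ignored, and the `allDone.get()` that follows has no timeout of its own, so a task that overruns would still block the caller. The executor is also never shut down. One possible alternative, offered here as an assumption rather than something taken from the project, drops the latch, puts the timeout on the combined future with `get(timeout, unit)`, and shuts the pool down afterwards. The names `TimedWaitSketch` and `runAll` are hypothetical, and plain `Supplier`s stand in for the `Task` type.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;
import java.util.stream.Collectors;

final class TimedWaitSketch {

    static <T> List<T> runAll(final List<Supplier<T>> work, final long timeOut, final TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException {
        // newFixedThreadPool rejects a size of 0, so guard against an empty work list.
        final ExecutorService executor = Executors.newFixedThreadPool(Math.max(1, work.size()));
        try {
            final List<CompletableFuture<T>> futures = work.stream()
                    .map(w -> CompletableFuture.supplyAsync(w, executor))
                    .collect(Collectors.toList());

            final CompletableFuture<Void> allDone =
                    CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]));

            // Throws TimeoutException if any task is still running when the time is up.
            allDone.get(timeOut, unit);

            // All futures are complete here, so join() returns immediately.
            return futures.stream().map(CompletableFuture::join).collect(Collectors.toList());
        } finally {
            executor.shutdownNow(); // interrupt stragglers so pool threads do not linger
        }
    }
}
```

With this shape the caller sees a `TimeoutException` when the deadline passes, instead of waiting for the slowest task regardless of the configured timeout.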
While doing this, I found and read some useful articles/links:
Oracle: Lambda Tutorial
IBM: Java 8 Concurrency
Tomasz Nurkiewicz: Definitive Guide to CompletableFuture
Source: https://www.javacodegeeks.com/2014/04/playing-with-java-8-lambdas-and-concurrency.html