package thread; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.Executors; import org.apache.log4j.Logger; import com.google.common.collect.Lists; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; public class ForkJoinPool<T> { private final static Logger logger = org.apache.log4j.Logger .getLogger(ForkJoinPool.class); public static final int AVAILABLE_PROCESSORS_SIZE = Runtime.getRuntime() .availableProcessors(); private ListeningExecutorService executorService = null; private ThreadLocal<List<ListenableFuture<T>>> futuresThreadLocal = new ThreadLocal<List<ListenableFuture<T>>>(){ protected java.util.List<com.google.common.util.concurrent.ListenableFuture<T>> initialValue() { return Lists.newArrayList(); }; }; public ForkJoinPool() { this(AVAILABLE_PROCESSORS_SIZE*2); } public ForkJoinPool(int poolSize) { executorService = MoreExecutors .listeningDecorator(Executors .newFixedThreadPool(poolSize)); } public void createTask() { } /** * * @description * @return ListenableFuture<T> * @Exception */ public ForkJoinPool<T> addTaskList(final List<Callable<T>> callables) { if(callables!=null){ for(Callable<T> c:callables){ addTask(c); } } return this; } /** * * @description * @return ListenableFuture<T> * @Exception */ public ForkJoinPool<T> addTask(final Callable<T> callable) { ListenableFuture<T> listenableFuture = executorService.submit(callable); futuresThreadLocal.get().add(listenableFuture); return this; } /** * 多線程執行商品生成信息 * * @description * @return * @Exception */ public List<T> executeTask(List<ListenableFuture<T>> futures) { long gstartTime = System.currentTimeMillis(); ListenableFuture<List<T>> successfulQueries = Futures .successfulAsList(futures); try { // 獲取全部線程的執行結果 List<T> lists = successfulQueries.get(); return lists; } catch (Exception 
e) { logger.error(e.getMessage(), e); } logger.info(" executeTask ! cost time:" + (System.currentTimeMillis() - gstartTime)); return null; } /** * 多線程執行商品生成信息 * * @description * @return * @Exception */ public List<T> executeTask() { List<ListenableFuture<T>> futures = futuresThreadLocal.get(); try { return executeTask(futures); } catch (Exception e) { logger.error(e.getMessage(), e); } finally { futuresThreadLocal.remove(); } return null; } /** * 拆分任務 * * @param tasks * @param 拆分數量 * @return */ public static <T> List<T> mergeTask(List<List<T>> tasks) { if(tasks==null){ return null; } List<T> list = Lists.newArrayList(); for(List<T> l:tasks){ if(l!=null){ list.addAll(l); } } return list; } /** * 拆分任務 * * @param tasks * @param 拆分數量 * @return */ public static <T> List<List<T>> splitTask(List<T> tasks, Integer taskSize) { List<List<T>> list = Lists.newArrayList(); if(tasks==null || taskSize <= 0){ return list; } if(tasks.size() < taskSize){ list.add(tasks); return list; } int baseNum = tasks.size() / taskSize; // 每一個list的最小size int remNum = tasks.size() % taskSize; // 獲得餘數 int index = 0; for (int i = 0; i < taskSize; i++) { int arrNum = baseNum; // 每一個list對應的size if (i < remNum) { arrNum += 1; } List<T> ls = Lists.newArrayList(); for (int j = index; j < arrNum + index; j++) { ls.add(tasks.get(j)); } list.add(ls); index += arrNum; } return list; } public void shutdown() { this.executorService.shutdown(); } public static void main(String[] args) { //測試線程池內異常狀況 ForkJoinPool<Object> forkJoinPool = new ForkJoinPool<>(2); forkJoinPool.addTask(new Callable<Object>() { @Override public Object call() throws Exception { throw new RuntimeException("test"); } }); forkJoinPool.addTask(new Callable<Object>() { @Override public Object call() throws Exception { return "123"; } }); List<Object> list = forkJoinPool.executeTask(); //[null, 123] System.out.println(list); } }
引用的包(Java):
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
node
關鍵代碼: ListenableFuture<List<T>> successfulQueries = Futures .successfulAsList(futures); 源碼跟蹤: com.google.common.util.concurrent.CollectionFuture.ListFuture<V, C> /** Used for {@link Futures#allAsList} and {@link Futures#successfulAsList}. */ static final class ListFuture<V> extends CollectionFuture<V, List<V>> { ListFuture( ImmutableCollection<? extends ListenableFuture<? extends V>> futures, boolean allMustSucceed) { init(new ListFutureRunningState(futures, allMustSucceed)); } private final class ListFutureRunningState extends CollectionFutureRunningState { ListFutureRunningState( ImmutableCollection<? extends ListenableFuture<? extends V>> futures, boolean allMustSucceed) { super(futures, allMustSucceed); } @Override public List<V> combine(List<Optional<V>> values) { List<V> result = newArrayListWithCapacity(values.size()); for (Optional<V> element : values) { result.add(element != null ? element.orNull() : null); } return unmodifiableList(result); } } } com.google.common.util.concurrent.AggregateFuture.RunningState.handleOneInputDone(int, Future<? extends InputT>) /** * Handles the input at the given index completing. */ private void handleOneInputDone(int index, Future<? extends InputT> future) { // The only cases in which this Future should already be done are (a) if it was cancelled or // (b) if an input failed and we propagated that immediately because of allMustSucceed. checkState( allMustSucceed || !isDone() || isCancelled(), "Future was done before all dependencies completed"); try { checkState(future.isDone(), "Tried to set value from future which is not done"); if (allMustSucceed) { if (future.isCancelled()) { // clear running state prior to cancelling children, this sets our own state but lets // the input futures keep running as some of them may be used elsewhere. 
runningState = null; cancel(false); } else { // We always get the result so that we can have fail-fast, even if we don't collect InputT result = getDone(future); if (collectsValues) { collectOneValue(allMustSucceed, index, result); } } } else if (collectsValues && !future.isCancelled()) { //聚合future結果 collectOneValue(allMustSucceed, index, getDone(future)); } } catch (ExecutionException e) { handleException(e.getCause()); } catch (Throwable t) { handleException(t); } } com.google.common.util.concurrent.Uninterruptibles.getUninterruptibly(Future<V>) @CanIgnoreReturnValue public static <V> V getUninterruptibly(Future<V> future) throws ExecutionException { boolean interrupted = false; try { while (true) { try { return future.get(); } catch (InterruptedException e) { interrupted = true; } } } finally { if (interrupted) { Thread.currentThread().interrupt(); } } } com.google.common.util.concurrent.AbstractFuture.get() @CanIgnoreReturnValue @Override public V get() throws InterruptedException, ExecutionException { if (Thread.interrupted()) { throw new InterruptedException(); } Object localValue = value; if (localValue != null & !(localValue instanceof SetFuture)) { return getDoneValue(localValue); } Waiter oldHead = waiters; if (oldHead != Waiter.TOMBSTONE) { Waiter node = new Waiter(); do { node.setNext(oldHead); if (ATOMIC_HELPER.casWaiters(this, oldHead, node)) { // we are on the stack, now wait for completion. while (true) { LockSupport.park(this); // Check interruption first, if we woke up due to interruption we need to honor that. if (Thread.interrupted()) { removeWaiter(node); throw new InterruptedException(); } // Otherwise re-read and check doneness. If we loop then it must have been a spurious // wakeup localValue = value; if (localValue != null & !(localValue instanceof SetFuture)) { return getDoneValue(localValue); } } } oldHead = waiters; // re-read and loop. 
} while (oldHead != Waiter.TOMBSTONE); } // re-read value, if we get here then we must have observed a TOMBSTONE while trying to add a // waiter. return getDoneValue(value); } com.google.common.util.concurrent.AbstractFuture.getDoneValue(Object) /** * Unboxes {@code obj}. Assumes that obj is not {@code null} or a {@link SetFuture}. */ private V getDoneValue(Object obj) throws ExecutionException { // While this seems like it might be too branch-y, simple benchmarking proves it to be // unmeasurable (comparing done AbstractFutures with immediateFuture) if (obj instanceof Cancellation) { throw cancellationExceptionWithCause("Task was cancelled.", ((Cancellation) obj).cause); } else if (obj instanceof Failure) { throw new ExecutionException(((Failure) obj).exception); } else if (obj == NULL) { return null; } else { @SuppressWarnings("unchecked") // this is the only other option V asV = (V) obj; return asV; } } com.google.common.util.concurrent.CollectionFuture.CollectionFutureRunningState.collectOneValue(boolean, int, V) @Override final void collectOneValue(boolean allMustSucceed, int index, @Nullable V returnValue) { List<Optional<V>> localValues = values; if (localValues != null) { localValues.set(index, Optional.fromNullable(returnValue)); } else { // Some other future failed or has been cancelled, causing this one to also be cancelled or // have an exception set. This should only happen if allMustSucceed is true or if the output // itself has been cancelled. checkState( allMustSucceed || isCancelled(), "Future was done before all dependencies completed"); } } abstract class CollectionFutureRunningState extends RunningState { private List<Optional<V>> values; CollectionFutureRunningState( ImmutableCollection<? extends ListenableFuture<? extends V>> futures, boolean allMustSucceed) { super(futures, allMustSucceed, true); this.values = futures.isEmpty() ? 
ImmutableList.<Optional<V>>of() : Lists.<Optional<V>>newArrayListWithCapacity(futures.size()); // Populate the results list with null initially. for (int i = 0; i < futures.size(); ++i) { values.add(null); } } 總結: 使用 Guava 的 Futures.successfulAsList 聚合多個 ListenableFuture 時,執行中拋出異常(或被取消)的任務在結果列表的對應位置返回 null,異常不會拋給調用方;若需要任一任務失敗即整體失敗,應改用 Futures.allAsList。