自定義 ForkJoinPool(基於 Guava ListeningExecutorService):任務執行拋出異常場景的源碼跟蹤

package thread;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;

import org.apache.log4j.Logger;

import com.google.common.collect.Lists;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;

public class ForkJoinPool<T> {

	private final static Logger logger = org.apache.log4j.Logger

	public static final int AVAILABLE_PROCESSORS_SIZE = Runtime.getRuntime()

	private ListeningExecutorService executorService = null;

	private ThreadLocal<List<ListenableFuture<T>>> futuresThreadLocal = new ThreadLocal<List<ListenableFuture<T>>>(){
		protected java.util.List<com.google.common.util.concurrent.ListenableFuture<T>> initialValue() {
			 return Lists.newArrayList();

	public ForkJoinPool() {

	public ForkJoinPool(int poolSize) {
		executorService = MoreExecutors

	public void createTask() {
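	/**
	 * Adds the given callables as tasks of this pool.
	 * @param callables the tasks to add
	 * @return this pool instance (not a ListenableFuture), so calls can be chained
	 */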
	public ForkJoinPool<T> addTaskList(final List<Callable<T>> callables) {
			for(Callable<T> c:callables){
		return this;

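	/**
	 * Submits the given callable to the executor service.
	 * @param callable the task to submit
	 * @return this pool instance (not the ListenableFuture), so calls can be chained
	 */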
	public ForkJoinPool<T> addTask(final Callable<T> callable) {
		ListenableFuture<T> listenableFuture = executorService.submit(callable);
		return this;

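	/**
	 * Waits for the combined result of the given futures and logs the elapsed time.
	 * @param futures the futures whose results are collected
	 * @return the list of results, or null if waiting for the results throws
	 *         (the exception is logged, not rethrown)
	 */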
	public List<T> executeTask(List<ListenableFuture<T>> futures) {
		long gstartTime = System.currentTimeMillis();
		ListenableFuture<List<T>> successfulQueries = Futures
		try {
			// 獲取全部線程的執行結果
			List<T> lists = successfulQueries.get();
			return lists;
		} catch (Exception e) {
			logger.error(e.getMessage(), e);
		logger.info(" executeTask ! cost time:"
				+ (System.currentTimeMillis() - gstartTime));

		return null;
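	/**
	 * Collects the results of the futures stored in this thread's futuresThreadLocal
	 * by delegating to executeTask(List).
	 * @return the list of results, or null if an exception is caught
	 *         (the exception is logged, not rethrown)
	 */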
	public List<T> executeTask() {
		List<ListenableFuture<T>> futures = futuresThreadLocal.get();
		try {
			return executeTask(futures);
		} catch (Exception e) {
			logger.error(e.getMessage(), e);
		} finally {
		return null;
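	/**
	 * Merges several task lists into a single list (the counterpart of splitTask).
	 * @param tasks the lists to merge
	 * @return the merged list; NOTE(review): one early-exit path returns null,
	 *         its guard condition is not visible in this excerpt
	 */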
	public static <T> List<T> mergeTask(List<List<T>> tasks) {
			return null;
		List<T> list = Lists.newArrayList();
		for(List<T> l:tasks){
		return list;

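	/**
	 * Splits a task list into taskSize sub-lists whose sizes differ by at most one.
	 * @param tasks the tasks to split
	 * @param taskSize the number of sub-lists to produce
	 * @return the sub-lists; as written, an empty list when tasks is null,
	 *         taskSize is not positive, or tasks has fewer elements than taskSize
	 */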
	public static <T> List<List<T>> splitTask(List<T> tasks, Integer taskSize) {
		List<List<T>> list = Lists.newArrayList();
        if(tasks==null || taskSize <= 0){
			return list;
        if(tasks.size() < taskSize){
        	return list;
		int baseNum = tasks.size() / taskSize; // 每一個list的最小size
		int remNum = tasks.size() % taskSize; // 獲得餘數

		int index = 0;
		for (int i = 0; i < taskSize; i++) {
			int arrNum = baseNum; // 每一個list對應的size
			if (i < remNum) {
				arrNum += 1;
			List<T> ls = Lists.newArrayList();
			for (int j = index; j < arrNum + index; j++) {
			index += arrNum;
		return list;

	public void shutdown() {

	public static void main(String[] args) {
		ForkJoinPool<Object> forkJoinPool = new ForkJoinPool<>(2);
		forkJoinPool.addTask(new Callable<Object>() {
			public Object call() throws Exception {
				throw new RuntimeException("test");
		forkJoinPool.addTask(new Callable<Object>() {
			public Object call() throws Exception {
				return "123";
		List<Object> list = forkJoinPool.executeTask();
		//[null, 123]


import com.google.common.collect.Lists;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;


ListenableFuture<List<T>> successfulQueries = Futures


com.google.common.util.concurrent.CollectionFuture.ListFuture<V, C>

  /** Used for {@link Futures#allAsList} and {@link Futures#successfulAsList}. */
  static final class ListFuture<V> extends CollectionFuture<V, List<V>> {
        ImmutableCollection<? extends ListenableFuture<? extends V>> futures,
        boolean allMustSucceed) {
      init(new ListFutureRunningState(futures, allMustSucceed));

    private final class ListFutureRunningState extends CollectionFutureRunningState {
          ImmutableCollection<? extends ListenableFuture<? extends V>> futures,
          boolean allMustSucceed) {
        super(futures, allMustSucceed);

      public List<V> combine(List<Optional<V>> values) {
        List<V> result = newArrayListWithCapacity(values.size());
        for (Optional<V> element : values) {
          result.add(element != null ? element.orNull() : null);
        return unmodifiableList(result);
com.google.common.util.concurrent.AggregateFuture.RunningState.handleOneInputDone(int, Future<? extends InputT>)
     * Handles the input at the given index completing.
    private void handleOneInputDone(int index, Future<? extends InputT> future) {
      // The only cases in which this Future should already be done are (a) if it was cancelled or
      // (b) if an input failed and we propagated that immediately because of allMustSucceed.
          allMustSucceed || !isDone() || isCancelled(),
          "Future was done before all dependencies completed");

      try {
        checkState(future.isDone(), "Tried to set value from future which is not done");
        if (allMustSucceed) {
          if (future.isCancelled()) {
            // clear running state prior to cancelling children, this sets our own state but lets
            // the input futures keep running as some of them may be used elsewhere.
            runningState = null;
          } else {
            // We always get the result so that we can have fail-fast, even if we don't collect
            InputT result = getDone(future);
            if (collectsValues) {
              collectOneValue(allMustSucceed, index, result);
        } else if (collectsValues && !future.isCancelled()) {
          collectOneValue(allMustSucceed, index, getDone(future));
      } catch (ExecutionException e) {
      } catch (Throwable t) {
  public static <V> V getUninterruptibly(Future<V> future) throws ExecutionException {
    boolean interrupted = false;
    try {
      while (true) {
        try {
          return future.get();
        } catch (InterruptedException e) {
          interrupted = true;
    } finally {
      if (interrupted) {

  public V get() throws InterruptedException, ExecutionException {
    if (Thread.interrupted()) {
      throw new InterruptedException();
    Object localValue = value;
    if (localValue != null & !(localValue instanceof SetFuture)) {
      return getDoneValue(localValue);
    Waiter oldHead = waiters;
    if (oldHead != Waiter.TOMBSTONE) {
      Waiter node = new Waiter();
      do {
        if (ATOMIC_HELPER.casWaiters(this, oldHead, node)) {
          // we are on the stack, now wait for completion.
          while (true) {
            // Check interruption first, if we woke up due to interruption we need to honor that.
            if (Thread.interrupted()) {
              throw new InterruptedException();
            // Otherwise re-read and check doneness. If we loop then it must have been a spurious
            // wakeup
            localValue = value;
            if (localValue != null & !(localValue instanceof SetFuture)) {
              return getDoneValue(localValue);
        oldHead = waiters; // re-read and loop.
      } while (oldHead != Waiter.TOMBSTONE);
    // re-read value, if we get here then we must have observed a TOMBSTONE while trying to add a
    // waiter.
    return getDoneValue(value);

   * Unboxes {@code obj}. Assumes that obj is not {@code null} or a {@link SetFuture}.
  private V getDoneValue(Object obj) throws ExecutionException {
    // While this seems like it might be too branch-y, simple benchmarking proves it to be
    // unmeasurable (comparing done AbstractFutures with immediateFuture)
    if (obj instanceof Cancellation) {
      throw cancellationExceptionWithCause("Task was cancelled.", ((Cancellation) obj).cause);
    } else if (obj instanceof Failure) {
      throw new ExecutionException(((Failure) obj).exception);
    } else if (obj == NULL) {
      return null;
    } else {
      @SuppressWarnings("unchecked") // this is the only other option
      V asV = (V) obj;
      return asV;

com.google.common.util.concurrent.CollectionFuture.CollectionFutureRunningState.collectOneValue(boolean, int, V)
    final void collectOneValue(boolean allMustSucceed, int index, @Nullable V returnValue) {
      List<Optional<V>> localValues = values;

      if (localValues != null) {
        localValues.set(index, Optional.fromNullable(returnValue));
      } else {
        // Some other future failed or has been cancelled, causing this one to also be cancelled or
        // have an exception set. This should only happen if allMustSucceed is true or if the output
        // itself has been cancelled.
            allMustSucceed || isCancelled(), "Future was done before all dependencies completed");

abstract class CollectionFutureRunningState extends RunningState {
    private List<Optional<V>> values;

        ImmutableCollection<? extends ListenableFuture<? extends V>> futures,
        boolean allMustSucceed) {
      super(futures, allMustSucceed, true);

      this.values =
              ? ImmutableList.<Optional<V>>of()
              : Lists.<Optional<V>>newArrayListWithCapacity(futures.size());

      // Populate the results list with null initially.
      for (int i = 0; i < futures.size(); ++i) {

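結論:由上面的源碼可見,當 allMustSucceed 為 false 時(即 Futures#successfulAsList 的情況),結果列表會先以 null 填充;某個任務執行拋出異常時,該任務對應的位置保持為 null,合併後的 Future 調用 get() 不會拋出該異常(如 main 示例的輸出 [null, 123])。因此調用方必須自行檢查返回列表中的 null 元素。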