什麼?對你沒有聽錯,也沒有看錯 ..多線程併發執行任務,取結果歸集~~ 再也不憂愁....感謝你們的雙擊+點贊和關注 下面起鍋燒油
先來看一些APP的獲取數據,諸如此類,一個頁面獲取N多個,多達10個左右的一個用戶行爲數據,好比:點贊數,發佈文章數,點贊數,消息數,關注數,收藏數,粉絲數,卡券數,紅包數........... 真的是多~ 咱們看些圖: 前端
平時要10+接口的去獲取數據(由於當你10+個查詢寫一塊兒,那估計到半分鐘才能響應了),一個頁面上N多接口,真是累死前端的寶寶了,前端開啓多線程也累啊,咱們作後端的要體量一下前端的寶寶們,畢竟有句話叫"程序員何苦爲難程序員~"java
今天咱們也能夠一個接口將這些數據返回~ 還賊TM快,解決串行編程,阻塞編程帶來的苦惱~git
今天豬腳就是 Future、FutureTask、ExecutorService...程序員
狀態,隊列,CAS
/** * The run state of this task, initially NEW. The run state * transitions to a terminal state only in methods set, * setException, and cancel. During completion, state may take on * transient values of COMPLETING (while outcome is being set) or * INTERRUPTING (only while interrupting the runner to satisfy a * cancel(true)). Transitions from these intermediate to final * states use cheaper ordered/lazy writes because values are unique * and cannot be further modified. * * Possible state transitions: //可能發生的狀態過分過程 * NEW -> COMPLETING -> NORMAL // 建立-->完成-->正常 * NEW -> COMPLETING -> EXCEPTIONAL // 建立-->完成-->異常 * NEW -> CANCELLED // 建立-->取消 * NEW -> INTERRUPTING -> INTERRUPTED // 建立-->中斷中-->中斷結束 */
private volatile int state; // 執行器狀態
private static final int NEW = 0; // 初始值 由構造函數保證
private static final int COMPLETING = 1; // 完成進行時 正在設置任務結果
private static final int NORMAL = 2; // 正常結束 任務正常執行完畢
private static final int EXCEPTIONAL = 3; // 發生異常 任務執行過程當中發生異常
private static final int CANCELLED = 4; // 已經取消 任務已經取消
private static final int INTERRUPTING = 5; // 中斷進行時 正在中斷運行任務的線程
private static final int INTERRUPTED = 6; // 中斷結束 任務被中斷
/** The underlying callable; nulled out after running */
private Callable<V> callable;
/** The result to return or exception to throw from get() */
private Object outcome; // non-volatile, protected by state reads/writes
/** The thread running the callable; CASed during run() */
private volatile Thread runner;
/** Treiber stack of waiting threads */
private volatile WaitNode waiters;
複製代碼
還不明白就看圖:github
public interface Future<T> {
/** *取消任務 *@param mayInterruptIfRunning *是否容許取消正在執行卻沒有執行完畢的任務,若是設置true,則表示能夠取消正在執行過程當中的任務 *若是任務正在執行,則返回true *若是任務尚未執行,則不管mayInterruptIfRunning爲true仍是false,返回true *若是任務已經完成,則不管mayInterruptIfRunning爲true仍是false,返回false */
boolean cancel(boolean mayInterruptIfRunning);
/** *任務是否被取消成功,若是在任務正常完成前被取消成功,則返回 true */
boolean isCancelled();
/** *任務是否完成 */
boolean isDone();
/** *經過阻塞獲取執行結果 */
T get() throws InterruptedException, ExecutionException;
/** *經過阻塞獲取執行結果。若是在指定的時間內沒有返回,則返回null */
T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
}
複製代碼
Futurespring
(1)cancle 能夠中止任務的執行 但不必定成功 看返回值true or false
(2)get 阻塞獲取callable的任務結果,即get阻塞住調用線程,直至計算完成返回結果
(3)isCancelled 是否取消成功
(4)isDone 是否完成
複製代碼
重點說明:編程
Furture.get()獲取執行結果的值,取決於執行的狀態,若是任務完成,會當即返回結果,不然一直阻塞直到任務進入完成狀態,而後返回結果或者拋出異常。後端
「運行完成」表示計算的全部可能結束的狀態,包含正常結束,因爲取消而結束和因爲異常而結束。當進入完成狀態,他會中止在這個狀態上,只要state不處於
NEW
狀態,就說明任務已經執行完畢。api
FutureTask負責將計算結果從執行任務的線程傳遞到調用這個線程的線程,並且確保了,傳遞過程當中結果的安全發佈安全
UNSAFE 無鎖編程技術,確保了線程的安全性~ 爲了保持無鎖編程CPU的消耗,因此用狀態標記,減小空轉的時候CPU的壓力
run方法
public void run() {
// UNSAFE.compareAndSwapObject, CAS保證Callable任務只被執行一次 無鎖編程
if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread()))
return;
try {
Callable<V> c = callable; // 拿到執行任務
if (c != null && state == NEW) { // 任務不爲空,而且執行器狀態是初始值,纔會執行;若是取消就不執行了
V result;
boolean ran; // 記錄是否執行成功
try {
result = c.call(); // 執行任務
ran = true; // 成功
} catch (Throwable ex) {
result = null; // 異常,清空結果
ran = false; // 失敗
setException(ex); // 記錄異常
}
if (ran) // 問題:ran變量能夠省略嗎,把set(result);移到try塊裏面?
set(result); // 設置結果
}
} finally {
runner = null; // 直到set狀態前,runner一直都是非空的,爲了防止併發調用run()方法。
int s = state;
if (s >= INTERRUPTING) // 有別的線程要中斷當前線程,把CPU讓出去,自旋等一下
handlePossibleCancellationInterrupt(s);
}
}
複製代碼
private void handlePossibleCancellationInterrupt(int s) {
if (s == INTERRUPTING) // 當state爲INTERRUPTING時
while (state == INTERRUPTING) // 表示有線程正在中斷當前線程
Thread.yield(); // 讓出CPU,自旋等待中斷
}
複製代碼
再囉嗦下: run方法重點作了如下幾件事:
將runner屬性設置成當前正在執行run方法的線程
調用callable成員變量的call方法來執行任務
設置執行結果outcome, 若是執行成功, 則outcome保存的就是執行結果;若是執行過程當中發生了異常, 則outcome中保存的就是異常,設置結果以前,先將state狀態設爲中間態
對outcome的賦值完成後,設置state狀態爲終止態(NORMAL或者EXCEPTIONAL)
喚醒Treiber棧中全部等待的線程
善後清理(waiters, callable,runner設爲null)
檢查是否有遺漏的中斷,若是有,等待中斷狀態完成。
複製代碼
怎麼能少了get方法呢,一直阻塞獲取參見:awaitDone
public V get() throws InterruptedException, ExecutionException {
int s = state; // 執行器狀態
if (s <= COMPLETING) // 若是狀態小於等於COMPLETING,說明任務正在執行,須要等待
s = awaitDone(false, 0L); // 等待
return report(s); // 報告結果
}
複製代碼
順便偷偷看下get(long, TimeUnit),就是get的方法擴展,增長了超時時間,超時後我還沒拿到就生氣拋異常....
public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
if (unit == null) // 參數校驗
throw new NullPointerException();
int s = state; // 執行器狀態
if (s <= COMPLETING && (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING) // 若是狀態小於等於COMPLETING,說明任務正在執行,須要等待;等待指定時間,state依然小於等於COMPLETING
throw new TimeoutException(); // 拋出超時異常
return report(s); // 報告結果
}
複製代碼
那麼再看awaitDone,要知道會寫死循環
while(true)|for (;;)
的都是高手~
private int awaitDone(boolean timed, long nanos) throws InterruptedException {
final long deadline = timed ? System.nanoTime() + nanos : 0L; // 計算deadline
WaitNode q = null; // 等待結點
boolean queued = false; // 是否已經入隊
for (;;) {
if (Thread.interrupted()) { // 若是當前線程已經標記中斷,則直接移除此結點,並拋出中斷異常
removeWaiter(q);
throw new InterruptedException();
}
int s = state; // 執行器狀態
if (s > COMPLETING) { // 若是狀態大於COMPLETING,說明任務已經完成,或者已經取消,直接返回
if (q != null)
q.thread = null; // 復位線程屬性
return s; // 返回
} else if (s == COMPLETING) // 若是狀態等於COMPLETING,說明正在整理結果,自旋等待一下子
Thread.yield();
else if (q == null) // 初始,構建結點
q = new WaitNode();
else if (!queued) // 還沒入隊,則CAS入隊
queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q);
else if (timed) { // 是否容許超時
nanos = deadline - System.nanoTime(); // 計算等待時間
if (nanos <= 0L) { // 超時
removeWaiter(q); // 移除結點
return state; // 返回結果
}
LockSupport.parkNanos(this, nanos); // 線程阻塞指定時間
} else
LockSupport.park(this); // 阻塞線程
}
}
複製代碼
至此,線程安排任務和獲取我就不囉嗦了~~~~還要不少探索的,畢竟帶薪聊天比較緊張,我就很少贅述了~
接着咱們來看隊列,在FutureTask中,隊列的實現是一個單向鏈表,它表示全部等待任務執行完畢的線程的集合。咱們知道,FutureTask實現了Future接口,能夠獲取「Task」的執行結果,那麼若是獲取結果時,任務尚未執行完畢怎麼辦呢?那麼獲取結果的線程就會在一個等待隊列中掛起,直到任務執行完畢被喚醒。這一點有點相似於AQS中的sync queue,在下文的分析中,你們能夠本身對照它們的異同點。
咱們前面說過,在併發編程中使用隊列一般是將當前線程包裝成某種類型的數據結構扔到等待隊列中,咱們先來看看隊列中的每個節點是怎麼個結構:
static final class WaitNode {
volatile Thread thread;
volatile WaitNode next;
WaitNode() { thread = Thread.currentThread(); }
}
複製代碼
可見,相比於AQS的sync queue
所使用的雙向鏈表中的Node
,這個WaitNode
要簡單多了,它只包含了一個記錄線程的thread屬性和指向下一個節點的next屬性。
值得一提的是,FutureTask中的這個單向鏈表是當作棧來使用的,確切來講是當作Treiber棧來使用的,不瞭解Treiber棧是個啥的能夠簡單的把它當作是一個線程安全的棧,它使用CAS來完成入棧出棧操做(想進一步瞭解的話能夠看這篇文章)。爲啥要使用一個線程安全的棧呢,由於同一時刻可能有多個線程都在獲取任務的執行結果,若是任務還在執行過程當中,則這些線程就要被包裝成WaitNode扔到Treiber棧的棧頂,即完成入棧操做,這樣就有可能出現多個線程同時入棧的狀況,所以須要使用CAS操做保證入棧的線程安全,對於出棧的狀況也是同理。
因爲FutureTask中的隊列本質上是一個Treiber
(驅動)棧,那麼使用這個隊列就只須要一個指向棧頂節點的指針就好了,在FutureTask中,就是waiters屬性:
/** Treiber stack of waiting threads */
private volatile WaitNode waiters;
複製代碼
事實上,它就是整個單向鏈表的頭節點。
綜上,FutureTask中所使用的隊列的結構以下:
CAS操做大多數是用來改變狀態的,在FutureTask中也不例外。咱們通常在靜態代碼塊中初始化須要CAS操做的屬性的偏移量:
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long stateOffset;
private static final long runnerOffset;
private static final long waitersOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> k = FutureTask.class;
stateOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("state"));
runnerOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("runner"));
waitersOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("waiters"));
} catch (Exception e) {
throw new Error(e);
}
}
複製代碼
從這個靜態代碼塊中咱們也能夠看出,CAS操做主要針對3個屬性,包括state、runner和waiters,說明這3個屬性基本是會被多個線程同時訪問的。其中state屬性表明了任務的狀態,waiters屬性表明了指向棧頂節點的指針,這兩個咱們上面已經分析過了。runner屬性表明了執行FutureTask中的「Task」的線程。爲何須要一個屬性來記錄執行任務的線程呢?這是爲了中斷或者取消任務作準備的,只有知道了執行任務的線程是誰,咱們才能去中斷它。
定義完屬性的偏移量以後,接下來就是CAS操做自己了。在FutureTask,CAS操做最終調用的仍是Unsafe
類的compareAndSwapXXX
方法,關於Unsafe,因爲帶薪碼文這裏再也不贅述。
一切沒有例子的講解都是耍流氓
>>> 蔥姜切沫~~加入生命的源泉....
實戰項目以springboot爲項目腳手架,github地址: github.com/leaJone/myb…
內部定義一個線程池進行任務的調度和線程的管理以及線程的複用,你們能夠根據本身的實際項目狀況進行配置
其中線程調度示例:核心線程 8 最大線程 20 保活時間30s 存儲隊列 10 有守護線程 拒絕策略:將超負荷任務回退到調用者 說明 : 默認使用核心線程(8)數執行任務,任務數量超過核心線程數就丟到隊列,隊列(10)滿了就再開啓新的線程,新的線程數最大爲20,當任務執行完,新開啓的線程將存活30s,若沒有任務就消亡,線程池回到核心線程數量.
import com.boot.lea.mybot.dto.UserBehaviorDataDTO;
import com.boot.lea.mybot.service.UserService;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.concurrent.*;
/** * @author Lijing * @date 2019年7月29日 */
@Slf4j
@Component
public class MyFutureTask {
@Resource
UserService userService;
/** * 核心線程 8 最大線程 20 保活時間30s 存儲隊列 10 有守護線程 拒絕策略:將超負荷任務回退到調用者 */
private static ExecutorService executor = new ThreadPoolExecutor(8, 20,
30L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(10),
new ThreadFactoryBuilder().setNameFormat("User_Async_FutureTask-%d").setDaemon(true).build(),
new ThreadPoolExecutor.CallerRunsPolicy());
@SuppressWarnings("all")
public UserBehaviorDataDTO getUserAggregatedResult(final Long userId) {
System.out.println("MyFutureTask的線程:" + Thread.currentThread());
long fansCount = 0, msgCount = 0, collectCount = 0,
followCount = 0, redBagCount = 0, couponCount = 0;
// fansCount = userService.countFansCountByUserId(userId);
// msgCount = userService.countMsgCountByUserId(userId);
// collectCount = userService.countCollectCountByUserId(userId);
// followCount = userService.countFollowCountByUserId(userId);
// redBagCount = userService.countRedBagCountByUserId(userId);
// couponCount = userService.countCouponCountByUserId(userId);
try {
Future<Long> fansCountFT = executor.submit(() -> userService.countFansCountByUserId(userId));
Future<Long> msgCountFT = executor.submit(() -> userService.countMsgCountByUserId(userId));
Future<Long> collectCountFT = executor.submit(() -> userService.countCollectCountByUserId(userId));
Future<Long> followCountFT = executor.submit(() -> userService.countFollowCountByUserId(userId));
Future<Long> redBagCountFT = executor.submit(() -> userService.countRedBagCountByUserId(userId));
Future<Long> couponCountFT = executor.submit(() -> userService.countCouponCountByUserId(userId));
//get阻塞
fansCount = fansCountFT.get();
msgCount = msgCountFT.get();
collectCount = collectCountFT.get();
followCount = followCountFT.get();
redBagCount = redBagCountFT.get();
couponCount = couponCountFT.get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
log.error(">>>>>>聚合查詢用戶聚合信息異常:" + e + "<<<<<<<<<");
}
UserBehaviorDataDTO userBehaviorData =
UserBehaviorDataDTO.builder().fansCount(fansCount).msgCount(msgCount)
.collectCount(collectCount).followCount(followCount)
.redBagCount(redBagCount).couponCount(couponCount).build();
return userBehaviorData;
}
}
複製代碼
常規業務查詢方法,爲了特效,以及看出實際的效果,咱們每一個方法作了延時
import com.boot.lea.mybot.mapper.UserMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.concurrent.TimeUnit;
@Service
public class UserServiceImpl implements UserService {
@Autowired
UserMapper userMapper;
@Override
public long countFansCountByUserId(Long userId) {
try {
Thread.sleep(10000);
System.out.println("獲取FansCount===睡眠:" + 10 + "s");
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("UserService獲取FansCount的線程 " + Thread.currentThread().getName());
return 520;
}
@Override
public long countMsgCountByUserId(Long userId) {
System.out.println("UserService獲取MsgCount的線程 " + Thread.currentThread().getName());
try {
Thread.sleep(10000);
System.out.println("獲取MsgCount===睡眠:" + 10 + "s");
} catch (InterruptedException e) {
e.printStackTrace();
}
return 618;
}
@Override
public long countCollectCountByUserId(Long userId) {
System.out.println("UserService獲取CollectCount的線程 " + Thread.currentThread().getName());
try {
Thread.sleep(10000);
System.out.println("獲取CollectCount==睡眠:" + 10 + "s");
} catch (InterruptedException e) {
e.printStackTrace();
}
return 6664;
}
@Override
public long countFollowCountByUserId(Long userId) {
System.out.println("UserService獲取FollowCount的線程 " + Thread.currentThread().getName());
try {
Thread.sleep(10000);
System.out.println("獲取FollowCount===睡眠:" + 10+ "s");
} catch (InterruptedException e) {
e.printStackTrace();
}
return userMapper.countFollowCountByUserId(userId);
}
@Override
public long countRedBagCountByUserId(Long userId) {
System.out.println("UserService獲取RedBagCount的線程 " + Thread.currentThread().getName());
try {
TimeUnit.SECONDS.sleep(4);
System.out.println("獲取RedBagCount===睡眠:" + 4 + "s");
} catch (InterruptedException e) {
e.printStackTrace();
}
return 99;
}
@Override
public long countCouponCountByUserId(Long userId) {
System.out.println("UserService獲取CouponCount的線程 " + Thread.currentThread().getName());
try {
TimeUnit.SECONDS.sleep(8);
System.out.println("獲取CouponCount===睡眠:" + 8+ "s");
} catch (InterruptedException e) {
e.printStackTrace();
}
return 66;
}
}
複製代碼
/** * @author LiJing * @ClassName: UserController * @Description: 用戶控制器 * @date 2019/7/29 15:16 */
@RestController
@RequestMapping("user/")
public class UserController {
@Autowired
private UserService userService;
@Autowired
private MyFutureTask myFutureTask;
@GetMapping("/index")
@ResponseBody
public String index() {
return "啓動用戶模塊成功~~~~~~~~";
}
//http://localhost:8080/api/user/get/data?userId=4
@GetMapping("/get/data")
@ResponseBody
public UserBehaviorDataDTO getUserData(Long userId) {
System.out.println("UserController的線程:" + Thread.currentThread());
long begin = System.currentTimeMillis();
UserBehaviorDataDTO userAggregatedResult = myFutureTask.getUserAggregatedResult(userId);
long end = System.currentTimeMillis();
System.out.println("===============總耗時:" + (end - begin) /1000.0000+ "秒");
return userAggregatedResult;
}
}
複製代碼
咱們啓動項目:開啓調用 http://localhost:8080/api/user/get/data?userId=4
當咱們線程池配置爲:核心線程 8 最大線程 20 保活時間30s 存儲隊列 10 的時候,咱們測試的結果以下:
結果:咱們看到每一個server method的執行線程都是從線程池中發起的線程名:
User_Async_FutureTask-%d
, 總耗時從累計的52秒縮短到10秒,即取決於最耗時的方法查詢時間.
那咱們再將註釋代碼放開,進行串行查詢進行測試:
結果:咱們使用串行的方式進行查詢,結果彙總將達到52秒,那太可怕了~~
使用FutureTask的時候,就是將任務runner以caller的方式進行回調,阻塞獲取,最後咱們將結果彙總,即完成了開啓多線程異步調用咱們的業務方法.
Future<Long> fansCountFT = executor.submit(new Callable<Long>() {
@Override
public Long call() throws Exception {
return userService.countFansCountByUserId(userId);
}
});
複製代碼
這裏使用的只是一個簡單的例子,具體項目能夠定義具體的業務方法進行歸併處理,其實在JDK1.8之後,又有了ExecutorCompletionService
,ForkJoinTask
,CompletableFuture
這些均可以實現上述的方法,咱們後續會作一些這些方法使用的案例,指望你們的關注,文章中有不足之處,歡迎指正~
因此:咱們要用到親愛的Spring的異步編程,異步編程有不少種方式:好比常見的Future的sync
,CompletableFuture.supplyAsync,
,@Async
,哈哈 其實都離不開Thread.start()...,等等我說個笑話:
老爸有倆孩子:小紅和小明。老爸想喝酒了,他讓小紅去買酒,小紅出去了。而後老爸忽然想吸菸了,因而老爸讓小明去買菸。在面對對象的思想中,通常會把買東西,而後買回來這件事做爲一個方法,若是按照順序結構或者使用多線程同步的話,小明想去買菸就必須等小紅這個買東西的操做進行完。這樣無疑增長了時間的開銷(萬一老爸尿憋呢?)。異步就是爲了解決這樣的問題。你能夠分別給小紅小明下達指令,讓他們去買東西,而後你就能夠本身作本身的事,等他們買回來的時候接收結果就能夠了。
package com.boot.lea.mybot.futrue;
/** * @ClassName: TestFuture * @Description: 演示異步編程 * @author LiJing * @date 2019/8/5 15:16 */
@SuppressWarnings("all")
public class TestFuture {
static ExecutorService executor = Executors.newFixedThreadPool(2);
public static void main(String[] args) throws InterruptedException {
//兩個線程的線程池
//小紅買酒任務,這裏的future2表明的是小紅將來發生的操做,返回小紅買東西這個操做的結果
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
System.out.println("爸:小紅你去買瓶酒!");
try {
System.out.println("小紅出去買酒了,女孩子跑的比較慢,估計5s後纔會回來...");
Thread.sleep(5000);
return "我買回來了!";
} catch (InterruptedException e) {
System.err.println("小紅路上遭遇了不測");
return "來世再見!";
}
}, executor);
//小明買菸任務,這裏的future1表明的是小明將來買東西會發生的事,返回值是小明買東西的結果
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
System.out.println("爸:小明你去買包煙!");
try {
System.out.println("小明出去買菸了,可能要3s後回來...");
Thread.sleep(3000);
throw new InterruptedException();
// return "我買回來了!";
} catch (InterruptedException e) {
System.out.println("小明路上遭遇了不測!");
return "這是我託人帶來的口信,我已經不在了。";
}
}, executor);
//獲取小紅買酒結果,從小紅的操做中獲取結果,把結果打印
future2.thenAccept((e) -> {
System.out.println("小紅說:" + e);
});
//獲取小明買菸的結果
future1.thenAccept((e) -> {
System.out.println("小明說:" + e);
});
System.out.println("爸:等啊等 西湖美景三月天嘞......");
System.out.println("爸: 我以爲無聊甚至去了趟廁所。");
Thread.currentThread().join(9 * 1000);
System.out.println("爸:終於給老子買來了......huo 酒");
//關閉線程池
executor.shutdown();
}
}
複製代碼
運行結果: