要我說,多線程事務它必須就是個僞命題!

這是why技術的第 74 篇原創文章前端

深夜懟文的我

深夜懟文的我mysql

別問,問就是不行

分佈式事務你應該是知道的。可是這個多線程事務......git

沒事,我慢慢給你說。github

如圖所示,有個小夥伴想要實現多線程事務。spring

這個需求其實我在不一樣的地方看到過不少次,因此我才說:這個問題又出現了。sql

那麼有解決方案嗎?數據庫

在此以前,個人回答都是很是的確定:毋庸置疑,確定是沒有的。編程

爲何呢?安全

咱們先從理論上去推理一下。多線程

來,首先我問你,事務的特性是什麼?

這個不難吧?八股文必背內容之一,ACID 必須張口就來:

  • 原子性(Atomicity)
  • 一致性(Consistency)
  • 隔離性(Isolation)
  • 持久性(Durability)

那麼問題又來了,你以爲若是有多線程事務,那麼咱們破壞了哪一個特性?

多線程事務你也別想的多深奧,你就想,兩個不一樣的用戶各自發起了一個下單請求,這個請求對應的後臺實現邏輯中是有事務存在的。

這不就是多線程事務嗎?

這種場景下你沒有想過怎麼分別去控制兩個用戶的事務操做吧?

由於這兩個操做之間就是徹底隔離的,各自拿着各自的連接玩兒。

因此多個事務之間的最基本的原則是什麼?

隔離性。兩個事務操做之間不該該相互干擾。

而多線程事務想要實現的是 A 線程異常了。A,B 線程的事務一塊兒回滾。

事務的特性裏面就卡的死死的。因此,多線程事務從理論上就是行不通的。

經過理論指導實踐,那麼多線程事務的代碼也就是寫不出來的。

前面說到隔離性。那麼請問,Spring 的源碼裏面,對於事務的隔離性是如何保證的呢?

答案就是 ThreadLocal。

在事務開啓的時候,把當前的連接保存在了 ThreadLocal 裏面,從而保證了多線程之間的隔離性:

能夠看到,這個 resource 對象是一個 ThreadLocal 對象。

在下面這個方法中進行了賦值操做:

org.springframework.jdbc.datasource.DataSourceTransactionManager#doBegin

其中的 bindResource 方法中,就是把當前連接綁定到當前線程中,其中的 resource 就是咱們剛剛說的 ThreadLocal:

就是每一個線程裏面都各自玩本身的,咱們不可能打破 ThreadLocal 的使用規則,讓各個線程共享同一個 ThreadLocal 吧?

鐵子,你要是這樣去作的話,那豈不是走遠了?

因此,不管從理論上,仍是代碼實現上,我都認爲這個需求是不能實現的。

至少我以前是這樣想的。

可是事情,稍稍的發生了一點點的變化。

說個場景,常規實現

任何脫離場景討論技術實現的行爲都是耍流氓。

因此,咱們先看一下場景是什麼。

假設咱們有一個大數據系統,天天指定時間,咱們就須要從大數據系統中拉取 50w 條數據,對數據進行一個清洗操做,而後把數據保存到咱們業務系統的數據庫中。

對於業務系統而言,這 50w 條數據,必須所有落庫,差一條都不行。要麼就是一條都不插入。

在這個過程當中,不會去調用其餘的外部接口,也不會有其餘的流程去操做這個表的數據。

既然說到一條不差了,那麼對於你們直觀而言,想到的確定是兩個解決方案:

  1. for 循環中一條條的事務插入。
  2. 直接一條語句批量插入。

對於這種需求,開啓事務,而後在 for 循環中一條條的插入能夠說是很是 low 的解決方案了。

效率很是的低下,給你們演示一下。

好比,咱們有一個 Student 表,表結構很是簡單,以下:

`CREATE TABLE student` (
  id bigint(63) NOT NULL AUTO_INCREMENT,
  name varchar(32) DEFAULT NULL,
  home varchar(64) DEFAULT NULL,
  PRIMARY KEY (id)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
``

在咱們的項目中,咱們經過 for 循環插入數據,同時該方法上有 @Transactional 註解:

num 參數是咱們經過前端請求傳遞過來的數據,表明要插入 num 條數據:

這種狀況下,咱們能夠經過下面的連接,模擬插入指定數量的數據:

http://127.0.0.1:8081/insertOneByOne?num=xxx

我嘗試了把 num 設置爲 50w,讓它慢慢的跑着,可是我仍是太年輕了,等了很是長的時間都沒有等到結果。

因而我把 num 改成了 5000,運行結果以下:

insertOneByOne執行耗時:133449ms,num=5000

一條條的插入 5000 條數據,耗時 133.5 s 的樣子。

按照這個速度,插入 50w 條數據得 13350s,大概也是這麼多小時:

這誰頂得住啊。

因此,這方案擁有巨大的優化空間。

好比咱們優化爲這樣批量插入:

其對應的 sql 語句是這樣的:

insert into table ([列名],[列名]) VALUES ([列值],[列值]), ([列值],[列值]);

咱們仍是經過前端接口調用:

當咱們的 num 設置爲 5000 的時候,我頁面刷新了 10 次,你看耗時基本上在 200ms 毫秒之內:

從 133.5s 到 200ms,朋友們,這是什麼東西?

這是質的飛躍啊。性能提高了近 667 倍的樣子。

爲何批量插入能有這麼大的飛躍呢?

你想啊,以前 for 循環插入,雖然 SpringBoot 2.0 默認使用了 HikariPool,鏈接池裏面默認給你搞 10 個鏈接。

可是你只須要一個鏈接,開啓一次事務。這個不耗時。

耗時的地方是你 5000 次 IO 呀。

因此,耗時長是必然的。

而批量插入只是一條 sql 語句,因此只須要一個鏈接,還不須要開啓事務。

爲啥不用開啓事務?

你一條 sql 開啓事務有錘子用啊?

那麼,若是咱們一口氣插入 50w 條數據,會是怎麼樣的呢?

來,搞一波,試一下:

http://127.0.0.1:8081/insertBatch?num=500000

能夠看到拋出了一個異常。並且錯誤信息很是的清晰:

`Packet for query is too large (42777840 > 1048576). You can change this value on the server by setting the max_allowed_packet' variable.; nested exception is com.mysql.jdbc.PacketTooBigException: Packet for query is too large (42777840 > 1048576).You can change this value on the server by setting the max_allowed_packet' variable.
`

說你這個包太大了。能夠經過設置 max_allowed_packet 來改變包大小。

咱們能夠經過下面的語句查詢當前的配置大小:

select @@max_allowed_packet;

能夠看到是 1048576,即 1024*1024,1M 大小。

而咱們須要傳輸的包大小是 42777840 字節,大概是 41M 的樣子。

因此咱們須要修改配置大小。

這個地方也給你們提了個醒:若是你的 sql 語句很是大,裏面有大字段,記得調整一下 mysql 的這個參數。

能夠經過修改配置文件或者直接執行 sql 語句的方式進行修改。

我這裏就使用 sql 語句修改成 64M:

set global max_allowed_packet = 1024*1024*64;

而後再次執行,能夠看到插入成功了:

50w 的數據,74s 的樣子。

數據要麼所有提交,要麼一條也沒有,需求也實現了。

時間上呢,是有點長,可是好像也想不到什麼好的提高方案。

那麼咱們怎麼還能再縮短點時間呢?

騷想法出現了

我能想到的,只能是祭出多線程了。

50w 數據。咱們開五個線程,一個線程處理 10w 數據,沒有異常就保存入庫,出現問題就回滾。

這個需求很好實現。分分鐘就能寫出來。

可是再加上一個需求:這 5 個線程的數據,若是有一個線程出現問題了,須要所有回滾。

順着思路慢慢擼,咱們發現這個時候就是所謂的多線程事務了。

我以前說徹底不可能實現是由於提到事務我就想到了 @Transactional 註解去實現了。

咱們只須要正確使用它,而後關係業務邏輯便可,不須要也根本插手不了事務的開啓和提交或者回滾。

這種代碼的寫法咱們叫作聲明式事務。

和聲明式事務對應的就是編程式事務了。

經過編程式事務,咱們就能徹底掌控事務的開啓和提交或者回滾操做。

能想到編程式事務,這事基本上就成了一半了。

你想,首先咱們有一個全局變量爲 Boolean 類型,默認爲能夠提交。

在子線程裏面,咱們能夠先經過編程式事務開啓事務,而後插入 10w 條數據後,可是不提交。同時告訴主線程,我這邊準備好了,進入等待。

若是子線程裏面出現了異常,那麼我就告訴主線程,我這邊出問題了,而後本身進行回滾。

最後主線程收集到了 5 個子線程的狀態。

若是有一個線程出現了問題,那麼設置全局變量爲不可提交。

而後喚醒全部等待的子線程,進行回滾。

根據上面的流程,寫出模擬代碼就是這樣的,你們能夠直接複製出來運行:

`public class MainTest {
    //是否能夠提交
    public static volatile boolean IS_OK = true;
    public static void main(String[] args) {
        //子線程等待主線程通知
        CountDownLatch mainMonitor = new CountDownLatch(1);
        int threadCount = 5;
        CountDownLatch childMonitor = new CountDownLatch(threadCount);
        //子線程運行結果
        List<Boolean> childResponse = new ArrayList<Boolean>();
        ExecutorService executor = Executors.newCachedThreadPool();
        for (int i = 0; i < threadCount; i++) {
            int finalI = i;
            executor.execute(() -> {
                try {
                    System.out.println(Thread.currentThread().getName() + ":開始執行");
// if (finalI == 4) {
// throw new Exception("出現異常");
// }
                    TimeUnit.MILLISECONDS.sleep(ThreadLocalRandom.current().nextInt(1000));
                    childResponse.add(Boolean.TRUE);
                    childMonitor.countDown();
                    System.out.println(Thread.currentThread().getName() + ":準備就緒,等待其餘線程結果,判斷是否事務提交");
                    mainMonitor.await();
                    if (IS_OK) {
                        System.out.println(Thread.currentThread().getName() + ":事務提交");
                    } else {
                        System.out.println(Thread.currentThread().getName() + ":事務回滾");
                    }
                } catch (Exception e) {
                    childResponse.add(Boolean.FALSE);
                    childMonitor.countDown();
                    System.out.println(Thread.currentThread().getName() + ":出現異常,開始事務回滾");
                }
            });
        }
        //主線程等待全部子線程執行response
        try {
            childMonitor.await();
            for (Boolean resp : childResponse) {
                if (!resp) {
                    //若是有一個子線程執行失敗了,則改變mainResult,讓全部子線程回滾
                    System.out.println(Thread.currentThread().getName()+":有線程執行失敗,標誌位設置爲false");
                    IS_OK = false;
                    break;
                }
            }
            //主線程獲取結果成功,讓子線程開始根據主線程的結果執行(提交或回滾)
            mainMonitor.countDown();
            //爲了讓主線程阻塞,讓子線程執行。
            Thread.currentThread().join();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
`

在全部子線程都正常的狀況下,輸出結果是這樣的:

從結果看,是符合咱們的預期的。

假設有子線程出現了異常,那麼運行結果是這樣的:

一個線程出現異常,所有線程都進行回滾,這樣看來也是符合預期的。

若是你根據前面的需求寫出了這樣的代碼,那麼恭喜你,一不留神實現了一個相似於兩階段提交(2PC)的一致性協議。

我前面說的能想到編程式事務,這事基本上就成了一半了。

而另一半,就是兩階段提交(2PC)。

依瓢畫葫蘆

有了前面的瓢,你照着畫個葫蘆不是很簡單的事情嗎?

就不大段上代碼了,示例代碼能夠點擊這裏獲取到,因此我這裏截個圖吧:

上面的代碼應該是很是好理解的,開啓五個線程,每一個線程插入 10w 條數據。

這個不用說,用腳趾頭想也能知道,確定是比一次性批量插入 50w 條數據快的。

至於快多少,不廢話了,直接看執行效果吧。

因爲咱們的 controller 是這樣的:

因此調用連接:

http://127.0.0.1:8081/batchHandle

輸出結果以下:

還記得咱們批量插入的耗時嗎?

73791ms。

從 73791ms 到 15719ms。快了 58s 的樣子。

已經很是不錯了。

那麼若是是某個線程拋出了異常呢?好比這樣:

咱們看看日誌輸出:

經過日誌分析,看起來也是符合要求的。

而從讀者反饋的實際測試效果來看,也是很是顯著的:

真的符合要求嗎?

符合要求,只是看起來而已。

經驗老道的讀者朋友們確定早就看到問題所在了。已經把手舉得高高的:老師,這題我知道。

我以前說了,這個實現方式實際上就是編程式事務配合二階段提交(2PC)使用。

破綻就出在 2PC 上。

就像我和讀者討論這樣的:

不能再日後扯了,再日後就是 3PC,TCC,Seata 這一套分佈式事務的東西了。

這套東西寫下來,就得上萬字了。因此我從海神那邊轉了一篇文章,放在第二條推送裏面了。若是你們有興趣的能夠去看一下。乾貨滿滿。

其實當咱們把一個個子線程理解爲微服務中的一個個子系統的時候,這就是一個分佈式事務的場景了。

而咱們拿出來的解決方案,並非一個完美的解決方案。

雖然,從某種角度上,咱們繞開了事務的隔離性,可是有必定機率出現數據一致性問題,雖然機率比較小。

因此我稱之爲這種方案叫作:基於運氣編程,用運氣換時間。

注意事項

關於上面的代碼,其實還有幾個須要注意的地方。

給你們提個醒。

第一個:啓用多少線程進行分配數據插入,這個參數是能夠進行調整的。

好比我修改成 10 個線程,每一個線程插入 5w 條數據。那麼執行時間又快了 2s:

可是必定記得不是越大越好,同時記得調整數據庫鏈接池的最大鏈接數。否則白搭。

第二個:正是由於啓動多少線程是能夠進行調整的,甚至是能夠每次進行計算的。

那麼必需要注意的一個問題是不能讓任何一個任務進入隊列裏面。一旦進入隊列,程序立馬就涼。

你想,若是咱們須要開啓 5 個子線程,可是核心線程數只有 4 個,有一個任務進入隊列了。

那麼這 4 個核心線程會一直阻塞住,等待主線程喚醒。

而主線程這個時候在幹什麼?

在等 5 個線程的運行結果,可是它只能收集到 4 個結果。

因此它會一直等下去。

第三個:這裏是多個線程開啓了事務在往表裏插入數據,謹防數據庫死鎖。

第四個:注意程序裏面的代碼,countDown 安裝標準寫法上是要放到 finally 代碼塊裏面的,我這裏爲了截圖的美觀度,省去了這個步驟:

你若是真的要用,得注意一下。並且這個finally你得想清楚了寫,不是隨便寫的。

第五個:我這裏只是提供一個思路,並且它也根本不是什麼多線程事務。

也再次證實了,多線程事務就是一個僞命題。

因此我給出一個基於運氣的僞一致性的回答也不過度吧。

第六個:多線程事務換個角度想,能夠理解爲分佈式事務。,能夠藉助這個案例去了解分佈式事務。可是解決分佈式事務的最好的方法就是:不要有分佈式事務!

而解決分佈式事務的絕大部分落地方案都是:最終一致性。

性價比高,大多數業務上也能接受。

第七個:這個解決方案你要拿到生產用的話,記得先和業務同事溝通好,能不能接受這種狀況。速度和安全之間的兩難抉擇。

同時本身留好人工修數的接口:

最後說一句

才疏學淺,不免會有紕漏,若是你發現了錯誤的地方,能夠在留言區提出來,我對其加以修改。 感謝您的閱讀,我堅持原創,十分歡迎並感謝您的關注。

我是 why,一個被代碼耽誤的文學創做者,不是大佬,可是喜歡分享,是一個又暖又有料的四川好男人。

還有,歡迎關注我呀。

相關文章
相關標籤/搜索