這是why技術的第 74 篇原創文章前端
深夜懟文的我mysql
分佈式事務你應該是知道的。可是這個多線程事務......git
沒事,我慢慢給你說。github
如圖所示,有個小夥伴想要實現多線程事務。spring
這個需求其實我在不一樣的地方看到過不少次,因此我才說:這個問題又出現了。sql
那麼有解決方案嗎?數據庫
在此以前,個人回答都是很是的確定:毋庸置疑,確定是沒有的。編程
爲何呢?安全
咱們先從理論上去推理一下。多線程
來,首先我問你,事務的特性是什麼?
這個不難吧?八股文必背內容之一,ACID 必須張口就來:
那麼問題又來了,你以爲若是有多線程事務,那麼咱們破壞了哪一個特性?
多線程事務你也別想的多深奧,你就想,兩個不一樣的用戶各自發起了一個下單請求,這個請求對應的後臺實現邏輯中是有事務存在的。
這不就是多線程事務嗎?
這種場景下你沒有想過怎麼分別去控制兩個用戶的事務操做吧?
由於這兩個操做之間就是徹底隔離的,各自拿着各自的連接玩兒。
因此多個事務之間的最基本的原則是什麼?
隔離性。兩個事務操做之間不該該相互干擾。
而多線程事務想要實現的是 A 線程異常了。A,B 線程的事務一塊兒回滾。
事務的特性裏面就卡的死死的。因此,多線程事務從理論上就是行不通的。
經過理論指導實踐,那麼多線程事務的代碼也就是寫不出來的。
前面說到隔離性。那麼請問,Spring 的源碼裏面,對於事務的隔離性是如何保證的呢?
答案就是 ThreadLocal。
在事務開啓的時候,把當前的連接保存在了 ThreadLocal 裏面,從而保證了多線程之間的隔離性:
能夠看到,這個 resource 對象是一個 ThreadLocal 對象。
在下面這個方法中進行了賦值操做:
org.springframework.jdbc.datasource.DataSourceTransactionManager#doBegin
其中的 bindResource 方法中,就是把當前連接綁定到當前線程中,其中的 resource 就是咱們剛剛說的 ThreadLocal:
就是每一個線程裏面都各自玩本身的,咱們不可能打破 ThreadLocal 的使用規則,讓各個線程共享同一個 ThreadLocal 吧?
鐵子,你要是這樣去作的話,那豈不是走遠了?
因此,不管從理論上,仍是代碼實現上,我都認爲這個需求是不能實現的。
至少我以前是這樣想的。
可是事情,稍稍的發生了一點點的變化。
任何脫離場景討論技術實現的行爲都是耍流氓。
因此,咱們先看一下場景是什麼。
假設咱們有一個大數據系統,天天指定時間,咱們就須要從大數據系統中拉取 50w 條數據,對數據進行一個清洗操做,而後把數據保存到咱們業務系統的數據庫中。
對於業務系統而言,這 50w 條數據,必須所有落庫,差一條都不行。要麼就是一條都不插入。
在這個過程當中,不會去調用其餘的外部接口,也不會有其餘的流程去操做這個表的數據。
既然說到一條不差了,那麼對於你們直觀而言,想到的確定是兩個解決方案:
對於這種需求,開啓事務,而後在 for 循環中一條條的插入能夠說是很是 low 的解決方案了。
效率很是的低下,給你們演示一下。
好比,咱們有一個 Student 表,表結構很是簡單,以下:
`CREATE TABLE
student` (
id
bigint(63) NOT NULL AUTO_INCREMENT,
name
varchar(32) DEFAULT NULL,
home
varchar(64) DEFAULT NULL,
PRIMARY KEY (id
)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
``
在咱們的項目中,咱們經過 for 循環插入數據,同時該方法上有 @Transactional 註解:
num 參數是咱們經過前端請求傳遞過來的數據,表明要插入 num 條數據:
這種狀況下,咱們能夠經過下面的連接,模擬插入指定數量的數據:
http://127.0.0.1:8081/insertOneByOne?num=xxx
我嘗試了把 num 設置爲 50w,讓它慢慢的跑着,可是我仍是太年輕了,等了很是長的時間都沒有等到結果。
因而我把 num 改成了 5000,運行結果以下:
insertOneByOne執行耗時:133449ms,num=5000
一條條的插入 5000 條數據,耗時 133.5 s 的樣子。
按照這個速度,插入 50w 條數據得 13350s,大概也是這麼多小時:
這誰頂得住啊。
因此,這方案擁有巨大的優化空間。
好比咱們優化爲這樣批量插入:
其對應的 sql 語句是這樣的:
insert into table ([列名],[列名]) VALUES ([列值],[列值]), ([列值],[列值]);
咱們仍是經過前端接口調用:
當咱們的 num 設置爲 5000 的時候,我頁面刷新了 10 次,你看耗時基本上在 200ms 毫秒之內:
從 133.5s 到 200ms,朋友們,這是什麼東西?
這是質的飛躍啊。性能提高了近 667 倍的樣子。
爲何批量插入能有這麼大的飛躍呢?
你想啊,以前 for 循環插入,雖然 SpringBoot 2.0 默認使用了 HikariPool,鏈接池裏面默認給你搞 10 個鏈接。
可是你只須要一個鏈接,開啓一次事務。這個不耗時。
耗時的地方是你 5000 次 IO 呀。
因此,耗時長是必然的。
而批量插入只是一條 sql 語句,因此只須要一個鏈接,還不須要開啓事務。
爲啥不用開啓事務?
你一條 sql 開啓事務有錘子用啊?
那麼,若是咱們一口氣插入 50w 條數據,會是怎麼樣的呢?
來,搞一波,試一下:
http://127.0.0.1:8081/insertBatch?num=500000
能夠看到拋出了一個異常。並且錯誤信息很是的清晰:
`Packet for query is too large (42777840 > 1048576). You can change this value on the server by setting the max_allowed_packet' variable.; nested exception is com.mysql.jdbc.PacketTooBigException: Packet for query is too large (42777840 > 1048576).You can change this value on the server by setting the max_allowed_packet' variable.
`
說你這個包太大了。能夠經過設置 max_allowed_packet 來改變包大小。
咱們能夠經過下面的語句查詢當前的配置大小:
select @@max_allowed_packet;
能夠看到是 1048576,即 1024*1024,1M 大小。
而咱們須要傳輸的包大小是 42777840 字節,大概是 41M 的樣子。
因此咱們須要修改配置大小。
這個地方也給你們提了個醒:若是你的 sql 語句很是大,裏面有大字段,記得調整一下 mysql 的這個參數。
能夠經過修改配置文件或者直接執行 sql 語句的方式進行修改。
我這裏就使用 sql 語句修改成 64M:
set global max_allowed_packet = 1024*1024*64;
而後再次執行,能夠看到插入成功了:
50w 的數據,74s 的樣子。
數據要麼所有提交,要麼一條也沒有,需求也實現了。
時間上呢,是有點長,可是好像也想不到什麼好的提高方案。
那麼咱們怎麼還能再縮短點時間呢?
我能想到的,只能是祭出多線程了。
50w 數據。咱們開五個線程,一個線程處理 10w 數據,沒有異常就保存入庫,出現問題就回滾。
這個需求很好實現。分分鐘就能寫出來。
可是再加上一個需求:這 5 個線程的數據,若是有一個線程出現問題了,須要所有回滾。
順着思路慢慢擼,咱們發現這個時候就是所謂的多線程事務了。
我以前說徹底不可能實現是由於提到事務我就想到了 @Transactional 註解去實現了。
咱們只須要正確使用它,而後關係業務邏輯便可,不須要也根本插手不了事務的開啓和提交或者回滾。
這種代碼的寫法咱們叫作聲明式事務。
和聲明式事務對應的就是編程式事務了。
經過編程式事務,咱們就能徹底掌控事務的開啓和提交或者回滾操做。
能想到編程式事務,這事基本上就成了一半了。
你想,首先咱們有一個全局變量爲 Boolean 類型,默認爲能夠提交。
在子線程裏面,咱們能夠先經過編程式事務開啓事務,而後插入 10w 條數據後,可是不提交。同時告訴主線程,我這邊準備好了,進入等待。
若是子線程裏面出現了異常,那麼我就告訴主線程,我這邊出問題了,而後本身進行回滾。
最後主線程收集到了 5 個子線程的狀態。
若是有一個線程出現了問題,那麼設置全局變量爲不可提交。
而後喚醒全部等待的子線程,進行回滾。
根據上面的流程,寫出模擬代碼就是這樣的,你們能夠直接複製出來運行:
`public class MainTest {
//是否能夠提交
public static volatile boolean IS_OK = true;
public static void main(String[] args) {
//子線程等待主線程通知
CountDownLatch mainMonitor = new CountDownLatch(1);
int threadCount = 5;
CountDownLatch childMonitor = new CountDownLatch(threadCount);
//子線程運行結果
List<Boolean> childResponse = new ArrayList<Boolean>();
ExecutorService executor = Executors.newCachedThreadPool();
for (int i = 0; i < threadCount; i++) {
int finalI = i;
executor.execute(() -> {
try {
System.out.println(Thread.currentThread().getName() + ":開始執行");
// if (finalI == 4) {
// throw new Exception("出現異常");
// }
TimeUnit.MILLISECONDS.sleep(ThreadLocalRandom.current().nextInt(1000));
childResponse.add(Boolean.TRUE);
childMonitor.countDown();
System.out.println(Thread.currentThread().getName() + ":準備就緒,等待其餘線程結果,判斷是否事務提交");
mainMonitor.await();
if (IS_OK) {
System.out.println(Thread.currentThread().getName() + ":事務提交");
} else {
System.out.println(Thread.currentThread().getName() + ":事務回滾");
}
} catch (Exception e) {
childResponse.add(Boolean.FALSE);
childMonitor.countDown();
System.out.println(Thread.currentThread().getName() + ":出現異常,開始事務回滾");
}
});
}
//主線程等待全部子線程執行response
try {
childMonitor.await();
for (Boolean resp : childResponse) {
if (!resp) {
//若是有一個子線程執行失敗了,則改變mainResult,讓全部子線程回滾
System.out.println(Thread.currentThread().getName()+":有線程執行失敗,標誌位設置爲false");
IS_OK = false;
break;
}
}
//主線程獲取結果成功,讓子線程開始根據主線程的結果執行(提交或回滾)
mainMonitor.countDown();
//爲了讓主線程阻塞,讓子線程執行。
Thread.currentThread().join();
} catch (Exception e) {
e.printStackTrace();
}
}
}
`
在全部子線程都正常的狀況下,輸出結果是這樣的:
從結果看,是符合咱們的預期的。
假設有子線程出現了異常,那麼運行結果是這樣的:
一個線程出現異常,所有線程都進行回滾,這樣看來也是符合預期的。
若是你根據前面的需求寫出了這樣的代碼,那麼恭喜你,一不留神實現了一個相似於兩階段提交(2PC)的一致性協議。
我前面說的能想到編程式事務,這事基本上就成了一半了。
而另一半,就是兩階段提交(2PC)。
有了前面的瓢,你照着畫個葫蘆不是很簡單的事情嗎?
就不大段上代碼了,示例代碼能夠點擊這裏獲取到,因此我這裏截個圖吧:
上面的代碼應該是很是好理解的,開啓五個線程,每一個線程插入 10w 條數據。
這個不用說,用腳趾頭想也能知道,確定是比一次性批量插入 50w 條數據快的。
至於快多少,不廢話了,直接看執行效果吧。
因爲咱們的 controller 是這樣的:
因此調用連接:
http://127.0.0.1:8081/batchHandle
輸出結果以下:
還記得咱們批量插入的耗時嗎?
73791ms。
從 73791ms 到 15719ms。快了 58s 的樣子。
已經很是不錯了。
那麼若是是某個線程拋出了異常呢?好比這樣:
咱們看看日誌輸出:
經過日誌分析,看起來也是符合要求的。
而從讀者反饋的實際測試效果來看,也是很是顯著的:
符合要求,只是看起來而已。
經驗老道的讀者朋友們確定早就看到問題所在了。已經把手舉得高高的:老師,這題我知道。
我以前說了,這個實現方式實際上就是編程式事務配合二階段提交(2PC)使用。
破綻就出在 2PC 上。
就像我和讀者討論這樣的:
不能再日後扯了,再日後就是 3PC,TCC,Seata 這一套分佈式事務的東西了。
這套東西寫下來,就得上萬字了。因此我從海神那邊轉了一篇文章,放在第二條推送裏面了。若是你們有興趣的能夠去看一下。乾貨滿滿。
其實當咱們把一個個子線程理解爲微服務中的一個個子系統的時候,這就是一個分佈式事務的場景了。
而咱們拿出來的解決方案,並非一個完美的解決方案。
雖然,從某種角度上,咱們繞開了事務的隔離性,可是有必定機率出現數據一致性問題,雖然機率比較小。
因此我稱之爲這種方案叫作:基於運氣編程,用運氣換時間。
關於上面的代碼,其實還有幾個須要注意的地方。
給你們提個醒。
第一個:啓用多少線程進行分配數據插入,這個參數是能夠進行調整的。
好比我修改成 10 個線程,每一個線程插入 5w 條數據。那麼執行時間又快了 2s:
可是必定記得不是越大越好,同時記得調整數據庫鏈接池的最大鏈接數。否則白搭。
第二個:正是由於啓動多少線程是能夠進行調整的,甚至是能夠每次進行計算的。
那麼必需要注意的一個問題是不能讓任何一個任務進入隊列裏面。一旦進入隊列,程序立馬就涼。
你想,若是咱們須要開啓 5 個子線程,可是核心線程數只有 4 個,有一個任務進入隊列了。
那麼這 4 個核心線程會一直阻塞住,等待主線程喚醒。
而主線程這個時候在幹什麼?
在等 5 個線程的運行結果,可是它只能收集到 4 個結果。
因此它會一直等下去。
第三個:這裏是多個線程開啓了事務在往表裏插入數據,謹防數據庫死鎖。
第四個:注意程序裏面的代碼,countDown 安裝標準寫法上是要放到 finally 代碼塊裏面的,我這裏爲了截圖的美觀度,省去了這個步驟:
你若是真的要用,得注意一下。並且這個finally你得想清楚了寫,不是隨便寫的。
第五個:我這裏只是提供一個思路,並且它也根本不是什麼多線程事務。
也再次證實了,多線程事務就是一個僞命題。
因此我給出一個基於運氣的僞一致性的回答也不過度吧。
第六個:多線程事務換個角度想,能夠理解爲分佈式事務。,能夠藉助這個案例去了解分佈式事務。可是解決分佈式事務的最好的方法就是:不要有分佈式事務!
而解決分佈式事務的絕大部分落地方案都是:最終一致性。
性價比高,大多數業務上也能接受。
第七個:這個解決方案你要拿到生產用的話,記得先和業務同事溝通好,能不能接受這種狀況。速度和安全之間的兩難抉擇。
同時本身留好人工修數的接口:
才疏學淺,不免會有紕漏,若是你發現了錯誤的地方,能夠在留言區提出來,我對其加以修改。 感謝您的閱讀,我堅持原創,十分歡迎並感謝您的關注。
我是 why,一個被代碼耽誤的文學創做者,不是大佬,可是喜歡分享,是一個又暖又有料的四川好男人。
還有,歡迎關注我呀。