分佈式和事務

集羣中存活的節點與同步

分佈式系統中,如何判斷一個節點(node)是否存活?
kafka這樣認爲:node

  1. 此節點和zookeeper能喊話.(Keep sessions with zookeeper through heartbeats.)
  2. 此節點若是是個從節點,必須可以儘量忠實地反映主節點的數據變化。
    也就是說,必須可以在主節點寫了新數據後,及時複製這些變化的數據,所謂及時,不能拉下太多哦.

那麼,符合上面兩個條件的節點就能夠認爲是存活的,也能夠認爲是同步的(in-sync).數據庫

關於第1點,你們對心跳都很熟悉,那麼咱們能夠這樣認爲某個節點不能和zookeeper喊話了:緩存

1網絡

2session

3架構

4併發

5app

6框架

7異步

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

zookeeper-node:

var timer =

new timer()

.setInterval(10sec)

.onTime(slave-nodes,function(slave-nodes){

    slave-nodes.forEach( node -> {

        boolean isAlive = node.heartbeatACK(15sec);

        if(!isAlive) {

            node.numNotAlive += 1;

            if(node.numNotAlive >= 3) {

                node.declareDeadOrFailed();

                slave-nodes.remove(node);

 

                //回調也可 leader-node-app.notifyNodeDeadOrFailed(node)

 

            }

        }else

        node.numNotAlive = 0;

    });

});

 

timer.run();

 

//你能夠回調也能夠像下面這樣簡單的計時判斷

leader-node-app:

var timer =

new timer()

.setInterval(10sec)

.onTime(slave-nodes,function(slave-nodes){

    slave-nodes.forEach(node -> {

        if(node.isDeadOrFailed) {

 

        //node不能和zookeeper喊話了

 

        }

    });

});

 

timer.run();

關於第二點,要稍微複雜點了,怎麼搞呢?
來這麼分析:

  • 數據 messages.
  • 操做 op-log.
  • 偏移 position/offset.

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

// 1. 先考慮messages

// 2. 再考慮log的postion或者offset

// 3. 考慮msg和off都記錄在同源數據庫或者存儲設備上.(database or storage-device.)

var timer =

new timer()

.setInterval(10sec)

.onTime(slave-nodes,function(nodes){

    var core-of-cpu = 8;

    //嫌慢就併發唄 mod hash go!

    nodes.groupParallel(core-of-cpu)

    .forEach(node -> {

        boolean nodeSucked = false;

 

        if(node.ackTimeDiff > 30sec) {

            //30秒內沒有回覆,node卡住了

            nodeSucked = true;

        }

        if(node.logOffsetDiff > 100) {

            //node複製跟不上了,差距超過100條數據

            nodeSucked = true;

        }

 

        if(nodeSucked) {

            //總之node「死」掉了,其實到底死沒死,誰知道呢?network-error在分佈式系統中或者節點失敗這個事情是正常現象.

            node.declareDeadOrFailed();

            //不和你玩啦,集羣不要你了

            nodes.remove(node);

            //該怎麼處理呢,拋個事件吧.

            fire-event-NodeDeadOrFailed(node);

        }

    });

});

 

timer.run();

上面的節點的狀態管理通常由zookeeper來作,leader或者master節點也會維護那麼點狀態。

那麼應用中的leader或者master節點,只須要從zookeeper拉狀態就能夠,同時,上面的實現是否是必定最佳呢?不是的,並且多數操做能夠合起來,但爲了描述節點是否存活這個事兒,我們這麼寫沒啥問題。

節點死掉、失敗、不一樣步了,咋處理呢?

好嘛,終於說到failover和recover了,那failover比較簡單,由於還有其它的slave節點在,不影響數據讀取。

  1. 同時多個slave節點失敗了?
    沒有100%的可用性.數據中心和機房癱瘓、網絡電纜切斷、hacker入侵刪了你的根,總之你rp爆表了.
  2. 若是主節點失敗了,那master-master不行嘛?
    keep-alived或者LVS或者你本身寫failover吧.
    高可用架構(HA)又是個大件兒了,此文不展開了。

咱們來關注下recover方面的東西,這裏把視野打開點,不只關注slave節點重啓後追log來同步數據,咱們看下在實際應用中,數據請求(包括讀、寫、更新)失敗怎麼辦?

你們可能都會說,重試(retry)唄、重放(replay)唄或者乾脆無論了唄!
行,都行,這些都是策略,但具體怎麼個搞法,你真的清楚了?

一個bigdata問題

咱們先擺個探討的背景:

問題:消息流,好比微博的微博(真繞),源源不斷地流進咱們的應用中,要處理這些消息,有個需求是這樣的:

Reach is the number of unique people exposed to a URL on Twitter.

那麼,統計一下3小時內的本條微博(url)的reach總數。

怎麼解決呢?

把某時間段內轉發過某條微博(url)的人拉出來,把這些人的粉絲拉出來,去掉重複的人,而後求總數,就是要求的reach.

爲了簡單,咱們忽略掉日期,先看看這個方法行不行:

1

2

3

4

5

6

7

8

9

10

11

12

/** ---------------------------------

* 1. 求出轉發微博(url)的大V.

* __________________________________*/

 

方法 :getUrlToTweetersMap(String url_id)

 

SQL : /* 數據庫A,表url_user存儲了轉發某url的user */

SELECT url_user.user_id as tweeter_id

FROM url_user

WHERE url_user.url_id = ${url_id}

 

返回 :[user_1,...,user_m]

1

2

3

4

5

6

7

8

9

10

11

12

/** ---------------------------------

* 2. 求出大V的粉絲

* __________________________________*/

 

方法 : getFollowers(String tweeter_id);

 

SQL :   /* 數據庫B */

SELECT users.id as user_id

FROM users

WHERE users.followee_id = ${tweeter_id}

 

返回:tweeter的粉絲

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

/** ---------------------------------

* 3. 求出Reach

* __________________________________*/

 

var url = queryArgs.getUrl();

var tweeters = getUrlToTweetersMap();

var result = new HashMap<String,Integer>();

tweeters.forEach(t -> {

    // 你能夠批量in + 併發讀來優化下面方法的性能

    var followers = getFollowers(t.tweeter_id);

 

    followers.forEach(f -> {

        //hash去重

        result.put(f.user_id,1);

    });

});

 

//Reach

return result.size();

其實這又引出了一個很重要的問題,也是不少大談框架、設計、模式卻每每忽視的問題:性能和數據庫建模的關係。

  1. 數據量有多大?
    不知道讀者有木有對這個問題的數據庫I/O有點想法,或者虎軀一震呢?
    Computing reach is too intense for a single machine – it can require thousands of database calls and tens of millions of tuples.
    在上面的數據庫設計中避免了JOIN,爲了提升求大V粉絲的性能,能夠將一批大V做爲batch/bulk,而後多個batch併發讀,誓死搞死數據庫。
    這裏將微博到轉發者表所在的庫,與粉絲庫分離,若是數據更大怎麼辦?
    庫再分表…
    OK,假設你已經很是熟悉傳統關係型數據庫的分庫分表及數據路由(讀路徑的聚合、寫路徑的分發)、或者你對於sharding技術也很熟悉、或者你良好的結合了HBase的橫向擴展能力並有一致性策略來解決其二級索引問題.
    總之,存儲和讀取的問題假設你已經解決了,那麼分佈式計算呢?
  2. 微博這種應用,人與人之間的關係成圖狀(網),你怎麼建模存儲?而不只僅對應這個問題,好比:
    某人的好友的好友可能和某人有幾分相熟?

看看用storm怎麼來解決分佈式計算,並提供流式計算的能力:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

// url到大V -> 數據庫1

TridentState urlToTweeters =

    topology.newStaticState(getUrlToTweetersState());

// 大V到粉絲 -> 數據庫2

TridentState tweetersToFollowers =

    topology.newStaticState(getTweeterToFollowersState());

 

topology.newDRPCStream("reach")

    .stateQuery(urlToTweeters, new Fields("args"), new MapGet(), new Fields("tweeters"))

    .each(new Fields("tweeters"), new ExpandList(), new Fields("tweeter"))

    .shuffle() /* 大V的粉絲不少,因此須要分佈式處理*/

    .stateQuery(tweetersToFollowers, new Fields("tweeter"), new MapGet(), new Fields("followers"))

    .parallelismHint(200) /* 粉絲不少,因此須要高併發 */

    .each(new Fields("followers"), new ExpandList(), new Fields("follower"))

    .groupBy(new Fields("follower"))

    .aggregate(new One(), new Fields("one")) /* 去重 */

    .parallelismHint(20)

    .aggregate(new Count(), new Fields("reach")); /* 計算reach數 */

最多處理一次(At most once)

回到主題,引出上面的例子,一是爲了引出一個有關分佈式(存儲+計算)的問題,二是透漏這麼點意思:
碼農,就應該關注設計和實現的東西,好比Jay Kreps是如何發明Kafka這個輪子的 : ]

若是你仍是碼農級別,咱來務點實吧,前面咱們說到recover,節點恢復的問題,那麼咱們恢復幾個東西?

基本的:

  • 節點狀態
  • 節點數據

本篇從數據上來討論下這個問題,爲使問題再簡單點,咱們考慮寫數據的場景,若是咱們用write-ahead-log的方式來保證數據複製和一致性,那麼咱們會怎麼處理一致性問題呢?

  1. 主節點有新數據寫入.
  2. 從節點追log,準備複製這批新數據。從節點作兩件事:
    (1). 把數據的id偏移寫入log;
    (2). 正要處理數據自己,從節點掛了。

那麼根據上文的節點存活條件,這個從節點掛了這件事被探測到了,從節點由維護人員手動或者其本身恢復了,那麼在加入集羣和小夥伴們繼續玩耍以前,它要同步本身的狀態和數據。
問題來了:

若是根據log內的數據偏移來同步數據,那麼,由於這個節點在處理數據以前就把偏移寫好了,但是那批數據lost-datas沒有獲得處理,若是追log以後的數據來同步,那麼那批數據lost-datas就丟了。

在這種狀況下,就叫做數據最多處理一次,也就是說數據會丟失。

最少處理一次(At least once)

好吧,丟失數據不能容忍,那麼咱們換種方式來處理:

  1. 主節點有新數據寫入.
  2. 從節點追log,準備複製這批新數據。從節點作兩件事:
    (1). 先處理數據;
    (2). 正要把數據的id偏移寫入log,從節點掛了。

問題又來了:

若是從節點追log來同步數據,那麼由於那批數據duplicated-datas被處理過了,而數據偏移沒有反映到log中,若是這樣追,會致使這批數據重複。

這種場景,從語義上來說,就是數據最少處理一次,意味着數據處理會重複。

僅處理一次(Exactly once)

Transaction

好吧,數據重複也不能容忍?要求挺高啊。
你們都追求的強一致性保證(這裏是最終一致性),怎麼來搞呢?
換句話說,在更新數據的時候,事務能力如何保障呢?
假設一批數據以下:

1

2

3

4

5

6

// 新到數據

{

    transactionId:4

    urlId:99

    reach:5

}

如今要更新這批數據到庫裏或者log裏,那麼原來的狀況是:

1

2

3

4

5

6

// 老數據

{

    transactionId:3

    urlId:99

    reach:3

}

若是說能夠保證以下三點:

  1. 事務ID的生成是強有序的.(隔離性,串行)
  2. 同一個事務ID對應的一批數據相同.(冪等性,屢次操做一個結果)
  3. 單條數據會且僅會出如今某批數據中.(一致性,無遺漏無重複)

那麼,放心大膽的更新好了:

1

2

3

4

5

6

7

// 更新後數據

{

    transactionId:4

    urlId:99

    //3 + 5 = 8

    reach:8

}

注意到這個更新是ID偏移和數據一塊兒更新的,那麼這個操做靠什麼來保證:原子性
你的數據庫不提供原子性?後文略有說起。

這裏是更新成功了。若是更新的時候,節點掛了,那麼庫裏或者log裏的id偏移不寫,數據也不處理,等節點恢復,就能夠放心去同步,而後加入集羣玩耍了。

因此說,要保證數據僅處理一次,仍是挺困難的吧?

上面的保障「僅處理一次」這個語義的實現有什麼問題呢?

性能問題。

這裏已經使用了batch策略來減小到庫或磁盤的Round-Trip Time,那麼這裏的性能問題是什麼呢?

考慮一下,採用master-master架構來保證主節點的可用性,可是一個主節點失敗了,到另外一個主節點主持工做,是須要時間的。
假設從節點正在同步,啪!主節點掛了!由於要保證僅處理一次的語義,因此原子性發揮做用,失敗,回滾,而後從主節點拉失敗的數據(你不能就近更新,由於這批數據可能已經變化了,或者你根本沒緩存本批數據),結果是什麼呢?

老主節點掛了, 新的主節點還沒啓動,因此此次事務就卡在這裏,直到數據同步的源——主節點能夠響應請求。

若是不考慮性能,就此做罷,這也不是什麼大事。

你彷佛意猶未盡?來吧,看看「銀彈」是什麼?

Opaque-Transaction

如今,咱們來追求這樣一種效果:

某條數據在一批數據中(這批數據對應着一個事務),極可能會失敗,可是它會在另外一批數據中成功。
換句話說,一批數據的事務ID必定相同。

來看看例子吧,老數據不變,只是多了個字段:prevReach

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

// 老數據

{

    transactionId:3

    urlId:99

    //注意這裏多了個字段,表示以前的reach的值

    prevReach:2

    reach:3

}

 

// 新到數據

{

    transactionId:4

    urlId:99

    reach:5

}

這種狀況,新事務的ID更大、更靠後,代表新事務能夠執行,還等什麼,直接更新,更新後數據以下:

1

2

3

4

5

6

7

8

9

// 新到數據

{

    transactionId:4

    urlId:99

    //注意這裏更新爲以前的值

    prevReach:3

    //3 + 5 = 8

    reach:8

}

如今來看下另外的狀況:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

// 老數據

{

    transactionId:3

    urlId:99

    prevReach:2

    reach:3

}

 

// 新到數據

{

    //注意事務ID爲3,和老數據中的事務ID相同

    transactionId:3

    urlId:99

    reach:5

}

這種狀況怎麼處理?是跳過嗎?由於新數據的事務ID和庫裏或者log裏的事務ID相同,按事務要求此次數據應該已經處理過了,跳過?
不,這種事不能靠猜的,想一想咱們有的幾個性質,其中關鍵一點就是:

給定一批數據,它們所屬的事務ID相同。

仔細體會下,上面那句話和下面這句話的差異:
給定一個事務ID,任什麼時候候,其所關聯的那批數據相同。

咱們應該這麼作,考慮到新到數據的事務ID和存儲中的事務ID一致,因此這批數據可能被分別或者異步處理了,可是,這批數據對應的事務ID永遠是同一個,那麼,即便這批數據中的A部分先處理了,因爲你們都是一個事務ID,那麼A部分的前值是可靠的。

因此,咱們將依靠prevReach而不是Reach的值來更新:

1

2

3

4

5

6

7

8

9

// 更新後數據

{

    transactionId:3

    urlId:99

    //這個值不變

    prevReach:2

    //2 + 5 = 7

    reach:7

}

你發現了什麼呢?
不一樣的事務ID,致使了不一樣的值:

  1. 當事務ID爲4,大於存儲中的事務ID3,Reach更新爲3+5 = 8.
  2. 當事務ID爲3,等於存儲中的事務ID3,Reach更新爲2+5 = 7.

這就是Opaque Transaction.

這種事務能力是最強的了,能夠保證事務異步提交。因此不用擔憂被卡住了,若是說集羣中:

Transaction:

  • 數據是分批處理的,每一個事務ID對應一批肯定、相同的數據.
  • 保證事務ID的產生是強有序的.
  • 保證分批的數據不重複、不遺漏.
  • 若是事務失敗,數據源丟失,那麼後續事務就卡住直到數據源恢復.

Opaque-Transaction:

  • 數據是分批處理的,每批數據有肯定而惟一的事務ID.
  • 保證事務ID的產生是強有序的.
  • 保證分批的數據不重複、不遺漏.
  • 若是事務失敗,數據源丟失,不影響後續事務,除非後續事務的數據源也丟了.

其實這個全局ID的設計也是門藝術:

  • 冗餘關聯表的ID,以減小join,作到O(1)取ID.
  • 冗餘日期(long型)字段,以免order by.
  • 冗餘過濾字段,以免無二級索引(HBase)的尷尬.
  • 存儲mod-hash的值,以方便分庫、分表後,應用層的數據路由書寫.

這個內容也太多,話題也太大,就不在此展開了。

你如今知道twitter的snowflake生成全局惟一且有序的ID的重要性了。

兩階段提交

如今用zookeeper來作兩階段提交已是入門級技術,因此也不展開了。

若是你的數據庫不支持原子操做,那麼考慮兩階段提交吧

相關文章
相關標籤/搜索