Storm系列三: Storm消息可靠性保障

Storm系列三: Storm消息可靠性保障

在上一篇 Storm系列二: Storm拓撲設計 中咱們已經設計了一個稍微複雜一點的拓撲。html

而本篇就是在上一篇的基礎上再作出必定的調整。前端

在這裏先大概提一下上一篇的業務邏輯, 咱們會不斷收到來自前端的消息,消息包含消息的發送時間,消息內容,結束標識, 消息的發送者, SessionId等其餘信息, 咱們須要作的事情是當接收到消息以後,根據SessionId判斷是否屬於同一消息, 若是是的話將內容拼接, 若是結束標識爲 true, 表示會話已結束,則存入數據庫或其餘地方, 若是不爲true, 則等待, 在1分鐘後 仍是沒有收到消息, 則存入數據庫。java

在上一篇中, 消息內容指示的是用戶行爲, 所以對於消息的可靠性保障並無要求。node

如今咱們將需求微調,消息內容是系統的日誌信息, 並保證日誌信息沒有遺漏, 由於極可能在未來咱們須要查找到系統的日誌消息, 斷定某些錯誤發生的緣由,就要保證毫無遺漏。git

那麼什麼是消息可靠性呢?github

若是咱們的拓撲由於某種意外終止了,當拓撲再度恢復,總不可能從頭開始讀取數據,又或者數據由於時效性已經丟失,沒法被再度獲取, 因此首先,咱們要有一個可靠的數據源。 這意味着須要有保存數據的能力, 除非通知數據已經被消費,不然就不能刪除數據, 同時要記錄每次消費到某個位置。數據庫

當有了可靠的數據源以後,由於故障意外,某個bolt所處的節點掛掉,致使正在處理的數據被丟棄了, 因此須要Spout再度發送數據。 那麼第二點,須要一個有重發指定消息的能力。json

咱們已經知足了上述兩點,那麼Spout是如何得知當前數據已經被處理掉了呢? 不管是成功仍是失敗,總須要經過某種途徑獲取其監聽信息。 因此第三點就是,須要一個可以被一路跟蹤狀態信息的元組流。 也叫作錨定。由於下游並不止一個bolt,可能會在任何一個節點出問題, 因此須要持續跟蹤。緩存

第四點是, 須要一個具備容錯能力的Storm拓撲。ide

固然咱們可以影響的只有前三點。

在這裏選擇一個可靠的數據源,文本輸入固然是能夠的, 實際中使用的是 kafka, RabbitMQ, 等消息隊列, 做爲可靠數據源的輸入。

消息可靠性保障

在Storm中的消息可靠性保障意味着, 消息以元組的形式從spout中發射出來,並通過拓撲中的各個bolt完成處理, 若是一個元組在某一個節點處理失敗, Storm會馬上得知相信的信息,並通知Spout能夠進行相應的處理,不管是重發仍是拋棄(固然,若是是拋棄的話, 這裏並無必要採起可靠數據源, 也不怎麼須要可靠性保障。由於Spout表現出的行爲是對元組的成功失敗不聞不問。),直到,這個元組被完成掉。

元組狀態

元組有兩種狀態,ack 和 fail,當Spout發射出一個元組以後,下游的bolt在處理完成以後,可能會發出更多的元組, Storm爲spout發出的每個tuple都建立了一個元組樹(tupletree), 其中Spout發射的元組成爲根元組,當一棵元組樹的全部葉節點都完成了對元組的處理,此時storm纔會認爲 對當前tuple已完成處理。

那麼storm中的消息保障自己就是可選的, 你可能在任意節點決定, 當前元組已經完成, 後續的全部處理並不須要再度進行保障性操做。既然自由度是這樣高, 所以你須要作這樣兩件事:

  • 在每一個節點發出元組的時候進行 錨定,也就是意味着storm須要繼續跟蹤當前元組的狀態。

  • 確保你的每個葉節點會對tuple進行應答,告訴storm我已經處理完成了。

固然,沒有人能保證bolt永遠會作出應答,即便bolt掛掉了,storm依然會跟蹤元組狀態,在得不到迴應的狀況下,元組樹將會報錯, 表示當前tuple處於fail狀態, 這個時間的配置是TOPOLOGY_MESSAGE_TIMEOUT_SECS, 缺省配置爲30s。

Config conf = new Config();
conf.put(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 30);

實現

已經有了相應的概念, 下面的部分咱們開始從spout層面,來一步步說明。

代碼基於上一篇中的代碼進行修改, 開頭已經提到過了。

本篇的代碼存儲在:

git@github.com:zyzdisciple/storm_study.git

的guaranteed_message包下。

須要提到的一點是, 我繼承的都是 BaseRichSpout/BaseRichBolt;

在BaseBasic系列中, 已經默認爲你作了相關處理。

FileReaderSpout

if (isValid(info, line)) {
    completingMessage(info);
    //判斷當前的時間區間.
    long timeGroup = System.currentTimeMillis() / (BehaviorConstants.SESSION_TIME_OUT_SECS * 1000);
    collector.emit(new Values(timeGroup, info), info.hashCode());
}

在這裏,核心在:

collector.emit(new Values(timeGroup, info), info.hashCode());

collector.emit的第二個參數配置,就是 msgId, 若是咱們在從Spout中發射數據時, 沒有配置messageId,那麼storm並不會跟蹤元組狀態, 即便後續再怎麼處理, 也是無效的。

在這裏, 我簡單的採起了hashCode, 固然也重寫了info的hashCode方法, 在實際中, 咱們從kafka數據源中拉取數據的時候, 通常都會有其ID做爲惟一性標識, 並不須要去單首創建。

其餘工做暫且不提, 讓咱們繼續。

ContentStitchingBolt

更改的有這樣一個個地方:

if (info.getEnd()) {
    collector.emit(input, new Values(info));
    //發送後須要移除相關數據
    collectMap.remove(key);
} else {
    collectMap.put(key, info);
}

會發如今 emit的同時,在第一個emit中傳遞了 當前tuple做爲參數,這就是進行了錨定行爲, 將spout發出的tuple與後續的相關聯, 能夠監聽狀態, 若是不監聽而直接響應ack,那麼系統會認爲你已經完成了, 若是不監聽也不響應,時間到了,系統會認爲你超時了。

對於系統消息咱們並無進行 ack處理, 這是由於storm僅跟蹤 spout發出的tuple, 對於系統消息, 並不須要理會。

然而,在數據不知足直接發射條件的時候, 咱們對tuple並無進行ack,考慮若是ack,表示tuple在當前節點已經完成處理,若是不存在後續bolt的話, 則能夠認爲整個tuple都已經處理完畢, 那麼在spout中就會刪除對應數據, 基於可靠數據源也會忽略該數據, 然而事實上目前的數據咱們是存在內存中的,當bolt掛掉, 則內存相關數據消息, 那麼就真的是徹底沒法恢復了。

而不進行ack, 那麼就會出現這樣一個問題, 當咱們的定時器,messageTimeout超時以前, tuple的定時器已經超時了,此時會從新發出一條數據,形成了更多的困擾, 因此必須有這樣一條要求:


咱們的tuple超時時間必須大於 messageTimeout。

但這樣就不會形成問題了嗎? 並非。

若是咱們的messageTimeout設置的時間原本就很長,好比十分鐘, 那麼tupleTimeout必須大於十分鐘,也就是一條tuple發出去以後, 十分鐘我才能將其定義爲失敗狀態, 這沒什麼, 可是十分鐘內會有多少條數據累積?

所以另外一條配置也是比較有用的:

Config conf = new Config();
conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 30000);

這一條一樣是配置給topology級別的:

表示的意思是, 若是在整個拓撲中, 有超過30000條tuple處於未響應狀態, 那麼spout就中止發送數據, 將其阻塞掉.

但仔細思考會發現僅僅這樣處理是不夠的,假定存在數據屬於同一SessionId:

1,2,3,4,5

按照目前的假設來看,咱們收到1234時既不進行ack,也不錨定,惟有收到5的時候再作處理, 此時是否應該取出1234所屬的tuple一一ack?並不合適,理應須要對1234再度進行錨定,由於惟有下游有權決定數據究竟是處理失敗了仍是成功了。那咱們對1234再進行錨定發送? 也不合適,由於這意味着要將數據發送5遍,有4條數據是徹底無效的。

那麼首先能夠肯定的是,當收到中間數據須要進行ack,當真正須要發送數據的時候再進行錨定,也就是收到5的時候進行錨定,錨定的對象又是誰呢?是1所在的tuple。

那再來分析一下,身爲數據源Spout,須要知足怎樣的特性纔可以保證收到2345的時候並不刪除數據,只有收到1的時候,再將數據刪除掉。

也就是說,數據源是一個隊列,惟有當收到第一個數據的ack時,才按順序檢測,一一刪除,不然都不刪除。

這裏咱們作的是簡化處理,畢竟真正的可靠消息,有kafka這些專門的消息組件進行保證。

分析了這麼久終於能夠開始代碼了。

而在這以前,小小的總結一下:

  • 惟有當數據肯定不須要再度進行回放,一是數據已經被完全處理掉了,沒有利用價值,二是保存在了另外一種可靠的存儲結構中, 此時咱們才能進行ack,通知數據源,數據已經無效了。

  • 做爲數據源也須要爲消息的可靠性提供必定的保障, 不可以跨節點刪除, 最好是隻可以按序刪除, 進行標記刪除的處理方式。

數據源調整

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicLong;

/**
* 本身設計的數據源, 須要完成一系列功能。
* @author zyzdisciple
* @date 2019/4/11
*/
public enum DataSource {

    INSTANCE;

    private BufferedReader br;

    private BlockingQueue<Node> queue;

    private AtomicLong seq;

    private BlockingDeque<Long> ackIndexes;

    private static final Object deleteQueueLock = new Object();

    private static final Logger logger = LoggerFactory.getLogger(DataSource.class);

    DataSource() {
        try {
            br = new BufferedReader(new FileReader("E:\\IdeaProjects\\storm_demo\\src\\main\\resources\\user_behavior_data.txt"));
        } catch (FileNotFoundException e) {
            e.printStackTrace();
        }
        queue = new LinkedBlockingDeque<>();
        ackIndexes = new LinkedBlockingDeque<>();
    }

    /**
    * 獲取一行數據,沒有返回null。
    * @return
    */
    public String nextLine() {
        Node node = null;
        try {
            String line = br.readLine();
            if (!line.trim().isEmpty()) {
                node = new Node(seq.getAndIncrement(), line);
                queue.add(node);
            }
        } catch (IOException e) {
            logger.warn("empty queue, e:" + e);
        }
        return node == null ? null : node.getValue();
    }

    /**
    * 成功響應
    * @param seq
    */
    public void ack(Object seq) {
        if (seq == null) {
            return;
        }
        deleteNode(Long.parseLong(seq.toString()));
    }

    private void deleteNode(long seq) {
        synchronized (deleteQueueLock) {
            Node headNode = queue.peek();
            if (headNode != null && headNode.getIndex() == seq) {
                queue.poll();
                //一直向下刪除, 直到不等
                deepDelete();
            } else {
                Long headIndex = ackIndexes.peek();
                if (headIndex == null) {
                    ackIndexes.add(seq);
                } else if (seq > headIndex) {
                    ackIndexes.addLast(seq);
                } else if (seq < headIndex) {
                    ackIndexes.addFirst(seq);
                }
            }
        }
    }

    /**
    * 繼續向下刪除
    */
    private void deepDelete() {
        Node headNode = queue.peek();
        long seq = ackIndexes.peek();
        boolean hasDeleted = false;
        if (headNode != null && headNode.getIndex() == seq) {
            queue.poll();
            ackIndexes.poll();
            //一直向下刪除, 直到不等
            deepDelete();
        }
    }
}

其主要有兩個功能, 一個是ack,刪除緩存數據, 另外一個是取出數據。

spout

@Override
public void nextTuple() {
    Node node = dataSource.nextLine();
    if (node == null) {
        return;
    }
    String line = node.getValue();
    MessageInfo info = gson.fromJson(line, MessageInfo.class);
    if (isValid(info, line)) {
        completingMessage(info);
        //判斷當前的時間區間.
        long timeGroup = System.currentTimeMillis() / (BehaviorConstants.SESSION_TIME_OUT_SECS * 1000);
        collector.emit(new Values(timeGroup, info), node.getIndex());
    }
}

主要是改寫了nextTuple, 在這裏用咱們的「可靠數據源」接收數據,進行響應。


在這裏會發現一點特性,貫穿數據源-spout-bolt的整個過程,其中key,也就是咱們定義的ID,起到了橋樑的做用

那麼當數據真正處理完成,收到下游的ack以後,又應該做何處理?這就須要關注Spout的接口了。 咱們會注意到, 還提供了這樣一個方法:

void ack(Object msgId);

所以咱們重寫這個方法就能夠了,告訴dataSource當前數據已經處理完了。

@Override
public void ack(Object msgId) {
    dataSource.ack(msgId);
}

那麼若是下有數據處理失敗了,天然有另外一個方法,fail:

void fail(Object msgId);

那麼當咱們接收到fail時, 該從dataSource... 等等,dataSource並無提供根據msgId取出對應數據的功能啊,是咱們疏忽忘記了嗎?並非,消息隊列,是一個隊列,並不支持根據msgId查詢返回特定的數據, 大多數狀況下, 咱們都須要本身維護相應的數據。

//加入屬性cacheMap
private Map<Long, MessageInfo> cacheMap;
//在open方法中初始化
cacheMap = new HashMap<>();
//在nextTuple發射以前
cacheMap.put(node.getIndex(), info);
//在ack中收到消息以後
cacheMap.remove(msgId);
//在fail中
@Override
public void fail(Object msgId) {
    long timeGroup = System.currentTimeMillis() / (BehaviorConstants.SESSION_TIME_OUT_SECS * 1000);
    collector.emit(new Values(timeGroup, cacheMap.get(msgId)), msgId);
}

這樣就完成了一個tuple從生到死的過程處理。

等等,在上一篇咱們提到過一個問題,spout同樣是能夠設置並行度的,也就是說可能會存在多個線程,咱們這麼操做cacheMap而且不加鎖真的好嗎?

內容處理bolt就不貼在這裏了,設計仍有必定不合理的地方, 但已經能說明主要問題。外加代碼太多, 有興趣能夠本身去github上看一下。

MessageWriterBolt

@Override
public void execute(Tuple input) {
    MessageInfo info = (MessageInfo) input.getValueByField(BehaviorConstants.FIELD_INFO);
    String jsonMessage = null;
    try {
        jsonMessage = gson.toJson(info, MessageInfo.class);
    } catch (Exception e) {
        logger.warn("格式轉換失敗, e" + e);
        collector.ack(input);
    }
    try {
        pw.println(jsonMessage);
        pw.flush();
    } catch (Exception e) {
        logger.error("寫入文件失敗, e:" + e);
        collector.fail(input);
    }
}

在這裏,對於兩種不一樣的狀況會看到咱們的 ack ,fail處理方式有所不一樣, 爲何呢?

對於錯誤咱們分爲,已知的和未知的, 對於已知的錯誤,也有兩種,可重試,和不可重試,對於不可重試錯誤,數據錯了就錯了,再試一千次也是錯的,因此直接響應ack。

對於可重試錯誤,如數據庫插入失敗等其餘狀況,就能夠告知拓撲失敗信息,促使重試。

而對於未知錯誤,那天然是沒辦法處理了,只能等到它發生變成已知錯誤,再處理。

結語

在本章主要講了數據的可靠性保障相關的東西, 瞭解了實現可靠性的基本要求是, 一個可靠的數據源, 一個錨定的元組流, 一個可以感知並處理元組狀態的spout。

還有很重要的一點沒有提到, 是 一個容錯性的拓撲。

概念比較寬泛,須要考慮到整個拓撲若是掛掉,如何恢復數據,從上次的某個地方繼續向下讀取數據, 若是某個bolt掛掉,相應的數據ack相關又該怎樣處理, 以及與外界交互的,如文件流,數據庫寫入等等地方, 出現問題又該怎樣處理?

同時,對於數據處理可靠性級別有這樣幾種:

最多一次, 至少一次, 僅一次。

最多一次就意味着能夠不處理數據, 不可靠的數據源就是這樣的

至少一次,只要咱們可以對拓撲中的 ack,fail使用的謹慎而明白,這一點也是很好保證的。

僅一次, 若是咱們在處理的是扣費項目, 由於數據從新發送,致使重複扣費,別人會投訴你的。 因此須要對數據加入惟一性標識, 而且將數據的處理狀態, 處理節點等等都交給另外一個可靠的系統進行維護。

在本身設計的時候,有這樣一個簡單的處理辦法:

對於咱們處理的每個節點,舉個例子:

有8個流程須要執行,順序未必一致。

咱們只須要始終在一個可靠的地方,維護數據狀態:

000 000 00 8位0,當某個節點被處理,即置爲1, 當節點再度收到數據,便知道是否 處理。但依然要當心,在存儲狀態及發送數據的中間,拓撲掛掉,等等。

關於Storm中Ack的詳細機制:Apache Storm 實時流處理系統ACK機制以及源碼分析

相關文章
相關標籤/搜索