架構設計:系統間通訊(40)——本身動手設計ESB(1)

一、概述

在我開始構思這幾篇關於「本身動手設計ESB中間件」的文章時,曾有好幾回動過放棄的念頭。緣由倒不是由於對冗長的文章產生了惰性,而是ESB中所涉及到的技術知識和須要突破的設計難點實在是比較多,再冗長的幾篇博文甚至沒法對它們所有進行概述,另外若是在思路上稍微有一點差池就會誤導讀者。一個能夠穩定使用的ESB中間件凝聚了一個團隊不少參與者的心血,一我的確定是沒法完成這些工做的。可是筆者思索再三,仍是下決心將這這即使文章完成,由於這是對本專題從第19篇文章到第39篇文章中所介紹的知識點的最好的總結。咱們本身動手設計ESB中間件,不是爲了讓它商用,也不是爲了讓它能夠比擬市面上某款ESB中間件,甚至不是爲了把ESB中的技術難點的解決所有方案化。咱們的目的是檢驗整個專題中所介紹的知識點是否能在讀者本身消化後進行綜合應用,是否能作到技術知識的活學活用、按需選型java

二、ESB的頂層設計

這裏寫圖片描述
(頂層設計圖)web

上圖是咱們要進行實現的ESB中間件的頂層設計。從上圖中能夠看到,整個ESB中間件分爲如下幾個模塊:Client客戶端、流程編排/註冊工具、主控服務模塊、服務狀態協調組(模塊)、服務運行組(模塊)。首先咱們大體描述一下這些模塊的工做內容:算法

  • Client客戶端是須要接入ESB中間件的各個業務服務系統。例如物流系統、聯帳系統、CRM系統等等。在這些客戶端系統接入ESB中間件時,將集成ESB中間件提供給他們的各類開發語言版本的ESB-Client組件。若是使用的是C#語言則ESB-Client組件可能以DLL文件的方式提供;若是使用的是JAVA語言則ESB-Client組件可能以Jar文件的方式提供;若是使用的是NODEJS則多是一個(或多個)JS文件……數據庫

  • 這些客戶端系統的開發人員將可使用ESB中間件提供的一個獨立的流程編排/註冊工具,後者在不少ESB中間件系統中通常被命名爲「…… Studio」,而且這些流程編排/註冊工具通常以各類IDE插件的形式提供出來,例如製做成Eclipse-Plugin提供給開發人員。這些工具的主要做用就是讓客戶端系統的開發人員(開發團隊)具有向ESB主控服務進行原子服務註冊的能力,另外還可讓開發人員查詢到目前服務端全部可用的其它原子服務(來自於其它業務系統的),以便在流程編排/註冊工具上完成新的服務流程編排和已有服務流程新版本的發佈。這就是上圖中標註爲「1」的步驟。json

  • 另外ESB中間件爲了保證流程編排所使用的原子服務不會由於提供這個原子服務的業務系統的變化而產生影響,通常來講在進行業務系統註冊原子服務時都會指定這個原子服務的版本和調用權限。調用權限通常又分爲黑名單權限和白名單權限。以白名單權限來講,只有白名單中所列列舉的業務系統有權限調用這個原子服務。即便這個原子服務參與了某個ESB中的流程編排,若是請求這個編排好的流程的業務系統不在這個白名單中,調用也會失敗。緩存

  • 主控服務爲流程編排/發佈工具提供新的原子服務註冊請求、新的流程發佈請求、已有流程的新版本發佈請求。最新的原子服務、流程編排等數據將會被主控服務存儲在持久化容器中(例如關係型數據庫),而且向「服務狀態協調模塊」發送最新的數據變化。注意,主控服務並不負責執行編排好的流程,只是用於記錄數據編排的變化和向「數據協調模塊」發送這些數據變化,這就是上圖中所標示的步驟2。主控服務還有另外兩個做用:負責權限管理和服務運行模塊的狀態監控。bash

  • 因爲負責最終對流程編排進行執行的「服務運行模塊」存在不少節點(下文稱爲ESB-Broker Server節點),且這些ESB-Broker Server節點的數量在服務過程當中會不斷變化(新增或減小),因此「主控服務」並不知道有哪些Boker在運行。爲了通知這些在運行狀態的Broker有新的服務編排被髮布(或者其它事件信息),這些處於運行狀態的ESB-Broker Server節點都會鏈接到「服務狀態協調模塊」,而且由後者完成數據變化的事件通知。這就是「服務狀態協調模塊」的主要功能,也是上圖中所示的步驟3。在咱們本身設計的ESB中間件中,「服務狀態協調模塊」由一組zookeeper服務構成(在我另外幾篇博文中專門介紹zookeeper,這個專題就不對zookeeper的基本操做進行講解了),若是您在實際的工做中有其它功能/技術需求,也能夠本身設計「服務狀態協調模塊」。網絡

  • 在業務系統集成過程當中,ESB中間件所扮演的角色就是在各個業務系統間進行原子服務調用、轉換數據、再進行原則服務調用、再轉換…….最後向執行服務編排的請求者返回結果。因此ESB中間件服務每每有較高的性能要求。若是執行ESB服務編排的節點只有一個,每每就達不到ESB中間件的設計要求甚至會使ESB中間件服務成爲整個軟件架構的性能瓶頸點。因此在咱們設計的ESB中間件中,真正執行ESB服務的節點會有多個這種ESB-Broker Server節點。數據結構

  • 在ESB運行服務的過程當中使用多個Broker Server有不少好處,首先來講它們能夠保證在整個系統出現請求洪峯的狀況下,可以把這些請求壓力平均分配到這些Broker Server節點上,最終使ESB服務不會成爲整個頂層設計的瓶頸。請求壓力的分配工做會由zookeeper集羣完成。另外,多個Broker Server能夠保證某一個(或者幾個)Broker Server節點在出現異常並退出服務後,整個ESB中間件的服務不會中止——這是一個現成的容錯方案。開發人員能夠經過退避算法來決定ESB-Client下一次試圖訪問出現錯誤的Broker Server節點的時間,也能夠當即爲ESB-client從新分配一個健康的Broker Server節點。最後,這個解決方案能夠在ESB服務運行的過程當中保證明現Broker Server的動態橫向擴展:當ESB主控服務模塊發現整個Broker Server服務組的性能達到(或快要達到)峯值時,運維人員能夠立刻開啓新的Broker Server節點,zookeeper集羣會負責將定製的編排、定製的Processor處理器等數據信息動態加載到新的Broker Server節點中,並讓後者當即加入整個服務組開始工做。架構

  • 在ESB-Client(某個業務系統)請求執行某個服務編排時,首先會使用這個ESB-Client(某個業務系統)已經集成的zookeeper客戶端請求ESB的zookeeper集羣服務,從中取得當前正在運行的Broker Server節點信息,並經過某種算法決定本身訪問哪個Broker Server節點(算法不少:輪詢算法、加權算法、一致性Hash算法等等),如「頂層設計圖」中步驟四、步驟5所示。爲了保證上文中提到的新的Broker Server節點可以加入服務組併爲ClientESB-Client服務,步驟4和步驟5的過程能夠週期性進行,並視狀況從新爲ESB-Client分配Broker Server節點。

  • 當ESB-Client肯定目標Broker Server節點後,將正式向這個Broker Server發起執行某個服務編排的請求。當同一個ESB-Client第二次請求執行服務編排時,就能夠在必定時間週期內(有效時間內)再也不走步驟四、5了,而能夠直接發起請求到同一個目標Broker Server節點。直到這個Broker Server再也不可以響應這些請求爲止(或者有其它依據肯定這個Broker Server節點已經不能提供服務),ESB-Client會再執行步驟四、5,以便肯定另外一個新的、正常工做的Broker Server節點。在下文筆者也會重點介紹如何進行Broker Server節點的選擇。

三、主控服務的日誌收集

上一節已經說到,在咱們設計的ESB中間件中包括兩個模塊:主控服務模塊和服務運行組(模塊)。其中主控服務模塊的其中一個做用,是對若干當前處於運行狀態的服務運行組節點(Broker Server)進行性能狀態監控。性能狀態監控的目的是確保運維人員實時瞭解這些Broker Server的運行狀態,而且能在整個服務運行組快要達到性能瓶頸時可以啓動新的Broker Server分擔壓力或者在整個服務運行組沒有什麼請求負擔時,中止一些Broker Server。

那麼主控節點如何知道整個Broker Server組中多個服務節點的性能狀態呢?要知道,Broker Server節點是能夠動態擴展的。上文也已經說到:主控節點並不知道當前有哪些Broker Server節點處於運行狀態。那麼基於Kafka消息隊列的日誌收集就是一個解決方案,設計人員還可使用Flume + Storm的解決方案進行日誌自動收集和即時分析。下面咱們對這兩種日誌收集方案進行介紹。注意,關於Kafka、Flume在這個專題以前的文章中已經作了詳細介紹,因此本小節中涉及Kafka、Flume技術的部分就再也不對設計方案的實施進行介紹了。

3-一、使用Kafka收集性能數據

Kafka Server的特色就是快,雖然在特定的狀況下Kafka Server會出現消息丟失或重複發送的問題,可是這個問題針對日誌數據收集場景來講不算大問題。使用消息隊列收集各Broker Server節點的性能日誌也是和ESB中各模塊的依賴特性相適應的:因爲在咱們設計的ESB中間件中,主控服務模塊並不知道有多少Broker Server節點處於運行狀態,也不知道這些Broker Server節點的IP位置。也就是說主控服務模塊沒法主動去這些Broker Server節點上收集性能數據。最好的辦法就是由這些活動的Broker Server節點主動發送日誌數據。

3-1-一、設計思路

下圖是使用Kafka組件收集Broker Server節點上性能數據並進行性能數據處理、結果存儲的設計示例圖:

這裏寫圖片描述

上圖中,每個Broker Server節點上除了啓動一個Camel Context實例之外(後文進行詳細說明),還須要配置一個Kafka的Producer端用於發送數據。Kafka-Producer端收集的性能數據可能包括:CPU使用狀況、內存使用狀況、本地I/O速度、操做系統線程狀況、Camel Context中的路由實例狀態、Endpoint在Cache中的命中狀況、客戶端對Broker Server中以編排路由的調用狀況等等——業務數據和非業務數據均可以經過這種方式進行監控,而且以業務數據爲主。

Kafka Servers中部署了三個Kafka Broker Server節點(建議的值),用於接收若干ESB Broker Server節點上各個Kafka-Producer發送來的性能日誌數據。爲了保證整個Kafka集羣的性能,每個Kafka Broker Server都有至少兩個分區(partition,仍是建議值)。這裏多說一句,爲了節約服務資源您能夠將Kafka Broker Server和Kafka-Consumer放在一臺服務節點上,甚至能夠將它們和主控服務節點放在一塊兒。

Kafka-Consumer負責進行性能日誌數據的處理。有的讀者可能就要問了,既然Consumer接收到的都是能夠獨立存儲性能日誌數據,那麼只須要將這些日誌找到一個合適的存儲方案(例如HBase)存放起來就能夠了,還須要Consumer作什麼處理呢?這是由於開發團隊完成的Producer採樣頻率可能和運維團隊要求的監控採用頻率不同。

爲了保證性能監控數據的精準性,開發團隊利用基於Kafka集羣提供的吞吐量優點,能夠在各個ESB Broker Server節點所集成的Kafka-Producer上設置一個較高的採樣率(固然仍是要顧忌節點自己的資源消耗),例如每秒對固定的業務指標和非業務指標完成10次採樣。可是運維團隊經過主控服務監控各個ESB Broker Server節點是,每每不須要這麼高的採樣率(這裏能夠提供一個設置選項供運維團隊隨時進行調整),大概也就是每秒更新1次的樣子就差很少了。

這裏寫圖片描述

那麼Consumer如何處理每秒鐘多出來的9次採樣數據呢?能夠明確想到的有兩種處理方式:一種處理方式是不管主控服務的監控臺上的性能指標以何種頻率進行顯示,Consumer都將收到的數據寫出存儲系統中;另外一種處理方式是Consumer將收到的多餘數據丟棄,只按照運維團隊設置的採樣頻率將數據寫入持久化存儲系統。在第二中處理方式中有一個狀況須要特別注意:若是將要被丟棄的性能數據達到了性能閥值(例如本次採集的內存使用率超多了2GB),則這條日誌數據仍是須要進行保留。第一種處理基本上沒有什麼須要介紹的,優勢和缺點也是很明確的:優勢是能夠在後期進行完整的性能歷史回溯,缺點就是會佔用較大的存儲空間——雖然目前可使用的超大存儲方案有不少並且都很成熟穩定,但它們都須要比較強大的資金預算支持。

3-1-二、Consumer的實現

這裏筆者主要討論一下Consumer的第二種處理方式:丟棄多餘的數據。咱們可使用以前文章介紹過的ConcurrentLinkedHashMap做爲Consumer中存儲性能消息日誌的Cache,Cache的固定大小設置爲200(或者其它一個較大的值)。這個Cache結構能夠幫助咱們完成不少工做:首先它可靠的性能可以保證過個Consumer不會成爲整個性能日誌收集方案的瓶頸——雖然ConcurrentLinkedHashMap的性能並非最快的;其次這個Cache結構可以幫助咱們自動完成多餘性能日誌的清除工做,由於在第201條日誌記錄被推入Cache時,在LRU隊列尾部的最初一條記錄將自動被排除隊列,最終被垃圾回收策略回收掉;最後,Consumer按照運維團隊設置的採樣週期,對Cache中的性能日誌數據進行持久化保存時,始終只須要取出當前在Cache將被剔除的那條記錄,這樣就省掉了編寫程序,在兩個週期的時間差之間判斷「要對哪條性能日誌數據」進行持久化保存的定位工做。

順便說一句,若是您須要在工程中使用Google提供的ConcurrentLinkedHashMap數據結構工具,那麼您須要首先在pom文件中添加相應的組件依賴信息:

<dependency>
    <groupId>com.googlecode.concurrentlinkedhashmap</groupId>
    <artifactId>concurrentlinkedhashmap-lru</artifactId>
    <version>1.4.2</version>
</dependency>

如下是Consumer中用於處理LRU隊列添加、LRU週期性讀取、LRU刪除事件的代碼片斷:

......
/** * 這就是性能數據的LRU隊列 */
private static final ConcurrentLinkedHashMap<Long, String> PERFORMANCE_CACHE = 
        new ConcurrentLinkedHashMap.Builder<Long, String>()
        .initialCapacity(200)
        .maximumWeightedCapacity(200)
        .listener(new EvictionListenerImpl())
        .build();
......

/** * 這個監聽器用於在數據被從LRU隊列剔除時<br> * 按照功能需求檢查這條記錄是否須要被持久化存儲起來。 * @author yinwenjie */
public static class EvictionListenerImpl implements EvictionListener<Long, String> {

    // 上一次進行數據採集的時間,初始爲-1
    private Long lastTime = -1l;

    // 這是由運維團隊設置的數據採集週期,1000表示1000毫秒
    // 正式系統中,這個值將有外部讀取
    private Long period = 1000l;

    @Override
    public void onEviction(Long key, String jsonValue) {
        /* * 如下條件任意成立時,就須要對這條數據進行採集了: * 一、lastTime爲-1的狀況(說明是程序第一次採集) * * 二、當前事件 - lastTime >= period(採集週期) * * 三、當監控數據大於設置的警告閥值,在這個示例代碼中 * 這個警告閥值爲80,正式系統中,這個閥值應從外部讀取 * 如下的threshold變量就表明這個值 * */
        Long threshold = 80L;
        Long nowtime = new Date().getTime();
        // 獲取性能數據中的CPU使用率
        // 注意,正式系統 中最好不要傳遞json結構,文本結構的數據就行了
        JSONObject jsonData = JSONObject.fromObject(jsonValue);
        Long cpuRate = jsonData.getLong("cpu");
        boolean mustCollecting = false;
        if(this.lastTime == -1 || 
            nowtime - lastTime >= this.period ||
            cpuRate >= threshold) {
            mustCollecting = true;
            this.lastTime = nowtime;
        }
        // 若是不須要作數據的持久化存儲,就終止本次監聽的操做便可
        if(!mustCollecting) {
            return;
        }

        // ********************
        // 這裏能夠作持久化數據存儲的操做了
        // ********************
        LRUConsumer.LOGGER.info(key + ":" + jsonValue + " 完成數據持久存儲操做=======");
    }
}

......

// 如下代碼就是當Kafka-Consumer收到性能日誌數據的操做
// 將這個數據存放到PERFORMANCE_CACHE便可
Long key = new Date().getTime();
// 可以使用時間的毫秒數做爲key值(正式應用場景下,考慮多個consumer節點,Key的肯定會有一個更規範的規則)
LRUConsumer.PERFORMANCE_CACHE.put(key, performanceData);
......

以上代碼中,咱們使用以前已經介紹過的LRU數據結構在Consumer端保存發送過來的數據。若是讀者對LRU還不清楚能夠查看我另外的一篇文章中的介紹(《架構設計:系統間通訊(39)——Apache Camel快速入門(下2)》)。由Google提供的ConcurrentLinkedHashMap結構就能夠向咱們提供一個現成的LRU隊列,這樣一來當LRU隊列存儲滿後,最早被接收到的性能日誌數據就會從隊列尾部被刪除。最關鍵的處理工做都將在EvictionListener接口的實現類中完成,在實際應用中開發人員還能夠在肯定一條性能日誌須要被持久化存儲以後專門啓動一個線程進行操做,例如使用一個專門的線程池(ThreadPoolExecutor)。這樣一來LRU隊列就真正不受持久化存儲操做延遲時間的影響了。

3-二、使用Flume + Storm收集性能數據

以上使用Kafka收集Broker Server節點的性能數據的方案中,須要在每一個編寫的Broker Server節點上增長額外的代碼向Kafka Broker Server發送數據。實際上這種功能需求狀況使用Apache Flume收集數據會使技術方案更容易實現和維護,下面咱們就大體介紹一下這個技術方案實現。因爲在以前的文章中筆者已經較詳細的介紹瞭如何使用Apache Flume進行基本配置了,因此這裏咱們重點討論兩個問題Apache Flume的數據來源和Storm Server在接收到Flume Server發送來的數據後如何進行處理。

3-2-一、設計思路

這裏寫圖片描述

上圖展現了整個功能需求的設計結構。安裝在ESB Broker Server節點的Flume程序負責收集這個節點上的各類功能性指標和非功能性指標,這樣避免了在ESC Broker Server服務上編寫額外的代碼採集非功能性指標,也減小了編寫代碼採集功能性指標的複雜度。而後將這些性能日誌數據按照負載均衡模式傳遞到若干中繼Flume Server節點上,後者專門用於承載/彙總多個ESB Broker Server節點傳來的性能日誌數據,而且最終將數據寫入Storm Server。在Flume Server和Storm Server之間咱們仍是須要使用Kafka Server做爲緩存,這是由於Apache Kafka經過Storm-Kafka組件和Storm Server實現無縫集成。

首先請注意安裝在ESB Broker Server節點的Flume程序,在3-1小節中採集節點功能性指標和非功能性指標都是依靠開發人員編寫程序完成,併發送給Kafka-Broker。但這樣作卻真的繞了很大一個彎路,由於Linux操做系統上已經提供了不少採集節點非功能性指標的方式(例如採集I/O信息、內存使用信息、內存分頁信息、CPU使用信息、網絡流量信息等),開發人員只須要一些腳本就能夠完成採集工做。例如,咱們採集CPU信息徹底不須要咱們在ESB-Broker Server中編寫程序(採集CPU信息也不該該是ESB-Broker Server的一項工做任務),而採用以下的腳本便可:

top -d 0.1 | grep Cpu >> cpu.rel

#寫法還有不少,還能夠從/proc/stat文件中獲取CPU狀態

以上腳本能夠按照100毫秒爲週期,獲取CPU的信息。並將這條信息做爲一條新的記錄存儲到cpu.rel文件中。這樣Apache Flume就能夠讀取cpu.rel文件中的變化,做爲性能日誌數據的來源:

#flume 配置文件中的片斷
......
agent.sources.s1.type = exec
agent.sources.s1.channels = c1
agent.sources.s1.command = tail -f -n 0 /root/cpu.rel
......

在ESB-Broker Server節點中,咱們可使用這樣的方式從不一樣文件中讀取各類不一樣的日誌信息,以下圖所示:

這裏寫圖片描述

================================= (接下文)

相關文章
相關標籤/搜索