架構設計:系統間通訊(31)——其餘消息中間件及場景應用(下1)

接上文:《架構設計:系統間通訊(30)——Kafka及場景應用(中3)java

五、場景應用——電商平臺:瀏覽記錄收集功能

事件/日誌收集系統是大中型軟件不得不面對的話題。目前第三方業務系統對 事件/日誌收集系統 的集成思路主要有兩大類:侵入式收集方案和非侵入式收集方案。侵入式收集方案,是指任何須要使用事件/日誌收集系統的第三方系統,都須要作有針對的編碼工做,這個編碼工做或者是新增代碼用於調用 事件/日誌收集系統 提供的客戶端API,又或者是修改已有的代碼,以便適應事件/日誌收集系統的調用特性。web

侵入式方案又分爲半侵入式和全侵入式。因爲第三方系統的代碼結構自己存在問題,因此一旦須要集成 事件/日誌收集系統(或者任何其餘第三方系統),就會形成業務處理過程改變。相反,因爲需求變更致使的業務代碼變更,也會牽扯到任意的第三方系統集成代碼的改變。這樣的集成方式就是全侵入式的。出現這種的狀況,是第三方系統所選擇的技術方案和業務系統自己的工程結構問題共同形成的。spring

很顯然,全侵入式的方案是一種錯誤的設計實踐,在平常的設計工做中是要儘可能避免。半侵入式方案比全侵入方案要好不少:雖然第三方系統會針對 事件/日誌收集系統 作必定的代碼改造,可是因爲第三方系統的結構清晰,因此這部分代碼和第三方系統原有的業務代碼是徹底分離的,只須要改造一次就可一直使用。也不會對第三方系統既有的業務處理過程產生任何影響,反之第三方系統因爲需求變化產生的業務代碼變化也不會影響 事件/日誌收集系統 的客戶端集成代碼變化。數據庫

事件/日誌收集系統 另一種設計方案是非侵入式的。即業務系統在集成 事件/日誌收集系統時,不須要爲這件事情專門引入新的代碼或者修改已有代碼。業務系統的開發人員甚至徹底不知道(也沒必要知道)本身的系統集成了 事件/日誌收集系統,僅經過配置一些參數文件的方式就可完成集成工做。apache

咱們將經過包括本文章在內的2-3篇文章的篇幅,利用已經學習過的技術知識向你們介紹事件/日誌收集系統的半侵入方案和非侵入式方案。固然中間還會穿插一些新技術的介紹,好比Apache Flume。編程

5-一、場景說明

這裏寫圖片描述

這是一個日均200萬PV的中型電商網站的一個系統模塊:商品詳情模塊。這個模塊用於(且只用於)向用戶展現商品詳情、展現商品價格走勢。上圖所示中,該模塊只列舉了使用的主要技術組件,畢竟這個實例場景不是爲了討論這些技術組件自己。因爲網站業務的發展須要,須要在這個模塊加入用戶操做的統計分析功能,對用戶「點擊查看訂單詳情」、「點擊查看商品價格走勢」等操做動做進行事件/日誌收集。後端

爲何要對這些操做進行統計呢?由於這些數據可以說明某一個用戶在一個特定的時間段對哪些商品感興趣,預計對哪些(或哪一類)商品會產生購買訂單。藉助後端的數據分析手段,還能知曉某一類用戶對哪一類商品感興趣的機率配比。因此這些商品詳情查看的操做日誌特別有商業價值api

日均200萬PV是一個什麼概念呢?這麼說吧,翼支付(bestpay.com.cn)的日均PV在34萬左右,汽車之家(autohome.com.cn)的日均PV在100萬左右,折800(zhe800.com)的日均PV在600萬,攜程在線(ctrip.com)日均PV1200萬,京東(jd.com)日均PV3.7億,淘寶(taobao.com)日均6.4億(以上數據均來自alexa.cn)……瀏覽器

PV是Page View的簡稱,即一次頁面的完整打開算做一次PV。PV的統計中,此次頁面訪問「是由那個訪問者發起的」並不會對統計結果構成直接影響,也就是說即便是同一個訪問者連續兩次打開同一個頁面,也會算做PV=2。這裏要注意的另一個問題是,因爲在瀏覽器頁面上會有不少訪問鏈接(例如:多個圖片鏈接、多個AJAX請求等),因此一次PV可能會包含屢次對服務端的請求spring-mvc

做爲架構師,您的工做職責就是爲這個日誌記錄系統設計一個易於業務擴展和技術擴展的軟件架構。所謂易於業務擴展是指:也許在將來的某個日子不僅是「商品詳情模塊」會集成本系統,用戶中心模塊也會集成本系統又或者訂單子系統也會集成本系統,您設計的日誌收集子系統應該能夠在將來被這些子系統輕鬆集成,而不須要修改 日誌收集系統 的任何代碼(目標子系統也只須要修改極少的代碼,甚至不修改代碼)。

所謂技術擴展主要是說「日誌收集系統」支撐的數據吞吐量能夠進行可靠的橫向擴展,而不須要中止服務或者要求業務系統進行改動,畢竟要相應考慮以上業務擴展中所描述的多種業務系統可能夠進行集成的問題。另外,因爲將來不少第三方系統都須要進行集成,做爲架構師的您不可能知曉這些第三方系統會使用的是什麼編程語言,更不可能限制第三方系統必須使用哪些編程語言。因此在進行 事件/日誌收集系統 的設計時,須要考慮一種兼容各類編程語言的設計思路。

5-二、解決方案一:半侵入式方案

咱們先來看看此問題的第一種解決方案。若是您肯定將要集成 事件/日誌收集系統 的全部第三方業務系統都有良好的代碼結構(固然實際工做這種狀況不太可能),那麼爲這些第三方系統提供相應編程語言的客戶端API,就是一個可選擇的方案:

這裏寫圖片描述

全部操做日誌在業務系統上使用過濾器/攔截器的方式對須要進行收集的訪問請求進行攔截。分離出訪問地址、訪問用戶、訪問時間等重要信息後,將其做爲Kafka消息發送給Kafka Brokers 集羣。這些信息將最終到達由若干Kafka Consumers節點組成的處理服務,並使用適當的存儲方案直接存儲到連續文件中(存儲到HBase、Cassandra這樣的數據庫中也行,具體看這些日誌數據將會被用於怎樣的分析場景)。

5-2-一、設計重點說明

上圖中,主要的展現目的是事件/日誌收集系統在業務系統端是怎樣被集成的。因此關於事件/日誌收集系統的結構就畫得比較簡單。只給出了兩個區塊「Broker Servers」和「Log Consumers」,下面咱們重點分析一下本方案中的 事件/日誌收集系統 的核心結構:

這裏寫圖片描述

在方案一種,咱們主要使用單純以Apache Kafka爲核心的消息隊列解決方案。

  • 須要多個zookeeper節點?

使用Apache Kafka時,若是您只是用一個zookeeper服務節點,整個集羣也能正常工做。可是因爲單個節點的zookeeper服務基本上沒有容錯能力,一旦單個zookeeper節點因爲各類緣由宕機,整個Apache Kafka集羣就會崩潰。因此建議在生產環境下,至少爲zookeeper服務準備三個服務節點,這樣當某個zookeeper服務節點出現故障整個Apache Kafka服務還能夠正常運行(三個節點得zookeeper服務最多容許一個節點發生故障)。

  • 多少個Broker?

在生產環境下爲了保證整個Kafka集羣的穩定,請至少使用3個Brokers物理節點。考慮到後期多個業務系統可能會使用事件/日誌收集系統,那麼能夠在首次設計時將Brokers設定爲5個Brokers物理節點。在以前的文章中咱們已經詳細介紹過Apache Kafka的工做原理,Brokers越多、Topic的分區(partition)越多,整個Apache Kafka集羣的穩定性和吞吐量就越好。

再說明一下其中的複製因子數量設置,複製因子對消息可靠性有直接影響,而且在設定爲強一致性工做模式下也會對消息吞吐量產生影響。因爲咱們使用Kafka主要是爲了接收/發送日誌數據,在運行過程當中丟失一兩條日誌是能夠容忍的錯誤。因此建議設置複製因子數量爲 「Brokers數量 / 2 + 1」,而且在生產者端使用「弱一致性」發送模式,即acks == 1。

  • 多少個消費者,分區怎麼分配?

爲了區分日誌數據來自於哪個業務系統,能夠專門爲不一樣的業務系統設置獨立的Topic。分區數量最好爲Brokers數量的整數倍,這樣才能確保在每個物理節點在硬件配置相同的狀況下,可以很好的均分吞吐量壓力。具體來講,因爲咱們在生產環境採用了5個Brokers物理節點,那麼每個Topic的分區數量最好爲5的整數倍,例如您能夠設置分區數量爲10。

既然設置分區數量爲10,那麼同一個消費者組的消費者數量最科學的值也是10。由於Kafka集羣中存在同一個分區的數據在同一時間最多被一個消費者所消費的限制,因此若是存在第11個消費者,它也只能處於備用等待狀態。待到某個消費者出現問題時,再由第11個消費者進行頂替。實際上在 事件/日誌採集系統 中這樣的Apache Kafka集羣規模,已經徹底能夠應付日均200萬PV的網站系統對日誌採集工做的吞吐量要求了。

  • 什麼是適當的存儲方案

日誌數據的分析手段通常有兩種:實時分析和離線分析。所謂實時分析是說分析服務在接收到日誌數據後,當即對產生的後果進行計算並將分析結果記錄在某個存儲方案上。Apache Storm、Apache Spark都是經常使用的實時分析系統,不過在本專題中並不會對Apache Storm或者Apache Spark進行詳細介紹,畢竟這屬於另外一個知識領域了(數據分析之後會有專門的專題進行講解。固然,讀者也能夠認爲做者壓根不知道)。實時分析在生產環境中有不少應用,例如根據用戶的上線/下線日誌對用戶的在線數量進行實時統計;根據商品的點擊狀況,對商品的查看數量進行實時統計;根據用戶的頁面跳轉狀況實時造成用戶瀏覽軌跡地圖。

日誌數據的另外一種分析手段是離線分析。即分析服務在接收到原始日誌數據後並不作任何處理,只是將原始數據按照預約的格式(又或者就是數據原本的格式)存儲到某個位置。當某個時間週期到來或者具體的事件被觸發時,再由其餘軟件對這些數據進行分析。Apache Hadoop/Cloudera Hadoop就是經常使用的離線分析工具。您能夠經過某種手段,將原始的日誌信息存儲在HDFS文件系統上,以便Hadoop進行離線分析。離線分析在實際生產環境中也有不少應用,例如按照用戶的商品瀏覽狀況分析用戶的購買趨勢、利用商品關鍵詞進一步分析適合銷售的用戶羣體、利用商品庫存和價格走勢預測最佳補貨時機。

不管是實時分析仍是離線分析Kafka的下層系統(組件)都須要作存儲操做。例如您能夠直接使用Kafka的消費者將消息寫入Cassandra集羣、能夠將Kafka接受到的數據做爲Apache Strom 的Spout,直接送入Strom的管道(進行實時分析)。若是您要將日誌寫入HDFS文件系統,則能夠直接使用Flume(這個在後續的示例方案中會講到)。不過,請別作愚蠢的事情:不要將日誌數據送入任何關係型數據庫

  • 業務層實現示圖

在接下來的方案演示中咱們假定業務系統基於JAVA,而且已經集成了Spring框架。因爲在本方案中咱們使用了過濾器(Filters)/攔截器(Interceptor)隔離操做日誌,因此業務服務中怎樣進行業務層和數據層的處理本方案能夠沒必要過多關注:

這裏寫圖片描述

這樣作的好處是,能夠將對日誌的攔截操做在執行真正的業務操做前進行隔離,業務處理代碼不須要關心在這以前都有多少層攔截,只須要按照原有的處理邏輯執行就行。

5-2-二、編碼過程:生產者和業務系統集成

  • 準備工做

演示的業務工程將使用Spring-MVC組件,因此若是您須要查看演示效果,請在工程中導入Spring-MVC組件(V3.2.X的版本都行):

<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-webmvc</artifactId>
    <version>3.2.10.RELEASE</version>
</dependency>
  • 業務系統端集成

爲了讓更多的讀者理解整個過程,咱們首先來看一下這個攔截器的使用方式:

package templateSSHProject.controller;

import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;

import templateSSHProject.controller.kafkaproducer.LogAnnotation;

/** * spring MVC組件搭建的http控制層 * @author yinwenjie */
@Controller
@RequestMapping("/")
public class UserController extends BaseController {

    /** * 查詢全部用戶信息,可是不包括關聯信息 * @param request * @param response */
    @LogAnnotation
    @RequestMapping("/queryAllUser")
    public void queryAllUserWithoutParent(HttpServletRequest request , HttpServletResponse response) {
        /* * 在這裏,以前是怎麼作業務的,仍是怎麼作業務 * 之前是怎樣調用服務層的,仍是怎麼去調用服務層 * 之前該怎樣進行輸出,如今仍是怎樣去進行輸出 * */
    }
}

以上代碼片斷是一個基於Spring-MVC組件編寫的Http Controller層的類,名叫UserController(固然這個類是被Spring Ioc容器託管了)。在瀏覽器上咱們可使用 http://ip:port/queryAllUser 這樣的URL訪問到queryAllUserWithoutParent方法。

請注意在queryAllUserWithoutParent方法上,咱們使用了一個「@LogAnnotation」自定義註解。這個註解表示:當方法被調用時,這個業務系統須要向 事件/日誌收集系統 發送日誌信息。

  • LogAnnotation註解的定義

「@LogAnnotation」註解的定義很是簡單,畢竟它只是一個標識,並非整個結構可以運行起來的核心動力。

package templateSSHProject.controller.kafkaproducer;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/** * 攔截標識註解。使用這個註解的方法說明須要向 事件/日誌服務發送消息 * @author yinwenjie */
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface LogAnnotation {
    /** * 您能夠根據本身的須要,在這個註解中加入各類屬性 * @return */
    public String message() default "";
}
  • 使用Spring-MVC的Interceptor攔截器,對HTTP請求進行攔截

好了,爲了讓以上的代碼可以運行起來。咱們須要使用基於Spring-MVC的Interceptor攔截器,對HTTP請求進行攔截。讓它在正式到達(執行)queryAllUserWithoutParent方法前,可以先被攔截器預先處理。首先咱們須要定義一個攔截器,以下所示:

package templateSSHProject.controller.kafkaproducer;

import java.lang.reflect.Method;
import java.util.Date;

import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.method.HandlerMethod;
import org.springframework.web.servlet.handler.HandlerInterceptorAdapter;

import test.interrupter.producer.ProducerService;

/** * 日誌攔截器。必定注意,攔截器是基於Spring MVC的。<br> * 若是您使用的是Struts組件,那麼就應該使用Struts提供的攔截器; * @author yinwenjie */
public class LogMethodInterceptor extends HandlerInterceptorAdapter {

    /** * 由 事件/日誌 系統提供的客戶端工具包, * 而且使用spring進行代理的消息生產者服務對象。 * 並且已經在spring配置中使用了singleton進行標記,說明全系統只有一個生產者服務對象 */
    @Autowired
    private ProducerService producerService;

    /* (non-Javadoc) * @see org.springframework.web.servlet.handler.HandlerInterceptorAdapter#preHandle(javax.servlet.http.HttpServletRequest, javax.servlet.http.HttpServletResponse, java.lang.Object) */
    @Override
    public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
        //若是條件成立,說明攔截器無效:由於咱們只處理「方法級別的攔截」
        if (handler == null || !(handler instanceof HandlerMethod)) {
            return true;
        }

        HandlerMethod handlerMethod = (HandlerMethod)handler;
        Method method = handlerMethod.getMethod();
        LogAnnotation logAnnotation = method.getAnnotation(LogAnnotation.class);

        //若是條件成立,說明不須要進行事件/日誌觸發的操做動做。
        if(logAnnotation ==  null) {
            return true;
        }

        //不然就要判斷了
        Class<?> declaringClass = method.getDeclaringClass();
        String declaringClassName = declaringClass.getName();
        String methodName = method.getName();           

        // 這是要發送的日誌數據,包括類名,方法名,調用時間
        // 固然您還能夠從request對象中提取更多的業務數據
        String message = declaringClassName + ":" + methodName + "[" + new Date().getTime() + "]";
        this.producerService.senderMessage(message);

        return true;
    }
}

全部的Spring-MVC Interceptor都要繼承一個父類:org.springframework.web.servlet.handler.HandlerInterceptorAdapter,固然Interceptor也是被Spring-Ioc容器託管的。爲了使用Interceptor,您須要在配置文件中加入相應的信息:

<!-- 請配置xml的ns,加入新的ns:xmlns:mvc="http://www.springframework.org/schema/mvc" 還有新的schemaLocation: http://www.springframework.org/schema/mvc http://www.springframework.org/schema/mvc/spring-mvc.xsd -->
<mvc:interceptors>
    <bean class="templateSSHProject.controller.kafkaproducer.LogMethodInterceptor"></bean>
</mvc:interceptors>

在HandlerInterceptorAdapter父類中,咱們能夠按照自身的須要選擇性的重寫preHandle方法、postHandle方法、afterCompletion方法或者afterConcurrentHandlingStarted方法。從這些方法名稱就能夠明白這些方法所表明的含義。這裏咱們選擇重載其中的preHandle預處理方法。

請注意HandlerInterceptorAdapter類中定義的對象「private ProducerService producerService」。這個對象就是由 事件/日誌收集系統提供的JAVA 客戶端開發包中的主要服務類。第三方業務系統須要使用這個服務類和 事件/日誌收集系統 進行通信。

  • 客戶端開發包中的ProducerService定義和實現:

如下是生產者接口定義

package test.interrupter.producer;

/** * 生產者服務 * @author yinwenjie */
public interface ProducerService {
    /** * 初始化kafka生產端的配置信息 */
    public void init();

    /** * 向kafka brokers發送消息 * @param message */
    public void sendeMessage(String message);
}

如下是生產者接口實現:

package test.interrupter.producer;

import java.util.Properties;

import org.apache.commons.lang.StringUtils;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

/** * 生產者服務實現 * @author yinwenjie */
public class ProducerServiceImpl implements ProducerService {
    /** * kafka的brokers列表 */
    private String brokers;

    /** * acks的值,只能有三種-一、0還有1 */
    private Integer required_acks = 0;

    /** * 請求超時間,默認爲1000l */
    private Long request_timeout = 1000l;

    /** * kafka主服務對象 */
    private Producer<byte[], byte[]> producer;

    /** * 分區數量 */
    private Integer partitionNumber;

    /* (non-Javadoc) * @see test.interrupter.producer.ProducerService#init() */
    public void init() {
        // 驗證全部必要屬性都已設置
        if(StringUtils.isEmpty(this.brokers)) {
            throw new RuntimeException("至少須要指定一個broker的位置");
        }
        if(this.required_acks != 0 && this.required_acks != 1
            && this.required_acks != -1) {
            throw new RuntimeException("錯誤的required_acks值!");
        }
        if(this.partitionNumber <= 0) {
            throw new RuntimeException("partitionNumber至少須要有1個");
        }

        Properties props = new Properties();
        props.put("metadata.broker.list", this.brokers);
        props.put("producer.type", "sync");
        props.put("request.required.acks", this.required_acks.toString());
        props.put("request.timeout.ms", this.request_timeout.toString());
        ProducerConfig config = new ProducerConfig(props);

        this.producer = new Producer<byte[], byte[]>(config);
    }

    /* (non-Javadoc) * @see test.interrupter.producer.ProducerService#senderMessage(java.lang.String) */
    public void sendeMessage(String message) {
        // 建立和發送消息
        byte[] messageContext = message.getBytes();
        KeyedMessage<byte[], byte[]> keyedMessage = new KeyedMessage<byte[], byte[]>("MessageTopic", messageContext , null ,  messageContext);
        this.producer.send(keyedMessage);
    }

    /** * @return the brokers */
    public String getBrokers() {
        return brokers;
    }

    /** * @param brokers the brokers to set */
    public void setBrokers(String brokers) {
        this.brokers = brokers;
    }

    /** * @return the required_acks */
    public Integer getRequired_acks() {
        return required_acks;
    }

    /** * @param required_acks the required_acks to set */
    public void setRequired_acks(Integer required_acks) {
        this.required_acks = required_acks;
    }

    /** * @return the request_timeout */
    public Long getRequest_timeout() {
        return request_timeout;
    }

    /** * @param request_timeout the request_timeout to set */
    public void setRequest_timeout(Long request_timeout) {
        this.request_timeout = request_timeout;
    }

    /** * @return the partitionNumber */
    public Integer getPartitionNumber() {
        return partitionNumber;
    }

    /** * @param partitionNumber the partitionNumber to set */
    public void setPartitionNumber(Integer partitionNumber) {
        this.partitionNumber = partitionNumber;
    }
}
  • 使用Spring託管Producer服務

做爲使用java開發的業務系統,至少有兩種方式使用上一小節中定義的生產者服務接口。一種是在業務系統中的過濾器/攔截器中「new」這個類,而後手動調用初始化方法,最後再調用sendMessage方法;還好,咱們示例中的業務系統使用了Spring容器,因此咱們可使用第二種方法:將生產者服務注入容器,而後直接在過濾器/攔截器中調用sendMessage方法。

您須要在業務系統的配置欄目中加入新的bean定義:

......
<!-- 在這個示例代碼的spring bean配置中,必定要使用singleton 不然您會發現init方法不斷在執行,整個系統也會產生多個producerService對象 -->
<bean id="producerService" class="test.interrupter.producer.ProducerServiceImpl" init-method="init" scope="singleton">
    <property name="brokers">
        <value>${kafka.producer.brokers }</value>
    </property>
    <property name="partitionNumber">
        <value>${kafka.producer.partitionNumber }</value>
    </property>
    <property name="required_acks">
        <value>${kafka.producer.required_acks }</value>
    </property>
</bean>
......

和kafka brokers通信的主要參數咱們放置在一個properties文件中,方便部署時進行更改(kafka.properties):

kafka.producer.brokers=192.168.61.138:9092,192.168.61.139:9092
kafka.producer.partitionNumber=10
kafka.producer.required_acks=1

以上就是業務系統須要使用消息生產服務所進行的更改,以及生產服務自身是如何定義的。能夠看到,這種半侵入式的集成方式下,咱們確實須要爲集成 事件/日誌收集系統 作不少的配置、編碼工做。好的一方面是這些工做並不會影響原有的業務系統處理過程。

5-2-三、是否使用Spring Integration-Kafka

Spring Integration(http://projects.spring.io/spring-integration/)是依賴Spring核心框架進行工做的一套擴展組件。經過這套組件開發人員能夠方便的在應用工程上集成第三方中間件技術,例如使用Spring Integration-Redis集成對外部Redis服務的調用、使用Spring Integration-FTP 集成對外部FTP服務的調用、使用Spring Integration-Kafka 集成對外部Kafka服務的調用。

Spring Integration很是輕量、易於測試、入門文檔較全、幾乎沒有使用門檻,只要知道Spring框架的基本使用方式就行。使用Spring Integration來實現對外部中間件服務的調用,大多數狀況下比「本身編寫」的解決方式都要好。

雖然Spring Integration框架很是好用,也確實節省了至關的集成工做,減小了錯誤調用的風險。但可能要讓各位讀者要失望了:由於開發人員不能肯定,將要集成 事件/日誌收集系統 的全部業務系統都是基於Spring框架進行構建。因此在這樣的背景下,提供給基於JAVA(或者其擴展語言:Groovy、Scala)業務系統使用生產者服務,不該該和Spring造成強依賴關係,以保證在沒有使用Spring框架的JAVA業務系統上也實現生產者服務的集成。

在本小節中咱們展現生產者端的示例代碼,並無通過優化。例如雖然經過Spring框架分離了業務代碼和日誌發送代碼,可是一旦http請求到來,這些代碼仍是會在同一個線程運行。那麼若是出現因爲遠端的Kafka服務擁堵致使的生產者發送緩慢的狀況,就會影響到業務服務中對業務請求的處理速度。

要解決這個問題,能夠在業務系統中爲生產者服務開闢專門的處理線程池。利用線程池的BlockingQueue隊列存儲待發送的日誌消息,利用獨立線程進行日誌消息的發送。不過這個解決辦法並非最好的,只能算是一個辦法。由於生成者最終仍是會佔用業務系統緊張的JVM資源,仍是會在將自身的異常情況轉嫁給業務系統,在後面的方案中筆者還會提到這個問題。

5-2-四、編碼過程:消費者端

存在於 事件/日誌收集系統內部的 Kafka消息消費者端的代碼工做也是很是簡單的。Kafka消息消費者的工做只是用來接收這些這些日誌數據而且使用「適當的存儲方案」 將這些消息存儲起來(或者送入另外一處理組件,例如Strom)

下面給出一段可用的消息消費者端的代碼:

  • 一個ConsumerThread對象就表明一個消費者:
package com.test.logservice;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.message.MessageAndMetadata;

/** * 消息消費線程 * @author yinwenjie */
public class ConsumerThread implements Runnable {

    private KafkaStream<byte[], byte[]> stream;

    /** * 日誌 */
    private static Log LOGGER = LogFactory.getLog(ConsumerThread.class);

    /* (non-Javadoc) * @see java.lang.Runnable#run() */
    @Override
    public void run() {
        ConsumerIterator<byte[], byte[]> iterator =  this.stream.iterator();
        /* * 這個消費者獲取的數據在這裏 * 注意進行異常的捕獲: * 若是有異常拋出可是又沒有在方法中進行捕獲,就會致使線程執行終止 * */
        while(iterator.hasNext()) {
            MessageAndMetadata<byte[], byte[]> message = iterator.next();
            int partition = message.partition();
            String topic = message.topic();
            String messageT = new String(message.message());
            ConsumerThread.LOGGER.info("接收到: " + messageT + "來自於topic:[" + topic + "] + 第partition[" + partition + "]");

            /* * 這裏須要選擇一種 "合適的存儲方案" * */
        }
    }

    /** * @param stream the stream to set */
    public void setStream(KafkaStream<byte[], byte[]> stream) {
        this.stream = stream;
    }
}
  • KafkaConsumerLauncher基於spring框架鏈接zk而且啓動消息消費者:
package com.test.logservice;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ThreadPoolExecutor;

import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

/** * 這是Kafka的topic消費者 * @author yinwenjie */
public class KafkaConsumerLauncher implements ApplicationListener<ContextRefreshedEvent> {

    /** * zookeeper鏈接地址串 */
    private String zookeeper_connects;

    /** * zookeeper鏈接超時事件 */
    private Long zookeeper_timeout;

    /** * 分區數量 */
    private Integer consumerNumber;

    /** * 消息消費者處理線程池。 * 每個消費者都是線程池中的一個線程<br> * 且線程池中線程數量就是分區數量 */
    private ThreadPoolExecutor consumerPool;

    /* (non-Javadoc) * @see org.springframework.context.ApplicationContextAware#setApplicationContext(org.springframework.context.ApplicationContext) */
    @Override
    public void onApplicationEvent(ContextRefreshedEvent event) {
        ApplicationContext ac = event.getApplicationContext();
        //這裏的條件保證啓動 zk的鏈接和消費者線程的啓動是在spring框架完成初始化之後
        if(ac.getParent() == null) {
            this.startConsumerStream(ac);
        }
    }

    /** * 開啓消費者線程 * @param context */
    public void startConsumerStream(ApplicationContext context) {
        // ==============首先各類鏈接屬性
        Properties props = new Properties();
        props.put("zookeeper.connect", this.zookeeper_connects);
        props.put("zookeeper.connection.timeout.ms", this.zookeeper_timeout.toString());
        props.put("group.id", "consumerGroup");

        //==============
        ConsumerConfig consumerConfig = new ConsumerConfig(props);
        ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);

        HashMap<String, Integer> map = new HashMap<String, Integer>();
        String topicName = "MessageTopic";
        map.put(topicName, this.consumerNumber);
        Map<String, List<KafkaStream<byte[], byte[]>>> topicMessageStreams = consumerConnector.createMessageStreams(map);

        // 獲取並啓動消費線程,注意看關鍵就在這裏,一個消費線程能夠負責消費一個topic中的多個partition
        // 可是一個partition只能分配到一個消費者線程
        List<KafkaStream<byte[], byte[]>> streamList = topicMessageStreams.get(topicName);

        // 爲每個消費者建立一個處理線程。並放置到線程池中運行
        // 注意:並不須要監控這些消費線程的運行狀態,
        // 由於沒有消息接收的時候,線程就天然會在"iterator.hasNext()"位置等待
        for(int index = 0 ; index < streamList.size() ; index++) {
            KafkaStream<byte[], byte[]> stream = streamList.get(index);
            ConsumerThread consumerThread = (ConsumerThread)context.getBean("consumerThread");
            consumerThread.setStream(stream);
            this.consumerPool.submit(consumerThread);
        }
    }

    /** * @return the zookeeper_connects */
    public String getZookeeper_connects() {
        return zookeeper_connects;
    }

    /** * @param zookeeper_connects the zookeeper_connects to set */
    public void setZookeeper_connects(String zookeeper_connects) {
        this.zookeeper_connects = zookeeper_connects;
    }

    /** * @return the zookeeper_timeout */
    public Long getZookeeper_timeout() {
        return zookeeper_timeout;
    }

    /** * @param zookeeper_timeout the zookeeper_timeout to set */
    public void setZookeeper_timeout(Long zookeeper_timeout) {
        this.zookeeper_timeout = zookeeper_timeout;
    }

    /** * @return the consumerNumber */
    public Integer getConsumerNumber() {
        return consumerNumber;
    }

    /** * @param consumerNumber the consumerNumber to set */
    public void setConsumerNumber(Integer consumerNumber) {
        this.consumerNumber = consumerNumber;
    }

    /** * @return the consumerPool */
    public ThreadPoolExecutor getConsumerPool() {
        return consumerPool;
    }

    /** * @param consumerPool the consumerPool to set */
    public void setConsumerPool(ThreadPoolExecutor consumerPool) {
        this.consumerPool = consumerPool;
    }
}

KafkaConsumerLauncher中咱們一共爲名叫「MessageTopic」的Topic建立了10個消費者。就像講解Kafka特性時提到的那樣:消費者數量不要小於Topic的分區數量,也能夠多出一些消費者數量做爲備用。這樣才能保證每個分區都有一個對應的消費者進行消費。若是在您的集羣中,設計了5個消費節點做爲消費者,那麼也能夠爲每個消費者應用程序建立兩個消費者,這樣一共也有10個消費者了。

另外注意,在KafkaConsumerLauncher中咱們使用了一個線程池對象consumerPool,而且使用了Spring框架進行了注入;咱們建立具體的消費者線程也是依託於Spring框架完成的,因此纔會有「context.getBean」這樣的語句。它們的xml配置狀況以下:

<!-- 消費者啓動器 -->
<bean id="kafkaConsumerLauncher" class="com.test.logservice.KafkaConsumerLauncher" scope="singleton">
    <property name="consumerNumber" value="10"></property>
    <property name="consumerPool" ref="consumerPool"></property>
    <property name="zookeeper_connects" value="192.168.61.138:2181"></property>
    <property name="zookeeper_timeout" value="10000"></property>
</bean>

<!-- ========================================================= -->

<!-- 如下是服務器節點專門爲數據處理準備的處理線程 由於只會有10個生產者,因此線程池的大小是固定的,也無需使用無限隊列 -->
<bean id="consumerPool" scope="singleton" class="java.util.concurrent.ThreadPoolExecutor">
    <constructor-arg value="10" type="int"></constructor-arg>
    <constructor-arg value="10" type="int"></constructor-arg>
    <constructor-arg value="10000" type="long"></constructor-arg>
    <constructor-arg value="MILLISECONDS" type="java.util.concurrent.TimeUnit"></constructor-arg>
    <constructor-arg ref="threadCacheQueue"></constructor-arg>
</bean>
<bean id="threadCacheQueue" class="java.util.concurrent.SynchronousQueue"></bean>

<!-- 消費者啓動線程 必定注意:prototype屬性值,它表明着每次getBean就建立一個新的ConsumerThread對象 -->
<bean id="consumerThread" class="com.test.logservice.ConsumerThread" scope="prototype"></bean>

========================= (接下文)

相關文章
相關標籤/搜索