分佈式消息中間件

分佈式應用和集羣:java

  從部署形態來看,它們都是多臺機器或者多個進程部署,並且都是爲了實現一個業務功能。git

  若是是一個業務被拆分紅多個子業務部署在不一樣的服務器上,那就是分佈式應用github

  若是是同一個業務部署在多臺服務器上,那就是集羣算法

分佈式應用的多個子系統之間並非徹底獨立的,它們須要相互通訊來共同完成某個功能。系統間通訊的方式有兩種,一種是遠程過程調用即RPC接口,spring

另外一種是基於消息隊列的方式。數據庫

  基於消息隊列的方式是指由應用中的某個系統負責發送消息,由訂閱這條消息的相應系統負責接收消息。不一樣的系統在收到消息後進行各自系統內的業務處理。消息能夠很是簡單,好比只包文本字符串;也能夠很複雜,好比包含字節流、字節數組,還可能嵌入對象,甚至是經序列化後的Java對象。消息生產者在發送消息後能夠當即返回,由消息隊列來負責消息的傳遞,消息發佈者只管將消息發佈到消息隊列而不用管誰來取,消息消費者只管從消息隊列中取消息而不用管是誰發佈的,這樣生產者和消費者都不用知道對方的存在。apache

消息隊列使用的典型場景是異步處理,同時還可用於解耦、流量削峯、日誌收集、事務最終一致性等問題。bootstrap

消息隊列的特色:數組

Broker:至少須要包含消息的發送、接收和暫存功能,另外,在不一樣的業務場景中,須要消息隊列能解決諸如消息堆積、消息持久化、可靠投遞、消息重複、嚴格有序、集羣等各類問題。瀏覽器

  一、消息堆積:消息消費者處理速度跟不上生產者發送消息的速度,形成消息堆積,因此消息隊列要可以處理這種狀況,好比設置閥值,將超過閥值的消息再也不放入處理中

    心、設置消息過時時間,以防止系統資源被耗盡致使整個消息隊列不可用。

  二、消息持久化:消息處理中心若是在接收到消息以後不作任何處理就直接轉給消費者,那就沒法知足流量削峯等需求。索引消息處理中心要能先把消息暫存下來,而後選擇

    合適的時機將消息投遞給消費者。

  三、可靠投遞:可靠投遞是不容許存在消息丟失的狀況的

  四、消息重複:當消息發送失敗或者不知道是否發送成功時(好比超時),消息的狀態時待發送,定時任務會不停的輪詢全部的待發送消息,最終保證消息不會丟失,可是帶

    來了消息可能會重複的狀況

  五、嚴格有序:實際的業務場景中,會有須要按生產消息時的順序來消費的情形,這就須要消息隊列可以提供有序消息的保證。

  六、集羣:消息隊列產品要提供對集羣模式的支持

  七、消息中間件:非底層操做系統軟件,非業務軟件,不是最終給用戶使用的額,不能直接給客戶端帶來價值的軟件統稱爲中間件。介於用戶應用和操做系統之間的軟件。

    消息中間件關注於數據的發送和接收,利用高效、可靠的異步消息傳遞機制集成分佈式系統。

 

  消息協議

  消息協議是指用於實現消息隊列功能時所涉及的協議。按照是否向行業開放消息規範文檔,能夠將消息協議分爲開放協議和私有協議。常見的開放協議有AMQP、MQTT、STOMP、XMPP等,有些特殊框架Redis、Kafka、ZeroMQ根據自身須要位嚴格遵循MQ規範,而是基於TCP/IP自行封裝了一套協議,經過網絡Socket接口進行傳輸,實現了MQ的功能。

這裏的協議能夠簡單地理解成雙方通訊的一個約定,好比傳過來一段字符流數據,其中第一個字節表示什麼,第二個字節表示什麼。

AMQP:Advanced Message Queuing Protocol,通常來講將AMQP協議的內容分爲三個部分,基本概念、功能命令和傳輸層協議

  基本概念:AMQP內部定義的各組件及組件的功能說明

  功能命令:指該協議定義的一系列命令,應用程序能夠基於這些命令來實現相應的功能

  傳輸層協議:定義了數據的傳輸格式,消息隊列的客戶端能夠基於這個協議與消息代理和AMQP的相關模型進行交互通訊,該協議內容包括數據幀處理、信道複用、內容編碼、心跳檢測

      、數據表示和錯誤處理等。

一、主要概念

  Message:消息,消息服務器所處理數據的原子單元

  Publisher:消息生產者,也是一個向交換器發佈消息的客戶端應用程序

  Exchange:交換器,用來接收消息生產者所發送的消息並將這些消息路由給服務器中的隊列

  Binding:綁定,用於消息隊列和交換器之間的關聯,一個綁定就是基於路由鍵將交換器和消息隊列鏈接起來的路由規則,因此能夠將交換器理解成一個由綁定構成的路由表。

  Virtual Host:虛擬主機,它是消息隊列以及相關對象的集合,是共享同一個身份驗證和加密環境的獨立服務器域,每一個虛擬主機本質上都是一個mini版的消息服務器,擁有本身的隊列、交換器、綁定和權限機制

  Broker:消息代理,表示消息隊列服務器實體,接受客戶端鏈接,實現AMQP消息隊列和路由功能的過程

  Routing Key:路由規則,虛擬機可用它來肯定如何路由一個特定消息

  Queue:消息隊列,用來保存消息直到發送給消費者。它是消息的容器,也是消息的終點。一個消息可被投入一個或多個隊列中。消息一直在隊列裏面,等待消費者鏈接到這個隊列將其取走

  Connection:鏈接,能夠理解成客戶端和消息隊列服務器之間的TCP鏈接

  Channel:信道,僅僅當建立了鏈接後,若客戶端仍是不能發送消息,則須要爲鏈接建立一個信道。信道是一條獨立的雙向數據流通道,它是創建在真實的TCP鏈接內的虛擬鏈接,AMQP命令都是經過信道發送出去的,無論是發佈消息、訂閱隊列仍是接收消息,它們都是信道完成。一個鏈接能夠包含多個信道,之因此須要信道,是由於TCP鏈接的創建和釋放都是十分昂貴的,若是客戶端的每個線程都須要與消息服務器交互,若是每個線程都創建了一個TCP鏈接,不只浪費資源,並且操做系統也沒法支持每秒創建如此多的鏈接

  Consumer:消息消費者,表示一個從消息隊列中取得消息的客戶端應用程序

二、核心組件生命週期

(1)消息的聲明週期:一條消息消息的流轉過程是這樣的:Publisher產生一條數據,發送到Broker,Broker中的Exchange能夠被理解爲一個規則表(Routing Key和Queue的映射關係-Binding),Broker收到消息後根據Routing key查詢投遞的目標Queue。Consumer向Broker發送訂閱消息時會指定本身監聽哪一個Queue,當有數據到達Queue時Broker會推送數據到Consumer。

  生產者Publisher在發佈消息時能夠給消息指定各類消息屬性,其中有些屬性有可能會被消息代理Broker所使用,而其餘屬性則是徹底不透明的,它們只能被接收消息的應用所使用。

  當消息到達服務器時,服務器中的交換器一般會將消息路由到服務器上的消息隊列中,若是消息不能路由,則交換器會將消息丟棄或者將其返回給消息生產者,這樣生產者能夠選擇如何來處理未路由的消息。

  單條消息可存在於多個消息隊列中,消息代理能夠採用複製消息等多種方式進行處理。可是當一條消息被路由到多個消息隊列中時,它在每一個消息隊列中都是同樣的。

  當消息到達消息隊列時,消息隊列會當即嘗試將消息傳遞給消息消費者。若是傳遞不成功,則消息隊列會存儲消息(按生產者要求存儲在內存或者磁盤中),並等待消費者準備好。

  若是沒有消費者,則消息隊列經過AMQP將消息返回給生產者(若是須要的話)。當消息隊列把消息傳遞給消費者後,它會從內部緩衝區中刪除消息,刪除動做多是當即發生的,也可能在消費者應答已成功處理以後再刪除。

  消息消費者可選擇如何及什麼時候來應答消息,一樣,消費者也能夠拒絕消息(一個否認應答)。

(2)交換器的生命週期:每臺AMQP服務器都預先建立了許多交換器實例,它們在服務器啓動時就存在而且不能被銷燬。若是你的應用程序有特殊要求,則能夠選擇本身建立交換器,並在完成工做後進行銷燬

(3)隊列的生命週期:隊列分爲持久化消息隊列和臨時消息隊列。

  持久化消息隊列可被多個消費者共享,無論是否有消費者接收,它們均可以獨立存在

  臨時消息隊列對某個消費者是私有的,只能綁定到此消費者,當消費者斷開鏈接時,該消息隊列將被刪除

 

Cosumer工做原理:

  應用經過監聽隊列中的消息,獲取queue中的message便可消費(Broker推送)

注意事項:

  1):沒有消費者的Queue的message是沒法被消費的,這個queue中的message就會一直存在

  2):一個Queue能夠擁有多個消費者,也能夠註冊一個獨享消費者,註冊獨享消費者的Queue中的消息只有指定的消費者能夠消費message

  3):消費者消費完消息會發個發個反饋給Queue,這個Queue就會將這條message從Queue中移除,若是沒有接收到反饋那麼Queue就會一直

    存在這條message,同時這個message若是不能被消費那麼就會形成Queue中的消息堵塞

 

Message的主要屬性

  Content type:內容類型

  Content encoding:內容編碼

  Routing key:路由鍵  

  Delivery mode(persistent or not):投遞模式(持久化或非持久化)

  Message priority:消息優先權

  Message publishing timestamp:消息發佈的時間戳

  Expiration period:消息有效期

  Publisher application id:發佈應用的ID

  注意事項:

    (1):消息是以byte字節的形式存在

    (2):Content type能夠存放一些header和argument屬性(和Http Request相似)

    (3):有些內容例如中文,須要指定編碼

    (4):Delivery mode設置成持久化模式能夠將消息保存到硬盤,在服務器重啓後會讀取硬盤中未被消費的message,此舉會保證

      消息的健壯性可是會形成性能犧牲。

 

 

 三、功能命令

AMQP協議文本是分層描述的,在不一樣主版本中劃分的層次是有必定區別的。

0-9版本共分兩層:Function Layer(功能層)和Transport Layer(傳輸層)。

  功能層定義了一系列命令,這些命令按功能邏輯組合成不一樣的類(Class),客戶端應用能夠利用它們來實現本身的業務功能。

  傳輸層將功能層所接收的消息傳遞給服務器通過相應處理後再返回,處理的事情包括信道複用、幀同步、內容編碼、心跳檢測、數據表示和錯誤處理等

0-10版本則分爲三層:Model Layer(模型層)、Session Layer(會話層)和Transport Layer(傳輸層)。

  模型層定義了一套命令,客戶端應用利用這些命令來實現業務功能

  會話層將負責將命令從客戶端應用傳遞給服務器,再將服務器的響應返回給客戶端應用,會話層爲這個傳遞過程提供了可靠性、同步機制和錯誤處理。

  傳輸層負責提供幀處理、信道複用、錯誤檢測和數據表示

 

四、消息的數據格式

  全部的消息數據都被組織成各類類型的幀(Frame)。幀能夠攜帶協議方法和其餘信息,全部幀都有一樣的格式,都有一個幀頭(header,7個字節)、任意大小的負載(payload)和

一個檢測錯誤的結束幀(Frame-end)字節組成。

  其中幀頭包括一個type字段、一個channel字段和一個size字段;幀負載的格式依賴幀類型(type)

  要讀取一個幀須要三步、

  一、讀取幀頭,檢查幀類型和通道(channel)

  二、根據幀類型讀取幀負載並進行處理

  三、讀取結束幀字節

AMQP定義了以下幀類型。

  type=1,"METHOD":方法幀;

  type=2,"HEADER":內容頭幀;

  type=3,"BODY":內容體幀;

  type=4,"HEARTBEAT":心跳幀。

通道編號爲0的表明全局連接中的全部幀,1~65535表明特定通道的幀。size字段是指幀負載的大小,它的數值不包括結束幀字節。

AMQP使用結束幀來檢測錯誤客戶端和服務端實現引發的錯誤。

 

  JMS

  Java Message Service,即Java消息服務應用程序接口,是Java平臺中面向消息中間件的一套規範的Java API接口。用於在兩個應用程序之間或分佈式系統中發送消息,進行異步通訊。

JMS不是消息隊列協議中的一種,更不是消息隊列產品,它是與具體平臺無關的API,目前市面上的絕大多數消息中間件廠商都支持JMS接口規範。換句話說,你可使用JMS API來連

接支持AMQP、STOMP等協議的消息中間件產品。在這一點上和JDBC很像。

 

  RabbitMQ

  RabbitMQ是一個由Erlang語言開發的基於AMQP標準的開源實現。RabbitMQ最初起源於金融系統,用於在分佈式系統中存儲轉發消息,在易用性、擴展性、高可用性等方面表現不俗。

其具體特色包括:

  一、保證可靠性(Reliability)。RabbitMQ使用一些機制來保證可靠性,如持久化、傳輸確認、發佈確認等

  二、具備靈活的路由(Flexible Routing)功能。在消息進入隊列以前,是經過Exchange(交換器)來路由消息的。對於典型的路由功能,RabbitMQ已經提供了一些內置的Exchange來

    實現。針對更復雜的路由功能,能夠將多個Exchange綁定在一塊兒,也能夠經過插件機制來實現本身的Exchange。

  三、支持消息集羣(Clustering)。多臺RabbitMQ服務器能夠組成一個集羣,造成一個邏輯Broker。

  四、具備高可用性(Highly Available)。隊列能夠在集羣中的機器上進行鏡像,使得在部分節點出現問題的狀況下仍然可用。

  五、支持多種協議(Multi-protocol)除了支持AMQP以外,還經過插件的方式支持其餘消息隊列協議,如STOMP、MQTT等

  RabbitMQ的總體架構圖:

  

交換器:

不一樣類型的交換器分發消息的策略也不一樣,Direct、Fanout、Topic等

Driect交換器:

  Direct交換器基於消息中的路由鍵將消息投遞到對應的消息隊列中,Direct交換器是徹底匹配,單播的模式。

  一、每個消息隊列根據routing key 爲K綁定到交換器上

  二、當一個到達Direct 交換器中的消息的routing key 爲R時,交換器根據路由key R找到Binding中的路由key K和R相等的隊列,將消息投遞到這個

    隊列中

  

 

Fanout交換器:

  Fanout交換器路由消息到全部的與該交換器綁定的消息隊列中,每一個隊列都會獲得這個消息的一個拷貝。忽略掉Routing key。

  

Topic交換器

  Topic交換器根據消息中的Routing key和隊列與綁定到交換器中的Routing key的模式進行匹配。消息中的Routing key與模式匹配的話就能夠將消息分發到隊列中,所以能夠分發到一個或多個隊列中。

Topic交換器一般用於各類發佈/訂閱模式的變體和消息的多播路由。Topic交換器將路由鍵和綁定鍵的字符串切分紅單詞,這些單詞之間用點」.「隔開,且Topic交換器會識別兩個通配符,#和*,#匹配0或多個單詞,*匹配恰好一個單詞。

 <!--RabbitMQ-->
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>4.1.0</version>
        </dependency>

 生產者:

package com.yang.spbo.rabbitmq;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * rabbitmq生產者
 * 〈功能詳細描述〉
 *
 * @author 17090889
 * @see [相關類/方法](可選)
 * @since [產品/模塊版本] (可選)
 */
public class Producer {
    public static void main(String[] args) throws IOException, TimeoutException {
        // 建立鏈接工廠
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        // 設置rabbitmq地址
        connectionFactory.setHost("localhost");
        // 虛擬主機
        connectionFactory.setVirtualHost("/");
        // 創建到代理服務器的鏈接
        Connection connection = connectionFactory.newConnection();
        // 建立信道
        Channel channel = connection.createChannel();
        // 聲明direct交換器
        String exchangeName = "hello-exchange";
        channel.exchangeDeclare(exchangeName, "direct", true);
        // 路由鍵
        String routingKey = "testRoutingKey";
        // 發佈消息
        byte[] messageBodyBytes = "quit".getBytes();
        channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes);
        // 關閉信道和鏈接
        channel.close();
        connection.close();
    }
}

消費者:

消息消費者經過不斷循環等待服務器推送消息,一旦有消息過來,就在控制檯輸出消息的相關內容。

package com.yang.spbo.rabbitmq;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * rabbitmq消費者
 * 〈功能詳細描述〉
 *
 * @author 17090889
 * @see [相關類/方法](可選)
 * @since [產品/模塊版本] (可選)
 */
public class Consumer {
    public static void main(String[] args) throws IOException, TimeoutException {
        // 建立鏈接工廠
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        // 設置rabbitmq地址
        connectionFactory.setHost("localhost");
        // 虛擬主機
        connectionFactory.setVirtualHost("/");
        // 創建到代理服務器的鏈接
        Connection connection = connectionFactory.newConnection();
        // 建立信道,不能修改
        final Channel channel = connection.createChannel();
        // 聲明direct交換器
        String exchangeName = "hello-exchange";
        channel.exchangeDeclare(exchangeName, "direct", true);
        // 聲明隊列
        String queueName = channel.queueDeclare().getQueue();
        // 路由鍵
        String routingKey = "testRoutingKey";
        // 將交換器和隊列根據路由key綁定起來
        channel.queueBind(queueName, exchangeName, routingKey);
        while (true) {
            // 消費消息
            boolean autoAck = false;
            String consumerTag = "";
            channel.basicConsume(queueName, autoAck, consumerTag, new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String routingKey = envelope.getRoutingKey();
                    String contentType = properties.getContentType();
                    System.out.println("消費的路由鍵:" + routingKey);
                    System.out.println("消費的內容類型:" + contentType);
                    long deliveryTag = envelope.getDeliveryTag();
                    // 確認消息
                    channel.basicAck(deliveryTag, false);
                    String bodyStr = new String(body, "UTF-8");
                    System.out.println("消費的消息體內容:" + bodyStr);
                }
            });
        }
    }
}

使用消息隊列可使以前同步調用的代碼改爲了異步處理的方式。

Spring整合RabbitMQ

   <!--spring rabbitmq-->
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit</artifactId>
            <version>2.0.2.RELEASE</version>
        </dependency>

同時在Spring配置文件中配置鏈接信息、監聽器、隊列名稱、交換器,rabbitTemplate以及rabbitAdmin等

同時自定義類實現MessageListener

@Service
public class MyMessageListener implements MessageListener {
    @Override
    public void onMessage(Message message) {
        String messageBody = new String(message.getBody());
    }
}

 

基於RabbitMQ的消息推送:

  以瀏覽器推送接收消息爲例,之前,瀏覽器中的推送功能都是經過輪詢來實現的。所謂輪詢是指以特定時間間隔(如每隔1s)由瀏覽器向服務器發送請求,而後服務器返回最新的數據給瀏覽器。但這種模式的缺點是瀏覽器須要不斷地向服務器發出請求。每次請求中的絕大部分數據都是相同的,裏面包含的有效數據只是很小的一部分,這會致使佔用不少的帶寬,並且不斷地鏈接將大量消耗服務器資源。

  因此,爲了改善這種狀況,H5定義了WebSocket,可以實現瀏覽器與服務器之間全雙工通訊。其優勢有兩個:

  一是服務器與客戶端之間交換的標頭信息很小;

  二是服務器能夠主動傳送數據給客戶端;

  目前的主流瀏覽器都已支持WebSocket,而服務端消息隊列選用RabbitMQ,則是由於RabbitMQ有豐富的第三方插件,用戶能夠在AMQP協議的基礎上本身擴展應用。針對WebSocket通訊RabbitMQ提供了Web STOMP插件,它是一個實現了STOMP協議的插件,它是一個實現了STOMP協議的插件,能夠將該插件理解爲WebSocket與STOMP協議間的橋接,目的是讓瀏覽器可以使用RabbitMQ,當RabbitMQ啓用了Web STOMP插件後,瀏覽器就可使用WebSocket與之通訊了。

  當有新消息須要發佈時,系統後臺將消息數據發送到RabbitMQ中,再經過WebSocket將數據推送給瀏覽器。

  再js中消費消息。能夠在github上下載stomp.js。

 

消息保存:

  對隊列中的消息的保存方式有disk和RAM兩種。

  disk即寫入磁盤,也就是持久化,在發生宕機時,消息數據能夠在系統重啓以後恢復。

  採用disk方式,消息數據會被保存在以.rdq後綴命名的文件中,當文件達到16M時會從新生成一個新的文件,當文件中已經被刪除的消息比例大於閥值時會觸發文件合併操做,以提升磁盤利用率。

  採用RAM方式,只是在RAM保存內部數據庫表數據,而不會保存消息,消息存儲索引。隊列索引和其餘節點等數據,因此必須在啓動時從集羣中其餘節點同步原來的消息數據,這也意味着集羣中必須包含至少一個disk方式的節點。

  消息持久化包括Queue、Message、Exchange持久化三部分。durable:持久的

  Queue持久化:經過設置queueDeclare方法中的durable參數設置爲true;

  Message持久化:經過設置basePublish方法中的BasicProperties中的deliveryMode爲2;

  Exchange持久化:在聲明Exchange時使用支持durable入參的方法,設置爲true;

 

如何保證消息不會丟失?

消息確認模式(生產者確認消息投遞到消息隊列中):

  在默認狀況下,生產者把消息發送出去之後,Broker不會返回任何消息給生產者。也就是說,生產者不知道消息有沒有到達Broker。若是在消息到達Broker前發生了宕機或者Broker接收到消息在將消息寫入磁盤時發生了宕機,那麼消息就會丟失。而生產者並不知道消息的狀況?

RabbitMQ提供了兩種解決方式:一、經過AMQP協議中的事務機制  二、把信道設置成確認模式

  AMQP中的事務機制將把信道設置成事務模式後,生產者和Broker之間會有一種發送/響應機制,生產者須要同步等待Broker的執行結果,在性能上會下降消息服務的吞吐量,因此通常採用性能更好的發送方確認方式來保障消息投遞,將信道設置爲確認模式以後,在該信道上發佈的全部消息都會被分配一個惟一ID,一旦消息被投遞到全部匹配的隊列中,該信道就會向生產者發送確認消息,在確認消息中包含了以前的惟一ID,從而讓生產者知道消息已到達目的隊列。確認模式最大的優點是異步,生產者可繼續發送消息。

 

消費者回執(消費者成功消費消息):

  在實際應用中可能出現消費者接收到消息,可是尚未處理完就發生宕機的狀況,這也會致使消息丟失爲避免這種狀況,能夠要求消費者在消費完消息以後發送一個回執給RabbitMQ服務器,RabbitMQ服務器在收到回執以後再將消息從其隊列中刪除,若是沒有收到回執而且檢測到消費者與RabbitMQ服務器的鏈接斷開了,則由RabbitMQ服務器負責把消息發送給其餘消費者。若是沒有斷開,RabbitMQ是不會把消息發送給其餘消費者的。

  一、兩種消息回執模式

    1)、自動回執:當Broker成功發送消息給消費者後就會當即把此消息從隊列中刪除,而不等待消費者回送確認消息

    2)、手動回執:當Broker發送消息給消費者後不會當即把此消息刪除,而是等待消費者回執的確認消息後纔會刪除。消費者收到消息並處理完成後須要向Broker顯式發送ACK指令,若是消費者由於意外崩潰而沒有發送ACK指令,那麼Broker就會把該消息轉發給其餘消費者,若是此時沒有其餘消費者,那麼Broker會緩存消息。

  二、拒絕消息

    當消費者處理消息失敗或者當前不能處理消息時,能夠給Broker發送一個拒絕消息的指令,而且可要求Broker將該消息丟棄或者從新放入隊列中。

  三、消息預取

    爲了消費者負載均衡,能夠設置預取數量限制每一個消費者在收到下一個確認回執前一次能夠接收多少條消息。

 

如何處理消息不被重複消費呢?

  首先要知道消息爲何會被重複消費,大可能是因爲網絡不通致使,消費者的確認消息沒有傳送到消息隊列,致使消息隊列不知道消息已經被消費了,再次

將該消息分發給其餘消費者。因此解決的思路有下面幾種:

  1):若是消息是作數據庫的插入操做,給這個消息作一個惟一的主鍵,那麼就算出現重複消費的狀況,就會致使主鍵衝突,避免數據庫出現髒數據。

  2):判重表,將消費過處理成功的消息存入判重表中,每次消費處理前先去判重表查詢是否已消費過

  2):若是你拿到這個消息作Redis的set操做,不用解決,由於不管你set幾回結果都是同樣的,set操做原本就算冪等操做

  3):若是上面兩種狀況都不行,準備一個第三方服務方來作消費記錄。以Redis爲例,給消息分配一個全局id,只要消費過該消息,將<id,Message>以KV

    形式寫入Redis。那消費者開始消費前,先去Redis中查詢 有沒有消費記錄便可。

  總之,解決思路就是,若是消息重複消費不會帶來問題,那大可不用理會,若是有問題,要對消費過的消息作記錄(數據庫或者緩存),再次消費前查詢是否已經被消費。

或者兩次操做作互斥操做,使只有一次操做能成功執行。有些狀況須要考慮使用分佈式鎖

 

如何保證消息順序消費? 

  經過算法,將須要保持前後順序的消息放在同一個消息隊列中,而後只用一個消費者去消費該隊列。

  一、RabbitMQ:若是存在多個消費者,那麼就讓每一個消費者對應一個queue,而後把要發送的數據所有放到一個queue,這樣就能保證全部

  的數據只到達一個消費者從而保證每一個數據到達數據庫都是順序的。

  (拆分多個queue,每一個queue一個consumer。或者就是一個queue可是對應一個consumer,而後這個consumer內部用內存隊列作排隊,而後分發給底層不一樣的worker來處理)。

  二、Kafka寫入partition時指定一個key,例如訂單id,那麼消費者從partition中取出數據的時候確定是有序的,當開啓多個線程的時候可能

數據不一致,這時候就須要內存隊列,將相同的hash過的數據放在一個內存隊列裏,這樣就能保證一條線程對應一個內存隊列的數據寫入數據

庫的時候順序性的,從而卡伊開啓多條線程對應多個內存隊列。

  (Kafka:一個topic,一個partition,一個consumer,內部單線程消費,寫N個內存queue,而後N個線程分別消費一個內存queue便可)。

流控機制:

  RabbitMQ能夠對內存和磁盤的使用量設置閥值,當達到閥值後生產者將被阻塞,直到對應資源的使用恢復正常。除了設置這兩個閥值以外,RabbitMQ還用流控(Flow Control)機制來確保穩定性。

 

  Kafka

  特色:

  一、同時爲發佈和訂閱提供搞吞吐量。Kafka的設計目標是以時間複雜度爲O(1)的方式提供消息持久化能力的,即便對TB級別以上數據也能保證常數時間的訪問性能,即便在很是廉價的商用機器上也能作到單機支持每秒100K條消息的傳輸(通常消息處理是百萬級,使用Partition實現機器間的並行處理

  二、消息持久化。將消息持久化到磁盤,所以可用於批量消費,例如ETL以及實時應用程序。經過將數據持久化到磁盤以及複製能夠防止數據丟失

  三、分佈式。支持服務器間的消息分區及分佈式消費,同時保證每一個Partition內的消息順序傳輸。同時保證每一個Partition內的消息順序傳輸。其內部的Producer、Broker和Consumer都是分佈式架構,這更易於向外擴展。

  四、 消費消息採用Pull模式。消息被處理的狀態是在Consumer端維護的,而不是由服務端維護,Broker無狀態,Consumer本身保存offset

  五、支持Online和Offline場景,同時支持離線數據處理和實時數據處理。

基本概念:

  Broker:Kafka集羣中的一臺或多臺服務器

  Topic:主題,發佈到Kafka的每條消息都有一個類別,這個類別就被稱爲Topic(物理上,不一樣Topic的消息分開存儲;邏輯上,雖然一個Topic的消息被保存在一個或多個Broker上,但用戶只須要指定消息的Topic便可生產或消費數據,而沒必要關心數據存於何處)

  Partition:物理上的Topic分區,一個Topic能夠分爲多個Partition,每一個partition都是一個有序的隊列。Partition中的每條消息都會被分配一個有序的ID(offset),它惟一地標識分區中的每一個記錄。每一個Partition只能被一個消費組中的一個消費者消費。

  Producer:消息和數據的生產者,能夠理解爲向Kafka發送消息的客戶端

  Consumer:消息和數據的消費者,能夠理解爲從Kafka取消息的客戶端,經過與Kafka集羣創建長鏈接的方式,不斷的從集羣中拉去消息

  Consumer Group(消費組):每一個消費者都屬於一個特定的消費組(可爲每一個消費者指定組名,若不指定組名,則屬於默認的組)。這是Kafka用來實現一個Topic的廣播(發送給全部的消費者)和單播(發送給任意一個消費者)的手段。一個Topic能夠由多個消費組。但對每一個消費組,只會把消息發送給該組中的一個消費者。若是要實現廣播,只要每一個消費者都有一個獨立的消費組就能夠了;若是要實現單播,只要全部的消費者都在同一個消費者組中就行。

        

  一個典型的Kafka集羣中包含若干生產者、若干Broker(Kafka支持水平擴展,通常Broker數量越多集羣吞吐量越大)、若干消費者組以及一個Zookeeper集羣。Kafka經過Zookeeper管理集羣配置、選舉leader,以及當消費者組發生變化時進行Rebalance(再均衡)。生產者使用推模式將消息發佈到Broker,消費者使用拉模式從Broker訂閱並消費消息。

  

  建立一個Topic時,能夠指定分區數目,分區數越多,其吞吐量越大,可是須要的資源也越多,也會帶來更高的不可用性。

  生產者在向kafka集羣發送消息的分區策略

  一、能夠指定分區,則消息投遞到指定的分區

  二、若是沒有指定分區,可是消息的key不爲空,則基於key的哈希值來選擇一個分區

  三、若是既沒有指定分區,且消息的key也爲空,則用輪詢的方式選擇一個分區

也就是一條消息只會發送到一個分區中。

  對於一個group而言,消費者的數量不該該多餘分區的數量,由於在一個group中,每一個分區最多隻能綁定到一個消費者上,只能被一個消費組中的一個消費者消費。而一個消費者能夠消費多個分區。所以,若一個消費組中的消費者數量大於分區數量的話,多餘的消費者將不會收到消息(沒有分區能夠消費)。

  實例:

        <!--kafka-->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.11.0.1</version>
        </dependency>

消息生產者:

package com.yang.spbo.kafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.HashMap;
import java.util.Map;

/**
 * Kafka消息生產者
 * 〈功能詳細描述〉
 *
 * @author 17090889
 * @see [相關類/方法](可選)
 * @since [產品/模塊版本] (可選)
 */
public class ProducerSample {
    public static void main(String[] args) {
        Map<String, Object> props = new HashMap<>();
        // Kafka集羣,多臺服務器地址之間用逗號隔開
        props.put("bootstrap.servers", "localhost:9092");
        // 消息的序列化類型
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // 消息的反序列化類型
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // zookeeper集羣地址,提供了基於Zookeeper的集羣服務器自動感知功能,能夠動態從Zookeeper中讀取Kafka集羣配置信息
        props.put("zk.connect", "127.0.0.1:2181");
        String topic = "test-topic";
        Producer<String, String> producer = new KafkaProducer<String, String>(props);
        // 發送消息
        producer.send(new ProducerRecord<String, String>(topic, "idea-key2", "javaMesage1"));
        producer.close();
    }
}

ProducerRecord構造:

public ProducerRecord(String topic, Integer partition, K key, V value) {
        this(topic, partition, (Long)null, key, value, (Iterable)null);
    }

    public ProducerRecord(String topic, K key, V value) {
        this(topic, (Integer)null, (Long)null, key, value, (Iterable)null);
    }

    public ProducerRecord(String topic, V value) {
        this(topic, (Integer)null, (Long)null, (Object)null, value, (Iterable)null);
    }

topic和value是必填的,若是指定了partition,那麼消息會被髮送至指定的partition;若是沒有指定partition但指定了key,那麼消息會按照hash(key)發送至指定的partition;若是既沒有指定partition,也沒有指定key,那麼消息會按照round-robin模式發送至每個partition。

消息消費者:

package com.yang.spbo.kafka;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;

/**
 * Kafka消息消費者
 * 〈功能詳細描述〉
 *
 * @author 17090889
 * @see [相關類/方法](可選)
 * @since [產品/模塊版本] (可選)
 */
public class ConsumerSample {
    public static void main(String[] args) {
        String topic = "test-topic";
        Properties props = new Properties();
        // Kafka集羣,多臺服務器地址之間用逗號隔開
        props.put("bootstrap.servers", "localhost:9092");
        // 消費組ID
        props.put("group.id", "test_group1");
        // Consumer的offset是否自動提交
        props.put("enable.auto.commit", "true");
        // 自動提交offset到zk的時間間隔,時間單位是毫秒
        props.put("auto.commit.interval.ms", "1000");
        // 消息的反序列化類型
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        Consumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        // 訂閱的話題
        consumer.subscribe(Arrays.asList(topic));
        // Consumer調用poll方法來輪詢Kafka集羣的消息,一直等到Kafka集羣中沒有消息或者達到超時時間100ms爲止
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord record : records) {
                System.out.println(record.partition() + record.offset());
                System.out.println(record.key());
                System.out.println(record.value());
            }
        }
    }
}

 

Kafka應用:

  一、用戶行爲數據採集

  二、基於Kafka的日誌收集:

    集羣方式部署的應用,日誌文件有多個存放地址,須要快速定位日誌問題就比較繁瑣,那麼就須要一個統一的日誌平臺來管理項目中產生的日誌文件。

    各個應用系統在輸出日誌時利用擁有高吞吐量的Kafka做爲數據緩衝平臺,將日誌統一輸出到Kafka,再經過Kafka以統一接口服務的方式開放給消費者。

    如今不少公司作的統一日誌平臺就是收集重要的系統日誌幾種到Kafka中,而後再導入Elasticsearch、HDFS、Storm等具體日誌數據的消費者中,用於進行實時搜索分析、離線統計、數據備份、大數據分析等。引入log4j和Kafka的集成包kafka-log4j-appender,在日誌配置文件中配置kafka信息便可

  三、基於Kafka的流量削峯

    好比秒殺帶來的流量高峯的場景,爲了保證系統高可用,加入消息隊列做爲信息流的緩衝,從而緩解短期內產生的高流量帶來的壓垮整個應用的問題,這就叫流量削峯。

    好比:秒殺場景,將商品的基本信息和庫存使用緩存預熱保存在Redis中,而後從Redis中讀取,系統後臺收到秒殺下單請求先從Redis中預減庫存,若是庫存

  不足,返回秒殺失敗;若是庫存充足,則將請求的業務數據放入消息隊列Kafka中排隊,以後請求當即返回頁面。消息隊列的消費者在收到消息後取得業務數據,

  執行後續的生成訂單、扣減數據庫和寫消息操做。

 

Kafka分區:

  在使用Kafka做爲消息隊列時,無論是發佈仍是訂閱都須要指定主題topic,在這裏的主題是一個邏輯上的概念,實際上Kafka的基本存儲單元是分區Partition,在一個Topic中會有一個或多個Partition,不一樣的Partition可位於不一樣的服務器節點上,物理上一個Partition對應一個文件夾。(分區是Topic私有的,全部的Topic之間不共享分區)

  站在生產者和Broker的角度,對不一樣Partition的寫操做時徹底並行的,但對消費者而言,其併發數則取決於Partition的數量。

  因此在實際的項目中須要配置合適的Partition數量,而這個數值須要根據所設計的系統吞吐量來推算。假設p是生產者寫入單個Partition的最大吞吐量,c表示消費者從單個Partitin消費的最大吞吐量,系統須要的目標吞吐量爲t,那麼Partition的數量應取t/p和t/c之間的大者。並且Partition的值要大於或等於消費組中消費者的數量。

 

Kafka集羣複製(1.0保證消息不丟失的策略)

  Kafka使用了zookeeper實現了去中心化的集羣功能,簡單地講,其運行機制是利用zookeeper維護集羣成員的信息,每一個Broker實例都會被設置一個惟一的標識符,Broker在啓動時會經過建立臨時節點的方式把本身的惟一標識符註冊到zookeeper中,Kafka中的其餘組件會監視Zookeeper裏的/broker/ids路徑,因此當集羣中有Broker加入或退出時其餘組件就會收到通知。

  雖然Kafka有集羣功能,可是在0.8版本以前一直存在一個嚴重的問題,就是一旦某個Broker宕機,該Broker上的全部Partition數據就不能被消費了,生產者也不能把數據存放在這些Partition中了,顯然不知足高可用設計。

  爲了讓Kafka集羣中某些節點不能繼續提供服務的狀況下,集羣對外總體依然可用,即生產者可繼續發送消息,消費者可繼續消費消息,因此須要提供一種集羣間數據的複製機制。在Kafka中是經過使用Zookeeper提供的leader選舉方式來實現數據複製方案的,其基本原理是:首先在Kafka集羣中的全部節點中選舉出一個leader,其餘副本做爲follower,全部的寫操做都先發給leader,而後再由leader把消息發給follower

  複製方案使Kafka集羣能夠在部分節點不可用的狀況下還能保證Kafka的總體可用性。Kafka中的複製操做也是針對分區的。一個分區有多個副本,副本被保存在Broker上,每一個Broker均可以保存上千個屬於不一樣主題和分區的副本。副本有兩種類型:leader副本(每一個分區都會有)和follower副本(除了leader副本以外的其餘副本)。爲了保證一致性,全部的生產者和消費者的請求都會通過leader。而follower不處理客戶端的請求,它的職責是從leader處複製消息數據,使本身和leader的狀態保持一致,若是leader節點宕機,那麼某個follower就會被選爲leader繼續對外提供服務

 

  Kafka保證消息不丟失的方案

1、消息發送

  一、消息發送確認:消息數據是存儲在分區中的,而分區又可能有多個副本,因此一條消息被髮送到Broker以後什麼時候算投遞成功呢?Kafka提供了三種模式:

    1):不等Broker確認,消息被髮送出去就認爲是成功的。這種方式延遲最小,可是不能保證消息已經被成功投遞到Broker

    2):由leader確認,當leader確認接收到消息就認爲投遞是成功的,而後由其餘副本經過異步方式拉取

    3):由全部的leader和follower都確認接收到消息才認爲是成功的。採用這種方式投遞的可靠性最高,但相對會損傷性能

    // 生產者消息發送確認模式,0表示第一種,1表示第二種,all表示第三種
        props.put("acks", "1");

  二、消息重發:Kafka爲了高可用性,生產者提供了自動重試機制。當從Broker接收到的是臨時可恢復的異常時,生產者會向Broker重發消息,但不能無限

  次重發,若是重發次數達到閥值,生產者將再也不重試並返回錯誤。

     // 消息發送重試次數
        props.put("retries", "10");
        // 重試間隔時間,默認100ms,設置時須要知道節點恢復所用的時間,要設置的比節點恢復所用時間長
        props.put("retry.backoff.ms", "1000");

 

2、消息消費

  從設計上來講,因爲Kafka服務端並不保存消息的狀態,因此在消費消息時就須要消費者本身去作不少事情,消費者每次調用poll方法時,該方法老是返回

由生產者寫入Kafka中但尚未被消費者消費的消息。Kafka在設計上有一個不一樣於其餘JMS隊列的地方是生產者的消息並不須要消費者確認,而消息在分區中

又是順序排列的,那麼必然就能夠經過一個偏移量offset來肯定每一條消息的位置,偏移量在消息消費的過程當中起着很重要的做用。

  更新分區當前位置的操做叫作提交偏移量,Kafka中有個叫作_consumer_offset的特殊主題用來保存消息在每一個分區的偏移量,消費者每次消費時都會往

這個主題中發送消息,消息包含每一個分區的偏移量。若是消費者崩潰或者有新的消費者加入消費組從而觸發再均衡操做,再均衡以後該分區的消費者若不是以前

那個,那麼新的消費者如何得知該分區的消息已經被以前的消費者消費到哪一個位置了呢?這種狀況下,就提現了偏移量的用處。爲了能繼續以前的工做,新的消

費者須要讀取每一個分區最後一次提交的偏移量,而後再從偏移量開始繼續往下消費消息。

偏移量提交方式:

  一、自動提交

  Kafka默認會按期自動提交偏移量,提交的默認時間間隔是5000ms,但可能存在提交不及時致使再均衡以後重複消費的狀況

        // Consumer的offset是否自動提交
        props.put("enable.auto.commit", "true");
        // 自動提交offset到zk的時間間隔,時間單位是毫秒
        props.put("auto.commit.interval.ms", "1000");

  二、手動提交

  先關閉消費者的自動提交配置,而後使用commitSync方法提交偏移量。

    // 關閉自動提交
        props.put("enable.auto.commit", "false");
    // Consumer調用poll方法來輪詢Kafka集羣的消息,一直等到Kafka集羣中沒有消息或者達到超時時間100ms爲止
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord record : records) {
                System.out.println(record.partition() + record.offset());
                System.out.println(record.key());
                System.out.println(record.value());
            }
            // 手動提交最新的偏移量
            consumer.commitSync();
        }

commitSync方法會提交由poll返回的最新偏移量,因此在處理完記錄後要確保調用了commitSync方法,不然仍是會發生重複處理的問題。

 

  三、異步提交

  使用commitSync方法提交偏移量有一個不足之處,就是該方法在Broker對提交請求作出迴應前是阻塞的,要等待迴應。所以,採用這種方式每提交一次偏移量就

等待一次限制了消費端的吞吐量,所以Kafka提供了異步提交的方式【consumer.commitAsync();】,消費者只管發送提交請求,而不須要等待Broker的當即迴應。

但commitSync方法在成功提交以前如碰到沒法恢復的錯誤以前會一直重試,而commitAsync並不會,由於爲了不異步提交的偏移量被覆蓋。

 

  Kafka高吞吐量的緣由?

  1)、順序讀寫

    Kafka的消息是不斷追加到文件中的,這個特性使Kafka能夠充分利用磁盤的順序讀寫性能。順序讀寫不須要磁盤磁頭的尋道時間,只需不多的扇區

    旋轉時間,因此速度遠快於隨機讀寫

  2)、零拷貝

    在Linux Kernel2.2以後出現了一種叫作「零拷貝(zero-copy)」系統調用機制,就是跳過「用戶緩衝區」的拷貝,創建一個磁盤空間和內存的直接映射,

    數據再也不復制到「用戶緩衝區」

  3)、分區

    kafka中的topic中的內容能夠分在多個分區(partition)存儲,每一個partition又分爲多個段segment,因此每次操做都是針對一小部分作操做,很輕便,

    而且增長並行操做的能力

  4)、批量發送

    Kafka容許進行批量發送消息,Productor發送消息的時候,能夠將消息緩存在本地,等到了固定條件發送到kafka

    (1):等消息條數到固定條數

    (2):一段時間發送一次

  5)、數據壓縮

    Kafka還支持對消息集合進行壓縮,Producer能夠經過GZIP或Snappy格式對消息集合進行壓縮,壓縮的好處就是減小傳輸的數據量,減輕

    對網絡傳輸的壓力。

    批量發送和數據壓縮一塊兒使用,單條作數據壓縮的話,效果不太明顯。

相關文章
相關標籤/搜索