Kafka分佈式消息系統

1.簡介

 

Kafka是一個分佈式消息系統,由LinkedIn使用Scala語言編寫的,具備高水平擴展和高吞吐量 (每秒處理任務數)spring

 

目前流行的消息隊列主要有三種:ActiveMQ、RabbitMQ、Kafkaapache

 

 

*其中ActiveMQ、RabbitMQ均支持AMQP協議,Kafka有其本身的協議 (仿AMQP,並不通用),但目前愈來愈多的開源分佈式處理系統如Flume(日誌收集系統)、Storm(實時數據處理系統)、Spark(大數據通用處理平臺)、Elasticsearch(全文檢索系統)都支持與Kafka的集成。bootstrap

*動態擴容:在不需中止服務的前提下動態的增長或減小消息隊列服務器,Kafka的動態擴容是經過zookeeper實現的,zookeeper上保存着kafka的相關狀態信息 (topic、partition等)緩存

 

Kafka使用場景:

1.網站活動追蹤:用戶的活動追蹤、搜索、瀏覽、點擊率等,將操做信息發佈到不一樣的主題中,可對數據實時監控,統計分析用戶行爲。安全

2.日誌聚合:做爲一種日誌聚合的解決方案( 日誌聚合的做用在於能夠把來自不一樣服務器上不一樣應用程序產生的日誌聚合起來,存放在單一的服務器上,方便進行搜索和分析)服務器

3.數據集中管理:分佈式應用產生的數據統一存放在kafka集羣中,集中式管理,供其餘程序使用或後續對數據統計分析。session

 
*對於不影響主流程的業務,而且系統對延時性有高要求的,均可以直接使用Kafka,將不一樣的任務類型放入Kafka不一樣的Topic中,使用多個Consumer去進行消費。

 

 
 

 

2.AMQP

 

AMQP(Advanced Message Queuing Protocol),高級消息隊列協議是一個統一消息服務的應用層協議,爲面向消息的中間件所設計,基於此協議的客戶端與消息中間件可相互傳遞消息,並不受客戶端和中間件的產品以及開發語言不一樣所限制。app

 

AMQP協議模型

 

 

 

生產者(Producer):往消息隊列中發送消息的應用程序負載均衡

消費者(Consumer): 從消息隊列中獲取消息的應用程序異步

AMQP服務器(Broker):用來接收生產者發送的消息並將這些消息路由給服務器中的隊列

 

*消息隊列以broker爲最小的運行單元,一個broker的運行就表明着一個Kafka應用程序實例。

*Kafka客戶端支持的語言:C、C++、Erlang、Java、.net、perl、PHP、Python、Ruby、Go、Javascript,可使用任何一種語言和Kafka服務器進行通訊,編寫本身的生產者與消費者客戶端程序。

 

 

3.Kafka的組件

 

1.核心組件Broker

 

 

*broker中能夠包含多個主題,每一個主題中又能夠包含多個分區。

 

主題(topic)

一個主題相似新聞中的體育、娛樂、教育等分類概念,在實際工程中一般一個業務一個主題。

 

分區(partition)

一個topic中由一到多個分區組成,分區是Kafka結構的最小單元,一個分區就是一個FIFO(First In First Out)的隊列,用於存放topic中的消息。

*Kafka的分區是提升Kafka性能的關鍵手段,當Kafka集羣的性能不高時,能夠試着往topic中添加分區。

 

Kafka的分區模型

 

 

*可見每一個分區都是一個先進先出的隊列,producer往broker中的指定topic發送消息,消息將經過負載均衡策略進入到相應的partition中。

 

 

 

 

*每一個消息都有一個連續的序列號叫作offset(偏移量),是消息在分區中的惟一標識。

*每一個consumer都須要維護一個當前已消費消息的偏移量,相似於指針,隨着consumer不斷的讀取消息,消費者的offset值也會不斷的增長,consumer也能夠以任意的順序讀取消息,只須要設置偏移量便可。

*每一個consumer讀取的偏移量都會同步給Kafka,在Kafka集羣中同時會維護各個consumer消費的偏移量(消息在分區中的偏移量是固定的,Consumer的偏移量是動態可變的,其至關於讀取的指針)

*在一個可配置的時間段內,Kafka集羣將保留全部發布的消息,無論這些消息是否被被消費。

 

Kafka的分區備份

*每一個分區在Kafka集羣的若干服務中都有副本( 數量可配置 ),使Kafka具有了容錯能力。

*在邏輯上相關的一組分區中,都由一個服務器做爲leader,其他服務器做爲follower,leader和follwer的選舉是隨機的,當follower接收到請求首先會發送給leader,由leader負責消息的讀和寫並將消息同步給各個follower,若是Leader所在節點宕機,followers中的一臺則會自動成爲leader。

 

 

2.Producer生產者

 

Producer將消息發佈到Broker的指定Topic中,消息將根據負載均衡策略進入到相應的Partition中。

 

 

3.Consumer消費者

 

Kafka中提供了Consumer組的概念,一個Consumer組中包含若干個Consumer,總體對外可當作是一個消費者。

 

 

 
 

傳統的消息隊列模式

傳統的消息隊列能順序的保存同一個生產者發送的消息,但儘管服務器保證了消息的順序,但消息最終是經過異步的方式發送給各個消費者,當多個消費者並行消費時,並不能保證隊列中消息能按順序到達各個消費者中。
 
*Kafka採用的策略:一個topic中的各個partition只能被consumer組下的惟一一個consumer消費,用於保證消息到達的順序,所以同一個組下的consumer的數量不能超過topic中的分區數,不然其餘consumer將會處於空閒狀態。
*Topic中的Partition數量決定了Consumer組下Consumer的數量。  
 

隊列模式

若全部的消費者都在同一個consumer組中則成爲隊列模式,topic中各個分區的消息僅能被組中分區個的惟一consumer消費,組下的consumer共同競爭topic中的分區。

 

發佈-訂閱模式

若全部的消費者都不在同一個consumer組中則成爲發佈-訂閱模式,topic中各個分區的消息都會廣播給全部的consumer組。

 

 

4.Kafka的使用

 

1.安裝

 

因爲Kafka使用scala語言編寫,scale語言運行在JVM中,所以須要先安裝JDK而且配置好環境變量。

 

 

因爲Kafka中的狀態信息(topic、partition)都保存在zk上,雖然Kafka中自帶zk,但通常是使用外置的zk集羣,所以須要先安裝zk服務而且配置好zk集羣關係。

 

從Kafka官網中下載安裝包並進行解壓。

 

2.配置文件

 

 config是Kafka配置文件的存放目錄

 

server.proeperties(broker的配置文件)

 

 

*因爲多個Kafka服務(broker)都使用同一個zk集羣,所以在同一個zk集羣中的Kafka也就自動成爲集羣的關係,所以borker.id在同一個集羣中不能重複。

*Kafka中的消息是緩存到本地磁盤的(log.dirs目錄下),每一個topic的分區在broker的日誌路徑下都對應一個目錄,目錄下的.log文件用於存放分區中的消息,當有新消息進入分區時直接追加到文件中。

*若建立的topic其備份數大於1 (狀態保存在zk) ,則Kafka集羣中備份數個broker也會建立此topic,所以在其日誌路徑下也會存在此topic各個分區的目錄。

 

consumer.properties(消費者的配置文件)

 

*在使用Kafka提供的消費者腳本文件時能夠指定其使用的配置文件。

*在程序中使用時須要手動設置配置項。

 

producer.properties(生產者的配置文件)

 

*在使用Kafka提供的生產者腳本文件時能夠指定其使用的配置文件。

*在程序中使用時須要手動設置配置項。

 

 

3.啓動

 

1.啓動zk集羣

 

逐一啓動zk服務

 

 

2.啓動Kafka集羣

 

逐一啓動kafka服務並指定使用的配置文件( service.properties文件中配置使用外置的ZK集羣)

 

 

4.建立主題

 

建立名爲chat的topic,其備份數爲3而且每一個topic下存在3個分區 (因爲啓動了3個broker,所以topic的備份數最多隻能是3)

建立topic時需指定zk服務地址,zk中保存了topic的相關屬性(備份數和分區數),Kafka集羣再從zk服務中獲取topic屬性信息並在Kafka集羣中備份數個節點建立該topic。

 

 

查看各個broker中的日誌目錄,可見目錄下都生成了chat-0、chat-一、chat2分別表示chat主題中的第一個、第二個、第三個分區,每一個分區中都有.log文件存放分區中的消息。

 

 

 

查看Kafka集羣中chat主題下各個分區的狀態

 

 

Topic:主題名稱

PartitionCount:主題包含的分區數

ReplicationFactor:topic的備份數 

Partition:分區號

Leader:充當leader的broker節點

Replicas:存在備份的broker節點(無論存活)

Isr:存在備份而且存活的broker節點

 

 

5.生產者發送消息

 

往Kafka集羣中的chat主題發送消息 

*消息將會根據負載均衡機制隨機進入分區

 

 

6.消費者訂閱消息

 

 

*因爲使用腳本文件啓動消費者時沒有指定使用的配置文件,因此三個消費者都不是同一個消費者組中,所以三個消費者都可以消費到chat主題中各個分區的消息。

 

 

 

 

*啓動了三個消費者並指定使用的配置文件,默認的group.id是test-consumer-group,所以三個消費者都屬於同一個消費者組中,topic中各個分區僅能被組下的惟一一個consumer消費。

*因爲啓動第一個消費者時,消費者組下只有一個消費者,所以消息都會被此消費者消費,當往消費者組中添加新的消費者而且生產者往主題添加消息時,此時消費者會從新競爭消息。

 

 

5.Java中操做Kafka

 

1.導入依賴

<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka_2.12</artifactId>
  <version>0.11.0.1</version>
</dependency>

 

2.建立主題

ZkUtils zkUtils = ZkUtils.apply("192.168.1.80:2181,192.168.1.81:2181,192.168.1.82:2181", 30000, 30000, JaasUtils.isZkSecurityEnabled()); // 建立一個名爲chat的主題其包含2個分區,備份數是3
AdminUtils.createTopic(zkUtils, "chat", 2, 3, new Properties(), RackAwareMode.Enforced$.MODULE$); zkUtils.close();

 

3.Producer發送消息

//建立Properties對象用於封裝配置項
Properties props = new Properties(); props.put("bootstrap.servers", "192.168.1.80:9092,192.168.1.81:9092,192.168.1.82:9092"); props.put("acks", "all"); props.put("retries", 0); props.put("batch.size", 16384); props.put("linger.ms", 1); props.put("buffer.memory", 33554432); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); //建立KafkaProducer生產者實例
Producer<String, String> producer = new KafkaProducer<>(props); //建立生產端消息實體ProducerRecord並指定消息上傳的topic名稱、消息的Key、消息的Value(消息由Key Value兩部分組成)
ProducerRecord<String,String> record = new ProducerRecord<String,String>("topic","key","value"); //發送消息
producer.send(record); //關閉鏈接
producer.close();

 

*KafkaProducer是線程安全的,在線程之間能夠共享單個生產者實例。
*send()方法是異步的,一旦消息被保存在待發送緩衝區中此方法就當即返回,其返回Future<RecordMetadata>實例,當調用該實例的get()方法時將會阻塞直到服務器對請求進行應答(阻塞時長跟acks配置項有關),當服務器處理異常時將拋出異常。
*消息由Key、Value兩部分組成,Key值能夠重複,使用Kafka API往topic發送消息時,默認狀況下將會根據消息的Key值進行散列來決定消息到達的分區。
*生產者的緩衝區保留還沒有發送到服務器的消息,後臺I/O線程負責將這些消息轉換成請求發送到集羣,若使用後不關閉生產者則會泄露這些資源。

 

4.consumer消費消息

//建立Properties對象用於封裝配置項
Properties props = new Properties(); props.put("bootstrap.servers", "192.168.1.80:9092,192.168.1.81:9092,192.168.1.82:9092"); props.put("group.id", "consumerA"); //自動提交Consumer的偏移量給Kafka服務
props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000");
props.put("auto.offset.reset", "earliest"); props.put(
"session.timeout.ms", "30000"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); //建立KafkaConsumer消費者實例 Consumer<String, String> consumer = new KafkaConsumer<>(props);
//訂閱主題,一個消費者實例能夠訂閱多個主題 consumer.subscribe(Arrays.asList("chat", "hello")); //接收數據,消息存放在ConsumerRecords消息集合中 ConsumerRecords<String, String> records = consumer.poll(1000*5);
//遍歷消費端消息集合獲取ConsumerRecord消費端消息實體,一個消費端消息實體包含偏移量、消息Key值、消息Value值 for (ConsumerRecord<String, String> record : records){  System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); }
 
*poll(long blockTime)方法用於接收topic中的消息,當沒有消息時將會等待blockTime的時間 (單位:毫秒),執行結果需結合auto.offset.reset配置項。
*使用commitSync()方法能夠手動同步消費者的偏移量給Kafka (若設置自動提交偏移量給Kafka,當消費消息後,後續須要進行入庫或其餘操做失敗了,那麼數據將會丟失,須要從新設置偏移量去消費)
*使用seek(TopicPartition , long)方法手動設置消費者的偏移量。
 
 
 

6.Spring Kafka

 
 

1.導入依賴

<dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>0.10.2.0</version>
</dependency>
<dependency>
      <groupId>org.springframework.kafka</groupId>
      <artifactId>spring-kafka</artifactId>
      <version>1.2.0.RELEASE</version>
</dependency>
 

2.建立Kafka Consumer配置類

@Configuration
@EnableKafka
public class KafkaConsumerConfiguration {

    //Consumer配置
    private Map<String, Object> consumerProps() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "bootKafka");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }

    //Consumer工廠Bean
    @Bean
    public ConsumerFactory<Integer, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerProps());
    }
    
    //Kafka監聽器工廠Bean
    @Bean
    public ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }


}
 

3.建立Kafka Producer配置類

@Configuration
@EnableKafka
public class KafkaProducerConfiguration {

    //Producer配置
    private Map<String, Object> senderProps (){
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.RETRIES_CONFIG, 1);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 1024000);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    //Producer工廠Bean
    @Bean
    public ProducerFactory<Integer, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(senderProps());
    }

    //kafkaTemplate,用於向Topic發送消息
    @Bean
    public KafkaTemplate<Integer, String> kafkaTemplate() {
        KafkaTemplate template = new KafkaTemplate<Integer, String>(producerFactory());
        return template;
    }


}

 

4.Producer生產消息

@Component
public class Producer {

    @Autowired
    private KafkaTemplate kafkaTemplate;

    public void send() throws Exception {
        kafkaTemplate.send("fruit", "apple");
    }
}

 

5.Consumer消費消息

@Component
public class Consumer {

    @KafkaListener(topics = "fruit")
    public void listen(String msgData) {

    }
    
}
 
 
 

7.使用Kafka面臨的問題

 

1.Consumer端消息丟失

當consumer讀取消息後,自動提交了offset,若是後續程序處理出錯,那麼消息將會丟失。
*此時能夠經過手動提交offset的方式解決。
 
 

2.重複消費

當consumer讀取消息後,當程序處理完成後,手動提交offset,若提交offset以前程序出錯,則會致使重複消費。
*此時只須要保證冪等性便可(屢次執行的最終結果保持一致)
 
 

3.Broker端消息丟失

當partition中的Leader從新選舉時,有可能致使消息將來得及同步到其餘Follower,最終致使消息丟失。
*此時只須要設置acks=all,producer必須等待Leader將消息同步給全部Follower後再進行返回。
相關文章
相關標籤/搜索