Kafka簡介及安裝配置

Kafka簡介及安裝配置

1、簡介

    Kafka是分佈式發佈-訂閱消息系統。它最初由LinkedIn公司開發,使用Scala語言編寫,以後成爲Apache項目的一部分,目前是Apache的一個頂級項目。java

    Kafka是一個分佈式的、可分區的、可複製的消息系統。它提供了普通消息系統的功能,但具備本身獨特的設計。bash

    它提供了相似於JMS(Java消息隊列規範)的特性,可是在設計實現上徹底不一樣,此外它並非JMS規範的實現。服務器

    kafka對消息保存時根據Topic進行歸類,發送消息者稱爲Producer,消息接受者稱爲Consumer,此外kafka集羣由多個kafka實例組成,每一個實例(server)稱爲broker。網絡

    不管是kafka集羣,仍是producer和consumer都依賴於zookeeper來保證系統可用性集羣保存一些meta信息。session

    總結:併發

    Kafka是分佈式消息隊列,按topic分類存放數據,擁有Producer、Consumer、Broker三個角色,使用zookeeper作爲集羣的協調工具。負載均衡

一、kafka的特色

1.高吞吐量

    理論上Kafka每秒能夠生產約25萬消息(50MB),每秒處理55萬消息(110MB),生產環境中在這個速度上下浮動,這個速度就至關於硬盤IO的速度。異步

2.持久化數據存儲

    可進行持久化操做。將消息持久化到磁盤,所以可用於批量消費,例如ETL,以及實時應用程序。經過將數據持久化到硬盤以及replication防止數據丟失。分佈式

3.分佈式系統易於擴展

    全部的producer、broker和consumer均可以有多個,均爲分佈式的。無需停機便可擴展機器。函數

4.客戶端維護狀態

    消息被處理的狀態是在consumer端維護,而不是由server端維護。當失敗時能自動平衡。

2、基本概念

    Kafka將消息以topic爲單位進行概括。topic之間的數據是相互隔離的。

    將向Kafka topic發佈消息的程序稱爲producers。

    將預訂topics並消費消息的程序稱爲consumer。

    Kafka以集羣的方式運行,能夠由一個或多個服務組成,每一個服務叫作一個broker。

    producers經過網絡將消息發送到Kafka集羣,集羣向消費者提供消息。

    客戶端和服務端經過TCP協議通訊。Kafka提供了Java客戶端,而且對多種語言都提供了支持。

一、Topic

一個topic是對一組消息的概括。

1.分區

Kafka針對對每一個topic的日誌進行了分區。

kafka中的分區是負載均衡和失敗恢復的基本單位。

 

2.offset

    每一個分區都由一系列有序的、不可變的消息組成,這些消息被連續的追加到分區中。分區中的每一個消息都有一個連續的序列號叫作offset,用來在分區中惟一標識一個消息。在一個可配置的時間段內,Kafka集羣保留全部發布的消息,無論這些消息有沒有被消費。

    好比,若是消息的保存策略被設置爲2天,那麼在一個消息被髮布的兩天時間內,它都是能夠被消費的。過時以後它將被丟棄以釋放空間。Kafka的性能是和數據量無關的常量級的,因此保留太多的數據並非問題。

3.分區副本

    每一個分區在Kafka集羣的若干服務中都有副本,這樣這些持有副本的服務能夠共同處理數據和請求,副本數量是能夠配置的。副本使Kafka具有了容錯能力。

    每一個分區都由一個服務器做爲「leader」,零或若干服務器做爲「followers」,leader負責處理消息的讀和寫,followers則去同步leader的數據,並對外提供讀操做。若是leader down了,followers中的一臺則會自動成爲leader。

    集羣中的每一個服務器都會同時扮演兩個角色:做爲它所持有的一部分分區的leader,同時做爲其餘分區的followers,這樣集羣就據有較好的負載均衡。

    將日誌分區能夠達到如下目的:

    首先這使得每一個日誌的數量不會太大,能夠在單個服務上保存。另外每一個分區能夠單獨發佈和消費,爲併發操做topic提供了一種可能。

    分區是負載均衡失敗恢復分佈式數據存儲的基本單元。

二、Producers

    Producer將消息發佈到它指定的topic中,並負責決定發佈到哪一個分區。一般簡單的由負載均衡機制隨機選擇分區,但也能夠經過特定的分區函數選擇分區。使用的更多的是第二種。

三、Consumers

    實際上每一個consumer惟一須要維護的數據是消息在日誌中的位置,也就是offset。這個offset由consumer來維護,通常狀況下隨着consumer不斷的讀取消息,這offset的值會不斷增長,但其實consumer能夠以任意的順序讀取消息,好比它能夠將offset設置成爲一箇舊的值來重讀以前的消息。

    以上特色的結合,使Kafka consumers很是的輕量級,它們能夠在不對集羣和其餘consumer形成影響的狀況下讀取消息。你可使用命令行來"tail"消息而不會對其餘正在消費消息的consumer形成影響。

    消費消息一般有兩種模式:隊列模式(queuing)和發佈-訂閱模式(publish-subscribe)。

1.隊列模式

    隊列模式中,多個consumers能夠同時從服務端讀取消息,每一個消息只被其中一個consumer讀到。

    通俗的來說,consumers之間是競爭關係,都在從borker中搶數據,而這個數據只有一份,誰搶到就是誰的。

2.發佈訂閱模式

    發佈-訂閱模式中消息被廣播到全部的consumer中。這種模式之下,每一個consumers都能獲得相同的消息數據。

3.consumer group

    Consumers能夠加入一個consumer group,此組就是用來實現以上兩種模式的。

1>組內

    若是全部的consumer都在一個組中,這就成爲了傳統的隊列模式,在各consumer中實現負載均衡。

    組內的Consumer是處在隊列模式下,共同競爭一個topic內的消息,topic中的消息將被分發到組中的一個成員中,同一條消息只發往其中的一個消費者。同一組中的consumer能夠在不一樣的程序中,也能夠在不一樣的機器上。

2>組間

    若是全部的consumer都不在不一樣的組中,這就成爲了發佈-訂閱模式,全部的消息都被分發到全部的consumer中。

    而若是有多個Consumer group來消費相同的Topic中的消息,則組和組之間使用的就是發佈訂閱模式,是一個共享數據的狀態。每個組均可以獲取到這個主題中的全部消息。

3>應用

    常見的應用方式是,每一個topic都有若干數量的consumer組來消費,每一個組都是一個邏輯上的「訂閱者」,爲了容錯和更好的穩定性,每一個組都由若干consumer組成,在組內競爭實現負載均衡。實現了組內競爭負載均衡,組間共享互不影響,這其實就是一個發佈-訂閱模式,只不過訂閱者是個組而不是單個consumer。 

四、與傳統消息隊列對比

    相比傳統的消息系統,Kafka能夠很好的保證有序性。

    傳統的隊列在服務器上保存有序的消息,若是多個consumers同時從這個服務器消費消息,服務器就會以消息存儲的順序向consumer分發消息。雖然服務器按順序發佈消息,可是消息是被異步的分發到各consumer上,因此當消息到達時可能已經失去了原來的順序,這意味着併發消費將致使順序錯亂。爲了不故障,這樣的消息系統一般使用「專用consumer」的概念,其實就是隻容許一個消費者消費消息,固然這就意味着失去了併發性。

    在這方面Kafka作的更好,經過分區的概念,Kafka能夠在多個consumer組併發的狀況下提供較好的有序性和負載均衡。將每一個分區分只分發給一個consumer組,這樣一個分區就只被這個組的一個consumer消費,就能夠順序的消費這個分區的消息。由於有多個分區,依然能夠在多個consumer組之間進行負載均衡。注意consumer組的數量不能多於分區的數量,也就是有多少分區就容許多少併發消費。

    Kafka只能保證一個分區以內消息的有序性,在不一樣的分區之間是不能夠的,這已經能夠知足大部分應用的需求。若是須要topic中全部消息的有序性,那就只能讓這個topic只有一個分區,固然也就只有一個consumer組消費它。

五、選擇Kafka的理由

    爲何大數據環境下的消息隊列常選擇kafka?

    分佈式存儲數據,提供了更好的性能、可靠性、可擴展能力。

    利用磁盤存儲數據,且按照主題、分區來分佈式存放數據,持久化存儲,提供海量數據存儲能力。

    採用磁盤存儲數據,連續進行讀寫保證性能,性能和磁盤的性能相關和數據量的大小無關。

3、安裝配置

一、下載安裝

    下載Kafka安裝包,上傳到Linux服務器。

    解壓:

tar -zxvf kafka_2.9.2-0.8.1.1.tgz

    解壓完成就至關於安裝完畢,不過還要進行響應的配置。

二、配置

1.僞分佈式

1>server.properties

    修改server.properties文件。

log.dirs=/tmp/kafka-logs

    此選項配置的是Kafka的數據存儲位置,須要更改。

2>zookeeper.properties

    修改zookeeper.properties配置文件。此文件是Kafka內置的Zookeeper的配置文件。Kafka爲了保證軟件的獨立性,本身內置了一個Zookeeper,因此使用爲分佈式的狀況下,不用專門安裝Zookeeper。

    以下項配置的是zookeeper的數據存儲位置,默認在/tmp中,須要修改。

dataDir=/tmp/zookeeper

3>啓動kafka

    啓動zookeeper:

bin/zookeeper-server-start.sh config/zookeeper.properties &

啓動kafka:

bin/kafka-server-start.sh config/server.properties

2.徹底分佈式

1>server.properties

    在config目錄下,修改server.properties,在文件中修改以下參數:

broker.id=0 #當前server編號
port=9092 #使用的端口
log.dirs=/tmp/kafka-logs-1 #日誌存儲目錄
zookeeper.connect=yun01:2181

    集羣中broker.id要具備惟一性。

    日誌存放目錄須要更改成規劃目錄,不能使用默認的/tmp目錄。

    Zookeeper須要配置Zookeeper集羣中全部服務器的ip或主機名:端口。

2>啓動

啓動zookeeper

    在各個機器上執行以下啓動命令:

zkServer.sh start

啓動kafka

bin/kafka-server-start.sh ../config/server.properties &

    Kafka服務默認啓動會佔用控制檯,因此能夠後臺運行。

三、測試

1.建立topic

    建立一個擁有3個副本的topic:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic topicname

2.查看

1>查看主題

bin/kafka-topics.sh --list --zookeeper localhost:2181

2>查看節點

    查看每一個節點的信息:

bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic topicname

3.生產消息

    向topic發送消息:

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topicname

4.消費消息

    消費消息:

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic topicname

5.實驗:容錯性

    製造宕機,查看Kafka的容錯性。

kill -9 7564
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic topicname
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic topicname

    啓動kafka:

    啓動zookeeper:

zkServer.sh start

啓動kafka:

bin/kafka-server-start.sh config/server.properties

4、使用kafka

一、sell操做

1.建立topic

bin/kafka-topics.sh --create --zookeeper localhost:9092 --replication-factor 1 --partitions 1 --topic test

2.查看topic

bin/kafka-topics.sh --list --zookeeper localhost:2181

3.生產消息

使用命令行producer從文件中或者從標準輸入中讀取消息併發送到服務端:

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

4.消費消息

啓動命令行consumer讀取消息並輸出到標準輸出:

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning

二、JavaAPI操做

1.搭建開發環境

    建立java工程,導入kafka相關包,jar包存在於Kafka安裝包的libs目錄中,拷貝時注意,裏面不僅有jar包,還含有其餘類型的文件,只拷貝jar包便可。

2.代碼實現

1>消費者

/**
接收數據
*/
@Test
public void ConsumerReceive() throws Exception{
  Properties properties = new Properties();  
  properties.put("zookeeper.connect", "yun01:2181,yun02:2181,yun03:2181");//聲明zk  
  // 必需要使用別的組名稱, 若是生產者和消費者都在同一組,則不能訪問同一組內的topic數據  
  properties.put("group.id", "group2xx");
  properties.put("auto.offset.reset", "smallest");
  // properties.put("zookeeper.session.timeout.ms", "400");
  // properties.put("zookeeper.sync.time.ms", "200");
  // properties.put("auto.commit.interval.ms", "1000");
  // properties.put("serializer.class", "kafka.serializer.StringEncoder");
  ConsumerConnector consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));  
  Map<String, Integer> topicCountMap = new HashMap<String, Integer>();  
  topicCountMap.put("my-replicated-topic", 1); // 一次從主題中獲取一個數據  
  Map<String, List<KafkaStream<byte[], byte[]>>>  messageStreams = consumer.createMessageStreams(topicCountMap);  
  // 獲取每次接收到的這個數據 
  KafkaStream<byte[], byte[]> stream = messageStreams.get("my-replicated-topic").get(0); 
  ConsumerIterator<byte[], byte[]> iterator =  stream.iterator();  
  while(iterator.hasNext()){
    System.out.println("receive:" + new String(iterator.next().message()));
  }
}

2>生產者

/**
發送數據
*/
@Test
public void ProducerSend(){
  Properties props = new Properties();
  props.put("serializer.class", "kafka.serializer.StringEncoder");
  props.put("metadata.broker.list", "192.168.242.101:9092");
  Producer<Integer, String> producer = new Producer<Integer, String>(new ProducerConfig(props ));
  producer.send(new KeyedMessage<Integer, String>("my-replicated-topic","message~xxx123asdf"));
  producer.close();
}

下一篇:Kafka的存儲機制以及可靠性

相關文章
相關標籤/搜索