Kafka是一個分佈式的、可分區的、可複製的、基於發佈/訂閱的消息系統,Kafka主要用於大數據領域,固然在分佈式系統中也有應用。目前市面上流行的消息隊列RocketMQ就是阿里借鑑Kafka的原理、用Java開發而得。apache
Kafka適合離線和在線的消息消費,其消息保存在磁盤上。bootstrap
Kafka以Topic爲單位進行消息的概括,Producers向Topic發送(Push)消息,Consumers會消費(Pull)預訂了Topic的消息。windows
消息隊列中的基本概念尤其重要,當對基本概念有了深刻的理解以後,消息隊列的原理以及常見的問題都將更淺顯明瞭。數組
Broker:一個單獨的Kafka server就是一個Broker,Broker的主要工做就是接收生產者發送來的消息,分配offset,而後將包裝過的數據保存到磁盤上;此外,Broker還會接收消費者和其餘Broker的請求,根據請求的類型進行相應的處理而後返回響應。多個Broker能夠作成一個Cluster(集羣)對外提供服務,每一個Cluster當中會選出一個Broker來擔任Controller,Controller是Kafka集羣的指揮中心,其餘的Broker則遵從Controller指揮實現相應的功能。Controller負責管理分區的狀態、管理每一個分區的副本狀態、監聽zookeeper中數據的變化等。Controller也是一主多從的實現,全部的Broker都會監聽Controller Leader的狀態,當Leader Controller出現了故障的時候就從新選舉新的Controller Leader。服務器
消息:消息是Kafka中最基本的消息單元。消息由一串字節組成,其中主要由key和value構成,key和value都是字節數組。key的主要做用是根據必定的策略,將這個消息路由到指定的分區中,這樣就能夠保證包含同一個key的消息所有寫入同一個分區session
Topic:Topic是用於存儲消息的邏輯概念,Topic能夠看作是一個消息的集合。每一個Topic能夠有多個生產者向其中push消息,也能夠有多個消費者向其中pull消息。異步
分區(partition):每個Topic均可以劃分紅多個分區(每個Topic都至少有一個分區),不一樣的分區會分配在不一樣的Broker上以對Kafka進行水平擴展從而增長Kafka的並行處理能力。同一個Topic下的不一樣分區包含的消息是不一樣的。每個消息在被添加到分區的時候,都會被分配一個offset,他是消息在此分區中的惟一編號,此外,Kafka經過offset保證消息在分區中的順序,offset的順序性不跨分區,也就是說在Kafka的同一個分區中的消息是有序的,不一樣分區的消息可能不是有序的。 Partitions概念圖分佈式
Log:分區在邏輯上對應着一個Log,當生產者將消息寫入分區的時候,實際上就是寫入到了一個Log中。Log是一個邏輯概念,對應的是一個磁盤上的文件夾。Log由多個Segment組成,每個Segment又對應着一個日誌文件和一個索引文件。ide
副本:Kafka對消息進行了冗餘備份,每個分區均可以有多個副本,每個副本中包含的消息是相同的(但不保證同一時刻下徹底相同)。副本的類型分爲Leader和Follower,當分區只有一個副本的時候,該副本屬於Leader,沒有 Follower。Kafka的副本具備必定的同步機制,在每一個副本集合中,都會選舉出一個副本做爲Leader副本,Kafka在不一樣的場景中會採用不一樣的選舉策略。Kafka中全部的讀寫請求都由選舉出的Leader副本處理,其餘的都做爲Follower副本,Follower副本僅僅是從Leader副本中把數據拉取到本地以後,同步更新到本身的Log中。大數據
分區副本:
生產者:生產者主要是生產消息,並將消息按照必定的規則推送到Topic的分區中
消費者:消費者主要是從Topic中拉取消息,並對消息進行消費。Consumer維護消費者消費者消費到Partition的哪個位置(offset的值)這一信息。**在Kafka中,多個Consumer能夠組成一個Consumer Group,一個Consumer只能屬於一個Consumer Group。Consumer Group保證其訂閱的Topic中每個分區只被分配給此Consumer Group中的一個消費者處理,因此若是須要實現消息的廣播消費,則將消費者放在多個不一樣的Consumer Group中便可實現。**經過向Consumer Group中動態的添加適量的Consumer,能夠出發Kafka的Rebalance操做從新分配分區與消費者的對應關係,從而實現了水平擴展的能力。
ISR集合:ISR集合表示的是目前可用(alive)且消息量與Leader相差很少的副本集合,即整個副本集合的子集。ISR集合中副本所在的節點都與ZK保持着鏈接,此外,副本的最後一條消息的offset與Leader副本的最後一條消息的offset之間的差值不能超出指定的閾值。每個分區的Leader副本都維護此分區的ISR集合。如上面所述,Leader副本進行了消息的寫請求,Follower副本會從Leader上拉取寫入的消息,第二個過程當中會存在Follower副本中的消息數量少於Leader副本的狀態,只要差值少於指定的閾值,那麼此時的副本集合就是ISR集合。
這裏介紹的是單實例的Kafka的啓動安裝:
kafka_2.11-1.0.0.tgz
,而後解壓./bin/windows/zookeeper-server-start.bat ./conf/zookeeper.properties
./bin/windows/kafka-server-start.bat ./conf/server.properties
kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic demo
kafka-topics.bat --list --zookeeper localhost:2181
kafka-console-producer.bat --broker-list localhost:9092 --topic demo
,而後在命令終端鍵入消息Hello World!
kafka-console-consumer.bat --zookeeper localhost:2181 --topic demo --from-beginning
/**
* @Name: ProducerDemo
* @Description: Kafka服務端進行消息的Push
* @Author: BeautifulSoup
* @Date: 2018年2月1日 下午11:24:39
*/
public class ProducerDemo {
public static void main(String[] args) {
//構造Kafka的配置項
Properties properties=new Properties();
//定義Kafka服務端的主機名和端口號
properties.put("bootstrap.servers", "localhost:9092");
//定義客戶端的ID
properties.put("client.id", "DemoProducer");
//定義消息的key和value的數據類型都是字節數組
properties.put("key.serializer","org.apache.kafka.common.serialization.IntegerSerializer");
properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
//建立生產者的核心類
KafkaProducer producer=new KafkaProducer<>(properties);
//指定topic的名稱
String topic = "demo";
//定義消息的key
int messageNo=1;
while(true){
//定義消息的value
String messageStr="Message_"+messageNo;
long startTime=System.currentTimeMillis();
//異步的發送消息
producer.send(new ProducerRecord<>(topic, messageNo,messageStr,new Callback() {
//消息發送成功以後收到了Kafka服務端發來的ACK確認消息以後,就回調下面的方法
//metadata保存着生產者發送過來的消息的元數據,若是消息的發送過程當中出現了異常,則改參數的值爲null
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
long elapsedTime=System.currentTimeMillis()-startTime;
if(null!=metadata){
System.out.println("消息發送給的分區是:"+metadata.partition()+",消息的發送一共用了:"+elapsedTime+"ms");
}else{
exception.printStackTrace();
}
}
}));
}
}
}
/**
* @Name: ConsumerDemo
* @Description: Kafka客戶端進行消息的Pull
* @Author: BeautifulSoup
* @Date: 2018年2月10日 下午11:24:58
*/
public class ConsumerDemo {
public static void main(String[] args) {
Properties properties=new Properties();
properties.put("bootstrap.servers","localhost:9092");
//指定Consumer Group的id
properties.put("group.id", "BeautifulSoup");
//自動提交offset
properties.put("enable.auto.commit", "true");
//自動提交offset的時間間隔
properties.put("auto.commit.interval.ms","1000");
properties.put("session.timeout.ms", "30000");
properties.put("key.deserializer","org.apache.kafka.common.serialization.IntegerDeserializer");
properties.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer consumer=new KafkaConsumer<>(properties);
//指定消費者訂閱的topic
consumer.subscribe(Arrays.asList("demo","test"));
try{
while(true){
//從服務端開始拉取消息,每次的poll都會拉取多個消息
ConsumerRecords<String, String> records=consumer.poll(100);
for (ConsumerRecord<String,String> consumerRecord : records) {
System.out.println("消息記錄的位置:"+consumerRecord.offset()+",消息的鍵:"+consumerRecord.key()+",消息的值:"+consumerRecord.value());
}
}
}finally{
//關閉consumer
consumer.close();
}
}
}
複製代碼