淺談Kafka

Kafka是當今大數據生態圈中最流行的消息隊列框架,本人最近學習了下kafka;總結了一下本人的心得。java

相關概念:node

broker :至關於server節點api

topic:話題模塊,不一樣類型的消息能夠放在不一樣的話題中以示區分;框架

partition:分區,一個話題中能夠有多個partition,消息在同一個partition中是有序的;不一樣partition中是無序的;對應topic的partition 是能夠有多個副本的多個副本之間存在一個leader,其他的爲slave分佈式

productor:生產者,消息的來源;學習

comsumer:消費者,消息的使用者;comsumer能夠分紅group,每一個group中的comsumer同一時間只能消費一個partition中的信息;comsumer的信息存儲在zookeeper下;測試

1、kafka相對於其餘消息隊列的不一樣大數據

一、吞吐量大:kafka是一個分佈式集羣服務的消息隊列框架,支持多個comsumer同時消費;ui

二、信息有放回的取出:kafka中的信息消費完是不消失的,在kafka中有offset的概念,是記錄comsumer消費到哪裏了,信息默認在kafka中保存一週;code

2、Kafka的安裝集羣(以node十一、node十二、node13爲例)

  •    解壓安裝包          tar zxvf kafka_2.10-0.9.0.1.tgz
  •    修改配置文件server.properties   
    • zookeeper.connect=node11:2181,node12:2181,node13:2181  
  •    將解壓後的kafka拷貝到node12 、node13上
  •    修改配置文件server.properties  
    • 規劃有3個節點,broker的id應該不一樣
    • node1爲broker.id=0
    • node2爲broker.id=1
    • node3爲broker.id=2
  •   啓動kafka
    • 一、在3個節點啓動ZooKeeper
    • 二、在3個節點啓動kafka
    • $ bin/kafka-server-start.sh config/server.properties
  • 測試 kafka
    • 建立話題,使用kafka-topics.sh
    • $ bin/kafka-topics.sh --zookeeper node11:2181,node12:2181,node13:2181 --topic test --replication-factor 2 --partitions 3 --create
    • $ bin/kafka-topics.sh --zookeeper node11:2181,node12:2181,node13:2181 --list
    • $ bin/kafka-topics.sh --zookeeper node11:2181,node12:2181,node13:2181 --describe --topic test
  • 建立生成者和消費者
    • 在任意節點上開啓生成者
    • $ bin/kafka-console-producer.sh --broker-list node11:9092,node12:9092,node13:9092 --topic test
    • 能夠在多個節點上開啓多個消費者
    • $ bin/kafka-console-consumer.sh --zookeeper node11:2181,node12:2181,node13:2181 --from-beginning --topic test  
      • --from-beginning 表示從最先開始獲取隊列的數據
      • 消費幾條數據後,執行下面的語句,看看是否從頭開始,以及不一樣partition返回數據無序性
      • $ bin/kafka-console-consumer.sh --zookeeper node1:2181,node2:2181,node3:2181 --topic test  

=====================================================

 Producer代碼

package com.kafka.test;

import java.util.Properties; 
   
import kafka.javaapi.producer.Producer; 
import kafka.producer.KeyedMessage; 
import kafka.producer.ProducerConfig; 
   
public class MyProducer {   
     
        public static void main(String[] args) {   
            Properties props = new Properties();   
            props.setProperty("metadata.broker.list","192.168.47.12:9092,192.168.47.13:9092,192.168.47.14:9092");
            props.setProperty("serializer.class","kafka.serializer.StringEncoder");   
            props.put("request.required.acks","1");   
            ProducerConfig config = new ProducerConfig(props);   
            //建立生產這對象
            Producer<String, String> producer = new Producer<String, String>(config);
            //生成消息
            KeyedMessage<String, String> data1 = new KeyedMessage<String, String>("test","test kafka");
            KeyedMessage<String, String> data2 = new KeyedMessage<String, String>("test2","hello world");
            try {   
                int i =1; 
                while(true){
                    //發送消息
                    producer.send(data1);   
                    producer.send(data2);
                    i++;
                 //   Thread.sleep(1000);
                } 
            } catch (Exception e) {   
                e.printStackTrace();   
            }   
            producer.close();   
        }   
}
相關文章
相關標籤/搜索