大數據系列——kafka學習筆記

1. 大數據領域數據類型

1.1 有界數據

通常批處理(一個文件 或者一批文件),無論文件多大,都是能夠度量 java

mapreduce hive sparkcore sparksqlsql

1.2 無界數據

源源不斷的流水同樣 (流數據)shell

Storm SparkStreamingapache

2. 消息隊列(Message Queue)

  • 消息 Messagebootstrap

    • 網絡中的兩臺計算機或者兩個通信設備之間傳遞的數據,例如說:文本、音樂、視頻等內容
  • 隊列 Queue安全

    • 一種特殊的線性表(數據元素首尾相接),特殊之處在於只容許在首部移除元素和在尾部追加元素。入隊、出隊。
  • 消息隊列 MQ服務器

    • 消息+隊列
    • 保存消息的隊列
    • 消息的傳輸過程當中的容器
    • 主要提供生產、消費接口供外部調用作數據的存儲和獲取

3. 消息隊列的分類

3.1 點對點(P2P)

  • 一個生產者生產的消息只能被一個消費者消費

3.2 發佈訂閱(Pub/Sub)

消息隊列(Queue)、主題(Topic)、發佈者(Publisher)、訂閱者(Subscriber)網絡

  • 消息的發佈者
  • 消息的訂閱者

    每一個消息能夠有多個消費者,彼此互不影響。好比我發佈一個微博:關注個人人都可以看到。分佈式

4. Kafka的簡介

  • 在大數據領域呢,爲了知足日益增加的數據量,也有一款能夠知足百萬級別消息的生成和消費,分佈式、持久穩定的產品——Kafka
  • Kafka是分佈式的發佈—訂閱消息系統(基於PS的一個消息隊列)
  • 它最初由LinkedIn(領英)公司發佈,使用Scala語言編寫
  • Kafka是一個高吞吐量的、持久性的、分佈式發佈訂閱消息系統
  • 它主要用於處理活躍的數據(登陸、瀏覽、點擊、分享、喜歡等用戶行爲產生的數據

5. Kafka的特色

  • 高吞吐量工具

    • 能夠知足每秒百萬級別消息的生產和消費(生產消費 )
  • 持久性

    • 有一套完善的消息存儲機制,確保數據的高效安全的持久化 (數據的存儲)
  • 分佈式

    • 基於分佈式的擴展和容錯機制;Kafka的數據都會複製到幾臺服務器上。當某一臺故障失效時,生產者和消費者轉而使用其它的機器——總體健壯性

6. Kafka的組件

  • 一個消息隊列須要哪些部分?

    • 生產
    • 消費
    • 消息類別
    • 存儲等等
  • Topic(主題)

    • Kafka處理的消息的不一樣分類
  • Broker (消息代理)

    • Kafka集羣中的一個kafka服務節點稱爲一個broker,主要存儲消息數據,存在硬盤中。每一個topic都是有分區的
  • Partition (物理上的分區)

    • 一個topic在broker中被分爲1個或者多個partition,分區在建立topic的時候指定
  • Message (消息)

    • 消息,是通訊的基本單位,每一個消息都屬於一個partition

7. Kafka的服務

  • Producer : 消息和數據的生產者,向Kafka的一個topic發佈消息
  • Consumer :消息和數據的消費者,定於topic並處理其發佈的消息
  • Zookeeper :協調kafka的正常運行

8. Kafka的安裝

8.1 單機版的安裝

  • 準備kafka

    • kafka_2.10-0.10.0.1.tgz
  • 解壓kafka

    • tar -zxvf kafka_2.10-0.10.0.1.tgz -C /opt/
  • 重命名

    • mv kafka_2.10-0.10.0.1.tgz kafka
  • 配置環境變量

    export KAFKA_HOME=/opt/kafka
    export PATH=$PATH:$KAFKA_HOME/bin
  • 編輯server.properties

    broker.id=1
    log.dirs=/opt/kafka/logs
    zookeeper.connect=uplooking03:2181,uplooking04:2181,uplooking05:2181
    listeners=PLAINTEXT://:9092
  • 啓動kafka-server服務

    kafka-server-start.sh [-daemon] server.properties
  • 中止kafka服務

    kafka-server-stop.sh

8.2 集羣的安裝

只須要在每一個機器上修改對應的 ==broker.id=1== 便可

9. Kafka中Topic的操做

  • 建立topic

    kafka-topics.sh  --create --topic t1 --partitions 3 --replication-factor 1  --zookeeper uplooking03:2181,uplooking04:2181

    ==注意: 建立topic過程的問題,replication-factor個數不能超過brokerserver的個數==

  • 查看topic

    kafka-topics.sh  --list --zookeeper uplooking03
  • 查看具體topic的詳情

    kafka-topics.sh  --describe --topic t1 --zookeeper uplooking04:2181
    PartitionCount:topic對應的partition的個數
    ReplicationFactor:topic對應的副本因子,說白就是副本個數
    Partition:partition編號,從0開始遞增
    Leader:當前partition起做用的breaker.id
    Replicas: 當前副本數據存在的breaker.id,是一個列表,排在最前面的其做用
    Isr:當前kakfa集羣中可用的breaker.id列表
  • 修改topic(不能修改replication-factor,以及只能對partition個數進行增長,不能減小 )

    kafka-topics.sh --alter --topic t1 --partitions 4 --zookeeper uplooking03
  • 刪除Topic

    kafka-topics.sh --delete --topic t1 --zookeeper uplooking03

    ps:這種刪除只是標記刪除,要想完全刪除必須設置一個屬性,在server.properties中配置delete.topic.enable=true,不然只是標記刪除

    配置完成以後,須要重啓kafka服務

10. Kafka中的生產者和消費者接口

  • 本身寫代碼實現kafka提供的消息生產和消費的接口
  • kafka自身也實現了自身的生產和消費的接口,給出了兩個工具(kafka-console-producer.sh , kafka-console-consumer.sh)

11. Kafka自帶的生產和消費消息的工具

11.1 kafka-console-producer.sh(生產工具)

kafka-console-producer.sh --topic t1  --broker-list uplooking03:9092,uploo
king04:9092,uplooking05:9092

11.2 kafka-console-consumer.sh(消費工具)

kafka-console-consumer.sh  --zookeeper uplooking03 --topic t1
--from-beginning:從頭開始消費
--blacklist:黑名單過濾(kafka-console-consumer.sh  --zookeeper uplooking03   --blacklist t1,t3)
--whitelist:白名單過濾(kafka-console-consumer.sh  --zookeeper uplooking03   --whitelist t2)    

ps:--topic|--blacklist|--whitelist 只能出現其中一個

12. ==Flume與Kafka的整合==

  • 配置flume的agent配置文件

    touch flume-kafka.properties

    # 對各個組件的描述說明
    # 其中a1爲agent的名字
    # r1是a1的source的代號名字
    # c1是a1的channel的代號名字
    # k1是a1的sink的代號名字
    ############################################
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    # 用於描述source的,類型是netcat網絡
    a1.sources.r1.type = netcat
    # source監聽的網絡ip地址和端口號
    a1.sources.r1.bind = uplooking01
    a1.sources.r1.port = 44444
    
    
    
    # 用於描述sink,類型是kafka
    
    a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
    a1.sinks.k1.topic = hadoop
    a1.sinks.k1.brokerList = uplooking03:9092,uplooking04:9092,uplooking05:9092
    a1.sinks.k1.requiredAcks = 1
    a1.sinks.k1.batchSize = 2
    
    
    # 用於描述channel,在內存中作數據的臨時的存儲
    a1.channels.c1.type = memory
    # 該內存中最大的存儲容量,1000個events事件
    a1.channels.c1.capacity = 1000
    # 可以同時對100個events事件監管事務
    a1.channels.c1.transactionCapacity = 100
    
    
    # 將a1中的各個組件創建關聯關係,將source和sink都指向了同一個channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
  • 啓動flume開始採集數據

    [root@uplooking01:/opt/flume/conf]
        flume-ng agent --name a1 --conf-file flume-kafka.properties
  • 開啓Kafka消息消費工具

    [root@uplooking03:/opt/flume/conf]
        kafka-console-consumer.sh  --zookeeper uplooking03 --topic hadoop
  • 給flume監聽的Source發送數據

    [root@uplooking03:/]
        nc uplooking01 44444
  • 如今就能夠到kafka的消費工具(kafka-console-consumer.sh)中區查看nc發送的數據

13. Kafka的API操做(生產者和消費者)

<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka_2.10</artifactId>
  <version>0.10.0.1</version>
</dependency>

13.1 Kafka的生產者

  • 建立生產者的配置文件 producer.properties

    bootstrap.servers=uplooking03:9092,uplooking04:9092,uplooking05:9092
    key.serializer=org.apache.kafka.common.serialization.StringSerializer
    value.serializer=org.apache.kafka.common.serialization.StringSerializer
  • 建立生產者而且發送數據到topic中

    public class MyKafkaProducer {
        public static void main(String[] args) throws IOException {
            Properties prop = new Properties();
            prop.load(MyKafkaProducer.class.getClassLoader().getResourceAsStream("producer.properties"));
            KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(prop);
            kafkaProducer.send(new ProducerRecord<String, String>("hadoop", "name", "admin123"));
            kafkaProducer.close();
        }
    }

13.2 Kafka的消費者

  • 建立消費者的配置文件consumer.properties

    zookeeper.connect=uplooking03:2181,uplooking04:2181,uplooking05:2181
    group.id=test-consumer-group
    bootstrap.servers=uplooking03:9092,uplooking04:9092,uplooking05:9092
    key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
    value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
  • 建立消息消費者消費topic中的數據

    public static void main(String[] args) throws Exception {
        Properties prop = new Properties();
        prop.load(MyKafkaConsumer.class.getClassLoader().getResourceAsStream("consumer.properties"));
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(prop);
        Collection topics = new ArrayList();
        topics.add("hadoop");
        kafkaConsumer.subscribe(topics);
        while (true) {
            ConsumerRecords<String, String> records = kafkaConsumer.poll(1000);
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.value());
            }
        }
    }
  • 自定義分區(MyCustomPartition)

    package com.uplooking.bigdata.kafka.partition;
    public class MyCustomPartition implements Partitioner {
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster)  {
    //獲取分區數,    分區編號通常都是從0開始
    int partitionSize = cluster.partitionCountForTopic(topic);
    int keyHash = Math.abs(key.hashCode());
    int valueHash = Math.abs(value.hashCode());
    return keyHash % partitionSize;
    }
    public void close() {
    }
    public void configure(Map<String, ?> configs) {
    }
    }

    配置自定義分區(producer.properties)

    partitioner.class=com.uplooking.bigdata.kafka.partition.MyCustomPartition
相關文章
相關標籤/搜索