生產者基本操做

一、啓動生產者bootstrap

  kafka 自帶了 kafka-console-producer.sh 腳本,經過該腳本能夠在終端調用 kafka 生產者向 kafka 發送消息;服務器

  該腳本運行時須要指定 broker-list(kafka代理地址列表) 和 topic(消息被髮送的目標主題) 兩個比傳參數;工具

  可選參數:性能

    1):經過參數 sync 指定以同步模式發送消息;測試

    2):property 參數後跟配置項鍵值對;線程

    3):producer.config 參數加載一個生產者級別的配置文件;代理

  執行如下命令,啓動一個生產者向名爲kafka-action的主題發送消息,每條消息包含一個key:日誌

    kafka-console-producer.sh --broker-list localhost:9092 --topic kafka-action --property parse.key=trueserver

    因爲該指令沒有指定消息key與消息淨荷payload之間的分隔符,默認是以製表符分割,能夠經過配置項 key.separator 指定:如kafka

      kafka-console-producer.sh --broker-list localhost:9092 --topic kafka-action --property parse.key=true --property key.separator=' '    消息key與消息實際數據之間的分隔符是空格;

  查看某個主題各分區對應消息偏移量:

    kafka-run-class.bat kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic kafka-action --time -1   time參數表示查看在指定時間以前的數據,支持-1(latest)、-2(earliest)兩個時間選項,默認值爲-1;

    命令結果共三列:主題名、分區編號、消息偏移量

      kafka-action:2:1
      kafka-action:1:2
      kafka-action:0:1

 

二、建立主題

  若kafka服務器啓動的時候開啓了server.properties 文件中的建立主題的配置項 auto.create.topics.enable=true,默認是false,當生產者向一個不存在的主題發送消息的時候,kafka會自動建立主題,建立的主題配置參數使用默認的;

 

三、查看主題

  kafka 生產的消息以二進制的形式存在文件中,kafka提供了一個查看日誌文件的工具類 kafka.tools.DumpLogSegments。

  產看某個日誌文件,files是必參數,多個文件之間用 逗號 隔開:

    kafka-run-class.sh kafka.tools.DumpLogSegments --files dir

 

四、生產者性能測試工具

  kafka 提供了生產者性能測試腳本 kafka-producer-perf-test.sh,經過該腳本,可能對生產者性能進行調優。

  腳本參數說明:

    topic  指定生產者發送消息的目標主題

    num-records  測試時發送消息的總條數

    records-size  每條消息的字節數

    throughput  限流控制,當throughput值小於0時,不進行限流;當throughput值大於0時,當已發送的消息總字節數與當前已執行的時間取整大於該throughput值時,生產者線程被阻塞一段時間(生產者線程被阻塞的時,控制檯能夠看到輸出一行吞吐量統計信息);當throughput的值等於0時,生產者發送一次消息以後檢測知足阻塞條件時將一直被阻塞。

    producer-props  以鍵值對的形式指定配置,能夠同時指定多組配置,多組配置之間以空格隔開

    producer.config  加載生產者級別的配置文件

  性能測試腳本使用,向一個名爲「cest-throughput」的主題發送100萬條消息,每條消息大小爲1000字節,同時acks設置爲all(對應的值爲-1),命令以下:

    kafka-producer-perf-test.bat --topic cest-throughput --num-records 1000000 --record-size 1000 --throughput 1000000 --producer-props bootstrap.servers=localhost:9092 acks=all

    測試結果:

      1000000 records sent, 21524.818115 records/sec (20.53 MB/sec), 1464.26 ms avg latency, 4896.00 ms max latency, 1121 ms 50th, 3725 ms 95th, 4727 ms 99th, 4881 ms 99.9th.

      records sent  測試發送的消息總條數

      records/sec  每秒發送的消息數

      MB/sec  每秒發送消息的大小(單位MB)

      avg latency  消息處理的平均耗時,單位ms

      max latency  消息處理的最大耗時,單位ms

      50th /95th /99.9th  表示50%,95th,99.9th的消息處理耗時

相關文章
相關標籤/搜索