高吞吐量的分佈式發佈訂閱消息系統Kafka--安裝及測試

1、Kafka概述 java

    Kafka是一種高吞吐量的分佈式發佈訂閱消息系統,它能夠處理消費者規模的網站中的全部動做流數據。 這種動做(網頁瀏覽,搜索和其餘用戶的行動)是在現代網絡上的許多社會功能的一個關鍵因素。 這些數據一般是因爲吞吐量的要求而經過處理日誌和日誌聚合來解決。 對於像Hadoop的同樣的日誌數據和離線分析系統,但又要求實時處理的限制,這是一個可行的解決方案。Kafka的目的是經過Hadoop的並行加載機制來統一線上和離線的消息處理,也是爲了經過集羣機來提供實時的消費。apache

2、Kafka相關術語vim

  • Broker
    Kafka集羣包含一個或多個服務器,這種服務器被稱爲broker
  • Topic
    每條發佈到Kafka集羣的消息都有一個類別,這個類別被稱爲Topic。(物理上不一樣Topic的消息分開存儲,邏輯上一個Topic的消息雖然保存於一個或多個broker上但用戶只需指定消息的Topic便可生產或消費數據而沒必要關心數據存於何處)
  • Partition
    Partition是物理上的概念,每一個Topic包含一個或多個Partition.
  • Producer
    負責發佈消息到Kafka broker
  • Consumer
    消息消費者,向Kafka broker讀取消息的客戶端。
  • Consumer Group
    每一個Consumer屬於一個特定的Consumer Group(可爲每一個Consumer指定group name,若不指定group name則屬於默認的group)。

2、Kafka下載及安裝服務器

     一、下載網絡

  二、安裝less

tar zxvf kafka_2.11-0.9.0.1.tgz
cd kafka_2.11-0.9.0.1

  三、集羣配置分佈式

         設定有兩臺服務器192.168.1.23七、192.168.1.238,兩臺服務器各安裝有兩zookeeper,端口都爲2181(zookeeper再也不說明),每一個服務器都爲Kafka配置3個broker。oop

     3.一、server.properties配置測試

broker.id = 10
port = 9090
host.name=192.168.1.237
advertised.host.name=192.168.1.237
log.dirs=/tmp/kafka-logs/server0
zookeeper.connect=192.168.1.237:2181,192.168.1.238:2181

  說明:host.name\advertised.host.name兩個參數仍是要配置爲IP,不然會有各類各樣的問題。網站

    3.二、server1.properties配置

cp config/servier.properties config/server1.properties
vim config/server1.properties
broker.id = 11
port = 9091
host.name=192.168.1.237
advertised.host.name=192.168.1.237
log.dirs=/tmp/kafka-logs/server1
zookeeper.connect=192.168.1.237:2181,192.168.1.238:2181

   3.三、server2.properties配置

cp config/servier.properties config/server2.properties
vim config/server2.properties
broker.id = 12
port = 9092
host.name=192.168.1.237
advertised.host.name=192.168.1.237
log.dirs=/tmp/kafka-logs/server2
zookeeper.connect=192.168.1.237:2181,192.168.1.238:2181

  說明:同一臺服務器port、log.dirs不能相同,不一樣的服務器broker.id只要在一個集羣中都不能相同。

      3.四、同理 另外一臺服務器的server.properties,server1.properties,server2.properties的broker.id分別爲:20、2一、22,port分別爲:9090、909一、9092  其它:host.name=192.168.1.23八、advertised.host.name=192.168.1.238

      3.五、啓動

bin/kafka-server-start.sh config/server.properties &
bin/kafka-server-start.sh config/server1.properties &
bin/kafka-server-start.sh config/server2.properties &

  3.六、監控端口

netstat -tunpl |grep 2181
netstat -tunpl |grep 9090
netstat -tunpl |grep 9091
netstat -tunpl |grep 9092

  看一下這4個端口起來沒有,並看一下iptables有沒有加入這4個IP的啓動,或要把iptables相關,不然JAVA鏈接不進來。

    4、測試

        4.一、建立Topic

bin/kafka-topics.sh --create --zookeeper 192.168.1.237:2181 --replication-factor 3 --partitions 1 --topic testTopic

    4.二、查看建立狀況

bin/kafka-topics.sh --describe --zookeeper 192.168.1.237:2181 --topic testTopic

    4.三、生產者發送消息

bin/kafka-console-producer.sh --broker-list 192.168.1.237:9090 --topic testTopic

        4.四、消費都接收消息

bin/kafka-console-consumer.sh --zookeeper 192.168.1.237:2181 --from-beginning --topic testTopic

    4.五、檢查consumer offset位置

bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zkconnect 192.168.1.237:2181 --group testTopic

  5、遇到的問題

          一、運行一段時間報錯

#
# There is insufficient memory for the Java Runtime Environment to continue.
# Native memory allocation (malloc) failed to allocate 986513408 bytes for committing reserved memory.
# An error report file with more information is saved as:
# //hs_err_pid6500.log
OpenJDK 64-Bit Server VM warning: INFO: os::commit_memory(0x00000000bad30000, 986513408, 0) failed; error='Cannot allocate memory' (errno=12)

      解決:

    you can adjust the JVM heap size by editing kafka-server-start.sh, zookeeper-server-start.shand so on:

export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"

    The -Xms parameter specifies the minimum heap size. To get your server to at least start up, try changing it to use less memory. Given that you only             have 512M, you should change the maximum heap size (-Xmx) too:

export KAFKA_HEAP_OPTS="-Xmx256M -Xms128M"

        I'm not sure what the minimal memory requirements of kafka in default config are - maybe you need to adjust the message size in kafka to get it to run.

相關文章
相關標籤/搜索