1、Kafka概述 java
Kafka是一種高吞吐量的分佈式發佈訂閱消息系統,它能夠處理消費者規模的網站中的全部動做流數據。 這種動做(網頁瀏覽,搜索和其餘用戶的行動)是在現代網絡上的許多社會功能的一個關鍵因素。 這些數據一般是因爲吞吐量的要求而經過處理日誌和日誌聚合來解決。 對於像Hadoop的同樣的日誌數據和離線分析系統,但又要求實時處理的限制,這是一個可行的解決方案。Kafka的目的是經過Hadoop的並行加載機制來統一線上和離線的消息處理,也是爲了經過集羣機來提供實時的消費。apache
2、Kafka相關術語vim
2、Kafka下載及安裝服務器
一、下載網絡
二、安裝less
tar zxvf kafka_2.11-0.9.0.1.tgz cd kafka_2.11-0.9.0.1
三、集羣配置分佈式
設定有兩臺服務器192.168.1.23七、192.168.1.238,兩臺服務器各安裝有兩zookeeper,端口都爲2181(zookeeper再也不說明),每一個服務器都爲Kafka配置3個broker。oop
3.一、server.properties配置測試
broker.id = 10 port = 9090 host.name=192.168.1.237 advertised.host.name=192.168.1.237 log.dirs=/tmp/kafka-logs/server0 zookeeper.connect=192.168.1.237:2181,192.168.1.238:2181
說明:host.name\advertised.host.name兩個參數仍是要配置爲IP,不然會有各類各樣的問題。網站
3.二、server1.properties配置
cp config/servier.properties config/server1.properties
vim config/server1.properties
broker.id = 11 port = 9091 host.name=192.168.1.237 advertised.host.name=192.168.1.237 log.dirs=/tmp/kafka-logs/server1 zookeeper.connect=192.168.1.237:2181,192.168.1.238:2181
3.三、server2.properties配置
cp config/servier.properties config/server2.properties vim config/server2.properties
broker.id = 12 port = 9092 host.name=192.168.1.237 advertised.host.name=192.168.1.237 log.dirs=/tmp/kafka-logs/server2 zookeeper.connect=192.168.1.237:2181,192.168.1.238:2181
說明:同一臺服務器port、log.dirs不能相同,不一樣的服務器broker.id只要在一個集羣中都不能相同。
3.四、同理 另外一臺服務器的server.properties,server1.properties,server2.properties的broker.id分別爲:20、2一、22,port分別爲:9090、909一、9092 其它:host.name=192.168.1.23八、advertised.host.name=192.168.1.238
3.五、啓動
bin/kafka-server-start.sh config/server.properties & bin/kafka-server-start.sh config/server1.properties & bin/kafka-server-start.sh config/server2.properties &
3.六、監控端口
netstat -tunpl |grep 2181 netstat -tunpl |grep 9090 netstat -tunpl |grep 9091 netstat -tunpl |grep 9092
看一下這4個端口起來沒有,並看一下iptables有沒有加入這4個IP的啓動,或要把iptables相關,不然JAVA鏈接不進來。
4、測試
4.一、建立Topic
bin/kafka-topics.sh --create --zookeeper 192.168.1.237:2181 --replication-factor 3 --partitions 1 --topic testTopic
4.二、查看建立狀況
bin/kafka-topics.sh --describe --zookeeper 192.168.1.237:2181 --topic testTopic
4.三、生產者發送消息
bin/kafka-console-producer.sh --broker-list 192.168.1.237:9090 --topic testTopic
4.四、消費都接收消息
bin/kafka-console-consumer.sh --zookeeper 192.168.1.237:2181 --from-beginning --topic testTopic
4.五、檢查consumer offset位置
bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zkconnect 192.168.1.237:2181 --group testTopic
5、遇到的問題
一、運行一段時間報錯
# # There is insufficient memory for the Java Runtime Environment to continue. # Native memory allocation (malloc) failed to allocate 986513408 bytes for committing reserved memory. # An error report file with more information is saved as: # //hs_err_pid6500.log OpenJDK 64-Bit Server VM warning: INFO: os::commit_memory(0x00000000bad30000, 986513408, 0) failed; error='Cannot allocate memory' (errno=12)
解決:
you can adjust the JVM heap size by editing kafka-server-start.sh
, zookeeper-server-start.sh
and so on:
export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
The -Xms
parameter specifies the minimum heap size. To get your server to at least start up, try changing it to use less memory. Given that you only have 512M, you should change the maximum heap size (-Xmx
) too:
export KAFKA_HEAP_OPTS="-Xmx256M -Xms128M"
I'm not sure what the minimal memory requirements of kafka in default config are - maybe you need to adjust the message size in kafka to get it to run.