Kafka 是 LinkedIn 開源的一種高吞吐量的分佈式發佈訂閱消息系統,kafka的誕生就是爲了處理海量日誌數據,因此kafka處理消息的效率很是高,即便是很是普通的硬件也能夠支持每秒數百萬的消息。
kafka 自然支持集羣負載均衡,使用 zookeeper 進行分佈式協調管理。不支持事務,有必定機率丟失消息。
kafka 的特色,決定了使用場景:日誌中間件。sql
zookeeker: docker pull zookeeper:latest
kafka: docker pull wurstmeister/kafka:latest
mongodb
先啓動zookeeper:docker
docker run -d --name zookeeper --publish 2181:2181 \ --volume /etc/localtime:/etc/localtime \ zookeeper:latest
zookeeper啓動完成後再啓動kafka:數據庫
docker run -d --name kafka --publish 9092:9092 \ --link zookeeper \ --env KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 \ --env KAFKA_ADVERTISED_HOST_NAME=kafka所在宿主機的IP \ --env KAFKA_ADVERTISED_PORT=9092 \ --volume /etc/localtime:/etc/localtime \ wurstmeister/kafka:latest
運行 docker ps
,找到kafka的 CONTAINER ID,運行 docker exec -it ${CONTAINER ID} /bin/bash
,進入kafka容器。
進入kafka默認目錄 /opt/kafka_2.11-0.10.1.0
,運行 bin/kafka-topics.sh --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 1 --topic test
,建立一個 topic 名稱爲 test。
運行 bin/kafka-topics.sh --list --zookeeper zookeeper:2181
查看當前的 topic 列表。
運行一個消息生產者,指定 topic 爲剛剛建立的 test , bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
,輸入一些測試消息。
運行一個消息消費者,一樣指定 topic 爲 test, bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
,能夠接收到生產者發送的消息。bootstrap
kafka 環境已經搭建完成,接下來咱們要對以前的工程進行改造,使用 log4j2 的 kafka appender 把日誌統一輸出到 kafka 日誌中間件。
日誌導入 kafka 以後,接下來的處理就比較靈活了,能夠用不一樣功能的消費者訂閱感興趣的 topic,進行日誌分析。例如:使用 kafka 做爲 storm 的數據來源 spout,進行流式處理;訂閱 kafka 中須要作離線統計處理的 topic,把數據保存到數據庫,通常是 mongodb 或 hbase 這種數據結構鬆散的 nosql 數據庫;又或者可使用 flume 或 logstash 這種管道工具,把數據導入到其餘的系統,好比 elasticsearch、 solr 等。
kafka 做爲最經常使用的日誌中間件,能夠把分散的日誌集中到一處,並作緩衝處理,再和其餘開源工具進行集成,對數據作進一步處理,是日誌統計系統的基礎組件。bash