Apache Kafka是分佈式發佈-訂閱消息系統。它最初由LinkedIn公司開發,以後成爲Apache項目的一部分。Kafka是一種快速、可擴展的、設計內在就是分佈式的,分區的和可複製的提交日誌服務。
Apache Kafka與傳統消息系統相比,有如下不一樣:node
它被設計爲一個分佈式系統,易於向外擴展;apache
它同時爲發佈和訂閱提供高吞吐量;bootstrap
它支持多訂閱者,當失敗時能自動平衡消費者;vim
它將消息持久化到磁盤,所以可用於批量消費,例如ETL,以及實時應用程序。centos
下載地址:https://kafka.apache.org/downloadsbash
wget http://mirrors.shuosc.org/apache/kafka/1.0.0/kafka_2.11-1.0.0.tgz
解壓:服務器
tar -zxvf kafka_2.11-1.0.0.tgz cd /usr/local/kafka_2.11-1.0.0/
修改 kafka-server 的配置文件session
vim /usr/local/kafka/config/server.properties
修改其中的:分佈式
broker.id=1 log.dir=/data/kafka/logs-1
使用安裝包中的腳本啓動單節點 Zookeeper 實例:ide
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
使用 kafka-server-start.sh 啓動 kafka 服務:
bin/kafka-server-start.sh config/server.properties
啓動完成
[2018-07-20 16:18:07,243] INFO [GroupCoordinator 1]: Startup complete. (kafka.coordinator.group.GroupCoordinator) [2018-07-20 16:18:07,284] INFO [ProducerId Manager 1]: Acquired new producerId block (brokerId:1,blockStartProducerId:0,blockEndProducerId:999) by writing to Zk with path version 1 (kafka.coordinator.transaction.ProducerIdManager) [2018-07-20 16:18:07,321] INFO [TransactionCoordinator id=1] Starting up. (kafka.coordinator.transaction.TransactionCoordinator) [2018-07-20 16:18:07,330] INFO [TransactionCoordinator id=1] Startup complete. (kafka.coordinator.transaction.TransactionCoordinator) [2018-07-20 16:18:07,355] INFO [Transaction Marker Channel Manager 1]: Starting (kafka.coordinator.transaction.TransactionMarkerChannelManager) [2018-07-20 16:18:07,402] INFO Got user-level KeeperException when processing sessionid:0x164b6c0523f0000 type:delete cxid:0x42 zxid:0x1a txntype:-1 reqpath:n/a Error Path:/admin/preferred_replica_election Error:KeeperErrorCode = NoNode for /admin/preferred_replica_election (org.apache.zookeeper.server.PrepRequestProcessor) [2018-07-20 16:18:07,464] INFO Creating /brokers/ids/1 (is it secure? false) (kafka.utils.ZKCheckedEphemeral) [2018-07-20 16:18:07,466] INFO Got user-level KeeperException when processing sessionid:0x164b6c0523f0000 type:create cxid:0x4b zxid:0x1b txntype:-1 reqpath:n/a Error Path:/brokers Error:KeeperErrorCode = NodeExists for /brokers (org.apache.zookeeper.server.PrepRequestProcessor) [2018-07-20 16:18:07,467] INFO Got user-level KeeperException when processing sessionid:0x164b6c0523f0000 type:create cxid:0x4c zxid:0x1c txntype:-1 reqpath:n/a Error Path:/brokers/ids Error:KeeperErrorCode = NodeExists for /brokers/ids (org.apache.zookeeper.server.PrepRequestProcessor) [2018-07-20 16:18:07,474] INFO Result of znode creation is: OK (kafka.utils.ZKCheckedEphemeral) [2018-07-20 16:18:07,476] INFO Registered broker 1 at path /brokers/ids/1 with addresses: EndPoint(10.66.95.67,9092,ListenerName(PLAINTEXT),PLAINTEXT) (kafka.utils.ZkUtils) [2018-07-20 16:18:07,479] WARN No meta.properties file under dir /tmp/kafka-logs-1/meta.properties (kafka.server.BrokerMetadataCheckpoint) [2018-07-20 16:18:07,514] INFO Kafka version : 1.0.1 (org.apache.kafka.common.utils.AppInfoParser) [2018-07-20 16:18:07,514] INFO Kafka commitId : c0518aa65f25317e (org.apache.kafka.common.utils.AppInfoParser) [2018-07-20 16:18:07,526] INFO [KafkaServer id=1] started (kafka.server.KafkaServer)
使用 kafka-topics.sh 建立單分區單副本的 topic test:
[root@V1 kafka_2.12-1.0.1]# bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
查看 topic 列表:
[root@V1 kafka_2.12-1.0.1]# bin/kafka-topics.sh --list --zookeeper localhost:2181
使用 kafka-console-producer.sh 發送消息:
[root@V2 kafka_2.12-1.0.1]# bin/kafka-console-producer.sh --broker-list xxx.xxx.xxx.xxx:9092 --topic test >wwwwwwwwwwwwwwwwwwwwwwwwwwwwwww
使用 kafka-console-consumer.sh 接收消息並在終端打印:
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
打開個新的命令窗口執行上面命令便可查看信息:
[root@PaulV1 kafka_2.12-1.0.1]# bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper]. hhhhhhhhhhhhhhhhhh wwwwwwwwwwwwwwwwwwwwwwwwwwwwwww
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
顯示結果
[root@V1 kafka_2.12-1.0.1]# bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test Topic:test PartitionCount:1 ReplicationFactor:1 Configs: Topic: test Partition: 0 Leader: 1 Replicas: 1 Isr: 1
第一行給出了全部分區的摘要,每一個附加行給出了關於一個分區的信息。 因爲咱們只有一個分區,因此只有一行。
「Leader」 : 是負責給定分區的全部讀取和寫入的節點。 每一個節點將成爲分區隨機選擇部分的領導者。
「Replicas」 : 是複製此分區日誌的節點列表,不管它們是不是領導者,或者即便他們當前處於活動狀態。
「Isr」 : 是一組「同步」副本。這是複製品列表的子集,當前活着並被引導到領導者。
Kafka 支持兩種模式的集羣搭建:能夠在單機上運行多個 broker 實例來實現集羣,也可在多臺機器上搭建集羣,下面介紹下如何實現單機多 broker 實例集羣,其實很簡單,只須要以下配置便可。
利用單節點部署多個 broker。 不一樣的 broker 設置不一樣的 id,監聽端口及日誌目錄。 例如:
cp config/server.properties config/server-2.properties cp config/server.properties config/server-3.properties vim config/server-2.properties vim config/server-3.properties
修改 :
broker.id=2 listeners = PLAINTEXT://your.host.name:9093 log.dir=/data/kafka/logs-2
和
broker.id=3 listeners = PLAINTEXT://your.host.name:9094 log.dir=/data/kafka/logs-3
啓動Kafka服務:
bin/kafka-server-start.sh config/server-2.properties & bin/kafka-server-start.sh config/server-3.properties &
至此,單機多broker實例的集羣配置完畢。
分別在多個節點按上述方式安裝 Kafka,配置啓動多個 Zookeeper 實例。
假設三臺機器 IP 地址是 : 192.168.153.135, 192.168.153.136, 192.168.153.137
分別配置多個機器上的 Kafka 服務,設置不一樣的 broker id,zookeeper.connect 設置以下:
vim config/server.properties
裏面的 zookeeper.connect
修改成:
zookeeper.connect=192.168.153.135:2181,192.168.153.136:2181,192.168.153.137:2181
從控制檯寫入數據並將其寫回控制檯是一個方便的起點,但您可能想要使用其餘來源的數據或將數據從 Kafka 導出到其餘系統。對於許多系統,您可使用 Kafka Connect 來導入或導出數據,而沒必要編寫自定義集成代碼。
Kafka Connect 是 Kafka 包含的一個工具,能夠將數據導入和導出到 Kafka。它是一個可擴展的工具,運行 鏈接器,實現與外部系統交互的自定義邏輯。在這個快速入門中,咱們將看到如何使用簡單的鏈接器運行 Kafka Connect,這些鏈接器將數據從文件導入到 Kafka topic,並將數據從 Kafka topic 導出到文件。
首先,咱們將經過建立一些種子數據開始測試:
echo -e "zhisheng\ntian" > test.txt
接下來,咱們將啓動兩個以獨立模式運行的鏈接器,這意味着它們將在單個本地專用進程中運行。咱們提供三個配置文件做爲參數。首先是 Kafka Connect 過程的配置,包含常見的配置,例如要鏈接的 Kafka 代理以及數據的序列化格式。其他的配置文件都指定一個要建立的鏈接器。這些文件包括惟一的鏈接器名稱,要實例化的鏈接器類以及鏈接器所需的任何其餘配置。
bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties
Kafka 附帶的這些示例配置文件使用您以前啓動的默認本地羣集配置,並建立兩個鏈接器:第一個是源鏈接器,用於讀取輸入文件中的行,並將每一個鏈接生成爲 Kafka topic,第二個爲鏈接器它從 Kafka topic 讀取消息,並在輸出文件中產生每行消息。
在啓動過程當中,您會看到一些日誌消息,其中一些指示鏈接器正在實例化。Kafka Connect 進程啓動後,源鏈接器應該開始讀取 test.txt topic connect-test,並將其生成 topic ,而且接收器鏈接器應該開始讀取 topic 中的消息 connect-test 並將其寫入文件 test.sink.txt。咱們能夠經過檢查輸出文件的內容來驗證經過整個管道傳輸的數據:
數據存儲在 Kafka topic 中 connect-test,所以咱們也能夠運行控制檯使用者來查看 topic 中的數據
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning
鏈接器繼續處理數據,因此咱們能夠將數據添加到文件中,並看到它在管道中移動:
echo zhishengtian>> test.txt echo zhishengtian2>> test.txt echo zhishengtian3>> test.txt echo zhishengtian4>> test.txt
Kafka Streams 是用於構建關鍵任務實時應用程序和微服務的客戶端庫,輸入和/或輸出數據存儲在 Kafka 集羣中。Kafka Streams 結合了在客戶端編寫和部署標準 Java 和 Scala 應用程序的簡單性以及 Kafka 服務器端集羣技術的優點,使這些應用程序具備高度可伸縮性,彈性,容錯性,分佈式等特性。
可參考官網入門案例:http://kafka.apache.org/10/documentation/streams/quickstart
二、http://kafka.apache.org/10/documentation/streams/quickstart