1、官網html
http://kafka.apache.org/downloads.htmljava
2、Kafka簡介git
Kafka是由Apache軟件基金會開發的一個開源流處理平臺,由Scala和Java編寫。Kafka是一種高吞吐量的分佈式發佈訂閱消息系統,它能夠處理消費者規模的網站中的全部動做流數據。 這種動做(網頁瀏覽,搜索和其餘用戶的行動)是在現代網絡上的許多社會功能的一個關鍵因素。 這些數據一般是因爲吞吐量的要求而經過處理日誌和日誌聚合來解決。 對於像Hadoop的同樣的日誌數據和離線分析系統,但又要求實時處理的限制,這是一個可行的解決方案。Kafka的目的是經過Hadoop的並行加載機制來統一線上和離線的消息處理,也是爲了經過集羣來提供實時的消息。github
相似的組件還有:Azure的ServiceBus、RabbitMQ等,據網上描述,Kafka比RabbitMQ性能強。apache
3、安裝centos
一、安裝Java網絡
yum install java-1.8.0-openjdk.x86_64
二、配置JAVA環境變量jvm
export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.171-8.b10.el7_5.x86_64/jre export JRE_HOME=${JAVA_HOME}/jre export CLASSPATH=.:${JAVA_HOME}/lib:${JRE_HOME}/lib export PATH=${JAVA_HOME}/bin:$PATH
說明:配置時注意JAVA_HOME後面要加到/jre,這個比較特殊。另外,紅色區域能夠換成您對應的安裝版本的路徑。分佈式
三、下載Kafka:http://kafka.apache.org/downloads.htmloop
cd /opt wget http://mirrors.hust.edu.cn/apache/kafka/1.1.0/kafka_2.12-1.1.0.tgz
四、解壓並進入目錄
tar -zvxf ./kafka_2.12-1.1.0.tgz cd kafka_2.12-1.1.0
五、啓動Zookeeper
使用安裝包中的腳本啓動單節點Zookeeper 實例:(參數-daemon表示後臺運行)
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
六、啓動Kafka服務
bin/kafka-server-start.sh config/server.properties
七、建立一個測試的Topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
查看Topic:
bin/kafka-topics.sh --list --zookeeper localhost:2181
八、產生消息
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test >hello xingzhu >hello sindrol
九、消費消息
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning xingzhu sindrol
好了,到此,單臺Kafka已經安裝完成。
4、集羣配置
一、單機多broker 集羣配置
利用單節點部署多個broker。 不一樣的broker 設置不一樣的 id,監聽端口及日誌目錄。 例如:
cp config/server.properties config/server-1.properties
編輯配置:
config/server-1.properties: broker.id=1 port=9093 log.dir=/tmp/kafka-logs-1
啓動Kafka服務:
bin/kafka-server-start.sh config/server-1.properties &
啓動多個服務後,能夠參考第三節內容,產生和消費消息。
二、多機多broker 集羣配置
分別在多個節點按上述方式安裝Kafka,配置啓動多個Zookeeper 實例。 例如: 在10.4.253.22,10.4.253.23,10.4.253.24三臺機器部署,Zookeeper配置以下:
initLimit=5 syncLimit=2 server.1=10.4.253.22:2888:3888 server.2=10.4.253.23:2888:3888 server.3=10.4.253.24:2888:3888
分別配置多個機器上的Kafka服務 設置不一樣的broke id,zookeeper.connect設置以下:
zookeeper.connect=10.4.253.22:2181,10.4.253.23:2181,10.4.253.24:2181
啓動Zookeeper與Kafka服務,按上文方式產生和消費消息,驗證集羣功能。
5、外網訪問
安裝完成並啓動後,若是想要外網經過外網IP訪問,須要在config/service.properites中添加以下修改:
advertised.listeners=PLAINTEXT://xxx.xxx.xxx.xxx:port
修改完成後,從新啓動Kafka服務。
6、C#調用
引入庫:kafka-net(https://github.com/Jroland/kafka-net)
一、模擬消費端
using KafkaNet; using KafkaNet.Model; using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; namespace KafkaClientDemo.Consumer { class Program { static void Main(string[] args) { var options = new KafkaOptions(new Uri("http://42.159.154.132:9092")); var router = new BrokerRouter(options); var consumer = new KafkaNet.Consumer(new ConsumerOptions("test", router)); Console.WriteLine("waiting ..."); //Consume returns a blocking IEnumerable (ie: never ending stream) foreach (var message in consumer.Consume()) { Console.WriteLine("Response: P{0},O{1} : {2}", message.Meta.PartitionId, message.Meta.Offset, Encoding.UTF8.GetString(message.Value, 0, message.Value.Length)); } } } }
二、模擬消息生產端
using KafkaNet; using KafkaNet.Model; using KafkaNet.Protocol; using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; namespace KafkaClientDemo { class Program { static void Main(string[] args) { var options = new KafkaOptions(new Uri("http://42.159.154.132:9092")); var router = new BrokerRouter(options); var client = new Producer(router); //var topic = client.GetTopic("test"); using (client) { while (true) { Console.Write(">"); var text = Console.ReadLine(); client.SendMessageAsync("testTopic", new[] { new Message(text, "key_" + DateTime.Now.Ticks) }).Wait(); if (text == "exit") break; } } } } }
運行效果:
我在部署和學習時,參考了https://www.mtyun.com/library/how-to-install-kafka-on-centos7文章,很是感謝原做者分享。