下載node
wget http://mirror-hk.koddos.net/apache/kafka/2.3.0/kafka_2.12-2.3.0.tgz
安裝apache
tar zxvf kafka_2.12-2.3.0.tgz cd kafka_2.12-2.3.0/ vim config/server.properties
配置bootstrap
# 通用配置 # kafka數據目錄 log.dirs=/data/kafka # zookeeeper zookeeper.connect=kafka-node1:2181,kafka-node2:2181,kafka-node3:2181 # 節點配置 # 節點1 broker.id=0 #listeners=PLAINTEXT://:9092 listeners=PLAINTEXT://10.13.6.1:9092 # 節點2 broker.id=1 #listeners=PLAINTEXT://:9092 listeners=PLAINTEXT://10.13.6.2:9092 # 節點3 broker.id=2 #listeners=PLAINTEXT://:9092 listeners=PLAINTEXT://10.13.6.3:9092
啓動vim
#進入kafka根目錄 cd /app/kafka_2.12-2.3.0/ #啓動 bin/kafka-server-start.sh -daemon config/server.properties #啓動成功輸出示例(最後幾行) [2019-09-11 11:14:13,403] INFO [ProducerId Manager 0]: Acquired new producerId block (brokerId:0,blockStartProducerId:0,blockEndProducerId:999) by writing to Zk with path version 1 (kafka.coordinator.transaction.ProducerIdManager) [2019-09-11 11:14:13,423] INFO [TransactionCoordinator id=0] Starting up. (kafka.coordinator.transaction.TransactionCoordinator) [2019-09-11 11:14:13,424] INFO [Transaction Marker Channel Manager 0]: Starting (kafka.coordinator.transaction.TransactionMarkerChannelManager) [2019-09-11 11:14:13,424] INFO [TransactionCoordinator id=0] Startup complete. (kafka.coordinator.transaction.TransactionCoordinator) [2019-09-11 11:14:13,459] INFO [/config/changes-event-process-thread]: Starting (kafka.common.ZkNodeChangeNotificationListener$ChangeEventProcessThread) [2019-09-11 11:14:13,479] INFO [SocketServer brokerId=0] Started data-plane processors for 1 acceptors (kafka.network.SocketServer) [2019-09-11 11:14:13,485] INFO Kafka version: 2.3.0 (org.apache.kafka.common.utils.AppInfoParser) [2019-09-11 11:14:13,485] INFO Kafka commitId: fc1aaa116b661c8a (org.apache.kafka.common.utils.AppInfoParser) [2019-09-11 11:14:13,485] INFO Kafka startTimeMs: 1568171653480 (org.apache.kafka.common.utils.AppInfoParser) [2019-09-11 11:14:13,487] INFO [KafkaServer id=0] started (kafka.server.KafkaServer)
使用服務器
一、建立Topic 在kafka-node1(Broker)上建立測試Tpoic:test-ken-io,這裏咱們指定了3個副本、1個分區 bin/kafka-topics.sh --create --bootstrap-server kafka-node1:9092 --replication-factor 3 --partitions 1 --topic test-ken-io Topic在kafka-node1上建立後也會同步到集羣中另外兩個Broker:kafka-node二、kafka-node3 二、查看Topic 咱們能夠經過命令列出指定Broker的 bin/kafka-topics.sh --list --bootstrap-server kafka-node1:9092 三、發送消息 這裏咱們向Broker(id=0)的Topic=test-ken-io發送消息 bin/kafka-console-producer.sh --broker-list kafka-node1:9092 --topic test-ken-io #消息內容 > test by ken.io 四、消費消息 在kafka-node2上消費Broker03的消息 bin/kafka-console-consumer.sh --bootstrap-server kafka-node3:9092 --topic test-ken-io --from-beginning 在Kafka03上消費Broker02的消息 bin/kafka-console-consumer.sh --bootstrap-server kafka-node2:9092 --topic test-ken-io --from-beginning 而後均能收到消息 test by ken.io 這是由於這兩個消費消息的命令是創建了兩個不一樣的Consumer 若是咱們啓動Consumer指定Consumer Group Id就能夠做爲一個消費組協同工,1個消息同時只會被一個Consumer消費到 bin/kafka-console-consumer.sh --bootstrap-server kafka-node3:9092 --topic test-ken-io --from-beginning --group testgroup_ken bin/kafka-console-consumer.sh --bootstrap-server kafka-node2:9092 --topic test-ken-io --from-beginning --group testgroup_ken
參數app
Kafka經常使用Broker配置說明:測試
配置項 | 默認值/示例值 | 說明 |
---|---|---|
broker.id | 0 | Broker惟一標識 |
listeners | PLAINTEXT://192.168.88.53:9092 | 監聽信息,PLAINTEXT表示明文傳輸 |
log.dirs | kafka/logs | kafka數據存放地址,能夠填寫多個。用」,」間隔 |
message.max.bytes | message.max.bytes | 單個消息長度限制,單位是字節 |
num.partitions | 1 | 默認分區數 |
log.flush.interval.messages | Long.MaxValue | 在數據被寫入到硬盤和消費者可用前最大累積的消息的數量 |
log.flush.interval.ms | Long.MaxValue | 在數據被寫入到硬盤前的最大時間 |
log.flush.scheduler.interval.ms | Long.MaxValue | 檢查數據是否要寫入到硬盤的時間間隔。 |
log.retention.hours | 24 | 控制一個log保留時間,單位:小時 |
zookeeper.connect | 192.168.88.21:2181 | ZooKeeper服務器地址,多臺用」,」間隔 |