kafka基本概念和操做

一.kafka的基本概念:
Broker:安裝了Kafka服務的機器就是一個Broker,(broker的ID必須惟一)
Producer:消息的生產者,負責數據主動寫入到broker(push)
Consumer:消息的消費者,負責從Kafka中讀取數據(pull),老版本的消費者須要依賴zk,新版本不須要依賴zk
Topic:消息的分類,不一樣topic存放不一樣的數據,對比數據庫的table
Consumer Group:消費者組,一個Topic能夠有多個消費者同時消費,可是多個消費者若是在一個消費者組中,那麼它們不能重複消費數據html

二:Kafka安裝:
1.考慮Kafka和Spark版本兼容問題:
官網:Kafka: Spark Streaming 2.2.0 is compatible with Kafka broker versions 0.8.2.1 or higher. java

  spark-streaming-kafka-0-8 spark-streaming-kafka-0-10
Broker Version 0.8.2.1 or higher 0.10.0 or higher
Api Stability Stable Experimental
Language Support Scala, Java, Python Scala, Java
Receiver DStream Yes No
Direct DStream Yes Yes
SSL / TLS Support No Yes
Offset Commit Api No Yes
Dynamic Topic Subscription No Yes

必須安裝zookeeper:zk集羣啓動腳本數據庫

​​​​​​​#!/bin/sh
for i in {1,2,3}
do
 ssh xupan00$i 'source /etc/profile;/usr/local/devtools/zookeeper/zookeeper-3.4.5/bin/zkServer.sh start'
done

 

Kafka配置文件修改:注意broker.id全局惟一,別的機器要改apache

須要修改一個文件server.properties:
log.dirs : kafka保存數據的目錄
num.partitions : log partitions per topic,每一個topic的分區,一個分區對應一個文件
log.retention.hours=168 : 數據清除時間默認爲7天
zookeeper.connect : zookeeper地址


############################# Server Basics #############################

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=0

############################# Socket Server Settings #############################

# The port the socket server listens on
port=9092

# Hostname the broker will bind to. If not set, the server will bind to all interfaces
host.name=xupan001

############################# Log Basics #############################

# A comma seperated list of directories under which to store log files
log.dirs=/usr/local/devtools/kafka/kafka_2.10-0.8.2.1/kafka-logs

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1


# The minimum age of a log file to be eligible for deletion
log.retention.hours=168


zookeeper.connect=xupan001:2181,xupan002:2181,xupan003:2181




Kafka基本操做:bash

打開Kafka服務
kafka-server-start.sh -daemon config/server.properties 
kafka-server-start.sh -daemon /usr/local/devtools/kafka/kafka_2.10-0.8.2.1/config/server.properties 
kafka-server-start.sh -daemon ${KAFKA_HOME}/config/server.properties

建立topic
kafka-topics.sh -zookeeper xupan001:2181,xupan002:2181,xupan003:2181 --create --topic test001 --replication-factor 3 --partitions 3

查看topic
kafka-topics.sh  -zookeeper xupan001:2181,xupan002:2181,xupan003:2181  --list

描述
kafka-topics.sh --describe --zookeeper  xupan001:2181,xupan002:2181,xupan003:2181 --topic test

刪除topic
kafka-topics.sh --delete --zookeeper xupan001:2181,xupan002:2181,xupan003:2181 --topic test


生產者推送消息topic
bin/kafka-console-producer.sh --broker-list xupan001:9092,xupan002:9092,xupan003:9092 --topic test

--消費者消費topic       --from-beginning:從頭開始消費,可選
./bin/kafka-console-consumer.sh --zookeeper xupan001:2181,xupan002:2181,xupan003:2181 --topic test --from-beginning
相關文章
相關標籤/搜索