1: Environment preparation
JDK: Oracle's JDK is recommended; OpenJDK is not.
Add the following environment variables to /etc/profile.
Add the jdk and jre bin directories to PATH:
$JAVA_HOME/bin:$JRE_HOME/bin
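For reference, the /etc/profile additions might look like the sketch below; the JDK install path is an assumption, so adjust it to wherever your JDK actually lives:

```shell
# Assumed install location -- adjust to your actual JDK directory
export JAVA_HOME=/usr/local/jdk1.8.0_144
export JRE_HOME=$JAVA_HOME/jre
# prepend the jdk and jre bin directories to PATH
export PATH=$JAVA_HOME/bin:$JRE_HOME/bin:$PATH
```

After editing, run `source /etc/profile` (or log in again) so the variables take effect.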
2: Install ZooKeeper
Download the ZooKeeper tarball
url: https://archive.apache.org/dist/zookeeper/zookeeper-3.4.5/zookeeper-3.4.5.tar.gz
Move the archive to /usr/local
tar -zxvf ***
Modify the configuration file
(1) Rename conf/zoo_sample.cfg to zoo.cfg
(2) Change the configuration as follows
Note:
* The default port is 2181.
* dataDir must be created manually: mkdir -p datadir
* The commented-out parameters are unused in standalone mode
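As a reference, a minimal standalone zoo.cfg might look like the sketch below; the dataDir path is an assumption and should match the directory you actually created:

```properties
# minimal standalone configuration -- dataDir path is an assumption
tickTime=2000
# dataDir must already exist (mkdir -p)
dataDir=/usr/local/zookeeper-3.4.5/datadir
# default client port
clientPort=2181
```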
(3) Add it to the environment variables in /etc/profile
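The /etc/profile addition could be a sketch like this, assuming ZooKeeper was extracted to /usr/local/zookeeper-3.4.5:

```shell
# Assumed extract location -- adjust to your actual ZooKeeper directory
export ZOOKEEPER_HOME=/usr/local/zookeeper-3.4.5
export PATH=$ZOOKEEPER_HOME/bin:$PATH
```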
Testing:
You can cd into the bin directory and start, stop, or check the status via the zkServer.sh shell script
eg:
./zkServer.sh start/stop/status
3: Install Kafka
First download the tar package, extract it, and mv it to /usr/local
wget https://www.apache.org/dyn/closer.cgi?path=/kafka/0.11.0.0/kafka_2.11-0.11.0.0.tgz
sudo mv *** /usr/local
cd /usr/local
sudo tar -zxvf ****
sudo rm **.tar.gz
Modify the configuration files under the config directory
vim server.properties
Change the following parameters
# broker.id must be a positive integer; 1 is fine for a single node
broker.id=1
# specify the port
port=9092
# besides localhost, there are other values to modify here; see the details below
host.name=localhost
# specify kafka's log directory
log.dirs=/usr/local/kafka_2.11-0.11.0.0/kafka-logs
# zookeeper connection setting; this is a single node, so localhost is enough; in a real production environment, add the other IP addresses and ports here
zookeeper.connect=localhost:2181
vim zookeeper.properties
Change the following parameters
# data directory
dataDir=/usr/local/kafka_2.11-0.11.0.0/zookeeper/data
# client port
clientPort=2181
host.name=localhost
In producer.properties and consumer.properties:
zookeeper.connect=localhost:2181
4: Start Kafka
bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties
The terminal output is all INFO logs
Note: as a non-root user you may be tempted to start zookeeper and kafka with sudo, but that will fail; instead, change the owner and group of every file under the kafka directory to your own user.
You can check whether both started successfully with the jps command
e.g. 2576 QuorumPeerMain is the zookeeper process
If one of them failed to start, you can debug from zookeeper/config/zookeeper.out.
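Before moving on to the client demos, the broker can also be smoke-tested with the console scripts shipped in bin/; the topic name `test` here is arbitrary, and both zookeeper and kafka from the steps above must already be running:

```shell
# create a test topic (zookeeper and kafka must be running)
bin/kafka-topics.sh --create --zookeeper localhost:2181 \
  --replication-factor 1 --partitions 1 --topic test
# type messages in one terminal...
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
# ...and read them back in another
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
```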
After a successful start, you can run a quick test with a demo
There are Kafka clients for various languages on GitHub
url:https://github.com/edenhill/librdkafka/tree/v0.9.5
You can run a quick test with the Go demos
consumer
package main

import (
	"fmt"

	"github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {
	c, err := kafka.NewConsumer(&kafka.ConfigMap{
		"bootstrap.servers": "localhost",
		"group.id":          "myGroup",
		"auto.offset.reset": "earliest",
	})
	if err != nil {
		panic(err)
	}

	c.SubscribeTopics([]string{"myTopic", "^aRegex.*[Tt]opic"}, nil)

	for {
		msg, err := c.ReadMessage(-1)
		if err == nil {
			fmt.Printf("Message on %s: %s\n", msg.TopicPartition, string(msg.Value))
		} else {
			fmt.Printf("Consumer error: %v (%v)\n", err, msg)
			break
		}
	}

	c.Close()
}
producer
package main

import (
	"fmt"

	"github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {
	p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "localhost"})
	if err != nil {
		panic(err)
	}

	// delivery report handler for produced messages
	go func() {
		for e := range p.Events() {
			switch ev := e.(type) {
			case *kafka.Message:
				if ev.TopicPartition.Error != nil {
					fmt.Printf("Delivery failed: %v\n", ev.TopicPartition)
				} else {
					fmt.Printf("Delivered message to %v\n", ev.TopicPartition)
				}
			}
		}
	}()

	// produce messages to the topic (asynchronously)
	topic := "myTopic"
	for _, word := range []string{"Welcome", "to", "the", "Confluent", "Kafka", "Golang", "client"} {
		p.Produce(&kafka.Message{
			TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
			Value:          []byte(word),
		}, nil)
	}

	// wait for outstanding deliveries before shutting down
	p.Flush(15 * 1000)
}