Kafka Installation and Usage

Download link: https://www.apache.org/dyn/closer.cgi?path=/kafka/0.8.1.1/kafka_2.10-0.8.1.1.tgz

Installing and Starting Kafka

Step 1: Install Kafka

$ tar -xzf kafka_2.10-0.8.1.1.tgz
$ cd kafka_2.10-0.8.1.1

Step 2: Configure server.properties

Configure ZooKeeper (this assumes ZooKeeper is already installed; if not, search online for installation instructions).

From the root of the Kafka installation directory, edit the configuration file:

vim config/server.properties  

Set the property zookeeper.connect=ip1:2181,ip2:2181

Step 3: Notes on the server.properties configuration

The three most important Kafka configuration properties are, in order: broker.id, log.dirs, and zookeeper.connect.

The broker-side config/server.properties parameters are explained below.

(Reference for the parameter descriptions: http://blog.csdn.net/lizhitao/article/details/25667831)

# Real-world example: the Kafka config file on the .211 server
# broker.id must be unique for each broker in the cluster
broker.id=1
port=9092
host.name=192.168.1.211
num.network.threads=2
num.io.threads=8
socket.send.buffer.bytes=1048576
socket.receive.buffer.bytes=1048576
socket.request.max.bytes=104857600
# directory where the message logs are stored
log.dirs=/tmp/kafka-logs
num.partitions=2
# retain messages for 7 days
log.retention.hours=168
log.segment.bytes=536870912
log.retention.check.interval.ms=60000
log.cleaner.enable=false
# the ZooKeeper ensemble this broker registers with
zookeeper.connect=192.168.1.213:2181,192.168.1.216:2181,192.168.1.217:2181
zookeeper.connection.timeout.ms=1000000
# Real-world example: the Kafka config on the .210 server
broker.id=2
port=9092
host.name=192.168.1.210
num.network.threads=2
num.io.threads=8
socket.send.buffer.bytes=1048576
socket.receive.buffer.bytes=1048576
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs
num.partitions=2
log.retention.hours=168
log.segment.bytes=536870912
log.retention.check.interval.ms=60000
log.cleaner.enable=false
zookeeper.connect=192.168.1.213:2181,192.168.1.216:2181,192.168.1.217:2181
zookeeper.connection.timeout.ms=1000000

Step 4: Start Kafka

(Start ZooKeeper first: bin/zookeeper-server-start.sh config/zookeeper.properties &)

cd kafka_2.10-0.8.1.1

$ bin/kafka-server-start.sh -daemon config/server.properties

(For the experiments below, at least two brokers need to be running; start a second one with bin/kafka-server-start.sh -daemon config/server-1.properties)

Step 5: Create a topic

$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

Step 6: Verify that the topic was created successfully

$ bin/kafka-topics.sh --list --zookeeper localhost:2181

Here localhost is the ZooKeeper address.

Describing a topic:

bin/kafka-topics.sh --describe --zookeeper 192.168.1.8:2181 --topic test

Startup error: Unrecognized VM option '+UseCompressedOops'
Open bin/kafka-run-class.sh and find:
if [ -z "$KAFKA_JVM_PERFORMANCE_OPTS" ]; then
  KAFKA_JVM_PERFORMANCE_OPTS="-server  -XX:+UseCompressedOops -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:+CMSClassUnloadingEnabled -XX:+CMSScavengeBeforeRemark -XX:+DisableExplicitGC -Djava.awt.headless=true"
fi
Remove -XX:+UseCompressedOops from that line (the option is typically not recognized on 32-bit JVMs).

Startup error: Could not reserve enough space for object heap
Cause and fix: kafka-server-start.sh contains the heap setting KAFKA_HEAP_OPTS="-Xmx1G -Xms1G". Lower it, e.g. to 256M; the error occurred here because the test machine has only 1 GB of RAM in total.

Step 7: Send messages

Send a few messages to verify the setup by starting a console producer:

$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

(Replace localhost here with the machine's actual IP, otherwise it errors out; I don't know why.)

Sample message:

{"price":"100000","userId":14615501351480021,"payType":3,"code":"AFD3B8","payTime":{"time":1457330791333,"minutes":6,"seconds":31,"hours":14,"month":2,"year":116,"timezoneOffset":-480,"day":1,"date":7},"orderId":12222096,"goodsName":"會員"} 

 

Step 8: Start a consumer

$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
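
The console consumer has a programmatic equivalent in the 0.8 high-level consumer API. A minimal sketch, assuming a local ZooKeeper and the test topic (the group id is arbitrary):

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

public class ReadTestMessages {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181");
        props.put("group.id", "test-group");        // any otherwise unused group id
        props.put("auto.offset.reset", "smallest"); // start from the earliest offset when the group has none committed

        ConsumerConnector connector =
                Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

        // Ask for a single stream on the "test" topic.
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put("test", 1);
        Map<String, List<KafkaStream<byte[], byte[]>>> streams =
                connector.createMessageStreams(topicCountMap);

        // Blocks forever, printing each message as it arrives.
        for (MessageAndMetadata<byte[], byte[]> mm : streams.get("test").get(0)) {
            System.out.println(new String(mm.message()));
        }
    }
}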

Deleting a topic (use with caution: this only deletes the metadata in ZooKeeper; the message files on disk must be removed manually):

bin/kafka-run-class.sh kafka.admin.DeleteTopicCommand --topic test --zookeeper 192.168.197.170:2181,192.168.197.171:2181

Configuring a Kafka cluster (multiple brokers)

This works the same way as the single-machine setup; only the brokers' configuration files need to be modified.

1. Copy the standalone Kafka directory to each of the other machines.

2. On each machine, edit the server.properties file in the Kafka directory:

broker.id=1  (this parameter must be unique across the Kafka broker cluster, and must be a positive integer)

3. Start Kafka on each machine.

Pseudo-distributed setup on a single machine

First, create a config file for each node:
> cp config/server.properties config/server-1.properties
> cp config/server.properties config/server-2.properties

 

Set the following parameters in the copied files:
config/server-1.properties:
broker.id=1
port=9093
log.dir=/tmp/kafka-logs-1

config/server-2.properties:

broker.id=2
port=9094
log.dir=/tmp/kafka-logs-2

 

Now start the two additional nodes:
> bin/kafka-server-start.sh config/server-1.properties &
...
> bin/kafka-server-start.sh config/server-2.properties &
...

 

Create a topic with 3 replicas:
> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic

 

Run the "describe topics" command to see the state of each node:
> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:
Topic: my-replicated-topic Partition: 0 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0
leader: the node responsible for all reads and writes of the given partition; each partition has one broker elected as its leader.
replicas: lists all nodes holding a replica of the partition, regardless of whether they are currently alive.
isr: the "in-sync" replicas, i.e. the subset of replicas currently alive and caught up with the leader.

Setting Up a Kafka Development Environment

1. Add the Kafka dependency to pom.xml:
<!-- kafka dependency -->
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka_2.9.2</artifactId>
  <version>${kafka.version}</version>
  <exclusions>
    <!-- In a real application, pull in these jars separately rather than using the versions bundled with kafka -->
    <exclusion>
      <artifactId>zookeeper</artifactId>
      <groupId>org.apache.zookeeper</groupId>
    </exclusion>
    <exclusion>
      <artifactId>zkclient</artifactId>
      <groupId>com.101tec</groupId>
    </exclusion>
    <exclusion>
      <artifactId>slf4j-api</artifactId>
      <groupId>org.slf4j</groupId>
    </exclusion>
  </exclusions>
</dependency>

 

2. The properties file kafka.properties:
#zookeeper.connect=192.168.1.8:2181,192.168.1.13:2181,192.168.1.16:2181
#zookeeper.connect=zkserver1.vko.cn:2181,zkserver2.vko.cn:2181,zkserver3.vko.cn:2181
zookeeper.connect=192.168.1.179:2181
metadata.broker.list=192.168.1.179:9092
#metadata.broker.list=kafka.server1.vko.cn:9092,kafka.server2.vko.cn:9092
 
#zookeeper.connect.timeout=15000
#zookeeper.session.timeout.ms=15000
#zookeeper.sync.time.ms=20000
#auto.commit.interval.ms=20000
#auto.offset.reset=smallest
#serializer.class=kafka.serializer.StringEncoder
#producer.type=async
#queue.buffering.max.ms=6000
 
group.id=llx
kafka.sellstat.topics=llx

 

Load this properties file in the Spring configuration:
<!-- Loaded for Spring's own placeholder resolution -->
<bean id="propertyConfigurer" class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
  <property name="locations">
    <list>
      <value>classpath:kafka.properties</value>
    </list>
  </property>
</bean>
<!-- Used for injecting properties into code -->
<bean id="configProperties" class="org.springframework.beans.factory.config.PropertiesFactoryBean">
  <property name="locations">
    <list>
      <value>classpath:kafka.properties</value>
    </list>
  </property>
</bean>
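
The second bean exposes the file as a Properties object named configProperties, which can then be read from code through Spring's SpEL. A small illustrative sketch (the class and field names here are made up):

import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

@Component
public class KafkaSettings {
    // Pulls keys out of the "configProperties" bean defined above.
    @Value("#{configProperties['zookeeper.connect']}")
    private String zookeeperConnect;

    @Value("#{configProperties['group.id']}")
    private String groupId;
}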

 

 
3. Define the receiver:
<!-- define the receiver -->
<bean id="testReceiver" class="cn.vko.index.Receiver">
  <constructor-arg index="0" value="${zookeeper.connect}" />
  <constructor-arg index="1" value="${group.id}" />
  <constructor-arg index="2" value="${kafka.sellstat.topics}"/>
  <constructor-arg index="3" ref="testConsumer" />
</bean>

 

4. Define a message handler in Spring (it must implement the vkoConsumer interface):
<!-- define the message handler -->
<bean id="testConsumer" class="cn.vko.index.TestConsumer"></bean>

 

5. In the message-producer project, define the producer bean:

<bean id="topProducer" class="top.lilixin.TopProducer">
  <constructor-arg index="0" value="kafka.server1:9092,kafka.server2:9092" />
</bean>
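
TopProducer is likewise project code rather than part of Kafka. A hypothetical sketch of a thin wrapper that takes the broker list as its constructor argument, built on the 0.8 producer API (the real implementation is in the project linked below):

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

// Hypothetical sketch of top.lilixin.TopProducer.
public class TopProducer {
    private final Producer<String, String> producer;

    public TopProducer(String brokerList) {
        Properties props = new Properties();
        props.put("metadata.broker.list", brokerList);
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        this.producer = new Producer<String, String>(new ProducerConfig(props));
    }

    // Sends one string message to the given topic.
    public void send(String topic, String message) {
        producer.send(new KeyedMessage<String, String>(topic, message));
    }
}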

 

 
6. For the code implementation, see the learn-kafka project at https://github.com/lilixin