下載地址:https://www.apache.org/dyn/closer.cgi?path=/kafka/0.8.1.1/kafka_2.10-0.8.1.1.tgzjava
$ tar -xzf kafka_2.10-0.8.1.1.tgz $ cd kafka_2.10-0.8.1.1.tgz
配置zookeeper(假設您已經安裝了zookeeper,若是沒有安裝,請再網上搜索安裝方法)git
進入kafka安裝工程根目錄編輯 github
vim config/server.properties
修改屬性 zookeeper.connect=ip:2181,ip2: 2181spring
kafka最爲重要三個配置依次爲:broker.id、log.dir、zookeeper.connectapache
kafka server端config/server.properties參數說明和解釋以下:vim
(參考配置說明地址:http://blog.csdn.net/lizhitao/article/details/25667831)api
#實際使用案例 這裏211上面的kafka 配置文件
broker.id=1 port=9092 host.name=192.168.1.211 num.network.threads=2 num.io.threads=8 socket.send.buffer.bytes=1048576 socket.receive.buffer.bytes=1048576 socket.request.max.bytes=104857600 log.dirs=/tmp/kafka-logs num.partitions=2 log.retention.hours=168 log.segment.bytes=536870912 log.retention.check.interval.ms=60000 log.cleaner.enable=false zookeeper.connect=192.168.1.213:2181,192.168.1.216:2181,192.168.1.217:2181 zookeeper.connection.timeout.ms=1000000 #kafka實際使用案例 210服務器kafka配置 broker.id=2 port=9092 host.name=192.168.1.210 num.network.threads=2 num.io.threads=8 socket.send.buffer.bytes=1048576 socket.receive.buffer.bytes=1048576 socket.request.max.bytes=104857600 log.dirs=/tmp/kafka-logs num.partitions=2 log.retention.hours=168 log.segment.bytes=536870912 log.retention.check.interval.ms=60000 log.cleaner.enable=false zookeeper.connect=192.168.1.213:2181,192.168.1.216:2181,192.168.1.217:2181 zookeeper.connection.timeout.ms=1000000
cd kafka-0.8.1服務器
$ bin/kafka-server-start.sh -daemon config/server.properties &
(實驗時,須要啓動至少兩個broker bin/kafka-server-start.sh -daemon config/server-1.properties &) session
$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
步驟6:驗證topic是否建立成功less
$ bin/kafka-topics.sh --list --zookeeper localhost:2181
localhost爲zookeeper地址
topic描述:
bin/kafka-topics.sh --describe --zookeeper 192.168.1.8:2181 --topic test
//啓動報錯Unrecognized VM option '+UseCompressedOops' 查看 bin/kafka-run-class.sh 找到 if [ -z "$KAFKA_JVM_PERFORMANCE_OPTS" ]; then KAFKA_JVM_PERFORMANCE_OPTS="-server -XX:+UseCompressedOops -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:+CMSClassUnloadingEnabled -XX:+CMSScavengeBeforeRemark -XX:+DisableExplicitGC -Djava.awt.headless=true" fi 去掉-XX:+UseCompressedOops
啓動報錯 Could not reserve enough space for object heap 緣由及解決辦法: 查看kafka-server-start.sh配置文件,發現有heap設置信息:KAFKA_HEAP_OPTS="-Xmx1G -Xms1G" 更改這裏的內存爲256(由於測試機內存總共才1G ,因此報錯)
發送一些消息驗證,在console模式下,啓動producer
$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
(此處localhost改成本機ip,不然報錯,I don’t know why)
消息案例:
{"price":"100000","userId":14615501351480021,"payType":3,"code":"AFD3B8","payTime":{"time":1457330791333,"minutes":6,"seconds":31,"hours":14,"month":2,"year":116,"timezoneOffset":-480,"day":1,"date":7},"orderId":12222096,"goodsName":"會員"}
步驟8:啓動一個consumer
$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
bin/kafka-run-class.sh kafka.admin.DeleteTopicCommand --topic test --zookeeper 192.168.197.170:2181 ,192.168.197.171:2181
和單機環境同樣,只是須要修改下broker 的配置文件而已。
一、將單機版的kafka 目錄複製到其餘幾臺電腦上。
二、修改每臺電腦上的kafka 目錄下的server.properties 文件。
broker.id=1//這個參數在kafka 的broker 集羣中必須惟一,且爲正整數。
三、啓動每臺電腦上的kafka 便可。
首先爲每一個節點編寫配置文件:
> cp config/server.properties config/server-1.properties > cp config/server.properties config/server-2.properties
在拷貝出的新文件中添加如下參數:
config/server-1.properties:
broker.id=1 port=9093 log.dir=/tmp/kafka-logs-1
config/server-2.properties:
broker.id=2 port=9094 log.dir=/tmp/kafka-logs-2
如今啓動另外兩個節點:
> bin/kafka-server-start.sh config/server-1.properties & ... > bin/kafka-server-start.sh config/server-2.properties & ...
建立一個擁有3個副本的topic:
> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic
運行「"describe topics」命令知道每一個節點的信息
> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs: Topic: my-replicated-topic Partition: 0 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0 leader:負責處理消息的讀和寫,leader是從全部節點中隨機選擇的. replicas:列出了全部的副本節點,無論節點是否在服務中. isr:是正在服務中的節點.
<!-- kafka配置 --> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.9.2</artifactId> <version>${kafka.version}</version> <exclusions> <!-- 實際應用中單獨引入下面的jar包,不使用kafka帶的 --> <exclusion> <artifactId>zookeeper</artifactId> <groupId>org.apache.zookeeper</groupId> </exclusion> <exclusion> <artifactId>zkclient</artifactId> <groupId>com.101tec</groupId> </exclusion> <exclusion> <artifactId>slf4j-api</artifactId> <groupId>org.slf4j</groupId> </exclusion> </exclusions> </dependency>
#zookeeper.connect=192.168.1.8:2181,192.168.1.13:2181,192.168.1.16:2181
#zookeeper.connect=zkserver1.vko.cn:2181,zkserver2.vko.cn:2181,zkserver3.vko.cn:2181
zookeeper.connect=192.168.1.179:2181
metadata.broker.list=192.168.1.179:9092
#metadata.broker.list=kafka.server1.vko.cn:9092,kafka.server2.vko.cn:9092
#zookeeper.connect.timeout=15000
#zookeeper.session.timeout.ms=15000
#zookeeper.sync.time.ms=20000
#auto.commit.interval.ms=20000
#auto.offset.reset=smallest
#serializer.class=kafka.serializer.StringEncoder
#producer.type=async
#queue.buffering.max.ms=6000
group.id=llx
kafka.sellstat.topics=llx
<!-- 這個是加載給spring 用的.--> <bean id="propertyConfigurer" class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"> <property name="locations"> <list> <value>classpath:kafka.properties</value> </list> </property> </bean> <!-- 這個是用來在代碼中注入用的.--> <bean id="configProperties" class="org.springframework.beans.factory.config.PropertiesFactoryBean"> <property name="locations"> <list> <value>classpath:kafka.properties</value> </list> </property> </bean>
<!-- 定義收信人 receiver --> <bean id="testReceiver" class="cn.vko.index.Receiver"> <constructor-arg index="0" value="${zookeeper.connect}" /> <constructor-arg index="1" value="${group.id}" /> <constructor-arg index="2" value="${kafka.sellstat.topics}"/> <constructor-arg index="3" ref="testConsumer" /> </bean>
<!-- 定義消息處理器 --> <bean id="testConsumer" class="cn.vko.index.TestConsumer" ></bean>
5.消息生產者項目引入producer
<bean id="topProducer" class="top.lilixin.TopProducer"> <constructor-arg index="0" value="kafka.server1:9092,kafka.server2:9092" /> </bean>