4 kafka集羣部署及kafka生產者java客戶端編程 + kafka消費者java客戶端編程

 

本博文的主要內容有php

   kafka的單機模式部署html

   kafka的分佈式模式部署前端

   生產者java客戶端編程java

   消費者java客戶端編程node

 

 

 

 

運行kafka ,須要依賴 zookeeper,你可使用已有的 zookeeper 集羣或者利用 kafka自帶的zookeeper。數據庫

    單機模式,用的是kafka自帶的zookeeper,express

    分佈式模式,用的是外部安裝的zookeeper,即公共的zookeeper。apache

 

 

 

 

 說在前面的話編程

 

 

 

 

我這裏是使用的是,kafka自帶的zookeeper。
以及關於kafka的日誌文件啊,都放在默認裏即/tmp下,我沒修改。保存默認的


一、 [hadoop@sparksinglenode kafka_2.10-0.8.1.1]$ jps
2625 Jps
二、 [hadoop@sparksinglenode kafka_2.10-0.8.1.1]$ bin/zookeeper-server-start.sh config/zookeeper.properties &
此刻,這時,會一直停在這,由於是前端運行。
另開一窗口,
三、 [hadoop@sparksinglenode kafka_2.10-0.8.1.1]$ bin/kafka-server-start.sh config/server.properties &
也是前端運行。bootstrap

 

 



推薦作法!!!
可是,我這裏,本身在kafka安裝目錄下,爲了本身的方便,寫了個startkafka.sh和startzookeeper.sh
nohup bin/kafka-server-start.sh config/server.properties > kafka.log 2>&1 &
nohup bin/zookeeper-server-start.sh config/zookeeper.properties > zookeeper.log 2>&1 &
注意還要,root用戶來,附上執行權限。chmod +x ./startkafka.sh chmod +x ./startzookeeper.sh
這樣,就會在kafka安裝目錄下,對應生出kafka.log和zookeeper.log。

一、[spark@sparksinglenode kafka_2.10-0.8.1.1]$ jps
5098 Jps
二、[spark@sparksinglenode kafka_2.10-0.8.1.1]$ bash startzookeeper.sh
[spark@sparksinglenode kafka_2.10-0.8.1.1]$ jps
5125 Jps
5109 QuorumPeerMain
三、[spark@sparksinglenode kafka_2.10-0.8.1.1]$ bash startkafka.sh
[spark@sparksinglenode kafka_2.10-0.8.1.1]$ jps
5155 Jps
5140 Kafka
5109 QuorumPeerMain
[spark@sparksinglenode kafka_2.10-0.8.1.1]$



 

   我了個去,啓動是多麼方便!

 

 

 

 

     運行 kafka ,須要依賴 zookeeper,你可使用已有的 zookeeper 集羣或者利用 kafka自帶的zookeeper。

這裏,我安裝的是單機模式,來玩玩而已,入門。固然,在實際生產中,是分佈式的。即用到是,外部安裝的zookeeper集羣(3節點)。

 

 

 

 

 

一、kafka的單機模式部署

一、  kafka_2.10-0.8.1.1.tgz的下載

官網:http://kafka.apache.org/

官方文檔:http://kafka.apache.org/documentation.html#quickstart

 http://archive.apache.org/dist/kafka/

http://archive.apache.org/dist/kafka/0.8.1.1/

 

 

 

 

二、  kafka_2.10-0.8.1.1.tgz的上傳

sftp> cd /home/hadoop/app/

sftp> put c:/kafka_2.10-0.8.1.1.tgz

Uploading kafka_2.10-0.8.1.1.tgz to /home/hadoop/app/kafka_2.10-0.8.1.1.tgz

  100% 13467KB  13467KB/s 00:00:01    

c:/kafka_2.10-0.8.1.1.tgz: 13790731 bytes transferred in 1 seconds (13467 KB/s)

sftp>

 

[hadoop@weekend110 app]$ ls

hadoop-2.4.1  hbase-0.96.2-hadoop2  hive-0.12.0  jdk1.7.0_65

[hadoop@weekend110 app]$ ls

hadoop-2.4.1  hbase-0.96.2-hadoop2  hive-0.12.0  jdk1.7.0_65  kafka_2.10-0.8.1.1.tgz

[hadoop@weekend110 app]$ ll

total 13484

drwxr-xr-x. 11 hadoop hadoop     4096 Jul 18 20:11 hadoop-2.4.1

drwxrwxr-x.  8 hadoop hadoop     4096 Oct 12 12:19 hbase-0.96.2-hadoop2

drwxrwxr-x. 10 hadoop hadoop     4096 Oct 10 21:30 hive-0.12.0

drwxr-xr-x.  8 hadoop hadoop     4096 Jun 17  2014 jdk1.7.0_65

-rw-r--r--.  1 root   root   13790731 May 12 03:44 kafka_2.10-0.8.1.1.tgz

[hadoop@weekend110 app]$

 

 

三、  kafka_2.10-0.8.1.1.tgz的解壓和刪除壓縮包

[hadoop@weekend110 app]$ ls

hadoop-2.4.1  hbase-0.96.2-hadoop2  hive-0.12.0  jdk1.7.0_65  kafka_2.10-0.8.1.1.tgz

[hadoop@weekend110 app]$ tar -zxvf kafka_2.10-0.8.1.1.tgz

 

 

四、  kafka_2.10-0.8.1.1.tgz的配置

[hadoop@weekend110 app]$ ll

total 13488

drwxr-xr-x. 11 hadoop hadoop     4096 Jul 18 20:11 hadoop-2.4.1

drwxrwxr-x.  8 hadoop hadoop     4096 Oct 12 12:19 hbase-0.96.2-hadoop2

drwxrwxr-x. 10 hadoop hadoop     4096 Oct 10 21:30 hive-0.12.0

drwxr-xr-x.  8 hadoop hadoop     4096 Jun 17  2014 jdk1.7.0_65

drwxr-xr-x.  5 hadoop hadoop     4096 Apr 23  2014 kafka_2.10-0.8.1.1

-rw-r--r--.  1 root   root   13790731 May 12 03:44 kafka_2.10-0.8.1.1.tgz

[hadoop@weekend110 app]$ rm kafka_2.10-0.8.1.1.tgz

rm: remove write-protected regular file `kafka_2.10-0.8.1.1.tgz'? y

[hadoop@weekend110 app]$ ll

total 20

drwxr-xr-x. 11 hadoop hadoop 4096 Jul 18 20:11 hadoop-2.4.1

drwxrwxr-x.  8 hadoop hadoop 4096 Oct 12 12:19 hbase-0.96.2-hadoop2

drwxrwxr-x. 10 hadoop hadoop 4096 Oct 10 21:30 hive-0.12.0

drwxr-xr-x.  8 hadoop hadoop 4096 Jun 17  2014 jdk1.7.0_65

drwxr-xr-x.  5 hadoop hadoop 4096 Apr 23  2014 kafka_2.10-0.8.1.1

[hadoop@weekend110 app]$

 

   注意,將Kafka安裝目錄下的bin下的腳本,都附上執行權限。

  

 

  爲了本身往後操做方便,這裏,本身寫個腳本。

 nohup bin/kafka-server-start.sh   config/server.properties > kafka.log 2>&1 &

 

 

 

 

 

 

 

 

 

[hadoop@weekend110 config]$ pwd

/home/hadoop/app/kafka_2.10-0.8.1.1/config

[hadoop@weekend110 config]$ ls

consumer.properties  producer.properties  test-log4j.properties   zookeeper.properties

log4j.properties     server.properties    tools-log4j.properties

[hadoop@weekend110 config]$ vim zookeeper.properties

 

 

 

 

 

如果,單機模式的安裝kafka,用kafka自帶的zookeeper,則不需修改。看看就好,

# Licensed to the Apache Software Foundation (ASF) under one or more

# contributor license agreements.  See the NOTICE file distributed with

# this work for additional information regarding copyright ownership.

# The ASF licenses this file to You under the Apache License, Version 2.0

# (the "License"); you may not use this file except in compliance with

# the License.  You may obtain a copy of the License at

#

#    http://www.apache.org/licenses/LICENSE-2.0

#

# Unless required by applicable law or agreed to in writing, software

# distributed under the License is distributed on an "AS IS" BASIS,

# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

# See the License for the specific language governing permissions and

# limitations under the License.

# the directory where the snapshot is stored.

dataDir=/tmp/zookeeper

# the port at which the clients will connect

clientPort=2181

# disable the per-ip limit on the number of connections since this is a non-production config

maxClientCnxns=0

 

 zookeeper的配置文件zookeeper.properties裏面的關鍵屬性:

    # the directory where the snapshot is stored.
      dataDir=/tmp/zookeeper   //這裏,固然,能夠本身新建其餘目錄,不然,每次開機就會清除這個臨時目錄。
    # the port at which the clients will connect
      clientPort=2181

     默認狀況下,zookeeper的snapshot 文件會存儲在/tmp/zookeeper下,zookeeper服務器會監聽 2181端口。

 

如果,本身新建其餘目錄,則能夠是。

dataDir=/home/hadoop/data/zookeeper

dataLogDir=/home/hadoop/data/zkdatalog

 

 

 

 

 

[hadoop@weekend110 config]$ vim server.properties

# Licensed to the Apache Software Foundation (ASF) under one or more

# contributor license agreements.  See the NOTICE file distributed with

# this work for additional information regarding copyright ownership.

# The ASF licenses this file to You under the Apache License, Version 2.0

# (the "License"); you may not use this file except in compliance with

# the License.  You may obtain a copy of the License at

#

#    http://www.apache.org/licenses/LICENSE-2.0

#

# Unless required by applicable law or agreed to in writing, software

# distributed under the License is distributed on an "AS IS" BASIS,

# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

# See the License for the specific language governing permissions and

# limitations under the License.

# see kafka.server.KafkaConfig for additional details and defaults

 

############################# Server Basics #############################

 

# The id of the broker. This must be set to a unique integer for each broker.

broker.id=0

 

############################# Socket Server Settings #############################

 

# The port the socket server listens on

port=9092

 

# Hostname the broker will bind to. If not set, the server will bind to all interfaces

# host.name=localhost

 

# Hostname the broker will advertise to producers and consumers. If not set, it uses the

# value for "host.name" if configured.  Otherwise, it will use the value returned from

# java.net.InetAddress.getCanonicalHostName().

#advertised.host.name=<hostname routable by clients>

 

# The port to publish to ZooKeeper for clients to use. If this is not set,

# it will publish the same port that the broker binds to.

#advertised.port=<port accessible by clients>

 

# The number of threads handling network requests

num.network.threads=2

 

# The number of threads doing disk I/O

num.io.threads=8

 

# The send buffer (SO_SNDBUF) used by the socket server

socket.send.buffer.bytes=1048576

 

# The receive buffer (SO_RCVBUF) used by the socket server

socket.receive.buffer.bytes=1048576

 

# The maximum size of a request that the socket server will accept (protection against OOM)

socket.request.max.bytes=104857600

 

############################# Log Basics #############################

 

# A comma seperated list of directories under which to store log files

log.dirs=/tmp/kafka-logs    // 或者能夠本身新建目錄,

/home/hadoop/data/kafka-logs

 

 

  也能夠如這樣,注意啦。就是在這裏。即kafka安裝目錄下的根目錄。

 

 

 

# The default number of log partitions per topic. More partitions allow greater

# parallelism for consumption, but this will also result in more files across

# the brokers.

num.partitions=2

 

############################# Log Flush Policy #############################

 

# Messages are immediately written to the filesystem but by default we only fsync() to sync

# the OS cache lazily. The following configurations control the flush of data to disk.

# There are a few important trade-offs here:

#    1. Durability: Unflushed data may be lost if you are not using replication.

#    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.

#    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to exceessive seeks.

# The settings below allow one to configure the flush policy to flush data after a period of time or

# every N messages (or both). This can be done globally and overridden on a per-topic basis.

 

# The number of messages to accept before forcing a flush of data to disk

#log.flush.interval.messages=10000

 

# The maximum amount of time a message can sit in a log before we force a flush

#log.flush.interval.ms=1000

 

############################# Log Retention Policy #############################

 

# The following configurations control the disposal of log segments. The policy can

# be set to delete segments after a period of time, or after a given size has accumulated.

# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens

# from the end of the log.

 

# The minimum age of a log file to be eligible for deletion

log.retention.hours=168

 

# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining

# segments don't drop below log.retention.bytes.

#log.retention.bytes=1073741824

 

# The maximum size of a log segment file. When this size is reached a new log segment will be created.

log.segment.bytes=536870912

 

# The interval at which log segments are checked to see if they can be deleted according

# to the retention policies

log.retention.check.interval.ms=60000

 

# By default the log cleaner is disabled and the log retention policy will default to just delete segments after their retention expires.

# If log.cleaner.enable=true is set the cleaner will be enabled and individual logs can then be marked for log compaction.

log.cleaner.enable=false

 

############################# Zookeeper #############################

 

# Zookeeper connection string (see zookeeper docs for details).

# This is a comma separated host:port pairs, each corresponding to a zk

# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".

# You can also append an optional chroot string to the urls to specify the

# root directory for all kafka znodes.

zookeeper.connect=localhost:2181

 

# Timeout in ms for connecting to zookeeper

zookeeper.connection.timeout.ms=1000000

 

 

 

五、kafka_2.10-0.8.1.1.tgz的配置環境變量

 

[hadoop@weekend110 kafka_2.10-0.8.1.1]$ su root

Password:

[root@weekend110 kafka_2.10-0.8.1.1]# pwd

/home/hadoop/app/kafka_2.10-0.8.1.1

[root@weekend110 kafka_2.10-0.8.1.1]# ls

bin  config  libs  LICENSE  NOTICE

[root@weekend110 kafka_2.10-0.8.1.1]# vim /etc/profile

 

export JAVA_HOME=/home/hadoop/app/jdk1.7.0_65

export HADOOP_HOME=/home/hadoop/app/hadoop-2.4.1

export ZOOKEEPER_HOME=/home/hadoop/app/zookeeper-3.4.6

export HIVE_HOME=/home/hadoop/app/hive-0.12.0

export HBASE_HOME=/home/hadoop/app/hbase-0.96.2-hadoop2

export KAFKA_HOME=/home/hadoop/app/kafka_2.10-0.8.1.1

export PATH=$PATH:$JAVA_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$ZOOKEEPER_HOME/bin:$HIVE_HOME/bin:$HBASE_HOME/bin:$KAFKA_HOME/bin

 

 

 

[root@weekend110 kafka_2.10-0.8.1.1]# clear

[root@weekend110 kafka_2.10-0.8.1.1]# source /etc/profile

[root@weekend110 kafka_2.10-0.8.1.1]# kafka -version

bash: kafka: command not found

由於,kafka裏,沒有這樣的查看版本命令了,這是要注意的地方。

 

 

先將zookeeper啓動,再啓動kafka。

我這裏,採用的是kafk自帶的zookeeper。

 

  之後就是能夠這麼幹啦(  適應於  在單機模式下安裝的kafka,並且仍是用的kafka自帶的zookeeper

 

 

 

啓動和中止

運行 kafka ,須要依賴 zookeeper,你可使用已有的 zookeeper 集羣或者利用 kafka 提供的腳本啓動一個 zookeeper 實例:

$ bin/zookeeper-server-start.sh config/zookeeper.properties &      (其實,本博文最頂部的,寫在前面的話,那樣更好!強烈建議)

默認的,zookeeper 會監聽在 *:2181/tcp。

中止剛纔啓動的 zookeeper 實例:

$ bin/zookeeper-server-stop.sh 

啓動Kafka server:

$ bin/kafka-server-start.sh config/server.properties &

 

出現以下的錯誤,

參考 http://www.bkjia.com/yjs/947570.html

解決方法:

找到bin/kafka-run-class.sh 文件,使用vim打開,個人這個版本是在115行

113 # JVM performance options

114 if [ -z "$KAFKA_JVM_PERFORMANCE_OPTS" ]; then

115   KAFKA_JVM_PERFORMANCE_OPTS="-server -XX:+UseCompressedOops -XX:+UseParNewGC     -XX:+UseConcMarkSweepGC -XX:+CMSClassUnloadingEnabled -XX:+CMSScavengeBeforeRe    mark -XX:+DisableExplicitGC -Djava.awt.headless=true"

116 fi

去掉-XX:+UseCompressedOops這個設置

 

111 # JVM performance options

112 if [ -z "$KAFKA_JVM_PERFORMANCE_OPTS" ]; then

113   KAFKA_JVM_PERFORMANCE_OPTS="-server -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:+CMSClassUnloadingEnabled -XX:+CMSScavengeBefo    reRemark -XX:+DisableExplicitGC -Djava.awt.headless=true"

114 fi

 

 

而後,如今,再啓動kafka自帶的 zookeeper,再啓動kafka。

[hadoop@weekend110 kafka_2.10-0.8.1.1]$ jps

2625 Jps

[hadoop@weekend110 kafka_2.10-0.8.1.1]$ bin/zookeeper-server-start.sh config/zookeeper.properties & 

[1] 2634

[hadoop@weekend110 kafka_2.10-0.8.1.1]$ [2016-10-13 22:10:21,122] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)

[2016-10-13 22:10:21,126] WARN Either no config or no quorum defined in config, running  in standalone mode (org.apache.zookeeper.server.quorum.QuorumPeerMain)

[2016-10-13 22:10:21,287] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)

[2016-10-13 22:10:21,292] INFO Starting server (org.apache.zookeeper.server.ZooKeeperServerMain)

[2016-10-13 22:10:21,366] INFO Server environment:zookeeper.version=3.3.3-1203054, built on 11/17/2011 05:47 GMT (org.apache.zookeeper.server.ZooKeeperServer)

[2016-10-13 22:10:21,367] INFO Server environment:host.name=weekend110 (org.apache.zookeeper.server.ZooKeeperServer)

[2016-10-13 22:10:21,370] INFO Server environment:java.version=1.7.0_65 (org.apache.zookeeper.server.ZooKeeperServer)

[2016-10-13 22:10:21,370] INFO Server environment:java.vendor=Oracle Corporation (org.apache.zookeeper.server.ZooKeeperServer)

[2016-10-13 22:10:21,370] INFO Server environment:java.home=/home/hadoop/app/jdk1.7.0_65/jre (org.apache.zookeeper.server.ZooKeeperServer)

[2016-10-13 22:10:21,371] INFO Server environment:java.class.path=:/home/hadoop/app/kafka_2.10-0.8.1.1/bin/../core/build/dependant-libs-2.8.0/*.jar:/home/hadoop/app/kafka_2.10-0.8.1.1/bin/../perf/build/libs//kafka-perf_2.8.0*.jar:/home/hadoop/app/kafka_2.10-0.8.1.1/bin/../clients/build/libs//kafka-clients*.jar:/home/hadoop/app/kafka_2.10-0.8.1.1/bin/../examples/build/libs//kafka-examples*.jar:/home/hadoop/app/kafka_2.10-0.8.1.1/bin/../contrib/hadoop-consumer/build/libs//kafka-hadoop-consumer*.jar:/home/hadoop/app/kafka_2.10-0.8.1.1/bin/../contrib/hadoop-producer/build/libs//kafka-hadoop-producer*.jar:/home/hadoop/app/kafka_2.10-0.8.1.1/bin/../libs/jopt-simple-3.2.jar:/home/hadoop/app/kafka_2.10-0.8.1.1/bin/../libs/kafka_2.10-0.8.1.1.jar:/home/hadoop/app/kafka_2.10-0.8.1.1/bin/../libs/kafka_2.10-0.8.1.1-javadoc.jar:/home/hadoop/app/kafka_2.10-0.8.1.1/bin/../libs/kafka_2.10-0.8.1.1-scaladoc.jar:/home/hadoop/app/kafka_2.10-0.8.1.1/bin/../libs/kafka_2.10-0.8.1.1-sources.jar:/home/hadoop/app/kafka_2.10-0.8.1.1/bin/../libs/log4j-1.2.15.jar:/home/hadoop/app/kafka_2.10-0.8.1.1/bin/../libs/metrics-core-2.2.0.jar:/home/hadoop/app/kafka_2.10-0.8.1.1/bin/../libs/scala-library-2.10.1.jar:/home/hadoop/app/kafka_2.10-0.8.1.1/bin/../libs/slf4j-api-1.7.2.jar:/home/hadoop/app/kafka_2.10-0.8.1.1/bin/../libs/snappy-java-1.0.5.jar:/home/hadoop/app/kafka_2.10-0.8.1.1/bin/../libs/zkclient-0.3.jar:/home/hadoop/app/kafka_2.10-0.8.1.1/bin/../libs/zookeeper-3.3.4.jar:/home/hadoop/app/kafka_2.10-0.8.1.1/bin/../core/build/libs/kafka_2.8.0*.jar (org.apache.zookeeper.server.ZooKeeperServer)

[2016-10-13 22:10:21,377] INFO Server environment:java.library.path=/usr/java/packages/lib/i386:/lib:/usr/lib (org.apache.zookeeper.server.ZooKeeperServer)

[2016-10-13 22:10:21,377] INFO Server environment:java.io.tmpdir=/tmp (org.apache.zookeeper.server.ZooKeeperServer)

[2016-10-13 22:10:21,378] INFO Server environment:java.compiler=<NA> (org.apache.zookeeper.server.ZooKeeperServer)

[2016-10-13 22:10:21,378] INFO Server environment:os.name=Linux (org.apache.zookeeper.server.ZooKeeperServer)

[2016-10-13 22:10:21,378] INFO Server environment:os.arch=i386 (org.apache.zookeeper.server.ZooKeeperServer)

[2016-10-13 22:10:21,379] INFO Server environment:os.version=2.6.32-431.el6.x86_64 (org.apache.zookeeper.server.ZooKeeperServer)

[2016-10-13 22:10:21,379] INFO Server environment:user.name=hadoop (org.apache.zookeeper.server.ZooKeeperServer)

[2016-10-13 22:10:21,380] INFO Server environment:user.home=/home/hadoop (org.apache.zookeeper.server.ZooKeeperServer)

[2016-10-13 22:10:21,380] INFO Server environment:user.dir=/home/hadoop/app/kafka_2.10-0.8.1.1 (org.apache.zookeeper.server.ZooKeeperServer)

[2016-10-13 22:10:21,488] INFO tickTime set to 3000 (org.apache.zookeeper.server.ZooKeeperServer)

[2016-10-13 22:10:21,488] INFO minSessionTimeout set to -1 (org.apache.zookeeper.server.ZooKeeperServer)

[2016-10-13 22:10:21,488] INFO maxSessionTimeout set to -1 (org.apache.zookeeper.server.ZooKeeperServer)

[2016-10-13 22:10:21,556] INFO binding to port 0.0.0.0/0.0.0.0:2181 (org.apache.zookeeper.server.NIOServerCnxn)

[2016-10-13 22:10:21,694] INFO Snapshotting: 0 (org.apache.zookeeper.server.persistence.FileTxnSnapLog)

[2016-10-13 22:11:51,976] INFO Accepted socket connection from /0:0:0:0:0:0:0:1:47619 (org.apache.zookeeper.server.NIOServerCnxn)

[2016-10-13 22:11:52,022] INFO Client attempting to establish new session at /0:0:0:0:0:0:0:1:47619 (org.apache.zookeeper.server.NIOServerCnxn)

[2016-10-13 22:11:52,030] INFO Creating new log file: log.1 (org.apache.zookeeper.server.persistence.FileTxnLog)

[2016-10-13 22:11:52,098] INFO Established session 0x157be612ba00000 with negotiated timeout 6000 for client /0:0:0:0:0:0:0:1:47619 (org.apache.zookeeper.server.NIOServerCnxn)

[2016-10-13 22:11:52,211] INFO Got user-level KeeperException when processing sessionid:0x157be612ba00000 type:create cxid:0x4 zxid:0xfffffffffffffffe txntype:unknown reqpath:n/a Error Path:/brokers Error:KeeperErrorCode = NoNode for /brokers (org.apache.zookeeper.server.PrepRequestProcessor)

[2016-10-13 22:11:52,262] INFO Got user-level KeeperException when processing sessionid:0x157be612ba00000 type:create cxid:0xa zxid:0xfffffffffffffffe txntype:unknown reqpath:n/a Error Path:/config Error:KeeperErrorCode = NoNode for /config (org.apache.zookeeper.server.PrepRequestProcessor)

[2016-10-13 22:11:52,279] INFO Got user-level KeeperException when processing sessionid:0x157be612ba00000 type:create cxid:0x10 zxid:0xfffffffffffffffe txntype:unknown reqpath:n/a Error Path:/admin Error:KeeperErrorCode = NoNode for /admin (org.apache.zookeeper.server.PrepRequestProcessor)

[2016-10-13 22:11:53,235] INFO Got user-level KeeperException when processing sessionid:0x157be612ba00000 type:setData cxid:0x19 zxid:0xfffffffffffffffe txntype:unknown reqpath:n/a Error Path:/controller_epoch Error:KeeperErrorCode = NoNode for /controller_epoch (org.apache.zookeeper.server.PrepRequestProcessor)

[2016-10-13 22:11:53,594] INFO Got user-level KeeperException when processing sessionid:0x157be612ba00000 type:delete cxid:0x27 zxid:0xfffffffffffffffe txntype:unknown reqpath:n/a Error Path:/admin/preferred_replica_election Error:KeeperErrorCode = NoNode for /admin/preferred_replica_election (org.apache.zookeeper.server.PrepRequestProcessor)

 

 

[hadoop@weekend110 kafka_2.10-0.8.1.1]$ pwd

/home/hadoop/app/kafka_2.10-0.8.1.1

[hadoop@weekend110 kafka_2.10-0.8.1.1]$ bin/kafka-server-start.sh config/server.properties &

[1] 2692

[hadoop@weekend110 kafka_2.10-0.8.1.1]$ [2016-10-13 22:11:51,302] INFO Verifying properties (kafka.utils.VerifiableProperties)

[2016-10-13 22:11:51,586] INFO Property broker.id is overridden to 0 (kafka.utils.VerifiableProperties)

[2016-10-13 22:11:51,593] INFO Property log.cleaner.enable is overridden to false (kafka.utils.VerifiableProperties)

[2016-10-13 22:11:51,597] INFO Property log.dirs is overridden to /home/hadoop/data/kafka-logs (kafka.utils.VerifiableProperties)

[2016-10-13 22:11:51,597] INFO Property log.retention.check.interval.ms is overridden to 60000 (kafka.utils.VerifiableProperties)

[2016-10-13 22:11:51,605] INFO Property log.retention.hours is overridden to 168 (kafka.utils.VerifiableProperties)

[2016-10-13 22:11:51,608] INFO Property log.segment.bytes is overridden to 536870912 (kafka.utils.VerifiableProperties)

[2016-10-13 22:11:51,610] INFO Property num.io.threads is overridden to 8 (kafka.utils.VerifiableProperties)

[2016-10-13 22:11:51,610] INFO Property num.network.threads is overridden to 2 (kafka.utils.VerifiableProperties)

[2016-10-13 22:11:51,626] INFO Property num.partitions is overridden to 2 (kafka.utils.VerifiableProperties)

[2016-10-13 22:11:51,627] INFO Property port is overridden to 9092 (kafka.utils.VerifiableProperties)

[2016-10-13 22:11:51,628] INFO Property socket.receive.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)

[2016-10-13 22:11:51,629] INFO Property socket.request.max.bytes is overridden to 104857600 (kafka.utils.VerifiableProperties)

[2016-10-13 22:11:51,635] INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)

[2016-10-13 22:11:51,636] INFO Property zookeeper.connect is overridden to localhost:2181 (kafka.utils.VerifiableProperties)

[2016-10-13 22:11:51,637] INFO Property zookeeper.connection.timeout.ms is overridden to 1000000 (kafka.utils.VerifiableProperties)

[2016-10-13 22:11:51,800] INFO [Kafka Server 0], starting (kafka.server.KafkaServer)

[2016-10-13 22:11:51,805] INFO [Kafka Server 0], Connecting to zookeeper on localhost:2181 (kafka.server.KafkaServer)

[2016-10-13 22:11:51,839] INFO Starting ZkClient event thread. (org.I0Itec.zkclient.ZkEventThread)

[2016-10-13 22:11:51,869] INFO Client environment:zookeeper.version=3.3.3-1203054, built on 11/17/2011 05:47 GMT (org.apache.zookeeper.ZooKeeper)

[2016-10-13 22:11:51,870] INFO Client environment:host.name=weekend110 (org.apache.zookeeper.ZooKeeper)

[2016-10-13 22:11:51,870] INFO Client environment:java.version=1.7.0_65 (org.apache.zookeeper.ZooKeeper)

[2016-10-13 22:11:51,870] INFO Client environment:java.vendor=Oracle Corporation (org.apache.zookeeper.ZooKeeper)

[2016-10-13 22:11:51,870] INFO Client environment:java.home=/home/hadoop/app/jdk1.7.0_65/jre (org.apache.zookeeper.ZooKeeper)

[2016-10-13 22:11:51,870] INFO Client environment:java.class.path=:/home/hadoop/app/kafka_2.10-0.8.1.1/bin/../core/build/dependant-libs-2.8.0/*.jar:/home/hadoop/app/kafka_2.10-0.8.1.1/bin/../perf/build/libs//kafka-perf_2.8.0*.jar:/home/hadoop/app/kafka_2.10-0.8.1.1/bin/../clients/build/libs//kafka-clients*.jar:/home/hadoop/app/kafka_2.10-0.8.1.1/bin/../examples/build/libs//kafka-examples*.jar:/home/hadoop/app/kafka_2.10-0.8.1.1/bin/../contrib/hadoop-consumer/build/libs//kafka-hadoop-consumer*.jar:/home/hadoop/app/kafka_2.10-0.8.1.1/bin/../contrib/hadoop-producer/build/libs//kafka-hadoop-producer*.jar:/home/hadoop/app/kafka_2.10-0.8.1.1/bin/../libs/jopt-simple-3.2.jar:/home/hadoop/app/kafka_2.10-0.8.1.1/bin/../libs/kafka_2.10-0.8.1.1.jar:/home/hadoop/app/kafka_2.10-0.8.1.1/bin/../libs/kafka_2.10-0.8.1.1-javadoc.jar:/home/hadoop/app/kafka_2.10-0.8.1.1/bin/../libs/kafka_2.10-0.8.1.1-scaladoc.jar:/home/hadoop/app/kafka_2.10-0.8.1.1/bin/../libs/kafka_2.10-0.8.1.1-sources.jar:/home/hadoop/app/kafka_2.10-0.8.1.1/bin/../libs/log4j-1.2.15.jar:/home/hadoop/app/kafka_2.10-0.8.1.1/bin/../libs/metrics-core-2.2.0.jar:/home/hadoop/app/kafka_2.10-0.8.1.1/bin/../libs/scala-library-2.10.1.jar:/home/hadoop/app/kafka_2.10-0.8.1.1/bin/../libs/slf4j-api-1.7.2.jar:/home/hadoop/app/kafka_2.10-0.8.1.1/bin/../libs/snappy-java-1.0.5.jar:/home/hadoop/app/kafka_2.10-0.8.1.1/bin/../libs/zkclient-0.3.jar:/home/hadoop/app/kafka_2.10-0.8.1.1/bin/../libs/zookeeper-3.3.4.jar:/home/hadoop/app/kafka_2.10-0.8.1.1/bin/../core/build/libs/kafka_2.8.0*.jar (org.apache.zookeeper.ZooKeeper)

[2016-10-13 22:11:51,870] INFO Client environment:java.library.path=/usr/java/packages/lib/i386:/lib:/usr/lib (org.apache.zookeeper.ZooKeeper)

[2016-10-13 22:11:51,871] INFO Client environment:java.io.tmpdir=/tmp (org.apache.zookeeper.ZooKeeper)

[2016-10-13 22:11:51,871] INFO Client environment:java.compiler=<NA> (org.apache.zookeeper.ZooKeeper)

[2016-10-13 22:11:51,871] INFO Client environment:os.name=Linux (org.apache.zookeeper.ZooKeeper)

[2016-10-13 22:11:51,872] INFO Client environment:os.arch=i386 (org.apache.zookeeper.ZooKeeper)

[2016-10-13 22:11:51,872] INFO Client environment:os.version=2.6.32-431.el6.x86_64 (org.apache.zookeeper.ZooKeeper)

[2016-10-13 22:11:51,872] INFO Client environment:user.name=hadoop (org.apache.zookeeper.ZooKeeper)

[2016-10-13 22:11:51,872] INFO Client environment:user.home=/home/hadoop (org.apache.zookeeper.ZooKeeper)

[2016-10-13 22:11:51,872] INFO Client environment:user.dir=/home/hadoop/app/kafka_2.10-0.8.1.1 (org.apache.zookeeper.ZooKeeper)

[2016-10-13 22:11:51,875] INFO Initiating client connection, connectString=localhost:2181 sessionTimeout=6000 watcher=org.I0Itec.zkclient.ZkClient@7b7258 (org.apache.zookeeper.ZooKeeper)

[2016-10-13 22:11:51,955] INFO Opening socket connection to server localhost/0:0:0:0:0:0:0:1:2181 (org.apache.zookeeper.ClientCnxn)

[2016-10-13 22:11:51,992] INFO Socket connection established to localhost/0:0:0:0:0:0:0:1:2181, initiating session (org.apache.zookeeper.ClientCnxn)

[2016-10-13 22:11:52,101] INFO Session establishment complete on server localhost/0:0:0:0:0:0:0:1:2181, sessionid = 0x157be612ba00000, negotiated timeout = 6000 (org.apache.zookeeper.ClientCnxn)

[2016-10-13 22:11:52,107] INFO zookeeper state changed (SyncConnected) (org.I0Itec.zkclient.ZkClient)

[2016-10-13 22:11:52,587] INFO Starting log cleanup with a period of 60000 ms. (kafka.log.LogManager)

[2016-10-13 22:11:52,597] INFO Starting log flusher with a default period of 9223372036854775807 ms. (kafka.log.LogManager)

SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".

SLF4J: Defaulting to no-operation (NOP) logger implementation

SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.

[2016-10-13 22:11:52,707] INFO Awaiting socket connections on 0.0.0.0:9092. (kafka.network.Acceptor)

[2016-10-13 22:11:52,709] INFO [Socket Server on Broker 0], Started (kafka.network.SocketServer)

[2016-10-13 22:11:53,049] INFO Will not load MX4J, mx4j-tools.jar is not in the classpath (kafka.utils.Mx4jLoader$)

[2016-10-13 22:11:53,223] INFO 0 successfully elected as leader (kafka.server.ZookeeperLeaderElector)

[2016-10-13 22:11:53,771] INFO Registered broker 0 at path /brokers/ids/0 with address weekend110:9092. (kafka.utils.ZkUtils$)

[2016-10-13 22:11:53,825] INFO [Kafka Server 0], started (kafka.server.KafkaServer)

[2016-10-13 22:11:53,999] INFO New leader is 0 (kafka.server.ZookeeperLeaderElector$LeaderChangeListener)

 

 

 

[hadoop@weekend110 ~]$ jps

2769 Jps

2692 Kafka

2634 QuorumPeerMain

[hadoop@weekend110 ~]$

 

 

利用 kafka 提供的腳本啓動一個 zookeeper 實例:

$ bin/zookeeper-server-start.sh config/zookeeper.properties & 

中止剛纔啓動的 zookeeper 實例:

$ bin/zookeeper-server-stop.sh 

啓動Kafka server:

$ bin/kafka-server-start.sh config/server.properties & 

中止 Kafka server :

$ bin/kafka-server-stop.sh

 

 

kafka_2.10-0.8.1.1.tgz的使用介紹:

參考http://liyonghui160com.iteye.com/blog/2105824

建立topic

列出topic

Producer

Comsumer

 

 

 

 

http://kafka.apache.org/documentation

參考官網

 

 

 

建立只有一個Partition的topic

  這裏建立了一個test的topic、和列出它。

 

 

啓動一個生產者進程來發送消息

 其中,(1)參數broker-list定義了生產者要推送消息的broker地址,以<IP地址:端口>形式 ,由上面的broker的配置文件可知                                      爲localhost:9092;

              (2)參數topic指定生產者發送給哪一個topic。   

    生產者配置文件關鍵屬性:

     # list of brokers used for bootstrapping knowledge about the rest of the cluster
     # format: host1:port1,host2:port2 ...
metadata.broker.list=localhost:9092

    # specifies whether the messages are sent asynchronously (async) or synchronously (sync)
producer.type=sync

     # message encoder
serializer.class=kafka.serializer.DefaultEncoder

    接着你就能夠輸入你想要發送給消費者的消息了。(也能夠先啓動消費者進程,這樣生產者發送的消息能夠馬上顯示)。

 

 

[hadoop@weekend110 kafka_2.10-0.8.1.1]$ jps

2969 QuorumPeerMain

3024 Kafka

3109 Jps

[hadoop@weekend110 kafka_2.10-0.8.1.1]$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

Created topic "test".

[hadoop@weekend110 kafka_2.10-0.8.1.1]$ bin/kafka-topics.sh --list --zookeeper localhost:2181

test

[hadoop@weekend110 kafka_2.10-0.8.1.1]$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".

SLF4J: Defaulting to no-operation (NOP) logger implementation

SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.

iphone

xiaomi

meizu

niubi

 

 

啓動一個消費者進程來消費消息

須要另外打開一個終端:

其中,(1)參數zookeeper指定了鏈接zookeeper的地址,以<IP地址:端口>形式;

         (2)topic參數指定了從哪一個topic來pull消息。

     當你執行這個命令以後,你即可以看到控制檯上打印出的生產者生產的消息:

     消費者配置文件consumer.properties關鍵屬性:

     # Zookeeper connection string
     # comma separated host:port pairs, each corresponding to a zk
     # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
        zookeeper.connect=localhost:2181
    # timeout in ms for connecting to zookeeper
      zookeeper.connection.timeout.ms=60000
    #consumer group id
      group.id=test-consumer-group

 

[hadoop@weekend110 kafka_2.10-0.8.1.1]$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning

SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".

SLF4J: Defaulting to no-operation (NOP) logger implementation

SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.

iphone

xiaomi

meizu

niubi

 

 

我這邊,繼續寫,

 

 

 

直接,讀取,照樣,獲得。由於,kafka已經保存進去了。

 

 

 

Kafka的集羣配置通常有三種方法,即

    (1)Single node – single broker集羣;

    (2)Single node – multiple broker集羣;
    (3)Multiple node – multiple broker集羣。

 

 

參考:http://www.itnose.net/detail/6636559.html

 

 

參考:http://www.itnose.net/detail/6636559.html

 

參考:http://www.itnose.net/detail/6636559.html

 

 

 

 

 

 

 

Step 6: Setting up a multi-broker cluster

So far we have been running against a single broker, but that's no fun. For Kafka, a single broker is just a cluster of size one, so nothing much changes other than starting a few more broker instances. But just to get feel for it, let's expand our cluster to three nodes (still all on our local machine).

First we make a config file for each of the brokers:

> cp config/server.properties config/server-1.properties
> cp config/server.properties config/server-2.properties

Now edit these new files and set the following properties:

 
config/server-1.properties:
    broker.id=1
    listeners=PLAINTEXT://:9093
    log.dir=/tmp/kafka-logs-1
 
config/server-2.properties:
    broker.id=2
    listeners=PLAINTEXT://:9094
    log.dir=/tmp/kafka-logs-2

The broker.id property is the unique and permanent name of each node in the cluster. We have to override the port and log directory only because we are running these all on the same machine and we want to keep the brokers from all trying to register on the same port or overwrite each others data.

We already have Zookeeper and our single node started, so we just need to start the two new nodes:

> bin/kafka-server-start.sh config/server-1.properties &
...
> bin/kafka-server-start.sh config/server-2.properties &
...

Now create a new topic with a replication factor of three:

> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic

Okay but now that we have a cluster how can we know which broker is doing what? To see that run the "describe topics" command:

> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic      PartitionCount:1       ReplicationFactor:3    Configs:
        Topic: my-replicated-topic     Partition: 0   Leader: 1      Replicas: 1,2,0 Isr: 1,2,0

Here is an explanation of output. The first line gives a summary of all the partitions, each additional line gives information about one partition. Since we have only one partition for this topic there is only one line.

  • "leader" is the node responsible for all reads and writes for the given partition. Each node will be the leader for a randomly selected portion of the partitions.
  • "replicas" is the list of nodes that replicate the log for this partition regardless of whether they are the leader or even if they are currently alive.
  • "isr" is the set of "in-sync" replicas. This is the subset of the replicas list that is currently alive and caught-up to the leader.

Note that in my example node 1 is the leader for the only partition of the topic.

We can run the same command on the original topic we created to see where it is:

> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
Topic:test     PartitionCount:1       ReplicationFactor:1    Configs:
        Topic: test    Partition: 0   Leader: 0      Replicas: 0    Isr: 0

So there is no surprise there—the original topic has no replicas and is on server 0, the only server in our cluster when we created it.

Let's publish a few messages to our new topic:

> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic
...
my test message 1
my test message 2
^C

Now let's consume these messages:

> bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic my-replicated-topic
...
my test message 1
my test message 2
^C

Now let's test out fault-tolerance. Broker 1 was acting as the leader so let's kill it:

> ps | grep server-1.properties
7564 ttys002    0:15.91 /System/Library/Frameworks/JavaVM.framework/Versions/1.8/Home/bin/java...
> kill -9 7564

Leadership has switched to one of the slaves and node 1 is no longer in the in-sync replica set:

> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic      PartitionCount:1       ReplicationFactor:3    Configs:
        Topic: my-replicated-topic     Partition: 0   Leader: 2      Replicas: 1,2,0 Isr: 2,0

But the messages are still be available for consumption even though the leader that took the writes originally is down:

> bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic my-replicated-topic
...
my test message 1
my test message 2
^C

 

其實啊,通常都是kafka集羣,在業務裏,也沒什麼,改下kafka_2.10-0.8.1.1/config/下的配置文件server.properties,就好。

 

[hadoop@weekend110 config]$ pwd

/home/hadoop/app/kafka_2.10-0.8.1.1/config

[hadoop@weekend110 config]$ ll

total 32

-rw-rw-r--. 1 hadoop hadoop 1202 Apr 23  2014 consumer.properties

-rw-rw-r--. 1 hadoop hadoop 3828 Apr 23  2014 log4j.properties

-rw-rw-r--. 1 hadoop hadoop 2217 Apr 23  2014 producer.properties

-rw-rw-r--. 1 hadoop hadoop 5331 Oct 13 22:08 server.properties

-rw-rw-r--. 1 hadoop hadoop 3326 Apr 23  2014 test-log4j.properties

-rw-rw-r--. 1 hadoop hadoop  995 Apr 23  2014 tools-log4j.properties

-rw-rw-r--. 1 hadoop hadoop 1025 Oct 13 22:06 zookeeper.properties

[hadoop@weekend110 config]$ cp server.properties server-1.properties

[hadoop@weekend110 config]$ cp server.properties server-2.properties

[hadoop@weekend110 config]$ ll

total 48

-rw-rw-r--. 1 hadoop hadoop 1202 Apr 23  2014 consumer.properties

-rw-rw-r--. 1 hadoop hadoop 3828 Apr 23  2014 log4j.properties

-rw-rw-r--. 1 hadoop hadoop 2217 Apr 23  2014 producer.properties

-rw-rw-r--. 1 hadoop hadoop 5331 Oct 14 09:21 server-1.properties

-rw-rw-r--. 1 hadoop hadoop 5331 Oct 14 09:21 server-2.properties

-rw-rw-r--. 1 hadoop hadoop 5331 Oct 13 22:08 server.properties

-rw-rw-r--. 1 hadoop hadoop 3326 Apr 23  2014 test-log4j.properties

-rw-rw-r--. 1 hadoop hadoop  995 Apr 23  2014 tools-log4j.properties

-rw-rw-r--. 1 hadoop hadoop 1025 Oct 13 22:06 zookeeper.properties

[hadoop@weekend110 config]$

 

[hadoop@weekend110 config]$ vim server-1.properties

broker.id=1

log.dirs=/home/hadoop/data/kafka-logs-1

或者

log.dirs=/tmp/kafka-logs-1

port=9093

修改以後的結果:

 

 

 

# Licensed to the Apache Software Foundation (ASF) under one or more

# contributor license agreements.  See the NOTICE file distributed with

# this work for additional information regarding copyright ownership.

# The ASF licenses this file to You under the Apache License, Version 2.0

# (the "License"); you may not use this file except in compliance with

# the License.  You may obtain a copy of the License at

#

#    http://www.apache.org/licenses/LICENSE-2.0

#

# Unless required by applicable law or agreed to in writing, software

# distributed under the License is distributed on an "AS IS" BASIS,

# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

# See the License for the specific language governing permissions and

# limitations under the License.

# see kafka.server.KafkaConfig for additional details and defaults

 

############################# Server Basics #############################

 

# The id of the broker. This must be set to a unique integer for each broker.

broker.id=1

 

############################# Socket Server Settings #############################

 

# The port the socket server listens on

port=9093

 

# Hostname the broker will bind to. If not set, the server will bind to all interfaces

# host.name=localhost

 

# Hostname the broker will advertise to producers and consumers. If not set, it uses the

# value for "host.name" if configured.  Otherwise, it will use the value returned from

# java.net.InetAddress.getCanonicalHostName().

#advertised.host.name=<hostname routable by clients>

 

# The port to publish to ZooKeeper for clients to use. If this is not set,

# it will publish the same port that the broker binds to.

#advertised.port=<port accessible by clients>

 

# The number of threads handling network requests

num.network.threads=2

 

# The number of threads doing disk I/O

num.io.threads=8

 

# The send buffer (SO_SNDBUF) used by the socket server

socket.send.buffer.bytes=1048576

 

# The receive buffer (SO_RCVBUF) used by the socket server

socket.receive.buffer.bytes=1048576

 

# The maximum size of a request that the socket server will accept (protection against OOM)

socket.request.max.bytes=104857600

 

 

############################# Log Basics #############################

 

# A comma seperated list of directories under which to store log files

log.dirs=/tmp/kafka-logs-1

 

# The default number of log partitions per topic. More partitions allow greater

# parallelism for consumption, but this will also result in more files across

# the brokers.

num.partitions=2

 

############################# Log Flush Policy #############################

 

# Messages are immediately written to the filesystem but by default we only fsync() to sync

# the OS cache lazily. The following configurations control the flush of data to disk.

# There are a few important trade-offs here:

#    1. Durability: Unflushed data may be lost if you are not using replication.

 

#    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.

#    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to exceessive seeks.

# The settings below allow one to configure the flush policy to flush data after a period of time or

# every N messages (or both). This can be done globally and overridden on a per-topic basis.

 

# The number of messages to accept before forcing a flush of data to disk

#log.flush.interval.messages=10000

 

# The maximum amount of time a message can sit in a log before we force a flush

#log.flush.interval.ms=1000

 

############################# Log Retention Policy #############################

 

# The following configurations control the disposal of log segments. The policy can

# be set to delete segments after a period of time, or after a given size has accumulated.

# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens

# from the end of the log.

 

# The minimum age of a log file to be eligible for deletion

log.retention.hours=168

 

# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining

# segments don't drop below log.retention.bytes.

#log.retention.bytes=1073741824

 

# The maximum size of a log segment file. When this size is reached a new log segment will be created.

log.segment.bytes=536870912

 

# The interval at which log segments are checked to see if they can be deleted according

# to the retention policies

log.retention.check.interval.ms=60000

 

# By default the log cleaner is disabled and the log retention policy will default to just delete segments after their retention expires.

# If log.cleaner.enable=true is set the cleaner will be enabled and individual logs can then be marked for log compaction.

log.cleaner.enable=false

 

############################# Zookeeper #############################

 

# Zookeeper connection string (see zookeeper docs for details).

# This is a comma separated host:port pairs, each corresponding to a zk

# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".

# You can also append an optional chroot string to the urls to specify the

# root directory for all kafka znodes.

zookeeper.connect=localhost:2181

 

# Timeout in ms for connecting to zookeeper

zookeeper.connection.timeout.ms=1000000

 

 

[hadoop@weekend110 config]$ vim server-2.properties

broker.id=2

log.dirs=/home/hadoop/data/kafka-logs-2

或者

log.dirs=/tmp/kafka-logs-2

port=9094

 

 

 

啓動

 

[hadoop@weekend110 kafka_2.10-0.8.1.1]$ jps

2255 Jps

2181 QuorumPeerMain

[hadoop@weekend110 kafka_2.10-0.8.1.1]$ bin/kafka-server-start.sh config/server.properties

 

[hadoop@weekend110 kafka_2.10-0.8.1.1]$ jps

2338 Jps

2265 Kafka

2181 QuorumPeerMain

[hadoop@weekend110 kafka_2.10-0.8.1.1]$

 

即,server.properties,成功!

[hadoop@weekend110 kafka_2.10-0.8.1.1]$ jps

2387 Jps

2265 Kafka

2181 QuorumPeerMain

[hadoop@weekend110 kafka_2.10-0.8.1.1]$ bin/kafka-server-start.sh config/server-1.properties &

[1] 2396      //這個進程是,server-1.properties的。

 

[hadoop@weekend110 kafka_2.10-0.8.1.1]$ jps

2265 Kafka

2396 Kafka

2181 QuorumPeerMain

2470 Jps

[hadoop@weekend110 kafka_2.10-0.8.1.1]$

 

[hadoop@weekend110 kafka_2.10-0.8.1.1]$ jps

2265 Kafka

2396 Kafka

2181 QuorumPeerMain

2515 Jps

[hadoop@weekend110 kafka_2.10-0.8.1.1]$ bin/kafka-server-start.sh config/server-2.properties &

[1] 2525   //這個進程是,server-2.properties的。

 

即broker 0對應是,server.properties,進程是2265,端口是9092。

broker 1對應是,server-1.properties,進程是2396,端口是9093。

broker 2對應是,server-2.properties,進程是2525,端口是9094。

 

 

在server.properties節點上,進行建立topic,並制定分區。

 

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic

 

> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic      PartitionCount:1       ReplicationFactor:3    Configs:
        Topic: my-replicated-topic     Partition: 0   Leader: 1      Replicas: 1,2,0 Isr: 1,2,0

 

Here is an explanation of output. The first line gives a summary of all the partitions, each additional line gives information about one partition. Since we have only one partition for this topic there is only one line.

  • "leader" is the node responsible for all reads and writes for the given partition. Each node will be the leader for a randomly selected portion of the partitions.
  • "replicas" is the list of nodes that replicate the log for this partition regardless of whether they are the leader or even if they are currently alive.
  • "isr" is the set of "in-sync" replicas. This is the subset of the replicas list that is currently alive and caught-up to the leader.

Note that in my example node 1 is the leader for the only partition of the topic.

We can run the same command on the original topic we created to see where it is:

> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test

Topic:test     PartitionCount:1       ReplicationFactor:1    Configs:

        Topic: test    Partition: 0   Leader: 0      Replicas: 0    Isr: 0

So there is no surprise there—the original topic has no replicas and is on server 0, the only server in our cluster when we created it.

 

 

這意思是,建立話題my-replicated-topic,副本數是3,分區是1

 

 

[hadoop@weekend110 kafka_2.10-0.8.1.1]$ jps

2338 Jps

2265 Kafka

2181 QuorumPeerMain

[hadoop@weekend110 kafka_2.10-0.8.1.1]$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic  

Created topic "my-replicated-topic".

[hadoop@weekend110 kafka_2.10-0.8.1.1]$ bin/kafka-topics.sh --list --zookeeper localhost:2181

my-replicated-topic

test

[hadoop@weekend110 kafka_2.10-0.8.1.1]$ bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic

Topic:my-replicated-topic       PartitionCount:1        ReplicationFactor:3     Configs:

        Topic: my-replicated-topic      Partition: 0    Leader: 2       Replicas: 2,0,1 Isr: 2,0,1

[hadoop@weekend110 kafka_2.10-0.8.1.1]$ bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test

Topic:test      PartitionCount:1        ReplicationFactor:1     Configs:

        Topic: test     Partition: 0    Leader: 0       Replicas: 0     Isr: 0

[hadoop@weekend110 kafka_2.10-0.8.1.1]$

 

在這裏,學個知識點,能夠看出:

一、test是我以前,弄的Kafka單機模式。對應着,broker0。

二、my-replicated-topic是我如今,弄的kafka分佈式模式(3節點,只不過我是在一個節點上模擬仿造出server.properties、server-1.properties、server-2.properties)。

對應着,broker0、broker一、broker2。

Topic:my-replicated-topic       PartitionCount:1        ReplicationFactor:3     Configs:

Topic: my-replicated-topic      Partition: 0    Leader: 2      

Replicas: 2,0,1    Isr: 2,0,1

Topic:test      PartitionCount:1        ReplicationFactor:1     Configs:

    Topic: test     Partition: 0    Leader: 0       Replicas: 0     Isr: 0

 

Isr: 2,0,1   這意思是,指的,同步狀態,broker上的id。

Leader: 2   這意思是,指的,是在broker2那節點。

 

 

 

 

 

Producer生產者啓動一個生產者進程來發送消息

 

 

Let's publish a few messages to our new topic:

> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic
...
my test message 1
my test message 2
^C

 

Kafka官網,給的是輸入

my test message 1
my test message 2

 

 

我這裏,先不輸入,

 

 

 

啓動一個消費者進程來消費消息

> bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic my-replicated-topic
...
my test message 1
my test message 2
^C



[hadoop@weekend110 kafka_2.10-0.8.1.1]$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic my-replicated-topic

SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".

SLF4J: Defaulting to no-operation (NOP) logger implementation

SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.

 

好的,如今,生產者,發生消息,

 

[hadoop@weekend110 kafka_2.10-0.8.1.1]$ jps

3048 ConsoleConsumer

2525 Kafka     //broker2

2985 ConsoleProducer

3179 Jps

2265 Kafka    //broker

2396 Kafka    //broker1

2181 QuorumPeerMain

[hadoop@weekend110 kafka_2.10-0.8.1.1]$

 

 

 

Now let's test out fault-tolerance. Broker 1 was acting as the leader so let's kill it:

> ps | grep server-1.properties

7564 ttys002    0:15.91 /System/Library/Frameworks/JavaVM.framework/Versions/1.8/Home/bin/java...

> kill -9 7564

Leadership has switched to one of the slaves and node 1 is no longer in the in-sync replica set:

> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic

Topic:my-replicated-topic      PartitionCount:1       ReplicationFactor:3    Configs:

        Topic: my-replicated-topic     Partition: 0   Leader: 2      Replicas: 1,2,0 Isr: 2,0

But the messages are still be available for consumption even though the leader that took the writes originally is down:

> bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic my-replicated-topic

...

my test message 1                 

my test message 2

^C

 

      Kafaka官網,這裏leader是在broker1。broker1,經過kafka自帶的zookeeper,到broker2。

個人測試是在broker2。Broker2,經過kafka自帶的zookeeper,到broker0。

 

 

固然,用這個命令,也是能夠的。

[hadoop@weekend110 kafka_2.10-0.8.1.1]$ ps | grep server-2.properties

2524 ttys002    0:15.91 /System/Library/Frameworks/JavaVM.framework/Versions/1.8/Home/bin/java...

[hadoop@weekend110 kafka_2.10-0.8.1.1]$ jps

3048 ConsoleConsumer

2525 Kafka

2985 ConsoleProducer

3179 Jps

2265 Kafka

2396 Kafka

2181 QuorumPeerMain

[hadoop@weekend110 kafka_2.10-0.8.1.1]$ bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic

Topic:my-replicated-topic       PartitionCount:1        ReplicationFactor:3     Configs:

        Topic: my-replicated-topic      Partition: 0    Leader: 2       Replicas: 2,0,1 Isr: 2,0,1

[hadoop@weekend110 kafka_2.10-0.8.1.1]$ kill -9 2525

[hadoop@weekend110 kafka_2.10-0.8.1.1]$ bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic

Topic:my-replicated-topic       PartitionCount:1        ReplicationFactor:3     Configs:

        Topic: my-replicated-topic      Partition: 0    Leader: 0       Replicas: 2,0,1 Isr: 0,1

[hadoop@weekend110 kafka_2.10-0.8.1.1]$

      因此,可見kafka集羣,是很穩定的,照樣正常運行。

讀寫測試下,即producer和consumer下。

 

[hadoop@weekend110 kafka_2.10-0.8.1.1]$ jps

3048 ConsoleConsumer

2985 ConsoleProducer

2265 Kafka

3242 Jps

2396 Kafka

2181 QuorumPeerMain

[hadoop@weekend110 kafka_2.10-0.8.1.1]$

 

可見,ConsoleProducer和ConsoleConsumer依然還在。

 

Step 7: Use Kafka Connect to import/export data

Writing data from the console and writing it back to the console is a convenient place to start, but you'll probably want to use data from other sources or export data from Kafka to other systems. For many systems, instead of writing custom integration code you can use Kafka Connect to import or export data. Kafka Connect is a tool included with Kafka that imports and exports data to Kafka. It is an extensible tool that runs connectors, which implement the custom logic for interacting with an external system. In this quickstart we'll see how to run Kafka Connect with simple connectors that import data from a file to a Kafka topic and export data from a Kafka topic to a file. First, we'll start by creating some seed data to test with:

> echo -e "foo\nbar" > test.txt

Next, we'll start two connectors running in standalone mode, which means they run in a single, local, dedicated process. We provide three configuration files as parameters. The first is always the configuration for the Kafka Connect process, containing common configuration such as the Kafka brokers to connect to and the serialization format for data. The remaining configuration files each specify a connector to create. These files include a unique connector name, the connector class to instantiate, and any other configuration required by the connector.

> bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties

These sample configuration files, included with Kafka, use the default local cluster configuration you started earlier and create two connectors: the first is a source connector that reads lines from an input file and produces each to a Kafka topic and the second is a sink connector that reads messages from a Kafka topic and produces each as a line in an output file. During startup you'll see a number of log messages, including some indicating that the connectors are being instantiated. Once the Kafka Connect process has started, the source connector should start reading lines from

test.txt

and producing them to the topic

connect-test

, and the sink connector should start reading messages from the topic

connect-test

and write them to the file

test.sink.txt

. We can verify the data has been delivered through the entire pipeline by examining the contents of the output file:

> cat test.sink.txt
foo
bar

Note that the data is being stored in the Kafka topic

connect-test

, so we can also run a console consumer to see the data in the topic (or use custom consumer code to process it):

> bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic connect-test --from-beginning
{"schema":{"type":"string","optional":false},"payload":"foo"}
{"schema":{"type":"string","optional":false},"payload":"bar"}
...

The connectors continue to process data, so we can add data to the file and see it move through the pipeline:

> echo "Another line" >> test.txt

You should see the line appear in the console consumer output and in the sink file.

Step 8: Use Kafka Streams to process data

Kafka Streams is a client library of Kafka for real-time stream processing and analyzing data stored in Kafka brokers. This quickstart example will demonstrate how to run a streaming application coded in this library. Here is the gist of the WordCountDemo example code (converted to use Java 8 lambda expressions for easy reading).

KTable wordCounts = textLines
    // Split each text line, by whitespace, into words.
    .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
 
    // Ensure the words are available as record keys for the next aggregate operation.
    .map((key, value) -> new KeyValue<>(value, value))
 
    // Count the occurrences of each word (record key) and store the results into a table named "Counts".
    .countByKey("Counts")

It implements the WordCount algorithm, which computes a word occurrence histogram from the input text. However, unlike other WordCount examples you might have seen before that operate on bounded data, the WordCount demo application behaves slightly differently because it is designed to operate on an infinite, unbounded stream of data. Similar to the bounded variant, it is a stateful algorithm that tracks and updates the counts of words. However, since it must assume potentially unbounded input data, it will periodically output its current state and results while continuing to process more data because it cannot know when it has processed "all" the input data.

We will now prepare input data to a Kafka topic, which will subsequently processed by a Kafka Streams application.

> echo -e "all streams lead to kafka\nhello kafka streams\njoin kafka summit" > file-input.txt

Next, we send this input data to the input topic named streams-file-input using the console producer (in practice, stream data will likely be flowing continuously into Kafka where the application will be up and running):

> bin/kafka-topics.sh --create \
            --zookeeper localhost:2181 \
            --replication-factor 1 \
            --partitions 1 \
            --topic streams-file-input
> cat file-input.txt | bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-file-input

We can now run the WordCount demo application to process the input data:

> bin/kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountDemo

There won't be any STDOUT output except log entries as the results are continuously written back into another topic named streams-wordcount-output in Kafka. The demo will run for a few seconds and then, unlike typical stream processing applications, terminate automatically.

We can now inspect the output of the WordCount demo application by reading from its output topic:

> bin/kafka-console-consumer.sh --zookeeper localhost:2181 \
            --topic streams-wordcount-output \
            --from-beginning \
            --formatter kafka.tools.DefaultMessageFormatter \
            --property print.key=true \
            --property print.value=true \
            --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
            --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer

with the following output data being printed to the console:

all     1
streams 1
lead    1
to      1
kafka   1
hello   1
kafka   2
streams 2
join    1
kafka   3
summit  1

Here, the first column is the Kafka message key, and the second column is the message value, both in in java.lang.String format. Note that the output is actually a continuous stream of updates, where each data record (i.e. each line in the original output above) is an updated count of a single word, aka record key such as "kafka". For multiple records with the same key, each later record is an update of the previous one.

Now you can write more input messages to the streams-file-input topic and observe additional messages added to streams-wordcount-output topic, reflecting updated word counts (e.g., using the console producer and the console consumer, as described above).

You can stop the console consumer via Ctrl-C.

 

      這裏,之後在玩。

 

 

 

 

 

二、如果3節點(不模擬,直接分佈式。),而且,使用kafka自帶的zookeeper

如如今有,weekend0一、weekend0二、weekend03

修改server.properties

broker.id=0          

zookeeper.connect=weekend01:2181,weekend02:2181,weekend03:2181

 

修改server.properties

broker.id=1          

zookeeper.connect=weekend01:2181,weekend02:2181,weekend03:2181

 

修改server.properties

broker.id=2          

zookeeper.connect=weekend01:2181,weekend02:2181,weekend03:2181

固然,這裏,不用臨時目錄,也能夠用新建的目錄。這裏我實在是很少贅述了。

 

 

 

啓動

利用 kafka 提供的腳本啓動一個 zookeeper 實例:

在kafka安裝目錄下,執行

$ bin/zookeeper-server-start.sh config/zookeeper.properties &       (其實,強烈,建議,去看看本博文的最頂部的寫在前面的話,那樣更好!)

啓動Kafka server:

在kafka安裝目錄下,執行

$ bin/kafka-server-start.sh config/server.properties & 

而後,再打開另外一個終端,使用kafka。

 

 

 

中止

中止剛纔啓動的 zookeeper 實例:

$ bin/zookeeper-server-stop.sh 

中止 Kafka server :

$ bin/kafka-server-stop.sh

 

 

 

kafka_2.10-0.8.1.1.tgz的使用介紹:

參考http://liyonghui160com.iteye.com/blog/2105824

建立topic

列出topic

Producer

Comsumer

 

 

 

 

 

三、如果3節點(不模擬,直接分佈式。),而且,使用外部的zookeeper

[hadoop@weekend110 config]$ pwd

/home/hadoop/app/kafka_2.10-0.8.1.1/config

[hadoop@weekend110 config]$ ls

consumer.properties  producer.properties  test-log4j.properties   zookeeper.properties

log4j.properties     server.properties    tools-log4j.properties

[hadoop@weekend110 config]$ vim zookeeper.properties

 

如果,本身新建其餘目錄,則能夠是。

dataDir=/home/hadoop/data/zookeeper

dataLogDir=/home/hadoop/data/zkdatalog

 

 

[hadoop@weekend110 config]$ vim server.properties

# Licensed to the Apache Software Foundation (ASF) under one or more

# contributor license agreements.  See the NOTICE file distributed with

# this work for additional information regarding copyright ownership.

# The ASF licenses this file to You under the Apache License, Version 2.0

# (the "License"); you may not use this file except in compliance with

# the License.  You may obtain a copy of the License at

#

#    http://www.apache.org/licenses/LICENSE-2.0

#

# Unless required by applicable law or agreed to in writing, software

# distributed under the License is distributed on an "AS IS" BASIS,

# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

# See the License for the specific language governing permissions and

# limitations under the License.

# see kafka.server.KafkaConfig for additional details and defaults

 

############################# Server Basics #############################

 

# The id of the broker. This must be set to a unique integer for each broker.

broker.id=0

 

############################# Socket Server Settings #############################

 

# The port the socket server listens on

port=9092

 

# Hostname the broker will bind to. If not set, the server will bind to all interfaces

# host.name=localhost

 

# Hostname the broker will advertise to producers and consumers. If not set, it uses the

# value for "host.name" if configured.  Otherwise, it will use the value returned from

# java.net.InetAddress.getCanonicalHostName().

#advertised.host.name=<hostname routable by clients>

 

# The port to publish to ZooKeeper for clients to use. If this is not set,

# it will publish the same port that the broker binds to.

#advertised.port=<port accessible by clients>

 

# The number of threads handling network requests

num.network.threads=2

 

# The number of threads doing disk I/O

num.io.threads=8

 

# The send buffer (SO_SNDBUF) used by the socket server

socket.send.buffer.bytes=1048576

 

# The receive buffer (SO_RCVBUF) used by the socket server

socket.receive.buffer.bytes=1048576

 

# The maximum size of a request that the socket server will accept (protection against OOM)

socket.request.max.bytes=104857600

 

############################# Log Basics #############################

 

# A comma seperated list of directories under which to store log files

log.dirs=/tmp/kafka-logs    // 或者能夠本身新建目錄,

/home/hadoop/data/kafka-logs

 

 

# The default number of log partitions per topic. More partitions allow greater

# parallelism for consumption, but this will also result in more files across

# the brokers.

num.partitions=2

 

############################# Log Flush Policy #############################

 

# Messages are immediately written to the filesystem but by default we only fsync() to sync

# the OS cache lazily. The following configurations control the flush of data to disk.

# There are a few important trade-offs here:

#    1. Durability: Unflushed data may be lost if you are not using replication.

#    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.

#    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to exceessive seeks.

# The settings below allow one to configure the flush policy to flush data after a period of time or

# every N messages (or both). This can be done globally and overridden on a per-topic basis.

 

# The number of messages to accept before forcing a flush of data to disk

#log.flush.interval.messages=10000

 

# The maximum amount of time a message can sit in a log before we force a flush

#log.flush.interval.ms=1000

 

############################# Log Retention Policy #############################

 

# The following configurations control the disposal of log segments. The policy can

# be set to delete segments after a period of time, or after a given size has accumulated.

# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens

# from the end of the log.

 

# The minimum age of a log file to be eligible for deletion

log.retention.hours=168

 

# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining

# segments don't drop below log.retention.bytes.

#log.retention.bytes=1073741824

 

# The maximum size of a log segment file. When this size is reached a new log segment will be created.

log.segment.bytes=536870912

 

# The interval at which log segments are checked to see if they can be deleted according

# to the retention policies

log.retention.check.interval.ms=60000

 

# By default the log cleaner is disabled and the log retention policy will default to just delete segments after their retention expires.

# If log.cleaner.enable=true is set the cleaner will be enabled and individual logs can then be marked for log compaction.

log.cleaner.enable=false

 

############################# Zookeeper #############################

 

# Zookeeper connection string (see zookeeper docs for details).

# This is a comma separated host:port pairs, each corresponding to a zk

# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".

# You can also append an optional chroot string to the urls to specify the

# root directory for all kafka znodes.

zookeeper.connect=weekend01:2181, weekend02:2181,weekend03:2181

 

 

# Timeout in ms for connecting to zookeeper

zookeeper.connection.timeout.ms=1000000

 

 

 

 

先將外部安裝的zookeeper(即公共的zookeeper)啓動,再啓動kafka。

啓動

一、啓動外部的zookeeper

在/home/hadoop/app/zookeeper-3.4.6下,執行bin/zkServer.sh start

參考個人博客

1 week110的zookeeper的安裝 + zookeeper提供少許數據的存儲

 

而後,再到kafka安裝目錄下

$ bin/kafka-server-start.sh config/server.properties &

[hadoop@weekend110 kafka_2.10-0.8.1.1]$ pwd

/home/hadoop/app/kafka_2.10-0.8.1.1

[hadoop@weekend110 kafka_2.10-0.8.1.1]$ bin/kafka-server-start.sh config/server.properties &

[1] 2692

[hadoop@weekend110 kafka_2.10-0.8.1.1]$ [2016-10-13 22:11:51,302] INFO Verifying properties (kafka.utils.VerifiableProperties)

[2016-10-13 22:11:51,586] INFO Property broker.id is overridden to 0 (kafka.utils.VerifiableProperties)

[2016-10-13 22:11:51,593] INFO Property log.cleaner.enable is overridden to false (kafka.utils.VerifiableProperties)

[2016-10-13 22:11:51,597] INFO Property log.dirs is overridden to /home/hadoop/data/kafka-logs (kafka.utils.VerifiableProperties)

[2016-10-13 22:11:51,597] INFO Property log.retention.check.interval.ms is overridden to 60000 (kafka.utils.VerifiableProperties)

[2016-10-13 22:11:51,605] INFO Property log.retention.hours is overridden to 168 (kafka.utils.VerifiableProperties)

[2016-10-13 22:11:51,608] INFO Property log.segment.bytes is overridden to 536870912 (kafka.utils.VerifiableProperties)

[2016-10-13 22:11:51,610] INFO Property num.io.threads is overridden to 8 (kafka.utils.VerifiableProperties)

[2016-10-13 22:11:51,610] INFO Property num.network.threads is overridden to 2 (kafka.utils.VerifiableProperties)

[2016-10-13 22:11:51,626] INFO Property num.partitions is overridden to 2 (kafka.utils.VerifiableProperties)

[2016-10-13 22:11:51,627] INFO Property port is overridden to 9092 (kafka.utils.VerifiableProperties)

[2016-10-13 22:11:51,628] INFO Property socket.receive.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)

[2016-10-13 22:11:51,629] INFO Property socket.request.max.bytes is overridden to 104857600 (kafka.utils.VerifiableProperties)

[2016-10-13 22:11:51,635] INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)

[2016-10-13 22:11:51,636] INFO Property zookeeper.connect is overridden to localhost:2181 (kafka.utils.VerifiableProperties)

[2016-10-13 22:11:51,637] INFO Property zookeeper.connection.timeout.ms is overridden to 1000000 (kafka.utils.VerifiableProperties)

[2016-10-13 22:11:51,800] INFO [Kafka Server 0], starting (kafka.server.KafkaServer)

[2016-10-13 22:11:51,805] INFO [Kafka Server 0], Connecting to zookeeper on localhost:2181 (kafka.server.KafkaServer)

[2016-10-13 22:11:51,839] INFO Starting ZkClient event thread. (org.I0Itec.zkclient.ZkEventThread)

[2016-10-13 22:11:51,869] INFO Client environment:zookeeper.version=3.3.3-1203054, built on 11/17/2011 05:47 GMT (org.apache.zookeeper.ZooKeeper)

[2016-10-13 22:11:51,870] INFO Client environment:host.name=weekend110 (org.apache.zookeeper.ZooKeeper)

[2016-10-13 22:11:51,870] INFO Client environment:java.version=1.7.0_65 (org.apache.zookeeper.ZooKeeper)

[2016-10-13 22:11:51,870] INFO Client environment:java.vendor=Oracle Corporation (org.apache.zookeeper.ZooKeeper)

[2016-10-13 22:11:51,870] INFO Client environment:java.home=/home/hadoop/app/jdk1.7.0_65/jre (org.apache.zookeeper.ZooKeeper)

[2016-10-13 22:11:51,870] INFO Client environment:java.class.path=:/home/hadoop/app/kafka_2.10-0.8.1.1/bin/../core/build/dependant-libs-2.8.0/*.jar:/home/hadoop/app/kafka_2.10-0.8.1.1/bin/../perf/build/libs//kafka-perf_2.8.0*.jar:/home/hadoop/app/kafka_2.10-0.8.1.1/bin/../clients/build/libs//kafka-clients*.jar:/home/hadoop/app/kafka_2.10-0.8.1.1/bin/../examples/build/libs//kafka-examples*.jar:/home/hadoop/app/kafka_2.10-0.8.1.1/bin/../contrib/hadoop-consumer/build/libs//kafka-hadoop-consumer*.jar:/home/hadoop/app/kafka_2.10-0.8.1.1/bin/../contrib/hadoop-producer/build/libs//kafka-hadoop-producer*.jar:/home/hadoop/app/kafka_2.10-0.8.1.1/bin/../libs/jopt-simple-3.2.jar:/home/hadoop/app/kafka_2.10-0.8.1.1/bin/../libs/kafka_2.10-0.8.1.1.jar:/home/hadoop/app/kafka_2.10-0.8.1.1/bin/../libs/kafka_2.10-0.8.1.1-javadoc.jar:/home/hadoop/app/kafka_2.10-0.8.1.1/bin/../libs/kafka_2.10-0.8.1.1-scaladoc.jar:/home/hadoop/app/kafka_2.10-0.8.1.1/bin/../libs/kafka_2.10-0.8.1.1-sources.jar:/home/hadoop/app/kafka_2.10-0.8.1.1/bin/../libs/log4j-1.2.15.jar:/home/hadoop/app/kafka_2.10-0.8.1.1/bin/../libs/metrics-core-2.2.0.jar:/home/hadoop/app/kafka_2.10-0.8.1.1/bin/../libs/scala-library-2.10.1.jar:/home/hadoop/app/kafka_2.10-0.8.1.1/bin/../libs/slf4j-api-1.7.2.jar:/home/hadoop/app/kafka_2.10-0.8.1.1/bin/../libs/snappy-java-1.0.5.jar:/home/hadoop/app/kafka_2.10-0.8.1.1/bin/../libs/zkclient-0.3.jar:/home/hadoop/app/kafka_2.10-0.8.1.1/bin/../libs/zookeeper-3.3.4.jar:/home/hadoop/app/kafka_2.10-0.8.1.1/bin/../core/build/libs/kafka_2.8.0*.jar (org.apache.zookeeper.ZooKeeper)

[2016-10-13 22:11:51,870] INFO Client environment:java.library.path=/usr/java/packages/lib/i386:/lib:/usr/lib (org.apache.zookeeper.ZooKeeper)

[2016-10-13 22:11:51,871] INFO Client environment:java.io.tmpdir=/tmp (org.apache.zookeeper.ZooKeeper)

[2016-10-13 22:11:51,871] INFO Client environment:java.compiler=<NA> (org.apache.zookeeper.ZooKeeper)

[2016-10-13 22:11:51,871] INFO Client environment:os.name=Linux (org.apache.zookeeper.ZooKeeper)

[2016-10-13 22:11:51,872] INFO Client environment:os.arch=i386 (org.apache.zookeeper.ZooKeeper)

[2016-10-13 22:11:51,872] INFO Client environment:os.version=2.6.32-431.el6.x86_64 (org.apache.zookeeper.ZooKeeper)

[2016-10-13 22:11:51,872] INFO Client environment:user.name=hadoop (org.apache.zookeeper.ZooKeeper)

[2016-10-13 22:11:51,872] INFO Client environment:user.home=/home/hadoop (org.apache.zookeeper.ZooKeeper)

[2016-10-13 22:11:51,872] INFO Client environment:user.dir=/home/hadoop/app/kafka_2.10-0.8.1.1 (org.apache.zookeeper.ZooKeeper)

[2016-10-13 22:11:51,875] INFO Initiating client connection, connectString=localhost:2181 sessionTimeout=6000 watcher=org.I0Itec.zkclient.ZkClient@7b7258 (org.apache.zookeeper.ZooKeeper)

[2016-10-13 22:11:51,955] INFO Opening socket connection to server localhost/0:0:0:0:0:0:0:1:2181 (org.apache.zookeeper.ClientCnxn)

[2016-10-13 22:11:51,992] INFO Socket connection established to localhost/0:0:0:0:0:0:0:1:2181, initiating session (org.apache.zookeeper.ClientCnxn)

[2016-10-13 22:11:52,101] INFO Session establishment complete on server localhost/0:0:0:0:0:0:0:1:2181, sessionid = 0x157be612ba00000, negotiated timeout = 6000 (org.apache.zookeeper.ClientCnxn)

[2016-10-13 22:11:52,107] INFO zookeeper state changed (SyncConnected) (org.I0Itec.zkclient.ZkClient)

[2016-10-13 22:11:52,587] INFO Starting log cleanup with a period of 60000 ms. (kafka.log.LogManager)

[2016-10-13 22:11:52,597] INFO Starting log flusher with a default period of 9223372036854775807 ms. (kafka.log.LogManager)

SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".

SLF4J: Defaulting to no-operation (NOP) logger implementation

SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.

[2016-10-13 22:11:52,707] INFO Awaiting socket connections on 0.0.0.0:9092. (kafka.network.Acceptor)

[2016-10-13 22:11:52,709] INFO [Socket Server on Broker 0], Started (kafka.network.SocketServer)

[2016-10-13 22:11:53,049] INFO Will not load MX4J, mx4j-tools.jar is not in the classpath (kafka.utils.Mx4jLoader$)

[2016-10-13 22:11:53,223] INFO 0 successfully elected as leader (kafka.server.ZookeeperLeaderElector)

[2016-10-13 22:11:53,771] INFO Registered broker 0 at path /brokers/ids/0 with address weekend110:9092. (kafka.utils.ZkUtils$)

[2016-10-13 22:11:53,825] INFO [Kafka Server 0], started (kafka.server.KafkaServer)

[2016-10-13 22:11:53,999] INFO New leader is 0 (kafka.server.ZookeeperLeaderElector$LeaderChangeListener)

 

再,打開另外一個終端,使用kafka,便可。

 

 

 

 

[hadoop@weekend110 ~]$ jps

2769 Jps

2692 Kafka

2634 QuorumPeerMain

[hadoop@weekend110 ~]$

 

 

 

 

中止

中止 外部的zookeeper:

在/home/hadoop/app/zookeeper-3.4.6下,執行bin/zkServer.sh stop

 

中止 Kafka server :

$ bin/kafka-server-stop.sh

 

 

 

kafka_2.10-0.8.1.1.tgz的使用介紹:

參考http://liyonghui160com.iteye.com/blog/2105824

建立topic

列出topic

Producer

Comsumer

 

 

 

Kafka的Java api

Kafka客戶端編程

      寫kafka的客戶端,有2個,生產者和消費者。

 

 

weekend110-kafka   ->   Build Path  ->   Configure Build Path 

 

 

這裏,我參考了網上的。http://download.csdn.net/download/alexander_zhou/9192011

 

了手動,避免後續,還得查漏補缺添加jar包。強烈建議Maven。

 

Eclipse下新建Maven項目、自動打依賴jar包

其實啊,用Maven多麼好。

參考連接:Kafka使用Java客戶端進行訪問

 

 三、新建包cn.itcast.kafka

 

四、新建ProducerDemo.java

 

新建ConsumerDemo.java

 

這裏,我就以分佈式集羣的配置,附上代碼。工做中,就是這麼幹的!

 

ProducerDemo.java

 

package cn.itcast.kafka;

 

import java.util.Properties;

 

import kafka.javaapi.producer.Producer;

import kafka.producer.KeyedMessage;

import kafka.producer.ProducerConfig;

 

public class ProducerDemo {

    public static void main(String[] args) throws Exception {

       Properties props = new Properties();

       props.put("zk.connect", "weekend01:2181,weekend02:2181,weekend03:2181");

       props.put("metadata.broker.list","weekend01:9092,weekend02:9092,weekend03:9092");

       props.put("serializer.class", "kafka.serializer.StringEncoder");

       ProducerConfig config = new ProducerConfig(props);

       Producer<String, String> producer = new Producer<String, String>(config);

 

       // 發送業務消息

       // 讀取文件 讀取內存數據庫 讀socket端口

       for (int i = 1; i <= 100; i++) {

           Thread.sleep(500);

           producer.send(new KeyedMessage<String, String>("wordcount",

                  "i said i love you baby for" + i + "times,will you have a nice day with me tomorrow"));

       }

 

    }

}

 

 

package cn.itcast.kafka;

 

import java.util.Properties;

 

import kafka.javaapi.producer.Producer;

import kafka.producer.KeyedMessage;

import kafka.producer.ProducerConfig;

 

public class ProducerDemo {

    public static void main(String[] args) throws Exception {

       Properties props = new Properties();

       props.put("zk.connect", "weekend01:2181,weekend02:2181,weekend03:2181");

       props.put("metadata.broker.list","weekend01:9092,weekend02:9092,weekend03:9092");

       props.put("serializer.class", "kafka.serializer.StringEncoder");

       ProducerConfig config = new ProducerConfig(props);

       Producer<String, String> producer = new Producer<String, String>(config);

 

       // 發送業務消息

       // 讀取文件 讀取內存數據庫 讀socket端口

       for (int i = 1; i <= 100; i++) {

           Thread.sleep(500);

           producer.send(new KeyedMessage<String, String>("wordcount",

                  "i said i love you baby for" + i + "times,will you have a nice day with me tomorrow"));

       }

 

    }

}

 

 

 

 

 

 

 

 

 

 

 

ConsumerDemo.java

 

package cn.itcast.kafka;

 

import java.util.HashMap;

import java.util.List;

import java.util.Map;

import java.util.Properties;

 

import kafka.consumer.Consumer;

import kafka.consumer.ConsumerConfig;

import kafka.consumer.ConsumerIterator;

import kafka.consumer.KafkaStream;

import kafka.javaapi.consumer.ConsumerConnector;

import kafka.message.MessageAndMetadata;

 

public class ConsumerDemo {

    private static final String topic = "mysons";

    private static final Integer threads = 1;

 

    public static void main(String[] args) {

      

       Properties props = new Properties();

       props.put("zookeeper.connect", "weekend01:2181,weekend02:2181,weekend03:2181");

       props.put("group.id", "1111");

       props.put("auto.offset.reset", "smallest");

 

       ConsumerConfig config = new ConsumerConfig(props);

       ConsumerConnector consumer =Consumer.createJavaConsumerConnector(config);

       Map<String, Integer> topicCountMap = new HashMap<String, Integer>();

       topicCountMap.put(topic, 1);

       topicCountMap.put("mygirls", 1);

       topicCountMap.put("myboys", 1);

       Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);

       List<KafkaStream<byte[], byte[]>> streams = consumerMap.get("mygirls");

      

       for(final KafkaStream<byte[], byte[]> kafkaStream : streams){

           new Thread(new Runnable() {

              @Override

              public void run() {

                  for(MessageAndMetadata<byte[], byte[]> mm : kafkaStream){

                     String msg = new String(mm.message());

                     System.out.println(msg);

                  }

              }

          

           }).start();

      

       }

    }

}

 

 

 

啓動zookeeper,這裏不贅述了。真的很簡單

啓動kafka服務

 

 

參考博客: http://www.aboutyun.com/forum.php?mod=viewthread&tid=12847

                Kafka單機、集羣模式安裝詳解()

                Kafka單機、集羣模式安裝詳解()

                Kafka使用Java客戶端進行訪問

 

 

參考博客:http://www.jianshu.com/p/425a7d8735e2

Kafka學習筆記

相關文章
相關標籤/搜索