1:Kafka名詞解釋和工做方式 1.1:Producer :消息生產者,就是向kafka broker發消息的客戶端。 1.2:Consumer :消息消費者,向kafka broker取消息的客戶端 1.3:Topic :能夠理解爲一個隊列。 1.4:Consumer Group (CG):這是kafka用來實現一個topic消息的廣播(發給全部的consumer)和單播(發給任意一個consumer)的手段。一個topic能夠有多個CG。topic的消息會複製(不是真的複製,是概念上的)到全部的CG,但每一個partion只會把消息發給該CG中的一個consumer。若是須要實現廣播,只要每一個consumer有一個獨立的CG就能夠了。要實現單播只要全部的consumer在同一個CG。用CG還能夠將consumer進行自由的分組而不須要屢次發送消息到不一樣的topic。 1.5:Broker :一臺kafka服務器就是一個broker。一個集羣由多個broker組成。一個broker能夠容納多個topic。 1.6:Partition:爲了實現擴展性,一個很是大的topic能夠分佈到多個broker(即服務器)上,一個topic能夠分爲多個partition,每一個partition是一個有序的隊列。partition中的每條消息都會被分配一個有序的id(offset)。kafka只保證按一個partition中的順序將消息發給consumer,不保證一個topic的總體(多個partition間)的順序。 1.7:Offset:kafka的存儲文件都是按照offset.kafka來命名,用offset作名字的好處是方便查找。例如你想找位於2049的位置,只要找到2048.kafka的文件便可。固然the first offset就是00000000000.kafka。 2:Consumer與topic關係?本質上kafka只支持Topic。 2.1:每一個group中能夠有多個consumer,每一個consumer屬於一個consumer group; 一般狀況下,一個group中會包含多個consumer,這樣不只能夠提升topic中消息的併發消費能力,並且還能提升"故障容錯"性,若是group中的某個consumer失效那麼其消費的partitions將會有其餘consumer自動接管。 2.2:對於Topic中的一條特定的消息,只會被訂閱此Topic的每一個group中的其中一個consumer消費,此消息不會發送給一個group的多個consumer; 那麼一個group中全部的consumer將會交錯的消費整個Topic,每一個group中consumer消息消費互相獨立,咱們能夠認爲一個group是一個"訂閱"者。 2.3:在kafka中,一個partition中的消息只會被group中的一個consumer消費(同一時刻); 一個Topic中的每一個partions,只會被一個"訂閱者"中的一個consumer消費,不過一個consumer能夠同時消費多個partitions中的消息。 2.4:kafka的設計原理決定,對於一個topic,同一個group中不能有多於partitions個數的consumer同時消費,不然將意味着某些consumer將沒法獲得消息。 2.5:kafka只能保證一個partition中的消息被某個consumer消費時是順序的;事實上,從Topic角度來講,當有多個partitions時,消息仍不是全局有序的。 3:Kafka消息的分發,Producer客戶端負責消息的分發。 3.1:kafka集羣中的任何一個broker均可以向producer提供metadata信息,這些metadata中包含"集羣中存活的servers列表"/"partitions leader列表"等信息; 3.2:當producer獲取到metadata信息以後, producer將會和Topic下全部partition leader保持socket鏈接; 3.3:消息由producer直接經過socket發送到broker,中間不會通過任何"路由層",事實上,消息被路由到哪一個partition上由producer客戶端決定; 好比能夠採用"random""key-hash""輪詢"等,若是一個topic中有多個partitions,那麼在producer端實現"消息均衡分發"是必要的。 3.4:在producer端的配置文件中,開發者能夠指定partition路由的方式。 3.5:Producer消息發送的應答機制: 設置發送數據是否須要服務端的反饋,三個值0,1,-1。 0: producer不會等待broker發送ack。 1: 當leader接收到消息以後發送ack。 -1: 當全部的follower都同步消息成功後發送ack。 request.required.acks=0。 4:Consumer的負載均衡: 當一個group中,有consumer加入或者離開時,會觸發partitions均衡.均衡的最終目的,是提高topic的併發消費能力:
步驟以下:
a、假如topic1,具備以下partitions: P0,P1,P2,P3。
b、加入group中,有以下consumer: C1,C2。
c、首先根據partition索引號對partitions排序: P0,P1,P2,P3。
d、根據consumer.id排序: C0,C1。
e、計算倍數: M = [P0,P1,P2,P3].size / [C0,C1].size,本例值M=2(向上取整)。
f、而後依次分配partitions: C0 = [P0,P1],C1=[P2,P3],即Ci = [P(i * M),P((i + 1) * M -1)]。html
6:Kafka文件存儲基本結構:java
6.1:在Kafka文件存儲中,同一個topic下有多個不一樣partition,每一個partition爲一個目錄,partiton命名規則爲topic名稱+有序序號,第一個partiton序號從0開始,序號最大值爲partitions數量減1。
6.2:每一個partion(目錄)至關於一個巨型文件被平均分配到多個大小相等segment(段)數據文件中。但每一個段segment file消息數量不必定相等,這種特性方便old segment file快速被刪除。默認保留7天的數據。apache
6.3:每一個partiton只須要支持順序讀寫就好了,segment文件生命週期由服務端配置參數決定。(何時建立,何時刪除)。json
1:使用Idea進行開發,源碼以下所示,首先加入Kafka必須依賴的包,這句話意味着你必需要先在Idea上面搭建好的你的maven環境:api
pom.xml以下所示內容:服務器
1 <?xml version="1.0" encoding="UTF-8"?> 2 <project xmlns="http://maven.apache.org/POM/4.0.0" 3 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 4 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> 5 <modelVersion>4.0.0</modelVersion> 6 7 <groupId>com.bie</groupId> 8 <artifactId>storm</artifactId> 9 <version>1.0-SNAPSHOT</version> 10 11 <!-- storm的依賴關係 --> 12 <dependencies> 13 <!--storm依賴的包--> 14 <dependency> 15 <groupId>org.apache.storm</groupId> 16 <artifactId>storm-core</artifactId> 17 <version>0.9.5</version> 18 <!--<scope>provided</scope>--> 19 </dependency> 20 <!-- kafka依賴的包--> 21 <dependency> 22 <groupId>org.apache.kafka</groupId> 23 <artifactId>kafka_2.8.2</artifactId> 24 <version>0.8.1</version> 25 <exclusions> 26 <exclusion> 27 <artifactId>jmxtools</artifactId> 28 <groupId>com.sun.jdmk</groupId> 29 </exclusion> 30 <exclusion> 31 <artifactId>jmxri</artifactId> 32 <groupId>com.sun.jmx</groupId> 33 </exclusion> 34 <exclusion> 35 <artifactId>jms</artifactId> 36 <groupId>javax.jms</groupId> 37 </exclusion> 38 <exclusion> 39 <groupId>org.apache.zookeeper</groupId> 40 <artifactId>zookeeper</artifactId> 41 </exclusion> 42 <exclusion> 43 <groupId>org.slf4j</groupId> 44 <artifactId>slf4j-log4j12</artifactId> 45 </exclusion> 46 <exclusion> 47 <groupId>org.slf4j</groupId> 48 <artifactId>slf4j-api</artifactId> 49 </exclusion> 50 </exclusions> 51 </dependency> 52 </dependencies> 53 54 <!--若是依賴外部包,就打不進去外部包,因此須要引入下面所示--> 55 <build> 56 <plugins> 57 <plugin> 58 <!--把其餘外部依賴的jar包打成一個大jar包--> 59 <artifactId>maven-assembly-plugin</artifactId> 60 <configuration> 61 <descriptorRefs> 62 <descriptorRef>jar-with-dependencies</descriptorRef> 63 </descriptorRefs> 64 <archive> 65 <manifest> 66 <mainClass>com.bie.wordcount.WordCountTopologyMain</mainClass> 67 </manifest> 68 </archive> 69 </configuration> 70 <executions> 71 <execution> 72 <id>make-assembly</id> 73 <phase>package</phase> 74 <goals> 75 <goal>single</goal> 76 </goals> 77 </execution> 78 </executions> 79 </plugin> 80 <plugin> 81 <groupId>org.apache.maven.plugins</groupId> 82 <artifactId>maven-compiler-plugin</artifactId> 83 <configuration> 84 <source>1.7</source> 85 <target>1.7</target> 86 </configuration> 87 </plugin> 88 </plugins> 89 </build> 90 91 92 </project>
而後呢,書寫你的生產者源碼,以下所示:併發
package com.bie.kafka; import kafka.javaapi.producer.Producer; import kafka.producer.KeyedMessage; import kafka.producer.ProducerConfig; import java.util.Properties; import java.util.UUID; /** * 這是一個簡單的Kafka producer代碼 * 包含兩個功能: * 一、數據發送 * 二、數據按照自定義的partition策略進行發送 * * * KafkaSpout的類 */ public class KafkaProducerSimple { public static void main(String[] args) { /** * 一、指定當前kafka producer生產的數據的目的地 * 建立topic能夠輸入如下命令,在kafka集羣的任一節點進行建立。 * bin/kafka-topics.sh --create --zookeeper master:2181 * --replication-factor 1 --partitions 1 --topic orderMq */ String TOPIC = "orderMq8"; /** * 二、讀取配置文件 */ Properties props = new Properties(); /* * key.serializer.class默認爲serializer.class */ props.put("serializer.class", "kafka.serializer.StringEncoder"); /* * kafka broker對應的主機,格式爲host1:port1,host2:port2 */ props.put("metadata.broker.list", "master:9092,slaver1:9092,slaver2:9092"); /* * request.required.acks,設置發送數據是否須要服務端的反饋,有三個值0,1,-1 * 0,意味着producer永遠不會等待一個來自broker的ack,這就是0.7版本的行爲。 * 這個選項提供了最低的延遲,可是持久化的保證是最弱的,當server掛掉的時候會丟失一些數據。 * 1,意味着在leader replica已經接收到數據後,producer會獲得一個ack。 * 這個選項提供了更好的持久性,由於在server確認請求成功處理後,client纔會返回。 * 若是剛寫到leader上,還沒來得及複製leader就掛了,那麼消息纔可能會丟失。 * -1,意味着在全部的ISR都接收到數據後,producer才獲得一個ack。 * 這個選項提供了最好的持久性,只要還有一個replica存活,那麼數據就不會丟失 */ props.put("request.required.acks", "1"); /* * 可選配置,若是不配置,則使用默認的partitioner partitioner.class * 默認值:kafka.producer.DefaultPartitioner * 用來把消息分到各個partition中,默認行爲是對key進行hash。 */ props.put("partitioner.class", "com.bie.kafka.MyLogPartitioner"); //props.put("partitioner.class", "kafka.producer.DefaultPartitioner"); /** * 三、經過配置文件,建立生產者 */ Producer<String, String> producer = new Producer<String, String>(new ProducerConfig(props)); /** * 四、經過for循環生產數據 */ for (int messageNo = 1; messageNo < 100000; messageNo++) { String messageStr = new String(messageNo + "注意:這裏須要指定 partitionKey,用來配合自定義的MyLogPartitioner進行數據分發注意:這裏須要指定 partitionKey,用來配合自定義的MyLogPartitioner進行數據分發注意:這裏須要指定 partitionKey,用來配合自定義的MyLogPartitioner進行數據分發注意:這裏須要指定 partitionKey,用來配合自定義的MyLogPartitioner進行數據分發注意:這裏須要指定 partitionKey,用來配合自定義的MyLogPartitioner進行數據分發注意:這裏須要指定 partitionKey,用來配合自定義的MyLogPartitioner進行數據分發注意:這裏須要指定 partitionKey,用來配合自定義的MyLogPartitioner進行數據分發注意:這裏須要指定 partitionKey,用來配合自定義的MyLogPartitioner進行數據分發注意:這裏須要指定 partitionKey,用來配合自定義的MyLogPartitioner進行數據分發注意:這裏須要指定 partitionKey,用來配合自定義的MyLogPartitioner進行數據分發注意:這裏須要指定 partitionKey,用來配合自定義的MyLogPartitioner進行數據分發注意:這裏須要指定 partitionKey,用來配合自定義的MyLogPartitioner進行數據分發注意:這裏須要指定 partitionKey,用來配合自定義的MyLogPartitioner進行數據分發注意:這裏須要指定 partitionKey,用來配合自定義的MyLogPartitioner進行數據分發注意:這裏須要指定 partitionKey,用來配合自定義的MyLogPartitioner進行數據分發注意:這裏須要指定 partitionKey,用來配合自定義的MyLogPartitioner進行數據分發注意:這裏須要指定 partitionKey,用來配合自定義的MyLogPartitioner進行數據分發注意:這裏須要指定 partitionKey,用來配合自定義的MyLogPartitioner進行數據分發注意:這裏須要指定 partitionKey,用來配合自定義的MyLogPartitioner進行數據分發注意:這裏須要指定 partitionKey,用來配合自定義的MyLogPartitioner進行數據分發注意:這裏須要指定 partitionKey,用來配合自定義的MyLogPartitioner進行數據分發注意:這裏須要指定 partitionKey,用來配合自定義的MyLogPartitioner進行數據分發注意:這裏須要指定 partitionKey,用來配合自定義的MyLogPartitioner進行數據分發注意:這裏須要指定 partitionKey,用來配合自定義的MyLogPartitioner進行數據分發注意:這裏須要指定 partitionKey,用來配合自定義的MyLogPartitioner進行數據分發注意:這裏須要指定 partitionKey,用來配合自定義的MyLogPartitioner進行數據分發注意:這裏須要指定 partitionKey," + "注意:這裏須要指定 partitionKey,用來配合自定義的MyLogPartitioner進行數據分發注意:這裏須要指定 partitionKey,用來配合自定義的MyLogPartitioner進行數據分發注意:這裏須要指定 partitionKey,用來配合自定義的MyLogPartitioner進行數據分發" + "注意:這裏須要指定 partitionKey,用來配合自定義的MyLogPartitioner進行數據分發注意:這裏須要指定 partitionKey,用來配合自定義的MyLogPartitioner進行數據分發注意:這裏須要指定 partitionKey,用來配合自定義的MyLogPartitioner進行數據分發" + "注意:這裏須要指定 partitionKey,用來配合自定義的MyLogPartitioner進行數據分發注意:這裏須要指定 partitionKey,用來配合自定義的MyLogPartitioner進行數據分發" + "注意:這裏須要指定 partitionKey,用來配合自定義的MyLogPartitioner進行數據分發注意:這裏須要指定 partitionKey,用來配合自定義的MyLogPartitioner進行數據分發注意:這裏須要指定 partitionKey,用來配合自定義的MyLogPartitioner進行數據分發" + "注意:這裏須要指定 partitionKey,用來配合自定義的MyLogPartitioner進行數據分發注意:這裏須要指定 partitionKey,用來配合自定義的MyLogPartitioner進行數據分發注意:這裏須要指定 partitionKey,用來配合自定義的MyLogPartitioner進行數據分發注意:這裏須要指定 partitionKey,用來配合自定義的MyLogPartitioner進行數據分發" + "注意:這裏須要指定 partitionKey,用來配合自定義的MyLogPartitioner進行數據分發注意:這裏須要指定 partitionKey,用來配合自定義的MyLogPartitioner進行數據分發注意:這裏須要指定 partitionKey,用來配合自定義的MyLogPartitioner進行數據分發" + "注意:這裏須要指定 partitionKey,用來配合自定義的MyLogPartitioner進行數據分發注意:這裏須要指定 partitionKey,用來配合自定義的MyLogPartitioner進行數據分發注意:這裏須要指定 partitionKey,用來配合自定義的MyLogPartitioner進行數據分發" + "用來配合自定義的MyLogPartitioner進行數據分發"); /** * 五、調用producer的send方法發送數據 * 注意:這裏須要指定 partitionKey,用來配合自定義的MyLogPartitioner進行數據分發 */ producer.send(new KeyedMessage<String, String>(TOPIC, messageNo + "", "appid" + UUID.randomUUID() + messageStr)); //producer.send(new KeyedMessage<String, String>(TOPIC, messageNo + "", "appid" + UUID.randomUUID() + "biexiansheng")); } } }
生產者須要的Partitioner以下所示內容:app
package com.bie.kafka; import kafka.producer.Partitioner; import kafka.utils.VerifiableProperties; import org.apache.log4j.Logger; public class MyLogPartitioner implements Partitioner { private static Logger logger = Logger.getLogger(MyLogPartitioner.class); public MyLogPartitioner(VerifiableProperties props) { } public int partition(Object obj, int numPartitions) { return Integer.parseInt(obj.toString())%numPartitions; // return 1; } }
生產者運行效果以下所示:負載均衡
消費者代碼以下所示:dom
package com.bie.kafka; import kafka.consumer.Consumer; import kafka.consumer.ConsumerConfig; import kafka.consumer.ConsumerIterator; import kafka.consumer.KafkaStream; import kafka.javaapi.consumer.ConsumerConnector; import kafka.message.MessageAndMetadata; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class KafkaConsumerSimple implements Runnable { public String title; public KafkaStream<byte[], byte[]> stream; public KafkaConsumerSimple(String title, KafkaStream<byte[], byte[]> stream) { this.title = title; this.stream = stream; } @Override public void run() { System.out.println("開始運行 " + title); ConsumerIterator<byte[], byte[]> it = stream.iterator(); /** * 不停地從stream讀取新到來的消息,在等待新的消息時,hasNext()會阻塞 * 若是調用 `ConsumerConnector#shutdown`,那麼`hasNext`會返回false * */ while (it.hasNext()) { MessageAndMetadata<byte[], byte[]> data = it.next(); Object topic = data.topic(); int partition = data.partition(); long offset = data.offset(); String msg = new String(data.message()); System.out.println(String.format( "Consumer: [%s], Topic: [%s], PartitionId: [%d], Offset: [%d], msg: [%s]", title, topic, partition, offset, msg)); } System.out.println(String.format("Consumer: [%s] exiting ...", title)); } public static void main(String[] args) throws Exception{ Properties props = new Properties(); props.put("group.id", "biexiansheng"); props.put("zookeeper.connect", "master:2181,slaver1:2181,slaver2:2181"); props.put("auto.offset.reset", "largest"); props.put("auto.commit.interval.ms", "1000"); props.put("partition.assignment.strategy", "roundrobin"); ConsumerConfig config = new ConsumerConfig(props); String topic1 = "orderMq8"; //String topic2 = "paymentMq"; //只要ConsumerConnector還在的話,consumer會一直等待新消息,不會本身退出 ConsumerConnector consumerConn = Consumer.createJavaConsumerConnector(config); //定義一個map Map<String, Integer> topicCountMap = new HashMap<>(); topicCountMap.put(topic1, 3); //Map<String, List<KafkaStream<byte[], byte[]>> 中String是topic, List<KafkaStream<byte[], byte[]>是對應的流 Map<String, List<KafkaStream<byte[], byte[]>>> topicStreamsMap = consumerConn.createMessageStreams(topicCountMap); //取出 `kafkaTest` 對應的 streams List<KafkaStream<byte[], byte[]>> streams = topicStreamsMap.get(topic1); //建立一個容量爲4的線程池 ExecutorService executor = Executors.newFixedThreadPool(3); //建立20個consumer threads for (int i = 0; i < streams.size(); i++) { executor.execute(new KafkaConsumerSimple("消費者" + (i + 1), streams.get(i))); } } }
消費者運行以下所示:
運行消費者出現下面的錯誤,解決方法將pomx.ml裏面的zookeeper配置註釋了便可:
錯誤以下所示:
1 D:\soft\Java\jdk1.7.0_80\bin\java -javaagent:E:\360Downloads\idea\lib\idea_rt.jar=61635:E:\360Downloads\idea\bin -Dfile.encoding=UTF-8 -classpath D:\soft\Java\jdk1.7.0_80\jre\lib\charsets.jar;D:\soft\Java\jdk1.7.0_80\jre\lib\deploy.jar;D:\soft\Java\jdk1.7.0_80\jre\lib\ext\access-bridge-64.jar;D:\soft\Java\jdk1.7.0_80\jre\lib\ext\dnsns.jar;D:\soft\Java\jdk1.7.0_80\jre\lib\ext\jaccess.jar;D:\soft\Java\jdk1.7.0_80\jre\lib\ext\localedata.jar;D:\soft\Java\jdk1.7.0_80\jre\lib\ext\sunec.jar;D:\soft\Java\jdk1.7.0_80\jre\lib\ext\sunjce_provider.jar;D:\soft\Java\jdk1.7.0_80\jre\lib\ext\sunmscapi.jar;D:\soft\Java\jdk1.7.0_80\jre\lib\ext\zipfs.jar;D:\soft\Java\jdk1.7.0_80\jre\lib\javaws.jar;D:\soft\Java\jdk1.7.0_80\jre\lib\jce.jar;D:\soft\Java\jdk1.7.0_80\jre\lib\jfr.jar;D:\soft\Java\jdk1.7.0_80\jre\lib\jfxrt.jar;D:\soft\Java\jdk1.7.0_80\jre\lib\jsse.jar;D:\soft\Java\jdk1.7.0_80\jre\lib\management-agent.jar;D:\soft\Java\jdk1.7.0_80\jre\lib\plugin.jar;D:\soft\Java\jdk1.7.0_80\jre\lib\resources.jar;D:\soft\Java\jdk1.7.0_80\jre\lib\rt.jar;E:\360Downloads\idea\storm\target\classes;E:\maven\repository\org\apache\storm\storm-core\0.9.5\storm-core-0.9.5.jar;E:\maven\repository\org\clojure\clojure\1.5.1\clojure-1.5.1.jar;E:\maven\repository\clj-time\clj-time\0.4.1\clj-time-0.4.1.jar;E:\maven\repository\joda-time\joda-time\2.0\joda-time-2.0.jar;E:\maven\repository\compojure\compojure\1.1.3\compojure-1.1.3.jar;E:\maven\repository\org\clojure\core.incubator\0.1.0\core.incubator-0.1.0.jar;E:\maven\repository\org\clojure\tools.macro\0.1.0\tools.macro-0.1.0.jar;E:\maven\repository\clout\clout\1.0.1\clout-1.0.1.jar;E:\maven\repository\ring\ring-core\1.1.5\ring-core-1.1.5.jar;E:\maven\repository\commons-fileupload\commons-fileupload\1.2.1\commons-fileupload-1.2.1.jar;E:\maven\repository\javax\servlet\servlet-api\2.5\servlet-api-2.5.jar;E:\maven\repository\hiccup\hiccup\0.3.6\hiccup-0.3.6.jar;E:\maven\repository\ring\ring-devel\0.3.11\ring-devel-0.3.11.jar;E:\maven\repository\clj-stacktrace\clj-stacktrace\0.2.2\clj-stacktrace-0.2.2.jar;E:\maven\repository\ring\ring-jetty-adapter\0.3.11\ring-jetty-adapter-0.3.11.jar;E:\maven\repository\ring\ring-servlet\0.3.11\ring-servlet-0.3.11.jar;E:\maven\repository\org\mortbay\jetty\jetty\6.1.26\jetty-6.1.26.jar;E:\maven\repository\org\mortbay\jetty\jetty-util\6.1.26\jetty-util-6.1.26.jar;E:\maven\repository\org\clojure\tools.logging\0.2.3\tools.logging-0.2.3.jar;E:\maven\repository\org\clojure\math.numeric-tower\0.0.1\math.numeric-tower-0.0.1.jar;E:\maven\repository\org\clojure\tools.cli\0.2.4\tools.cli-0.2.4.jar;E:\maven\repository\commons-io\commons-io\2.4\commons-io-2.4.jar;E:\maven\repository\org\apache\commons\commons-exec\1.1\commons-exec-1.1.jar;E:\maven\repository\commons-lang\commons-lang\2.5\commons-lang-2.5.jar;E:\maven\repository\com\googlecode\json-simple\json-simple\1.1\json-simple-1.1.jar;E:\maven\repository\com\twitter\carbonite\1.4.0\carbonite-1.4.0.jar;E:\maven\repository\com\esotericsoftware\kryo\kryo\2.21\kryo-2.21.jar;E:\maven\repository\com\esotericsoftware\reflectasm\reflectasm\1.07\reflectasm-1.07-shaded.jar;E:\maven\repository\org\ow2\asm\asm\4.0\asm-4.0.jar;E:\maven\repository\com\esotericsoftware\minlog\minlog\1.2\minlog-1.2.jar;E:\maven\repository\org\objenesis\objenesis\1.2\objenesis-1.2.jar;E:\maven\repository\com\twitter\chill-java\0.3.5\chill-java-0.3.5.jar;E:\maven\repository\org\yaml\snakeyaml\1.11\snakeyaml-1.11.jar;E:\maven\repository\commons-logging\commons-logging\1.1.3\commons-logging-1.1.3.jar;E:\maven\repository\commons-codec\commons-codec\1.6\commons-codec-1.6.jar;E:\maven\repository\com\googlecode\disruptor\disruptor\2.10.1\disruptor-2.10.1.jar;E:\maven\repository\org\jgrapht\jgrapht-core\0.9.0\jgrapht-core-0.9.0.jar;E:\maven\repository\ch\qos\logback\logback-classic\1.0.13\logback-classic-1.0.13.jar;E:\maven\repository\ch\qos\logback\logback-core\1.0.13\logback-core-1.0.13.jar;E:\maven\repository\org\slf4j\slf4j-api\1.7.5\slf4j-api-1.7.5.jar;E:\maven\repository\org\slf4j\log4j-over-slf4j\1.6.6\log4j-over-slf4j-1.6.6.jar;E:\maven\repository\jline\jline\2.11\jline-2.11.jar;E:\maven\repository\org\apache\kafka\kafka_2.8.2\0.8.1\kafka_2.8.2-0.8.1.jar;E:\maven\repository\org\scala-lang\scala-library\2.8.2\scala-library-2.8.2.jar;E:\maven\repository\com\yammer\metrics\metrics-annotation\2.2.0\metrics-annotation-2.2.0.jar;E:\maven\repository\com\yammer\metrics\metrics-core\2.2.0\metrics-core-2.2.0.jar;E:\maven\repository\org\xerial\snappy\snappy-java\1.0.5\snappy-java-1.0.5.jar;E:\maven\repository\net\sf\jopt-simple\jopt-simple\3.2\jopt-simple-3.2.jar;E:\maven\repository\com\101tec\zkclient\0.3\zkclient-0.3.jar;E:\maven\repository\log4j\log4j\1.2.14\log4j-1.2.14.jar com.bie.kafka.KafkaConsumerSimple 2 260 [main] INFO kafka.utils.VerifiableProperties - Verifying properties 3 311 [main] INFO kafka.utils.VerifiableProperties - Property auto.commit.interval.ms is overridden to 1000 4 311 [main] INFO kafka.utils.VerifiableProperties - Property auto.offset.reset is overridden to largest 5 311 [main] INFO kafka.utils.VerifiableProperties - Property group.id is overridden to biexiansheng 6 312 [main] WARN kafka.utils.VerifiableProperties - Property partition.assignment.strategy is not valid 7 312 [main] INFO kafka.utils.VerifiableProperties - Property zookeeper.connect is overridden to master:2181,slaver1:2181,slaver2:2181 8 448 [main] INFO kafka.consumer.ZookeeperConsumerConnector - [biexiansheng_HY-201707051724-1516692275031-bffb9bfb], Connecting to zookeeper instance at master:2181,slaver1:2181,slaver2:2181 9 Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/zookeeper/Watcher 10 at java.lang.ClassLoader.defineClass1(Native Method) 11 at java.lang.ClassLoader.defineClass(ClassLoader.java:800) 12 at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142) 13 at java.net.URLClassLoader.defineClass(URLClassLoader.java:449) 14 at java.net.URLClassLoader.access$100(URLClassLoader.java:71) 15 at java.net.URLClassLoader$1.run(URLClassLoader.java:361) 16 at java.net.URLClassLoader$1.run(URLClassLoader.java:355) 17 at java.security.AccessController.doPrivileged(Native Method) 18 at java.net.URLClassLoader.findClass(URLClassLoader.java:354) 19 at java.lang.ClassLoader.loadClass(ClassLoader.java:425) 20 at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308) 21 at java.lang.ClassLoader.loadClass(ClassLoader.java:358) 22 at kafka.consumer.ZookeeperConsumerConnector.connectZk(ZookeeperConsumerConnector.scala:156) 23 at kafka.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:114) 24 at kafka.javaapi.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:65) 25 at kafka.javaapi.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:67) 26 at kafka.consumer.Consumer$.createJavaConsumerConnector(ConsumerConnector.scala:100) 27 at kafka.consumer.Consumer.createJavaConsumerConnector(ConsumerConnector.scala) 28 at com.bie.kafka.KafkaConsumerSimple.main(KafkaConsumerSimple.java:58) 29 Caused by: java.lang.ClassNotFoundException: org.apache.zookeeper.Watcher 30 at java.net.URLClassLoader$1.run(URLClassLoader.java:366) 31 at java.net.URLClassLoader$1.run(URLClassLoader.java:355) 32 at java.security.AccessController.doPrivileged(Native Method) 33 at java.net.URLClassLoader.findClass(URLClassLoader.java:354) 34 at java.lang.ClassLoader.loadClass(ClassLoader.java:425) 35 at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308) 36 at java.lang.ClassLoader.loadClass(ClassLoader.java:358) 37 ... 19 more 38 39 Process finished with exit code 1
運行效果以下所示:
待續......