Integrating Storm and Kafka on a single machine went smoothly, but once we moved to a Storm cluster and started measuring data-processing throughput, several problems surfaced. Below is a brief record of the test process and the issues we hit:
Performance target: process at least 1,000,000 messages per minute (CSV format, around 100 bytes each; roughly 16,700 messages per second, or about 1.7 MB/s), parsing each message and persisting it to the DB.
Architecture: Flume reads the files and buffers them into a Kafka queue, from which Storm consumes.
Problems:
1. During Storm cluster task scheduling, the following problem occurred; the relevant log is below:
```
2014-09-24 16:47:38 b.s.m.n.Client [INFO] Reconnect started for Netty-Client-computer7-62/ip:6706... [8]
2014-09-24 16:47:38 b.s.m.n.Client [INFO] connection established to a remote host Netty-Client-computer7-62/ip:6706, [id: 0x0b596170, /ip:34836 => computer7-62/ip:6706]
2014-09-24 16:47:38 b.s.m.n.Client [INFO] Closing Netty Client Netty-Client-computer7-60/ip:6706
2014-09-24 16:47:38 b.s.m.n.StormClientErrorHandler [INFO] Connection failed Netty-Client-computer7-60/ip:6706
java.nio.channels.ClosedChannelException: null
    at org.jboss.netty.channel.socket.nio.NioWorker.cleanUpWriteBuffer(NioWorker.java:649) ~[netty-3.2.2.Final.jar:na]
    at org.jboss.netty.channel.socket.nio.NioWorker.writeFromUserCode(NioWorker.java:370) ~[netty-3.2.2.Final.jar:na]
    at org.jboss.netty.channel.socket.nio.NioClientSocketPipelineSink.eventSunk(NioClientSocketPipelineSink.java:117) ~[netty-3.2.2.Final.jar:na]
    at org.jboss.netty.channel.Channels.write(Channels.java:632) [netty-3.2.2.Final.jar:na]
    at org.jboss.netty.handler.codec.oneone.OneToOneEncoder.handleDownstream(OneToOneEncoder.java:70) ~[netty-3.2.2.Final.jar:na]
    at org.jboss.netty.channel.Channels.write(Channels.java:611) [netty-3.2.2.Final.jar:na]
    at org.jboss.netty.channel.Channels.write(Channels.java:578) [netty-3.2.2.Final.jar:na]
    at org.jboss.netty.channel.AbstractChannel.write(AbstractChannel.java:259) [netty-3.2.2.Final.jar:na]
    at backtype.storm.messaging.netty.Client.flushRequest(Client.java:328) [storm-core-0.9.2-incubating.jar:0.9.2-incubating]
    at backtype.storm.messaging.netty.Client.close(Client.java:272) [storm-core-0.9.2-incubating.jar:0.9.2-incubating]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.6.0_24]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) ~[na:1.6.0_24]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.6.0_24]
    at java.lang.reflect.Method.invoke(Method.java:616) ~[na:1.6.0_24]
    at clojure.lang.Reflector.invokeMatchingMethod(Reflector.java:93) [clojure-1.5.1.jar:na]
    at clojure.lang.Reflector.invokeNoArgInstanceMember(Reflector.java:298) [clojure-1.5.1.jar:na]
    at backtype.storm.daemon.worker$mk_refresh_connections$this__5887.invoke(worker.clj:284) [storm-core-0.9.2-incubating.jar:0.9.2-incubating]
    at backtype.storm.daemon.worker$mk_refresh_connections$this__5887.invoke(worker.clj:250) [storm-core-0.9.2-incubating.jar:0.9.2-incubating]
    at backtype.storm.timer$schedule_recurring$this__1134.invoke(timer.clj:99) [storm-core-0.9.2-incubating.jar:0.9.2-incubating]
    at backtype.storm.timer$mk_timer$fn__1117$fn__1118.invoke(timer.clj:50) [storm-core-0.9.2-incubating.jar:0.9.2-incubating]
    at backtype.storm.timer$mk_timer$fn__1117.invoke(timer.clj:42) [storm-core-0.9.2-incubating.jar:0.9.2-incubating]
    at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
    at java.lang.Thread.run(Thread.java:679) [na:1.6.0_24]
2014-09-24 16:47:38 b.s.m.n.Client [INFO] failed to send requests to computer7-60/ip:6706:
java.nio.channels.ClosedChannelException: null
    at org.jboss.netty.channel.socket.nio.NioWorker.cleanUpWriteBuffer(NioWorker.java:649) ~[netty-3.2.2.Final.jar:na]
    at org.jboss.netty.channel.socket.nio.NioWorker.writeFromUserCode(NioWorker.java:370) ~[netty-3.2.2.Final.jar:na]
    at org.jboss.netty.channel.socket.nio.NioClientSocketPipelineSink.eventSunk(NioClientSocketPipelineSink.java:117) ~[netty-3.2.2.Final.jar:na]
    at org.jboss.netty.channel.Channels.write(Channels.java:632) ~[netty-3.2.2.Final.jar:na]
    at org.jboss.netty.handler.codec.oneone.OneToOneEncoder.handleDownstream(OneToOneEncoder.java:70) ~[netty-3.2.2.Final.jar:na]
    at org.jboss.netty.channel.Channels.write(Channels.java:611) ~[netty-3.2.2.Final.jar:na]
    at org.jboss.netty.channel.Channels.write(Channels.java:578) ~[netty-3.2.2.Final.jar:na]
    at org.jboss.netty.channel.AbstractChannel.write(AbstractChannel.java:259) ~[netty-3.2.2.Final.jar:na]
    at backtype.storm.messaging.netty.Client.flushRequest(Client.java:328) [storm-core-0.9.2-incubating.jar:0.9.2-incubating]
    at backtype.storm.messaging.netty.Client.close(Client.java:272) [storm-core-0.9.2-incubating.jar:0.9.2-incubating]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.6.0_24]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) ~[na:1.6.0_24]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.6.0_24]
    at java.lang.reflect.Method.invoke(Method.java:616) ~[na:1.6.0_24]
    at clojure.lang.Reflector.invokeMatchingMethod(Reflector.java:93) [clojure-1.5.1.jar:na]
    at clojure.lang.Reflector.invokeNoArgInstanceMember(Reflector.java:298) [clojure-1.5.1.jar:na]
    at backtype.storm.daemon.worker$mk_refresh_connections$this__5887.invoke(worker.clj:284) [storm-core-0.9.2-incubating.jar:0.9.2-incubating]
    at backtype.storm.daemon.worker$mk_refresh_connections$this__5887.invoke(worker.clj:250) [storm-core-0.9.2-incubating.jar:0.9.2-incubating]
    at backtype.storm.timer$schedule_recurring$this__1134.invoke(timer.clj:99) [storm-core-0.9.2-incubating.jar:0.9.2-incubating]
    at backtype.storm.timer$mk_timer$fn__1117$fn__1118.invoke(timer.clj:50) [storm-core-0.9.2-incubating.jar:0.9.2-incubating]
    at backtype.storm.timer$mk_timer$fn__1117.invoke(timer.clj:42) [storm-core-0.9.2-incubating.jar:0.9.2-incubating]
    at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
    at java.lang.Thread.run(Thread.java:679) [na:1.6.0_24]
2014-09-24 16:47:38 b.s.m.n.Client [INFO] Waiting for pending batchs to be sent with Netty-Client-computer7-60/ip:6706..., timeout: 600000ms, pendings: 0
2014-09-24 16:47:38 b.s.util [ERROR] Async loop died!
java.lang.RuntimeException: java.lang.RuntimeException: Client is being closed, and does not take requests any more
    at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
    at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
    at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
    at backtype.storm.disruptor$consume_loop_STAR_$fn__758.invoke(disruptor.clj:94) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
    at backtype.storm.util$async_loop$fn__457.invoke(util.clj:431) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
    at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
    at java.lang.Thread.run(Thread.java:679) [na:1.6.0_24]
Caused by: java.lang.RuntimeException: Client is being closed, and does not take requests any more
    at backtype.storm.messaging.netty.Client.send(Client.java:194) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
    at backtype.storm.utils.TransferDrainer.send(TransferDrainer.java:54) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
    at backtype.storm.daemon.worker$mk_transfer_tuples_handler$fn__5927$fn__5928.invoke(worker.clj:322) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
    at backtype.storm.daemon.worker$mk_transfer_tuples_handler$fn__5927.invoke(worker.clj:320) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
    at backtype.storm.disruptor$clojure_handler$reify__745.onEvent(disruptor.clj:58) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
    at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:125) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
    ... 6 common frames omitted
2014-09-24 16:47:38 b.s.m.n.Client [INFO] New Netty Client, connect to computer7-60, 6706, config: , buffer_size: 52
```
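The reconnect and retry behavior visible in this log (the "[8]" after "Reconnect started" is the retry counter) is governed by Storm's Netty transport settings. A minimal sketch of the knobs involved, set programmatically on the topology Config; the values are illustrative guesses, not a verified fix for the failure above:

```java
import backtype.storm.Config;

public class NettyTransportSettings {
    // Builds a Config carrying the Netty transport knobs that control the
    // reconnect behavior seen in the log above. Values are illustrative only.
    public static Config nettyConfig() {
        Config conf = new Config();
        conf.put(Config.STORM_MESSAGING_NETTY_MAX_RETRIES, 30);      // reconnect attempts before giving up
        conf.put(Config.STORM_MESSAGING_NETTY_MIN_SLEEP_MS, 100);    // initial backoff between retries
        conf.put(Config.STORM_MESSAGING_NETTY_MAX_SLEEP_MS, 1000);   // upper bound on the retry backoff
        conf.put(Config.STORM_MESSAGING_NETTY_BUFFER_SIZE, 5242880); // transfer buffer size in bytes
        return conf;
    }
}
```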
2. Kafka performance bottleneck
When Kafka and Storm were integrated, data-processing throughput fell short of the target. At first we suspected the KafkaSpout code itself, but since it has been accepted into Storm's external modules, the problem seemed unlikely to lie there. Reading the KafkaSpout implementation then pointed to the probable bottleneck. To improve concurrent access and processing performance, Kafka gives each topic a partitions attribute: data is spread across partitions so it can be read and processed in parallel. Because message offsets are maintained on the client side, KafkaSpout avoids concurrent-access conflicts by mapping spout tasks one-to-one onto partitions. So when building a topology, KafkaSpout's parallelism can be set according to the topic's partition count. By increasing the topic's partitions and the spout parallelism accordingly (to 8), we reached the target throughput; a sketch of this setup follows.
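A minimal sketch using the storm-kafka (Storm external) API of the 0.9.x era. The topic name "csv-events", the ZooKeeper address, the zkRoot, and the consumer id are placeholders; the point being illustrated is that the spout's parallelism hint matches the topic's 8 partitions:

```java
import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.TopologyBuilder;
import storm.kafka.BrokerHosts;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;

public class CsvTopology {
    public static void main(String[] args) throws Exception {
        // Kafka brokers are discovered through ZooKeeper (placeholder address).
        BrokerHosts hosts = new ZkHosts("zk-host:2181");

        // Hypothetical topic "csv-events", created with 8 partitions.
        SpoutConfig spoutConfig = new SpoutConfig(hosts, "csv-events", "/kafka-storm", "csv-consumer");
        spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

        TopologyBuilder builder = new TopologyBuilder();
        // Parallelism hint 8 == number of partitions: one spout task per
        // partition, so offsets are consumed without contention between tasks.
        builder.setSpout("kafka-spout", new KafkaSpout(spoutConfig), 8);
        // ... bolts that parse the CSV and persist to the DB would be attached here ...

        Config conf = new Config();
        conf.setNumWorkers(4); // illustrative worker count
        StormSubmitter.submitTopology("csv-topology", conf, builder.createTopology());
    }
}
```

Note that setting the spout parallelism higher than the partition count gains nothing: the extra spout tasks have no partition to read and sit idle, which is why partitions and parallelism have to be raised together.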
By the same reasoning, the problem we hit earlier with Flume buffering into the Kafka queue may also have been caused by how partitions were configured; we will verify this in a follow-up test. One producer-side pitfall worth checking is sketched below.
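With the Kafka 0.8-era producer API, messages sent without a key are not spread evenly: the default partitioner picks one partition and sticks with it between metadata refreshes, so a producer (such as a Flume sink) writing keyless messages can leave most partitions cold. A hypothetical sketch of a keyed producer that hashes events across all partitions; the topic name and broker list are placeholders:

```java
import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class KeyedCsvProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("metadata.broker.list", "broker1:9092"); // placeholder broker list
        props.put("serializer.class", "kafka.serializer.StringEncoder");

        Producer<String, String> producer =
                new Producer<String, String>(new ProducerConfig(props));

        // Keyed messages are hashed across all partitions of the topic;
        // keyless messages would stick to one partition between metadata refreshes.
        for (int i = 0; i < 100; i++) {
            String key = Integer.toString(i); // any well-distributed key works
            producer.send(new KeyedMessage<String, String>("csv-events", key, "line-" + i));
        }
        producer.close();
    }
}
```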