目錄java
@(博客文章)[storm|大數據]node
待補充服務器
完整的默認配置文件見下面defaluts.yaml,若須要修改,則在storm.yaml中修改。重要參數以下:
一、storm.zookeeper.servers:指定使用哪一個zookeeper集羣網絡
storm.zookeeper.servers: - "gdc-nn01-test" - "gdc-dn01-test" - "gdc-dn02-test」
二、nimbus.host:指定nimbus是哪臺機器session
nimbus.host: "gdc-nn01-test」
三、指定supervisor在哪一個端口上運行worker,每一個端口可運行一個worker,所以有多少個配置端口,則每一個supervisor有多少個slot(便可運行多少個worker)app
supervisor.slots.ports: - 6700 - 6701 - 6702 - 6703 storm.local.dir: "/home/hadoop/storm/data"
四、jvm設置dom
#jvm setting nimbus.childopts:"-4096m」 supervisor.childopts:"-Xmx4096m" nimubs.childopts:"-Xmx3072m」
除此外,還有ui.childopts,logviewer.childoptsjvm
附完整配置文件:defaults.yamlide
########### These all have default values as shown ########### Additional configuration goes into storm.yaml java.library.path: "/usr/local/lib:/opt/local/lib:/usr/lib" ### storm.* configs are general configurations # the local dir is where jars are kept storm.local.dir: "storm-local" storm.zookeeper.servers: - "localhost" storm.zookeeper.port: 2181 storm.zookeeper.root: "/storm" storm.zookeeper.session.timeout: 20000 storm.zookeeper.connection.timeout: 15000 storm.zookeeper.retry.times: 5 storm.zookeeper.retry.interval: 1000 storm.zookeeper.retry.intervalceiling.millis: 30000 storm.cluster.mode: "distributed" # can be distributed or local storm.local.mode.zmq: false storm.thrift.transport: "backtype.storm.security.auth.SimpleTransportPlugin" storm.messaging.transport: "backtype.storm.messaging.netty.Context" storm.meta.serialization.delegate: "backtype.storm.serialization.DefaultSerializationDelegate" ### nimbus.* configs are for the master nimbus.host: "localhost" nimbus.thrift.port: 6627 nimbus.thrift.max_buffer_size: 1048576 nimbus.childopts: "-Xmx1024m" nimbus.task.timeout.secs: 30 nimbus.supervisor.timeout.secs: 60 nimbus.monitor.freq.secs: 10 nimbus.cleanup.inbox.freq.secs: 600 nimbus.inbox.jar.expiration.secs: 3600 nimbus.task.launch.secs: 120 nimbus.reassign: true nimbus.file.copy.expiration.secs: 600 nimbus.topology.validator: "backtype.storm.nimbus.DefaultTopologyValidator" ### ui.* configs are for the master ui.port: 8080 ui.childopts: "-Xmx768m" logviewer.port: 8000 logviewer.childopts: "-Xmx128m" logviewer.appender.name: "A1" drpc.port: 3772 drpc.worker.threads: 64 drpc.queue.size: 128 drpc.invocations.port: 3773 drpc.request.timeout.secs: 600 drpc.childopts: "-Xmx768m" transactional.zookeeper.root: "/transactional" transactional.zookeeper.servers: null transactional.zookeeper.port: null ### supervisor.* configs are for node supervisors # Define the amount of workers that can be run on this machine. Each worker is assigned a port to use for communication supervisor.slots.ports: - 6700 - 6701 - 6702 - 6703 supervisor.childopts: "-Xmx256m" #how long supervisor will wait to ensure that a worker process is started supervisor.worker.start.timeout.secs: 120 #how long between heartbeats until supervisor considers that worker dead and tries to restart it supervisor.worker.timeout.secs: 30 #how frequently the supervisor checks on the status of the processes it's monitoring and restarts if necessary supervisor.monitor.frequency.secs: 3 #how frequently the supervisor heartbeats to the cluster state (for nimbus) supervisor.heartbeat.frequency.secs: 5 supervisor.enable: true ### worker.* configs are for task workers worker.childopts: "-Xmx768m" worker.heartbeat.frequency.secs: 1 # control how many worker receiver threads we need per worker topology.worker.receiver.thread.count: 1 task.heartbeat.frequency.secs: 3 task.refresh.poll.secs: 10 zmq.threads: 1 zmq.linger.millis: 5000 zmq.hwm: 0 storm.messaging.netty.server_worker_threads: 1 storm.messaging.netty.client_worker_threads: 1 storm.messaging.netty.buffer_size: 5242880 #5MB buffer # Since nimbus.task.launch.secs and supervisor.worker.start.timeout.secs are 120, other workers should also wait at least that long before giving up on connecting to the other worker. The reconnection period need also be bigger than storm.zookeeper.session.timeout(default is 20s), so that we can abort the reconnection when the target worker is dead. storm.messaging.netty.max_retries: 30 storm.messaging.netty.max_wait_ms: 1000 storm.messaging.netty.min_wait_ms: 100 # If the Netty messaging layer is busy(netty internal buffer not writable), the Netty client will try to batch message as more as possible up to the size of storm.messaging.netty.transfer.batch.size bytes, otherwise it will try to flush message as soon as possible to reduce latency. storm.messaging.netty.transfer.batch.size: 262144 # We check with this interval that whether the Netty channel is writable and try to write pending messages if it is. storm.messaging.netty.flush.check.interval.ms: 10 ### topology.* configs are for specific executing storms topology.enable.message.timeouts: true topology.debug: false topology.workers: 1 topology.acker.executors: null topology.tasks: null # maximum amount of time a message has to complete before it's considered failed topology.message.timeout.secs: 30 topology.multilang.serializer: "backtype.storm.multilang.JsonSerializer" topology.skip.missing.kryo.registrations: false topology.max.task.parallelism: null topology.max.spout.pending: null topology.state.synchronization.timeout.secs: 60 topology.stats.sample.rate: 0.05 topology.builtin.metrics.bucket.size.secs: 60 topology.fall.back.on.java.serialization: true topology.worker.childopts: null topology.executor.receive.buffer.size: 1024 #batched topology.executor.send.buffer.size: 1024 #individual messages topology.receiver.buffer.size: 8 # setting it too high causes a lot of problems (heartbeat thread gets starved, throughput plummets) topology.transfer.buffer.size: 1024 # batched topology.tick.tuple.freq.secs: null topology.worker.shared.thread.pool.size: 4 topology.disruptor.wait.strategy: "com.lmax.disruptor.BlockingWaitStrategy" topology.spout.wait.strategy: "backtype.storm.spout.SleepSpoutWaitStrategy" topology.sleep.spout.wait.strategy.time.ms: 1 topology.error.throttle.interval.secs: 10 topology.max.error.report.per.interval: 5 topology.kryo.factory: "backtype.storm.serialization.DefaultKryoFactory" topology.tuple.serializer: "backtype.storm.serialization.types.ListDelegateSerializer" topology.trident.batch.emit.interval.millis: 500 topology.classpath: null topology.environment: null dev.zookeeper.path: "/tmp/dev-storm-zookeeper"</span>
一、node(服務器):指一個storm集羣中的supervisor服務器數量。
二、worker(jvm進程):指整個拓撲中worker進程的總數量,這些數量會隨機的平均分配到各個node。
三、executor(線程):指某個spout或者bolt的總線程數量,這些線程會被隨機平均的分配到各個worker。
四、task(spout/bolt實例):task是spout和bolt的實例,它們的nextTuple()和execute()方法會被executors線程調用。除非明確指定,storm會給每一個executor分配一個task。若是設置了多個task,即一個線程持有了多個spout/bolt實例.
注意:以上設置的都是總數量,這些數量會被平均分配到各自的宿主上,而不是設置每一個宿主進行多少個進程/線程。詳見下面的例子。oop
一、node:買機器吧,而後加入集羣中……
二、worker:Config#setNumWorkers() 或者配置項 TOPOLOGY_WORKERS
三、executor:Topology.setSpout()/.setBolt()
四、task:ComponentConfigurationDeclarer#setNumWorker()
// 建立topology TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("kafka-reader", new KafkaSpout(spoutConf), 5);//設置executor數量爲5 builder.setBolt("filter-bolt", new FilterBolt(), 3).shuffleGrouping( "kafka-reader");//設置executor數量爲3 builder.setBolt("log-splitter", new LogSplitterBolt(), 3) .shuffleGrouping("filter-bolt");//設置executor數量爲5 builder.setBolt("hdfs-bolt", hdfsBolt, 2).shuffleGrouping( "log-splitter");//設置executor數量爲2 // 啓動topology Config conf = new Config(); conf.put(Config.NIMBUS_HOST, nimbusHost); conf.setNumWorkers(3); //設置worker數量 StormSubmitter.submitTopologyWithProgressBar(topologyName, conf, builder.createTopology());
一、經過config.setNumWorkers(3)將worker進程數量設置爲3,假設集羣中有3個node,則每一個node會運行一個worker。
二、executor的數量分別爲:
spout:5
filter-bolt:3
log-splitter:3
hdfs-bolt:2
總共爲13個executor,這13個executor會被隨機分配到各個worker中去。
注:這段代碼是從kafka中讀取消息源的,而這個topic在kafka中的分區數量設置爲5,所以這裏spout的線程ovtn爲5.
三、這個示例都沒有單獨設置task的數量,即便用每一個executor一個task的默認配置。若須要設置,能夠:
builder.setBolt("log-splitter", new LogSplitterBolt(), 3) .shuffleGrouping("filter-bolt").setNumTasks(5);
來進行設置,這5個task會被分配到3個executor中。
(四)並行度的動態調整
對storm拓撲的並行度進行調整有2種方法:
一、kill topo—>修改代碼—>編譯—>提交拓撲
二、動態調整
第1種方法太不方便了,有時候topo不能說kill就kill,另外,若是加幾臺機器,難道要把全部topo kill掉還要修改代碼?
所以storm提供了動態調整的方法,動態調整有2種方法:
一、ui方式:進入某個topo的頁面,點擊rebalance便可,此時能夠看到topo的狀態是rebalancing。但此方法只是把進程、線程在各個機器上從新分配,即適用於增長機器,或者減小機器的情形,不能調整worker數量、executor數量等
二、cli方式:storm rebalance
舉個例子
storm rebalance toponame -n 7 -e filter-bolt=6 -e hdfs-bolt=8
將topo的worker數量設置爲7,並將filter-bolt與hdfs-bolt的executor數量分別設置爲六、8.
此時,查看topo的狀態是rebalancing,調整完成後,能夠看到3臺機器中的worker數量分別爲三、二、2
Storm經過分組來指定數據的流向,主要指定了每一個bolt消費哪一個流,以及如何消費。
storm內置了7個分組方式,並提供了CustomStreamGrouping來建立自定義的分組方式。
一、隨機分組 shuffleGrouping
這種方式會隨機分發tuple給bolt的各個task,每一個task接到到相同數量的tuple。
二、字段分組 fieldGrouping
按照指定字段進行分組,該字段具備相同組的會被髮送到同一個task,具體不一樣值的可能會被髮送到不一樣的task。
三、全複製分組 allGrouping(或者叫廣播分組)
每個tuple都會發送給全部的task,必須當心使用。
四、全局分組 globlaGrouping
將全部tuple均發送到惟一的task,會選取task ID最小的task。這種分組下,設置task的並行度是沒有意義的。另外,這種方式頗有可能引發瓶頸。
五、不分組 noneGrouping
留做之後使用,目前也隨機分組相同。
六、指向型分組 directGrouping(或者叫直接分組)
數據源會調用emitDirect()方法來判斷一個tuple應該由哪一個storm組件來接收,只能在聲明瞭是指向型的數據流上使用。
七、本地或隨機分組 localOrShuffleGrouping
若是接收bolt在同一個進程中存在一個或者多個task,tuple會優先發送給這個task。不然和隨機分組同樣。相對於隨機分組,此方式能夠減小網絡傳輸,從而提升性能。
可靠性:spout發送的消息會被拓撲樹上的全部節點ack,不然會一直重發。
致使重發的緣由有2個:
(1)fail()被調用
(2)超時無響應。
完整的可靠性示例請參考storm blueprint的chapter1 v4代碼,或者P22,或者參考從零開始學storm P102頁的例子。
關鍵步驟以下:
一、建立一個map,用於記錄已經發送的tuple的id與內容,此爲待確認的tuple列表。
private ConcurrentHashMap<UUID,Values> pending;
二、發送tuple時,加上一個參數用於指明該tuple的id。同時,將此tuple加入map中,等待確認。
UUID msgId = UUID.randomUUID();
this.pending.put(msgId,values);
this.collector.emit(values,msgId);
三、定義ack方法與fail方法。
ack方法將tuple從map中取出
this.pending.remove(msgId);
fail方法將tuple從新發送
this.collector.emit(this.pending.get(msgId),msgId);
對於沒回復的tuple,會定時從新發送。
處理該tuple的每一個bolt均須要增長如下內容: 一、emit時,增長一個參數anchor,指定響應的tuple collector.emit(tuple,new Values(word)); 二、確認接收到的tuple已經處理 this.collector.ack(tuple);