storm --流式處理框架java
storm是個實時的、分佈式以及具有高容錯的計算系統node
- storm 進程常駐內存python
- storm 數據不通過磁盤,在內存中處理linux
Twitter開源的分佈式實時大數據處理框架,最先開源於githubgit
storm 架構 -Nimbus -Supervisor -Workergithub
編程模型: - DAG -Spout -Boltredis
數據傳輸: - ZMQ (Twitter早起的產品)sql
- ZeroMQ 開源的消息傳遞框架,並非一個MessageQueueshell
-Netty Netty是基於NIO的網絡框架,更加高效 (之因此storm0.9版本以後使用netty,是由於ZMQ的license和storm的licemse不兼容)數據庫
高可靠性: -異常處理 -消息可靠性保障機制(ACK)
可維護性: -stormUI圖形化監控接口
流式處理(同步與異步):客戶端提交數據進行計算,並不會等待數據計算結果
逐條處理:例如ETL(數據清洗)
統計分析: 例如計算pv、uv、訪問熱點以及某些數據的聚合、加和、平均等
--客戶端提交數據以後,計算完成結果存儲到redis、Hbase、Mysql或者其餘的MQ當中
客戶端並不關心最終的計算結果是多少
實時請求應答服務(同步) -客戶端提交數據請求以後,馬上取得計算結果並返回給客戶端
DRPC:
實時請求處理:
storm : 進程 、線程常駐內存運行,數據不只如此磁盤,數據經過網絡進行傳遞
MapReduce: 爲TB、PB級別數據設計 的批處理計算框架
storm與mapreduce的比較:
storm: 流式處理、毫秒級、DAG模型、常駐運行
MapReduce: 批處理、分鐘級、map+reduce模型 、反覆啓停
storm:純流式處理
- 專門爲流式處理設計
- 數據傳輸模式更爲簡單,不少地方也更爲高效
-並非不能作批處理,它也能夠用來作微批處理,來提升吞吐
Spark Streaming :微批處理
-- 將RDD作的很小來用小的批處理來接近流式處理
--基於內存和DAG處理任務作的很快
storm: 流式處理,毫秒級,已經很穩定,獨立系統專門流式計算設計
SparkStreaming: 微批處理、秒級、穩定性改進中、spark核心之上的一種計算模型,能與其餘的組件進行很好的結合
storm計算模型:
Topology-DAG 有向無環圖的實現
--對於strom實時計算邏輯的封裝
--即、由一系列經過數據流相互關聯的spout、bolt所組成的拓撲結構
--生命週期:此拓撲只要啓動就會一直在集羣中運行,直到手動將其kill,不然不會終止
tuple --元祖
---storm中最小的數據組成單元
stream --數據流
--從spout中源源不斷傳遞數據給bolt、以及上一個bolt傳遞給下一個bolt,所造成的這些數據通道即叫作stream
--stream聲明時需給其指定一個ID
spout -數據源
-拓撲中數據流的來源。通常會從指定外部的數據源讀取元祖(tuple)發送到拓撲(Topology)中
-一個spout能夠發送多個數據流(stream)
--可先經過OutputFieldsDeclear中的declear方法聲明定義的不一樣數據流,發送數據時SpoutOutPutCollector中的emit方法指定數據流的參數將數據發送出去
--spout中最核心的方法是nextyouple,該方法會被storm線程不斷對的調用、主動從數據源拉取數據,在經過emit方法將數據生成元祖(tuple)發送給只有的bolt計算
-bolt 數據流處理組件
- 拓撲中數據處理均有bolt完成。對於簡單的任務或者數據流轉換,單個bolt能夠簡單的實現;更加複雜的場景每每須要多個bolt分多個步驟處理完成
-一個bolt能夠發送多個數據流(Stream)
--能夠先經過outputFiledDeclear中的declear方法生命定義的不一樣數據流,發送數據時經過spoutOutputcollector中的emit方法指定數據流id參數將數據發送出去
--bolt最核心的方法是executor方法,該方法負責接收一個元祖數據、真正實現核心的業務邏輯
stream Grouping --數據流分組
用storm 實現wordcount單詞統計
數據發送類
package com.storm.spout; import java.util.List; import java.util.Map; import java.util.Random; import backtype.storm.spout.SpoutOutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseRichSpout; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; import backtype.storm.utils.Utils; public class Wdcspout extends BaseRichSpout{ private SpoutOutputCollector collector; String[] text = { "nihao hello ok", "nice to meet hello", "where are you ok", "where is you home" }; Random r = new Random(); @Override public void nextTuple() { // TODO Auto-generated method stub List line = new Values(text[r.nextInt(3)]); this.collector.emit(line); System.out.println("line==============="+line); Utils.sleep(1000); } @Override public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { // TODO Auto-generated method stub this.collector = collector; } /** * * */ @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { // TODO Auto-generated method stub declarer.declare(new Fields("line")); } }
數據處理類:
package com.storm.bolt; import java.util.List; import java.util.Map; import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseRichBolt; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values; public class Wcdbolt extends BaseRichBolt{ private OutputCollector collector; @Override public void execute(Tuple input) { // TODO Auto-generated method stub //一、獲取數據,並對獲取的數據進行切分 String[] words = input.getString(0).split(" "); //二、發送數據 for(String word: words) { List tuple = new Values(word); this.collector.emit(tuple); } } @Override public void prepare(Map conf, TopologyContext context, OutputCollector collector) { // TODO Auto-generated method stub this.collector = collector; } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { // TODO Auto-generated method stub declarer.declare(new Fields("tuple")); } }
第二個處理數據的bolt
package com.storm.bolt; import java.util.HashMap; import java.util.Map; import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseRichBolt; import backtype.storm.tuple.Tuple; public class wcdsbolt extends BaseRichBolt{ private OutputCollector collector; Map<String, Integer> map = new HashMap<>(); @Override public void execute(Tuple input) { // TODO Auto-generated method stub //對接受到的數據進行處理 String word = input.getStringByField("tuple"); int count = 1; //若是單詞不存在,則把單詞的統計數添加到map中,不然,在原址value的基礎之上加1 if(map.containsKey(word)) { count = map.get(word)+1; } map.put(word, count); System.err.println(word+"----------------------------"+count); } @Override public void prepare(Map conf, TopologyContext context, OutputCollector collector) { // TODO Auto-generated method stub this.collector = collector; } @Override public void declareOutputFields(OutputFieldsDeclarer arg0) { // TODO Auto-generated method stub } }
對處理結果提交到本地運行
package com.storm.test; import org.jgrapht.alg.TarjanLowestCommonAncestor.LcaRequestResponse; import com.storm.bolt.Wcdbolt; import com.storm.bolt.wcdsbolt; import com.storm.spout.Wdcspout; import backtype.storm.LocalCluster; import backtype.storm.topology.TopologyBuilder; import backtype.storm.tuple.Fields; import backtype.storm.Config;; public class test1 { public static void main(String[] args) { TopologyBuilder tm = new TopologyBuilder(); //單線程處理 /*tm.setSpout("wcdspout", new Wdcspout()); tm.setBolt("wcdbolt", new Wcdbolt()).shuffleGrouping("wcdspout"); tm.setBolt("wcdsbolt", new wcdsbolt()).shuffleGrouping("wcdbolt");*/ //多線程處理 tm.setSpout("wcdspout", new Wdcspout()); tm.setBolt("wcdbolt", new Wcdbolt(),3).shuffleGrouping("wcdspout"); tm.setBolt("wcdsbolt", new wcdsbolt(),3).fieldsGrouping("wcdbolt", new Fields("tuple")); LocalCluster lm = new LocalCluster(); lm.submitTopology("w", new Config(), tm.createTopology()); } }
注: 當用多線程處理的時候,注意對於分發策略的選擇。 不然會發生數據統計異常的錯誤 。分發策略主要由grouping方法來進行處理的。
storm grouping --數據流分組;(即數據的分發策略)
一、 shuffle grouping
--隨機分組,隨機派發stream裏面的tuple,保證每一個bolt task 接受到的tuple數目大體相同 。
--輪詢,平均分配。
二、 Fields grouping
--按字段分組,好比,按「user-id」這個字段來進行分組,那麼具備一樣「user-id」的tuple會被分到相同的bolt裏面的一個task,而不一樣的"user-id"則可能會被分到不一樣的task
--
三、 All grouping
-- 廣播分發,對於每個tuple,全部的bolt都會收到
四、Global Grouping
--全局分組,把tuple分配給task id 最低的task
五、None grouping
-- 不分組,這個分組的意思是說storm不關心究竟是怎樣進行分組的,目前這種分組和shufflegrouping 的分組效果是同樣的。有一點不一樣的地方就是storm會把使用none grouping 的這個bolt放到這個bolt的訂閱者的同一個線程中區,(將來storm若是可能的話,會進行這樣的設計)
六、direct grouping
--指向型分組,這是一種比較特殊的分組方法,用這種分組意味着消息(tuple)的發送者指定由消息接受者的那個task處理這個消息。只要被聲明爲Direct stream 的消息流能夠聲明這種分組方法。並且這種消息的tuple必須使用emitDirect方式來發送。消息處理着能夠經過TopologyContext來獲取處理它的消息的task的id
七、Local or shuffle grouping
--本地或隨機分組。若是目標bolt有一個或者多個task與源bolt的task在同一個工做進程中,tuple將會被隨機發送給這些同進程中的task,不然,和普通的shuffle grouping 行爲一致
八、 customgrouping
--自定義,至關於mapreduce哪裏本身去實現一個partition同樣。
storm 架構設計
Nimbus: --資源調度 --任務分配 --接受jar包
Supervisor: --接受Nimbus分配的任務
-- 啓動、中止本身管理的worker進程(當前supervisor上worker數量由配置文件設定)
--Worker
--運行具體處理運算組件的進程(每一個worker對應執行一個Topology的子集)
--worker任務類型,即spout,bolt任務兩種
--啓動executor(executor即worker JVM進程中的一個java線程,通常默認每一個executor負責執行一個task任務 )
--zookeeper:
storm 提交任務流程: 一、將提交的jar包上傳至nimbus服務器numbus/inbox目錄下 二、對topology進行檢驗處理 三、創建Topology在本地的存放目錄nimbus\stormdist\topology-id(該目錄下包含三個文件)
stormjar.jar: --從nimbus/inbox目錄下移動來的Topology的jar包
stormcode.ser: --對Topology對象序列化法
stormconf.ser: --topology的運行配置
nimbus任務分配:即根據代碼初始化spout/bolt的task數目,並分配給對應的task-id,最後將這些信息寫入到zookeeper的/task節點下
nimbus在zookeeper上建立taskbeats節點,監控task的心跳
將任務分配信息寫入到assignment/topology-id節點中,此時便可認爲任務提交完畢
在zookeeper的/storms/topology-id節點下存聽任務運行的時間、狀態等信息
按期檢查zookeeper上storm節點,是否有新任務提交,刪除本地不在運行的任務
根據nimbus指定的任務信息啓動該節點上的worker
查看須要執行的task任務信息,獲取到相應的task信息,即spout/bolt任務信息,執行具體的運算,並根據IP以及端口發送消息數據
storm的安裝:
僞分佈式的安裝:
一、 上傳安裝壓縮包 二、將安裝壓縮包解壓 三、配置環境變量 四、啓動storm相關命令
storm dev-zookeeper >> ./logs/zk.out 2>&1 & (將啓動日誌重定向到logs目錄中,2>&1表明將標準的錯誤輸出重定向到標準的正確輸出中)
在僞分佈式環境上運行wordcount步驟:
一、將wordcount代碼打成jar包
二、將打好的jar包上傳到linux服務器
三、運行上傳的jar 包
運行jar包的命令: storm jar tq.jar com.storm.wordcount.Main (最後面寫的是jar包文件在eclipse中所在的jar位置和類名稱)--這種方式是本地模式來運行的
若是不打算用本地模式來運行那麼就添加一個參數來運行
storm jar tq.jar com.storm.wordcount.Main ec
ack: 線程保障機制,監控線程的運行狀況,並將監控的結果發送給spout,若是ack監控到線程運行出現了問題,那麼就讓spout將數據從新發送一遍
結束運行任務的命令: storm kill 任務名稱 -w 時間長短
注:關閉storm全部已經啓動的任務命令: killall java
storm 徹底分佈式的搭建: 一、準備環境 jdk python 2.6.6
二、 部署zookeeper : 三、上傳安裝包並解壓 四、在storm中建立logs目錄 五、修改配置文件 -conf/storm.yaml
在配置文件中指定對應的zookeeper: storm.zookeeper.servers: - "node2" - "node3" - "node4"
nimbus.host: "node1" (指定nimbus所在的節點)
storm.local.dir: "/tmp/storm"
supervisor.slots.ports:
- 6700
- 6701
- 6702
- 6703
而後將配置好的文件分發到其餘的節點 scp -r ./storm node2:/opt/
徹底分佈式的啓動:node1上面啓動主節點
./bin/storm nimbus >> ./logs/nimbus.out 2>&1 &
./bin/storm ui >> ./logs/ui.out 2>&1 &
node二、node3上面啓動從節點
./bin/storm supervisor >> ./logs/supervisor.out 2>&1 &
storm的併發機制:
worker: --進程
一個Topology拓撲會包含一個或者多個worker(每一個worker只能從屬於一個特定的Topology)
這些Worker進程會並行跑在集羣中不一樣的服務器上,即一個Topology拓撲實際上是由並行運行在storm集羣中多臺服務器上的進程所組成
Executor --線程
--executor是由Worker進程中生成的一個線程
--每一個worker進程中會運行一個或多個拓撲當中的executor線程
--一個executor線程中能夠執行一個或多個task任務(默認每一個executor只執行一個task任務),可是這些task任務都是對應着同一個組件
Task
--實際執行數據處理的最小單元
--每一個task即爲一個spout或者一個bolt
task數量在整個Topology生命週期中保持不變,executor數量能夠變化或者手動調整
(默認狀況下,task數量和executor是相同的,即每一個executor線程中默認運行一個task任務)
設置worker 進程數: -Config.setNumWorkers(Int workers)
設置Executor線程數
- TopologyBuilder.setspout()
--TopologyBuilder.setbolt()
設置task數量:
--componentConfigurationDeclare.setNumTasks(Number val)
rebalance --再平衡
--即,動態調整Topology拓撲的worker進程數、以及executor的線程數
支持兩種調整方式: 一、經過storm ui 二、經過storm cli
經過storm CLI 動態調整:
用shell命令調整並行度:
./bin/storm rebalance wc -w 30 -n 2 -e 組件名稱=(調整的並行度)
storm的通訊機制:
worker進程之間的通訊:
-ZMQ -zeroMQ開源的消息傳遞框架,並非一個MessageQueue
-Netty -netty是基於NIO的網絡框架,更加高效。
Worker內部的數據通訊:
-Disruptor --實現隊列的功能 --能夠理解爲一種事件監聽或者消息處理機制,即在隊列中一邊由生產者放入消息數據,另外一邊由消費者並行去除消息數據處理
storm的容錯機制: 一、集羣節點宕機 -Nimbus服務器 單點故障
--非Nimbus服務器 故障時,該節點上全部的task任務都會超時,Nimbus會將這些task任務從新分配到其餘服務器上運行
二、 進程掛掉
--worker
掛掉時,Supervisor會重啓這個進程,若是啓動過程當中任然一直失敗,而且沒法向nimbus發送心跳,Nimbus會將該worker從新分配到其餘服務器上
--Supervisor
無狀態(全部的狀態信息都放在zookeeper中進行管理)
快速失敗 (每當遇到異常狀況,都會自動毀滅)
--Nimbus
無狀態(全部的狀態信息都存放在zookeeper中來管理)
快速失敗(每當遇到任何的異常狀況都會自動毀滅)
三、消息的完整性
acker --消息完整性的實現機制
-- storm 的拓撲當中 特殊的一些任務
-- 負責跟蹤每一個spout發出的tuple的DAG(有向無環圖)
注:容錯機制沒法保證數據只被處理一次 ,但能夠保證全部的數據都被處理
storm -DRPC
客戶端經過向DRPC服務器發送待執行函數的名臣以及該函數的參數來獲取處理結果。實現該函數的拓撲使用一個DRPspout從DRPC服務器中接受一個函數的調用流。DRPC會爲每個函數調用都標記一個惟一的id,隨後拓撲會執行函數來計算結果,並在拓撲的最後使用一個名爲returnResult的bolt鏈接到DRPC服務器,根據函數調用的id來將函數調用的結果返回。
DRPC (Distributed RPC) --分佈式遠程調用
DRPC 是經過一個DRPC服務端(DRPC server)來實現分佈式RPC功能
DRPC server 負責接收RPC請求,並將該請求發送到Strom中運行的Topology,等待接收Topology發送的處理結果,並將該結果返回給發送請求的客戶端
DPRC設計目的:
爲了充分利用Storm的計算能力實現高密度的並行實時計算
DRPC在集羣中運行,首先須要配置配置文件,添加DRPC運行的節點,其次須要將啓動DRPC服務器。
./bin/storm drpc >> ./logs/drpc.out 2>&1 &
flume整合kafka: 安裝flume 和 kafka ,啓動zookeeper+kafka+flume
flume的安裝:
一、加壓安裝包 二、修改配置文件名稱: mv flume-env.sh.propertise flume-env.sh 三、在配置文件中配置java_home路徑
啓動三個節點kafka:bin/kafka-server-start.sh config/server.properties
啓動flume:bin/flume-ng agent -n a1 -c conf -f conf/fk.conf -Dflume.root.logger=DEBUG,console
添加flume啓動的配置文件:
a1.sources = r1 a1.sinks = k1 a1.channels = c1 # Describe/configure the source a1.sources.r1.type = avro a1.sources.r1.bind = node01 a1.sources.r1.port = 41414 # Describe the sink a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink a1.sinks.k1.topic = testflume a1.sinks.k1.brokerList = node1:9092,node2:9092,node3:9092 a1.sinks.k1.requiredAcks = 1 a1.sinks.k1.batchSize = 20 a1.sinks.k1.channel = c1 # Use a channel which buffers events in memory a1.channels.c1.type = memory a1.channels.c1.capacity = 1000000 a1.channels.c1.transactionCapacity = 10000 # Bind the source and sink to the channel a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
Flume+Kafka+Storm架構設計:
package com.storm.flume; /** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Properties; import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.spout.SchemeAsMultiScheme; import backtype.storm.topology.BasicOutputCollector; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.TopologyBuilder; import backtype.storm.topology.base.BaseBasicBolt; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values; import storm.kafka.KafkaSpout; import storm.kafka.SpoutConfig; import storm.kafka.StringScheme; import storm.kafka.ZkHosts; import storm.kafka.bolt.KafkaBolt; import storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper; import storm.kafka.bolt.selector.DefaultTopicSelector; /** * This topology demonstrates Storm's stream groupings and multilang * capabilities. */ public class LogFilterTopology { public static class FilterBolt extends BaseBasicBolt { @Override public void execute(Tuple tuple, BasicOutputCollector collector) { String line = tuple.getString(0); System.err.println("Accept: " + line); // 包含ERROR的行留下 if (line.contains("SUCCESS")) { System.err.println("Filter: " + line); collector.emit(new Values(line)); } } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { // 定義message提供給後面FieldNameBasedTupleToKafkaMapper使用 declarer.declare(new Fields("message")); } } public static void main(String[] args) throws Exception { TopologyBuilder builder = new TopologyBuilder(); // https://github.com/apache/storm/tree/master/external/storm-kafka // config kafka spout,話題 String topic = "testflume"; ZkHosts zkHosts = new ZkHosts("node1:2181,node2:2181,node3:2181"); // /MyKafka,偏移量offset的根目錄,記錄隊列取到了哪裏 SpoutConfig spoutConfig = new SpoutConfig(zkHosts, topic, "/MyKafka", "MyTrack");// 對應一個應用 List<String> zkServers = new ArrayList<String>(); System.out.println(zkHosts.brokerZkStr); for (String host : zkHosts.brokerZkStr.split(",")) { zkServers.add(host.split(":")[0]); } spoutConfig.zkServers = zkServers; spoutConfig.zkPort = 2181; // 是否從頭開始消費 spoutConfig.forceFromStart = true; spoutConfig.socketTimeoutMs = 60 * 1000; // StringScheme將字節流轉解碼成某種編碼的字符串 spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme()); KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig); // set kafka spout builder.setSpout("kafka_spout", kafkaSpout, 3); // set bolt builder.setBolt("filter", new FilterBolt(), 8).shuffleGrouping("kafka_spout"); // 數據寫出 // set kafka bolt // withTopicSelector使用缺省的選擇器指定寫入的topic: LogError // withTupleToKafkaMapper tuple==>kafka的key和message KafkaBolt kafka_bolt = new KafkaBolt().withTopicSelector(new DefaultTopicSelector("LogError")) .withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper()); builder.setBolt("kafka_bolt", kafka_bolt, 2).shuffleGrouping("filter"); Config conf = new Config(); // set producer properties. Properties props = new Properties(); props.put("metadata.broker.list", "node1:9092,node2:9092,node3:9092"); /** * Kafka生產者ACK機制 0 : 生產者不等待Kafka broker完成確認,繼續發送下一條數據 1 : * 生產者等待消息在leader接收成功確認以後,繼續發送下一條數據 -1 : * 生產者等待消息在follower副本接收到數據確認以後,繼續發送下一條數據 */ props.put("request.required.acks", "1"); props.put("serializer.class", "kafka.serializer.StringEncoder"); conf.put("kafka.broker.properties", props); conf.put(Config.STORM_ZOOKEEPER_SERVERS, Arrays.asList(new String[] { "node1", "node2", "node3" })); // 本地方式運行 LocalCluster localCluster = new LocalCluster(); localCluster.submitTopology("mytopology", conf, builder.createTopology()); } }
storm --事務
強順序流(強有序)
--引入事物(transaction)的概念,每一個transaction關聯一個transaction id。
--transaction id從1開始,每一個tuple會按照順序加1
--在處理tuple時,將處理成功的tuple結果以及transaction同時寫入數據庫中進行存儲
--兩種狀況
一、當前transaction id 與數據庫中的transaction id 不一致
二、兩個transaction id 相同
缺點:一次只能處理一個tuple,沒法實現分佈式計算
將Topology拆分紅兩個階段:
一、Processing phase : 容許並行處理多個batch
二、commit phase : 保證batch的強有序,一次只能處理一個batch
Design details
Manages state -狀態管理
--storm 經過 Zookeeper 存儲全部的transaction相關信息 (包含了: 當前transaction id 以及 batch的元數據信息 )
Coordinates the transaction --事物協調
--storm會管理決定transaction應該處理什麼階段(processing,committing)
Fault detection --故障檢測
--storm 內部經過ACKER 機制保障消息被正常處理(用戶不須要手動區維護)
First class batch processing API : storm 提供的batch bolt接口
三種事物:
一、普通事物 二、 partition transaction --分區事物 三、 opaque transaction --不透明分區事物
事務性拓撲(transaction topoligies) 保證消息(tuple)被且僅被處理一次