在本篇中,咱們就來根據一個案例,看看如何去設計一個拓撲, 如何分解問題以適應Storm架構,同時對Storm拓撲內部的並行機制會有一個基本的瞭解。前端
本章代碼都在:java
git@github.com:zyzdisciple/storm_study.gitgit
項目下的 user_behavior包下。github
有這樣一種場景,在前端存在會話,咱們會不斷收到來自前端的消息,消息包含消息的發送時間,消息內容,結束標識, 消息的發送者, SessionId等其餘信息, 咱們須要作的事情是當接收到消息以後,根據SessionId判斷是否屬於同一消息, 若是是的話將內容拼接, 若是結束標識爲 true, 表示會話已結束,則存入數據庫或其餘地方, 若是不爲true, 則等待, 在1分鐘後 仍是沒有收到消息, 則存入數據庫。數據庫
咱們對消息數據並無太嚴格的要求,由於數據自己是用戶的行爲, 如一個點擊,瀏覽事件等等其餘行爲信息,在這裏咱們用三位數的數值,來表示用戶的行爲類型。這就是案例的基本需求。apache
數據的輸入格式爲:json
{sessionId:1, content:1,isEnd:true, timeMillis:1, userId:1}安全
這些基本的信息中,就包含着某用戶在某時間段的行爲,而咱們事實上須要作的就是,瞭解用戶在一段時間內的操做行爲(在這裏過時時間是1分鐘,指的是系統自身的過時時間,並不是用戶的操做間隔),將其存入數據庫,供給其餘的系統使用。網絡
咱們須要關注的核心點在如下幾個地方:session
數據具備時效性,所以咱們必然須要儘量快的推入數據庫,不然的話用Hadoop或Spark作批處理比Storm更合適。 因此,速度對咱們而言,很重要。
可靠數據源,不可靠數據源, 這一樣也是須要考慮的一點,對於可靠數據源而言,咱們必須保證數據不丟失,當拓撲掛掉以後,或是有些數據遺失以後,可以從數據源的節點再次獲取。 但咱們這裏的是不可靠數據源,由於用戶的幾條數據丟失,對咱們而言並無太大差異。 因此在趨勢性預測方面, 數據的可靠性並無那麼重要。
因此須要把握數據源的相關特性,與業務相對應, 作出速度, 安全等其餘方面的取捨。
這也是咱們在一開始就須要關注的另外一個核心點,以下:
{sessionId:1,content:001,002,005, isEnd:true, endTime:2, userId:1}
sessionId,userId不用多說, content須要將對應的標識轉換成三位數,而後拼接存儲,以「,」間隔, isEnd表示收到最後一條數據時的 end標識, endTime,表示收到最後一條數據的時間。
在拓撲設計中,處理邏輯自己並非第一件該關注的地方, 不管未來是存入數據又或是其餘方式,須要瞭解數據源的特性,對數據的時效性安全性,性能等方面的考慮,以及未來的擴展性,可能須要優化的地方都須要放在考慮範圍內。
儘管有一種說法是,不要過早優化。但那種優化,細節並非性能的決定因素,而大數據,任何一點細節均可能會致使性能的巨大誤差, 重構設計又是一件很難的事情。 所以提前考慮優化並非沒有必要的事情。
在拓撲設計時,須要:
肯定輸入數據,怎麼把它表示成元組
肯定輸出重點數據,怎麼表示成元組
肯定中間部分,補充完善數據處理方式。
因此在開始代碼以前,最好已經肯定了每一步究竟該幹什麼。
咱們接收到的是字符串,又須要保存各個數據的相應屬性,所以在一開始的Spout中,須要將對應的數據轉換成Java對象,下發下去,同時若是傳入數據有誤,則直接跳過,不進行處理。另一點須要作的是, 將content轉成3位有效數字。
在Spout中主要工做就是對接數據源,同時它自身也是整個系統的數據源,須要作相應的數據處理,而不包含其餘業務邏輯。
當接收到對象以後,須要按照時間間隔進行分組,提取出當前數據所在的時間間隔下發到下一層級便可。
按時間收取到數據以後,再進行處理,按照以前所提到的要求進一步處理數據。
寫出。
在其中的 bolt及Spout中須要注意的一點問題是, 全部的可能拋出異常的地方都要被處理掉, 即便是 RuntimeException, 緣由稍後會提到。
spout的核心功能就是讀取數據,對數據進行基本的數據處理,做爲拓撲的數據源,下發數據。
那麼做爲數據源,須要作的工做有哪些?在平時作接口的過程當中,咱們向前端返回的數據必定是具備必定的數據格式, 且不容變動的格式,可以提供前端所需的全部信息才行, 最好數據自己已經通過必定的處理, 對待錯誤數據等其餘問題,並不直接向前端返回數據。
因此數據的基本處理, 校驗, 轉換成對象纔是基本需求。
一樣,爲了簡化處理, 這裏採起了從文件中讀取數據的方式。
代碼以下:
import com.google.gson.Gson; import com.storm.demo.user_behavior.BehaviorConstants; import com.storm.demo.user_behavior.entity.MessageInfo; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.math.NumberUtils; import org.apache.storm.spout.SpoutOutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BaseRichSpout; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Values; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.BufferedReader; import java.io.FileNotFoundException; import java.io.FileReader; import java.io.IOException; import java.util.Map; /** * @author zyzdisciple * @date 2019/4/7 */ public class FileReaderSpout extends BaseRichSpout { private static final long serialVersionUID = 2970891499685374549L; private static final Logger logger = LoggerFactory.getLogger(FileReaderSpout.class); private static Gson gson; private BufferedReader br; private SpoutOutputCollector collector; @Override public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { this.collector = collector; gson = new Gson(); try { br = new BufferedReader(new FileReader("E:\\IdeaProjects\\storm_demo\\src\\main\\resources\\user_behavior_data.txt")); } catch (FileNotFoundException e) { e.printStackTrace(); } } @Override public void nextTuple() { try { String line = br.readLine(); if (line != null && !line.isEmpty()) { line = line.trim(); MessageInfo info = gson.fromJson(line, MessageInfo.class); //只發送有效數據,無效數據跳過 /*在這裏, 咱們是逐行發送, 若是有條件的話, 也就是說一次能取出 * 多條數據的狀況下, 所有一次發送是比較好的選擇. * */ if (isValid(info, line)) { completingMessage(info); collector.emit(new Values(info)); } } } catch (Exception e) { /*在catch語句中,咱們通常會選擇打印log日誌,表示爲何出錯,並不作其餘處理, * 即便數據格式存在問題依然須要可以正常執行下去,保證拓撲不中斷。 * 在這裏能夠進一步將Exception細分,對不一樣的Exception打印日誌不一樣。 * */ e.printStackTrace(); } } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields(BehaviorConstants.FIELD_INFO)); } @Override public void close() { if (br != null) { try { br.close(); } catch (IOException e) { e.printStackTrace(); } } } /** * 補全基本數據, 設置默認值 * @param info */ private void completingMessage(MessageInfo info) { try { info.setContent(formatMessage(info.getContent())); } catch (Exception e) { logger.warn("內容轉換失敗:" + info); } if (info.getEndTime() == null) { info.setEndTime(System.currentTimeMillis()); } } /** * 格式化內容,將content轉爲3位 * @param content * @return */ private String formatMessage(String content) { if (NumberUtils.isCreatable(content)) { StringBuilder sb = new StringBuilder(BehaviorConstants.CONTENT_CAPACITY); //從第一位不是0的數值開始 boolean skipZero = false; int currentValue; try { for (String c : content.trim().split("")) { //已經跳過數值爲0的字符, 若是不爲0將 skipZero設爲true //若是是+ - x等字符, 會拋出異常 currentValue = Integer.valueOf(c); if (skipZero || (currentValue != 0 && (skipZero = true))) { sb.append(currentValue); } } } catch (Exception e) { throw new NumberFormatException(); } //補全字符串 StringBuilder result = new StringBuilder(BehaviorConstants.CONTENT_CAPACITY); for (int i = 0, L = sb.length(); i < BehaviorConstants.CONTENT_CAPACITY - L; i++) { result.append(0); } result.append(sb); return result.toString(); } else { return BehaviorConstants.UNKNOWN_CONTENT; } } /** * message校驗 * @return */ private boolean isValid(MessageInfo info, String source) { if (info == null) { logger.warn("數據格式轉換失敗, 字符串爲:" + source); } else if (StringUtils.isBlank(info.getSessionId())) { logger.warn("缺失sessionId, 字符串爲:" + source); } else if (StringUtils.isBlank(info.getUserId())) { logger.warn("缺失userId, 字符串爲:" + source); } else { return true; } return false; } }
代碼並無什麼太多值得注意的地方, 更多的則是數據前期處理方面。
主要功能是劃分數據時間組, 爲了測試起見, 能夠將timeOut時間進一步調小.
import com.storm.demo.user_behavior.BehaviorConstants; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BaseRichBolt; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Tuple; import org.apache.storm.tuple.Values; import java.util.Map; /** * @author zyzdisciple * @date 2019/4/7 */ public class TimeIntervalBolt extends BaseRichBolt { private static final long serialVersionUID = 5632264737187544663L; private OutputCollector collector; @Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { this.collector = collector; } /** * 是按照接收到的時間來進行分組. * @param input */ @Override public void execute(Tuple input) { //判斷當前的時間區間. long timeGroup = System.currentTimeMillis() / (BehaviorConstants.SESSION_TIME_OUT_SECS * 1000); collector.emit(new Values(timeGroup, input.getValueByField(BehaviorConstants.FIELD_INFO))); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields(BehaviorConstants.FIELD_TIME_GROUP, BehaviorConstants.FIELD_INFO)); } }
初步代碼以下:
import com.storm.demo.user_behavior.BehaviorConstants; import com.storm.demo.user_behavior.entity.GroupKey; import com.storm.demo.user_behavior.entity.MessageInfo; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BaseRichBolt; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Tuple; import org.apache.storm.tuple.Values; import java.util.HashMap; import java.util.Map; /** * @author zyzdisciple * @date 2019/4/7 */ public class ContentStitchingBolt extends BaseRichBolt { private static final long serialVersionUID = -4684689172584740403L; private OutputCollector collector; /** * 存儲每一個時間段的數據, 因爲每一個時間段可能有多組session數據, 所以放入list中處理. */ private Map<GroupKey, MessageInfo> collectMap; @Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { this.collector = collector; collectMap = new HashMap<>(); } @Override public void execute(Tuple input) { //在這裏, 首先獲取數據, 若是自己爲true, 且在分組中不存在對應的數據, 則能夠直接發送. MessageInfo info = (MessageInfo) input.getValueByField(BehaviorConstants.FIELD_INFO); long timeGroup = input.getLongByField(BehaviorConstants.FIELD_TIME_GROUP); GroupKey key = new GroupKey(info.getSessionId(), timeGroup); //根據sessionId, group雙重判斷, 存儲 MessageInfo messageInfo = collectMap.get(key); //更新info的數據 updateNewInfoWithPreMessage(info, messageInfo); //若是不存在且已結束, 直接發送數據便可 if (info.getEnd()) { collector.emit(new Values(info)); //發送後須要移除相關數據 collectMap.remove(key); } else { collectMap.put(key, info); } } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields(BehaviorConstants.FIELD_INFO)); } /** * 用之前的messageContent 更改當前message * @param newInfo * @param preInfo */ private void updateNewInfoWithPreMessage(MessageInfo newInfo, MessageInfo preInfo) { if (preInfo != null) { StringBuilder sb = new StringBuilder(preInfo.getContent()); sb.append(",").append(newInfo.getContent()); newInfo.setContent(sb.toString()); } } }
其中GroupKey以下:
import org.apache.commons.lang3.builder.EqualsBuilder; import org.apache.commons.lang3.builder.HashCodeBuilder; /** * 以sessionId 和 group作爲分組的主鍵, 重寫equals方法 * * @author zyzdisciple * @date 2019/4/7 */ public class GroupKey { private String sessionId; private Long timeGroup; public GroupKey(String sessionId, Long timeGroup) { this.sessionId = sessionId; this.timeGroup = timeGroup; } public String getSessionId() { return sessionId; } public void setSessionId(String sessionId) { this.sessionId = sessionId; } public Long getTimeGroup() { return timeGroup; } public void setTimeGroup(Long timeGroup) { this.timeGroup = timeGroup; } @Override public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; GroupKey groupKey = (GroupKey) o; return new EqualsBuilder() .append(sessionId, groupKey.sessionId) .append(timeGroup, groupKey.timeGroup) .isEquals(); } @Override public int hashCode() { return new HashCodeBuilder(17, 37) .append(sessionId) .append(timeGroup) .toHashCode(); } }
咱們發現對於GroupKey並無序列化操做, 由於這裏是不須要的, 在emit中發送的數據須要被序列化(並不許確, 但目前能夠這樣理解).
然而,仍然存在一個問題, 咱們是已經將數據存儲進去, 而且發送, 可是, end 標識爲false的數據呢? 咱們在這裏並無進行處理, 僅僅是存入Map中, 這固然是不合適的, 有一種通用的處理辦法, 咱們並不採用 簡單的Map進行存儲, 而是 採用另外一種, TimeCacheMap, 有興趣的能夠本身瞭解一下, 設置定時時間, 過時後能夠自行處理, 並不只限於Storm中使用, 在任何代碼中均可以使用.
若是嘗試以後會發現它是過時的, 須要用另外一個 RotatingMap進行實現, 須要自行在循環中調用方法. 循環須要sleep, 週期爲 timeOut / (桶的個數 - 1); 原理與 TimeCacheMap一致.
但在這裏並不打算介紹這兩種方法, 而是Storm自身的定時機制, Tick, 其原理就在於, Storm會每隔固定時間發送一條系統消息給對應的Bolt, Spout ,甚至全部的均可以. 當咱們每次接收到系統消息時, 則表示當前已經通過一個週期, 依然沒有收到的數據, 須要進行過時處理.
它的處理機制能夠自行定義, 比較靈活, 實現代碼以下:
@Override public Map<String, Object> getComponentConfiguration() { Config conf = new Config(); conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, BehaviorConstants.SESSION_TIME_OUT_SECS); return conf; }
僅僅須要在對應的Bolt, Spout中重寫以上方法便可. 若是須要全局發送, 則須要在Topology的main方法中, 加入:
Config conf = new Config(); conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, BehaviorConstants.SESSION_TIME_OUT_SECS);
配置便可.
經過這種方式, 咱們能夠作到每隔60秒讓系統發送一條消息給bolt, 還需加入以下代碼, 更改execute方法便可:
@Override public void execute(Tuple input) { if (input.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID) && input.getSourceStreamId().equals(Constants.SYSTEM_TICK_STREAM_ID)) { //在這裏, 首先獲取數據, 若是自己爲true, 且在分組中不存在對應的數據, 則能夠直接發送. MessageInfo info = (MessageInfo) input.getValueByField(BehaviorConstants.FIELD_INFO); long timeGroup = input.getLongByField(BehaviorConstants.FIELD_TIME_GROUP); GroupKey key = new GroupKey(info.getSessionId(), timeGroup); //根據sessionId, group雙重判斷, 存儲 MessageInfo messageInfo = collectMap.get(key); //更新info的數據 updateNewInfoWithPreMessage(info, messageInfo); //若是不存在且已結束, 直接發送數據便可 if (info.getEnd()) { collector.emit(new Values(info)); //發送後須要移除相關數據 collectMap.remove(key); } else { collectMap.put(key, info); } } else { final Long timeGroup = System.currentTimeMillis() / (BehaviorConstants.SESSION_TIME_OUT_SECS * 1000); Iterator<Map.Entry<GroupKey, MessageInfo>> iterator = collectMap.entrySet().iterator(); while(iterator.hasNext()) { Map.Entry<GroupKey, MessageInfo> entry = iterator.next(); if (entry.getKey().getTimeGroup() < timeGroup) { collector.emit(new Values(entry.getValue())); iterator.remove(); } } } }
經過這種方式就實現了週期性校驗數據, 發送.
但依然要注意到的一點是, 這種發送並不許確, 並不是是嚴格的按照時間發送, 而是將發送至bolt的系統消息 與 其餘消息 加入隊列, 排隊發送, 等待前一次的執行完成.
線程安全
而當咱們採起Map的時候, 經常須要考慮到的一個問題則是線程安全, 但在Storm中, 咱們通常不須要考慮這個問題, 由於每個bolt實例都是在線程中按序執行,所以不怎麼須要考慮線程安全.
可是前提是, 不要再bolt中存在 static 變量用以保存數據, 又或是在bolt中開啓新的線程, 致使線程安全問題. 當出現這兩種狀況時, 就須要考慮線程安全了.
固然Storm中已經提供了這種Tick機制的bolt:
package org.apache.storm.topology.base; import org.apache.storm.tuple.Tuple; import org.apache.storm.utils.TupleUtils; /** * This class is based on BaseRichBolt, but is aware of tick tuple. */ public abstract class BaseTickTupleAwareRichBolt extends BaseRichBolt { @Override public void execute(final Tuple tuple) { if (TupleUtils.isTick(tuple)) { onTickTuple(tuple); } else { process(tuple); } } /** * 當接收到系統消息時, 執行的方法 */ protected void onTickTuple(final Tuple tuple) { } /** * 其餘消息執行的方法 */ protected abstract void process(final Tuple tuple); }
當咱們繼承BaseTickTupleAwareRichBolt以作定時任務時, 須要重寫 onTickTuple, process, getComponentConfiguration 便可.
而這種方式, 即tick 與 timeInterval結合的方式, 便是處理過時的一種比較好的方式.
特別是timeInterval, 經過時間分組, 是一種比較巧妙的方式.
固然,依然存在相應的問題, 因此最好的方式多是時間窗口, 在這裏暫時就以這種方式來實現.
因此是在消息信息中自身已經攜帶了時間信息是最好的方式.
這個依然沒有太多須要注意的地方, 在cleanUp中須要關閉流, 便可.
import com.google.gson.Gson; import com.storm.demo.user_behavior.BehaviorConstants; import com.storm.demo.user_behavior.entity.MessageInfo; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BaseRichBolt; import org.apache.storm.tuple.Tuple; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.FileWriter; import java.io.IOException; import java.io.PrintWriter; import java.util.Map; /** * @author zyzdisciple * @date 2019/4/7 */ public class MessageWriterBolt extends BaseRichBolt { private static final long serialVersionUID = 5411259920685235771L; private static final Logger logger = LoggerFactory.getLogger(MessageWriterBolt.class); private PrintWriter pw; private static Gson gson; @Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { try { pw = new PrintWriter(new FileWriter("E:\\IdeaProjects\\storm_demo\\src\\main\\resources\\user_behavior_data_write.txt")); gson = new Gson(); } catch (IOException e) { logger.error("文件有誤,保存失敗"); } } @Override public void execute(Tuple input) { MessageInfo info = (MessageInfo) input.getValueByField(BehaviorConstants.FIELD_INFO); try { String jsonMessage = gson.toJson(info, MessageInfo.class); pw.println(info); pw.flush(); } catch (Exception e) { logger.warn("格式轉換失敗:" + info); } } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { } @Override public void cleanup() { if (pw != null) { pw.close(); } } }
排在首位的是分流策略的選取, 在前一篇文章中提到了三種, shuffle, global, field 三種方式. 那麼來整合一下吧:
import com.storm.demo.user_behavior.bolt.ContentStitchingBolt; import com.storm.demo.user_behavior.bolt.MessageWriterBolt; import com.storm.demo.user_behavior.bolt.TimeIntervalBolt; import com.storm.demo.user_behavior.spout.FileReaderSpout; import org.apache.storm.Config; import org.apache.storm.LocalCluster; import org.apache.storm.generated.StormTopology; import org.apache.storm.topology.TopologyBuilder; import org.apache.storm.tuple.Fields; import org.apache.storm.utils.Utils; /** * @author zyzdisciple * @date 2019/4/7 */ public class UserBehaviorTopology { private static final String STREAM_SPOUT = "spout"; private static final String STREAM_TIME_INTERVAL_BOLT = "time-bolt"; private static final String STREAM_CONTENT_BOLT = "content-stitching-bolt"; private static final String STREAM_FILE_WRITER_BOLT = "file-writer-bolt"; private static final String TOPOLOGY_NAME = "user-behavior-topology"; private static final long TEN_SECONDS = 1000L * 10; public static void main(String[] args) { TopologyBuilder builder = new TopologyBuilder(); builder.setSpout(STREAM_SPOUT, new FileReaderSpout()); builder.setBolt(STREAM_TIME_INTERVAL_BOLT, new TimeIntervalBolt()).shuffleGrouping(STREAM_SPOUT); //這裏必然要使用field 保證同一group的數據發送到同一個bolt中. builder.setBolt(STREAM_CONTENT_BOLT, new ContentStitchingBolt()) .fieldsGrouping(STREAM_TIME_INTERVAL_BOLT, new Fields(BehaviorConstants.FIELD_TIME_GROUP)); builder.setBolt(STREAM_FILE_WRITER_BOLT, new MessageWriterBolt()).shuffleGrouping(STREAM_CONTENT_BOLT); Config conf = new Config(); //當設置爲true時, 且log level定義爲info時, 會在控制檯輸出發射元組內容 conf.setDebug(true); StormTopology topology = builder.createTopology(); LocalCluster cluster = new LocalCluster(); cluster.submitTopology(TOPOLOGY_NAME, conf, topology); //停留幾秒後關閉拓撲,不然會永久運行下去 Utils.sleep(TEN_SECONDS); cluster.killTopology(TOPOLOGY_NAME); cluster.shutdown(); } }
那麼僅僅這樣就能夠了嗎? 固然不行, 咱們會發現, 在整個拓撲中, 依然是採用串行化的方式去處理數據, 對資源得不到充分的利用, 也是不可以支撐起大數據的.
因此在這裏就須要提到另外一個東西, 即拓撲的並行化
咱們能夠先來看看代碼, 在UserBehaviorTopology 中的main 方法作以下更改:
builder.setSpout(STREAM_SPOUT, new FileReaderSpout(), 4);
有什麼區別呢? 在最後一位參數加了個4.
咱們須要知道的一點是: 不管是 Spout 仍是 bolt 最終都會迴歸到虛擬機, 或物理上, 在一個JVM進程中去運行相關的代碼, 雖然咱們自身沒法分配, 決定哪個bolt在哪個jvm中去執行, 可是咱們能夠決定同時用多少個線程來運行相關的bolt, spout, 進行並行化處理, 大大提高速度, 而在這裏, 咱們則是指定了4個線程去執行spout. 也就是 excutor
須要注意:不管有多少個jvm在運行spout, 其總的執行spout的線程數,肯定爲4
那麼在第一環進行了擴展, 能夠支持更大批量的數據處理, 但這並不夠, 系統的總體性能必然是取決於最低點的.
builder.setSpout(STREAM_SPOUT, new FileReaderSpout(), 4); builder.setBolt(STREAM_TIME_INTERVAL_BOLT, new TimeIntervalBolt(), 2).shuffleGrouping(STREAM_SPOUT); builder.setBolt(STREAM_CONTENT_BOLT, new ContentStitchingBolt(), 8) .fieldsGrouping(STREAM_TIME_INTERVAL_BOLT, new Fields(BehaviorConstants.FIELD_TIME_GROUP)); builder.setBolt(STREAM_FILE_WRITER_BOLT, new MessageWriterBolt(), 4).shuffleGrouping(STREAM_CONTENT_BOLT);
在這裏考慮到Spout 與 ContentStitchingBolt的任務功能比較複雜, 所以給了較多的executor去執行.
這裏的線程數分配 並無嚴格的要求, 節點功能簡單, 能夠適當減小執行器, 節點功能複雜能夠適當增長.
而在前面提到過, 異常處理問題, 在每個bolt 或Spout中的異常都須要被處理掉, 咱們知道, 節點是運行在線程中的, 而且並不必定在某個jvm中運行多少個bolt, spout, 當拋出運行時異常或未被cathc掉的異常 會致使程序異常終止, 在這種狀況下, 可能會牽連其餘的節點也異常終止, 致使topology掛掉, 所以必須處理.
而在上面提到的jvm也被稱做 workNode, worker.
執行線程則是 executor
另外每個線程中可能會運行一個或多個節點(spout bolt實例), 這樣的實例就是task, 即任務.
這是並行性相關的三個概念.
一個worker對應一個jvm進程, 其中包含多個線程, 每一個線程中含有數目不定(默認是一個)的task, 也即spout bolt實例.
而設定bolt則是:
builder.setBolt(STREAM_CONTENT_BOLT, new ContentStitchingBolt(), 8) .setNumTasks(64) .fieldsGrouping(STREAM_TIME_INTERVAL_BOLT, new Fields (BehaviorConstants.FIELD_TIME_GROUP));
經過這種方式, setNumTasks 定義了64個實例. 每一個線程中存在8個實例, 這8個實例採起循環執行, 而非並行執行的方式.
那麼爲何要用這種方式呢? 每一個線程中多建立8個實例串行執行,效率會有所提升嗎? 並不會.
那麼意義在哪裏呢?
Storm中有一個比較有趣的功能叫作, Rebalance, 支持在拓撲運行時動態調整 work 和 executor的數量. 但卻並不支持調整 task的數量.
而且要求 executor <= task 數量, 若是超出, 則executor 會與 task保持一致. 那麼task數量爲何不能調整呢? 淺顯的例子是, 對於 fieldGrouping而言, 要求 field一致的數據, 最終都流向同一個 bolt實例, 也就是同一個task中, 若是在rebalance過程當中, 建立了新的bolt, 就會出現問題.
能夠經過這樣兩個方面來作進一步優化:
分解爲功能組件的優化方式
將每個功能細分, 分解到不一樣的節點中進行處理, 這樣知足了單一職責原則, 在未來調整擴展時帶來極大的方便.
基於重分配的方式進行設計
咱們須要在拓撲中儘量的減小重分配的次數, 由於每一次執行分配, 咱們須要進行相應的序列化, 反序列化, 以及網絡傳輸種種開銷.
分配的節點越多, 咱們須要越多的實例, 對物理虛擬資源的開銷也都會相應的變大.
所以, 咱們須要在儘量知足組件優化的同時, 減小重分配次數.
想到這裏不只產生了一點點問題:
那不是在一個單獨的Spout中所有計算, 將功能定義爲各類方法, 就是最大程度的知足了最少重分配嗎?
這裏就會存在一個問題, 計算資源的分配, 並行處理多個計算顯然要比流式處理來的更快, 咱們將不一樣的計算任務分配到各個節點, 最大化利用物理資源.
我想這大概是Storm的設計初衷.
因此其核心就在於將計算資源分配, 同時若是可以最少程度的減小網絡傳輸, 最好不過了. 而這一點, 也正是Spark的實現方式.
那麼在咱們的項目中又該如何調整呢?
答案在於調整 TimeIntervalBolt, 一方面它自己也是數據處理的一部分, 另外一方面, 與其爲了這一操做加入沒必要要的網絡傳輸序列化等, 不如在前一步直接進行處理, 減小一個節點.
實現以下:
if (isValid(info, line)) { completingMessage(info); //判斷當前的時間區間. long timeGroup = System.currentTimeMillis() / (BehaviorConstants.SESSION_TIME_OUT_SECS * 1000); collector.emit(new Values(timeGroup, info)); }
刪除TimeInterval, 加入Spout中, 並作相應字段,分流調整.
說到底, 這自己仍是一件全憑藉我的理解的問題, 合併時須要考慮這樣幾個點:
性能影響, 即合併與拆分 帶來的開銷, 利弊權衡.
語義化問題,不然會爲未來擴展帶來沒必要要的問題.
在本篇中經過一個更加真實的案例, 從頭設計了一個拓撲, 有如下知識點:
如何設計一個拓撲, 咱們須要考慮其數據源的特性, 在一開始就肯定最終的輸出格式, 以及進一步一點點肯定中間的操做步驟.
Tick 機制, 在Storm中實現相關的定時處理.
並行度, worker, executor, task, 對這些有了基本的瞭解.
bolt的線程安全問題, 有了基本瞭解
拓撲的設計方法, 基於功能, 和基於重分配的兩種方式.