storm知識點學習總結(一)

storm --流式處理框架java

   storm是個實時的、分佈式以及具有高容錯的計算系統node

   - storm 進程常駐內存python

  - storm 數據不通過磁盤,在內存中處理linux

  Twitter開源的分佈式實時大數據處理框架,最先開源於githubgit

  storm 架構   -Nimbus  -Supervisor  -Workergithub

  編程模型: - DAG      -Spout  -Boltredis

  數據傳輸: - ZMQ (Twitter早起的產品)sql

                     - ZeroMQ 開源的消息傳遞框架,並非一個MessageQueueshell

                    -Netty Netty是基於NIO的網絡框架,更加高效 (之因此storm0.9版本以後使用netty,是由於ZMQ的license和storm的licemse不兼容)數據庫

  高可靠性: -異常處理    -消息可靠性保障機制(ACK)

  可維護性: -stormUI圖形化監控接口

  流式處理(同步與異步):客戶端提交數據進行計算,並不會等待數據計算結果 

  逐條處理:例如ETL(數據清洗)

  統計分析: 例如計算pv、uv、訪問熱點以及某些數據的聚合、加和、平均等

  --客戶端提交數據以後,計算完成結果存儲到redis、Hbase、Mysql或者其餘的MQ當中

   客戶端並不關心最終的計算結果是多少

  實時請求應答服務(同步) -客戶端提交數據請求以後,馬上取得計算結果並返回給客戶端

 DRPC:

 實時請求處理:

storm : 進程 、線程常駐內存運行,數據不只如此磁盤,數據經過網絡進行傳遞

MapReduce: 爲TB、PB級別數據設計 的批處理計算框架

storm與mapreduce的比較:

storm: 流式處理、毫秒級、DAG模型、常駐運行

MapReduce: 批處理、分鐘級、map+reduce模型 、反覆啓停

storm:純流式處理 

           - 專門爲流式處理設計

          - 數據傳輸模式更爲簡單,不少地方也更爲高效

         -並非不能作批處理,它也能夠用來作微批處理,來提升吞吐

 Spark Streaming :微批處理  

       -- 將RDD作的很小來用小的批處理來接近流式處理

      --基於內存和DAG處理任務作的很快

 storm: 流式處理,毫秒級,已經很穩定,獨立系統專門流式計算設計

 SparkStreaming: 微批處理、秒級、穩定性改進中、spark核心之上的一種計算模型,能與其餘的組件進行很好的結合

 storm計算模型: 

          Topology-DAG 有向無環圖的實現

          --對於strom實時計算邏輯的封裝

          --即、由一系列經過數據流相互關聯的spout、bolt所組成的拓撲結構

         --生命週期:此拓撲只要啓動就會一直在集羣中運行,直到手動將其kill,不然不會終止

       tuple  --元祖

   ---storm中最小的數據組成單元

      stream --數據流

      --從spout中源源不斷傳遞數據給bolt、以及上一個bolt傳遞給下一個bolt,所造成的這些數據通道即叫作stream

      --stream聲明時需給其指定一個ID

      spout -數據源 

      -拓撲中數據流的來源。通常會從指定外部的數據源讀取元祖(tuple)發送到拓撲(Topology)中

     -一個spout能夠發送多個數據流(stream)

    --可先經過OutputFieldsDeclear中的declear方法聲明定義的不一樣數據流,發送數據時SpoutOutPutCollector中的emit方法指定數據流的參數將數據發送出去

   --spout中最核心的方法是nextyouple,該方法會被storm線程不斷對的調用、主動從數據源拉取數據,在經過emit方法將數據生成元祖(tuple)發送給只有的bolt計算

  

    -bolt  數據流處理組件

    - 拓撲中數據處理均有bolt完成。對於簡單的任務或者數據流轉換,單個bolt能夠簡單的實現;更加複雜的場景每每須要多個bolt分多個步驟處理完成

    -一個bolt能夠發送多個數據流(Stream)

   --能夠先經過outputFiledDeclear中的declear方法生命定義的不一樣數據流,發送數據時經過spoutOutputcollector中的emit方法指定數據流id參數將數據發送出去

   --bolt最核心的方法是executor方法,該方法負責接收一個元祖數據、真正實現核心的業務邏輯

  stream Grouping --數據流分組

用storm 實現wordcount單詞統計

 數據發送類

package com.storm.spout;

import java.util.List;
import java.util.Map;
import java.util.Random;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;

public class Wdcspout extends BaseRichSpout{

    private SpoutOutputCollector collector;

    String[] text = {
            "nihao hello ok",
            "nice to meet hello",
            "where are you ok",
            "where is you home"
            
    };
    
    Random r = new Random();
    @Override
    public void nextTuple() {
        // TODO Auto-generated method stub
        
        List line = new Values(text[r.nextInt(3)]);
        this.collector.emit(line);
    
        System.out.println("line==============="+line);
        Utils.sleep(1000);
        
    }

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        // TODO Auto-generated method stub
        this.collector = collector;
        
        
        
    }

    /**
     * 
     * 
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // TODO Auto-generated method stub
        declarer.declare(new Fields("line"));
    }

}

數據處理類:

package com.storm.bolt;

import java.util.List;
import java.util.Map;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class Wcdbolt extends BaseRichBolt{

    private OutputCollector collector;

    @Override
    public void execute(Tuple input) {
        // TODO Auto-generated method stub
        //一、獲取數據,並對獲取的數據進行切分
        String[] words =  input.getString(0).split(" ");
        //二、發送數據
        for(String word: words) {
            
            List tuple = new Values(word);
            this.collector.emit(tuple);
            
        }
        
        
        
        
    }

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        // TODO Auto-generated method stub
        this.collector = collector;
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // TODO Auto-generated method stub
        declarer.declare(new Fields("tuple"));
    }

}

第二個處理數據的bolt

package com.storm.bolt;

import java.util.HashMap;
import java.util.Map;


import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;

public class wcdsbolt extends BaseRichBolt{

    private OutputCollector collector;

    
    Map<String, Integer> map = new HashMap<>();
    @Override
    public void execute(Tuple input) {
        // TODO Auto-generated method stub
        //對接受到的數據進行處理 
        String word = input.getStringByField("tuple");
        int count = 1;
        //若是單詞不存在,則把單詞的統計數添加到map中,不然,在原址value的基礎之上加1 
        if(map.containsKey(word)) {
            count = map.get(word)+1;
        }
        
        map.put(word, count);
        
        System.err.println(word+"----------------------------"+count);
        
        
    }

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        // TODO Auto-generated method stub
        this.collector = collector;
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer arg0) {
        // TODO Auto-generated method stub
        
    }

}

對處理結果提交到本地運行

package com.storm.test;

import org.jgrapht.alg.TarjanLowestCommonAncestor.LcaRequestResponse;

import com.storm.bolt.Wcdbolt;
import com.storm.bolt.wcdsbolt;
import com.storm.spout.Wdcspout;

import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
import backtype.storm.Config;;

public class test1 {

    public static void main(String[] args) {
        
        TopologyBuilder tm = new TopologyBuilder();
        
        //單線程處理
        /*tm.setSpout("wcdspout", new Wdcspout());
        tm.setBolt("wcdbolt", new Wcdbolt()).shuffleGrouping("wcdspout");
        tm.setBolt("wcdsbolt", new wcdsbolt()).shuffleGrouping("wcdbolt");*/
        //多線程處理
        tm.setSpout("wcdspout", new Wdcspout());
        tm.setBolt("wcdbolt", new Wcdbolt(),3).shuffleGrouping("wcdspout");
        tm.setBolt("wcdsbolt", new wcdsbolt(),3).fieldsGrouping("wcdbolt", new Fields("tuple"));
        
        
        
        LocalCluster lm = new LocalCluster();
        
        lm.submitTopology("w", new Config(), tm.createTopology());    
    }
}

注: 當用多線程處理的時候,注意對於分發策略的選擇。   不然會發生數據統計異常的錯誤 。分發策略主要由grouping方法來進行處理的。

 

storm grouping --數據流分組;(即數據的分發策略)

 

一、 shuffle grouping

      --隨機分組,隨機派發stream裏面的tuple,保證每一個bolt task 接受到的tuple數目大體相同 。

     --輪詢,平均分配。

  二、 Fields grouping

     --按字段分組,好比,按「user-id」這個字段來進行分組,那麼具備一樣「user-id」的tuple會被分到相同的bolt裏面的一個task,而不一樣的"user-id"則可能會被分到不一樣的task

     --

 三、 All grouping

   -- 廣播分發,對於每個tuple,全部的bolt都會收到

四、Global Grouping

   --全局分組,把tuple分配給task id 最低的task

  五、None grouping

    -- 不分組,這個分組的意思是說storm不關心究竟是怎樣進行分組的,目前這種分組和shufflegrouping 的分組效果是同樣的。有一點不一樣的地方就是storm會把使用none grouping 的這個bolt放到這個bolt的訂閱者的同一個線程中區,(將來storm若是可能的話,會進行這樣的設計)

  六、direct grouping

   --指向型分組,這是一種比較特殊的分組方法,用這種分組意味着消息(tuple)的發送者指定由消息接受者的那個task處理這個消息。只要被聲明爲Direct stream 的消息流能夠聲明這種分組方法。並且這種消息的tuple必須使用emitDirect方式來發送。消息處理着能夠經過TopologyContext來獲取處理它的消息的task的id

 

七、Local or shuffle grouping 

   --本地或隨機分組。若是目標bolt有一個或者多個task與源bolt的task在同一個工做進程中,tuple將會被隨機發送給這些同進程中的task,不然,和普通的shuffle grouping 行爲一致

八、 customgrouping

   --自定義,至關於mapreduce哪裏本身去實現一個partition同樣。

 

storm 架構設計

    

Nimbus: --資源調度  --任務分配  --接受jar包

Supervisor: --接受Nimbus分配的任務

-- 啓動、中止本身管理的worker進程(當前supervisor上worker數量由配置文件設定)

--Worker

   --運行具體處理運算組件的進程(每一個worker對應執行一個Topology的子集)

   --worker任務類型,即spout,bolt任務兩種

   --啓動executor(executor即worker JVM進程中的一個java線程,通常默認每一個executor負責執行一個task任務 )

--zookeeper:

storm 提交任務流程:  一、將提交的jar包上傳至nimbus服務器numbus/inbox目錄下  二、對topology進行檢驗處理   三、創建Topology在本地的存放目錄nimbus\stormdist\topology-id(該目錄下包含三個文件)

       stormjar.jar:  --從nimbus/inbox目錄下移動來的Topology的jar包

       stormcode.ser:   --對Topology對象序列化法

       stormconf.ser:  --topology的運行配置

  nimbus任務分配:即根據代碼初始化spout/bolt的task數目,並分配給對應的task-id,最後將這些信息寫入到zookeeper的/task節點下

  nimbus在zookeeper上建立taskbeats節點,監控task的心跳

  將任務分配信息寫入到assignment/topology-id節點中,此時便可認爲任務提交完畢

  在zookeeper的/storms/topology-id節點下存聽任務運行的時間、狀態等信息

  按期檢查zookeeper上storm節點,是否有新任務提交,刪除本地不在運行的任務

  根據nimbus指定的任務信息啓動該節點上的worker

   查看須要執行的task任務信息,獲取到相應的task信息,即spout/bolt任務信息,執行具體的運算,並根據IP以及端口發送消息數據

  

 

  

storm的安裝:

僞分佈式的安裝:

一、 上傳安裝壓縮包   二、將安裝壓縮包解壓  三、配置環境變量  四、啓動storm相關命令

storm dev-zookeeper >> ./logs/zk.out 2>&1 & (將啓動日誌重定向到logs目錄中,2>&1表明將標準的錯誤輸出重定向到標準的正確輸出中)

 

在僞分佈式環境上運行wordcount步驟:

一、將wordcount代碼打成jar包

二、將打好的jar包上傳到linux服務器

三、運行上傳的jar 包 

運行jar包的命令: storm jar tq.jar com.storm.wordcount.Main  (最後面寫的是jar包文件在eclipse中所在的jar位置和類名稱)--這種方式是本地模式來運行的
若是不打算用本地模式來運行那麼就添加一個參數來運行
storm jar tq.jar com.storm.wordcount.Main ec

 

ack: 線程保障機制,監控線程的運行狀況,並將監控的結果發送給spout,若是ack監控到線程運行出現了問題,那麼就讓spout將數據從新發送一遍

結束運行任務的命令: storm kill 任務名稱 -w 時間長短

注:關閉storm全部已經啓動的任務命令: killall java  

storm 徹底分佈式的搭建: 一、準備環境  jdk python 2.6.6

  二、 部署zookeeper :          三、上傳安裝包並解壓        四、在storm中建立logs目錄   五、修改配置文件 -conf/storm.yaml

  在配置文件中指定對應的zookeeper:  storm.zookeeper.servers: - "node2"  - "node3" - "node4"

 nimbus.host: "node1"      (指定nimbus所在的節點)            

storm.local.dir: "/tmp/storm"

supervisor.slots.ports:

     - 6700

    - 6701

   - 6702

   - 6703

而後將配置好的文件分發到其餘的節點 scp -r ./storm node2:/opt/

徹底分佈式的啓動:node1上面啓動主節點  

./bin/storm nimbus >> ./logs/nimbus.out 2>&1 &

./bin/storm ui >> ./logs/ui.out 2>&1 &

node二、node3上面啓動從節點

./bin/storm supervisor >> ./logs/supervisor.out 2>&1 &

storm的併發機制:

worker: --進程

     一個Topology拓撲會包含一個或者多個worker(每一個worker只能從屬於一個特定的Topology)

     這些Worker進程會並行跑在集羣中不一樣的服務器上,即一個Topology拓撲實際上是由並行運行在storm集羣中多臺服務器上的進程所組成

Executor  --線程

     --executor是由Worker進程中生成的一個線程

     --每一個worker進程中會運行一個或多個拓撲當中的executor線程

    --一個executor線程中能夠執行一個或多個task任務(默認每一個executor只執行一個task任務),可是這些task任務都是對應着同一個組件

Task 

    --實際執行數據處理的最小單元

    --每一個task即爲一個spout或者一個bolt

   task數量在整個Topology生命週期中保持不變,executor數量能夠變化或者手動調整

  (默認狀況下,task數量和executor是相同的,即每一個executor線程中默認運行一個task任務)

  設置worker 進程數: -Config.setNumWorkers(Int workers)

  設置Executor線程數

    - TopologyBuilder.setspout()

   --TopologyBuilder.setbolt()

 設置task數量:

   --componentConfigurationDeclare.setNumTasks(Number val)

 

 rebalance  --再平衡

 --即,動態調整Topology拓撲的worker進程數、以及executor的線程數

  支持兩種調整方式: 一、經過storm ui  二、經過storm cli

 經過storm CLI 動態調整:

用shell命令調整並行度:

 ./bin/storm rebalance wc -w 30 -n 2 -e 組件名稱=(調整的並行度)

 

storm的通訊機制:

      worker進程之間的通訊:

      -ZMQ   -zeroMQ開源的消息傳遞框架,並非一個MessageQueue

     -Netty -netty是基於NIO的網絡框架,更加高效。

     Worker內部的數據通訊: 

      -Disruptor  --實現隊列的功能   --能夠理解爲一種事件監聽或者消息處理機制,即在隊列中一邊由生產者放入消息數據,另外一邊由消費者並行去除消息數據處理

       

storm的容錯機制:  一、集羣節點宕機  -Nimbus服務器  單點故障

                                                            --非Nimbus服務器  故障時,該節點上全部的task任務都會超時,Nimbus會將這些task任務從新分配到其餘服務器上運行

 

二、 進程掛掉 

      --worker

      掛掉時,Supervisor會重啓這個進程,若是啓動過程當中任然一直失敗,而且沒法向nimbus發送心跳,Nimbus會將該worker從新分配到其餘服務器上

    --Supervisor

      無狀態(全部的狀態信息都放在zookeeper中進行管理)

      快速失敗 (每當遇到異常狀況,都會自動毀滅)

   --Nimbus 

      無狀態(全部的狀態信息都存放在zookeeper中來管理)

      快速失敗(每當遇到任何的異常狀況都會自動毀滅)

三、消息的完整性

     acker --消息完整性的實現機制

   -- storm 的拓撲當中 特殊的一些任務

  -- 負責跟蹤每一個spout發出的tuple的DAG(有向無環圖)

  注:容錯機制沒法保證數據只被處理一次 ,但能夠保證全部的數據都被處理

 

storm -DRPC 

   客戶端經過向DRPC服務器發送待執行函數的名臣以及該函數的參數來獲取處理結果。實現該函數的拓撲使用一個DRPspout從DRPC服務器中接受一個函數的調用流。DRPC會爲每個函數調用都標記一個惟一的id,隨後拓撲會執行函數來計算結果,並在拓撲的最後使用一個名爲returnResult的bolt鏈接到DRPC服務器,根據函數調用的id來將函數調用的結果返回。

   DRPC (Distributed RPC)   --分佈式遠程調用

   DRPC 是經過一個DRPC服務端(DRPC server)來實現分佈式RPC功能

   DRPC server 負責接收RPC請求,並將該請求發送到Strom中運行的Topology,等待接收Topology發送的處理結果,並將該結果返回給發送請求的客戶端

    DPRC設計目的:

    爲了充分利用Storm的計算能力實現高密度的並行實時計算

   DRPC在集羣中運行,首先須要配置配置文件,添加DRPC運行的節點,其次須要將啓動DRPC服務器。

   ./bin/storm drpc >> ./logs/drpc.out 2>&1 &

   

flume整合kafka: 安裝flume 和 kafka ,啓動zookeeper+kafka+flume

flume的安裝:

一、加壓安裝包   二、修改配置文件名稱: mv flume-env.sh.propertise flume-env.sh  三、在配置文件中配置java_home路徑

啓動三個節點kafka:bin/kafka-server-start.sh config/server.properties   

啓動flume:bin/flume-ng agent -n a1 -c conf -f conf/fk.conf -Dflume.root.logger=DEBUG,console

添加flume啓動的配置文件:

a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.bind = node01
a1.sources.r1.port = 41414

# Describe the sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = testflume
a1.sinks.k1.brokerList = node1:9092,node2:9092,node3:9092
a1.sinks.k1.requiredAcks = 1
a1.sinks.k1.batchSize = 20
a1.sinks.k1.channel = c1

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000000
a1.channels.c1.transactionCapacity = 10000

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

 

Flume+Kafka+Storm架構設計:

package com.storm.flume;


/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;
import storm.kafka.bolt.KafkaBolt;
import storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper;
import storm.kafka.bolt.selector.DefaultTopicSelector;

/**
 * This topology demonstrates Storm's stream groupings and multilang
 * capabilities.
 */
public class LogFilterTopology {

    public static class FilterBolt extends BaseBasicBolt {
        @Override
        public void execute(Tuple tuple, BasicOutputCollector collector) {
            String line = tuple.getString(0);
            System.err.println("Accept:  " + line);
            // 包含ERROR的行留下
            if (line.contains("SUCCESS")) {
                System.err.println("Filter:  " + line);
                collector.emit(new Values(line));
            }
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            // 定義message提供給後面FieldNameBasedTupleToKafkaMapper使用
            declarer.declare(new Fields("message"));
        }
    }

    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();

        // https://github.com/apache/storm/tree/master/external/storm-kafka
        // config kafka spout,話題
        String topic = "testflume";
        ZkHosts zkHosts = new ZkHosts("node1:2181,node2:2181,node3:2181");
        // /MyKafka,偏移量offset的根目錄,記錄隊列取到了哪裏
        SpoutConfig spoutConfig = new SpoutConfig(zkHosts, topic, "/MyKafka", "MyTrack");// 對應一個應用
        List<String> zkServers = new ArrayList<String>();
        System.out.println(zkHosts.brokerZkStr);
        for (String host : zkHosts.brokerZkStr.split(",")) {
            zkServers.add(host.split(":")[0]);
        }

        spoutConfig.zkServers = zkServers;
        spoutConfig.zkPort = 2181;
        // 是否從頭開始消費
        spoutConfig.forceFromStart = true;
        spoutConfig.socketTimeoutMs = 60 * 1000;
        // StringScheme將字節流轉解碼成某種編碼的字符串
        spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

        KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);

        // set kafka spout
        builder.setSpout("kafka_spout", kafkaSpout, 3);

        // set bolt
        builder.setBolt("filter", new FilterBolt(), 8).shuffleGrouping("kafka_spout");

        // 數據寫出
        // set kafka bolt
        // withTopicSelector使用缺省的選擇器指定寫入的topic: LogError
        // withTupleToKafkaMapper tuple==>kafka的key和message
        KafkaBolt kafka_bolt = new KafkaBolt().withTopicSelector(new DefaultTopicSelector("LogError"))
                .withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper());

        builder.setBolt("kafka_bolt", kafka_bolt, 2).shuffleGrouping("filter");

        Config conf = new Config();
        // set producer properties.
        Properties props = new Properties();
        props.put("metadata.broker.list", "node1:9092,node2:9092,node3:9092");
        /**
         * Kafka生產者ACK機制 0 : 生產者不等待Kafka broker完成確認,繼續發送下一條數據 1 :
         * 生產者等待消息在leader接收成功確認以後,繼續發送下一條數據 -1 :
         * 生產者等待消息在follower副本接收到數據確認以後,繼續發送下一條數據
         */
        props.put("request.required.acks", "1");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        conf.put("kafka.broker.properties", props);

        conf.put(Config.STORM_ZOOKEEPER_SERVERS, Arrays.asList(new String[] { "node1", "node2", "node3" }));

        // 本地方式運行
        LocalCluster localCluster = new LocalCluster();
        localCluster.submitTopology("mytopology", conf, builder.createTopology());

    }
}

 

storm --事務 

       強順序流(強有序) 

       --引入事物(transaction)的概念,每一個transaction關聯一個transaction id。

      --transaction id從1開始,每一個tuple會按照順序加1

      --在處理tuple時,將處理成功的tuple結果以及transaction同時寫入數據庫中進行存儲

     --兩種狀況

       一、當前transaction id 與數據庫中的transaction id 不一致

       二、兩個transaction id 相同

    缺點:一次只能處理一個tuple,沒法實現分佈式計算

   

      將Topology拆分紅兩個階段:

      一、Processing phase  : 容許並行處理多個batch

      二、commit phase  : 保證batch的強有序,一次只能處理一個batch

    

   Design details

   Manages state -狀態管理 

    --storm 經過 Zookeeper 存儲全部的transaction相關信息 (包含了: 當前transaction id 以及 batch的元數據信息 )

    Coordinates the transaction --事物協調

    --storm會管理決定transaction應該處理什麼階段(processing,committing)

    Fault detection --故障檢測

    --storm 內部經過ACKER 機制保障消息被正常處理(用戶不須要手動區維護)

  First class batch processing API  : storm 提供的batch bolt接口

   三種事物:  

   一、普通事物  二、 partition transaction --分區事物  三、 opaque transaction --不透明分區事物  

   事務性拓撲(transaction topoligies) 保證消息(tuple)被且僅被處理一次

相關文章
相關標籤/搜索