Trident Tutorial

 



(I) Theoretical foundations
More theory will be added later; see the reference books for details.
1. What is Trident?
Trident is a high-level abstraction for doing realtime computing on top of Storm. It allows you to seamlessly intermix high throughput (millions of messages per second), stateful stream processing with low latency distributed querying. If you're familiar with high level batch processing tools like Pig or Cascading, the concepts of Trident will be very familiar – Trident has joins, aggregations, grouping, functions, and filters. In addition to these, Trident adds primitives for doing stateful, incremental processing on top of any database or persistence store. Trident has consistent, exactly-once semantics, so it is easy to reason about Trident topologies.

In short, Trident is a higher-level abstraction on top of Storm. Compared with plain Storm, it mainly provides three benefits:

(1) A higher-level API: frequently used operations such as count and sum are packaged as built-in methods that can be called directly, so you do not have to implement them yourself.

(2) Higher-level primitives such as groupBy.

(3) Transaction support, which guarantees that every message is processed, and processed exactly once.

2. Trident processes messages one batch at a time, i.e., several tuples are handled together in each batch.
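The batch cadence can be tuned through the regular Storm configuration; the Kafka example in section (III) below does exactly this. A minimal sketch (the values are only examples):

        // Sketch: tuning Trident's batch cadence (uses backtype.storm.Config).
        Config conf = new Config();
        // Emit a new batch roughly every 2 seconds; also set in section (III) below.
        conf.put(Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS, 2000);
        // Allow at most 20 batches in flight at once, as in the word-count example.
        conf.setMaxSpoutPending(20);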



3. Transaction types

There are two concepts about transaction types that are easy to confuse: the transaction type of the spout and the transaction type of the state.

Both come in three flavours: transactional, non-transactional, and opaque (transparent) transactional.

(1) Spout

The spout type determines how tuples are re-emitted when a downstream failure forces a batch to be replayed.

Transactional spout: when a batch is replayed, it contains exactly the same tuples as before. Every tuple is guaranteed to be emitted exactly once, and the tuples belonging to a given batch never change.

Non-transactional spout: no guarantees at all; once a tuple has been emitted, that is the end of it.

Opaque (transparent) transactional spout: a replayed batch may contain different tuples. Every tuple is still emitted exactly once, but there is no guarantee that a replayed batch holds the same data as the original one. This is especially useful for partial failures: with Kafka as the spout, if one partition of a topic becomes unavailable, a batch can still be formed from the data of the other partitions and sent on; a transactional spout would have to wait for that partition to recover before it could continue.

These three flavours can be defined by implementing the ITransactionalSpout, ITridentSpout, and IOpaquePartitionedTridentSpout interfaces respectively.
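As a concrete illustration, the storm-kafka module used in section (III) ships a spout for both the transactional and the opaque-transactional flavours; which one you choose determines the replay behaviour described above. A sketch, assuming the kafkaConfig defined in section (III):

        // Sketch: picking a spout flavour (classes from storm.kafka.trident).
        // Transactional: a replayed batch contains exactly the same messages.
        TransactionalTridentKafkaSpout txSpout =
                new TransactionalTridentKafkaSpout(kafkaConfig);
        // Opaque transactional: keeps emitting batches even if some partitions
        // are unavailable, so a replayed batch may differ from the original.
        OpaqueTridentKafkaSpout opaqueSpout = new OpaqueTridentKafkaSpout(kafkaConfig);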

 

(2) State

The state type determines how state should be updated when a batch of data is replayed, given that Storm's intermediate or final output is being persisted somewhere (for example, in memory). State is especially relevant when errors occur downstream.

Transactional state: the same batch of tuples always produces the same result.

Non-transactional state: there is no rollback capability; updates are permanent once applied.

Opaque (transparent) transactional state: updates are applied relative to the previous value, so if the contents of a replayed batch change, the result changes with them. Besides the current value, an opaque transactional state therefore also stores the value from before the last batch, so that on replay the update can be re-applied on top of that previous value. (A conceptual sketch follows below.)
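To make the opaque transactional case concrete, here is a minimal, self-contained sketch of the bookkeeping it implies: for each key the store keeps the current value, the previous value, and the transaction id of the batch that produced the current value. This only illustrates the idea; it is not Storm's actual implementation.

import java.util.HashMap;
import java.util.Map;

// Conceptual sketch of opaque-transactional counting (illustration only).
public class OpaqueCounterSketch {
    static class StoredValue {
        long currTxid; // txid of the batch that produced curr
        long curr;     // value including the latest batch
        long prev;     // value as it was before the latest batch
    }

    private final Map<String, StoredValue> store = new HashMap<String, StoredValue>();

    // Apply the partial count of one batch for one key.
    public void applyBatch(String key, long txid, long batchCount) {
        StoredValue v = store.get(key);
        if (v == null) {
            v = new StoredValue();
            store.put(key, v);
        }
        if (v.currTxid == txid) {
            // The same batch is being replayed, possibly with different tuples:
            // recompute on top of the previous value instead of double counting.
            v.curr = v.prev + batchCount;
        } else {
            // A new batch: remember the old value, then apply the update on top of it.
            v.prev = v.curr;
            v.curr = v.prev + batchCount;
            v.currTxid = txid;
        }
    }
}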




(II) The official example

package org.ljh.tridentdemo;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.LocalDRPC;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.StormTopology;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import storm.trident.TridentState;
import storm.trident.TridentTopology;
import storm.trident.operation.BaseFunction;
import storm.trident.operation.TridentCollector;
import storm.trident.operation.builtin.Count;
import storm.trident.operation.builtin.FilterNull;
import storm.trident.operation.builtin.MapGet;
import storm.trident.operation.builtin.Sum;
import storm.trident.testing.FixedBatchSpout;
import storm.trident.testing.MemoryMapState;
import storm.trident.tuple.TridentTuple;


public class TridentWordCount {
    public static class Split extends BaseFunction {
        @Override
        public void execute(TridentTuple tuple, TridentCollector collector) {
            String sentence = tuple.getString(0);
            for (String word : sentence.split(" ")) {
                collector.emit(new Values(word));
            }
        }
    }

    public static StormTopology buildTopology(LocalDRPC drpc) {
        FixedBatchSpout spout =
                new FixedBatchSpout(new Fields("sentence"), 3, new Values(
                        "the cow jumped over the moon"), new Values(
                        "the man went to the store and bought some candy"), new Values(
                        "four score and seven years ago"),
                        new Values("how many apples can you eat"), new Values(
                                "to be or not to be the person"));
        spout.setCycle(true);

        // Create the topology object
        TridentTopology topology = new TridentTopology();
        
        // This stream counts the words; the result is kept in wordCounts
        TridentState wordCounts =
                topology.newStream("spout1", spout)
                        .parallelismHint(16)
                        .each(new Fields("sentence"), new Split(), new Fields("word"))
                        .groupBy(new Fields("word"))
                        .persistentAggregate(new MemoryMapState.Factory(), new Count(),
                                new Fields("count")).parallelismHint(16);
        // This stream serves DRPC queries against the counts above
        topology.newDRPCStream("words", drpc)
                .each(new Fields("args"), new Split(), new Fields("word"))
                .groupBy(new Fields("word"))
                .stateQuery(wordCounts, new Fields("word"), new MapGet(), new Fields("count"))
                .each(new Fields("count"), new FilterNull())
                .aggregate(new Fields("count"), new Sum(), new Fields("sum"));
        return topology.build();
    }

    public static void main(String[] args) throws Exception {
        Config conf = new Config();
        conf.setMaxSpoutPending(20);
        if (args.length == 0) {
            LocalDRPC drpc = new LocalDRPC();
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("wordCounter", conf, buildTopology(drpc));
            for (int i = 0; i < 100; i++) {
                System.out.println("DRPC RESULT: " + drpc.execute("words", "cat the dog jumped"));
                Thread.sleep(1000);
            }
        } else {
            conf.setNumWorkers(3);
            StormSubmitter.submitTopologyWithProgressBar(args[0], conf, buildTopology(null));
        }
    }
}





This example implements a basic word count and then outputs the result. The key steps are as follows:



1. Define the input stream

        FixedBatchSpout spout =
                new FixedBatchSpout(new Fields("sentence"), 3, new Values(
                        "the cow jumped over the moon"), new Values(
                        "the man went to the store and bought some candy"), new Values(
                        "four score and seven years ago"),
                        new Values("how many apples can you eat"), new Values(
                                "to be or not to be the person"));
        spout.setCycle(true);



(1) A FixedBatchSpout is used as the input spout; its output field is named sentence and at most 3 tuples form one batch. (A smaller variant is sketched after this list.)
(2) The data is sent repeatedly, in a loop.
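For reference, the constructor is FixedBatchSpout(outputFields, maxBatchSize, tuples...). A smaller, made-up variant for local experiments:

        // Sketch: a test spout with at most 2 tuples per batch; data is emitted only once.
        FixedBatchSpout testSpout = new FixedBatchSpout(new Fields("sentence"), 2,
                new Values("hello trident"), new Values("hello storm"));
        testSpout.setCycle(false); // do not loop over the data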


2. Count the words

        TridentState wordCounts =
                topology.newStream("spout1", spout)
                        .parallelismHint(16)
                        .each(new Fields("sentence"), new Split(), new Fields("word"))
                        .groupBy(new Fields("word"))
                        .persistentAggregate(new MemoryMapState.Factory(), new Count(),
                                new Fields("count")).parallelismHint(16);



This stream counts the words and stores the result in wordCounts. The six lines of code mean the following:

(1) Read messages from the spout; the name spout1 is the node name under which this stream's state is kept in ZooKeeper.

(2) Set the parallelism to 16, i.e., 16 threads read from the spout concurrently.

(3) The three arguments of each() are: the input field names, the processing function, and the output field names. Here, data is read from the field named sentence, passed through new Split(), and emitted under the field name word. Split() is described later; it simply splits its input on spaces. (A custom filter variant of each() is sketched after this list.)

(4) Group the stream by the field word, so that tuples with the same value end up in the same group.

(5) Aggregate each group with Count, keep the result in a MemoryMapState, and emit it under the field name count. This step stores both the data and the state, and returns a TridentState object.

(6) Set the parallelism for this part of the topology.
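As noted in step (3), each() can also take a filter instead of a function; a filter drops tuples rather than adding fields. A hedged sketch (the class name and the length threshold are made up):

    // Sketch: a custom filter (extends storm.trident.operation.BaseFilter).
    public static class ShortWordFilter extends BaseFilter {
        @Override
        public boolean isKeep(TridentTuple tuple) {
            // Keep only words longer than 3 characters.
            return tuple.getString(0).length() > 3;
        }
    }

    // Hypothetical usage, inserted before the groupBy:
    //     .each(new Fields("word"), new ShortWordFilter())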

3. Output the counting results

        topology.newDRPCStream("words", drpc)
                .each(new Fields("args"), new Split(), new Fields("word"))
                .groupBy(new Fields("word"))
                .stateQuery(wordCounts, new Fields("word"), new MapGet(), new Fields("count"))
                .each(new Fields("count"), new FilterNull())
                .aggregate(new Fields("count"), new Sum(), new Fields("sum"));



This stream reads the result from the wordCounts object above and returns it. The six lines of code mean the following:

(1) Wait for a DRPC call: the stream is fed by invocations of the function named words on the DRPC server. (A standalone client sketch follows after this list.) The invocation looks like this:

drpc.execute("words", "cat the dog jumped")
(2) The input is the argument string supplied by that call; after Split(), each piece is emitted under the field name word.

(3) Group by the value of word.

(4) Query the wordCounts object for results. The four arguments are: the state to query, the input fields, a built-in function that looks up a value in a map by key (MapGet), and the output field name.

(5) Filter out empty query results; in this example, cat and dog have no counts.

(6) Sum the counts and emit the total under the field name sum; this is what the DRPC call returns. Without this line, the output would look like:

DRPC RESULT: [["cat the dog jumped","the",2310],["cat the dog jumped","jumped",462]]
With this line added, the result becomes:
DRPC RESULT: [[180]]
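When the topology runs on a real cluster (the else branch in main()), the same query can be issued from a standalone program through the DRPC server. A hedged sketch; the host name below is a placeholder for your environment, and 3772 is Storm's default DRPC port:

        // Sketch: calling the "words" DRPC function from outside the topology
        // (uses backtype.storm.utils.DRPCClient; execute() may throw checked exceptions).
        DRPCClient client = new DRPCClient("drpc-host", 3772);
        String result = client.execute("words", "cat the dog jumped");
        System.out.println("DRPC RESULT: " + result);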

4. The definition of Split

    public static class Split extends BaseFunction {
        @Override
        public void execute(TridentTuple tuple, TridentCollector collector) {
            String sentence = tuple.getString(0);
            for (String word : sentence.split(" ")) {
                collector.emit(new Values(word));
            }
        }
    }


Note that at the end it emits the resulting tuples through the collector.
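The output fields of a Trident function are appended to the input tuple, so a function may also emit several fields per call. A small, hypothetical variant of Split for illustration (the extra len field is made up):

    // Sketch: emitting two fields per word; the corresponding each() call would
    // then declare new Fields("word", "len") as its output fields.
    public static class SplitWithLength extends BaseFunction {
        @Override
        public void execute(TridentTuple tuple, TridentCollector collector) {
            for (String word : tuple.getString(0).split(" ")) {
                collector.emit(new Values(word, word.length()));
            }
        }
    }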

5. Build and launch the topology

    public static void main(String[] args) throws Exception {
        Config conf = new Config();
        conf.setMaxSpoutPending(20);
        if (args.length == 0) {
            LocalDRPC drpc = new LocalDRPC();
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("wordCounter", conf, buildTopology(drpc));
            for (int i = 0; i < 100; i++) {
                System.out.println("DRPC RESULT: " + drpc.execute("words", "cat the dog jumped"));
                Thread.sleep(1000);
            }
        } else {
            conf.setNumWorkers(3);
            StormSubmitter.submitTopologyWithProgressBar(args[0], conf, buildTopology(null));
        }
    }


(1) When run with no arguments, a local cluster is started and a LocalDRPC object is created in-process to feed the queries.
(2) When run with arguments, the number of workers is set to 3 and the topology is submitted to the cluster, where it waits for remote DRPC calls.


 

(III) An example using Kafka as the data source

 

 

package com.netease.sytopology;

import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.util.Arrays;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


import storm.kafka.BrokerHosts;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;
import storm.kafka.trident.OpaqueTridentKafkaSpout;
import storm.kafka.trident.TridentKafkaConfig;
import storm.trident.TridentTopology;
import storm.trident.operation.BaseFunction;
import storm.trident.operation.TridentCollector;
import storm.trident.operation.builtin.Count;
import storm.trident.testing.MemoryMapState;
import storm.trident.tuple.TridentTuple;
import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.generated.StormTopology;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

/*
 * This topology reads messages from Kafka, splits each message on commas,
 * counts the words, and writes every word to a local file.
 */
public class SyTopology {

    public static final Logger LOG = LoggerFactory.getLogger(SyTopology.class);

    private final BrokerHosts brokerHosts;

    public SyTopology(String kafkaZookeeper) {
        brokerHosts = new ZkHosts(kafkaZookeeper);
    }

    public StormTopology buildTopology() {
        TridentKafkaConfig kafkaConfig = new TridentKafkaConfig(brokerHosts, "ma30", "storm");
        kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
        // TransactionalTridentKafkaSpout kafkaSpout = new
        // TransactionalTridentKafkaSpout(kafkaConfig);
        OpaqueTridentKafkaSpout kafkaSpout = new OpaqueTridentKafkaSpout(kafkaConfig);
        TridentTopology topology = new TridentTopology();

        // TridentState wordCounts =
        topology.newStream("kafka4", kafkaSpout).
        each(new Fields("str"), new Split(),
                new Fields("word")).groupBy(new Fields("word"))
                .persistentAggregate(new MemoryMapState.Factory(), new Count(),
                        new Fields("count")).parallelismHint(16);
        // .persistentAggregate(new HazelCastStateFactory(), new Count(),
        // new Fields("aggregates_words")).parallelismHint(2);


        return topology.build();
    }

    public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException {
        String kafkaZk = args[0];
        SyTopology topology = new SyTopology(kafkaZk);
        Config config = new Config();
        config.put(Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS, 2000);

        String name = args[1];
        String dockerIp = args[2];
        config.setNumWorkers(9);
        config.setMaxTaskParallelism(5);
        config.put(Config.NIMBUS_HOST, dockerIp);
        config.put(Config.NIMBUS_THRIFT_PORT, 6627);
        config.put(Config.STORM_ZOOKEEPER_PORT, 2181);
        config.put(Config.STORM_ZOOKEEPER_SERVERS, Arrays.asList(dockerIp));
        StormSubmitter.submitTopology(name, config, topology.buildTopology());

    }

    static class Split extends BaseFunction {
        public void execute(TridentTuple tuple, TridentCollector collector) {
            String sentence = tuple.getString(0);
            for (String word : sentence.split(",")) {
                try {
                    FileWriter fw = new FileWriter(new File("/home/data/test/ma30/ma30.txt"),true);
                    fw.write(word);
                    fw.flush();
                    fw.close();
                } catch (IOException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
                collector.emit(new Values(word));
                
            }
        }
    }
}



 

This example reads messages from Kafka, splits each message on ",", counts the words, and writes each word to a local file.
1. Define the Kafka-related configuration
        TridentKafkaConfig kafkaConfig = new TridentKafkaConfig(brokerHosts, "ma30", "storm");
        kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

        OpaqueTridentKafkaSpout kafkaSpout = new OpaqueTridentKafkaSpout(kafkaConfig);

Here ma30 is the name of the Kafka topic being consumed, and "storm" is the client id the spout uses. StringScheme decodes every Kafka message as a string and declares a single output field named str, which is why the stream below reads new Fields("str").


2. Read messages from Kafka and process them

        topology.newStream("kafka4", kafkaSpout).
        each(new Fields("str"), new Split(),new Fields("word")).
        groupBy(new Fields("word"))
        .persistentAggregate(new MemoryMapState.Factory(), new Count(),
                        new Fields("count")).parallelismHint(16);

(1) Specify the data source; the stream name kafka4 also determines where the spout keeps its data in ZooKeeper, namely under /transactional/kafka4.
(2) Specify the processing function and the field it emits.
(3) Group by word.
(4) Count each group and write the state into a MemoryMapState. (A DRPC query sketch follows after this list.)
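If the commented-out TridentState wordCounts assignment in the code above is restored, the counts kept in MemoryMapState can be queried over DRPC exactly as in section (II). A hedged sketch; the function name ma30words and the drpc handle are placeholders, MapGet and FilterNull come from storm.trident.operation.builtin, and note that this Split splits on commas:

        // Sketch: querying the Kafka-fed counts via DRPC, mirroring section (II).
        // Assumes: TridentState wordCounts = topology.newStream("kafka4", kafkaSpout)...;
        topology.newDRPCStream("ma30words", drpc)
                .each(new Fields("args"), new Split(), new Fields("word"))
                .groupBy(new Fields("word"))
                .stateQuery(wordCounts, new Fields("word"), new MapGet(), new Fields("count"))
                .each(new Fields("count"), new FilterNull());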

 

Submit the topology:

storm jar target/sytopology2-0.0.1-SNAPSHOT.jar com.netease.sytopology.SyTopology 192.168.172.98:2181/kafka test3 192.168.172.98

The output of Split can then be found in /home/data/test/ma30/ma30.txt.
