A First Look at Apache Beam: Java Edition

 

——————————————
Copyright notice: This is an original article by the blogger "henyu", licensed under the CC 4.0 BY-SA license. Please include a link to the original source and this notice when reposting.
Original link: https://i.cnblogs.com/EditPosts.aspx?postid=11430012

1. Overview
With the wave of big data, technology iterates very quickly. Thanks to open source, big data developers now have a wealth of tools to choose from, but that abundance also makes it harder to pick the right one. The technologies used to solve big data problems are often diverse and depend entirely on the business requirements: MapReduce for batch processing, Flink for real-time stream processing, Spark SQL for interactive SQL, and so on. The effort and complexity of integrating all of these open-source frameworks, tools, libraries, and platforms is easy to imagine, and it is a common headache for big data developers. What this post shares is one solution for pulling these resources together: Apache Beam.

Beam is a unified programming framework that supports both batch and stream processing. A program built with the Beam programming model can run on multiple execution engines (Apache Apex, Apache Flink, Apache Spark, Google Cloud Dataflow, and others).
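To make "build once, run on any engine" concrete, here is a minimal word-count sketch (my own illustration, not taken from the referenced articles) in which the runner is nothing more than a property of the PipelineOptions. Switching from the local DirectRunner to FlinkRunner or SparkRunner only changes the options (plus the matching runner dependency); the transforms stay the same. The input file name is a made-up placeholder.

import java.util.Arrays;

import org.apache.beam.runners.direct.DirectRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.FlatMapElements;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TypeDescriptors;

public class RunnerSwitchSketch {

    public static void main(String[] args) {
        PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
        // The runner is just a property of the options; the transforms below never change.
        // Swap DirectRunner.class for FlinkRunner.class, SparkRunner.class, etc. (with the
        // corresponding runner dependency on the classpath) to execute on another engine.
        options.setRunner(DirectRunner.class);

        Pipeline pipeline = Pipeline.create(options);
        pipeline
                // "input.txt" is a hypothetical local file used only for illustration
                .apply(TextIO.read().from("input.txt"))
                .apply(FlatMapElements.into(TypeDescriptors.strings())
                        .via((String line) -> Arrays.asList(line.split(" "))))
                .apply(Count.perElement())
                .apply(MapElements.into(TypeDescriptors.strings())
                        .via((KV<String, Long> kv) -> kv.getKey() + ": " + kv.getValue()))
                .apply(TextIO.write().to("counts"));

        pipeline.run().waitUntilFinish();
    }
}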

 

The focus of this post is not the pros and cons of Apache Beam or its future prospects. It is meant as a starting point for readers who have just come across Beam and do not yet know how to begin writing code.

2. What Is Apache Beam

There are plenty of introductions to Apache Beam online, so I will not repeat them here. If you are interested, see the links below:

https://blog.csdn.net/qq_34777600/article/details/87165765 (original source: )

http://www.javashuo.com/article/p-mpwqqzle-gu.html (by Zhang Haitao, currently at Hikvision's cloud infrastructure platform, responsible for cloud computing and big data infrastructure design and middleware development, focusing on cloud computing and big data. He is one of the founders of the Apache Beam Chinese community; to follow the latest Apache Beam news and research, add WeChat cyrjkj to join the group.)

 

3. Getting Started with the Code

Example 1: Reading and Writing Files with TextIO

 

<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-core</artifactId>
    <version>${beam.version}</version>
    <!--<scope>provided</scope>-->
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.beam/beam-runners-direct-java -->
<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-runners-direct-java</artifactId>
    <version>${beam.version}</version>
    <!--<scope>provided</scope>-->
</dependency>
<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-runners-core-java</artifactId>
    <version>${beam.version}</version>
    <!--<scope>provided</scope>-->
</dependency>

 

/**
 * Reading and writing files with TextIO
 */
public static void TextIo() {
    // Create the pipeline options
    PipelineOptions pipelineOptions = PipelineOptionsFactory.create();
    // Set the runner; here the local DirectRunner is used
    pipelineOptions.setRunner(DirectRunner.class);
    // Create the pipeline from the options
    Pipeline pipeline = Pipeline.create(pipelineOptions);
    // Read the file contents from the given path
    pipeline.apply(TextIO.read().from("C:\\bigdata\\apache_beam\\src\\main\\resources\\abc"))
            .apply("ExtractWords", ParDo.of(new DoFn<String, String>() {
                @ProcessElement
                public void processElement(ProcessContext c) {
                    // Split each line on spaces
                    for (String word : c.element().split(" ")) {
                        if (!word.isEmpty()) {
                            c.output(word);
                            System.out.println("Word read from file: " + word);
                        }
                    }
                }
            })).apply(Count.<String>perElement())
            .apply("formatResult", MapElements.via(new SimpleFunction<KV<String, Long>, String>() {
                @Override
                public String apply(KV<String, Long> input) {
                    return input.getKey() + " : " + input.getValue();
                }
            }))
            // Write the results out under the given path prefix
            .apply(TextIO.write().to("C:\\bigdata\\apache_beam\\src\\main\\resources"));

    pipeline.run().waitUntilFinish();

}
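In the example above the input and output paths are hard-coded. A common Beam pattern, shown here only as a sketch, is to declare a small options interface so the paths can be supplied on the command line; PipelineOptionsFactory generates the implementation and parses --input / --output automatically. The interface name, flag names and default paths below are my own placeholders.

import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class WordCountOptionsSketch {

    /** Custom options: Beam generates the implementation and parses --input / --output flags. */
    public interface WordCountOptions extends PipelineOptions {
        @Description("Path of the file to read from")
        @Default.String("C:\\bigdata\\apache_beam\\src\\main\\resources\\abc")
        String getInput();
        void setInput(String value);

        @Description("Prefix of the files to write to")
        @Default.String("C:\\bigdata\\apache_beam\\src\\main\\resources\\counts")
        String getOutput();
        void setOutput(String value);
    }

    public static void main(String[] args) {
        // e.g. --input=/data/abc --output=/data/counts --runner=DirectRunner
        WordCountOptions options = PipelineOptionsFactory.fromArgs(args)
                .withValidation()
                .as(WordCountOptions.class);
        System.out.println("input = " + options.getInput() + ", output = " + options.getOutput());
        // The options object would then be passed to Pipeline.create(options) as in the example above.
    }
}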

 

Example 2: Use Flink as the execution engine, integrate Kafka, and process the Kafka data with streaming windows

Add the required dependencies:

<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-core</artifactId>
    <version>${beam.version}</version>
    <!--<scope>provided</scope>-->
</dependency>
<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-io-kafka</artifactId>
    <version>${beam.version}</version>
    <!--<scope>provided</scope>-->
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>${kafka.version}</version>
    <!--<scope>provided</scope>-->
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.beam/beam-runners-core-java -->
<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-runners-core-java</artifactId>
    <version>${beam.version}</version>
    <!--<scope>provided</scope>-->
</dependency>
<!-- Note: FlinkRunner and FlinkPipelineOptions used in the code below come from the Beam
     Flink runner artifact (beam-runners-flink_2.11, or beam-runners-flink-1.x in newer
     Beam releases, matched to your Flink version), so that dependency is also needed. -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>${flink.version}</version>
    <!--<scope>provided</scope>-->
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.11</artifactId>
    <version>${flink.version}</version>
    <!--<scope>provided</scope>-->
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-core</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-runtime_2.11</artifactId>
    <version>${flink.version}</version>
    <!--<scope>provided</scope>-->
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>${flink.version}</version>
    <!--<scope>provided</scope>-->
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-metrics-core</artifactId>
    <version>${flink.version}</version>
    <!--<scope>provided</scope>-->
</dependency>

 

 

Core code:

/**
 * FlinkRunner example:
 * read from Kafka, count words in fixed windows, and write the results back to Kafka.
 */
public static void flinkKafka() {
    FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
    // Explicitly specify the PipelineRunner: FlinkRunner must be set, otherwise the default local runner is used
    options.setRunner(FlinkRunner.class);
    options.setStreaming(true);
    options.setAppName("app_test");
    options.setJobName("flinkjob");
    options.setFlinkMaster("local");
    options.setParallelism(10);
    // Create the pipeline that will run on Flink
    Pipeline pipeline = Pipeline.create(options);
    // Configure KafkaIO. Here KafkaIO<K, V> is typed as <String, String>; other types work as well.
    PCollection<KafkaRecord<String, String>> lines =
            pipeline.apply(KafkaIO.<String, String>read()
                            // Kafka cluster bootstrap servers
                            .withBootstrapServers(kafkaBootstrapServers)
                            // A single topic is used here; use withTopics(List<String>) for multiple topics.
                            // The settings are essentially the same as with the native Kafka client.
                            .withTopic(inputTopic)
                            // Key/value deserializers
                            .withKeyDeserializer(StringDeserializer.class)
                            .withValueDeserializer(StringDeserializer.class)
                            // Kafka consumer properties; other properties (e.g. the consumer group) can be set here too.
                            .withConsumerConfigUpdates(ImmutableMap.<String, Object>of("auto.offset.reset", "latest"))
            /*
            // Use Kafka's log-append time as the record timestamp (the default can also be kept or customized)
            .withLogAppendTime()
            // Equivalent to Kafka's "isolation.level" = "read_committed": the consumer only reads
            // non-transactional messages or committed transactional messages from the input topic.
            // Streaming applications often process data in several read-process-write stages, each
            // stage consuming the previous stage's output. With read_committed, processing can be
            // done exactly once across all stages ("Exactly-once" semantics require Kafka 0.11+).
            .withReadCommitted()
            // Commit offsets through Beam instead of Kafka's AUTO_COMMIT (which is the default)
            .commitOffsetsInFinalize()
            // Drop Kafka metadata (offsets, partition info, ...) if it is not needed
            .withoutMetadata()
            // Only return the values, without the keys
            */
            );

    // Extract the payload from the Kafka records
    PCollection<String> kafkadata = lines.apply("Remove Kafka Metadata", ParDo.of(new DoFn<KafkaRecord<String, String>, String>() {
        @ProcessElement
        public void processElement(ProcessContext c) {
            System.out.println("Record key/value: " + c.element().getKV());
            c.output(c.element().getKV().getValue());
        }
    }));

    // Process the Kafka data: 5-second fixed windows, count per element, then format as text
    PCollection<String> wordCount = kafkadata
            .apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(5))))
            .apply(Count.<String>perElement())
            .apply("ConcatResultKV", MapElements.via(new SimpleFunction<KV<String, Long>, String>() {
                // Format the final output (key is the word, value is the count)
                @Override
                public String apply(KV<String, Long> input) {
                    System.out.println("Counting: " + input.getKey() + ": " + input.getValue());
                    return input.getKey() + ": " + input.getValue();
                }
            }));
    // Write the processed data back to Kafka
    wordCount.apply(KafkaIO.<Void, String>write()
                    .withBootstrapServers(kafkaBootstrapServers)
                    .withTopic(outputTopic)
                    // No key serializer is needed because the key type is Void
                    .withValueSerializer(StringSerializer.class)
                    .values()
    );
    pipeline.run().waitUntilFinish();

}
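The fields kafkaBootstrapServers, inputTopic and outputTopic are assumed to be defined elsewhere in the class. To see the windowed counts flowing you also need some data in the input topic; since kafka-clients is already a dependency, a tiny stand-alone producer like the following sketch will do (the broker address and topic name are placeholders and must match whatever your kafkaBootstrapServers and inputTopic point to):

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class TestDataProducer {

    public static void main(String[] args) {
        // Placeholder broker address and topic; they must match the values used by the pipeline.
        String bootstrapServers = "localhost:9092";
        String topic = "beam-input";

        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // A few space-separated lines so the windowed word count has something to chew on.
            producer.send(new ProducerRecord<>(topic, "hello beam hello flink"));
            producer.send(new ProducerRecord<>(topic, "beam on flink beam on spark"));
            producer.flush();
        }
    }
}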

 

Example 3: Use Spark as the runner, read streaming data from Kafka, apply a time window, and write the results back to Kafka

Dependencies (mostly the same as in Example 2, plus the Spark runner):

<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-runners-spark</artifactId>
    <version>${beam.version}</version>
</dependency>
<!-- The Spark runner expects the Spark libraries themselves (e.g. spark-core_2.11 and
     spark-streaming_2.11, matching your Spark version) to be supplied by the application
     or by the cluster, so add them if they are not already on the classpath. -->

Core code:

/**
 * Use Spark as the runner to consume and process Kafka data.
 */
public static void sparkKafka() {
    // Create the pipeline options
    SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
    // Explicitly choose the SparkRunner and set the Spark-related parameters
    options.setRunner(SparkRunner.class);
    options.setSparkMaster("local[*]");
    options.setAppName("spark-beam");
    options.setCheckpointDir("/user/chickpoint16");
    // Create the pipeline
    Pipeline pipeline = Pipeline.create(options);
    // Read from Kafka
    PCollection<KafkaRecord<String, String>> lines = pipeline.apply(KafkaIO.<String, String>read()
            // Kafka bootstrap servers
            .withBootstrapServers(kafkaBootstrapServers)
            // Topic to consume
            .withTopic(inputTopic)
            // Key/value deserializers
            .withKeyDeserializer(StringDeserializer.class)
            .withValueDeserializer(StringDeserializer.class)
            // Kafka consumer properties; other properties (e.g. the consumer group) can be set here too.
            .withConsumerConfigUpdates(ImmutableMap.<String, Object>of("auto.offset.reset", "latest"))
    );
    // Process the data: split into words, window, count, format
    PCollection<String> wordcount = lines.apply("split data", ParDo.of(new DoFn<KafkaRecord<String, String>, String>() {
        @ProcessElement
        public void processElement(ProcessContext c) {
            String[] arr = c.element().getKV().getValue().split(" ");
            for (String value : arr) {
                if (!value.isEmpty()) {
                    c.output(value);
                }
            }
        }
    })).apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(5))))
            .apply(Count.<String>perElement())
            .apply("wordcount", MapElements.via(new SimpleFunction<KV<String, Long>, String>() {
                @Override
                public String apply(KV<String, Long> input) {
                    System.out.println(input.getKey() + " : " + input.getValue());
                    System.err.println("===============================================");
                    return input.getKey() + " : " + input.getValue();
                }
            }));
    // Write the processed data back to Kafka
    wordcount.apply(KafkaIO.<Void, String>write()
            .withBootstrapServers(kafkaBootstrapServers)
            .withTopic(outputTopic)
            .withValueSerializer(StringSerializer.class)
            .values()
    );
    pipeline.run().waitUntilFinish();

}

 

Example 4: HBaseIO

Dependencies:

<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-io-hbase</artifactId>
    <version>${beam.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase-client -->
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-client</artifactId>
    <version>${hbase.version}</version>
</dependency>

 

Code:

/**
 * HBaseIO: read HBase data the Apache Beam way.
 */
public static void getHbaseData() {
    // Create the pipeline options (the commented lines show how a SparkRunner setup would look instead)
//        SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
//        options.setJobName("read hbase");
//        options.setSparkMaster("local[*]");
//        options.setCheckpointDir("/user/chickpoint17");
    PipelineOptions options = PipelineOptionsFactory.create();
    options.setRunner(DirectRunner.class);
    // HBase connection configuration
    config = HBaseConfiguration.create();
    config.set("hbase.zookeeper.property.clientPort", hbase_clientPort);
    config.set("hbase.zookeeper.quorum", hbase_zookeeper_quorum);
    config.set("zookeeper.znode.parent", zookeeper_znode_parent);
    config.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
    config.setInt("hbase.rpc.timeout", 20000);
    config.setInt("hbase.client.operation.timeout", 30000);
    config.setInt("hbase.client.scanner.timeout.period", 2000000);
    // Create the pipeline
    Pipeline pipeline = Pipeline.create(options);
    // Scan the table for row keys in the range ["001", "004")
    PCollection<Result> result = pipeline.apply(HBaseIO.read()
            .withConfiguration(config)
            .withTableId(hbase_table)
            .withKeyRange("001".getBytes(), "004".getBytes())
    );
    // Unpack each Result into "row ---- family : qualifier = value" strings
    PCollection<String> process = result.apply("process", ParDo.of(new DoFn<Result, String>() {
        @ProcessElement
        public void processElement(ProcessContext c) {
            String row = Bytes.toString(c.element().getRow());
            List<Cell> cells = c.element().listCells();
            for (Cell cell : cells) {
                String family = Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength());
                String column = Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength());
                String value = Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
                c.output(row + "------------------ " + family + " : " + column + " = " + value);
                System.out.println(row + "------------------ " + family + " : " + column + " = " + value);
            }
        }
    }));

    pipeline.run().waitUntilFinish();
}
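As in the Kafka examples, this method relies on fields defined elsewhere in the class: config, hbase_clientPort, hbase_zookeeper_quorum, zookeeper_znode_parent and hbase_table. Purely as an illustration, the surrounding declarations might look like the sketch below; every value is a placeholder to be replaced with your own cluster settings.

import org.apache.hadoop.conf.Configuration;

public class HBaseIoDemo {

    // Placeholder HBase/ZooKeeper settings; replace with the values of your own cluster.
    private static final String hbase_clientPort = "2181";
    private static final String hbase_zookeeper_quorum = "zk-host1,zk-host2,zk-host3";
    private static final String zookeeper_znode_parent = "/hbase";
    private static final String hbase_table = "test_table";

    // Populated inside getHbaseData() via HBaseConfiguration.create()
    private static Configuration config;

    public static void main(String[] args) {
        // Runs the HBaseIO read pipeline shown above.
        getHbaseData();
    }

    // ... getHbaseData() from above goes here ...
}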

 

4. Notes

Apache Beam is still a young project. Its Java SDK is currently the most mature, while support for Python and other languages is still being rounded out, so if you are interested, Java is the best language to start learning with.
