Flink Learning Summary (2)

Flink WordCount code and a summary of the related concepts:

   

package com.lw.myflinkproject;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.LocalEnvironment;
import org.apache.flink.api.java.operators.*;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.util.Collector;

import java.util.List;


/**
 *  1. Flink can process data as batch, as streams, or with SQL.
 *  2. Data in Flink is either bounded or unbounded. Batch processing works on bounded data; stream processing works on both bounded and unbounded data.
 *  3. Flink processing pattern: Source -> transformations -> Sink
 *  4. Differences between Flink and Spark:
 *      1). Both are compute frameworks with batch, SQL, and stream processing. Flink offers high throughput with low latency; Spark offers high throughput with higher latency.
 *      2). Spark's batch processing is built on RDDs, Flink's on DataSet;
 *          Spark's streaming is built on DStream, Flink's on DataStream.
 *      3). Spark uses a key/value-style programming model; Flink does not.
 *      4). In Spark an RDD needs an action to trigger execution; in Flink transformations are triggered by env.execute().
 *      5). Spark batch jobs can sort globally by key; Flink only sorts within a partition, there is no global sort.
 *      6). In Spark the driver dispatches tasks; in Flink the JobManager dispatches them.
 *  5. Flink processing flow:
 *      1). Create the Flink environment:
 *          For batch data:
 *              local execution:   ExecutionEnvironment.createLocalEnvironment();  -- usually unnecessary, just use the cluster form below.
 *              cluster execution: ExecutionEnvironment.getExecutionEnvironment();
 *          For streaming data (a streaming sketch follows this class):
 *              StreamExecutionEnvironment.getExecutionEnvironment();
 *      2). source -> transformations -> sink
 *      3). Trigger execution with env.execute(). In batch jobs the special operators print, count and collect act both as sinks and as triggers.
 *
 *  6. Notes:
 *      1). When Flink runs locally the parallelism is 8, the same as the number of CPU threads of the machine; on the cluster the default parallelism is 1.
 *      2). Flink only sorts within a partition; there is no global sort.
 *      3). Data can be written out as CSV, but it must be in tuple format.
 *
 */
public class FlinkWordCount {
    public static void main(String[] args) throws Exception {
//        LocalEnvironment env = ExecutionEnvironment.createLocalEnvironment();
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSource<String> dataSource = env.readTextFile("./data/words");

        FlatMapOperator<String, String> words = dataSource.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String line, Collector<String> collector) throws Exception {
                String[] split = line.split(" ");
                for (String word : split) {
                    collector.collect(word);
                }
            }
        });

        MapOperator<String, Tuple2<String, Integer>> map = words.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String word) throws Exception {
                return new Tuple2<>(word, 1);
            }
        });

        UnsortedGrouping<Tuple2<String, Integer>> groupBy = map.groupBy(0);
        AggregateOperator<Tuple2<String, Integer>> sum = groupBy.sum(1).setParallelism(1);
        SortPartitionOperator<Tuple2<String, Integer>> sort = sum.sortPartition(1, Order.DESCENDING);
        DataSink<Tuple2<String, Integer>> tuple2DataSink = sort.writeAsCsv("./data/result", FileSystem.WriteMode.OVERWRITE);
        env.execute("xx");

//        long count = sort.count();
//        System.out.println("count = "+count);

//        List<Tuple2<String, Integer>> collect = sort.collect();
//        for (Tuple2<String, Integer> stringIntegerTuple2 : collect) {
//            System.out.println(stringIntegerTuple2);
//        }

//        sort.print();
//        sort.writeAsText("./data/result",FileSystem.WriteMode.OVERWRITE);
//        env.execute("xxxx");
    }
}
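The comment block above also mentions StreamExecutionEnvironment for streaming data. As a minimal sketch of that path (not part of the original example; the class name, the localhost socket, and port 9999 are assumptions, e.g. fed by "nc -lk 9999"), a streaming WordCount over 5-second windows could look like this:

package com.lw.myflinkproject;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

// hypothetical sketch class, not part of the original project
public class StreamingWordCountSketch {
    public static void main(String[] args) throws Exception {
        // streaming environment instead of ExecutionEnvironment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // source: read text lines from a socket (hypothetical host/port)
        DataStreamSource<String> lines = env.socketTextStream("localhost", 9999);
        lines.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
                        for (String word : line.split(" ")) {
                            out.collect(new Tuple2<>(word, 1));
                        }
                    }
                })
                .keyBy(0)                       // partition by the word
                .timeWindow(Time.seconds(5))    // 5-second tumbling windows
                .sum(1)                         // sum the counts per word
                .print();                       // sink: print to the console
        // streaming jobs always need execute() to start
        env.execute("streaming word count sketch");
    }
}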

The pom file:

 

<?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.lw.myflinkproject</groupId>
  <artifactId>MyFilnk0519</artifactId>
  <version>1.0-SNAPSHOT</version>

  <name>MyFilnk0519</name>
  <!-- FIXME change it to the project's website -->
  <url>http://www.example.com</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <flink.version>1.7.1</flink.version>
  </properties>

  <dependencies>
    <!-- Flink dependencies -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-java</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java_2.11</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-clients_2.11</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-wikiedits_2.11</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <!-- Flink Kafka connector dependency -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.11</version>
      <scope>test</scope>
    </dependency>
    <!-- log4j and slf4j packages; if you do not want to see logs in the console, comment out the dependencies below -->
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-log4j12</artifactId>
      <version>1.7.25</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>log4j</groupId>
      <artifactId>log4j</artifactId>
      <version>1.2.17</version>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-api</artifactId>
      <version>1.7.25</version>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-nop</artifactId>
      <version>1.7.25</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-simple</artifactId>
      <version>1.7.5</version>
    </dependency>

  </dependencies>

  <build>
    <plugins>
      <plugin>
        <artifactId>maven-assembly-plugin</artifactId>
        <version>2.4</version>
        <configuration>
          <!-- Setting appendAssemblyId to false removes the "-jar-with-dependencies" suffix from MySpark-1.0-SNAPSHOT-jar-with-dependencies.jar -->
          <!--<appendAssemblyId>false</appendAssemblyId>-->
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
          <archive>
            <manifest>
              <mainClass>com.lw.myflink.Streaming.FlinkSocketWordCount</mainClass>
            </manifest>
          </archive>
        </configuration>
        <executions>
          <execution>
            <id>make-assembly</id>
            <phase>package</phase>
            <goals>
              <goal>assembly</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>

 

 

 

The Flink runtime consists of two types of processes:

  1. JobManager: also called the master. It coordinates distributed execution, schedules tasks, coordinates checkpoints, and coordinates failure recovery (besides the overall state it also stores operator state). A Flink setup has at least one JobManager; for high availability several can be configured, one of which is the leader while the others are on standby.
  2. TaskManager: also called a worker. It executes the tasks of the dataflow, buffers data, and exchanges data between TaskManagers (shuffle). A Flink setup must have at least one TaskManager.

Flink programs can run on a standalone cluster, or on the YARN or Mesos resource management frameworks.

The client is not part of the Flink runtime; its job is to prepare the dataflow and send it to the JobManager. After that, the client can disconnect or stay connected.

 

Task slots: they determine the parallelism. (The original post shows a zoomed-in diagram of a worker here, with a maximum of 3 slots.)

 

Each worker (TaskManager) is a JVM process that can execute one or more tasks. These tasks run in task slots, and every worker has at least one task slot. Each task slot has a fixed share of resources; for example, if a TaskManager has three task slots, the TaskManager's memory is split evenly, so each slot gets 1/3 of the total memory. The purpose of task slots is to separate the managed memory of tasks; there is no CPU isolation. (With 3 GB in total and 3 tasks, each task gets 1 GB of memory; even if it is unused, other tasks cannot take it. Each slot processes one pipeline chain.)

By adjusting the number of task slots, users decide how many slots each TaskManager has. More slots mean more tasks share the same JVM; tasks in the same JVM share TCP connections and heartbeat messages, and can share data sets and data structures, which reduces the per-task overhead inside a TaskManager.

In short: the number of task slots is the number of tasks a TaskManager can execute in parallel.
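The number of slots per TaskManager is configured in flink-conf.yaml. A minimal sketch (the values here are assumptions for illustration, not from the original post):

# conf/flink-conf.yaml
# each TaskManager offers 3 task slots, so it can run 3 tasks in parallel
taskmanager.numberOfTaskSlots: 3
# default parallelism used when a job does not set one explicitly
parallelism.default: 1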

 

Setting up a Flink cluster:

1. Upload the installation package and extract it.

2. Edit the configuration files (a sketch of both files follows this list):

Add the hostnames of the worker nodes to the slaves file.

In flink-conf.yaml, set the hostname of the JobManager node: jobmanager.rpc.address: mynode1

3. Distribute the configured Flink directory to the other nodes:

scp -r ./flink mynode2:`pwd`
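A minimal sketch of the two configuration files for a small cluster (the hostnames mynode1, mynode2, and mynode3 follow the naming used elsewhere in this post and are only an assumption; adjust them to your own nodes):

# conf/slaves -- one worker (TaskManager) hostname per line
mynode2
mynode3

# conf/flink-conf.yaml -- all nodes point at the JobManager host
jobmanager.rpc.address: mynode1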

 

Start Flink: ./start-cluster.sh

After startup, check the web UI at mynode1:8081.

Packaging local code into a jar and submitting it to the cluster: there are two ways to submit a job, either through the web UI or with a command on the cluster.

Submitting with a command: 1. First run clean from the Maven Projects panel to clear out the target directory.

2. Run package to build the jar, then upload the jar to the cluster.

3. In the Flink bin directory, run the program: ./flink run -c com.flink.studey.stream.SocketSimple /opt/wcg/localdir/flinktest1-1.0-SNAPSHOT-jar-with-dependencies.jar --port 9999

4. Check the result: the program's output can be viewed under Task Managers in the web UI.

Note: the first attempt to package the Maven project into a jar failed. The main reason: the pom did not include the plugin configuration for building the project into a jar (shown below), so the jar could not be generated.

 

<build>
    <plugins>
      <plugin>
        <artifactId>maven-assembly-plugin</artifactId>
        <version>2.4</version>
        <configuration>
          <!-- Setting appendAssemblyId to false removes the "-jar-with-dependencies" suffix from MySpark-1.0-SNAPSHOT-jar-with-dependencies.jar -->
          <!--<appendAssemblyId>false</appendAssemblyId>-->
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
          <archive>
            <manifest>
              <mainClass>com.lw.myflink.Streaming.FlinkSocketWordCount</mainClass>
            </manifest>
          </archive>
        </configuration>
        <executions>
          <execution>
            <id>make-assembly</id>
            <phase>package</phase>
            <goals>
              <goal>assembly</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>

 

When Flink runs locally there is no web UI, so to see the web UI the job has to be submitted to a cluster.

Running jobs can be cancelled from the web UI.

 

When Flink interacts with Kafka, offsets can be managed either manually or automatically.

With Kafka, the two-phase commit mechanism effectively guarantees that no data is lost.
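The consumer example below only calls env.enableCheckpointing(15000). As a minimal sketch (an assumption of mine, not part of the original code), the checkpointing mode can also be pinned to exactly-once explicitly, which is the semantics the two-phase-commit sink relies on:

package com.lw.myflink.Streaming.kafka;

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// hypothetical sketch class, not in the original project
public class CheckpointModeSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // checkpoint every 15 seconds in exactly-once mode;
        // offsets are committed back to Kafka only when a checkpoint completes
        // (see consumer011.setCommitOffsetsOnCheckpoints(true) in the code below)
        env.enableCheckpointing(15000, CheckpointingMode.EXACTLY_ONCE);
    }
}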

package com.lw.myflink.Streaming.kafka;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.util.Collector;

import java.util.Properties;

public class ReadMessageFromKafkaAndUseFileSink {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//        env.setParallelism(2);//set the parallelism
        env.enableCheckpointing(15000);
//        env.setStateBackend()
//        StateBackend stateBackend = env.getStateBackend();
//        System.out.println("----"+stateBackend);
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "mynode1:9092,mynode2:9092,mynode3:9092");
        props.setProperty("group.id", "g0517");
        props.setProperty("auto.offset.reset", "earliest");
        props.setProperty("enable.auto.commit", "false");
        /**
         * The first argument is the topic,
         * the second is the deserialization schema for the value,
         * and the third is the Kafka configuration.
         */
        FlinkKafkaConsumer011<String> consumer011 =  new FlinkKafkaConsumer011<>("FlinkTopic", new SimpleStringSchema(), props);
//        consumer011.setStartFromEarliest();//read from the earliest offset of the topic
//        consumer011.setStartFromLatest();//read from the latest offset of the topic
        consumer011.setStartFromGroupOffsets();//default: start from the consumer group's committed offsets in Kafka; if there are none, fall back to the auto.offset.reset policy
//        consumer011.setStartFromTimestamp(111111);//start consuming from the given timestamp
        consumer011.setCommitOffsetsOnCheckpoints(true);

        DataStreamSource<String> stringDataStreamSource = env.addSource(consumer011);
        SingleOutputStreamOperator<String> flatMap = stringDataStreamSource.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String s, Collector<String> outCollector) throws Exception {
                String[] split = s.split(" ");
                for (String currentOne : split) {
                    outCollector.collect(currentOne);
                }
            }
        });
        //note: the Tuple2 here must be org.apache.flink.api.java.tuple.Tuple2
        SingleOutputStreamOperator<Tuple2<String, Integer>> map = flatMap.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String word) throws Exception {
                return new Tuple2<>(word, 1);
            }
        });
        //keyBy partitions the data by key so that records with the same key go to the same partition; by default it partitions by hash
        KeyedStream<Tuple2<String, Integer>, Tuple> keyByResult = map.keyBy(0);
        WindowedStream<Tuple2<String, Integer>, Tuple, TimeWindow> windowResult = keyByResult.timeWindow(Time.seconds(5));
        SingleOutputStreamOperator<Tuple2<String, Integer>> endResult = windowResult.sum(1);

        //sink: print directly to the console
        //when the Flink program runs with a job name set, the number at the start of each console line shows which parallel subtask produced that result
//        endResult.print();

        //sink: write the result to a file; FileSystem.WriteMode.OVERWRITE overwrites the target path if it already exists
//        endResult.writeAsText("./result/kafkaresult",FileSystem.WriteMode.OVERWRITE);
//        endResult.writeAsText("./result/kafkaresult",FileSystem.WriteMode.NO_OVERWRITE);

        //sink: write the result to a Kafka topic; what goes into Kafka is a String, so endResult needs a further conversion
//        FlinkKafkaProducer011<String> producer = new FlinkKafkaProducer011<>("node1:9092,node2:9092,node3:9092","FlinkResult",new SimpleStringSchema());
        //convert the Tuple2 records into Strings
        endResult.map(new MapFunction<Tuple2<String,Integer>, String>() {
            @Override
            public String map(Tuple2<String, Integer> tp2) throws Exception {
                return tp2.f0+"-"+tp2.f1;
            }
        })/*.setParallelism(1)*/.addSink(new MyTransactionSink()).setParallelism(1);

        //finally, call execute to start the Flink program
        env.execute("kafka word count");


    }
}
package com.lw.myflink.Streaming.kafka;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.base.VoidSerializer;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
import org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction;

import java.io.*;
import java.util.*;

import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.Preconditions.checkState;


public class MyTransactionSink extends TwoPhaseCommitSinkFunction<String, MyTransactionSink.ContentTransaction, Void> {

    private ContentBuffer contentBuffer = new ContentBuffer();

    public MyTransactionSink() {
        super(new KryoSerializer<>(ContentTransaction.class, new ExecutionConfig()),VoidSerializer.INSTANCE);
    }

    @Override
    /**
     * When a record arrives, this invoke method is called.
     * Executed 2nd.
     */
    protected void invoke(ContentTransaction transaction, String value, Context context){
        System.out.println("====invoke===="+value);
        transaction.tmpContentWriter.write(value);

    }

    @Override
    /**
     * Begin a transaction: create a temporary file in the temp directory and then write records into it.
     * Executed 1st.
     */
    protected ContentTransaction beginTransaction() {
        ContentTransaction contentTransaction= new ContentTransaction(contentBuffer.createWriter(UUID.randomUUID().toString()));
        System.out.println("====beginTransaction====,contentTransaction Name = "+contentTransaction.toString());
//        return new ContentTransaction(tmpDirectory.createWriter(UUID.randomUUID().toString()));
        return contentTransaction;
    }

    @Override
    /**
     * In the pre-commit phase, flush the buffered data to disk and close the file so that no new data is written to it.
     * At the same time, a new transaction is opened for the writes that belong to the next checkpoint.
     * Executed 3rd.
     */
    protected void preCommit(ContentTransaction transaction) throws InterruptedException {
        System.out.println("====preCommit====,contentTransaction Name = "+transaction.toString());

        System.out.println("pre-Commit----正在休息5s");
        Thread.sleep(5000);
        System.out.println("pre-Commit----休息完成5s");

        transaction.tmpContentWriter.flush();
        transaction.tmpContentWriter.close();


    }

    @Override
    /**
     * In the commit phase, the file from the previous phase is atomically moved into the real target directory. Note that this can add latency.
     * Executed 4th.
     */
    protected void commit(ContentTransaction transaction) {
        System.out.println("====commit====,contentTransaction Name = "+transaction.toString());

        /**
         * The file-writing logic goes here.
         */
        //get the writer name
        String name = transaction.tmpContentWriter.getName();
        //get the buffered data
        Collection<String> content = contentBuffer.read(name);

        /**
         * Print for testing
         */
        for(String s: content){
//            if("hello-1".equals(s)){
//                try {
//                    System.out.println("sleeping for 5s");
//                    Thread.sleep(5000);
//                    System.out.println("done sleeping for 5s");
//                } catch (InterruptedException e) {
//                    e.printStackTrace();
//                }
//            }
            System.out.println(s);
        }

//        //write the data to a file
//        FileWriter fw =null;
//        PrintWriter pw =null ;
//        try {
//            //if the file exists, append to it; if it does not exist, create it
//            File dir=new File("./data/FileResult/result.txt");
//            if(!dir.getParentFile().exists()){
//                dir.getParentFile().mkdirs();//create the parent directories
//                dir.createNewFile();//create the file
//            }
//            fw = new FileWriter(dir, true);
//            pw = new PrintWriter(fw);
//            for(String s:content){
//                if(s.equals("sss-1")){
//                    throw new NullPointerException();
//                }
//                pw.write(s+"\n");
//            }
//        } catch (IOException e) {
//            e.printStackTrace();
//        }
//        try {
//            pw.flush();
//            fw.flush();
//            pw.close();
//            fw.close();
//        } catch (IOException e) {
//            e.printStackTrace();
//        }
    }

    @Override
    /**
     * When the transaction is aborted because of an exception, delete the temporary file.
     */
    protected void abort(ContentTransaction transaction) {
        System.out.println("====abort====");
        transaction.tmpContentWriter.close();
        contentBuffer.delete(transaction.tmpContentWriter.getName());
    }

    public static class ContentTransaction {
        private ContentBuffer.TempContentWriter tmpContentWriter;

        public ContentTransaction(ContentBuffer.TempContentWriter tmpContentWriter) {
            this.tmpContentWriter = tmpContentWriter;
        }

        @Override
        public String toString() {
            return String.format("ContentTransaction[%s]", tmpContentWriter.getName());
        }
    }

}

/**
 * ContentBuffer temporarily stores the data being processed in a list, keyed by writer name.
 */
class ContentBuffer implements Serializable {
    private Map<String, List<String>> filesContent = new HashMap<>();

    public TempContentWriter createWriter(String name) {
        checkArgument(!filesContent.containsKey(name), "File [%s] already exists", name);
        filesContent.put(name, new ArrayList<>());
        return new TempContentWriter(name, this);
    }
    private void putContent(String name, List<String> values) {
        List<String> content = filesContent.get(name);
        checkState(content != null, "Unknown file [%s]", name);
        content.addAll(values);
    }

    public Collection<String> read(String name) {
        List<String> content = filesContent.get(name);
        checkState(content != null, "Unknown file [%s]", name);
        List<String> result = new ArrayList<>(content);
        return result;
    }

    public void delete(String name) {
        filesContent.remove(name);
    }

    //inner class
    class TempContentWriter {
        private final ContentBuffer contentBuffer;
        private final String name;
        private final List<String> buffer = new ArrayList<>();
        private boolean closed = false;

        public String getName() {
            return name;
        }

        private TempContentWriter(String name, ContentBuffer contentBuffer) {
            this.name = checkNotNull(name);
            this.contentBuffer = checkNotNull(contentBuffer);
        }

        public TempContentWriter write(String value) {
            checkState(!closed);
            buffer.add(value);
            return this;
        }

        public TempContentWriter flush() {
            contentBuffer.putContent(name, buffer);
            return this;
        }

        public void close() {
            buffer.clear();
            closed = true;
        }
    }

}