Flink從入門到真香(1-分別使用流模式和批模式運行第一個demo)

基本概念部分,批處理和流處理的區別

批處理在大數據世界有着悠久的歷史,比較典型的就是spark。批處理主要操做大容量靜態數據集,並在計算過程完成後返回結果。java

批處理模式中使用的數據集一般符合下列特徵:web

(1) 有界:批處理數據集表明數據的有限集合apache

(2) 持久:數據一般始終存儲在某種類型的持久存儲位置中api

(3) 大量:批處理操做一般是處理極爲海量數據集的惟一方法服務器

批處理很是適合須要訪問全套記錄才能完成的計算工做。例如在計算總數和平均數時,必須將數據集做爲一個總體加以處理,而不能將其視做多條記錄的集合。這些操做要求在計算進行過程當中數據維持本身的狀態。框架

須要處理大量數據的任務一般最適合用批處理操做進行處理。不管直接從持久存儲設備處理數據集,或首先將數據集載入內存,批處理系統在設計過程當中就充分考慮了數據的量,可提供充足的處理資源。因爲批處理在應對大量持久數據方面的表現極爲出色,所以常常被用於對歷史數據進行分析。大量數據的處理須要付出大量時間,所以批處理不適合對處理時間要求較高的場合。流處理系統會對隨時進入系統的數據進行計算。相比批處理模式,這是一種大相徑庭的處理方式。流處理方式無需針對整個數據集執行操做,而是對經過系統傳輸的每一個數據項執行操做。socket

流處理中的數據集是「無邊界」的,這就產生了幾個重要的影響:maven

(1) 完整數據集只能表明截至目前已經進入到系統中的數據總量。ide

(2) 工做數據集也許更相關,在特定時間只能表明某個單一數據項。測試

(3) 處理工做是基於事件的,除非明確中止不然沒有「盡頭」。處理結果馬上可用,並會隨着新數據的抵達繼續更新。

流處理系統能夠處理幾乎無限量的數據,但同一時間只能處理一條(真正的流處理)或不多量(微批處理,Micro-batch Processing)數據,不一樣記錄間只維持最少許的狀態。雖然大部分系統提供了用於維持某些狀態的方法,但流處理主要針對反作用更少,更加功能性的處理(Functional processing)進行優化。

功能性操做主要側重於狀態或反作用有限的離散步驟。針對同一個數據執行同一個操做會或略其餘因素產生相同的結果,此類處理很是適合流處理,由於不一樣項的狀態一般是某些困難、限制,以及某些狀況下不須要的結果的結合體。所以雖然某些類型的狀態管理一般是可行的,但這些框架一般在不具有狀態管理機制時更簡單也更高效。

此類處理很是適合某些類型的工做負載。有近實時處理需求的任務很適合使用流處理模式。分析、服務器或應用程序錯誤日誌,以及其餘基於時間的衡量指標是最適合的類型,由於對這些領域的數據變化作出響應對於業務職能來講是極爲關鍵的。流處理很適合用來處理必須對變更或峯值作出響應,而且關注一段時間內變化趨勢的數據。

目標

讀取一個txt文件分別使用flink的流模式和批模式進行計算統計

開始上代碼,環境準備

使用IDEA新建一個maven項目並在pom.xml中增長2個引入

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-scala_2.12</artifactId>
        <version>1.10.1</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_2.12</artifactId>
        <version>1.10.1</version>
    </dependency>
</dependencies>
<build>
    <plugins>
        <!--該插件用於將scala代碼編譯成class文件 -->
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>4.4.0</version>
            <executions>
                <execution>
                    <goals>
                        <goal>compile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>

        <!--打包用 -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-assembly-plugin</artifactId>
            <version>3.3.0</version>
            <configuration>
                <descriptorRefs>
                    <descriptiorRef>jar-with-dependencies</descriptiorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

在java同目錄下新建一個scala目錄,設置爲resource root

批處理模式處理worldCount

新建一個包 com.mafei.wc 下面新建一個WordCount的scala Object
運行第一個簡單的demo,從文件中讀取數據,作一些過濾,分割,分組統計,求和等操做

package com.mafei.wc

import org.apache.flink.api.scala.ExecutionEnvironment

//把scala裏面定義的隱式轉換拿出來
import org.apache.flink.api.scala._

object WordCount {
  def main(args: Array[String]): Unit = {
    //建立一個批處理執行環境
    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment

    //從文件中讀取數據
    val inputPath: String = "/opt/java2020_study/maven/flink1/src/main/resources/hello.txt"
    val inputDataSet: DataSet[String] = env.readTextFile(inputPath)

    //  對數據進行轉換處理統計,先分詞,再按照word進行分組,最後聚合統計
    val resultDataSet: DataSet[(String,Int)] = inputDataSet
      .flatMap(_.split(" ")) //根據空格分隔
      .map((_,1))
      .groupBy(0) // 以第一個元素做爲key進行分組統計
      .sum(1)  //對分組以後的全部數據的第二個元素求和

    //打印輸出
    resultDataSet.print()
   }

}

代碼結構及運行效果

Flink從入門到真香(1-分別使用流模式和批模式運行第一個demo)


流處理樣例測試,目標監聽一個socket端口,獲取實時輸出並計算結果

一、新建一個StreamWorldCount流處理class

package com.mafei.wc

import org.apache.flink.streaming.api.scala._

object StreamWordCount {
  def main(args: Array[String]): Unit = {

    //建立流處理的執行環境
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    //接收一個socket文本流
    val inputDataStream = env.socketTextStream("127.0.0.1",7777)

    //進行轉換處理統計
    val resultDataStreams = inputDataStream
      .flatMap(_.split(" ")) //按照空格進行分割
      .filter(_.nonEmpty) //過濾非空的數據
      .map((_, 1))  //每次給key設置數量
      .keyBy(0) //按照第一個key來作聚合
      .sum(1) //作統計

    resultDataStreams.print()
    //最終執行的操做
    env.execute("stream world count")

  }

}

二、新開一個終端,監聽本機7777端口

nc -lk 7777

三、啓動代碼,發現程序在監聽端口數據中,
四、到第二步新開的端口上隨意輸出字符,回車,能夠看到代碼在實時計算中


上面都是直接在本地運行的flink任務,下面在flink 服務器上跑一波
新建一個測試的類,從socket中讀取數據並作一些分組計算等等
不一樣點在於socket的server IP信息不是寫死在代碼中,而是經過flink運行時傳參實現

package com.mafei.wc

import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala._

object StreamWordCount2 {
  def main(args: Array[String]): Unit = {

    //建立流處理的執行環境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
//    env.setParallelism(10)  //針對整個任務設置並行度
   //env.disableOperatorChaining()  //關閉任務合併
    //從外部命令中提取參數,做爲socket主機名和端口號
    val paramTool: ParameterTool = ParameterTool.fromArgs(args)
    val host: String = paramTool.get("host")
//    val port: Int = paramTool.getInt("port")

    //接收一個socket文本流
    val inputDataStream = env.socketTextStream(host,7777)
//    val inputDataStream = env.socketTextStream("127.0.0.1",7777)

    //進行轉換處理統計
    val resultDataStreams = inputDataStream
      .flatMap(_.split(" "))
      .filter(_.nonEmpty)
//      .map((_, 1)).setParallelism(3) //針對單個算子設置並行度
      .map((_, 1)).setParallelism(2) //針對單個算子設置並行度
      .keyBy(0)
      .sum(1)

    resultDataStreams.print().setParallelism(1)   //也能夠針對輸出設置並行度,用來相似輸出到文件的場景等
    env.execute("stream world count")

  }

}

把代碼打包成jar包,而後打開flink的8081默認web頁面,在Submit New Job一欄,上傳打包好的jar包
頁面上點擊上傳後的jar包,
在Entry Class欄輸入 com.mafei.wc.StreamWordCount2
在Program Arguments 輸入: --host 127.0.0.1 --port 7777
最終效果:

Flink從入門到真香(1-分別使用流模式和批模式運行第一個demo)

服務器上監聽nc程序:
安裝nc: yum install nc -y
監聽7777端口: nc -lk 7777

最終flink界面上任務運行效果圖:
Flink從入門到真香(1-分別使用流模式和批模式運行第一個demo)

看flink輸出效果(記得在socket那個終端上隨意敲一些數據):

tail -f /opt/flink-1.10.2/log/flink*
(sd,1)
(fg,1)
(sdfg,1)
(s,1)
(dfg,1)
(sdf,1)
(g,1)
(wert,1)
(wert,2)
(xdfcg,1)

命令行執行方式:
把打包好的jar包手動上傳到服務器上

/opt/flink-1.10.2/bin/flink run -c com.mafei.wc.StreamWordCount2 -p 1 /opt/flink1-1.0-SNAPSHOT-jar-with-dependencies.jar --host localhost --port 7777

列出來全部任務:
/opt/flink-1.10.2/bin/flink list

列出來全部任務-包含已運行完成的
/opt/flink-1.10.2/bin/flink list -a

取消任務/opt/flink-1.10.2/bin/flink cancel 43dcc61e27b64e63306c9e9ab1b8e0f9

相關文章
相關標籤/搜索