Apache Flink 零基礎入門(十)Flink DataSet編程

DataSet programs in Flink are regular programs that implement transformations on data sets (e.g., filtering, mapping, joining, grouping). The data sets are initially created from certain sources (e.g., by reading files, or from local collections). Results are returned via sinks, which may for example write the data to (distributed) files, or to standard output (for example the command line terminal). Flink programs run in a variety of contexts, standalone, or embedded in other programs. The execution can happen in a local JVM, or on clusters of many machines.java

Flink中DataSet編程是很是常規的編程,只須要實現他的數據集的轉換(例如filtering, mapping, joining, grouping)。這個數據集最初是經過數據源建立(例如讀取文件、本地數據集加載本地集合),轉換的結果經過sink返回到本地(或者分佈式)的文件系統或者終端。Flink程序能夠運行在各類環境中例如單機,或者嵌入其餘程序中。執行過程能夠在本地JVM中或者集羣中。程序員

Source ===> Flink(transformation)===> Sinkapache

 基於文件

  • readTextFile(path) / TextInputFormat - Reads files line wise and returns them as Strings.
  • readTextFileWithValue(path) / TextValueInputFormat - Reads files line wise and returns them as StringValues. StringValues are mutable strings.
  • readCsvFile(path) / CsvInputFormat - Parses files of comma (or another char) delimited fields. Returns a DataSet of tuples or POJOs. Supports the basic java types and their Value counterparts as field types.
  • readFileOfPrimitives(path, Class) / PrimitiveInputFormat - Parses files of new-line (or another char sequence) delimited primitive data types such as String or Integer.
  • readFileOfPrimitives(path, delimiter, Class) / PrimitiveInputFormat - Parses files of new-line (or another char sequence) delimited primitive data types such as String or Integer using the given delimiter.

基於集合 

  • fromCollection(Collection)
  • fromCollection(Iterator, Class)
  • fromElements(T ...)
  • fromParallelCollection(SplittableIterator, Class)
  • generateSequence(from, to)

從簡單的基於集合建立DataSet

基於集合的數據源每每用來在開發環境中或者程序員學習中,能夠隨意造咱們所須要的數據,由於方式簡單。下面從java和scala兩種方式來實現使用集合做爲數據源。數據源是簡單的1到10編程

java

import org.apache.flink.api.java.ExecutionEnvironment;

import java.util.ArrayList;
import java.util.List;

public class JavaDataSetSourceApp {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        fromCollection(executionEnvironment);
    }

    public static void fromCollection(ExecutionEnvironment env) throws Exception {
        List<Integer> list = new ArrayList<Integer>();
        for (int i = 1; i <= 10; i++) {
            list.add(i);
        }
        env.fromCollection(list).print();
    }
}

scala

import org.apache.flink.api.scala.ExecutionEnvironment

object DataSetSourceApp {

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    fromCollection(env)
  }

  def fromCollection(env: ExecutionEnvironment): Unit = {
    import org.apache.flink.api.scala._
    val data = 1 to  10
    env.fromCollection(data).print()
  }

}

讀文件或文件夾方式建立DataSet

在本地文件夾:E:\test\input,下面有一個hello.txt,內容以下:api

hello	world	welcome
hello	welcome

Scala

def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    //fromCollection(env)
    textFile(env)
  }

  def textFile(env: ExecutionEnvironment): Unit = {
    val filePathFilter = "E:/test/input/hello.txt"
    env.readTextFile(filePathFilter).print()

  }

readTextFile方法須要參數1:文件路徑(可使本地,也能夠是hdfs://host:port/file/path),參數2:編碼(若是不寫,默認UTF-8)app

是否能夠指定文件夾?分佈式

咱們直接傳遞文件夾路徑ide

def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    //fromCollection(env)
    textFile(env)
  }

  def textFile(env: ExecutionEnvironment): Unit = {
    //val filePathFilter = "E:/test/input/hello.txt"
    val filePathFilter = "E:/test/input"
    env.readTextFile(filePathFilter).print()

  }

運行結果正常。說明readTextFile方法傳入文件夾,也沒有問題,它將會遍歷文件夾下面的全部文件學習

Java

public static void main(String[] args) throws Exception {
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        // fromCollection(executionEnvironment);
        textFile(executionEnvironment);
    }

    public static void textFile(ExecutionEnvironment env) throws Exception {
        String filePath = "E:/test/input/hello.txt";
        // String filePath = "E:/test/input";
        env.readTextFile(filePath).print();
    }

一樣的道理,java中也能夠指定文件或者文件夾,若是指定文件夾,那麼將遍歷文件夾下面的全部文件。優化

讀CSV文件建立DataSet

建立一個CSV文件,內容以下:

name,age,job
Tom,26,cat
Jerry,24,mouse
sophia,30,developer

Scala

讀取csv文件方法readCsvFile,參數以下:

filePath: String,
      lineDelimiter: String = "\n",
      fieldDelimiter: String = ",", 字段分隔符
      quoteCharacter: Character = null,
      ignoreFirstLine: Boolean = false,  是否忽略第一行
      ignoreComments: String = null,
      lenient: Boolean = false,
      includedFields: Array[Int] = null, 讀取文件的哪幾列
      pojoFields: Array[String] = null)

讀取csv文件代碼以下:

def csvFile(env:ExecutionEnvironment): Unit = {
    import org.apache.flink.api.scala._
    val filePath = "E:/test/input/people.csv"
    env.readCsvFile[(String, Int, String)](filePath, ignoreFirstLine = true).print()
  }

如何只讀前兩列,就須要指定includedFields了,

env.readCsvFile[(String, Int)](filePath, ignoreFirstLine = true, includedFields = Array(0, 1)).print()

以前使用Tuple方式指定類型,如何指定自定義的一個case class?

def csvFile(env: ExecutionEnvironment): Unit = {
    import org.apache.flink.api.scala._
    val filePath = "E:/test/input/people.csv"
    //    env.readCsvFile[(String, Int, String)](filePath, ignoreFirstLine = true).print()
    //    env.readCsvFile[(String, Int)](filePath, ignoreFirstLine = true, includedFields = Array(0, 1)).print()

    env.readCsvFile[MyCaseClass](filePath, ignoreFirstLine = true, includedFields = Array(0, 1)).print()
  }
  case class MyCaseClass(name: String, age: Int)

如何指定POJO?

新建一個POJO類,people

public class People {
    private String name;
    private int age;
    private String job;

    @Override
    public String toString() {
        return "People{" +
                "name='" + name + '\'' +
                ", age=" + age +
                ", job='" + job + '\'' +
                '}';
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }

    public String getJob() {
        return job;
    }

    public void setJob(String job) {
        this.job = job;
    }
}
env.readCsvFile[People](filePath, ignoreFirstLine = true, pojoFields = Array("name", "age", "job")).print()

java

public static void csvFile(ExecutionEnvironment env) throws Exception {
        String filePath = "E:/test/input/people.csv";
        DataSource<Tuple2<String, Integer>> types = env.readCsvFile(filePath).ignoreFirstLine().includeFields("11").types(String.class, Integer.class);
        types.print();
    }

只取出第一列和第二列的數據。

讀取POJO數據:

env.readCsvFile(filePath).ignoreFirstLine().pojoType(People.class, "name", "age", "job").print();

讀遞歸文件夾建立DataSet

scala

def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    //fromCollection(env)
    //    textFile(env)
//    csvFile(env)
    readRecursiveFiles(env)
  }

  def readRecursiveFiles(env: ExecutionEnvironment): Unit = {
    val filePath = "E:/test/nested"
    val parameter = new Configuration()
    parameter.setBoolean("recursive.file.enumeration", true)
    env.readTextFile(filePath).withParameters(parameter).print()
  }

從壓縮文件中建立DataSet

Scala

def readCompressionFiles(env: ExecutionEnvironment): Unit = {
    val filePath = "E:/test/my.tar.gz"
    env.readTextFile(filePath).print()
  }

能夠直接讀取壓縮文件。由於提升了空間利用率,可是卻致使CPU的壓力也提高了。所以須要一個權衡。須要調優,在各類狀況下去選擇更合適的方式。不是任何一種優化都能帶來想要的結果。若是自己集羣的CPU壓力就高,那麼就不該該讀取壓縮文件了。

相關文章
相關標籤/搜索