【原創】大數據基礎之Flink(1)簡介、安裝、使用

 Flink 1.7html

官方:https://flink.apache.org/redis

 

一 簡介

 

Apache Flink is an open source platform for distributed stream and batch data processing. Flink’s core is a streaming dataflow engine that provides data distribution, communication, and fault tolerance for distributed computations over data streams. Flink builds batch processing on top of the streaming engine, overlaying native iteration support, managed memory, and program optimization.express

Flink是一個開源的分佈式流式和批處理平臺;Flink核心是流式數據流引擎,而後在流式引擎的基礎上實現批處理。和spark正好相反,spark核心是批處理引擎,而後在批處理引擎的基礎上實現流式處理;apache

 

1 集羣部署結構視圖

理解flink中的集羣級別概念:master-slave,即JobManager-TaskManager(task slot)windows

master-worker結構-1

master-worker結構-2

角色

1 JobManager -- 即master

 The JobManagers (also called masters) coordinate the distributed execution. They schedule tasks, coordinate checkpoints, coordinate recovery on failures, etc.api

 There is always at least one Job Manager. A high-availability setup will have multiple JobManagers, one of which one is always the leader, and the others are standby.session

 JobManager,負責協調分佈式執行,包括調度任務、協調檢查點、協調錯誤恢復等;至少須要一個JobManager,若是有多個(ha),只能有一個是leader,其餘是standby;併發

2 TaskManager -- 即worker

 The TaskManagers (also called workers) execute the tasks (or more specifically, the subtasks) of a dataflow, and buffer and exchange the data streams.app

 There must always be at least one TaskManager.less

 TaskManager,負責具體執行數據流中的任務;至少須要一個TaskManager;

2.1 Task Slot

  Each worker (TaskManager) is a JVM process, and may execute one or more subtasks in separate threads. To control how many tasks a worker accepts, a worker has so called task slots (at least one).

  Each task slot represents a fixed subset of resources of the TaskManager. A TaskManager with three slots, for example, will dedicate 1/3 of its managed memory to each slot. Slotting the resources means that a subtask will not compete with subtasks from other jobs for managed memory, but instead has a certain amount of reserved managed memory. Note that no CPU isolation happens here; currently slots only separate the managed memory of tasks.

  每一個worker(TaskManager)都是一個JVM進程,每一個worker劃分爲多個task slot,一個task slot表示worker中的一部分獨立資源(即內存),這樣在不一樣task slot中的子任務之間就沒有內存競爭;

3 client

 The client is not part of the runtime and program execution, but is used to prepare and send a dataflow to the JobManager. After that, the client can disconnect, or stay connected to receive progress reports. The client runs either as part of the Java/Scala program that triggers the execution, or in the command line process ./bin/flink run ....

 client負責提交數據流到JobManager,提交完成以後,client就能夠斷開;client一般這樣啓動 ./bin/flink run ....

 

2 Hello World應用-詞頻統計

下面經過代碼和圖示來理解flink中的幾個應用級別概念:dataflow、task、operator(subtask)、stream(partition)、task slot

批處理代碼

import org.apache.flink.api.scala._

object WordCount {
  def main(args: Array[String]) {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val text = env.fromElements(
      "Who's there?",
      "I think I hear them. Stand, ho! Who's there?")

    val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
      .map { (_, 1) }
      .groupBy(0)
      .sum(1)

    counts.print()
  }
}

流式代碼

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

object WindowWordCount {
  def main(args: Array[String]) {

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val text = env.socketTextStream("localhost", 9999)

    val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
      .map { (_, 1) }
      .keyBy(0)
      .timeWindow(Time.seconds(5))
      .sum(1)

    counts.print()

    env.execute("Window Stream WordCount")
  }
}

圖示執行過程-1

解釋:應用代碼實際上描述了一個dataflow,一個dataflow由多個operator組成:Source Operator、Transformation Operator、Sink Operator,operator之間造成stream;

 

圖示執行過程-2

解釋:每一個operator能夠拆分爲多個subtask執行,這樣operator以後的stream也造成多個partition;另外有些operator之間須要shuffle;

 

Programs in Flink are inherently parallel and distributed. During execution, a stream has one or more stream partitions, and each operator has one or more operator subtasks. The operator subtasks are independent of one another, and execute in different threads and possibly on different machines or containers.

The number of operator subtasks is the parallelism of that particular operator. The parallelism of a stream is always that of its producing operator. Different operators of the same program may have different levels of parallelism.

flink執行過程當中,一個stream有一個或多個partition,每一個operator都有一個或多個subtask,一個operator下的subtask之間彼此獨立(在不一樣的線程中執行而且儘量的在不一樣的機器或者容器中執行);

 

Streams can transport data between two operators in a one-to-one (or forwarding) pattern, or in a redistributing pattern:

  • One-to-one streams (for example between the Source and the map() operators in the figure above) preserve the partitioning and ordering of the elements. That means that subtask[1] of the map() operator will see the same elements in the same order as they were produced by subtask[1] of the Source operator.
  • Redistributing streams (as between map() and keyBy/window above, as well as between keyBy/window and Sink) change the partitioning of streams. Each operator subtask sends data to different target subtasks, depending on the selected transformation. Examples are keyBy() (which re-partitions by hashing the key), broadcast(), or rebalance() (which re-partitions randomly). In a redistributing exchange the ordering among the elements is only preserved within each pair of sending and receiving subtasks (for example, subtask[1] of map() and subtask[2] of keyBy/window). So in this example, the ordering within each key is preserved, but the parallelism does introduce non-determinism regarding the order in which the aggregated results for different keys arrive at the sink.

operator分爲兩種,一種是一對一(one-to-one或者forwarding),一種是從新分佈(redistributing,即shuffle);

flink中forwarding operator相似於spark中trasformation的map,redistributing operator相似於spark中transformation的reduceByKey(須要shuffle),sink operator相似於spark中的action;

 

圖示執行過程-3

解釋:多個operator能夠chain(連接)起來造成一個task;

 

For distributed execution, Flink chains operator subtasks together into tasks. Each task is executed by one thread. Chaining operators together into tasks is a useful optimization: it reduces the overhead of thread-to-thread handover and buffering, and increases overall throughput while decreasing latency. 

將operation連接起來組成task是一個很是有用的優化:減小了線程間的數據交換、增長吞吐量同時減小延遲;

 

圖示執行過程-4

解釋:一個task的subtask會被分配到不一樣的task slot執行,不一樣task的subtask能夠共享一個task slot;

 

By default, Flink allows subtasks to share slots even if they are subtasks of different tasks, so long as they are from the same job. The result is that one slot may hold an entire pipeline of the job. Allowing this slot sharing has two main benefits:

  • A Flink cluster needs exactly as many task slots as the highest parallelism used in the job. No need to calculate how many tasks (with varying parallelism) a program contains in total.

  • It is easier to get better resource utilization. Without slot sharing, the non-intensive source/map() subtasks would block as many resources as the resource intensive window subtasks. With slot sharing, increasing the base parallelism in our example from two to six yields full utilization of the slotted resources, while making sure that the heavy subtasks are fairly distributed among the TaskManagers.

flink默認容許一個job下不一樣task的subtask能夠共享一個task slot;共享task slot有兩個好處:1)task slot數量表示一個job的最高併發數;2)合理利用資源;

As a rule-of-thumb, a good default number of task slots would be the number of CPU cores. With hyper-threading, each slot then takes 2 or more hardware thread contexts.

一般task slot數量設置爲cpu的核數;

 

3 API

四層抽象(API)說明:

  • The lowest level abstraction simply offers stateful streaming. It is embedded into the DataStream API via the Process Function. It allows users freely process events from one or more streams, and use consistent fault tolerant state. In addition, users can register event time and processing time callbacks, allowing programs to realize sophisticated computations.
  • In practice, most applications would not need the above described low level abstraction, but would instead program against the Core APIs like the DataStream API (bounded/unbounded streams) and the DataSet API (bounded data sets). These fluent APIs offer the common building blocks for data processing, like various forms of user-specified transformations, joins, aggregations, windows, state, etc. Data types processed in these APIs are represented as classes in the respective programming languages.
  • The Table API is a declarative DSL centered around tables, which may be dynamically changing tables (when representing streams). The Table API follows the (extended) relational model: Tables have a schema attached (similar to tables in relational databases) and the API offers comparable operations, such as select, project, join, group-by, aggregate, etc. Table API programs declaratively define what logical operation should be done rather than specifying exactly how the code for the operation looks. Though the Table API is extensible by various types of user-defined functions, it is less expressive than the Core APIs, but more concise to use (less code to write). In addition, Table API programs also go through an optimizer that applies optimization rules before execution.
  • The highest level abstraction offered by Flink is SQL. This abstraction is similar to the Table API both in semantics and expressiveness, but represents programs as SQL query expressions. The SQL abstraction closely interacts with the Table API, and SQL queries can be executed over tables defined in the Table API.

 

DataStream經常使用api:https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/operators/index.html

DataSet經常使用api:https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/batch/index.html#dataset-transformations

相似於spark中的transformation算子

 

參考: 

https://ci.apache.org/projects/flink/flink-docs-release-1.7/concepts/programming-model.html
https://ci.apache.org/projects/flink/flink-docs-release-1.7/concepts/runtime.html

二 安裝

下載

# wget http://mirrors.tuna.tsinghua.edu.cn/apache/flink/flink-1.7.1/flink-1.7.1-bin-hadoop28-scala_2.11.tgz
# tar xvf flink-1.7.1-bin-hadoop28-scala_2.11.tgz
# cd flink-1.7.1

集羣環境ssh免登錄(單機請跳過)

詳見:http://www.javashuo.com/article/p-rndbchju-bd.html

啓動

1 單機standalone方式啓動

啓動

# bin/start-cluster.sh

詳見:https://ci.apache.org/projects/flink/flink-docs-release-1.7/tutorials/local_setup.html

2 單master多worker方式啓動

配置

# cat conf/flink-conf.yaml
jobmanager.rpc.address: $master_server
jobmanager.rpc.port: 6123

同步flink-conf.yaml到全部節點

# cat conf/slaves
$slave1
$slave2
$slave3

啓動

# bin/start-cluster.sh

詳見:https://ci.apache.org/projects/flink/flink-docs-release-1.7/ops/deployment/cluster_setup.html

3 ha方式啓動(多master多worker)

配置

# cat conf/flink-conf.yaml
#jobmanager.rpc.address: $master_server
#jobmanager.rpc.port: 6123

high-availability: zookeeper
high-availability.storageDir: hdfs:///flink/ha/
high-availability.zookeeper.quorum: $zk_server1:2181

rest.port: 8081

同步flink-conf.yaml到全部節點

主要修改成:打開high-availability相關配置,同時將jobmanager.rpc.address和jobmanager.rpc.port註釋掉,官方解釋以下:

The config parameter defining the network address to connect to for communication with the job manager. This value is only interpreted in setups where a single JobManager with static name or address exists (simple standalone setups, or container setups with dynamic service name resolution). It is not used in many high-availability setups, when a leader-election service (like ZooKeeper) is used to elect and discover the JobManager leader from potentially multiple standby JobManagers.

# cat conf/masters
$master1:8081
$master2:8081

# cat conf/slaves
$slave1
$slave2
$slave3

啓動

# bin/start-cluster.sh

另外也能夠逐個啓動

# bin/jobmanager.sh start
# bin/taskmanager.sh start

4 其餘(Yarn、Mesos等)

在yarn上啓動flink集羣

./bin/yarn-session.sh -n 4 -jm 1024m -tm 4096m

 

訪問 http://$master1:8081

 

三 使用

提交任務

$ ./bin/flink run examples/streaming/SocketWindowWordCount.jar --port 9000

提交任務到yarn

./bin/flink run -m yarn-cluster -yn 4 -yjm 1024m -ytm 4096m ./examples/batch/WordCount.jar

相關文章
相關標籤/搜索