Copyright notice: This technical column is the author's (Qin Kaixin) summary and distillation of day-to-day work, drawing on cases from real commercial environments and offering tuning advice for commercial applications and cluster capacity planning. Please keep following this blog. Reproduction is prohibited; learning from it is welcome. QQ email: 1120746959@qq.com, feel free to get in touch with any questions.
Many people first heard the word Flink only in 2015, but as early as 2008 Flink's predecessor was already a research project at TU Berlin. It was accepted by the Apache Incubator in 2014 and quickly became one of the top-level projects of the ASF (Apache Software Foundation).
Apache Flink is an open source platform for distributed stream and batch data
processing. Flink’s core is a streaming dataflow engine that provides data
distribution, communication, and fault tolerance for distributed computations
over data streams. Flink builds batch processing on top of the streaming engine,
overlaying native iteration support, managed memory, and program optimization.
Stream processing and batch processing used to be an agonizing either-or choice, as incompatible as fire and water. With its flexible execution engine, Flink can run batch jobs and streaming jobs on the same engine, though the trade-offs never disappear entirely.
SparkStreaming: micro-batch model with exactly-once semantics; fault tolerance is based on RDD checkpoints, and state management is built on checkpoints, with stateful operations managed through the DStream template. Latency is moderate and throughput is very high. For details, see my SparkStreaming source-code walkthrough.
Flink: true streaming model with exactly-once semantics; fault tolerance rests on two state-management mechanisms, namely State and checkpoint. Stateful operations can be applied at fine granularity, down to individual operators. Latency is low, and throughput is also very high.
- State covers both task state and operator state. State is further divided into
  two types: Keyed State and Operator State.
- checkpoint implements fault tolerance via global snapshots. Note: State lives on the JVM heap,
  while checkpoints periodically persist the global state (all State) as a snapshot.
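To make the two mechanisms concrete, here is a minimal hedged sketch (class and variable names are mine, not from this post, assuming the Flink 1.6 Java APIs used elsewhere here): Keyed State keeps a per-key counter on the JVM heap, while enableCheckpointing() makes Flink persist a periodic global snapshot of all that State.

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class KeyedStateSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // checkpoint: snapshot the global state (all State) every 10 seconds
        env.enableCheckpointing(10000);

        env.fromElements(Tuple2.of("a", 1), Tuple2.of("b", 2), Tuple2.of("a", 3))
                .keyBy(0)
                .flatMap(new CountPerKey())
                .print();

        env.execute("Keyed state sketch");
    }

    // Keyed State: Flink maintains one ValueState instance per key, on the JVM heap
    public static class CountPerKey extends RichFlatMapFunction<Tuple2<String, Integer>, String> {
        private transient ValueState<Long> count;

        @Override
        public void open(Configuration parameters) {
            count = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("count", Long.class));
        }

        @Override
        public void flatMap(Tuple2<String, Integer> in, Collector<String> out) throws Exception {
            Long seen = count.value();          // null on the first element for this key
            long next = (seen == null ? 0L : seen) + 1;
            count.update(next);                 // survives failures via checkpoints
            out.collect(in.f0 + " seen " + next + " times");
        }
    }
}

Whether that snapshot goes to memory, the filesystem, or RocksDB depends on the configured state backend; the heap is only the default.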
To be honest, Flink is quite audacious.
Cluster node plan (one master, two workers)
1 Base environment:
jdk1.8 or above [JAVA_HOME must be configured]
passwordless ssh login (at minimum, the master node must be able to log in to the worker nodes without a password)
hostname set on each machine
hostname-to-ip mappings configured in /etc/hosts
192.168.1.160 SparkMaster
192.168.1.161 SparkWorker1
192.168.1.162 SparkWorker2
firewall disabled
2 Configuration to change on the SparkMaster node
cd /usr/local/flink-1.6.1/conf
vi flink-conf.yaml
jobmanager.rpc.address: SparkMaster
3 Edit slaves
vi slaves
SparkWorker1
SparkWorker2
4 Then copy the configured flink directory to the other two nodes
scp -rq flink-1.6.1 SparkWorker1:/usr/local/
scp -rq flink-1.6.1 SparkWorker2:/usr/local/
A small hiccup occurred here: inconsistent yarn configuration files prevented the hadoop Web UI from displaying all NodeManagers. So watch out for configuration-file consistency.
Processes on the SparkMaster node:
14273 SecondaryNameNode
15010 Worker
14038 DataNode
25031 StandaloneSessionClusterEntrypoint
13895 NameNode
14903 Master
14424 ResourceManager
14569 NodeManager
25130 Jps
Processes on the SparkWorker nodes:
5732 Worker
10420 NodeManager
10268 DataNode
10540 Jps
8351 TaskManagerRunner
(1) Incremental aggregation: a computation is triggered for every element that enters the window (a sketch follows this list).
The main methods are:
reduce(reduceFunction), aggregate(aggregateFunction), sum(), min(), max()
(2) Full aggregation: the computation starts only once all of the window's data has arrived.
Full aggregation can therefore support needs such as sorting the data inside the window.
The main methods are:
apply(windowFunction)
process(processWindowFunction)
processWindowFunction provides more context information than windowFunction.
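For contrast with the full-aggregation example below, here is a minimal hedged sketch of the incremental path (it reuses the intData stream built in that example and additionally assumes import org.apache.flink.api.common.functions.ReduceFunction): reduce() fires once per arriving element, so the window only ever stores one accumulated value instead of every element.

// incremental aggregation: compute on every element as it enters the window
DataStream<Tuple2<Integer, Integer>> sums = intData
        .keyBy(0)
        .timeWindow(Time.seconds(5))
        .reduce(new ReduceFunction<Tuple2<Integer, Integer>>() {
            @Override
            public Tuple2<Integer, Integer> reduce(Tuple2<Integer, Integer> a,
                                                   Tuple2<Integer, Integer> b) throws Exception {
                // keep the key, add the values: a per-window running sum
                return new Tuple2<>(a.f0, a.f1 + b.f1);
            }
        });
sums.print();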
A detailed full-aggregation example follows:
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class SocketDemoFullCount {
    public static void main(String[] args) throws Exception {
        // read the port from the command line, falling back to 9010
        int port;
        try {
            ParameterTool parameterTool = ParameterTool.fromArgs(args);
            port = parameterTool.getInt("port");
        } catch (Exception e) {
            System.err.println("No port set. Using default port 9010.");
            port = 9010;
        }
        // get the Flink execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        String hostname = "SparkMaster";
        String delimiter = "\n";
        // connect to the socket and read the input data
        DataStreamSource<String> text = env.socketTextStream(hostname, port, delimiter);
        // map every line to (1, value) so all records share one key
        DataStream<Tuple2<Integer, Integer>> intData = text.map(new MapFunction<String, Tuple2<Integer, Integer>>() {
            @Override
            public Tuple2<Integer, Integer> map(String value) throws Exception {
                return new Tuple2<>(1, Integer.parseInt(value));
            }
        });
        intData.keyBy(0)
                .timeWindow(Time.seconds(5))
                // full aggregation: process() fires once per window, with all elements available
                .process(new ProcessWindowFunction<Tuple2<Integer, Integer>, String, Tuple, TimeWindow>() {
                    @Override
                    public void process(Tuple key, Context context, Iterable<Tuple2<Integer, Integer>> elements, Collector<String> out)
                            throws Exception {
                        System.out.println("executing process......");
                        long count = 0;
                        for (Tuple2<Integer, Integer> element : elements) {
                            count++;
                        }
                        out.collect("window:" + context.window() + ",count:" + count);
                    }
                }).print();
        // this line is required; without it the job never runs
        env.execute("Socket window count");
    }
}
(3) Data source
root@SparkMaster:/usr/local/install/hadoop-2.7.3/sbin# nc -l 9010
12
1
(4) Run result
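Given the out.collect call in the example, the two values above landing in the same 5-second window would print output of roughly this shape (the window boundaries are illustrative, not captured from a run):

executing process......
window:TimeWindow{start=1543050000000, end=1543050005000},count:2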
Parameter tuning settings (a config sketch follows this list):
1. jobmanager.heap.mb: memory available to the jobmanager node
2. taskmanager.heap.mb: memory available to each taskmanager node
3. taskmanager.numberOfTaskSlots: number of slots per machine, usually set to the number of CPU cores
4. parallelism.default: the default parallelism of jobs
5. taskmanager.tmp.dirs: the taskmanager's temporary data directory
slot and parallelism, summarized:
1. slot is a static concept: the concurrent execution capacity a taskmanager has
2. parallelism is a dynamic concept: the concurrency a program actually uses at runtime
3. an appropriate parallelism improves efficiency; both too high and too low hurt
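As a hedged illustration of where these knobs live (the values below are placeholders, not recommendations), a conf/flink-conf.yaml fragment might read:

# conf/flink-conf.yaml -- placeholder values, tune for your own cluster
jobmanager.heap.mb: 1024           # 1. memory available to the jobmanager node
taskmanager.heap.mb: 2048          # 2. memory available to each taskmanager node
taskmanager.numberOfTaskSlots: 2   # 3. static slot capacity per taskmanager
parallelism.default: 2             # 4. dynamic default parallelism; -p overrides it per job
taskmanager.tmp.dirs: /tmp/flink   # 5. taskmanager temporary data directory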
Start the jobmanager
If the jobmanager process in the cluster has died, start it with the commands below.
bin/jobmanager.sh start
bin/jobmanager.sh stop
Start a taskmanager
To add a new taskmanager node, or to restart an existing one:
bin/taskmanager.sh start
bin/taskmanager.sh stop
1 Start a long-running flink cluster on YARN
./bin/yarn-session.sh -n 2 -jm 1024 -tm 1024 -d
2 Attach to an existing flink yarn session
./bin/yarn-session.sh -id application_1463870264508_0029
3 The Yarn properties file lives at /tmp/.yarn-properties-root.
2018-11-24 17:24:19,644 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli
- Found Yarn properties file under /tmp/.yarn-properties-root.
4: Contents of the yarn properties file
root@SparkMaster:/usr/local/install/flink-1.6.1# vim /tmp/.yarn-properties-root
#Generated YARN properties file
#Sat Nov 24 17:39:07 CST 2018
parallelism=2
dynamicPropertiesString=
applicationID=application_1543052238521_0001
5 Run a job
hadoop fs -mkdir /input/
hadoop fs -put README.txt /input/
./bin/flink run ./examples/batch/WordCount.jar -input hdfs://SparkMaster:9000/input/README.txt -output hdfs://SparkMaster:9000/wordcount-result.txt
6: Run result
root@SparkMaster:/usr/local/install/flink-1.6.1# hadoop fs -cat /wordcount-result.txt
1 1
13 1
5d002 1
740 1
about 1
account 1
administration 1
Stop a job [via the web UI, or by running the cancel command on the command line]
1 Start a cluster and run the job in one step
./bin/flink run -m yarn-cluster -yn 2 -yjm 1024 -ytm 1024 ./examples/batch/WordCount.jar -input hdfs://SparkMaster:9000/input/README.txt -output hdfs://SparkMaster:9000/wordcount-result6.txt
2018-11-24 17:56:18,066 INFO org.apache.flink.yarn.AbstractYarnClusterDescriptor - Waiting for the cluster to be allocated
2018-11-24 17:56:18,078 INFO org.apache.flink.yarn.AbstractYarnClusterDescriptor - Deploying cluster, current state ACCEPTED
2018-11-24 17:56:24,901 INFO org.apache.flink.yarn.AbstractYarnClusterDescriptor - YARN application has been deployed successfully.
2: Each submission creates a new Yarn session (one per job)
1 Required parameters:
-n,--container <arg>             Number of YARN containers to allocate (= number of taskmanagers)
2 Optional parameters:
-D <arg>                         Dynamic properties
-d,--detached                    Run in detached mode
-jm,--jobManagerMemory <arg>     Memory for the JobManager [in MB]
-nm,--name                       Set a custom name for the application on YARN
-q,--query                       Display available YARN resources (memory, CPU cores)
-qu,--queue <arg>                Specify the YARN queue.
-s,--slots <arg>                 Number of slots per TaskManager
-tm,--taskManagerMemory <arg>    Memory per TaskManager [in MB]
-z,--zookeeperNamespace <arg>    Namespace to create on ZooKeeper for HA mode
-id,--applicationId <yarnAppId>  Id of the application on the YARN cluster; attach to a yarn session running in the background
3 run [OPTIONS] <jar-file> <arguments>
run action options:
-c,--class <classname>           Entry-point class; needed only if the jar manifest does not specify one
-m,--jobmanager <host:port>      Address of the jobmanager (master) to connect to; use this to target a jobmanager other than the one in the configuration
-p,--parallelism <parallelism>   Parallelism of the program; overrides the default value in the configuration.
4 To start a new yarn-session per job, use the options that carry a y or yarn prefix
For example: ./bin/flink run -m yarn-cluster -yn 2 ./examples/batch/WordCount.jar
Connect to a jobmanager at a specific host and port:
./bin/flink run -m SparkMaster:1234 ./examples/batch/WordCount.jar -input hdfs://hostname:port/hello.txt -output hdfs://hostname:port/result1
Start a new yarn-session:
./bin/flink run -m yarn-cluster -yn 2 ./examples/batch/WordCount.jar -input hdfs://hostname:port/hello.txt -output hdfs://hostname:port/result1
5 Note: these command-line options can also be obtained from the ./bin/flink tool itself.
6 Action "run" compiles and runs a program.
Syntax: run [OPTIONS] <jar-file> <arguments>
"run" action options:
-c,--class <classname> Class with the program entry point
("main" method or "getPlan()" method.
Only needed if the JAR file does not
specify the class in its manifest.
-C,--classpath <url> Adds a URL to each user code
classloader on all nodes in the
cluster. The paths must specify a
protocol (e.g. file://) and be
accessible on all nodes (e.g. by means
of a NFS share). You can use this
option multiple times for specifying
more than one URL. The protocol must
be supported by the {@link
java.net.URLClassLoader}.
-d,--detached If present, runs the job in detached
mode
-n,--allowNonRestoredState Allow to skip savepoint state that
cannot be restored. You need to allow
this if you removed an operator from
your program that was part of the
program when the savepoint was
triggered.
-p,--parallelism <parallelism> The parallelism with which to run the
program. Optional flag to override the
default value specified in the
configuration.
-q,--sysoutLogging If present, suppress logging output to
standard out.
-s,--fromSavepoint <savepointPath> Path to a savepoint to restore the job
from (for example
hdfs:///flink/savepoint-1537).
7 Options for yarn-cluster mode:
-d,--detached If present, runs the job in detached
mode
-m,--jobmanager <arg> Address of the JobManager (master) to
which to connect. Use this flag to
connect to a different JobManager than
the one specified in the
configuration.
-yD <property=value> use value for given property
-yd,--yarndetached If present, runs the job in detached
mode (deprecated; use non-YARN
specific option instead)
-yh,--yarnhelp Help for the Yarn session CLI.
-yid,--yarnapplicationId <arg> Attach to running YARN session
-yj,--yarnjar <arg> Path to Flink jar file
-yjm,--yarnjobManagerMemory <arg> Memory for JobManager Container with
optional unit (default: MB)
-yn,--yarncontainer <arg> Number of YARN container to allocate
(=Number of Task Managers)
-ynl,--yarnnodeLabel <arg> Specify YARN node label for the YARN
application
-ynm,--yarnname <arg> Set a custom name for the application
on YARN
-yq,--yarnquery Display available YARN resources
(memory, cores)
-yqu,--yarnqueue <arg> Specify YARN queue.
-ys,--yarnslots <arg> Number of slots per TaskManager
-yst,--yarnstreaming Start Flink in streaming mode
-yt,--yarnship <arg> Ship files in the specified directory
(t for transfer)
-ytm,--yarntaskManagerMemory <arg> Memory per TaskManager Container with
optional unit (default: MB)
-yz,--yarnzookeeperNamespace <arg> Namespace to create the Zookeeper
sub-paths for high availability mode
-z,--zookeeperNamespace <arg> Namespace to create the Zookeeper
sub-paths for high availability mode
Flink is an open-source, distributed, high-performance, highly available, and accurate stream-processing framework, implemented mainly in Java. It supports real-time stream processing as well as batch processing, where batch data is simply a bounded special case of stream data. Flink natively supports iterative computation, memory management, and program optimization. This article focused on the runtime fundamentals together with deployment and the YARN execution modes; keep following this blog for more. This piece took real effort, so please treat it with care. Thank you!
Qin Kaixin, in Shenzhen