A LocalEnvironment is instantiated with ExecutionEnvironment.createLocalEnvironment(). By default it executes with as many local threads as your machine has CPU cores (hardware contexts), but you can also specify the desired parallelism. The local environment can be configured to log to the console with enableLogging()/disableLogging().
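As a minimal sketch (the object name is illustrative; it assumes the Flink Scala API is on the classpath), parallelism can be set either at creation time or afterwards on the environment:

```scala
import org.apache.flink.api.scala._

object LocalParallelism {
  def main(args: Array[String]): Unit = {
    // Create a local environment with an explicit parallelism of 4
    // instead of the default (one slot per CPU core).
    val env: ExecutionEnvironment = ExecutionEnvironment.createLocalEnvironment(4)
    // Alternatively, adjust the parallelism after creation:
    env.setParallelism(2)
    env.fromElements(1, 2, 3).map(_ * 2).print()
  }
}
```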
In most cases, ExecutionEnvironment.getExecutionEnvironment() is the better choice. When the program is started locally (outside the command-line interface), that method returns a LocalEnvironment; when the program is invoked from the command-line interface, it returns a pre-configured cluster execution environment.
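A minimal sketch of this recommended pattern (the object name and data are illustrative):

```scala
import org.apache.flink.api.scala._

object AutoEnv {
  def main(args: Array[String]): Unit = {
    // Returns a LocalEnvironment when run from the IDE,
    // or a cluster environment when submitted via bin/flink run.
    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
    env.fromElements("a", "b", "a").map((_, 1)).groupBy(0).sum(1).print()
  }
}
```

Because the same jar works unchanged both locally and on a cluster, this is usually preferable to hard-coding createLocalEnvironment() or createRemoteEnvironment().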
Note: the local execution environment does not start any web frontend for monitoring execution.
import org.apache.flink.api.scala._

object LocalEven {
  def main(args: Array[String]): Unit = {
    // TODO Initialize the local execution environment
    val env: ExecutionEnvironment = ExecutionEnvironment.createLocalEnvironment()
    val path = "data2.csv"
    val data = env.readCsvFile[(String, String, String, String, String, Int, Int, Int)](
      filePath = path,
      lineDelimiter = "\n",
      fieldDelimiter = ",",
      ignoreFirstLine = true
    )
    data.groupBy(0, 1).first(100).print()
  }
}
Execution on collections via a CollectionEnvironment is a low-overhead way to run Flink programs. Typical use cases for this mode are automated tests, debugging, and code reuse.
Users can also reuse algorithms implemented for batch processing in more interactive cases.
Note that collection-based execution of Flink programs is only suitable for small data sets that fit in the JVM heap. Execution on collections is not multi-threaded; only a single thread is used.
// TODO createCollectionsEnvironment
val collectionENV = ExecutionEnvironment.createCollectionsEnvironment
val path = "data2.csv"
val data = collectionENV.readCsvFile[(String, String, String, String, String, Int, Int, Int)](
  filePath = path,
  lineDelimiter = "\n",
  fieldDelimiter = ",",
  ignoreFirstLine = true
)
data.groupBy(0, 1).first(50).print()
Flink programs can run distributed on clusters of many machines. There are two ways to send a program to a cluster for execution:
./bin/flink run ./examples/batch/WordCount.jar \
  --input file:///home/user/hamlet.txt --output file:///home/user/wordcount_out
The remote environment lets you execute Flink Java programs directly on a cluster. The remote environment points to the cluster on which the program is to be executed.
Maven packaging:
<build>
  <plugins>
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-jar-plugin</artifactId>
      <version>2.6</version>
      <configuration>
        <archive>
          <manifest>
            <addClasspath>true</addClasspath>
            <classpathPrefix>lib/</classpathPrefix>
            <mainClass>com.flink.DataStream.RemoteEven</mainClass>
          </manifest>
        </archive>
      </configuration>
    </plugin>
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-dependency-plugin</artifactId>
      <version>2.10</version>
      <executions>
        <execution>
          <id>copy-dependencies</id>
          <phase>package</phase>
          <goals>
            <goal>copy-dependencies</goal>
          </goals>
          <configuration>
            <outputDirectory>${project.build.directory}/lib</outputDirectory>
          </configuration>
        </execution>
      </executions>
    </plugin>
  </plugins>
</build>
import org.apache.flink.api.scala._

val env: ExecutionEnvironment = ExecutionEnvironment.createRemoteEnvironment(
  "hadoop01", 8081, "target/learning-flink-1.0-SNAPSHOT.jar")
val data: DataSet[String] = env.readTextFile("hdfs://hadoop01:9000/README.txt")
val flatMap_data: DataSet[String] = data.flatMap(line => line.toLowerCase().split("\\W+"))
val mapdata: DataSet[(String, Int)] = flatMap_data.map(word => (word, 1))
val groupData: GroupedDataSet[(String, Int)] = mapdata.groupBy(line => line._1)
val result = groupData.reduce((x, y) => (x._1, x._2 + y._2))
result.writeAsText("hdfs://hadoop01:9000/remote")
env.execute()