Notes on Installing and Running Flink on a Single Node

Flink is a big data compute engine. What sets it apart from other engines is that it supports both stream processing and batch processing, so let's first introduce those two concepts. Stream processing can be pictured as flowing water: the Yangtze flows eastward from the west and eventually into the sea, on and on without end. Applied to data, this means records are produced continuously and the dataset is unbounded. Batch processing can be pictured as a lake: naturally formed and static. Applied to data, it is like processing a static dataset, so the dataset is bounded.

My goals in learning Flink are twofold. First, to build a monitoring system on top of the logs our business produces, so the running software can be observed and problems resolved faster. Second, studying big data techniques together with the AI field should provide better approaches and solutions for the growing number of products that combine the two.

As shown at https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/libs/ml/quickstart.html , Flink provides a library of AI-related algorithms (FlinkML), which is well worth studying alongside the core engine.

  • Install Flink. Since I am on a Mac with Homebrew available, a single terminal command suffices: brew install flink . The directory layout after installation on my machine is shown below.

  • **Start a single-node Flink cluster.** Switch to the libexec directory and run: ./start-cluster.sh , then open http://localhost:8081 and you should see the following page:

  • Step three: write the Flink job. I used Java; the code is as follows:

    package com.flink.demo;
    
    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.util.Collector;
    
    /**
     * @author fxl
     * @version 1.0.0
     * @createTime 2019-07-17 17:23:00
     * @Description
     */
    public class App {
    
        public static void main(String[] args) throws Exception {
            // validate the arguments
            if (args.length != 2) {
                System.err.println("USAGE:\nSocketTextStreamWordCount <hostname> <port>");
                return;
            }
    
            String hostname = args[0];
            Integer port = Integer.parseInt(args[1]);
    
    
            // set up the streaming execution environment
            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
            // read lines from the socket
            DataStreamSource<String> stream = env.socketTextStream(hostname, port);
    
            // split lines into words and count per word
            SingleOutputStreamOperator<Tuple2<String, Integer>> sum = stream.flatMap(new LineSplitter())
                    .keyBy(0)
                    .sum(1);
    
            sum.print();
    
            env.execute("Java WordCount from SocketTextStream Example");
        }
    
        public static final class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
            @Override
            public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) {
                String[] tokens = s.toLowerCase().split("\\W+");
    
                for (String token : tokens) {
                    if (token.length() > 0) {
                        collector.collect(new Tuple2<String, Integer>(token, 1));
                    }
                }
            }
        }
    
    
    }
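
The tokenization inside `LineSplitter` is plain Java and can be checked in isolation, with no Flink dependency. A minimal sketch (the class name `Tokenizer` is mine) of the same `toLowerCase().split("\\W+")` logic:

```java
import java.util.ArrayList;
import java.util.List;

// Stand-alone copy of the splitting logic used by LineSplitter above.
public class Tokenizer {
    public static List<String> tokens(String line) {
        List<String> out = new ArrayList<>();
        // \W+ splits on any run of non-word characters (punctuation, whitespace, ...)
        for (String token : line.toLowerCase().split("\\W+")) {
            if (token.length() > 0) { // a leading delimiter yields an empty first element
                out.add(token);
            }
        }
        return out;
    }
}
```

For example, `tokens(" Hello, world! hello")` yields `[hello, world, hello]`, which is exactly what the job then turns into `(word, 1)` tuples before keying and summing.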

    The Maven dependencies are as follows:

    <?xml version="1.0" encoding="UTF-8"?>
    
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
        <groupId>com.flink.demo</groupId>
        <artifactId>flinkDemo</artifactId>
        <version>1.0-SNAPSHOT</version>
    
        <name>flinkDemo</name>
    
        <dependencies>
            <dependency>
                <groupId>junit</groupId>
                <artifactId>junit</artifactId>
                <version>4.11</version>
                <scope>test</scope>
            </dependency>
    
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-core</artifactId>
                <version>1.8.1</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-clients_2.12</artifactId>
                <version>1.8.1</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-streaming-java_2.12</artifactId>
                <version>1.8.1</version>
                <scope>provided</scope>
            </dependency>
    
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-java</artifactId>
                <version>1.8.1</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-examples</artifactId>
                <version>1.8.1</version>
                <type>pom</type>
            </dependency>
    
            <dependency>
                <groupId>org.projectlombok</groupId>
                <artifactId>lombok</artifactId>
                <version>1.18.8</version>
                <scope>provided</scope>
            </dependency>
    
            <dependency>
                <groupId>com.google.guava</groupId>
                <artifactId>guava</artifactId>
                <version>23.0</version>
            </dependency>
        </dependencies>
    
        <build>
            <plugins>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.5.1</version>
                    <configuration>
                        <source>1.8</source>
                        <target>1.8</target>
                    </configuration>
                </plugin>
            </plugins>
        </build>
    
    </project>
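
One thing to watch in this pom: `flink-streaming-java_2.12` is declared with `<scope>provided</scope>`. That is appropriate for submission via `flink run` (the cluster already ships those classes, and keeping them out of the jar keeps it small), but it means running `App#main` directly from the IDE can fail with `NoClassDefFoundError`. The Flink quickstart works around this with a profile along these lines (a sketch; adjust the profile id to taste):

```xml
<!-- Optional profile: puts the provided Flink classes on the compile
     classpath so App#main can be launched from the IDE. -->
<profiles>
    <profile>
        <id>add-dependencies-for-IDEA</id>
        <dependencies>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-streaming-java_2.12</artifactId>
                <version>1.8.1</version>
                <scope>compile</scope>
            </dependency>
        </dependencies>
    </profile>
</profiles>
```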

Step four: switch to the bin directory under the Flink installation and run: flink run -c com.flink.demo.App *.jar ip port  
         -c    specifies the job's entry class; mine is com.flink.demo.App
         ip    the local IP address, here 127.0.0.1
         port  the port the job will connect to
         The full command is:

        fxl-2:bin fxl$ flink run -c com.flink.demo.App /Users/fxl/IdeaProjects/flinkDemo/target/flinkDemo-1.0-SNAPSHOT.jar 127.0.0.1 9008

        Note that the listener must be started before the job is submitted, with the command: nc -l 9008 . The job's socketTextStream connects to that port as a client, so something must already be listening there; the lines you then type into the nc session become the job's test input.
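
If nc is not available, its role can be mimicked with a few lines of plain Java. A minimal sketch (the class name `TextSource` and the sample lines are mine): it listens on the given port and pushes a couple of lines to the first client that connects, which is exactly what `nc -l 9008` does for the job's socketTextStream source.

```java
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;

// Stand-in for `nc -l <port>`: accept one client and send it some lines.
public class TextSource {
    public static void main(String[] args) throws Exception {
        int port = args.length > 0 ? Integer.parseInt(args[0]) : 9008;
        try (ServerSocket server = new ServerSocket(port);
             Socket client = server.accept(); // blocks until the Flink job connects
             PrintWriter out = new PrintWriter(client.getOutputStream(), true)) {
            out.println("hello flink"); // each line becomes one record in the stream
            out.println("hello world");
        }
    }
}
```

Unlike an interactive nc session it sends fixed lines and exits, but it makes the connection direction clear: the Flink job is the client, the listener is the server.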

        

   The second screenshot shows the count output produced for the words typed in.
