Flink實例-Wordcount詳細步驟

link實例之Wordcount詳細步驟html

1.個人IDE是IntelliJ IDEA.在官網上https://www.jetbrains.com/idea/下載最新版2018.2的IDEA,以下圖。破解能夠再http://idea.lanyus.com/上獲取破解碼進行破解,以下圖。前端

 

2.當IDE準備就緒後,開始建立一個項目名爲bbb的maven項目,以下圖。java

 

3.在新窗口打開bbb項目時,IDEA會提示咱們是否自動導包。選擇自動導包,以下圖。apache

 

 4.對pom.xml配置文件進行修改,以下代碼。後端

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.xiao</groupId>
    <artifactId>bbb</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.2.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.10</artifactId>
            <version>1.2.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.10</artifactId>
            <version>1.2.0</version>
        </dependency>

    </dependencies>

</project>

5.在src/main/java/目錄下新建一個類,個人類名爲WordCount,以下代碼。api

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;


public class WordCount {

    public static void main(String[] args) throws Exception {
        //定義socket的端口號
        int port;
        try{
            ParameterTool parameterTool = ParameterTool.fromArgs(args);
            port = parameterTool.getInt("port");
        }catch (Exception e){
            System.err.println("沒有指定port參數,使用默認值9000");
            port = 9000;
        }

        //獲取運行環境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //鏈接socket獲取輸入的數據
        DataStreamSource<String> text = env.socketTextStream("10.192.12.106", port, "\n");

        //計算數據
        DataStream<WordWithCount> windowCount = text.flatMap(new FlatMapFunction<String, WordWithCount>() {
            public void flatMap(String value, Collector<WordWithCount> out) throws Exception {
                String[] splits = value.split("\\s");
                for (String word:splits) {
                    out.collect(new WordWithCount(word,1L));
                }
            }
        })//打平操做,把每行的單詞轉爲<word,count>類型的數據
                .keyBy("word")//針對相同的word數據進行分組
                .timeWindow(Time.seconds(2),Time.seconds(1))//指定計算數據的窗口大小和滑動窗口大小
                .sum("count");
               
        //把數據打印到控制檯
        windowCount.print()
                .setParallelism(1);//使用一個並行度
        //注意:由於flink是懶加載的,因此必須調用execute方法,上面的代碼纔會執行
        env.execute("streaming word count");

    }

    /**
     * 主要爲了存儲單詞以及單詞出現的次數
     */
    public static class WordWithCount{
        public String word;
        public long count;
        public WordWithCount(){}
        public WordWithCount(String word, long count) {
            this.word = word;
            this.count = count;
        }

        @Override
        public String toString() {
            return "WordWithCount{" +
                    "word='" + word + '\'' +
                    ", count=" + count +
                    '}';
        }
    }


}

6.開啓IP爲10.192.12.106的虛擬機,並開啓該虛擬機的終端,在終端輸入以下命令,該命令能夠打開一個端口號爲9000的監聽,輸入命令後光標會停留在以下圖的地方。瀏覽器

nc -l 9000

7.切換回IDEA,在菜單欄Build->Build Project,而後運行該類,當控制檯console輸出以下圖所示的信息時表示Wordcount成功的與9000的監聽端口創建了鏈接。socket

 

8.在虛擬機終端開的光標停留出,輸入hello hello world world world world,而後 回車。在IDEA的控制檯會顯示以下單詞和詞頻的信息,表示成功。maven

 9.接下來把項目bbb打jar包,上傳Flink後臺運行,進行以下圖操做。ide

首先要保證Java Compiler版本爲1.8。

 

而後選擇File->Project Structure,進行修改。

 

 

 

10.在配置好Flink的虛擬機下,進入目錄/opt/data/flink-1.3.2/bin中,輸入以下命令,開啓Flink的本地模式。(不會配置flink的小夥伴能夠打開連接https://www.cnblogs.com/ALittleMoreLove/p/9396118.html

./start-local.sh

11.在瀏覽器裏輸入開啓Flink守護進程的虛擬機的IP和8081端口,進入以下Flink前端頁面。

 12.上傳bbb.jar文件到Flink後端運行。

備註:在學習大數據的漫長道路上,咱們會遇到各類各樣奇怪的問題,在嘗試了多種方法仍然沒法解決後 若是再沒有高人指點,常常一個問題就卡好幾天。這種無奈與絕望的感受我想各位自學大數據的小夥伴們應該深有體會。我我的解決問題一般有兩種方法:一種是直接找大牛幫忙,另一種是在網上找各類相關的博客和帖子,再從中總結出一套能夠解決本身問題的方法。本身探索新知識時,每每是很艱辛的,遇到好多天也解決不了的問題也是很正常的,可是千萬不要放棄,堅持下來就必定會有收穫的!Wordcount實例令我躺了兩天的坑,最後終於找到了解決的方法,但願這篇隨筆能夠對自學大數據的小夥伴提供必定的幫助。

相關文章
相關標籤/搜索