1)定義html
Spark Streaming is an extension of the core Spark API that enables scalable, high-throughput, fault-tolerant stream processing of live data streamsjava
1)在線安裝nc命令mysql
#安裝web
上傳nc-1.84-22.el6.x86_64.rpm包到software目錄,再安裝sql
#啓動shell
啓動以後在下邊能夠進行數據輸入,而後就可以從spark端進行詞頻統計(如2)所示)數據庫
2)運行Spark Streaming 的WordCountapache
#數據輸入編程
#結果統計bootstrap
注:把日誌級別調整爲WARN才能出現以上效果,不然會被日誌覆蓋,影響觀察
3)把文件經過管道做爲nc的輸入,而後觀察spark Streaming計算結果
文件具體內容
1)Spark Streaming數據流處理
2)接收器工做原理
3)綜合工做原理
1)StreamingContext初始化的兩種方式
#第一種
#第二種
2)集羣測試
#啓動spark
#在nc服務器端輸入數據
#結果統計
1)spark-shell運行Streaming程序,要麼線程數大於1,要麼基於集羣。
2)spark 運行模式
a)編寫測試代碼,並本地運行
b)啓動nc服務發送數據
1)保存到mysql數據庫
而後在nc服務器端輸入數據,統計結果則會寫入數據庫內的webCount表中。
2)保存到hdfs
這種方法相比於寫入數據庫則更簡單了,感興趣的請參考下面代碼自行測試一下。
特別說明:每次執行,HDFS文件內容都會被重置覆蓋!
1)complete輸出模式
2)update輸出模式
這種模式下你在nc服務器端繼續輸入,則會一直統計剛纔輸入及歷史輸入的值,而若是把outputMod修改成「update」,則會根據歷史輸入進行統計更新,而且只顯示出最近一次輸入value值更新後的統計結果。
3)append輸出模式
把outputMod修改成「append」的話代碼也要有一點小小的修改
能夠看出,這種模式只是把每次輸入進行簡單追加而已。
1)準備工做
根據官網要求,咱們以前的kafka的版本低了,須要下載一個至少0.10.0版本的。
下載地址 http://kafka.apache.org/downloads
修改配置很簡單,只須要把咱們原來配置的/config文件夾複製過來替換便可,並按照原來的配置新建kafka-logs和logs文件夾。而後,將配置文件夾中路徑修改掉便可。
2)編寫測試代碼並啓動運行
咱們把包上傳上來(3個節點都這樣作)
啓動spark-shell
把代碼拷貝進來
這個時候必定要保持kafka和生產者是開啓的:
在生產者這邊輸入幾個單詞
回到spark-shell界面能夠看到統計結果
咱們先把mysqld的test數據庫的webCount的表的內容清除
打開idea,咱們編寫兩個程序
在pom.xml文件裏添加這個依賴包
我在這裏說一下這個依賴包版本的選擇上最好要跟你集羣裏面的依賴包版本同樣,否則可能會報錯的,能夠參考hive裏的Lib路徑下的版本。
保持集羣的dfs,hbase,yarn,zookeeper,都是啓動的狀態
啓動咱們節點1和節點2的flume,在啓動以前咱們先修改一下flume的配置,由於咱們把jdk版本和kafka版本後面更換了,因此咱們要修改配置文件(3個節點的都改)
啓動節點1的flume
啓動節點1的kafka
啓動節點2的flume
在節點2上把數據啓動起來,實時產生數據
回到idea咱們把程序運行一下
回到mysql裏面查看webCount表,已經有數據進來了
咱們把配置文件修改以下
把表刪除了
從新建立表
從新在運行一次程序
能夠看到沒有中文亂碼了,同時咱們也能夠經過可視化工具鏈接mysql查看
以上就是博主爲你們介紹的這一板塊的主要內容,這都是博主本身的學習過程,但願能給你們帶來必定的指導做用,有用的還望你們點個支持,若是對你沒用也望包涵,有錯誤煩請指出。若有期待可關注博主以第一時間獲取更新哦,謝謝!同時也歡迎轉載,但必須在博文明顯位置標註原文地址,解釋權歸博主全部!