Spark Streaming Real-Time Data Analysis

 


1. Introduction to Spark Streaming

1) Definition

Spark Streaming is an extension of the core Spark API that enables scalable, high-throughput, fault-tolerant stream processing of live data streams.

http://ke.dajiangtai.com/content/6919/1.png

2. Installing the nc Service and Running Spark Streaming

1) Install the nc command online

  • rpm -ivh nc-1.84-22.el6.x86_64.rpm (preferred)

# Install

Upload the nc-1.84-22.el6.x86_64.rpm package to the softwares directory, then install it:

[kfk@bigdata-pro02 softwares]$ sudo rpm -ivh nc-1.84-22.el6.x86_64.rpm
Preparing...                ########################################### [100%]
   1:nc                     ########################################### [100%]

[kfk@bigdata-pro02 softwares]$ which nc
/usr/bin/nc

 

# Start nc

nc -lk 9999    (acts like a receiver)

 

Once it is started, you can type data below it, and the Spark side will perform the word count (as shown in 2)).

  • yum install -y nc (alternative online install)

2) Run the Spark Streaming WordCount example

bin/run-example --master local[2] streaming.NetworkWordCount localhost 9999

 

# Input data


# Word-count output



Note: set the log level to WARN to see the output above clearly; otherwise it is drowned out by the INFO logs.
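
A minimal sketch of how to lower the log level (assuming a standard Spark layout):

# temporarily, from inside spark-shell
sc.setLogLevel("WARN")

# or permanently, in conf/log4j.properties (copy it from log4j.properties.template first)
log4j.rootCategory=WARN, console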


3) Pipe a file into nc and watch the Spark Streaming result

cat test.txt | nc -lk 9999

 

Contents of the file:

hadoop  storm   spark
hbase   spark   flume
spark   dajiangtai     spark
hdfs    mapreduce      spark
hive    hdfs    solr
spark   flink   storm
hbase   storm   es

 

3. How Spark Streaming Works

1) Spark Streaming data flow

http://ke.dajiangtai.com/content/6919/2.png

2) How the receiver works

http://ke.dajiangtai.com/content/6919/3.png

http://ke.dajiangtai.com/content/6919/4.png

http://ke.dajiangtai.com/content/6919/5.png

http://ke.dajiangtai.com/content/6919/6.png

3) Putting it all together

http://ke.dajiangtai.com/content/6919/7.png

http://ke.dajiangtai.com/content/6919/8.png

4. Spark Streaming Programming Model

1) Two ways to initialize a StreamingContext

# Option 1: from an existing SparkContext

val ssc = new StreamingContext(sc, Seconds(5))

 

# Option 2: from a SparkConf

val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(1))

 

2) Testing on the cluster

# Start spark-shell

bin/spark-shell --master local[2]

scala> :paste

// Entering paste mode (ctrl-D to finish)

import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._

val ssc = new StreamingContext(sc, Seconds(5))
val lines = ssc.socketTextStream("localhost", 9999)
val words = lines.flatMap(_.split(" "))
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
wordCounts.print()
ssc.start()
ssc.awaitTermination()

// Exiting paste mode, now interpreting.

 

 

# Enter data on the nc server side

spark
hive hbase
hadoop hive
hbase hbase
spark hadoop
hive hbase
spark Hadoop

 

# Word-count output


5. Reading Socket Stream Data with Spark Streaming

1) To run a Streaming program in spark-shell, you need either more than one local thread or a cluster master, because the receiver itself occupies one thread.

bin/spark-shell --master local[2]
bin/spark-shell --master spark://bigdata-pro01.kfk.com:7077

 

2) Spark run modes

http://ke.dajiangtai.com/content/6919/9.png

3) Reading socket stream data with Spark Streaming

a) Write the test code and run it locally


TestStreaming.scala



package com.zimo.spark

import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  *
  * @author Zimo
  * @date 2019/4/29
  *
  */
object TestStreaming {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder
      .master("local[2]")
      .appName("streaming")
      .getOrCreate()

    val sc = spark.sparkContext



    // Listen on a network port (arg 1: hostname, arg 2: port; the optional third arg is the storage level); this creates the lines DStream.
    val ssc = new StreamingContext(spark.sparkContext, Seconds(5))
    val lines = ssc.socketTextStream("bigdata-pro02.kfk.com", 9999)
    val words = lines.flatMap(_.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)
    words.print()
    ssc.start()
    ssc.awaitTermination()

  }
}

 

b) Start the nc service and send data

nc -lk 9999

spark hadoop
spark hadoop
hive hbase
spark hadoop

 


6. Saving Spark Streaming Data to External Systems

1) Saving to a MySQL database


import java.sql.DriverManager
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Run inside spark-shell, where `spark` is already defined.
val sc = spark.sparkContext
val ssc = new StreamingContext(spark.sparkContext, Seconds(5))
val lines = ssc.socketTextStream("bigdata-pro02.kfk.com", 9999)
val words = lines.flatMap(_.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)

// Open one JDBC connection per partition rather than per record.
words.foreachRDD(rdd => rdd.foreachPartition(partition => {
  Class.forName("com.mysql.jdbc.Driver")
  val conn = DriverManager
    .getConnection("jdbc:mysql://bigdata-pro01.kfk.com:3306/test", "root", "root")
  try {
    for (row <- partition) {
      val sql = "insert into webCount(titleName,count) values('" + row._1 + "'," + row._2 + ")"
      conn.prepareStatement(sql).executeUpdate()
    }
  } finally {
    conn.close()
  }
}))
ssc.start()
ssc.awaitTermination()

 

Then enter data on the nc server side, and the aggregated counts are written into the webCount table in the database.

mysql> select * from webCount;
+-----------+-------+
| titleName | count |
+-----------+-------+
| hive      |     4 |
| spark     |     4 |
| hadoop    |     4 |
| hbase     |     5 |
+-----------+-------+
4 rows in set (0.00 sec)

 

2) Saving to HDFS

This is even simpler than writing to a database; if you are interested, refer to the code below and test it yourself.

http://ke.dajiangtai.com/content/6919/11.png

Special note: with the approach in the image, the HDFS file contents are reset and overwritten on every run!
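
The code in the image may differ; as a minimal sketch of the same idea using the standard DStream API (run inside spark-shell; the HDFS path prefix below is a placeholder for this cluster):

import org.apache.spark.streaming.{Seconds, StreamingContext}

val ssc = new StreamingContext(spark.sparkContext, Seconds(5))
val lines = ssc.socketTextStream("bigdata-pro02.kfk.com", 9999)
val words = lines.flatMap(_.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)

// saveAsTextFiles writes one directory per batch: <prefix>-<timestamp>.<suffix>
words.saveAsTextFiles("hdfs://bigdata-pro01.kfk.com:9000/user/kfk/streaming/wordcount", "txt")

ssc.start()
ssc.awaitTermination()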

7. Structured Streaming Programming Model



1) complete output mode

import org.apache.spark.sql.functions._
import org.apache.spark.sql.SparkSession
import spark.implicits._

val lines = spark.readStream
  .format("socket")
  .option("host", "bigdata-pro02.kfk.com")
  .option("port", 9999)
  .load()
val words = lines.as[String].flatMap(_.split(" "))
val wordCounts = words.groupBy("value").count()
val query = wordCounts.writeStream.outputMode("complete").format("console").start()

 




2) update output mode

In complete mode, as you keep typing on the nc server side, the query keeps counting both the new input and all historical input. If you change the outputMode to "update", the counts are still maintained against the full history, but only the rows whose values changed in the latest batch are printed.
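
Only the writeStream line changes; the rest of the complete-mode example above stays the same:

val query = wordCounts.writeStream.outputMode("update").format("console").start()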


3) append output mode

To change the outputMode to "append", the code also needs a small change, because append mode does not support the streaming aggregation above without a watermark:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.SparkSession
import spark.implicits._

val lines = spark.readStream
  .format("socket")
  .option("host", "bigdata-pro02.kfk.com")
  .option("port", 9999)
  .load()
val words = lines.as[String].flatMap(_.split(" ")).map(x => (x, 1))
val query = words.writeStream.outputMode("append").format("console").start()

 


As you can see, this mode simply appends each batch of input to the output.

8. Real-Time Data Processing Business Analysis


9. Integrating Spark Streaming with Kafka

1) Preparation


According to the official documentation, our previous Kafka version is too old; we need to download at least version 0.10.0.

Download: http://kafka.apache.org/downloads

Updating the configuration is simple: copy the /config folder from our old installation over the new one, create the kafka-logs and logs folders as in the old setup, and then fix the paths inside the configuration files.
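
As a sketch, the properties that typically need adjusting in config/server.properties are the log directory and the ZooKeeper connection string (the path and hosts below are assumptions based on this cluster's naming):

# config/server.properties
log.dirs=/opt/modules/kafka_2.11-0.10.0.0/kafka-logs
zookeeper.connect=bigdata-pro01.kfk.com:2181,bigdata-pro02.kfk.com:2181,bigdata-pro03.kfk.com:2181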

2) Write the test code and run it

Upload the jar packages (do this on all 3 nodes).


Start spark-shell

https://images2018.cnblogs.com/blog/1023171/201803/1023171-20180322010955932-1783709958.png
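
As a sketch, spark-shell also needs the Structured Streaming Kafka connector on its classpath, either via --jars pointing at the uploaded packages or via --packages (the coordinates and version below are an assumption; match them to your Spark and Scala versions):

bin/spark-shell --master local[2] --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0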

Paste in the code:

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "bigdata-pro01.kfk.com:9092")
  .option("subscribe", "weblogs")
  .load()

import spark.implicits._
val lines = df.selectExpr("CAST(value AS STRING)").as[String]
val words = lines.flatMap(_.split(" "))
val wordCounts = words.groupBy("value").count()
val query = wordCounts.writeStream
  .outputMode("update")
  .format("console")
  .start()

query.awaitTermination()

 

At this point, make sure Kafka and the console producer are both running:

bin/kafka-console-producer.sh --broker-list bigdata-pro01.kfk.com:9092 --topic weblogs

 

Type a few words on the producer side.


Back in the spark-shell console, you can see the counts.


10. Real-Time Data Analysis Based on Structured Streaming

First, clear out the contents of the webCount table in MySQL's test database.

https://images2018.cnblogs.com/blog/1023171/201803/1023171-20180322114150081-1902027289.png
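
For example, one way to do this from the mysql client:

mysql> use test;
mysql> truncate table webCount;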

Open IDEA and write the following two programs.

https://images2018.cnblogs.com/blog/1023171/201803/1023171-20180322225746195-1933346147.png

https://images2018.cnblogs.com/blog/1023171/201803/1023171-20180322230140026-170366579.png

https://images2018.cnblogs.com/blog/1023171/201803/1023171-20180322230200976-303812454.png

 

package com.spark.test

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.ProcessingTime


/**
  * Created by Zimo on 2017/10/16.
  */

object StructuredStreamingKafka {

  case class Weblog(datatime:String,
                    userid:String,
                    searchname:String,
                    retorder:String,
                    cliorder:String,
                    cliurl:String)

  def main(args: Array[String]): Unit = {

    val spark  = SparkSession.builder()
      .master("local[2]")
      .appName("streaming").getOrCreate()

    val df = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "bigdata-pro01.kfk.com:9092")
      .option("subscribe", "weblogs")
      .load()

    import spark.implicits._
    val lines = df.selectExpr("CAST(value AS STRING)").as[String]
    val weblog = lines.map(_.split(","))
      .map(x => Weblog(x(0), x(1), x(2),x(3),x(4),x(5)))
    val titleCount = weblog
      .groupBy("searchname").count().toDF("titleName","count")
    val url ="jdbc:mysql://bigdata-pro01.kfk.com:3306/test"
    val username="root"
    val password="root"
    val writer = new JDBCSink(url,username,password)
    val query = titleCount.writeStream
      .foreach(writer)
      .outputMode("update")
        //.format("console")
      .trigger(ProcessingTime("5 seconds"))
      .start()
    query.awaitTermination()
  }

}

 

package com.spark.test

import java.sql._
import java.sql.{Connection, DriverManager}
import org.apache.spark.sql.{ForeachWriter, Row}

/**
  * Created by Zimo on 2017/10/17.
  */
class JDBCSink(url:String, username:String,password:String) extends ForeachWriter[Row]{

  var statement: Statement = _
  var resultSet: ResultSet = _
  var connection: Connection = _
  override def open(partitionId: Long, version: Long): Boolean = {
    Class.forName("com.mysql.jdbc.Driver")
    //  connection = new MySqlPool(url,username,password).getJdbcConn();
    connection = DriverManager.getConnection(url,username,password);
      statement = connection.createStatement()
      return true
  }

  override def process(value: Row): Unit = {
    val titleName = value.getAs[String]("titleName").replaceAll("[\\[\\]]","")
    val count = value.getAs[Long]("count");

    val querySql = "select 1 from webCount " +
      "where titleName = '"+titleName+"'"

    val updateSql = "update webCount set " +
      "count = "+count+" where titleName = '"+titleName+"'"

    val insertSql = "insert into webCount(titleName,count)" +
      "values('"+titleName+"',"+count+")"

    try {
      val resultSet = statement.executeQuery(querySql)
      if (resultSet.next()) {
        statement.executeUpdate(updateSql)
      } else {
        statement.execute(insertSql)
      }
    } catch {
      // More specific exception types must come before Exception/Throwable,
      // otherwise the later cases are unreachable.
      case ex: SQLException => println("SQLException")
      case ex: RuntimeException => println("RuntimeException")
      case ex: Exception => println("Exception")
      case ex: Throwable => println("Throwable")
    }

  }

  override def close(errorOrNull: Throwable): Unit = {
    // Close resources only if they were actually created.
    if (statement != null) {
      statement.close()
    }
    if (connection != null) {
      connection.close()
    }
  }
}

 

 

Add this dependency to the pom.xml file:

https://images2018.cnblogs.com/blog/1023171/201803/1023171-20180322232057028-1016980854.png

<dependency>
  <groupId>mysql</groupId>
  <artifactId>mysql-connector-java</artifactId>
  <version>5.1.27</version>
</dependency>

 

A note on choosing the version of this dependency: it is best to match the version already used in your cluster, otherwise you may get errors; you can check the version under Hive's lib directory as a reference.
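
Besides the MySQL connector, running StructuredStreamingKafka from IDEA also needs the Structured Streaming Kafka source on the classpath, if it is not already there (the version below is an assumption and should match your Spark version):

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
  <version>2.2.0</version>
</dependency>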

Make sure the cluster's HDFS, HBase, YARN, and ZooKeeper services are all running.

https://images2018.cnblogs.com/blog/1023171/201803/1023171-20180322231010171-803607012.png

https://images2018.cnblogs.com/blog/1023171/201803/1023171-20180322231027392-1471559359.png

https://images2018.cnblogs.com/blog/1023171/201803/1023171-20180322231051438-199414215.png

Start Flume on node 1 and node 2. Before starting, update the Flume configuration: since we changed the JDK and Kafka versions earlier, the configuration files need to be updated accordingly (do this on all 3 nodes).

https://images2018.cnblogs.com/blog/1023171/201803/1023171-20180322230310836-1178269231.png
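
As a rough sketch of what usually needs to change (the property names are standard Flume, but the agent/sink names, paths, and hosts below are assumptions for this cluster):

# flume-env.sh: point JAVA_HOME at the new JDK (path is an assumption)
export JAVA_HOME=/opt/modules/jdk1.8.0_151

# flume agent conf: Kafka sink pointing at the upgraded broker
agent.sinks.kafkaSink.type = org.apache.flume.sink.kafka.KafkaSink
agent.sinks.kafkaSink.kafka.bootstrap.servers = bigdata-pro01.kfk.com:9092
agent.sinks.kafkaSink.kafka.topic = weblogs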

Start Flume on node 1.

https://images2018.cnblogs.com/blog/1023171/201803/1023171-20180322230609636-1099721704.png

Start Kafka on node 1.

bin/kafka-server-start.sh config/server.properties

 

https://images2018.cnblogs.com/blog/1023171/201803/1023171-20180322230705621-1488899955.png

Start Flume on node 2.

https://images2018.cnblogs.com/blog/1023171/201803/1023171-20180322230822747-2147304152.png

On node 2, start the data generator so that data is produced in real time.

https://images2018.cnblogs.com/blog/1023171/201803/1023171-20180322230952822-1218764676.png

Back in IDEA, run the program.

https://images2018.cnblogs.com/blog/1023171/201803/1023171-20180322231215101-809927133.png

Back in MySQL, check the webCount table; data is already coming in.

https://images2018.cnblogs.com/blog/1023171/201803/1023171-20180322231711813-1200711695.png

Next, modify the MySQL configuration file as follows (switching the character set to UTF-8 so Chinese text is not garbled):

https://images2018.cnblogs.com/blog/1023171/201803/1023171-20180323101547610-1665572357.png

 https://images2018.cnblogs.com/blog/1023171/201803/1023171-20180323101606325-2117330682.png

 

[client]
socket=/var/lib/mysql/mysql.sock
default-character-set=utf8

[mysqld]
character-set-server=utf8
datadir=/var/lib/mysql
socket=/var/lib/mysql/mysql.sock
user=mysql
# Disabling symbolic-links is recommended to prevent assorted security risks
symbolic-links=0

[mysql]
default-character-set=utf8

[mysqld_safe]
log-error=/var/log/mysqld.log
pid-file=/var/run/mysqld/mysqld.pid

 

 

Drop the table:

https://images2018.cnblogs.com/blog/1023171/201803/1023171-20180323101707279-514845126.png

Recreate the table:

https://images2018.cnblogs.com/blog/1023171/201803/1023171-20180323101755737-410818476.png

create table webCount(
  titleName varchar(255) CHARACTER SET utf8 DEFAULT NULL,
  count int(11) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

 

Run the program once more.

https://images2018.cnblogs.com/blog/1023171/201803/1023171-20180323101901780-79374838.png

 https://images2018.cnblogs.com/blog/1023171/201803/1023171-20180323101922746-2012918589.png

You can see the Chinese characters are no longer garbled; you can also connect to MySQL with a GUI tool to check.

https://images2018.cnblogs.com/blog/1023171/201803/1023171-20180323102046986-882146045.png


That is the main content of this part. It reflects my own learning process, and I hope it gives you some useful guidance. If it helped, please give it a like; if not, please bear with me, and do point out any mistakes. Follow me to get updates as soon as they are posted, thanks! Reposting is welcome, but you must clearly credit the original source; all rights reserved by the author.
