Consuming NSQ with Spark Streaming

Goals

  • Use NSQ as the message stream
  • Consume it with Spark Streaming
  • Clean the data and save it into the Hive warehouse

Approach

1. Write a Spark Streaming Custom Receiver (a custom spark-streaming receiver); see the official documentation for details.

2. Use JavaNSQClient, the Java client library for NSQ, to connect; see its documentation for details (the build dependencies are sketched below).
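
For reference, here is a minimal sketch of the build dependencies this setup needs, assuming an sbt project; the version numbers are only illustrative and should be matched to your Spark and Scala versions:

// build.sbt -- dependency sketch; adjust versions to your cluster
libraryDependencies ++= Seq(
  "org.apache.spark"     %% "spark-streaming" % "2.4.0" % "provided",
  "org.apache.spark"     %% "spark-sql"       % "2.4.0" % "provided",
  "org.apache.spark"     %% "spark-hive"      % "2.4.0" % "provided",
  "com.github.brainlag"   % "nsq-client"      % "1.0.0.RC4", // JavaNSQClient
  "com.google.code.gson"  % "gson"            % "2.8.5"
)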

Full code

Custom receiver

ReliableNSQReceiver.scala

import com.github.brainlag.nsq.callbacks.NSQMessageCallback
import com.github.brainlag.nsq.lookup.DefaultNSQLookup
import com.github.brainlag.nsq.{NSQConsumer, NSQMessage}
import org.apache.spark.internal.Logging
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver


/* Callback invoked by the NSQ consumer for every message: hand the body to
 * the receiver's store function, then acknowledge the message. */
class MessageCallbacks(store_fun: String => Unit) extends NSQMessageCallback with Logging {

  def message(message: NSQMessage): Unit = {
    // The message body is a byte array; turn it into a String
    val s = new String(message.getMessage())
    store_fun(s)
    // Tell NSQ the message has been processed so it is not redelivered
    message.finished()
  }
}
/* Custom receiver: consumes an NSQ topic and stores each message into Spark */
class ReliableNSQReceiver(host: String, port: Int, topic: String, channel: String)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) with Logging {

  var consumer: NSQConsumer = null

  def onStart() {
    // Start a thread that connects to NSQ and receives data
    new Thread("NSQ Receiver") {
      override def run() { receive() }
    }.start()
  }

  def onStop() {
    logInfo("Stopped receiving")
    // Guard against onStop being called before the consumer was created
    if (consumer != null) consumer.close
  }

  /** Connect to nsqlookupd and start consuming messages */
  private def receive() {

    try {
      val lookup = new DefaultNSQLookup
      lookup.addLookupAddress(host, port)
      // `store` comes from the Receiver base class and pushes records into Spark
      consumer = new NSQConsumer(lookup, topic, channel, new MessageCallbacks(store))
      consumer.start
    } catch {
      case e: java.net.ConnectException =>
        restart("Error connecting to " + host + ":" + port, e)
      case t: Throwable =>
        restart("Error receiving data", t)
    }
  }

}
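
One thing worth noting: MessageCallbacks acknowledges every message with finished() immediately after handing it to store. Below is a minimal sketch of a stricter variant that acknowledges only after store returns and otherwise asks NSQ to redeliver; it assumes the requeue() method that JavaNSQClient's NSQMessage exposes alongside finished(), and the class name and error handling are illustrative rather than part of the original code.

import com.github.brainlag.nsq.NSQMessage
import com.github.brainlag.nsq.callbacks.NSQMessageCallback

/* Sketch only: acknowledge the NSQ message only after store() has returned,
 * and requeue it so NSQ redelivers if storing throws. */
class AckAfterStoreCallback(store_fun: String => Unit) extends NSQMessageCallback {
  def message(message: NSQMessage): Unit = {
    try {
      store_fun(new String(message.getMessage()))
      message.finished()   // ack only after the record reached the receiver
    } catch {
      case e: Exception =>
        message.requeue()  // assumption: requeue() is available in the client version used
    }
  }
}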

Using the receiver

import com.google.gson.JsonParser
import org.apache.spark.SparkConf
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Seconds, StreamingContext}

/*
 * After a context is defined, you have to do the following:
 *
 * - Define the input sources by creating input DStreams.
 * - Define the streaming computations by applying transformations and output operations to DStreams.
 * - Start receiving data and processing it using streamingContext.start().
 * - Wait for the processing to be stopped (manually or due to any error) using streamingContext.awaitTermination().
 * - The processing can be manually stopped using streamingContext.stop().
 */

object ELKStreaming extends Logging {

  def main(args: Array[String]): Unit = {

    if (args.length < 4) {
      System.err.println("Usage: ELKStreaming <hostname> <port> <batchDuration> <to table>")
      System.exit(1)
    }
    logInfo("start ===========>")
    // Helper borrowed from Spark's streaming examples that quiets the default log level
    StreamingExamples.setStreamingLogLevels()
    val sparkConf = new SparkConf()
      .setAppName("ELKStreaming")
      .setMaster("yarn")
      .set("hive.metastore.uris", "thrift://hadoop15.bigdata.org:9083")

    // Create a StreamingContext; the batch interval in seconds comes from args(2)
    val ssc = new StreamingContext(sparkConf, Seconds(args(2).toInt))
    // Create the input DStream with the custom NSQ receiver (topic "log", channel "scalatest")
    val lines = ssc.receiverStream(new ReliableNSQReceiver(args(0), args(1).toInt, "log", "scalatest"))
    // Clean every raw line into a (log string, ymd partition) pair
    val hiveStream: DStream[(String, String)] = lines.map(line => prefix_exit(line))

    // Save the processed data to Hive
    hiveStream.foreachRDD(rdd => {
      // Initialize the SparkSession from the SparkConf (getOrCreate reuses an existing one)
      val sparkSession: SparkSession = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()

      // Import implicits so the RDD can be converted to a DataFrame
      import sparkSession.implicits._
      // Convert the RDD to a DataFrame with columns str and ymd
      val df: DataFrame = rdd.toDF("str", "ymd")
      // Log how many records this micro-batch contains
      logInfo("df count ===========>" + df.count)

      // Register a temp view so the insert into the target table's ymd partition can be expressed in SQL
      df.createOrReplaceTempView("spark_logs")

      sparkSession.sql("insert into " + args(3) + " partition (ymd) select str, ymd from spark_logs")
    })

    ssc.start()
    ssc.awaitTermination()
  }

  /** Clean one raw JSON log line into (formatted log string, ymd partition value). */
  def prefix_exit(line: String): (String, String) = {
    val obj = new JsonParser().parse(line).getAsJsonObject
    // recv_timestamp looks like "2019-06-18T12:34:56Z"; split it into date and time parts
    val data_str1 = obj.get("recv_timestamp").toString().split("T|Z|\"")
    val data_str2 = data_str1(1).split('-')
    // Rebuild the line as "MM/dd/yyyy HH:mm:ss [I] <index_type> <original line>"
    val data_str3 = data_str2(1)+"/"+data_str2(2)+"/"+data_str2(0)+" "+data_str1(2)+" [I] "+obj.get("index_type").toString().split("\"")(1)+" "+line
    // Partition value in yyyyMMdd form
    val data_str4 = data_str2(0)+data_str2(1)+data_str2(2)
    (data_str3, data_str4)
  }
}
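
To make the cleaning step concrete, this is roughly what prefix_exit returns for a hypothetical input record (the field values below are invented purely for illustration):

// Hypothetical log line, used only to illustrate prefix_exit's output format
val sample =
  """{"recv_timestamp":"2019-06-18T12:34:56Z","index_type":"nginx","msg":"GET /"}"""

val (str, ymd) = ELKStreaming.prefix_exit(sample)
// str == "06/18/2019 12:34:56 [I] nginx {\"recv_timestamp\":...}"  (the reformatted line)
// ymd == "20190618"                                                 (the ymd partition value)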