1. Write a Spark Streaming custom receiver (Custom Receivers); see the official Spark documentation for details.
2. Connect to NSQ with JavaNSQClient, the official Java client library for NSQ; see its documentation for details. A dependency sketch is given below.
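Before the two source files below will compile, the NSQ client, the Spark modules, and Gson need to be on the build path. The following build.sbt is only a minimal sketch; the artifact coordinates and every version number are assumptions and should be checked against Maven Central and the Spark/Scala versions of your cluster.

// build.sbt (sketch; versions are assumptions, align them with your cluster)
scalaVersion := "2.11.12"

libraryDependencies ++= Seq(
  // JavaNSQClient (github.com/brainlag/JavaNSQClient); verify the exact coordinates and version on Maven Central
  "com.github.brainlag" % "nsq-client" % "1.0.0.RC4",
  // Spark modules are provided by the cluster at runtime
  "org.apache.spark" %% "spark-streaming" % "2.4.3" % "provided",
  "org.apache.spark" %% "spark-sql"       % "2.4.3" % "provided",
  "org.apache.spark" %% "spark-hive"      % "2.4.3" % "provided",
  // Gson is used by prefix_exit for JSON parsing
  "com.google.code.gson" % "gson" % "2.8.5"
)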
ReliableNSQReceiver.scala
import com.github.brainlag.nsq.callbacks.NSQMessageCallback
import com.github.brainlag.nsq.lookup.DefaultNSQLookup
import com.github.brainlag.nsq.{NSQConsumer, NSQMessage}
import org.apache.spark.internal.Logging
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

/** Callback invoked by the NSQ consumer for every message: store the body, then mark the message finished. */
class MessageCallbacks(store_fun: String => Unit) extends NSQMessageCallback with Logging {

  def message(message: NSQMessage): Unit = {
    val s = new String(message.getMessage())
    store_fun(s)
    message.finished()
  }
}

/* Custom receiver */
class ReliableNSQReceiver(host: String, port: Int, topic: String, channel: String)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) with Logging {

  var consumer: NSQConsumer = null

  def onStart() {
    // Start the thread that receives data over the connection
    new Thread("Socket Receiver") {
      override def run() { receive() }
    }.start()
  }

  def onStop() {
    logInfo("Stopped receiving")
    consumer.close
  }

  /** Receive data */
  private def receive() {
    try {
      val lookup = new DefaultNSQLookup
      lookup.addLookupAddress(host, port)
      // Pass the receiver's store method to the callback so every message is handed to Spark
      consumer = new NSQConsumer(lookup, topic, channel, new MessageCallbacks(store))
      consumer.start
    } catch {
      case e: java.net.ConnectException =>
        restart("Error connecting to " + host + ":" + port, e)
      case t: Throwable =>
        restart("Error receiving data", t)
    }
  }
}
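Before wiring the receiver into the full job below, it can be smoke-tested locally. This is a minimal sketch, assuming an nsqlookupd reachable at 127.0.0.1:4161 and the same "log" topic / "scalatest" channel used later; adjust those values to your environment.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object NSQReceiverSmokeTest {
  def main(args: Array[String]): Unit = {
    // local[2]: one thread for the receiver, at least one for processing
    val conf = new SparkConf().setAppName("NSQReceiverSmokeTest").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(5))

    // Assumed nsqlookupd address and topic/channel, for illustration only
    val lines = ssc.receiverStream(new ReliableNSQReceiver("127.0.0.1", 4161, "log", "scalatest"))
    lines.print() // print the first few messages of every 5-second batch

    ssc.start()
    ssc.awaitTermination()
  }
}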
ELKStreaming is the streaming job itself: it consumes from NSQ through the receiver above, cleans each log line, and writes the result into a partitioned Hive table.

import com.google.gson.JsonParser
import org.apache.spark.SparkConf
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Seconds, StreamingContext}

/*
 * After defining a context, you must:
 * - Define the input sources by creating input DStreams.
 * - Define the streaming computations by applying transformations and output operations to the DStreams.
 * - Start receiving and processing data with streamingContext.start().
 * - Wait for the processing to be stopped (manually or due to an error) with streamingContext.awaitTermination().
 * - Stop the processing manually with streamingContext.stop().
 */
object ELKStreaming extends Logging {

  def main(args: Array[String]): Unit = {
    if (args.length < 4) {
      System.err.println("Usage: ELKStreaming <hostname> <port> <batchDuration> <to table>")
      System.exit(1)
    }
    logInfo("start ===========>")
    // Set a sensible log level (helper from the Spark streaming examples)
    StreamingExamples.setStreamingLogLevels()
    val sparkConf = new SparkConf().setAppName("ELKStreaming").setMaster("yarn")
      .set("hive.metastore.uris", "thrift://hadoop15.bigdata.org:9083")
    // Create a StreamingContext with the batch interval (in seconds) taken from the command line, e.g. 10
    val ssc = new StreamingContext(sparkConf, Seconds(args(2).toInt))
    // Use the custom NSQ receiver defined above
    val lines = ssc.receiverStream(new ReliableNSQReceiver(args(0), args(1).toInt, "log", "scalatest"))
    val hiveStream: DStream[(String, String)] = lines.map(line => prefix_exit(line))
    // Save the processed data into Hive
    hiveStream.foreachRDD(rdd => {
      // Initialize the SparkSession from the SparkConf
      val sparkSession: SparkSession = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()
      // Import the implicit conversions needed to turn the RDD into a DataFrame
      import sparkSession.implicits._
      // Convert the RDD into a DataFrame
      val df: DataFrame = rdd.toDF("str", "ymd")
      logInfo("df count ===========>" + df.count)
      // Register a temporary view and insert into the target Hive table, partitioned by ymd
      df.createOrReplaceTempView("spark_logs")
      sparkSession.sql("insert into " + args(3) + " partition (ymd) select str,ymd from spark_logs")
    })
    ssc.start()
    ssc.awaitTermination()
  }

  /** Clean one raw JSON log line into (formatted log string, yyyyMMdd partition value). */
  def prefix_exit(line: String): (String, String) = {
    val obj = new JsonParser().parse(line).getAsJsonObject
    val data_str1 = obj.get("recv_timestamp").toString().split("T|Z|\"")
    val data_str2 = data_str1(1).split('-')
    val data_str3 = data_str2(1) + "/" + data_str2(2) + "/" + data_str2(0) + " " + data_str1(2) +
      " [I] " + obj.get("index_type").toString().split("\"")(1) + " " + line
    val data_str4 = data_str2(0) + data_str2(1) + data_str2(2)
    (data_str3.toString(), data_str4.toString())
  }
}
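To make the behaviour of prefix_exit concrete, here is a small example; the sample JSON line and its recv_timestamp / index_type values are made up for illustration.

object PrefixExitExample {
  def main(args: Array[String]): Unit = {
    // Made-up input; only recv_timestamp and index_type are read by prefix_exit
    val line = """{"recv_timestamp":"2019-07-30T12:34:56Z","index_type":"nginx","message":"GET / 200"}"""
    val (str, ymd) = ELKStreaming.prefix_exit(line)
    println(str) // 07/30/2019 12:34:56 [I] nginx {"recv_timestamp":...}
    println(ymd) // 20190730, used as the Hive partition value (ymd)
  }
}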