StreamingListener 是針對spark streaming的各個階段的事件監聽機制。node
//須要監聽spark streaming中各個階段的事件只需實現這個特質中對應的事件函數便可
//自己既有註釋說明
trait StreamingListener {
/** Called when the streaming has been started */
/** streaming 啓動的事件 */
def onStreamingStarted(streamingStarted: StreamingListenerStreamingStarted) { }
/** Called when a receiver has been started */
/** 接收啓動事件 */
def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted) { }
/** Called when a receiver has reported an error */
def onReceiverError(receiverError: StreamingListenerReceiverError) { }
/** Called when a receiver has been stopped */
def onReceiverStopped(receiverStopped: StreamingListenerReceiverStopped) { }
/** Called when a batch of jobs has been submitted for processing. */
/** 每一個批次提交的事件 */
def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted) { }
/** Called when processing of a batch of jobs has started. */
/** 每一個批次啓動的事件 */
def onBatchStarted(batchStarted: StreamingListenerBatchStarted) { }
/** Called when processing of a batch of jobs has completed. */
/** 每一個批次完成的事件 */
def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) { }
/** Called when processing of a job of a batch has started. */
def onOutputOperationStarted(
outputOperationStarted: StreamingListenerOutputOperationStarted) { }
/** Called when processing of a job of a batch has completed. */
def onOutputOperationCompleted(
outputOperationCompleted: StreamingListenerOutputOperationCompleted) { }
}複製代碼
功能:監控批次處理時間,若超過閾值則告警,每次告警間隔2分鐘redis
class SparkStreamingDelayListener(private val appName:String, private val duration: Int,private val times: Int) extends StreamingListener{
private val logger = LoggerFactory.getLogger("SparkStreamingDelayListener")
//每一個批次完成時執行
override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
val batchInfo = batchCompleted.batchInfo
val processingStartTime = batchCompleted.batchInfo.processingStartTime
val numRecords = batchCompleted.batchInfo.numRecords
val processingEndTime = batchInfo.processingEndTime
val processingDelay = batchInfo.processingDelay
val totalDelay = batchInfo.totalDelay
//將每次告警時間寫入redis,用以判斷告警間隔大於2分鐘
val jedis = RedisClusterClient.getJedisClusterClient()
val current_time = (System.currentTimeMillis / 1000).toInt
val redis_time = jedis.get(appName)
var flag = false
if(redis_time==null || current_time-redis_time.toInt>120){
jedis.set(appName,current_time.toString)
flag = true
}
//若批次處理延遲大於批次時長指定倍數,而且告警間隔大約2分鐘,則告警
if(totalDelay.get >= times * duration * 1000 && flag){
val monitorContent = appName+": numRecords ->"+numRecords+",processingDelay ->"+processingDelay.get/1000+" s,totalDelay -> "+totalDelay.get/1000+"s"
println(monitorContent)
val msg = "Streaming_"+appName+"_DelayTime:"+totalDelay.get/1000+"S"
val getURL = "http://node1:8002/message/weixin?msg="+msg
HttpClient.doGet(getURL)
}
}
}複製代碼
//streamingListener不須要在配置中設置,能夠直接添加到streamingContext中
object My{
def main(args : Array[String]) : Unit = {
val sparkConf = new SparkConf()
val ssc = new StreamingContext(sparkConf,Seconds(20))
ssc.addStreamingListener(new SparkStreamingDelayListener("Userid2Redis", duration,times))
....
}
}複製代碼
關注微信公衆號《大數據技術進階》,從點到面,帶你瞭解大數據技術架構及應用 !微信