1、Spark Streaming 與 Flume 的聯調
import org.apache.spark.SparkConf
import org.apache.spark.streaming.flume.FlumeUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * Spark Streaming + Flume integration, approach 1 (push-based).
 *
 * Usage: FlumePushWordCount <hostname> <port>
 */
object FlumePushWordCount {

  def main(args: Array[String]): Unit = {
    // Hostname and port are supplied externally on the command line.
    if (args.length != 2) {
      System.out.println("Usage: FlumePushWordCount <hostname> <port>")
      System.exit(1)
    }
    val Array(hostname, port) = args

    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("FlumePushWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    // createStream on the context yields an InputDStream of Flume events.
    val flumeStream = FlumeUtils.createStream(ssc, hostname, port.toInt)

    // A Flume event carries header + body; pull out the body bytes and
    // trim whitespace before splitting into words.
    flumeStream
      .map(evt => new String(evt.event.getBody.array()).trim)
      .flatMap(line => line.split(" "))
      .map(word => (word, 1))
      .reduceByKey(_ + _)
      .print()

    ssc.start()
    ssc.awaitTermination()
  }
}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * Spark Streaming + Kafka integration, receiver-based approach.
 *
 * Usage: KafkaReceiverWordCount <zkQuorum> <group> <topics> <numThreads>
 */
object KafkaReceiverWordCount {

  def main(args: Array[String]): Unit = {
    if (args.length != 4) {
      System.out.println("Usage: KafkaReceiverWordCount <zkQuorum> <group> <topics> <numThreads>")
      System.exit(1)
    }
    val Array(zkQuorum, group, topics, numThreads) = args

    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("KafkaReceiverWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    // createStream expects a Map of topic -> receiver thread count.
    val topicThreadMap = topics.split(",").map(topic => (topic, numThreads.toInt)).toMap
    val messages = KafkaUtils.createStream(ssc, zkQuorum, group, topicThreadMap)

    // The payload is the second element of each tuple (print it to see);
    // in real jobs only this transformation line changes per use case.
    messages
      .map(record => record._2)
      .flatMap(line => line.split(","))
      .map(word => (word, 1))
      .reduceByKey(_ + _)
      .print()

    ssc.start()
    ssc.awaitTermination()
  }
}
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * Spark Streaming + Kafka integration, direct (receiver-less) approach.
 *
 * Usage: KafkaDirectWordCount <brokers> <topics>
 */
object KafkaDirectWordCount {

  def main(args: Array[String]): Unit = {
    // Direct mode takes exactly 2 args; the original checked for 4, which
    // made the subsequent 2-element Array destructuring throw a MatchError.
    if (args.length != 2) {
      System.out.println("Usage: KafkaDirectWordCount <brokers> <topics>")
      System.exit(1)
    }
    val Array(brokers, topics) = args

    // appName corrected: it was copy-pasted from the receiver-based example.
    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("KafkaDirectWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    // createDirectStream requires the Kafka connection params and a topic set.
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
    val topicsSet = topics.split(",").toSet
    val message = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topicsSet
    )

    // The payload is the second element of each (key, value) tuple.
    message
      .map(_._2)
      .flatMap(_.split(","))
      .map((_, 1))
      .reduceByKey(_ + _)
      .print()

    ssc.start()
    ssc.awaitTermination()
  }
}