Framework         Throughput        Language   Processing speed          Ecosystem
Storm             Relatively low    Clojure    Very fast (sub-second)    Alibaba (JStorm)
Flink             Relatively high   Scala      Fast (sub-second)         Little adoption in China so far
Spark Streaming   Very high         Scala      Fast (milliseconds)       Mature, well-rounded ecosystem
A first Spark Streaming example: word count over a socket text stream from uplooking01:44444.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Milliseconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}

// Create the StreamingContext; at least two threads are required:
// one thread to receive the data and one thread to process it
val conf = new SparkConf().setAppName("Ops1").setMaster("local[2]")
val ssc = new StreamingContext(conf, Milliseconds(3000))
val receiverDS: ReceiverInputDStream[String] = ssc.socketTextStream("uplooking01", 44444)
// comma-separated words -> (word, 1) -> per-batch word count
val pairRetDS: DStream[(String, Int)] = receiverDS.flatMap(_.split(",")).map((_, 1)).reduceByKey(_ + _)
pairRetDS.print()
// start the streaming computation
ssc.start()
// block until the computation is stopped (graceful shutdown)
ssc.awaitTermination()
There are two ways to integrate Spark Streaming with Kafka:

Receiver
Direct
Receiver-based Kafka integration (not recommended for production; it has been removed in the 0.10 integration)
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Milliseconds, StreamingContext}
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.kafka.KafkaUtils

// Create the StreamingContext; at least two threads are required:
// one thread to receive the data and one thread to process it
val conf = new SparkConf().setAppName("Ops1").setMaster("local[2]")
val ssc = new StreamingContext(conf, Milliseconds(3000))
// ZooKeeper quorum, consumer group, and topic -> number of receiver threads
val zkQuorum = "uplooking03:2181,uplooking04:2181,uplooking05:2181"
val groupId = "myid"
val topics = Map("hadoop" -> 3)
// Receiver-based stream: records are (key, message) pairs
val receiverDS: ReceiverInputDStream[(String, String)] = KafkaUtils.createStream(ssc, zkQuorum, groupId, topics)
receiverDS.flatMap(_._2.split(" ")).map((_, 1)).reduceByKey(_ + _).print()
ssc.start()
ssc.awaitTermination()
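One reason the receiver-based approach is discouraged is that received data sits in executor memory until it is processed and can be lost if an executor dies. Below is a minimal, hedged sketch (not from the original post) of a more defensive variant: it enables the receiver write-ahead log and passes an explicit storage level to createStream. The checkpoint path hdfs://uplooking01:9000/sparkstreaming/checkpoint is a placeholder.

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Milliseconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object ReceiverWithWAL {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("ReceiverWithWAL")
      .setMaster("local[2]")
      // persist received blocks to the checkpoint directory before processing them
      .set("spark.streaming.receiver.writeAheadLog.enable", "true")
    val ssc = new StreamingContext(conf, Milliseconds(3000))
    // placeholder path; the write-ahead log lives under the checkpoint directory
    ssc.checkpoint("hdfs://uplooking01:9000/sparkstreaming/checkpoint")

    val zkQuorum = "uplooking03:2181,uplooking04:2181,uplooking05:2181"
    // with the WAL enabled, a non-replicated serialized storage level is usually sufficient
    val receiverDS = KafkaUtils.createStream(ssc, zkQuorum, "myid", Map("hadoop" -> 3),
      StorageLevel.MEMORY_AND_DISK_SER)

    receiverDS.flatMap(_._2.split(" ")).map((_, 1)).reduceByKey(_ + _).print()

    ssc.start()
    ssc.awaitTermination()
  }
}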
Direct-based Kafka integration (the approach to use in production)
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Milliseconds, StreamingContext}
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.KafkaUtils

// Create the StreamingContext; at least two threads are required:
// one thread to receive the data and one thread to process it
val conf = new SparkConf().setAppName("Ops1").setMaster("local[2]")
val ssc = new StreamingContext(conf, Milliseconds(3000))
// the direct approach talks to the Kafka brokers directly, with no receiver
val kafkaParams = Map("metadata.broker.list" -> "uplooking03:9092,uplooking04:9092,uplooking05:9092")
val topics = Set("hadoop")
val inputDS: InputDStream[(String, String)] = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
inputDS.flatMap(_._2.split(" ")).map((_, 1)).reduceByKey(_ + _).print()
ssc.start()
ssc.awaitTermination()
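The main advantage of the direct approach is that every batch carries the exact Kafka offset ranges it was built from, so the application can track offsets itself. The following is a small sketch (not part of the original post, reusing the same brokers and topic as above) that reads those ranges via HasOffsetRanges; here they are only printed, while a real job would persist them somewhere durable.

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Milliseconds, StreamingContext}
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils}

object DirectOffsetTracking {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("DirectOffsetTracking").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Milliseconds(3000))
    val kafkaParams = Map("metadata.broker.list" -> "uplooking03:9092,uplooking04:9092,uplooking05:9092")
    val inputDS = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, Set("hadoop"))

    inputDS.foreachRDD { rdd =>
      // the RDD produced directly by createDirectStream implements HasOffsetRanges
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      offsetRanges.foreach { o =>
        // a real job would store these (e.g. in ZooKeeper or a database) to resume where it left off
        println(s"topic=${o.topic} partition=${o.partition} from=${o.fromOffset} until=${o.untilOffset}")
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }
}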
The following Java program continuously generates fake access-log lines and appends them to the file given as its first argument (the file that Flume will tail):

import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.Date;
import java.util.Random;

public class GenerateAccessLog {
    public static void main(String[] args) throws IOException, InterruptedException {
        // prepare the candidate values for each field
        int[] ips = {123, 18, 123, 112, 181, 16, 172, 183, 190, 191, 196, 120};
        String[] requesTypes = {"GET", "POST"};
        String[] cursors = {"/vip/112", "/vip/113", "/vip/114", "/vip/115", "/vip/116", "/vip/117", "/vip/118", "/vip/119", "/vip/120", "/vip/121", "/free/210", "/free/211", "/free/212", "/free/213", "/free/214", "/company/312", "/company/313", "/company/314", "/company/315"};
        String[] courseNames = {"大數據", "python", "java", "c++", "c", "scala", "android", "spark", "hadoop", "redis"};
        String[] references = {"www.baidu.com/", "www.sougou.com/", "www.sou.com/", "www.google.com"};
        FileWriter fw = new FileWriter(args[0]);
        PrintWriter printWriter = new PrintWriter(fw);
        while (true) {
            // Thread.sleep(1000);
            // generate the fields for one log line: date, method, url, http version, ip, referer
            String date = new Date().toLocaleString();
            String method = requesTypes[getRandomNum(0, requesTypes.length)];
            String url = "/cursor" + cursors[getRandomNum(0, cursors.length)];
            String HTTPVERSION = "HTTP/1.1";
            String ip = ips[getRandomNum(0, ips.length)] + "." + ips[getRandomNum(0, ips.length)] + "." + ips[getRandomNum(0, ips.length)] + "." + ips[getRandomNum(0, ips.length)];
            String reference = references[getRandomNum(0, references.length)];
            String rowLog = date + " " + method + " " + url + " " + HTTPVERSION + " " + ip + " " + reference;
            printWriter.println(rowLog);
            printWriter.flush();
        }
    }

    // returns a random int in [start, end)
    public static int getRandomNum(int start, int end) {
        int i = new Random().nextInt(end - start) + start;
        return i;
    }
}
# The configuration file needs to define the sources,
# the channels and the sinks.
# Sources, channels and sinks are defined per agent,
# in this case the agent is called 'f1'.
f1.sources = r1
f1.channels = c1
f1.sinks = k1

# define sources
f1.sources.r1.type = exec
f1.sources.r1.command = tail -F /logs/access.log

# define channels
f1.channels.c1.type = memory
f1.channels.c1.capacity = 1000
f1.channels.c1.transactionCapacity = 100

# define sink: ship the collected logs to uplooking03
f1.sinks.k1.type = avro
f1.sinks.k1.hostname = uplooking03
f1.sinks.k1.port = 44444

# bind sources and sink to channel
f1.sources.r1.channels = c1
f1.sinks.k1.channel = c1
# The configuration file needs to define the sources,
# the channels and the sinks.
# Sources, channels and sinks are defined per agent,
# in this case the agent is called 'f2'.
f2.sources = r2
f2.channels = c2
f2.sinks = k2

# define sources
f2.sources.r2.type = avro
f2.sources.r2.bind = uplooking03
f2.sources.r2.port = 44444

# define channels
f2.channels.c2.type = memory
f2.channels.c2.capacity = 1000
f2.channels.c2.transactionCapacity = 100

# define sink (logger, for verifying the pipeline on the console)
f2.sinks.k2.type = logger

# bind sources and sink to channel
f2.sources.r2.channels = c2
f2.sinks.k2.channel = c2
# The configuration file needs to define the sources,
# the channels and the sinks.
# Sources, channels and sinks are defined per agent,
# in this case the agent is called 'f2'.
f2.sources = r2
f2.channels = c2
f2.sinks = k2

# define sources
f2.sources.r2.type = avro
f2.sources.r2.bind = uplooking03
f2.sources.r2.port = 44444

# define channels
f2.channels.c2.type = memory
f2.channels.c2.capacity = 1000
f2.channels.c2.transactionCapacity = 100

# define sink (publish the events to the Kafka topic 'hadoop')
f2.sinks.k2.type = org.apache.flume.sink.kafka.KafkaSink
f2.sinks.k2.topic = hadoop
f2.sinks.k2.brokerList = uplooking03:9092,uplooking04:9092,uplooking05:9092
f2.sinks.k2.requiredAcks = 1
f2.sinks.k2.batchSize = 2

# bind sources and sink to channel
f2.sources.r2.channels = c2
f2.sinks.k2.channel = c2
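Putting the pieces together: GenerateAccessLog writes the access log, agent f1 ships it to f2 over Avro, and f2 publishes each line to the Kafka topic hadoop. The sketch below (not from the original post) consumes that topic with the direct API and counts hits per referer, relying on the fact that the referer is the last space-separated field of each generated line.

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Milliseconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object AccessLogStreaming {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("AccessLogStreaming").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Milliseconds(3000))

    val kafkaParams = Map("metadata.broker.list" -> "uplooking03:9092,uplooking04:9092,uplooking05:9092")
    val logDS = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, Set("hadoop"))

    // each Kafka record value is one access-log line; the referer is the last space-separated field
    logDS.map(_._2)
      .map(line => (line.split(" ").last, 1))
      .reduceByKey(_ + _)
      .print()

    ssc.start()
    ssc.awaitTermination()
  }
}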