1. Create the StreamingContext, configure the Kafka consumer, and use the Kafka consumer queue as the Spark input stream
import com.alibaba.fastjson.JSONObject;
import com.it18zhang.app.common.AppStartupLog;
import com.it18zhang.app.common.Constants;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.*;
import org.apache.spark.streaming.Seconds;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import scala.Tuple2;

import java.sql.*;
import java.util.*;

// Create the Spark streaming context with a 5-second batch interval
SparkConf conf = new SparkConf();
conf.setAppName("App-Logs-Spark-Streaming");
conf.setMaster("local[4]");
JavaStreamingContext sc = new JavaStreamingContext(conf, Seconds.apply(5));

// Kafka consumer configuration
Map<String, Object> props = new HashMap<String, Object>();
// Kafka cluster address
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, Constants.KAFKA_CONSUMER_SERVERS);
// Deserializers
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getCanonicalName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getCanonicalName());
props.put(ConsumerConfig.GROUP_ID_CONFIG, "1");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

// Topic to subscribe to
List<String> topics = new ArrayList<String>();
topics.add("topic-app-startup");

// Create the Kafka direct stream
JavaInputDStream<ConsumerRecord<Object, Object>> ds1 =
        KafkaUtils.createDirectStream(sc,
                LocationStrategies.PreferBrokers(),
                ConsumerStrategies.Subscribe(topics, props));
2. Map the JSON strings in the input stream to custom log objects
// Transform the stream with map: parse each record's JSON value into an AppStartupLog
Function<ConsumerRecord<Object, Object>, AppStartupLog> mapFunc =
        new Function<ConsumerRecord<Object, Object>, AppStartupLog>() {
    public AppStartupLog call(ConsumerRecord<Object, Object> msg) throws Exception {
        String topic = msg.topic();
        String key = (String) msg.key();
        String value = (String) msg.value();
        AppStartupLog log = JSONObject.parseObject(value, AppStartupLog.class);
        return log;
    }
};
JavaDStream<AppStartupLog> ds2 = ds1.map(mapFunc);
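The AppStartupLog class itself is not shown here. The sketch below is a minimal assumption that contains only the fields the later code actually reads (appId, deviceId, appVersion, createdAtMs); the real class in com.it18zhang.app.common presumably carries more startup fields.

// Minimal sketch of AppStartupLog, assuming only the fields referenced later in this post.
public class AppStartupLog implements java.io.Serializable {
    private String appId;
    private String deviceId;
    private String appVersion;
    private long createdAtMs;   // event timestamp in milliseconds

    public String getAppId()            { return appId; }
    public void setAppId(String v)      { this.appId = v; }
    public String getDeviceId()         { return deviceId; }
    public void setDeviceId(String v)   { this.deviceId = v; }
    public String getAppVersion()       { return appVersion; }
    public void setAppVersion(String v) { this.appVersion = v; }
    public long getCreatedAtMs()        { return createdAtMs; }
    public void setCreatedAtMs(long v)  { this.createdAtMs = v; }
}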
3. Map each log object to a (key, logAgg) key-value pair with mapToPair
The logAgg class is a custom class that makes the aggregation easier: on top of the Log class it adds firstTime and lastTime fields, and it also holds the log object itself as a member (a minimal sketch is given below).
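AppStartupLogAgg is not shown in the post either; here is a minimal sketch, assuming only the members the aggregation code uses (the wrapped AppStartupLog plus firstTime/lastTime).

// Minimal sketch of AppStartupLogAgg: the wrapped log plus first/last timestamps.
public class AppStartupLogAgg implements java.io.Serializable {
    private AppStartupLog log;
    private long firstTime;
    private long lastTime;

    public AppStartupLog getLog()          { return log; }
    public void setLog(AppStartupLog log)  { this.log = log; }
    public long getFirstTime()             { return firstTime; }
    public void setFirstTime(long t)       { this.firstTime = t; }
    public long getLastTime()              { return lastTime; }
    public void setLastTime(long t)        { this.lastTime = t; }
}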
// Extract appid, deviceid and appversion from AppStartupLog to build the key,
// and wrap AppStartupLog into the aggregation class AppStartupLogAgg,
// which carries firstTime and lastTime.
// AppStartupLog => (key, AppStartupLogAgg)
PairFunction<AppStartupLog, String, AppStartupLogAgg> pairFunc =
        new PairFunction<AppStartupLog, String, AppStartupLogAgg>() {
    public Tuple2<String, AppStartupLogAgg> call(AppStartupLog appStartupLog) throws Exception {
        String appid = appStartupLog.getAppId();
        String deviceid = appStartupLog.getDeviceId();
        String appversion = appStartupLog.getAppVersion();
        String key = appid + "," + deviceid + "," + appversion;
        AppStartupLogAgg value = new AppStartupLogAgg();
        value.setLog(appStartupLog);
        value.setFirstTime(appStartupLog.getCreatedAtMs());
        value.setLastTime(appStartupLog.getCreatedAtMs());
        return new Tuple2<String, AppStartupLogAgg>(key, value);
    }
};
JavaPairDStream<String, AppStartupLogAgg> ds3 = ds2.mapToPair(pairFunc);
4. Aggregate by the new key with reduceByKey
// Aggregate with reduceByKey: keep the earliest firstTime and the latest lastTime per key
Function2<AppStartupLogAgg, AppStartupLogAgg, AppStartupLogAgg> reduceFunc =
        new Function2<AppStartupLogAgg, AppStartupLogAgg, AppStartupLogAgg>() {
    public AppStartupLogAgg call(AppStartupLogAgg v1, AppStartupLogAgg v2) throws Exception {
        v1.setFirstTime(Math.min(v1.getFirstTime(), v2.getFirstTime()));
        v1.setLastTime(Math.max(v1.getLastTime(), v2.getLastTime()));
        return v1;
    }
};
JavaPairDStream<String, AppStartupLogAgg> ds4 = ds3.reduceByKey(reduceFunc);
5. The original idea was to perform one insert for each partition (same key) of aggregated data. However, since the key is appid+deviceid+version, each group holds very little data, so inserting that frequently is relatively expensive (for example, a connection has to be opened each time). The data is therefore regrouped by appid, so that records sharing the same appid are collected together before being inserted.
// Re-key the pairs so that the key becomes just the appid
PairFunction<Tuple2<String, AppStartupLogAgg>, String, AppStartupLogAgg> pairFunc2 =
        new PairFunction<Tuple2<String, AppStartupLogAgg>, String, AppStartupLogAgg>() {
    public Tuple2<String, AppStartupLogAgg> call(Tuple2<String, AppStartupLogAgg> tuple) throws Exception {
        String key = tuple._2().getLog().getAppId();
        return new Tuple2<String, AppStartupLogAgg>(key, tuple._2());
    }
};
JavaPairDStream<String, AppStartupLogAgg> ds5 = ds4.mapToPair(pairFunc2);
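The insert code in the next step consumes a stream named ds6 of type JavaPairDStream&lt;String, Iterable&lt;AppStartupLogAgg&gt;&gt;, but the step that produces it is not shown in the original. Presumably it is a groupByKey on ds5, which collects all aggregates that share the same appid into a single Iterable:

// Assumed missing step: group the re-keyed aggregates by appid,
// producing the ds6 that the insert code below operates on.
JavaPairDStream<String, Iterable<AppStartupLogAgg>> ds6 = ds5.groupByKey();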
6. Insert the results in a loop
// Iterate over the aggregated results and upsert them into Phoenix
ds6.foreachRDD(new VoidFunction<JavaPairRDD<String, Iterable<AppStartupLogAgg>>>() {
    public void call(JavaPairRDD<String, Iterable<AppStartupLogAgg>> rdd) throws Exception {
        // Perform the inserts for every <key, value-list> pair in the RDD
        rdd.foreach(new VoidFunction<Tuple2<String, Iterable<AppStartupLogAgg>>>() {
            public void call(Tuple2<String, Iterable<AppStartupLogAgg>> tt) throws Exception {
                String appid = tt._1();
                Iterator<AppStartupLogAgg> it = tt._2().iterator();
                Class.forName("org.apache.phoenix.jdbc.PhoenixDriver");
                Connection conn = DriverManager.getConnection("jdbc:phoenix:s202:2181");
                conn.setAutoCommit(false);
                // Upsert every aggregate for this appid, then commit once
                while (it.hasNext()) {
                    AppStartupLogAgg agg = it.next();
                    upsert2Phoenix(conn, appid, agg);
                }
                conn.commit();
                conn.close();
            }
        });
    }
});
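The upsert2Phoenix helper is referenced above but not shown. Below is a hypothetical sketch under assumed names: the table APP_STARTUP_STAT and its columns are not from the original post, which does not show the Phoenix schema. Phoenix uses UPSERT rather than INSERT, and the statement can be executed through a standard JDBC PreparedStatement.

// Hypothetical sketch of upsert2Phoenix; table and column names are assumptions.
private static void upsert2Phoenix(Connection conn, String appid, AppStartupLogAgg agg) throws SQLException {
    String sql = "UPSERT INTO APP_STARTUP_STAT (APPID, DEVICEID, APPVERSION, FIRSTTIME, LASTTIME) "
               + "VALUES (?, ?, ?, ?, ?)";
    PreparedStatement ppst = conn.prepareStatement(sql);
    ppst.setString(1, appid);
    ppst.setString(2, agg.getLog().getDeviceId());
    ppst.setString(3, agg.getLog().getAppVersion());
    ppst.setLong(4, agg.getFirstTime());
    ppst.setLong(5, agg.getLastTime());
    ppst.executeUpdate();
    ppst.close();
}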
7. Start the streaming job
sc.start();
sc.awaitTermination();
Hive currently does not support update operations. In addition, because Hive runs batch jobs on Hadoop, queries take a long time: it is usually minutes to hours before a result comes back. Hive requires a predefined schema to map files and directories onto columns, and Hive is not ACID compliant.
HBase queries are written in a language of its own that has to be learned from scratch. SQL-like functionality can be obtained through Apache Phoenix, but at the cost of having to define a schema. Furthermore, HBase is not fully ACID compliant, although it does support some of the properties. Last but not least, running HBase requires ZooKeeper, a service for distributed coordination that covers configuration management, metadata maintenance and naming.
Hive is well suited to analytical queries over data collected during a period of time, for example computing trends or summarizing website logs. Hive should not be used for real-time queries, because it can take a long time to return results.
HBase is very well suited to real-time queries over big data. Facebook uses HBase for messaging and real-time analytics; it is also used to count connections on Facebook.
Hive and HBase are two different Hadoop-based technologies: Hive is a SQL-like engine that runs MapReduce jobs, while HBase is a NoSQL key/value database on top of Hadoop. Of course, the two can be used together. Just as you might use Google for search and Facebook for social networking, Hive can serve analytical queries while HBase serves real-time queries, and data can be written from Hive into HBase and even from HBase back into Hive.