[APP大數據項目]第六天筆記

spark streaming 實現用戶的準實時更新

  1. spark thrift server 替換hiveserver2
  2. HBase建立app_users 表 :
    基本屬性字段,firsttime(第一次啓動時間), lasttime(最後一次啓動時間)
  3. 使用phoenix,Hbase上的SQL支持

實現思路

  1. spark streaming 從kafka接收消息,設置每5秒爲一個窗口
  2. 一個窗口期會來不少數據,要作的是:
    1)若是該 [app, 用戶, version] 沒在表中出現,則插入一天新記錄,firsttime 和 lasttime 相同
    2) 若是該 [ app, 用戶, version ] 出現過,則更新lasttime便可
  3. 因爲咱們只須要第一次和最後一次的數據,所以須要對窗口期內的數據,按【app,用戶和version】聚合,而後排序, 取出第一次和最後一次作插入更新HBase便可
具體實現

1.建立StreamingContext,配置Kafka Consumer, 使用Kafka消費隊列做爲spark輸入流java

import com.alibaba.fastjson.JSONObject;
import com.it18zhang.app.common.AppStartupLog;
import com.it18zhang.app.common.Constants;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.streaming.Seconds;
import org.apache.spark.api.java.function.*;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import scala.Tuple2;

import java.sql.*;
import java.util.*;

// 建立spark context
SparkConf conf = new SparkConf();
conf.setAppName("App-Logs-Spark-Streaming");
conf.setMaster("local[4]") ;

JavaStreamingContext sc = new JavaStreamingContext(conf, Seconds.apply(5));

// 配置kafka
Map<String, Object> props = new HashMap<String, Object>();
// kafka 集羣地址
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, Constants.KAFKA_CONSUMER_SERVERS);
// 序列化方式
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getCanonicalName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getCanonicalName());
props.put(ConsumerConfig.GROUP_ID_CONFIG, "1");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"lastest");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

// 設置主題
List<String> topics = new ArrayList<String>() ;
topics.add("topic-app-startup");

// 建立kafka流
JavaInputDStream<ConsumerRecord<Object,Object>> ds1 = KafkaUtils.createDirectStream(sc,
        LocationStrategies.PreferBrokers(), ConsumerStrategies.Subscribe(topics,props));

2.將輸入流的JSON String 經過Map 轉爲自定義的Log對象sql

// 經過Map實現流變化
Function<ConsumerRecord<Object,Object>, AppStartupLog> mapFunc = new Function<ConsumerRecord<Object, Object>, AppStartupLog>() {
    public AppStartupLog call(ConsumerRecord<Object, Object> msg) throws Exception {
        String topic = msg.topic();
        String key = (String) msg.key();
        String value = (String) msg.value();
        AppStartupLog log = JSONObject.parseObject(value, AppStartupLog.class);
        return log;
    }
};

JavaDStream<AppStartupLog> ds2 = ds1.map(mapFunc);

3.將log對象 經過pairMap 映射爲(key, logAgg)的鍵值對數據庫

其中logAgg類是爲了方便聚合操做的自定義類,在Log類的基礎上加入了firstTime,lastTime的屬性,還包括log對象的成員
// 經過AppStartupLog 抽取出appid deviceid version 做爲key值
// 將AppStartupLog轉換爲包含firstTime 和lastTime 的聚合類AppStartupLogAgg
// AppStartupLog => (key, AppStartupLogAgg)
//
PairFunction<AppStartupLog, String, AppStartupLogAgg> pairFunc = new PairFunction<AppStartupLog, String, AppStartupLogAgg>() {
   public Tuple2<String, AppStartupLogAgg> call(AppStartupLog appStartupLog) throws Exception {
       String appid = appStartupLog.getAppId();
       String deviceid = appStartupLog.getDeviceId();
       String appversion = appStartupLog.getAppVersion();
       String key = appid + "," + deviceid + ","+appversion;

       AppStartupLogAgg value = new AppStartupLogAgg();
       value.setLog(appStartupLog);
       value.setFirstTime(appStartupLog.getCreatedAtMs());
       value.setLastTime(appStartupLog.getCreatedAtMs());
       return new Tuple2<String, AppStartupLogAgg>(key, value);
   }
};

JavaPairDStream<String, AppStartupLogAgg> ds3 = ds2.mapToPair(pairFunc);

4.根據新的鍵值經過reduceByKey聚合apache

// 經過reduceByKey聚合
Function2<AppStartupLogAgg, AppStartupLogAgg, AppStartupLogAgg> reduceFunc = new Function2<AppStartupLogAgg, AppStartupLogAgg, AppStartupLogAgg>() {
    public AppStartupLogAgg call(AppStartupLogAgg v1, AppStartupLogAgg v2) throws Exception {
        v1.setFirstTime(Math.min(v1.getFirstTime(),v2.getFirstTime()));
        v1.setLastTime(Math.max(v1.getLastTime(), v2.getLastTime()));
        return v1;
    }
};

JavaPairDStream<String, AppStartupLogAgg> ds4 = ds3.reduceByKey(reduceFunc);

5.原來的想法是對聚合之後的每一個partition(相同key)的數據進行一次數據插入,但因爲key值爲appid+deviceid+version,每一個分區內的數據較少, 這樣頻繁插入的代價(好比開啓連接)相對較高。所以考慮按appid再分一次組,將相同appid的數據放在一塊兒後再進行插入。json

// 將key值 轉變爲 appid
PairFunction<Tuple2<String, AppStartupLogAgg>,String,AppStartupLogAgg> pairFunc2 = new PairFunction<Tuple2<String, AppStartupLogAgg>, String, AppStartupLogAgg>() {
    public Tuple2<String, AppStartupLogAgg> call(Tuple2<String, AppStartupLogAgg> tuple) throws Exception {
        String key = tuple._2().getLog().getAppId();
        return new Tuple2<String, AppStartupLogAgg>(key, tuple._2());
    }
};
JavaPairDStream<String, AppStartupLogAgg> ds5 = ds4.mapToPair(pairFunc2);

6.循環插入api

//循環聚合結果,插入phoenix庫
ds6.foreachRDD(new VoidFunction<JavaPairRDD<String, Iterable<AppStartupLogAgg>>>() {
    public void call(JavaPairRDD<String, Iterable<AppStartupLogAgg>> rdd) throws Exception {
        // 對每一個rdd 裏對每一個<K-ValueList> 執行插入操做
        rdd.foreach(new VoidFunction<Tuple2<String, Iterable<AppStartupLogAgg>>>() {
            public void call(Tuple2<String, Iterable<AppStartupLogAgg>> tt) throws Exception {
                String appid = tt._1() ;
                Iterator<AppStartupLogAgg> it = tt._2().iterator();

                Class.forName("org.apache.phoenix.jdbc.PhoenixDriver");
                Connection conn = DriverManager.getConnection("jdbc:phoenix:s202:2181");
                conn.setAutoCommit(false);
                //循環全部聚合數據
                while(it.hasNext()){
                    AppStartupLogAgg agg = it.next();
                    upsert2Phoenix(conn, appid, agg);
                }
                conn.commit();
            }
        });
    }
});

7.執行app

sc.start();
sc.awaitTermination();

簡單擴展:HIVE和HBASE區別

限制

Hive目前不支持更新操做。另外,因爲hive在hadoop上運行批量操做,它須要花費很長的時間,一般是幾分鐘到幾個小時才能夠獲取到查詢的結果。Hive必須提供預先定義好的schema將文件和目錄映射到列,而且Hive與ACID不兼容。分佈式

HBase查詢是經過特定的語言來編寫的,這種語言須要從新學習。類SQL的功能能夠經過Apache Phonenix實現,但這是以必須提供schema爲代價的。另外,Hbase也並非兼容全部的ACID特性,雖然它支持某些特性。最後但不是最重要的–爲了運行Hbase,Zookeeper是必須的,zookeeper是一個用來進行分佈式協調的服務,這些服務包括配置服務,維護元信息和命名空間服務。工具

應用場景

Hive適合用來對一段時間內的數據進行分析查詢,例如,用來計算趨勢或者網站的日誌。Hive不該該用來進行實時的查詢。由於它須要很長時間才能夠返回結果。oop

Hbase很是適合用來進行大數據的實時查詢。Facebook用Hbase進行消息和實時的分析。它也能夠用來統計Facebook的鏈接數。

總結

Hive和Hbase是兩種基於Hadoop的不一樣技術–Hive是一種類SQL的引擎,而且運行MapReduce任務,Hbase是一種在Hadoop之上的NoSQL 的Key/vale數據庫。固然,這兩種工具是能夠同時使用的。就像用Google來搜索,用FaceBook進行社交同樣,Hive能夠用來進行統計查詢,HBase能夠用來進行實時查詢,數據也能夠從Hive寫到Hbase,設置再從Hbase寫回Hive。

相關文章
相關標籤/搜索