數據的接入能夠經過將數據實時寫入Kafka進行接入,不論是直接的寫入仍是經過oracle和mysql的實時接入方式,好比oracle的ogg,mysql的binlogjava
Golden Gate(簡稱OGG)提供異構環境下交易數據的實時捕捉、變換、投遞。mysql
經過OGG能夠實時的將oracle中的數據寫入Kafka中。web
對生產系統影響小:實時讀取交易日誌,以低資源佔用實現大交易量數據實時複製spring
以交易爲單位複製,保證交易一致性:只同步已提交的數據sql
高性能數據庫
MySQL 的二進制日誌 binlog 能夠說是 MySQL 最重要的日誌,它記錄了全部的 DDL
和 DML
語句(除了數據查詢語句select、show等),以事件形式記錄,還包含語句所執行的消耗的時間,MySQL的二進制日誌是事務安全型的。binlog 的主要目的是複製和恢復。apache
經過這些手段,能夠將數據同步到kafka也就是咱們的實時系統中來。json
Apache Kafka Connector能夠方便對kafka數據的接入。bootstrap
依賴api
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka_2.11</artifactId> <version>1.9.0</version></dependency>
必須有的:
1.topic名稱
2.用於反序列化Kafka數據的DeserializationSchema / KafkaDeserializationSchema
3.配置參數:「bootstrap.servers」 「group.id」 (kafka0.8還須要 「zookeeper.connect」)
val properties = new Properties()properties.setProperty("bootstrap.servers", "localhost:9092")// only required for Kafka 0.8properties.setProperty("zookeeper.connect", "localhost:2181")properties.setProperty("group.id", "test")stream = env .addSource(new FlinkKafkaConsumer[String]("topic", new SimpleStringSchema(), properties)) .print()
在許多狀況下,記錄的時間戳(顯式或隱式)嵌入記錄自己。另外,用戶可能想要週期性地或以不規則的方式發出水印。
咱們能夠定義好Timestamp Extractors / Watermark Emitters,經過如下方式將其傳遞給消費者
val env = StreamExecutionEnvironment.getExecutionEnvironment()val myConsumer = new FlinkKafkaConsumer[String](...)myConsumer.setStartFromEarliest() // start from the earliest record possiblemyConsumer.setStartFromLatest() // start from the latest recordmyConsumer.setStartFromTimestamp(...) // start from specified epoch timestamp (milliseconds)myConsumer.setStartFromGroupOffsets() // the default behaviour//指定位置//val specificStartOffsets = new java.util.HashMap[KafkaTopicPartition, java.lang.Long]()//specificStartOffsets.put(new KafkaTopicPartition("myTopic", 0), 23L)//myConsumer.setStartFromSpecificOffsets(specificStartOffsets)val stream = env.addSource(myConsumer)
啓用Flink的檢查點後,Flink Kafka Consumer將使用主題中的記錄,並以一致的方式按期檢查其全部Kafka偏移以及其餘操做的狀態。若是做業失敗,Flink會將流式程序恢復到最新檢查點的狀態,並從存儲在檢查點中的偏移量開始從新使用Kafka的記錄。
若是禁用了檢查點,則Flink Kafka Consumer依賴於內部使用的Kafka客戶端的自動按期偏移提交功能。
若是啓用了檢查點,則Flink Kafka Consumer將在檢查點完成時提交存儲在檢查點狀態中的偏移量。
val env = StreamExecutionEnvironment.getExecutionEnvironment()env.enableCheckpointing(5000) // checkpoint every 5000 msecs
Flink消費Kafka完整代碼:
import org.apache.flink.api.common.serialization.SimpleStringSchema;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;import java.util.Properties;public class KafkaConsumer { public static void main(String[] args) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); Properties properties = new Properties(); properties.setProperty("bootstrap.servers", "localhost:9092"); properties.setProperty("group.id", "test"); //構建FlinkKafkaConsumer FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<>("topic", new SimpleStringSchema(), properties); //指定偏移量 myConsumer.setStartFromEarliest(); DataStream<String> stream = env .addSource(myConsumer); env.enableCheckpointing(5000); stream.print(); env.execute("Flink Streaming Java API Skeleton"); }
這樣數據已經實時的接入咱們系統中,能夠在Flink中對數據進行處理了,那麼如何對標籤進行計算呢? 標籤的計算過程極大的依賴於數據倉庫的能力,因此擁有了一個好的數據倉庫,標籤也就很容易計算出來了。
數據倉庫是指一個面向主題的、集成的、穩定的、隨時間變化的數據的集合,以用於支持管理決策的過程。
(1)面向主題
業務數據庫中的數據主要針對事物處理任務,各個業務系統之間是各自分離的。而數據倉庫中的數據是按照必定的主題進行組織的
(2)集成
數據倉庫中存儲的數據是從業務數據庫中提取出來的,但並非原有數據的簡單複製,而是通過了抽取、清理、轉換(ETL)等工做。
業務數據庫記錄的是每一項業務處理的流水帳,這些數據不適合於分析處理,進入數據倉庫以前須要通過系列計算,同時拋棄一些分析處理不須要的數據。
(3)穩定
操做型數據庫系統中通常只存儲短時間數據,所以其數據是不穩定的,記錄的是系統中數據變化的瞬態。
數據倉庫中的數據大多表示過去某一時刻的數據,主要用於查詢、分析,不像業務系統中數據庫同樣常常修改。通常數據倉庫構建完成,主要用於訪問
OLTP 聯機事務處理
OLTP是傳統關係型數據庫的主要應用,主要用於平常事物、交易系統的處理
一、數據量存儲相對來講不大
二、實時性要求高,須要支持事物
三、數據通常存儲在關係型數據庫(oracle或mysql中)
OLAP 聯機分析處理
OLAP是數據倉庫的主要應用,支持複雜的分析查詢,側重決策支持
一、實時性要求不是很高,ETL通常都是T+1的數據;
二、數據量很大;
三、主要用於分析決策;
星形模型是最經常使用的數據倉庫設計結構。由一個事實表和一組維表組成,每一個維表都有一個維主鍵。
該模式核心是事實表,經過事實表將各類不一樣的維錶鏈接起來,各個維表中的對象經過事實表與另外一個維表中的對象相關聯,這樣創建各個維表對象之間的聯繫
維表:用於存放維度信息,包括維的屬性和層次結構;
事實表:是用來記錄業務事實並作相應指標統計的表。同維表相比,事實表記錄數量不少
雪花模型是對星形模型的擴展,每個維表均可以向外鏈接多個詳細類別表。除了具備星形模式中維表的功能外,還鏈接對事實表進行詳細描述的維度,可進一步細化查看數據的粒度
例如:地點維表包含屬性集{location_id,街道,城市,省,國家}。這種模式經過地點維度表的city_id與城市維度表的city_id相關聯,獲得如{101,「解放大道10號」,「武漢」,「湖北省」,「中國」}、{255,「解放大道85號」,「武漢」,「湖北省」,「中國」}這樣的記錄。
星形模型是最基本的模式,一個星形模型有多個維表,只存在一個事實表。在星形模式的基礎上,用多個表來描述一個複雜維,構造維表的多層結構,就獲得雪花模型
清晰數據結構:每個數據分層都有它的做用域,這樣咱們在使用表的時候能更方便地定位和理解
髒數據清洗:屏蔽原始數據的異常
屏蔽業務影響:沒必要改一次業務就須要從新接入數據
數據血緣追蹤:簡單來說能夠這樣理解,咱們最終給業務呈現的是能直接使用的一張業務表,可是它的來源有不少,若是有一張來源表出問題了,咱們但願可以快速準確地定位到問題,並清楚它的危害範圍。
減小重複開發:規範數據分層,開發一些通用的中間層數據,可以減小極大的重複計算。
把複雜問題簡單化。將一個複雜的任務分解成多個步驟來完成,每一層只處理單一的步驟,比較簡單和容易理解。便於維護數據的準確性,當數據出現問題以後,能夠不用修復全部的數據,只須要從有問題的步驟開始修復。
數據倉庫的數據直接對接OLAP或日誌類數據,
用戶畫像只是站在用戶的角度,對數據倉庫數據作進一步的建模加工。所以天天畫像標籤相關數據的調度依賴上游數據倉庫相關任務執行完成。
在瞭解了數據倉庫之後,咱們就能夠進行標籤的計算了。在開發好標籤的邏輯之後,將數據寫入hive和druid中,完成實時與離線的標籤開發工做。
Flink從1.9開始支持集成Hive,不過1.9版本爲beta版,不推薦在生產環境中使用。在最新版Flink1.10版本,標誌着對 Blink的整合宣告完成,隨着對 Hive 的生產級別集成,Hive做爲數據倉庫系統的絕對核心,承擔着絕大多數的離線數據ETL計算和數據管理,期待Flink將來對Hive的完美支持。
而 HiveCatalog 會與一個 Hive Metastore 的實例鏈接,提供元數據持久化的能力。要使用 Flink 與 Hive 進行交互,用戶須要配置一個 HiveCatalog,並經過 HiveCatalog 訪問 Hive 中的元數據。
要與Hive集成,須要在Flink的lib目錄下添加額外的依賴jar包,以使集成在Table API程序或SQL Client中的SQL中起做用。或者,能夠將這些依賴項放在文件夾中,並分別使用Table API程序或SQL Client 的-C
或-l
選項將它們添加到classpath中。本文使用第一種方式,即將jar包直接複製到$FLINK_HOME/lib目錄下。本文使用的Hive版本爲2.3.4(對於不一樣版本的Hive,能夠參照官網選擇不一樣的jar包依賴),總共須要3個jar包,以下:
<!-- Flink Dependency --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-hive_2.11</artifactId> <version>1.10.0</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-api-java-bridge_2.11</artifactId> <version>1.10.0</version> <scope>provided</scope> </dependency> <!-- Hive Dependency --> <dependency> <groupId>org.apache.hive</groupId> <artifactId>hive-exec</artifactId> <version>${hive.version}</version> <scope>provided</scope> </dependency>
package com.flink.sql.hiveintegration; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.TableEnvironment; import org.apache.flink.table.catalog.hive.HiveCatalog; public class FlinkHiveIntegration { public static void main(String[] args) throws Exception { EnvironmentSettings settings = EnvironmentSettings .newInstance() .useBlinkPlanner() // 使用BlinkPlanner .inBatchMode() // Batch模式,默認爲StreamingMode .build(); //使用StreamingMode /* EnvironmentSettings settings = EnvironmentSettings .newInstance() .useBlinkPlanner() // 使用BlinkPlanner .inStreamingMode() // StreamingMode .build();*/ TableEnvironment tableEnv = TableEnvironment.create(settings); String name = "myhive"; // Catalog名稱,定義一個惟一的名稱表示 String defaultDatabase = "qfbap_ods"; // 默認數據庫名稱 String hiveConfDir = "/opt/modules/apache-hive-2.3.4-bin/conf"; // hive-site.xml路徑 String version = "2.3.4"; // Hive版本號 HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir, version); tableEnv.registerCatalog("myhive", hive); tableEnv.useCatalog("myhive"); // 建立數據庫,目前不支持建立hive表 String createDbSql = "CREATE DATABASE IF NOT EXISTS myhive.test123"; tableEnv.sqlUpdate(createDbSql); } }
能夠將Flink分析好的數據寫回kafka,而後在druid中接入數據,也能夠將數據直接寫入druid,如下爲示例代碼:
<?xml version="1.0" encoding="UTF-8"?> <project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.1.8.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>com.flinkdruid</groupId> <artifactId>FlinkDruid</artifactId> <version>0.0.1-SNAPSHOT</version> <name>FlinkDruid</name> <description>Flink Druid Connection</description> <properties> <java.version>1.8</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-core</artifactId> <version>1.9.0</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients_2.12</artifactId> <version>1.9.0</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>
@SpringBootApplication public class FlinkDruidApp { private static String url = "http://localhost:8200/v1/post/wikipedia"; private static RestTemplate template; private static HttpHeaders headers; FlinkDruidApp() { template = new RestTemplate(); headers = new HttpHeaders(); headers.setAccept(Arrays.asList(MediaType.APPLICATION_JSON)); headers.setContentType(MediaType.APPLICATION_JSON); } public static void main(String[] args) throws Exception { SpringApplication.run(FlinkDruidApp.class, args); // Creating Flink Execution Environment ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); //Define data source DataSet<String> data = env.readTextFile("/wikiticker-2015-09-12-sampled.json"); // Trasformation on the data data.map(x -> { return httpsPost(x).toString(); }).print(); } // http post method to post data in Druid private static ResponseEntity<String> httpsPost(String json) { HttpEntity<String> requestEntity = new HttpEntity<>(json, headers); ResponseEntity<String> response = template.exchange("http://localhost:8200/v1/post/wikipedia", HttpMethod.POST, requestEntity, String.class); return response; } @Bean public RestTemplate restTemplate() { return new RestTemplate(); } }
標籤的開發工做繁瑣,須要不斷的開發而且優化,可是如何將作好的標籤提供出去產生真正的價值呢? 下一章,咱們將介紹用戶畫像產品化,未完待續~
參考文獻
《用戶畫像:方法論與工程化解決方案》
更多實時數據分析相關博文與科技資訊,歡迎關注 「實時流式計算」