Apache Kafka的Apache Trafodion消費者git
本文介紹瞭如何實現Apache Trafodion與Apache Kafka的無縫結合。咱們展現了Trafodion如何輕鬆地獲取數據,如何結合不一樣的開源組件,從而使用 Apache Kafka、 Trafodion、 HBase 和Hadoop建立近實時的流式處理工做流。github
如何實現各組件的結合?編程
什麼是Kafka?Kafka是一個分佈式、分區、多複本的日誌提交服務。Kafka維護按類區分的消息,稱爲主題(topic)。生產者(producer)向Kafka的主題發佈消息。消費者(consumer)訂閱主題,接收發布到這些主題的消息。一個主題就是一個類別或者一個可訂閱的條目名稱。對每一個主題來講,Kafka維護的是一個分區日誌(partitioned log)。客戶端控制將消息發佈到哪一個分區。服務器
Kafka集羣包含一個或多個服務器,每臺服務器被稱爲broker。消費者向分區的leader broker發出fetch請求。在每一個請求中,消費者指定偏移量(offset),從該位置返回日誌塊。若是有須要,消費者能夠將偏移量倒回,從新消費數據。同時,Kafka保留消費者在日誌中的位置,即偏移量(offset)。消費者在讀取消息時,會提升其偏移量。消費者也能夠按照任意的順序消費消息。分區容許日誌擴展到超過單個服務器,可是每一個分區的大小必須適應於其服務器。咱們能夠對給定主題的數據進行分區,以便處理大量數據。架構
接下來,關於Trafodion SQL引擎的工做原理。Trafodion SQL編譯器爲全部的關係操做使用運算符模型,包括進程間的消息傳遞、用於橫向擴展處理的可擴展分區功能。編譯器根據region的邊界或統計數據信息,生成使用表的分區佈局的並行查詢計劃。分佈式
在同一進程中以及在跨多個節點從新分區或收集數據時,查詢引擎在操做符間使用數據流模型。Trafodion工做負載在運行時使用分區並行,從而並行處理多個數據分區。oop
在分區並行計劃中,多個運算符爲相同的計劃工做。使用多隊列或管道合併結果,再保存輸入分區的sort順序。因爲數據被劃分紅多個獨立執行的單元,因此分區也被稱爲「數據並行」。表映射UDF容許在Trafodion使用MapReduce模型編程。這些UDF可使用可選的表值參數並生成表值輸出。佈局
如同Trafodion中的其餘運算符,表映射UDF能夠並行執行。可選的優化器接口容許表映射UDF的編寫人員實現多態性,以便在編譯時肯定結果列(名稱和類型),將謂詞下推到UDF或其輸入表,從而影響UDF的並行度並執行各類其餘優化。fetch
Trafodion優化器採用從上而下的方法,很是適合經過運算符優化查詢(例如,表映射UDF);它不會假設查詢樹的運算符是硬編碼的。除了MapReduce模型編程的常規應用程序,表映射UDF也是一個簡便而強大的機制,能夠將其餘數據源整合到Trafodion。優化
結合Trafodion和Kafka的並行架構是很是有益的。例如,能夠直接使用示例的Trafodion Kafka UDF,也能夠進行一些修改,用於消費Kafka生產者發出的數據。一旦執行了查詢,Kafka UDF就將啓動等待中的Trafodion執行程序。
下圖展現了完整的流程:
示例:將數據導入Trafodion
如下步驟將連續數據加載到8節點集羣的Trafodion表:
1.建立用於加載數據的表。
CREATE TABLE employee (id int not null, name varchar(20), email varchar(20), primary key(id)) SALT USING 4 PARTITIONS ON (id);
上表有4個分區;每一個節點一個分區。
2.建立一個主題。
建立一個名爲employee的主題,有4個分區和2個副本。
> bin/kafka-topics.sh –create –zookeeper localhost:2181 –replication-factor 1 –partitions 4 –topic employee
3.開啓進程。
建立一個生產者。示例代碼:
Producer producer = new Producer(config); String topic = 「employee」 ; // Produce strings in delimited form 「id|name|email」 for (int i = 1 ; i < = 1000 ; i++) String msg = ; System.out.println(msg); data = new KeyedMessage(topic, String.valueOf(i), msg); producer.send(data); }
使用易鯨捷github資料庫中的示例UDF(https://github.com/esgyn/code...),消費Trafodion中的數據。注意:該簡單的示例UDF不是並行執行的。
SELECT *FROM udf(kafka(‘localhost:2181′, — zookeeper connection 0, — Kafka group id ’employee’, — Kafka topic ‘IC20C20’, — int, and two char output cols ‘|’, — field delimiter 100, — max. rows to read 10000)) — timeout 10 seconds KafkaResult(id, name, email);– name the output columns