摘要:本次分享主要介紹Kafka產品的原理和使用方式,以及同步數據到MaxCompute的參數介紹、獨享集成資源組與自定義資源組的使用背景和配置方式、Kafka同步數據到MaxCompute的開發到生產的總體部署操做等內容。html
演講嘉賓簡介:耿江濤,阿里雲智能技術支持工程師安全
如下內容根據演講視頻以及PPT整理而成。
本次分享主要圍繞如下兩個方面:服務器
1、背景介紹
2、具體操做流程
1.Kafka消息隊列使用以及原理
2.資源組介紹以及配置
3.同步過程及其注意事項
4.開發測試以及生產部署網絡
1、背景介紹
1. 實驗目的
在平常工做中,不少企業將APP或網站產生的行爲日誌和業務數據經過Kafka收集以後作兩方面的處理。一方面是離線處理,一方面是實時處理。而且通常會投遞到MaxCompute中做爲模型的構建,進行相關的業務處理,如用戶的特徵、銷售排名、訂單地區分佈等。這些數據造成以後會在數據報表中做爲展現。架構
2. 方案說明
Kafka數據同步到DataWorks有兩條鏈路。一條鏈路是業務數據和行爲日誌經過Kafka,再經過Flume 上傳到Datahub,以及Max Compute,最終在QuickBI進行展現。另外一條鏈路是業務數據和行爲日誌經過Kafka以及DataWorks,MaxCompute,最終在QuickBI當中展現。
本次展現Kafka經過DataWorks上傳到MaxCompute的流程。從DataWorks上傳到MaxCompute是經過兩種方案進行上傳數據同步的。方案一是自定義資源組,方案二是獨享資源組。自定義資源組通常適用於複雜網絡的數據上雲場景。獨享資源組操做方式主要針對集成資源不足的狀況。併發
2、具體操做流程
1.Kafka消息隊列使用及其原理
Kafka產品概述:消息隊列 for Apache Kafka 是阿里雲提供的分佈式、高吞吐、可擴展的消息隊列服務。消息隊列for Apache Kafka通常用於日誌收集、監控數據聚合、流式數據處理、在線離線分析等大數據領域。消息隊列 for Apache Kafka 針對開源的 Apache Kafka 提供全託管服務,完全解決開源產品長期以來的痛點。雲上Kafka具備低成本、更彈性、更可靠的優點,用戶只需專一於業務開發,無需部署運維。負載均衡
Kafka架構介紹:以下圖所示,一個典型的Kafka集羣主要分爲四部分。Producer生產數據並經過 push 模式向消息隊列 for Apache Kafka 的 Kafka Broker 發送消息。發送的消息能夠是網站的頁面訪問、服務器日誌,也能夠是 CPU 和內存相關的系統資源信息。Kafka Broker用於存儲消息的服務器。Kafka Broker 支持水平擴展。 Kafka Broker 節點的數量越多,Kafka 集羣的吞吐率越高。Kafka Broker針對topic會partition一個概念,partition有leader、follower的角色分配。Consumer經過 pull 模式從消息隊列 for Apache Kafka Broker 訂閱並消費leader的信息數據。其中partition內部有offset做爲消息的消費點位。經過ZooKeeper管理集羣的配置、選舉 leader 分區,而且在Consumer Group 發生變化時,管理partition_leader的負載均衡。運維
Kafka消息隊列購買以及部署:以下圖,用戶首先能夠到Kafka消息隊列產品頁面點擊購買,根據我的狀況選擇對應包年、包月等消費方式、地區、實例類型、磁盤、流量以及消息存放時間。其中較爲重要的一點是要選擇對應地區,若是用戶的MaxCompute在華北,那麼儘可能選擇華北地區。選擇開通完成後須要進行部署。點擊部署,選擇合適的VPC及其交換機進行部署。分佈式
部署完成後進入Kafka Topic管理頁面,點擊建立Topic輸入本身的Topic。Topic命名下面有三條注意信息,命名儘可能跟本身的業務一致,好比是財經業務或者是商務業務,儘可能進行區分。第四步進入Consumer Group管理,點擊建立Consumer Group建立本身所須要的Consumer Group。Consumer Group的命名也須要規範,若是是財經或商務業務,儘可能和本身的Topic相對應。ide
Kafka白名單配置:Kafka安裝部署完成以後確認須要訪問Kafka的服務器或產品的白名單。下圖中的默認接入點即爲訪問接口。
2.資源組介紹及其配置
自定義資源組的使用背景:自定義資源組通常針對IDC之間的網絡問題。本地網絡和雲上網絡存在差別,如DataWorks能夠經過免費傳輸能力(默認任務資源組)進行海量數據上雲,但默認資源組沒法實現傳輸速度存在較高要求或複雜環境中的數據源同步上雲的需求。此時用戶可使用自定義資源組可實現複雜環境同步上雲的需求,解決DataWorks默 認資源組與您的數據源不通的問題,或實現更高速度的傳輸能力。然而,自定義資源組主要解決的仍是複雜網絡環境上雲同步問題,打通任意網絡環境之間的數據傳輸同步。
自定義資源組的配置:自定義資源組的配置須要六步操做,首先點擊進入DataWorks控制檯,點開工做空間的列表,選擇用戶須要的項目空間,點擊進入數據集成,即確認本身的數據集成是要在哪一個空間項目下進行添加。以後,點擊進入數據源界面,點擊新增自定義資源組。要注意頁面右上角的新增自定義資源組是隻有項目管理員有權限添加。
第三步是確認Kafka與須要添加的自定義資源組屬於同一個VPC下。本次實驗是ECS向Kafka發送消息,兩者的VPC應該一致。第四步登陸ECS,即我的的自定義資源組。執行命令dmidecode|grep UUID獲得ECS的UUID。
第五步是將添加服務器UUID以及自定義資源組的IP或機器CPU和內存填寫進來。最後是在ECS上執行相關命令,Agent安裝共5步,作一一確認,在第4小步完成後點擊刷新查看服務是否爲可用狀態。添加完成後進行檢查連通測試,檢查是否添加成功。
獨享資源組的使用背景:一些客戶反映在Kafka同步到MaxCompute時會報資源不足的問題,能夠經過新增獨享資源組的方式進行數據同步。獨享資源模式下,機器的物理資源(網絡、磁盤、CPU和內存等)徹底獨享。不只能夠隔離用戶間的資源使用,也能夠隔離不一樣工做空間任務的資源使用。此外,獨享資源也支持靈活的擴容、縮容功能,能夠知足資源獨
享、靈活配置等需求。獨享資源組能夠訪問在同一地域下的VPC數據源,同時也能夠訪問跨地域的公網RDS地址。
獨享資源組的配置:獨享資源組的配置主要須要兩步操做,首先進入DataWorks控制檯的資源列表,點擊新增獨享資源組,包括獨享集成資源組和獨享調度資源組。此處選擇新增獨享集成資源組,點擊購買時仍要注意選擇對應的購買方式、區域、資源、內存、時間期限、數量等。
購買完成後須要把獨享集成資源組綁定到與Kafka對應的VPC,點擊專有網絡綁定,選擇與Kafka對應的交換機(最明顯的是可用區的區別)、安全組。
3.同步過程及其注意事項
Kafka同步到MaxCompute的須要進行相關參數配置同時須要注意如下幾個事項。
DataWorks數據集成操做:進入DataWorks操做界面,點擊建立業務流程,在新建的業務流程添加數據同步節點,再進行命名。
以下圖所示,進入數據同步節點,包括Reader端和Writer端,點擊Reader端數據源爲Kafka,Writer端數據源爲ODPS。點擊轉化爲腳本模式。下圖右上角是幫助文檔,Reader或Writer端的一些同步參數能夠在此處就近點擊,方便閱讀、操做和理解。
Kafka Reader的主要參數:Kafka Reader的主要參數首先server,上文所述Kafka的默認接入點就是其中一個server,ip:port。注意此處server是必填參數。topic,表示在Kafka部署完成以後,Kafka處理數據源的topic,此處也是必填參數。下一個參數是針對列column,column支持常量列、數據列、屬性列。常量列和數據列不過重要。同步的完整消息通常存放在屬性列 value 中,若是須要其它信息,如partition、offset、timestamp,也能夠在屬性列中篩選。column是必填參數。
keyType、valueType各有6種類型,根據用戶同步的數據,選擇相應的信息,同步一個類型。須要注意同步方式是按消息時間同步,仍是按消費點位置同步的。按數據消費點位置同步有四個場景,beginDateTime,endDateTime,beginOffset,endOffset。 beginDateTime 和beginOffset 二選其一,做爲數據消費起點。endDateTime 和endOffset 二選其一。須要注意beginDateTime、endDateTime 中須要Kafka0.10.2版本以上才支持按數據消費點位置同步功能。另外須要注意beginOffset有三個比較特殊的形式:seekToBeginning,表示從開始點位消費數據;seekToLast,表示從上次消費的偏移位置消費數據,按照beginOffset從上次偏移位置只能一次消費,若是使用beginDateTime則能夠屢次消費,這取決於消息存放時間;seekToEnd,表示從最後點位消費數據,會讀取到空數據。
skipExceeedRecord沒有太大做用,是沒必要填項。partition對topic全部分區共同讀消費的,因此無需自定義一個分區,是非必填項。kafkaConfig,若是有其它相關配置參數能夠擴展配置在kafkaConfig,kafkaConfig也是非必填項。
MaxCompute Writer的主要參數:dataSource是數據源名稱,添加ODPS數據源。tables,表示所建立的數據表的表名稱,Kafka的數據要同步到哪張表中,相應的字段也能夠創建。
partition,若是表爲分區表,則必須配置到最後一級分區,肯定同步位置。若爲非分區表,則沒必要填。column,儘可能與Kafka column中的相關字段作一一對應的操做。同步的字段對應,信息同步才能確認成功。truncate,寫入時同步的數據是選擇以追加模式寫仍是以覆蓋模式寫,儘可能避免多個DDL同時操做一個分區,或者在多個併發做業啓動前提早建立分區。
Kafka同步數據到MaxCompute:將下圖拆分爲三部分。Kafka的Reader端,MaxCompute的Writer端以及限制參數。Reader包含server、endOffset、kafkaConfig、group.id、valueType、ByteArray、column字段、topic、beginOffset、seekToLast等。MaxCompute的Writer端包含覆蓋、追加、壓縮、查看源碼、同步到的表、字段要和Kafka的Reader端作一一對應,最重要的是value數據同步。限制參數,主要有errorlimit,數據超過幾個錯誤後會進行報錯;speed,能夠限制流速、併發度等。
參考Kafka生產者SDK編寫代碼:最終生產出的數據要發送到Kafka中,經過相關代碼能夠查看用戶的生產數據。下圖一段代碼表示配置信息的讀取,協議、序列化方式以及請求的等待時間,須要發送哪個topic,發送什麼樣的消息。發送完成後回傳一個信息。詳細代碼能夠參考配置文件、消息來源、生產者消費者的代碼模板:
https://help.aliyun.com/document_detail/99957.html?spm=a2c4g.11186623.6.566.45fc54eayX69b0。
代碼打包運行在ECS上(與Kafka同一個可用區):以下圖所示,執行crontab-e命令,每到17:00執行一次。下圖爲發送日誌完成後的消息記錄。
在MaxCompute上建立表:進入DataWorks業務流程頁面,建立目標表,使用一個DDL語句建立同步的表,或根據用戶我的業務相應建立不一樣的表的字段。
4.開發測試以及生產部署
選擇自定義資源組(或獨享集成資源組)進行同步操做:下圖所示,選擇右上角「配置任務資源組」,根據用戶我的需求選擇資源組,點擊執行。執行完成後,會出現標識顯示成功,同步數據記錄以及結果是否成功。同步過程基本結束。
查詢同步的數據結果:在DataWorks臨界面查看同步結果,在臨時節點點擊查詢命令,select * from testkafka3(表),查看數據同步結果。數據已經同步過來,證實測試成功。
設置調度參數:業務流程開發數據同步以後,會對相關模型進行一些業務處理,最後設計一些SQL節點、同步節點,進行部署。以下圖所示,在右側點擊調度配置,輸入調度時間。具體操做可參考DataWorks官方文檔完善業務處理流程。
提交業務流程節點,並打包發佈:點擊業務流程,選擇所須要提交的節點並提交。一些業務流程提交以後不須要放到生產環境當中。而後進入任務發佈界面,將節點添加到待發布進行任務部署。
確認業務流程發佈成功:最後在運維中心頁面,確認發佈是否在生產環境中存在。至此Kafka同步數據到MaxCompute過程結束。到了對應的調度時間,在各個節點或者右上角會有節點的日誌展現,能夠查看日誌運行狀況是否正常,或是否須要進行後續操做,部署數據或是相關命令。
本文爲雲棲社區原創內容,未經容許不得轉載。