Kafka - SQL 引擎分享

1.概述

  大多數狀況下,咱們使用 Kafka 只是做爲消息處理。在有些狀況下,咱們須要屢次讀取 Kafka 集羣中的數據。固然,咱們能夠經過調用 Kafka 的 API 來完成,可是針對不一樣的業務需求,咱們須要去編寫不一樣的接口,在通過編譯,打包,發佈等一系列流程。最後才能看到咱們預想的結果。那麼,咱們能不能有一種簡便的方式去實現這一部分功能,經過編寫 SQL 的方式,來可視化咱們的結果。今天,筆者給你們分享一些心得,經過使用 SQL 的形式來完成這些需求。前端

2.內容

  實現這些功能,其架構和思路並不複雜。這裏筆者將整個實現流程,經過一個原理圖來呈現。以下圖所示:json

  這裏筆者給你們詳述一下上圖的含義,消息數據源存放與 Kafka 集羣當中,開啓低階和高階兩個消費線程,將消費的結果以 RPC 的方式共享出去(即:請求者)。數據共享出去後,迴流經到 SQL 引擎處,將內存中的數據翻譯成 SQL Tree,這裏使用到了 Apache 的 Calcite 項目來承擔這一部分工做。而後,咱們經過 Thrift 協議來響應 Web Console 的 SQL 請求,最後將結果返回給前端,讓其以圖表的實行可視化。架構

3.插件配置

  這裏,咱們須要遵循 Calcite 的 JSON Models,好比,針對 Kafka 集羣,咱們須要配置一下內容:學習

{
    version: '1.0',
    defaultSchema: 'kafka',  
    schemas: [  
        {
            name: 'kafka',  
            type: 'custom',
            factory: 'cn.smartloli.kafka.visual.engine.KafkaMemorySchemaFactory',  
            operand: {
                database: 'kafka_db'
            }  
        } 
    ]
}

  另外,這裏最好對錶也作一個表述,配置內容以下所示:fetch

[
    {
        "table":"Kafka",
        "schemas":{
            "_plat":"varchar",
            "_uid":"varchar",
            "_tm":"varchar",
            "ip":"varchar",
            "country":"varchar",
            "city":"varchar",
            "location":"jsonarray"
        }
    }
]

4.操做

  下面,筆者給你們演示經過 SQL 來操做相關內容。相關截圖以下所示:ui

  在查詢處,填寫相關 SQL 查詢語句。點擊 Table 按鈕,獲得以下所示結果:spa

  咱們,能夠將獲取的結果以報表的形式進行導出。插件

  固然,咱們能夠在 Profile 模塊下,瀏覽查詢歷史記錄和當前正在運行的查詢任務。至於其餘模塊,都屬於輔助功能(展現集羣信息,Topic 的 Partition 信息等)這裏就很少贅述了。線程

5.總結

  分析下來,總體架構和實現的思路都不算太複雜,也不存在太大的難點,須要注意一些實現上的細節,好比消費 API 針對集羣消息參數的調整,特別是低階消費 API,尤其須要注意,其 fetch_size 的大小,以及 offset 是須要咱們本身維護的。在使用 Calcite 做爲 SQL 樹時,咱們要遵循其 JSON Model 和標準的 SQL 語法來操做數據源。翻譯

6.結束語

這篇博客就和你們分享到這裏,若是你們在研究學習的過程中有什麼問題,能夠加羣進行討論或發送郵件給我,我會盡我所能爲您解答,與君共勉!

相關文章
相關標籤/搜索