騰訊雲 Serverless 銜接 Kafka 上下游數據流轉實戰

騰訊雲 CKafka 做爲大數據架構中的關鍵組件,起到了數據聚合,流量削峯,消息管道的做用。在 CKafka 上下游中的數據流轉中有各類優秀的開源解決方案。如 Logstash,File Beats,Spark,Flink 等等。本文將帶來一種新的解決方案:Serverless Function。其在學習成本,維護成本,擴縮容能力等方面相對已有開源方案將有優異的表現。php

做者簡介:許文強,騰訊雲 Ckafka 核心研發,精通 Kafka 及其周邊生態。對 Serverless,消息隊列等領域有較深的理解。專一於 Kafka 在公有云多租戶和大規模集羣場景下的性能分析和優化、及雲上消息隊列 serverless 化的相關探索。java

Tencent Cloud Kafka 介紹

Tencent Cloud Kafka 是基於開源 Kafka 引擎研發的適合大規模公有云部署的 Cloud Kafka。是一款適合公有云部署、運行、運維的分佈式的、高可靠、高吞吐和高可擴展的消息隊列系統。它 100% 兼容開源的 Kafka API,目前主要支持開源的 0.9, 0.10, 1.1.1, 2.4.2 四個大版本 ,並提供向下兼容的能力。node

目前 Tencent Cloud Kafka 維護了超過 4000+ 節點的集羣,每日吞吐的消息量超過 9 萬億+條,峯值帶寬達到了 800GB+/s, 堆積數據達到了 20PB+。是一款集成了租戶隔離、限流、鑑權、安全、數據監控告警、故障快速切換、跨可用區容災等等一系列特性的,歷經大流量檢驗的、可靠的公有云上 Kafka 集羣。python

什麼是數據流轉

CKafka 做爲一款高吞吐,高可靠的消息隊列引擎。須要承接大量數據的流入和流出,數據流動的這一過程咱們稱之它爲數據流轉。而在處理數據的流入和流出過程當中,會有不少成熟豐富的開源的解決方案,如 Logstash,Spark,Fllink等。從簡單的數據轉儲,到複雜的數據清洗,過濾,聚合等,都有現成的解決方案。golang

如圖所示,在 Kafka 上下游生態圖中,CKafka 處於中間層,起到數據聚合,流量削峯,消息管道的做用。圖左和圖上是數據寫入的組件概覽,圖右和圖下是下游流式數據處理方案和持久化存儲引擎。這些構成了 Kafka 周邊的數據流動的生態。json

圖 1: Kafka 上下游生態圖

數據流轉新方案: Serverless Function

下圖是流式計算典型數據流動示意圖。其中承接數據流轉方案的是各類開源解決方案。單純從功能和性能的角度來說,開源解決方案都有很優秀的表現。後端

圖 2: 流式計算典型數據流動示意圖

而從學習成本,維護成本,金錢成本,擴縮容能力等角度來看,這些開源方案仍是有欠缺的。怎麼說呢?開源方案的缺點主要在於以下三點:安全

  • 學習成本
  • 調優、維護、解決問題的成本
  • 擴縮容能力

以 Logstash 爲例,它的入門使用學習門檻不高,進階使用有必定的成本,主要包括衆多 release 版本的使用成本,參數調優和故障處理成本,後續的維護成本(進程可用性,單機的負載處理)等。若是用流式計算引擎,如 spark 和 flink,其雖然具備分佈式調度能力和即時的數據處理能力,可是其學習門檻和後期的集羣維護成本,將大大提升。ruby

來看 Serverless Function 是怎麼處理數據流轉的。如圖所示,Serverless Function 運行在數據的流入和流出的處理層的位置,代替了開源的解決方案。Serverless Function 是以自定義代碼的形式來實現數據清洗、過濾、聚合、轉儲等能力的。它具備學習成本低、無維護成本、自動擴縮容和按量計費等優秀特性。數據結構

圖 3: Serverless Function 實現低成本數據流轉

接下來咱們來看一下 Serverless Function 是怎麼實現數據流轉的,而且瞭解一下其底層的運行機制及其優點。

Serverless Function 實現數據流轉

首先來看一下怎麼使用 Serverless Function 實現 Kafka To Elasticsearch 的數據流轉。下面以 Function 事件觸發的方式來講明 Function 是怎麼實現低成本的數據清洗、過濾、格式化和轉儲的:

在業務錯誤日誌採集分析的場景中,會將機器上的日誌信息採集併發送到服務端。服務端選擇 Kafka 做爲消息中間件,起到數據可靠存儲,流量削峯的做用。爲了保存長時間的數據(月,年),通常會將數據清洗、格式化、過濾、聚合後,存儲到後端的分佈式存儲系統,如 HDFS,HBASE,Elasticsearch 中。

如下代碼段分爲三部分:數據源的消息格式,處理後的目標消息格式,功能實現的 Function 代碼段

  • 源數據格式:
{
            "version": 1,
            "componentName": "trade",
            "timestamp": 1595944295,
            "eventId": 9128499,
            "returnValue": -1,
            "returnCode": 101103,
            "returnMessage": "return has no deal return error[錯誤:缺乏**c參數][seqId:u3Becr8iz*]",
            "data": [],
            "seqId": "@kibana-highlighted-field@u3Becr8iz@/kibana-highlighted-field@*"
        }
  • 目標數據格式:
{
            "timestamp": "2020-07-28 21:51:35",
            "returnCode": 101103,
            "returnError": "return has no deal return error",
            "returnMessage": "錯誤:缺乏**c參數",
            "requestId": "u3Becr8iz*"
        }
  • Function 代碼

Function 實現的功能是將數據從源格式,經過清洗,過濾,格式化轉化爲目標數據格式,並轉儲到 Elasticsearch。代碼的邏輯很簡單:CKafka 收到消息後,觸發了函數的執行,函數接收到信息後會執行 convertAndFilter 函數的過濾,重組,格式化操做,將源數據轉化爲目標格式,最後數據會被存儲到 Elasticsearch。

#!/usr/bin/python
# -*- coding: UTF-8 -*-
from datetime import datetime
from elasticsearch import Elasticsearch
from elasticsearch import helpers

esServer = "http://172.16.16.53:9200"  # 修改成 es server 地址+端口 E.g. http://172.16.16.53:9200
esUsr = "elastic"  # 修改成 es 用戶名 E.g. elastic
esPw = "PW123"  # 修改成 es 密碼 E.g. PW2312321321
esIndex = "pre1"  # es 的 index 設置

# ... or specify common parameters as kwargs
es = Elasticsearch([esServer],
                   http_auth=(esUsr, esPw),
                   sniff_on_start=False,
                   sniff_on_connection_fail=False,
                   sniffer_timeout=None)

def convertAndFilter(sourceStr):
    target = {}
    source = json.loads(sourceStr)
    # 過濾掉returnCode=0的日誌
    if source["returnCode"] == 0:
        return
    dateArray = datetime.datetime.fromtimestamp(source["timestamp"])
    target["timestamp"] = dateArray.strftime("%Y-%m-%d %H:%M:%S")
    target["returnCode"] = source["returnCode"]
    message = source["returnMessage"]
    message = message.split("][")
    errorInfo = message[0].split("[")
    target["returnError"] = errorInfo[0]
    target["returnMessage"] = errorInfo[1]
    target["requestId"] = message[1].replace("]", "").replace("seqId:", "")
    return target


def main_handler(event, context):
    # 獲取 event Records 字段並作轉化操做 數據結構 https://cloud.tencent.com/document/product/583/17530
    for record in event["Records"]:
        target = convertAndFilter(record)
        action = {
            "_index": esIndex,
            "_source": {
                "msgBody": target  # 獲取 Ckafka 觸發器 msgBody
            }
        }
        helpers.bulk(es, action)
    return ("successful!")

看到這裏,你們可能會發現,這個代碼段平時是處理單機的少許數據的腳本是同樣的,就是作轉化,轉儲,很簡單。其實不少分佈式的系統作的系統從微觀的角度看,其實就是作的這麼簡單的事情。分佈式框架自己作的更多的是分佈式調度,分佈式運行,可靠性,可用性等等工做,細化到執行單元,功能其實和上面的代碼段是同樣的。

從宏觀來看,Serverless Function 作的事情和分佈式計算框架 Spark, Flink 等作的事情是同樣的,都是調度,執行基本的執行單元,處理業務邏輯。區別在於用開源的方案,須要使用方去學習,使用,維護運行引擎,而 Serverless Function 則是平臺來幫用戶作這些事情。

接下來咱們來看 Serverless Function 在底層是怎麼去支持這些功能的,來看一下其底層的運行機制。如圖所示:

圖 4: Serverless Function 實現數據流轉原理解析

Function 做爲一個代碼片斷,提交給平臺之後。須要有一種觸發函數運行的方式,目前主要有以下三種:事件觸發、定時觸發和主動觸發。

在上面的例子中,咱們是以事件觸發爲例的。當消息提交到 Kafka,就會觸發函數的運行。此時 Serverless 調度運行平臺就會調度底層的 Container 併發去執行函數,並執行函數的邏輯。此時關於 Container 的併發度是由系統自動調度,自動計算的,當 Kafka 的源數據多的時候,併發量就大,當數據少的時候,相應的就會較少併發數。由於函數是以運行時長計費的,當源消息數據量少的時候,併發量小,天然運行時長就少,天然所需付出的資金成本就降下來。

在函數執行過程中,函數的可靠性運行,自動擴縮容調度,併發度等都是用戶不須要關心的。用戶須要 Cover 的只是函數代碼段的可運行,無 BUG。這對於研發人員的精力投入成本就下降不少。

值得一談的是,在開發語言方面,開源方案只支持其相對應的語言,如 Logstash 的嵌入腳本用的是 ruby,spark 主要支持java,scala,python 等。而 Serverless Function 支持的是幾乎業界常見到的開發語言,包括不限於 java,golang,python,node JS,php 等等。這點就可讓研發人員用其熟悉的語言去解決數據流轉問題,這在無形中就減小了不少代碼出錯和出問題的機會。

Serverless Function 在數據流轉場景的優點

下面咱們來統一看一下 Serverless Function 和開源的方案的主要區別及優點。如圖5所示,和開源方案相比。在非實時的數據流轉場景中,Serverless Function 相對現有的開源方案,它具備的優點幾乎是壓倒性的。從功能和性能的角度,它在批式計算(實時)的場景中是徹底能夠知足的。可是它相對開源方案在學習成本,運維成本幾乎能夠忽略,其動態擴縮容,按需付費,毫秒級付費對於資金成本的投入也是很是友好的。

圖 5:Serverless Function 對比現有開源方案的優點

用一句話總結就是:Serverless Function 能用一段熟悉的語言編寫一小段代碼去銜接契合流式計算中的數據流轉。

Serverless Function 在批式計算場景的展望

隨着流式計算的發展,慢慢演化出了批量計算 (batch computing)、流式計算 (stream computing)、交互計算 (interactive computing)、圖計算 (graph computing) 等方向。而架構師在業務中選擇批式計算或者流式計算,其核心是但願按需使用批式計算或流式計算,以取得在延時、吞吐、容錯、成本投入等方面的平衡。在使用者看來,批式處理能夠提供精確的批式數據視圖,流式處理能夠提供近實時的數據視圖。而在批式處理當中,或者說在將來的批式處理和流式處理的底層技術的合流過程當中,Lambda 架構是其發展的必然路徑。

Serverless Function 以其按需使用,自動擴縮容及近乎無限的橫向擴容能力給現階段的批式處理提供了一種選擇,而且在將來批流一體化的過程當中,將來可期。

圖 6: 批式處理和流式處理

One More Thing

當即體驗騰訊雲 Serverless Demo,領取 Serverless 新用戶禮包 👉 serverless/start

歡迎訪問:Serverless 中文網

相關文章
相關標籤/搜索