使用函數計算對錶格存儲中數據作簡單清洗

摘要: 表格存儲的增量數據流功能可以使用戶使用API獲取Table Store表中增量數據,並能夠進行增量數據流的實時增量分析、數據增量同步等。經過建立Table Store觸發器,可以實現Table Store Stream和函數計算的自動對接,讓計算函數中自定義的程序邏輯自動處理Table Store表中發生的數據修改,充分的利用了函數計算全託管、彈性伸縮的特色。html

函數計算(Function Compute) 是一個事件驅動的服務,經過函數計算,用戶無需管理服務器等運行狀況,只需編寫代碼並上傳。函數計算準備計算資源,並以彈性伸縮的方式運行用戶代碼,而用戶只需根據實際代碼運行所消耗的資源進行付費。python

Table Store Stream是用於獲取Table Store表中增量數據的一個數據通道,經過建立Table Store觸發器,可以實現Table Store Stream和函數計算的自動對接,讓計算函數中自定義的程序邏輯自動處理Table Store表中發生的數據修改。json

表格存儲高併發的寫入性能以及低廉的存儲成本很是適合物聯網、日誌、監控數據的存儲,咱們能夠將數據寫入到表格存儲中,同時在函數計算中對新增的數據作簡單的清洗、轉換、聚合計算等操做,並將清洗以後的數據寫回到表格存儲的結果表中,並對原始明細數據及結果數據提供實時訪問。服務器

下面,咱們使用函數計算對錶格存儲中的數據作簡單的清洗,並寫入到結果表中。併發

數據定義

咱們假設寫入的爲日誌數據,包括三個基礎字段:python2.7

字段名稱 類型 含義
id 整型 日誌id
level 整型 日誌的等級,越大代表等級越高
message 字符串 日誌的內容

咱們須要將 level>1 的日誌寫入到另一張數據表中,用做專門的查詢。函數

實現過程:

建立實例及數據表

表格存儲的控制檯建立表格存儲實例(__本次以 華東2 distribute-test 爲例__),並建立源表(__source_data__)及結果表(__result__),主鍵爲均 __id (整型)__,因爲表格存儲是 schemafree 結構,無需預先定義其餘屬性列字段。高併發

開啓數據源表的Stream功能

觸發器功能須要先開啓數據表的Stream功能,才能在函數計算中處理寫入表格存儲中的增量數據。性能

Stream記錄過時時長 爲經過 StreamAPI 可以讀取到的增量數據的最長時間。spa

因爲觸發器只能綁定現有的函數,故先到函數計算的控制檯上在同region建立服務及函數。

建立函數計算服務

函數計算的控制檯上建立服務及處理函數,咱們繼續使用華東2節點。

1.在華東2節點建立服務。

2.建立函數依次選擇:空白函數——不建立觸發器。

  • 函數名稱爲:etl_test,選擇 python2.7 環境,在線編輯代碼
  • 函數入口爲:etl_test.handler
  • 代碼稍後編輯,點擊下一步。

3.進行服務受權

因爲函數計算須要將運行中的日誌寫入到日誌服務中,同時,須要對錶格存儲的表進行讀寫,故須要對函數計算進行受權,爲方便起見,咱們先添加 AliyunOTSFullAccess 與 __AliyunLogFullAccess __權限,實際生產中,建議根據權限最小原則來添加權限。

4.點擊受權完成,並建立函數。

5.修改函數代碼。

建立好函數以後,點擊對應的函數代碼執行,編輯代碼並保存,其中,INSTANCE_NAME(表格存儲的實例名稱)、REGION(使用的區域)須要根據狀況進行修改:

使用示例代碼以下:

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import cbor
import json
import tablestore as ots

INSTANCE_NAME = 'distribute-test'
REGION = 'cn-shanghai'
ENDPOINT = 'http://%s.%s.ots-internal.aliyuncs.com'%(INSTANCE_NAME, REGION)
RESULT_TABLENAME = 'result'


def _utf8(input):
    return str(bytearray(input, "utf-8"))

def get_attrbute_value(record, column):
    attrs = record[u'Columns']
    for x in attrs:
        if x[u'ColumnName'] == column:
            return x['Value']

def get_pk_value(record, column):
    attrs = record[u'PrimaryKey']
    for x in attrs:
        if x['ColumnName'] == column:
            return x['Value']

#因爲已經受權了AliyunOTSFullAccess權限,此處獲取的credentials具備訪問表格存儲的權限
def get_ots_client(context):
    creds = context.credentials
    client = ots.OTSClient(ENDPOINT, creds.accessKeyId, creds.accessKeySecret, INSTANCE_NAME, sts_token = creds.securityToken)
    return client

def save_to_ots(client, record):
    id = int(get_pk_value(record, 'id'))
    level = int(get_attrbute_value(record, 'level'))
    msg = get_attrbute_value(record, 'message')

    pk = [(_utf8('id'), id),]
    attr = [(_utf8('level'), level), (_utf8('message'), _utf8(msg)),]
    row = ots.Row(pk, attr)
    client.put_row(RESULT_TABLENAME, row)

def handler(event, context):
    records = cbor.loads(event)
    #records = json.loads(event)
    client = get_ots_client(context)
    for record in records['Records']:
        level = int(get_attrbute_value(record, 'level'))
        if level > 1:
            save_to_ots(client, record)
        else:
            print "Level <= 1, ignore."

對錶格存儲 Stream 數據的格式詳情請參考Stream 數據處理

綁定觸發器

1.回到表格存儲的實例管理頁面,點擊表 source_data 後的 使用觸發器 按鈕,進入觸發器綁定界面,點擊使用已有函數計算, 選擇剛建立的服務及函數,勾選 表格存儲發送事件通知的權限, 進行肯定。

2.綁定成功以後,可以看到以下的信息:

運行驗證

1.向 source_data 表中寫入數據。

2.在 result 表中查詢清洗後的數據

點擊 result 表的數據管理頁面,會查詢到剛寫入到 source_data 中的數據。
固然,向 soure_data 寫入level <=1的數據將不會同步到 result 表中

原文連接

相關文章
相關標籤/搜索