摘要: 表格存儲的增量數據流功能可以使用戶使用API獲取Table Store表中增量數據,並能夠進行增量數據流的實時增量分析、數據增量同步等。經過建立Table Store觸發器,可以實現Table Store Stream和函數計算的自動對接,讓計算函數中自定義的程序邏輯自動處理Table Store表中發生的數據修改,充分的利用了函數計算全託管、彈性伸縮的特色。html
函數計算(Function Compute) 是一個事件驅動的服務,經過函數計算,用戶無需管理服務器等運行狀況,只需編寫代碼並上傳。函數計算準備計算資源,並以彈性伸縮的方式運行用戶代碼,而用戶只需根據實際代碼運行所消耗的資源進行付費。python
Table Store Stream是用於獲取Table Store表中增量數據的一個數據通道,經過建立Table Store觸發器,可以實現Table Store Stream和函數計算的自動對接,讓計算函數中自定義的程序邏輯自動處理Table Store表中發生的數據修改。json
表格存儲高併發的寫入性能以及低廉的存儲成本很是適合物聯網、日誌、監控數據的存儲,咱們能夠將數據寫入到表格存儲中,同時在函數計算中對新增的數據作簡單的清洗、轉換、聚合計算等操做,並將清洗以後的數據寫回到表格存儲的結果表中,並對原始明細數據及結果數據提供實時訪問。服務器
下面,咱們使用函數計算對錶格存儲中的數據作簡單的清洗,並寫入到結果表中。併發
咱們假設寫入的爲日誌數據,包括三個基礎字段:python2.7
字段名稱 | 類型 | 含義 |
---|---|---|
id | 整型 | 日誌id |
level | 整型 | 日誌的等級,越大代表等級越高 |
message | 字符串 | 日誌的內容 |
咱們須要將 level>1 的日誌寫入到另一張數據表中,用做專門的查詢。函數
在表格存儲的控制檯建立表格存儲實例(__本次以 華東2 distribute-test 爲例__),並建立源表(__source_data__)及結果表(__result__),主鍵爲均 __id (整型)__,因爲表格存儲是 schemafree 結構,無需預先定義其餘屬性列字段。高併發
觸發器功能須要先開啓數據表的Stream功能,才能在函數計算中處理寫入表格存儲中的增量數據。性能
Stream記錄過時時長 爲經過 StreamAPI 可以讀取到的增量數據的最長時間。spa
因爲觸發器只能綁定現有的函數,故先到函數計算的控制檯上在同region建立服務及函數。
在函數計算的控制檯上建立服務及處理函數,咱們繼續使用華東2節點。
1.在華東2節點建立服務。
2.建立函數依次選擇:空白函數——不建立觸發器。
3.進行服務受權
因爲函數計算須要將運行中的日誌寫入到日誌服務中,同時,須要對錶格存儲的表進行讀寫,故須要對函數計算進行受權,爲方便起見,咱們先添加 AliyunOTSFullAccess 與 __AliyunLogFullAccess __權限,實際生產中,建議根據權限最小原則來添加權限。
4.點擊受權完成,並建立函數。
5.修改函數代碼。
建立好函數以後,點擊對應的函數
—代碼執行
,編輯代碼並保存,其中,INSTANCE_NAME(表格存儲的實例名稱)、REGION(使用的區域)須要根據狀況進行修改:
使用示例代碼以下:
#!/usr/bin/env python # -*- coding: utf-8 -*- import cbor import json import tablestore as ots INSTANCE_NAME = 'distribute-test' REGION = 'cn-shanghai' ENDPOINT = 'http://%s.%s.ots-internal.aliyuncs.com'%(INSTANCE_NAME, REGION) RESULT_TABLENAME = 'result' def _utf8(input): return str(bytearray(input, "utf-8")) def get_attrbute_value(record, column): attrs = record[u'Columns'] for x in attrs: if x[u'ColumnName'] == column: return x['Value'] def get_pk_value(record, column): attrs = record[u'PrimaryKey'] for x in attrs: if x['ColumnName'] == column: return x['Value'] #因爲已經受權了AliyunOTSFullAccess權限,此處獲取的credentials具備訪問表格存儲的權限 def get_ots_client(context): creds = context.credentials client = ots.OTSClient(ENDPOINT, creds.accessKeyId, creds.accessKeySecret, INSTANCE_NAME, sts_token = creds.securityToken) return client def save_to_ots(client, record): id = int(get_pk_value(record, 'id')) level = int(get_attrbute_value(record, 'level')) msg = get_attrbute_value(record, 'message') pk = [(_utf8('id'), id),] attr = [(_utf8('level'), level), (_utf8('message'), _utf8(msg)),] row = ots.Row(pk, attr) client.put_row(RESULT_TABLENAME, row) def handler(event, context): records = cbor.loads(event) #records = json.loads(event) client = get_ots_client(context) for record in records['Records']: level = int(get_attrbute_value(record, 'level')) if level > 1: save_to_ots(client, record) else: print "Level <= 1, ignore."
對錶格存儲 Stream 數據的格式詳情請參考Stream 數據處理
1.回到表格存儲的實例管理頁面,點擊表 source_data 後的 使用觸發器 按鈕,進入觸發器綁定界面,點擊使用已有函數計算
, 選擇剛建立的服務及函數,勾選 表格存儲發送事件通知的權限
, 進行肯定。
2.綁定成功以後,可以看到以下的信息:
1.向 source_data 表中寫入數據。
2.在 result 表中查詢清洗後的數據
點擊 result 表的數據管理頁面,會查詢到剛寫入到 source_data 中的數據。 固然,向 soure_data 寫入level <=1的數據將不會同步到 result 表中