Implementing the WordCount Algorithm with SCF Cloud Functions and COS Object Storage

This article implements a simple WordCount algorithm using the MapReduce model. Unlike the traditional approach built on big-data frameworks such as Hadoop, it uses Serverless Cloud Function (SCF) and Cloud Object Storage (COS).

Wikipedia describes MapReduce as follows:

MapReduce is a software architecture proposed by Google for parallel processing of large data sets (larger than 1 TB). The concepts of "Map" and "Reduce", and their main ideas, are borrowed from functional programming languages, along with features borrowed from vector programming languages.

From this description we can see that MapReduce is a computing model, framework, and platform for parallel processing of big data. Traditionally it is studied on distributed frameworks such as Hadoop, and as cloud computing has matured, the major cloud vendors have all launched hosted MapReduce services.
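
To make the borrowed "map" and "reduce" ideas concrete before moving to the cloud setup, here is a toy, purely in-memory version of WordCount in Python (an illustration only, not part of the SCF implementation):

# Toy in-memory WordCount: map each line to (word, 1) pairs, then reduce by summing.
from functools import reduce

lines = ["serverless scf cos", "scf scf cos"]

# Map phase: one (word, 1) pair per word occurrence.
pairs = [(word, 1) for line in lines for word in line.split()]

# Reduce phase: fold the pairs into a word -> total count dictionary.
def merge(acc, pair):
    word, count = pair
    acc[word] = acc.get(word, 0) + count
    return acc

print(reduce(merge, pairs, {}))  # {'serverless': 1, 'scf': 3, 'cos': 2}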

Theoretical Basis

Before starting, let's sketch a simple flow chart based on how MapReduce works:

In this architecture we need two cloud functions, acting as the Mapper and the Reducer, and three COS buckets: an input bucket, an intermediate (temporary cache) bucket, and a result bucket. Since the functions will be deployed in the Guangzhou region, we create the three buckets there as well:

Object Storage bucket 1	ap-guangzhou	srcmr
Object Storage bucket 2	ap-guangzhou	middlestagebucket
Object Storage bucket 3	ap-guangzhou	destmr
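
The buckets can be created in the COS console; alternatively, a rough sketch using the same cos-python-sdk-v5 client that the functions below rely on would look like this (the credentials and the -1256773370 APPID suffix, taken from the yaml later in this article, are placeholders to replace with your own):

# Sketch: create the three buckets with cos-python-sdk-v5 (replace the placeholders).
from qcloud_cos_v5 import CosConfig
from qcloud_cos_v5 import CosS3Client

config = CosConfig(Region='ap-guangzhou', SecretId='your-secret-id', SecretKey='your-secret-key')
client = CosS3Client(config)

# COS bucket names always carry the account APPID as a suffix, e.g. srcmr-1256773370.
for bucket in ['srcmr-1256773370', 'middlestagebucket-1256773370', 'destmr-1256773370']:
    client.create_bucket(Bucket=bucket)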

To keep the Mapper and Reducer logic clear, we first rework the traditional WordCount structure so that it better fits cloud functions and the work is divided sensibly between the Mapper and the Reducer:
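
Concretely, the Mapper turns each object in the source bucket into one "word\t1" line per word occurrence and writes the result to the intermediate bucket; the Reducer then folds those lines into "word\tcount" records for the result bucket. A tiny local illustration of that hand-off (the sample text is made up):

# Illustration only: the tab-separated hand-off between Mapper and Reducer.
mapper_output = "hello\t1\nworld\t1\nhello\t1\n"   # what the Mapper writes to middle_<name>

word2count = {}
for line in mapper_output.splitlines():
    word, count = line.split('\t', 1)
    word2count[word] = word2count.get(word, 0) + int(count)

print(word2count)  # {'hello': 2, 'world': 1} -> written as "word\tcount" lines to the result bucket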

Implementation

Write the Mapper logic; the code is as follows:

# -*- coding: utf8 -*-
import datetime
from qcloud_cos_v5 import CosConfig
from qcloud_cos_v5 import CosS3Client
from qcloud_cos_v5 import CosServiceError
import re
import os
import sys
import logging
logging.basicConfig(level=logging.INFO, stream=sys.stdout)
logger = logging.getLogger()
logger.setLevel(level=logging.INFO)
region = u'ap-guangzhou'  # change to the region you actually deploy in
middle_stage_bucket = 'middlestagebucket'  # change to your own bucket name
def delete_file_folder(src):
    if os.path.isfile(src):
        try:
            os.remove(src)
        except:
            pass
    elif os.path.isdir(src):
        for item in os.listdir(src):
            itemsrc = os.path.join(src, item)
            delete_file_folder(itemsrc)
        try:
            os.rmdir(src)
        except:
            pass
def download_file(cos_client, bucket, key, download_path):
    logger.info("Get from [%s] to download file [%s]" % (bucket, key))
    try:
        response = cos_client.get_object(Bucket=bucket, Key=key, )
        response['Body'].get_stream_to_file(download_path)
    except CosServiceError as e:
        print(e.get_error_code())
        print(e.get_error_msg())
        return -1
    return 0
def upload_file(cos_client, bucket, key, local_file_path):
    logger.info("Start to upload file to cos")
    try:
        response = cos_client.put_object_from_local_file(
            Bucket=bucket,
            LocalFilePath=local_file_path,
            Key='{}'.format(key))
    except CosServiceError as e:
        print(e.get_error_code())
        print(e.get_error_msg())
        return -1
    logger.info("Upload data map file [%s] Success" % key)
    return 0
def do_mapping(cos_client, bucket, key, middle_stage_bucket, middle_file_key):
    src_file_path = u'/tmp/' + key.split('/')[-1]
    middle_file_path = u'/tmp/' + u'mapped_' + key.split('/')[-1]
    download_ret = download_file(cos_client, bucket, key, src_file_path)  # download src file
    if download_ret == 0:
        inputfile = open(src_file_path, 'r')  # open local /tmp file
        mapfile = open(middle_file_path, 'w')  # open a new file write stream
        for line in inputfile:
            line = re.sub('[^a-zA-Z0-9]', ' ', line)  # replace non-alphabetic/number characters
            words = line.split()
            for word in words:
                mapfile.write('%s\t%s' % (word, 1))  # count for 1
                mapfile.write('\n')
        inputfile.close()
        mapfile.close()
        upload_ret = upload_file(cos_client, middle_stage_bucket, middle_file_key,
                                 middle_file_path)  # upload the file's each word
        delete_file_folder(src_file_path)
        delete_file_folder(middle_file_path)
        return upload_ret
    else:
        return -1
def map_caller(event, context, cos_client):
    appid = event['Records'][0]['cos']['cosBucket']['appid']
    bucket = event['Records'][0]['cos']['cosBucket']['name'] + '-' + appid
    key = event['Records'][0]['cos']['cosObject']['key']
    key = key.replace('/' + str(appid) + '/' + event['Records'][0]['cos']['cosBucket']['name'] + '/', '', 1)
    logger.info("Key is " + key)
    middle_bucket = middle_stage_bucket + '-' + appid
    middle_file_key = '/' + 'middle_' + key.split('/')[-1]
    return do_mapping(cos_client, bucket, key, middle_bucket, middle_file_key)
def main_handler(event, context):
    logger.info("start main handler")
    if "Records" not in event.keys():
        return {"errorMsg": "event is not come from cos"}
    secret_id = "" 
    secret_key = ""  
    config = CosConfig(Region=region, SecretId=secret_id, SecretKey=secret_key, )
    cos_client = CosS3Client(config)
    start_time = datetime.datetime.now()
    res = map_caller(event, context, cos_client)
    end_time = datetime.datetime.now()
    print("data mapping duration: " + str((end_time - start_time).microseconds / 1000) + "ms")
    if res == 0:
        return "Data mapping SUCCESS"
    else:
        return "Data mapping FAILED"

In the same way, create a reducer.py file with the Reducer logic; the code is as follows:

# -*- coding: utf8 -*-
from qcloud_cos_v5 import CosConfig
from qcloud_cos_v5 import CosS3Client
from qcloud_cos_v5 import CosServiceError
from operator import itemgetter
import os
import sys
import datetime
import logging
region = u'ap-guangzhou'  # change to the region you actually deploy in
result_bucket = u'destmr'  # change to your own bucket name
logging.basicConfig(level=logging.INFO, stream=sys.stdout)
logger = logging.getLogger()
logger.setLevel(level=logging.INFO)
def delete_file_folder(src):
    if os.path.isfile(src):
        try:
            os.remove(src)
        except:
            pass
    elif os.path.isdir(src):
        for item in os.listdir(src):
            itemsrc = os.path.join(src, item)
            delete_file_folder(itemsrc)
        try:
            os.rmdir(src)
        except:
            pass
def download_file(cos_client, bucket, key, download_path):
    logger.info("Get from [%s] to download file [%s]" % (bucket, key))
    try:
        response = cos_client.get_object(Bucket=bucket, Key=key, )
        response['Body'].get_stream_to_file(download_path)
    except CosServiceError as e:
        print(e.get_error_code())
        print(e.get_error_msg())
        return -1
    return 0
def upload_file(cos_client, bucket, key, local_file_path):
    logger.info("Start to upload file to cos")
    try:
        response = cos_client.put_object_from_local_file(
            Bucket=bucket,
            LocalFilePath=local_file_path,
            Key='{}'.format(key))
    except CosServiceError as e:
        print(e.get_error_code())
        print(e.get_error_msg())
        return -1
    logger.info("Upload data map file [%s] Success" % key)
    return 0
def qcloud_reducer(cos_client, bucket, key, result_bucket, result_key):
    word2count = {}
    src_file_path = u'/tmp/' + key.split('/')[-1]
    result_file_path = u'/tmp/' + u'result_' + key.split('/')[-1]
    download_ret = download_file(cos_client, bucket, key, src_file_path)
    if download_ret != 0:  # bail out early if the mapped file cannot be downloaded
        return -1
    map_file = open(src_file_path, 'r')
    result_file = open(result_file_path, 'w')
    for line in map_file:
        line = line.strip()
        try:
            word, count = line.split('\t', 1)
            count = int(count)
            word2count[word] = word2count.get(word, 0) + count
        except ValueError as e:
            logger.error("error value: %s, current line: %s" % (e, line))
            continue
    map_file.close()
    delete_file_folder(src_file_path)
    sorted_word2count = sorted(word2count.items(), key=itemgetter(1))[::-1]
    for wordcount in sorted_word2count:
        res = '%s\t%s' % (wordcount[0], wordcount[1])
        result_file.write(res)
        result_file.write('\n')
    result_file.close()
    upload_ret = upload_file(cos_client, result_bucket, result_key, result_file_path)
    delete_file_folder(result_file_path)
    return upload_ret
def reduce_caller(event, context, cos_client):
    appid = event['Records'][0]['cos']['cosBucket']['appid']
    bucket = event['Records'][0]['cos']['cosBucket']['name'] + '-' + appid
    key = event['Records'][0]['cos']['cosObject']['key']
    key = key.replace('/' + str(appid) + '/' + event['Records'][0]['cos']['cosBucket']['name'] + '/', '', 1)
    logger.info("Key is " + key)
    res_bucket = result_bucket + '-' + appid
    result_key = '/' + 'result_' + key.split('/')[-1]
    return qcloud_reducer(cos_client, bucket, key, res_bucket, result_key)
def main_handler(event, context):
    logger.info("start main handler")
    if "Records" not in event.keys():
        return {"errorMsg": "event is not come from cos"}
    secret_id = "SecretId" 
    secret_key = "SecretKey"  
    config = CosConfig(Region=region, SecretId=secret_id, SecretKey=secret_key, )
    cos_client = CosS3Client(config)
    start_time = datetime.datetime.now()
    res = reduce_caller(event, context, cos_client)
    end_time = datetime.datetime.now()
    print("data reducing duration: " + str((end_time - start_time).microseconds / 1000) + "ms")
    if res == 0:
        return "Data reducing SUCCESS"
    else:
        return "Data reducing FAILED"

Deployment and Testing

Following the Serverless Framework yaml specification, write serverless.yaml:

WordCountMapper:
  component: "@serverless/tencent-scf"
  inputs:
    name: mapper
    codeUri: ./code
    handler: index.main_handler
    runtime: Python3.6
    region: ap-guangzhou
    description: WordCount Mapper
    memorySize: 64
    timeout: 20
    events:
      - cos:
          name: srcmr-1256773370.cos.ap-guangzhou.myqcloud.com
          parameters:
            bucket: srcmr-1256773370.cos.ap-guangzhou.myqcloud.com
            filter:
              prefix: ''
              suffix: ''
            events: cos:ObjectCreated:*
            enable: true

WordCountReducer:
  component: "@serverless/tencent-scf"
  inputs:
    name: reducer
    codeUri: ./code
    handler: index.main_handler
    runtime: Python3.6
    region: ap-guangzhou
    description: WordCount Reducer
    memorySize: 64
    timeout: 20
    events:
      - cos:
          name: middlestagebucket-1256773370.cos.ap-guangzhou.myqcloud.com
          parameters:
            bucket: middlestagebucket-1256773370.cos.ap-guangzhou.myqcloud.com
            filter:
              prefix: ''
              suffix: ''
            events: cos:ObjectCreated:*
            enable: true

After that, deploy with the sls --debug command. Once deployment succeeds, run a basic test:

  1. Prepare an English text document:

  2. Log in to the Tencent Cloud console, open the source bucket created at the beginning (srcmr), and upload the document;

  3. Shortly after the upload succeeds, you can see that the Reducer has run after the Mapper and produced its logs:

Now open the result bucket and check the output:

With that, the simple word-frequency counting feature is complete.
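
Besides browsing the console, the result object can also be fetched with the same SDK calls the functions themselves use (get_object and get_stream_to_file). The credentials and the exact key below are placeholders; the key follows the result_ naming that the Reducer applies to the middle_ object:

# Sketch: download and print the aggregated result written by the Reducer.
from qcloud_cos_v5 import CosConfig
from qcloud_cos_v5 import CosS3Client

config = CosConfig(Region='ap-guangzhou', SecretId='your-secret-id', SecretKey='your-secret-key')
client = CosS3Client(config)

# For an upload named test.txt the Reducer writes /result_middle_test.txt into destmr-<APPID>.
response = client.get_object(Bucket='destmr-1256773370', Key='/result_middle_test.txt')
response['Body'].get_stream_to_file('/tmp/result.txt')

with open('/tmp/result.txt') as f:
    print(f.read())  # one "word\tcount" line per word, sorted by count in descending order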

Summary

The Serverless architecture is a good fit for big-data processing. On the Tencent Cloud website you can also find its description of the data ETL processing scenario:

This example deploys multiple functions in one step. In real production, a project is rarely a single function working alone; it is a combination of functions forming a Service. That makes one-click deployment of multiple functions especially important. Hopefully this example gives readers a better sense of the application scenarios of the Serverless architecture, and some inspiration to combine cloud functions with different triggers in their own business.

Serverless Framework 30-Day Trial Plan

We invite you to experience the most convenient way to develop and deploy Serverless applications. During the trial period, the associated products and services come with free resources and professional technical support to help your business go Serverless quickly and easily!

For details, see: Serverless Framework Trial Plan

One More Thing

What can you do in 3 seconds? Take a sip of water, glance at an email, or even deploy a complete Serverless application?

Copy the link into a PC browser to visit: https://serverless.cloud.tencent.com/deploy/express

Deploy in just 3 seconds and experience the fastest hands-on Serverless HTTP development yet!

Quick links:

You are welcome to visit Serverless 中文網 and explore more Serverless application development in its Best Practices section!


Recommended reading: 《Serverless 架構:從原理、設計到項目實戰》
