日誌服務Python消費組實戰(三):實時跨域監測多日誌庫數據

解決問題

使用日誌服務進行數據處理與傳遞的過程當中,你是否遇到以下監測場景不能很好的解決:html

  1. 特定數據上傳到日誌服務中須要檢查數據內的異常狀況,而沒有現成監控工具?
  2. 須要檢索數據裏面的關鍵字,但數據沒有創建索引,沒法使用日誌服務的告警功能?
  3. 數據監測要求實時性(<5秒,例如Web訪問500錯誤),而特定功能都有必定延遲(1分鐘以上)?
  4. 存在多個域的多個日誌庫(例如每一個Region的錯誤文件對應的日誌庫),數據量不大,但監控邏輯相似,每一個目標都要監控與配置,比較繁瑣?

若是是的,您能夠考慮使用日誌服務Python消費組進行跨域實時數據監控,本文主要介紹如何使用消費組實時監控多個域中的多個日誌庫中的異常數據,並進行下一步告警動做。能夠很好解決以上問題,並利用消費組的特色,達到自動平衡、負載均衡和高可用性。python

image

基本概念

協同消費庫(Consumer Library)是對日誌服務中日誌進行消費的高級模式,提供了消費組(ConsumerGroup)的概念對消費端進行抽象和管理,和直接使用SDK進行數據讀取的區別在於,用戶無需關心日誌服務的實現細節,只須要專一於業務邏輯,另外,消費者之間的負載均衡、failover等用戶也都無需關心。git

消費組(Consumer Group) - 一個消費組由多個消費者構成,同一個消費組下面的消費者共同消費一個logstore中的數據,消費者之間不會重複消費數據。
消費者(Consumer) - 消費組的構成單元,實際承擔消費任務,同一個消費組下面的消費者名稱必須不一樣。github

在日誌服務中,一個logstore下面會有多個shard,協同消費庫的功能就是將shard分配給一個消費組下面的消費者,分配方式遵循如下原則:編程

  • 每一個shard只會分配到一個消費者。
  • 一個消費者能夠同時擁有多個shard。
    新的消費者加入一個消費組,這個消費組下面的shard從屬關係會調整,以達到消費負載均衡的目的,可是上面的分配原則不會變,分配過程對用戶透明。

協同消費庫的另外一個功能是保存checkpoint,方便程序故障恢復時能接着從斷點繼續消費,從而保證數據不會被重複消費。跨域

使用消費組進行實時分發

這裏咱們描述用Python使用消費組進行編程,實時跨域監測多個域的多個日誌庫,全文或特定字段檢查
注意:本篇文章的相關代碼可能會更新,最新版本在這裏能夠找到:Github樣例.服務器

安裝

環境微信

  1. 建議程序運行在靠近源日誌庫同Region下的ECS上,並使用局域網服務入口,這樣好處是網絡速度最快,其次是讀取沒有外網費用產生。
  2. 強烈推薦PyPy3來運行本程序,而不是使用標準CPython解釋器。
  3. 日誌服務的Python SDK能夠以下安裝:
pypy3 -m pip install aliyun-log-python-sdk -U

更多SLS Python SDK的使用手冊,能夠參考這裏網絡

程序配置

以下展現如何配置程序:併發

  1. 配置程序日誌文件,以便後續測試或者診斷可能的問題(跳過,具體參考樣例)。
  2. 基本的日誌服務鏈接與消費組的配置選項。
  3. 目標Logstore的一些鏈接信息

請仔細閱讀代碼中相關注釋並根據須要調整選項:

#encoding: utf8
def get_option():
    ##########################
    # 基本選項
    ##########################

    # 從環境變量中加載SLS參數與選項,endpoint、project、logstore能夠多個並配對
    endpoints = os.environ.get('SLS_ENDPOINTS', '').split(";")  # ;分隔
    projects = os.environ.get('SLS_PROJECTS', '').split(";")    # ;分隔
    logstores = os.environ.get('SLS_LOGSTORES', '').split(";")  # ;分隔,同一個Project下的用,分隔
    accessKeyId = os.environ.get('SLS_AK_ID', '')
    accessKey = os.environ.get('SLS_AK_KEY', '')
    consumer_group = os.environ.get('SLS_CG', '')

    # 消費的起點。這個參數在第一次跑程序的時候有效,後續再次運行將從上一次消費的保存點繼續。
    # 可使」begin「,」end「,或者特定的ISO時間格式。
    cursor_start_time = "2018-12-26 0:0:0"

    # 通常不要修改消費者名,尤爲是須要併發跑時
    consumer_name = "{0}-{1}".format(consumer_group, current_process().pid)

      # 設定共享執行器
    exeuctor = ThreadPoolExecutor(max_workers=2)

    # 構建多個消費組(每一個logstore一個)
    options = []
    for i in range(len(endpoints)):
        endpoint = endpoints[i].strip()
        project = projects[i].strip()
        if not endpoint or not project:
            logger.error("project: {0} or endpoint {1} is empty, skip".format(project, endpoint))
            continue

        logstore_list = logstores[i].split(",")
        for logstore in logstore_list:
            logstore = logstore.strip()
            if not logstore:
                logger.error("logstore for project: {0} or endpoint {1} is empty, skip".format(project, endpoint))
                continue

            option = LogHubConfig(endpoint, accessKeyId, accessKey, project, logstore, consumer_group,
                                  consumer_name, cursor_position=CursorPosition.SPECIAL_TIMER_CURSOR,
                                  cursor_start_time=cursor_start_time, shared_executor=exeuctor)
            options.append(option)

    # 設定檢測目標字段與目標值,例如這裏是檢測status字段是否有500等錯誤
    keywords = {'status': r'5\d{2}'}

    return exeuctor, options, keywords

注意,配置了多個endpoint、project、logstore,須要用分號分隔,而且一一對應;若是一個project下有多個logstore須要檢測,能夠將他們直接用逗號分隔。以下是一個檢測3個Region下的4個Logstore的配置:

export SLS_ENDPOINTS=cn-hangzhou.log.aliyuncs.com;cn-beijing.log.aliyuncs.com;cn-qingdao.log.aliyuncs.com
export SLS_PROJECTS=project1;project2;project3
export SLS_LOGSTORES=logstore1;logstore2;logstore3_1,logstore3_2

數據監測

以下代碼展現如何構建一個關鍵字檢測器,針對數據中的目標字段進行檢測,您也能夠修改邏輯設定爲符合須要的場景(例如多個字段的組合關係等)。

class KeywordMonitor(ConsumerProcessorBase):
    """
    this consumer will keep monitor with k-v fields. like {"content": "error"}
    """
    def __init__(self, keywords=None, logstore=None):
        super(KeywordMonitor, self).__init__()  # remember to call base init

        self.keywords = keywords
        self.kw_check = {}
        for k, v in self.keywords.items():
            self.kw_check[k] = re.compile(v)
        self.logstore = logstore

    def process(self, log_groups, check_point_tracker):
        logs = PullLogResponse.loggroups_to_flattern_list(log_groups)
        match_count = 0
        sample_error_log = ""
        for log in logs:
            m = None
            for k, c in self.kw_check.items():
                if k in log:
                    m = c.search(log[k])
                    if m:
                        logger.debug('Keyword detected for shard "{0}" with keyword: "{1}" in field "{2}", log: {3}'
                                    .format(self.shard_id, log[k], k, log))
            if m:
                match_count += 1
                sample_error_log = log

        if match_count:
            logger.info("Keyword detected for shard {0}, count: {1}, example: {2}".format(self.shard_id, match_count, sample_error_log))
                    
            # TODO: 這裏添加通知下游的代碼
    
        else:
            logger.debug("No keyword detected for shard {0}".format(self.shard_id))

        self.save_checkpoint(check_point_tracker)

控制邏輯

以下展現如何控制多個消費者,並管理退出命令:

def main():
    exeuctor, options, keywords = get_monitor_option()

    logger.info("*** start to consume data...")
    workers = []

    for option in options:
        worker = ConsumerWorker(KeywordMonitor, option, args=(keywords,) )
        workers.append(worker)
        worker.start()

    try:
        for i, worker in enumerate(workers):
            while worker.is_alive():
                worker.join(timeout=60)
            logger.info("worker project: {0} logstore: {1} exit unexpected, try to shutdown it".format(
                options[i].project, options[i].logstore))
            worker.shutdown()
    except KeyboardInterrupt:
        logger.info("*** try to exit **** ")
        for worker in workers:
            worker.shutdown()

        # wait for all workers to shutdown before shutting down executor
        for worker in workers:
            while worker.is_alive():
                worker.join(timeout=60)

    exeuctor.shutdown()


if __name__ == '__main__':
    main()

啓動

假設程序命名爲"monitor_keyword.py",能夠以下啓動:

export SLS_ENDPOINTS=cn-hangzhou.log.aliyuncs.com;cn-beijing.log.aliyuncs.com;cn-qingdao.log.aliyuncs.com
export SLS_PROJECTS=project1;project2;project3
export SLS_LOGSTORES=logstore1;logstore2;logstore3_1,logstore3_2

export SLS_AK_ID=<YOUR AK ID>
export SLS_AK_KEY=<YOUR AK KEY>
export SLS_CG=<消費組名,能夠簡單命名爲"dispatch_data">

pypy3 monitor_keyword.py

性能考慮

啓動多個消費者

若是您的目標logstore存在多個shard,或者您的目標監測日誌庫較多,您能夠進行必定劃分並並啓動屢次程序:

# export SLS_ENDPOINTS, SLS_PROJECTS, SLS_LOGSTORES
nohup pypy3 dispatch_data.py &

# export SLS_ENDPOINTS, SLS_PROJECTS, SLS_LOGSTORES
nohup pypy3 dispatch_data.py &

# export SLS_ENDPOINTS, SLS_PROJECTS, SLS_LOGSTORES
nohup pypy3 dispatch_data.py &
...

注意:
全部消費者使用了同一個消費組的名字和不一樣的消費者名字(由於消費者名以進程ID爲後綴)。
但數據量較大或者目標日誌庫較多時,單個消費者的速度可能沒法知足需求,且由於Python的GIL的緣由,只能用到一個CPU核。強烈建議您根據目標日誌庫的Shard數以及CPU的數量進行劃分,啓動屢次以便重複利用CPU資源。

性能吞吐

基於測試,在沒有帶寬限制、接收端速率限制(如Splunk端)的狀況下,以推動硬件用pypy3運行上述樣例,單個消費者佔用大約10%的單核CPU下能夠消費達到5 MB/s原始日誌的速率。所以,理論上能夠達到50 MB/s原始日誌每一個CPU核,也就是每一個CPU核天天能夠消費4TB原始日誌

注意: 這個數據依賴帶寬、硬件參數等。

高可用性

消費組會將檢測點(check-point)保存在服務器端,當一個消費者中止,另一個消費者將自動接管並從斷點繼續消費。

能夠在不一樣機器上啓動消費者,這樣當一臺機器中止或者損壞的清下,其餘機器上的消費者能夠自動接管並從斷點進行消費。

理論上,爲了備用,也能夠啓動大於shard數量的消費者。

其餘

限制與約束

每個日誌庫(logstore)最多能夠配置10個消費組,若是遇到錯誤ConsumerGroupQuotaExceed則表示遇到限制,建議在控制檯端刪除一些不用的消費組。

監測

Https

若是服務入口(endpoint)配置爲https://前綴,如https://cn-beijing.log.aliyuncs.com,程序與SLS的鏈接將自動使用HTTPS加密。

服務器證書*.aliyuncs.com是GlobalSign簽發,默認大多數Linux/Windows的機器會自動信任此證書。若是某些特殊狀況,機器不信任此證書,能夠參考這裏下載並安裝此證書。

更多案例

 

原文連接 更多技術乾貨 請關注阿里云云棲社區微信號 :yunqiinsight

相關文章
相關標籤/搜索