使用日誌服務進行數據處理與傳遞的過程當中,你是否遇到以下監測場景不能很好的解決:html
若是是的,您能夠考慮使用日誌服務Python消費組進行跨域實時數據監控,本文主要介紹如何使用消費組實時監控多個域中的多個日誌庫中的異常數據,並進行下一步告警動做。能夠很好解決以上問題,並利用消費組的特色,達到自動平衡、負載均衡和高可用性。python
協同消費庫(Consumer Library)是對日誌服務中日誌進行消費的高級模式,提供了消費組(ConsumerGroup)的概念對消費端進行抽象和管理,和直接使用SDK進行數據讀取的區別在於,用戶無需關心日誌服務的實現細節,只須要專一於業務邏輯,另外,消費者之間的負載均衡、failover等用戶也都無需關心。git
消費組(Consumer Group) - 一個消費組由多個消費者構成,同一個消費組下面的消費者共同消費一個logstore中的數據,消費者之間不會重複消費數據。
消費者(Consumer) - 消費組的構成單元,實際承擔消費任務,同一個消費組下面的消費者名稱必須不一樣。github
在日誌服務中,一個logstore下面會有多個shard,協同消費庫的功能就是將shard分配給一個消費組下面的消費者,分配方式遵循如下原則:編程
協同消費庫的另外一個功能是保存checkpoint,方便程序故障恢復時能接着從斷點繼續消費,從而保證數據不會被重複消費。跨域
這裏咱們描述用Python使用消費組進行編程,實時跨域監測多個域的多個日誌庫,全文或特定字段檢查
注意:本篇文章的相關代碼可能會更新,最新版本在這裏能夠找到:Github樣例.服務器
環境微信
pypy3 -m pip install aliyun-log-python-sdk -U
更多SLS Python SDK的使用手冊,能夠參考這裏網絡
以下展現如何配置程序:併發
請仔細閱讀代碼中相關注釋並根據須要調整選項:
#encoding: utf8 def get_option(): ########################## # 基本選項 ########################## # 從環境變量中加載SLS參數與選項,endpoint、project、logstore能夠多個並配對 endpoints = os.environ.get('SLS_ENDPOINTS', '').split(";") # ;分隔 projects = os.environ.get('SLS_PROJECTS', '').split(";") # ;分隔 logstores = os.environ.get('SLS_LOGSTORES', '').split(";") # ;分隔,同一個Project下的用,分隔 accessKeyId = os.environ.get('SLS_AK_ID', '') accessKey = os.environ.get('SLS_AK_KEY', '') consumer_group = os.environ.get('SLS_CG', '') # 消費的起點。這個參數在第一次跑程序的時候有效,後續再次運行將從上一次消費的保存點繼續。 # 可使」begin「,」end「,或者特定的ISO時間格式。 cursor_start_time = "2018-12-26 0:0:0" # 通常不要修改消費者名,尤爲是須要併發跑時 consumer_name = "{0}-{1}".format(consumer_group, current_process().pid) # 設定共享執行器 exeuctor = ThreadPoolExecutor(max_workers=2) # 構建多個消費組(每一個logstore一個) options = [] for i in range(len(endpoints)): endpoint = endpoints[i].strip() project = projects[i].strip() if not endpoint or not project: logger.error("project: {0} or endpoint {1} is empty, skip".format(project, endpoint)) continue logstore_list = logstores[i].split(",") for logstore in logstore_list: logstore = logstore.strip() if not logstore: logger.error("logstore for project: {0} or endpoint {1} is empty, skip".format(project, endpoint)) continue option = LogHubConfig(endpoint, accessKeyId, accessKey, project, logstore, consumer_group, consumer_name, cursor_position=CursorPosition.SPECIAL_TIMER_CURSOR, cursor_start_time=cursor_start_time, shared_executor=exeuctor) options.append(option) # 設定檢測目標字段與目標值,例如這裏是檢測status字段是否有500等錯誤 keywords = {'status': r'5\d{2}'} return exeuctor, options, keywords
注意,配置了多個endpoint、project、logstore,須要用分號分隔,而且一一對應;若是一個project下有多個logstore須要檢測,能夠將他們直接用逗號分隔。以下是一個檢測3個Region下的4個Logstore的配置:
export SLS_ENDPOINTS=cn-hangzhou.log.aliyuncs.com;cn-beijing.log.aliyuncs.com;cn-qingdao.log.aliyuncs.com export SLS_PROJECTS=project1;project2;project3 export SLS_LOGSTORES=logstore1;logstore2;logstore3_1,logstore3_2
以下代碼展現如何構建一個關鍵字檢測器,針對數據中的目標字段進行檢測,您也能夠修改邏輯設定爲符合須要的場景(例如多個字段的組合關係等)。
class KeywordMonitor(ConsumerProcessorBase): """ this consumer will keep monitor with k-v fields. like {"content": "error"} """ def __init__(self, keywords=None, logstore=None): super(KeywordMonitor, self).__init__() # remember to call base init self.keywords = keywords self.kw_check = {} for k, v in self.keywords.items(): self.kw_check[k] = re.compile(v) self.logstore = logstore def process(self, log_groups, check_point_tracker): logs = PullLogResponse.loggroups_to_flattern_list(log_groups) match_count = 0 sample_error_log = "" for log in logs: m = None for k, c in self.kw_check.items(): if k in log: m = c.search(log[k]) if m: logger.debug('Keyword detected for shard "{0}" with keyword: "{1}" in field "{2}", log: {3}' .format(self.shard_id, log[k], k, log)) if m: match_count += 1 sample_error_log = log if match_count: logger.info("Keyword detected for shard {0}, count: {1}, example: {2}".format(self.shard_id, match_count, sample_error_log)) # TODO: 這裏添加通知下游的代碼 else: logger.debug("No keyword detected for shard {0}".format(self.shard_id)) self.save_checkpoint(check_point_tracker)
以下展現如何控制多個消費者,並管理退出命令:
def main(): exeuctor, options, keywords = get_monitor_option() logger.info("*** start to consume data...") workers = [] for option in options: worker = ConsumerWorker(KeywordMonitor, option, args=(keywords,) ) workers.append(worker) worker.start() try: for i, worker in enumerate(workers): while worker.is_alive(): worker.join(timeout=60) logger.info("worker project: {0} logstore: {1} exit unexpected, try to shutdown it".format( options[i].project, options[i].logstore)) worker.shutdown() except KeyboardInterrupt: logger.info("*** try to exit **** ") for worker in workers: worker.shutdown() # wait for all workers to shutdown before shutting down executor for worker in workers: while worker.is_alive(): worker.join(timeout=60) exeuctor.shutdown() if __name__ == '__main__': main()
假設程序命名爲"monitor_keyword.py",能夠以下啓動:
export SLS_ENDPOINTS=cn-hangzhou.log.aliyuncs.com;cn-beijing.log.aliyuncs.com;cn-qingdao.log.aliyuncs.com export SLS_PROJECTS=project1;project2;project3 export SLS_LOGSTORES=logstore1;logstore2;logstore3_1,logstore3_2 export SLS_AK_ID=<YOUR AK ID> export SLS_AK_KEY=<YOUR AK KEY> export SLS_CG=<消費組名,能夠簡單命名爲"dispatch_data"> pypy3 monitor_keyword.py
若是您的目標logstore存在多個shard,或者您的目標監測日誌庫較多,您能夠進行必定劃分並並啓動屢次程序:
# export SLS_ENDPOINTS, SLS_PROJECTS, SLS_LOGSTORES nohup pypy3 dispatch_data.py & # export SLS_ENDPOINTS, SLS_PROJECTS, SLS_LOGSTORES nohup pypy3 dispatch_data.py & # export SLS_ENDPOINTS, SLS_PROJECTS, SLS_LOGSTORES nohup pypy3 dispatch_data.py & ...
注意:
全部消費者使用了同一個消費組的名字和不一樣的消費者名字(由於消費者名以進程ID爲後綴)。
但數據量較大或者目標日誌庫較多時,單個消費者的速度可能沒法知足需求,且由於Python的GIL的緣由,只能用到一個CPU核。強烈建議您根據目標日誌庫的Shard數以及CPU的數量進行劃分,啓動屢次以便重複利用CPU資源。
基於測試,在沒有帶寬限制、接收端速率限制(如Splunk端)的狀況下,以推動硬件用pypy3
運行上述樣例,單個消費者佔用大約10%的單核CPU
下能夠消費達到5 MB/s
原始日誌的速率。所以,理論上能夠達到50 MB/s
原始日誌每一個CPU核
,也就是每一個CPU核天天能夠消費4TB原始日誌
。
注意: 這個數據依賴帶寬、硬件參數等。
消費組會將檢測點(check-point)保存在服務器端,當一個消費者中止,另一個消費者將自動接管並從斷點繼續消費。
能夠在不一樣機器上啓動消費者,這樣當一臺機器中止或者損壞的清下,其餘機器上的消費者能夠自動接管並從斷點進行消費。
理論上,爲了備用,也能夠啓動大於shard數量的消費者。
每個日誌庫(logstore)最多能夠配置10個消費組,若是遇到錯誤ConsumerGroupQuotaExceed
則表示遇到限制,建議在控制檯端刪除一些不用的消費組。
若是服務入口(endpoint)配置爲https://
前綴,如https://cn-beijing.log.aliyuncs.com
,程序與SLS的鏈接將自動使用HTTPS加密。
服務器證書*.aliyuncs.com
是GlobalSign簽發,默認大多數Linux/Windows的機器會自動信任此證書。若是某些特殊狀況,機器不信任此證書,能夠參考這裏下載並安裝此證書。
原文連接 更多技術乾貨 請關注阿里云云棲社區微信號 :yunqiinsight