使用日誌服務的Web-tracking、logtail(文件極簡)、syslog等收集上來的日誌常常存在各類各樣的格式,咱們須要針對特定的日誌(例如topic)進行必定的分發到特定的logstore中處理和索引,本文主要介紹如何使用消費組實時分發日誌到不通的目標日誌庫中。而且利用消費組的特定,達到自動平衡、負載均衡和高可用性。html
協同消費庫(Consumer Library)是對日誌服務中日誌進行消費的高級模式,提供了消費組(ConsumerGroup)的概念對消費端進行抽象和管理,和直接使用SDK進行數據讀取的區別在於,用戶無需關心日誌服務的實現細節,只須要專一於業務邏輯,另外,消費者之間的負載均衡、failover等用戶也都無需關心。python
消費組(Consumer Group) - 一個消費組由多個消費者構成,同一個消費組下面的消費者共同消費一個logstore中的數據,消費者之間不會重複消費數據。
消費者(Consumer) - 消費組的構成單元,實際承擔消費任務,同一個消費組下面的消費者名稱必須不一樣。git
在日誌服務中,一個logstore下面會有多個shard,協同消費庫的功能就是將shard分配給一個消費組下面的消費者,分配方式遵循如下原則:github
協同消費庫的另外一個功能是保存checkpoint,方便程序故障恢復時能接着從斷點繼續消費,從而保證數據不會被重複消費。sql
這裏咱們描述用Python使用消費組進行編程,實時根據數據的topic進行分發。
注意:本篇文章的相關代碼可能會更新,最新版本在這裏能夠找到:Github樣例.編程
環境跨域
pypy3 -m pip install aliyun-log-python-sdk -U
更多SLS Python SDK的使用手冊,能夠參考這裏服務器
以下展現如何配置程序:微信
請仔細閱讀代碼中相關注釋並根據須要調整選項:網絡
#encoding: utf8 def get_option(): ########################## # 基本選項 ########################## # 從環境變量中加載SLS參數與選項,根據須要能夠配置多個目標 accessKeyId = os.environ.get('SLS_AK_ID', '') accessKey = os.environ.get('SLS_AK_KEY', '') endpoint = os.environ.get('SLS_ENDPOINT', '') project = os.environ.get('SLS_PROJECT', '') logstore = os.environ.get('SLS_LOGSTORE', '') to_endpoint = os.environ.get('SLS_ENDPOINT_TO', endpoint) to_project = os.environ.get('SLS_PROJECT_TO', project) to_logstore1 = os.environ.get('SLS_LOGSTORE_TO1', '') to_logstore2 = os.environ.get('SLS_LOGSTORE_TO2', '') to_logstore3 = os.environ.get('SLS_LOGSTORE_TO3', '') consumer_group = os.environ.get('SLS_CG', '') # 消費的起點。這個參數在第一次跑程序的時候有效,後續再次運行將從上一次消費的保存點繼續。 # 能夠使」begin「,」end「,或者特定的ISO時間格式。 cursor_start_time = "2018-12-26 0:0:0" # 通常不要修改消費者名,尤爲是須要併發跑時 consumer_name = "{0}-{1}".format(consumer_group, current_process().pid) # 構建一個消費組和消費者 option = LogHubConfig(endpoint, accessKeyId, accessKey, project, logstore, consumer_group, consumer_name, cursor_position=CursorPosition.SPECIAL_TIMER_CURSOR, cursor_start_time=cursor_start_time) # bind put_log_raw which is faster to_client = LogClient(to_endpoint, accessKeyId, accessKey) put_method1 = partial(to_client.put_log_raw, project=to_project, logstore=to_logstore1) put_method2 = partial(to_client.put_log_raw, project=to_project, logstore=to_logstore2) put_method3 = partial(to_client.put_log_raw, project=to_project, logstore=to_logstore3) return option, {u'ngnix': put_method1, u'sql_audit': put_method2, u'click': put_method3}
注意,這裏使用了functools.partial
對put_log_raw
進行綁定,以便後續調用方便。
以下代碼展現如何從SLS拿到數據後根據topic
進行轉發。
if __name__ == '__main__': option, put_methods = get_copy_option() def copy_data(shard_id, log_groups): for log_group in log_groups.LogGroups: # update topic if log_group.Topic in put_methods: put_methods[log_group.Topic](log_group=log_group) logger.info("*** start to consume data...") worker = ConsumerWorker(ConsumerProcessorAdaptor, option, args=(copy_data, )) worker.start(join=True)
假設程序命名爲"dispatch_data.py",能夠以下啓動:
export SLS_ENDPOINT=<Endpoint of your region> export SLS_AK_ID=<YOUR AK ID> export SLS_AK_KEY=<YOUR AK KEY> export SLS_PROJECT=<SLS Project Name> export SLS_LOGSTORE=<SLS Logstore Name> export SLS_LOGSTORE_TO1=<SLS To Logstore1 Name> export SLS_LOGSTORE_TO1=<SLS To Logstore2 Name> export SLS_LOGSTORE_TO1=<SLS To Logstore3 Name> export SLS_CG=<消費組名,能夠簡單命名爲"dispatch_data"> pypy3 dispatch_data.py
基於消費組的程序能夠直接啓動屢次以便達到併發做用:
nohup pypy3 dispatch_data.py & nohup pypy3 dispatch_data.py & nohup pypy3 dispatch_data.py & ...
注意:
全部消費者使用了同一個消費組的名字和不一樣的消費者名字(由於消費者名以進程ID爲後綴)。
由於一個分區(Shard)只能被一個消費者消費,假設一個日誌庫有10個分區,那麼最多有10個消費者同時消費。
基於測試,在沒有帶寬限制、接收端速率限制(如Splunk端)的狀況下,以推動硬件用pypy3
運行上述樣例,單個消費者佔用大約10%的單核CPU
下能夠消費達到5 MB/s
原始日誌的速率。所以,理論上能夠達到50 MB/s
原始日誌每一個CPU核
,也就是每一個CPU核天天能夠消費4TB原始日誌
。
注意: 這個數據依賴帶寬、硬件參數和目標Logstore是否可以較快接收數據。
消費組會將檢測點(check-point)保存在服務器端,當一個消費者中止,另一個消費者將自動接管並從斷點繼續消費。
能夠在不一樣機器上啓動消費者,這樣當一臺機器中止或者損壞的清下,其餘機器上的消費者能夠自動接管並從斷點進行消費。
理論上,爲了備用,也能夠啓動大於shard數量的消費者。
每個日誌庫(logstore)最多能夠配置10個消費組,若是遇到錯誤ConsumerGroupQuotaExceed
則表示遇到限制,建議在控制檯端刪除一些不用的消費組。
若是服務入口(endpoint)配置爲https://
前綴,如https://cn-beijing.log.aliyuncs.com
,程序與SLS的鏈接將自動使用HTTPS加密。
服務器證書*.aliyuncs.com
是GlobalSign簽發,默認大多數Linux/Windows的機器會自動信任此證書。若是某些特殊狀況,機器不信任此證書,能夠參考這裏下載並安裝此證書。
原文連接 更多技術乾貨 請關注阿里云云棲社區微信號 :yunqiinsight