Python ES日誌庫CMRESHandler修復[[types removal] Specifying types in bulk requests is deprecated.]警告

背景

Elasticsearch 版本前兩天升級到了 7.x,之後每次打印日誌都會出現 [[types removal] Specifying types in bulk requests is deprecated.] 警告。網上查了一通,發現是 7.x 版本之後類型(type)已經棄用。在 CMRESHandler 的 Issues 中看到了遇到同樣問題的朋友,對方已向作者提交了一個 PR,但目前還是未合併狀態,所以自己先在本地進行重寫,等作者合併代碼並發布最新版本後,再改回使用原生的 CMRESHandler。

Issues 連結

https://github.com/cmanaha/python-elasticsearch-logger/issues/76

PR 連結

https://github.com/cmanaha/python-elasticsearch-logger/pull/79

本地重寫方案

from elasticsearch import helpers as eshelpers
from elasticsearch import Elasticsearch, RequestsHttpConnection
try:
    from requests_kerberos import HTTPKerberosAuth, DISABLED
    CMR_KERBEROS_SUPPORTED = True
except ImportError:
    CMR_KERBEROS_SUPPORTED = False

try:
    from requests_aws4auth import AWS4Auth
    AWS4AUTH_SUPPORTED = True
except ImportError:
    AWS4AUTH_SUPPORTED = False

class PrivateCMRESHandler(CMRESHandler):
    """CMRESHandler variant whose bulk requests carry no document type.

    Elasticsearch 7.x reports "[types removal] Specifying types in bulk
    requests is deprecated." when a bulk action specifies a type. The
    ``flush`` defined here builds each action from ``_index`` and
    ``_source`` only, so no type is sent.

    NOTE: ``__get_es_client`` is subject to Python name mangling and is
    stored as ``_PrivateCMRESHandler__get_es_client``. It is therefore only
    reached from methods defined in this class (``flush``); any parent
    method that calls its own ``__get_es_client`` keeps using the parent's
    implementation.

    NOTE(review): ``CMRESHandler`` is not imported in the visible snippet;
    presumably ``from cmreslogging.handlers import CMRESHandler`` is needed
    -- confirm against the rest of the module.
    """

    def __get_es_client(self):
        """Return an Elasticsearch client for the configured auth type.

        The client is created lazily and cached on ``self._client`` for the
        NO_AUTH, BASIC_AUTH and AWS_SIGNED_AUTH modes. Kerberos always gets
        a fresh client.

        :return: an ``Elasticsearch`` client instance
        :raises EnvironmentError: if the optional module required by the
            selected auth type (requests-kerberos / requests-aws4auth) is
            not installed
        :raises ValueError: if ``self.auth_type`` is not a supported mode
        """
        auth_type = self.auth_type

        if auth_type == PrivateCMRESHandler.AuthType.NO_AUTH:
            if self._client is None:
                self._client = Elasticsearch(hosts=self.hosts,
                                             use_ssl=self.use_ssl,
                                             verify_certs=self.verify_certs,
                                             connection_class=RequestsHttpConnection,
                                             serializer=self.serializer)
            return self._client

        if auth_type == PrivateCMRESHandler.AuthType.BASIC_AUTH:
            if self._client is None:
                # Cache the client like the other non-Kerberos modes do;
                # returning it without storing it would build a brand-new
                # client (and connection pool) on every flush.
                self._client = Elasticsearch(hosts=self.hosts,
                                             http_auth=self.auth_details,
                                             use_ssl=self.use_ssl,
                                             verify_certs=self.verify_certs,
                                             connection_class=RequestsHttpConnection,
                                             serializer=self.serializer)
            return self._client

        if auth_type == PrivateCMRESHandler.AuthType.KERBEROS_AUTH:
            if not CMR_KERBEROS_SUPPORTED:
                raise EnvironmentError("Kerberos module not available. Please install \"requests-kerberos\"")
            # For kerberos we return a new client each time to make sure the tokens are up to date
            return Elasticsearch(hosts=self.hosts,
                                 use_ssl=self.use_ssl,
                                 verify_certs=self.verify_certs,
                                 connection_class=RequestsHttpConnection,
                                 http_auth=HTTPKerberosAuth(mutual_authentication=DISABLED),
                                 serializer=self.serializer)

        if auth_type == PrivateCMRESHandler.AuthType.AWS_SIGNED_AUTH:
            if not AWS4AUTH_SUPPORTED:
                raise EnvironmentError("AWS4Auth not available. Please install \"requests-aws4auth\"")
            if self._client is None:
                awsauth = AWS4Auth(self.aws_access_key, self.aws_secret_key, self.aws_region, 'es')
                # Certificate verification is always on for this mode,
                # regardless of self.verify_certs.
                self._client = Elasticsearch(
                    hosts=self.hosts,
                    http_auth=awsauth,
                    use_ssl=self.use_ssl,
                    verify_certs=True,
                    connection_class=RequestsHttpConnection,
                    serializer=self.serializer
                )
            return self._client

        raise ValueError("Authentication method not supported")

    def flush(self):
        """Send every buffered log record to Elasticsearch in one bulk call.

        Cancels the pending flush timer if one is running, swaps the buffer
        for an empty list while holding ``self._buffer_lock``, and indexes
        the swapped-out records. The actions carry no document type.

        :return: None
        :raises Exception: whatever the bulk call raised, but only when
            ``self.raise_on_indexing_exceptions`` is truthy; otherwise the
            error is swallowed and the swapped-out records are dropped
        """
        if self._timer is not None and self._timer.is_alive():
            self._timer.cancel()
        self._timer = None

        if self._buffer:
            try:
                # Swap under the lock so the slow network call below runs
                # without holding it.
                with self._buffer_lock:
                    logs_buffer = self._buffer
                    self._buffer = []
                # Lazy generator: the index name is evaluated per record as
                # the bulk helper consumes it.
                # NOTE(review): presumably _index_name_func is set by the
                # parent class and __func__ unwraps the plain function from
                # it -- confirm against CMRESHandler.
                actions = (
                    {
                        '_index': self._index_name_func.__func__(self.es_index_name),
                        '_source': log_record
                    }
                    for log_record in logs_buffer
                )
                eshelpers.bulk(
                    client=self.__get_es_client(),
                    actions=actions,
                    stats_only=True
                )
            except Exception:
                # Deliberate best-effort: logging must not break the
                # application unless the caller asked for errors to surface.
                if self.raise_on_indexing_exceptions:
                    raise

調用

# Attach the Elasticsearch log handler to the logger.
es_hosts = [{'host': self.ELASTIC_SEARCH_HOST, 'port': self.ELASTIC_SEARCH_PORT}]
es_handler = PrivateCMRESHandler(
    hosts=es_hosts,
    # Authentication mode; choose the one matching the cluster's setup.
    auth_type=PrivateCMRESHandler.AuthType.BASIC_AUTH,
    auth_details=self.AUTH_DETAILS,
    es_index_name=self.ELASTIC_SEARCH_INDEX,
    # One index per month.
    index_name_frequency=PrivateCMRESHandler.IndexNameFrequency.MONTHLY,
    # Extra field that tags every record with the environment.
    es_additional_fields={'environment': self.APP_ENVIRONMENT},
)
es_handler.setLevel(level=self.es_output_level)
formatter = self.formatter
es_handler.setFormatter(formatter)
self.logger.addHandler(es_handler)

另外還找到一個已修復該問題的第三方庫

連結

https://github.com/IMInterne/python-elasticsearch-ecs-logger

安裝

pip install ElasticECSHandler
相關文章
相關標籤/搜索