記一次偶發的bug排查——redis-py-cluster庫的bug

 排查流水帳:
  1. 經過平臺監控,發現不少偶發的查看推薦列表的接口時延大於0.5s
  2. 寫單元測試,不能重現。在測試環境不能重現。只有在正式環境能夠偶發重現。
  3. 經過日誌埋點,等待重現
  4. 不斷地加日誌埋點後發現耗時在redis的hmget操做
    1. 這時猜測緣由
      1. hmget命令纔會有,會不會是hmget命令的問題
      2. 查看redis的慢查詢日誌,發現沒有慢查詢。排除是Redis執行慢的緣由
      3. 查看當時的負載狀況,負載很低,併發也很少。因此排除是Redis的命令等待緣由
      4. 多協程下,20條hmget,不會所有都卡,只會卡幾條,後面又會正常
      5. 正常hmget的用時是0.01s左右,卡的時候須要0.3-0.9s
      6. 本身寫個腳本,不斷地用多協程去執行hmget,不能重現。
    2. 猜測解決方案:
      1. 修改多協程池的數量,從20改成3
      2. 獲取用戶信息改成串行
  5. 繼續往hmget命令裏面的代碼加埋點日誌
    1. 因爲是修改第三方庫,因此要格外當心
    2. 經過閱讀源碼,發現hmget的底層流程是rediscluster模塊裏面的client文件裏面的execute_command函數
    3. 修改後,測試環境之下單元測試沒問題後,部署到正式環境
  6. 最後定位到是在裏面的self.connection_pool.nodes.initialize()這行代碼耗時
  7. 只有當refresh_table_asap=true纔會執行這行代碼,因此解決思路
    1. 爲何refresh_table_asap會等於true
      1. 發現只有當鏈接redis的時候報錯ClusterDownError或者MovedError纔會設置refresh_table_asap=True
      2. 經過日誌埋點。發現是MovedError異常致使的。
      3. 繼續增長日誌埋點,發現整個觸發的流程是:
        1. 觸發異常ConnectionError
        2. 設置try_random_node=True
        3. 下一次隨機拿一個節點,這時候可能拿到slot不對的節點
        4. 鏈接節點後,會報異常MovedError,並把目標節點的信息返回來,同時設置refresh_table_asap=True。
        5. 這時會把slot對應的節點設置爲返回來的節點信息
        6. 從新鏈接節點,執行命令成功
        7. 可是這時候已經設置了refresh_table_asap=True,執行下一個命令的時候,就會執行self.connection_pool.nodes.initialize()
        8. 因爲使用了多協程,並且self.connection_pool.nodes.initialize()命令沒有加鎖,因此會致使這個耗時加重
      4. 經過print traceback,看看爲何會觸發ConnectionError異常,發現是redis服務端斷開了鏈接。
      5. 這時候回想到redis有機制,超過必定時間沒有命令過來,就會關閉鏈接。在redis的timeout 配置,通常是300s。因此這樣解釋了爲何這個是偶發的。
      6. 寫單元測試,創建鏈接後,等待350s再執行命令,穩定重現bug。
    2. 爲何initialize耗時這麼慢
      1. 經過單元測試,發現initialize命令並不慢,大於0.04s左右就能完成,可是多協程下是0.5s左右。
      2. 因此考慮是多協程下,由於沒有鎖,因此多個協程都執行了這條命令,致使最終的用時是原來的10倍
  8. 修改測試環境redis的timeout=5s,寫個測試用例,在測試環境能夠穩定重現。
  9. 因此定位到rediscluster有問題,解決思路
    1. 不要在多協程執行redis命令(感受很差)
    2. 升級庫,看能不能解決。查看這個庫的git地址(https://github.com/Grokzen/redis-py-cluster)的最新版本,問題依然存在。
    3. catchConnectionError異常的時候,區分是否服務端斷開鏈接,若是是,不設置try_random_node=True,重試
    4. init的時候加鎖
    5. 參考redis.py的作法,在catch服務端斷開鏈接異常後,從新鏈接後重試
  10. 最後選用了思路5。
execute_command函數(包含埋點日誌,去除沒必要要的代碼段),在rediscluster庫的client.py
@clusterdown_wrapper
def execute_command(self, *args, **kwargs):
    """
    Send a command to a node in the cluster
    """
    import logging
    log=logging.getLogger('service.log')
    log.error(u'redis execute_command 1 %s ' % str(args))
 
    # If set externally we must update it before calling any commands
    if self.refresh_table_asap:  #執行self.connection_pool.nodes.initialize()的代碼段
        log.error(u'redis execute_command 2 %s ' % str(args))
        self.connection_pool.nodes.initialize()
        log.error(u'redis execute_command 3 %s ' % str(args))
        self.refresh_table_asap = False
    log.error(u'redis execute_command 4 %s ' % str(args))
 
    redirect_addr = None
    asking = False
 
    try_random_node = False
    log.error(u'redis execute_command 7 %s ' % str(args))
    slot = self._determine_slot(*args)
    log.error(u'redis execute_command 8 %s ' % str(args))
    ttl = int(self.RedisClusterRequestTTL)
 
    while ttl > 0:
        ttl -= 1
 
 
        if asking:
            node = self.connection_pool.nodes.nodes[redirect_addr]
            r = self.connection_pool.get_connection_by_node(node)
        elif try_random_node:
            r = self.connection_pool.get_random_connection()
            try_random_node = False
        else:
            if self.refresh_table_asap:
                # MOVED
                node = self.connection_pool.get_master_node_by_slot(slot)
            else:
                node = self.connection_pool.get_node_by_slot(slot)
            r = self.connection_pool.get_connection_by_node(node)
 
        try:
              r.send_command(*args)
              log.error(u'redis execute_command 10 %s ' % str(args))
              ret= self.parse_response(r, command, **kwargs)
              log.error(u'redis execute_command 11 %s ' % str(args))
              return ret
 
        except (RedisClusterException, BusyLoadingError):
            raise
        except (ConnectionError, TimeoutError):
            try_random_node = True
            log.error(u'redis execute_command 14 %s ' % str(args))
            if ttl < self.RedisClusterRequestTTL / 2:
                log.error(u'redis execute_command 15 %s ' % str(args))
                time.sleep(0.1)
        except ClusterDownError as e:
            log.error(u'redis execute_command 17 %s ' % str(args))
            self.connection_pool.disconnect()
            self.connection_pool.reset()
            self.refresh_table_asap = True
            raise e
        except MovedError as e:
            # Reinitialize on ever x number of MovedError.
            # This counter will increase faster when the same client object
            # is shared between multiple threads. To reduce the frequency you
            # can set the variable 'reinitialize_steps' in the constructor.
            import traceback
            print traceback.format_exc()
            log.error(u'redis execute_command 16 %s ' % str(args))
            self.refresh_table_asap = True
            self.connection_pool.nodes.increment_reinitialize_counter()
 
 
            node = self.connection_pool.nodes.set_node(e.host, e.port, server_type='master')
            self.connection_pool.nodes.slots[e.slot_id][0] = node
 

 

優化:node

              r.send_command(*args)
              ret= self.parse_response(r, command, **kwargs)
              return ret

改成python

try:
    r.send_command(*args)
    return self.parse_response(r, command, **kwargs)
except ConnectionError as e:
    from redis.connection import SERVER_CLOSED_CONNECTION_ERROR
    if SERVER_CLOSED_CONNECTION_ERROR in e.message:
        r.disconnect()
        r.send_command(*args)
        return self.parse_response(r, command, **kwargs)
    else:
        raise
 

 未經許可,請不要轉載git

相關文章
相關標籤/搜索