Understanding MongoDB Read Preference Through PyMongo

  In an earlier article with some thoughts on CAP theory and MongoDB's consistency and availability, I mentioned that MongoDB provides several options, such as Read Preference, Read Concern, and Write Concern, that significantly affect MongoDB's consistency, availability, durability, and performance. Unlike Read Concern and Write Concern, Read Preference is implemented almost entirely by the MongoDB driver, so this article looks at how Read Preference is actually implemented, using PyMongo as an example.

  The version analyzed in this article is PyMongo 3.6, which is compatible with MongoDB 3.6 and earlier.

  Original article: http://www.javashuo.com/article/p-cwetbpre-bb.html

Read Preference

Read preference describes how MongoDB clients route read operations to the members of a replica set.

  Read Preference determines which mongod node a read operation is routed to when using a replica set. With a sharded cluster, routing is decided by mongos; when connecting directly to a replica set, routing is decided by the driver, as shown in the figure below:

  MongoDB provides the following Read Preference modes:

  • primary: the default mode; all reads are routed to the replica set's primary node.
  • primaryPreferred: reads go to the primary under normal conditions, and are routed to a secondary only when the primary is unavailable (e.g. during a failover).
  • secondary: all reads are routed to the replica set's secondary nodes.
  • secondaryPreferred: reads go to secondaries under normal conditions, and are routed to the primary only when no secondary is available.
  • nearest: read from the lowest-latency node, whether primary or secondary. For distributed applications with a multi-datacenter MongoDB deployment, nearest gives the best data locality.

  These five modes are further affected by maxStalenessSeconds and tag sets.

  Different read preference modes suit different scenarios. If consistency matters, for example when read-after-write consistency must be guaranteed, reads must go to the primary, since secondaries lag behind to some degree. If a certain amount of stale data is acceptable, reading from secondaries offloads the primary and keeps reads available during a primary failover, improving availability. If latency is the main concern, nearest is the right choice. In addition, tag sets allow richer custom read policies, such as reading only from certain datacenters.
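The routing behavior of the five modes can be sketched as a small selection function. This is a toy model for illustration only, not PyMongo's implementation; latency and staleness filtering, which also apply, are discussed later in the article.

```python
# Toy model of the five read preference modes: which replica set members
# are eligible for a read. Server descriptions are plain dicts here.
def eligible_members(mode, servers):
    """Return the members a read may be routed to under `mode`."""
    primaries = [s for s in servers if s["type"] == "primary"]
    secondaries = [s for s in servers if s["type"] == "secondary"]

    if mode == "primary":
        return primaries
    if mode == "primaryPreferred":
        return primaries or secondaries   # fall back only if no primary
    if mode == "secondary":
        return secondaries
    if mode == "secondaryPreferred":
        return secondaries or primaries   # fall back only if no secondary
    if mode == "nearest":
        return primaries + secondaries    # latency filtering happens later
    raise ValueError("unknown read preference mode: %s" % mode)

rs = [{"name": "a", "type": "primary"},
      {"name": "b", "type": "secondary"},
      {"name": "c", "type": "secondary"}]
print([s["name"] for s in eligible_members("secondaryPreferred", rs)])  # ['b', 'c']
```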

PyMongo

  First, here are the PyMongo classes related to read preference, to make the analysis below easier to follow.

  In the diagram above, solid arrows denote strong references (composition) and dashed arrows denote weak references (aggregation).

connect to replica set

  PyMongo's documentation shows how to connect to a replica set: specify the replica set's name along with one or more of its member nodes. For example:

MongoClient('localhost', replicaset='foo')

  This call is non-blocking and returns immediately; a background thread connects to the specified node. Once PyMongo has connected to that node, it obtains the addresses of the other replica set members from it and then connects to them as well.

from time import sleep
c = MongoClient('localhost', replicaset='foo'); print(c.nodes); sleep(0.1); print(c.nodes)
frozenset([])
frozenset([(u'localhost', 27019), (u'localhost', 27017), (u'localhost', 27018)])

  As you can see, right after the MongoClient instance is initialized, it is not connected to any node (c.nodes is empty); checking again a moment later shows that it has connected to all three members of the replica set.

  So here is a question: after creating the MongoClient but before it has connected to any replica set member, can we operate on the database immediately?

If you need to do any operation with a MongoClient, such as a find() or an insert_one(), the client waits to discover a suitable member before it attempts the operation.

  As the code analysis below shows, this is coordinated through a condition variable (threading.Condition).
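The coordination pattern can be sketched as follows; this is a simplified toy with a hypothetical class name, not PyMongo's actual Topology, but it shows how an operation blocks until the background discovery thread has found a server.

```python
# Toy illustration: a reader blocks in select_server() until the background
# monitor thread reports a discovered server via on_change().
import threading

class ToyTopology:
    def __init__(self):
        self._servers = []                    # discovered servers
        self._lock = threading.Lock()
        self._condition = threading.Condition(self._lock)

    def on_change(self, server):
        # Called from the background monitor thread after discovery.
        with self._lock:
            self._servers.append(server)
            self._condition.notify_all()      # wake anyone stuck in select_server

    def select_server(self, timeout=5.0):
        with self._lock:
            while not self._servers:
                if not self._condition.wait(timeout):
                    raise TimeoutError("no server discovered")
            return self._servers[0]

topo = ToyTopology()
threading.Timer(0.05, topo.on_change, args=("localhost:27017",)).start()
print(topo.select_server())   # blocks briefly, then prints localhost:27017
```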

PyMongo Monitor

  As mentioned above, when a MongoClient object is initialized, it discovers the other members of the replica set through the specified mongod node. This is implemented by monitor.Monitor. As the class diagram shows, each server (corresponding to one mongod node) has its own monitor. The Monitor serves three purposes:

  • Health: detect when a member goes down or comes up, or if a different member becomes primary
  • Configuration: detect when members are added or removed, and detect changes in members’ tags
  • Latency: track a moving average of each member’s ping time
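The latency tracking can be illustrated with a minimal moving-average sketch. The 0.8/0.2 exponential weighting follows the server discovery and monitoring spec; treat the exact constant as an assumption here.

```python
# Sketch of the exponentially weighted moving average a monitor keeps for
# its member's round-trip time (weight for the newest sample assumed 0.2).
class MovingAverage:
    def __init__(self):
        self.average = None

    def add_sample(self, sample):
        if self.average is None:
            self.average = sample
        else:
            self.average = 0.8 * self.average + 0.2 * sample

    def get(self):
        return self.average

rtt = MovingAverage()
for sample in (0.010, 0.030, 0.020):   # seconds
    rtt.add_sample(sample)
print(round(rtt.get(), 4))  # → 0.0152
```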

  The Monitor starts a background thread (PeriodicExecutor) that periodically (every 10s by default) sends an ismaster command to its mongod node over a socket connection from the Pool. The core code (slightly rearranged) is as follows:

def _run(self):
    self._server_description = self._check_with_retry()
    self._topology.on_change(self._server_description)

def _check_with_retry(self):
    address = self._server_description.address
    response, round_trip_time = self._check_with_socket(
                sock_info, metadata=metadata)
    self._avg_round_trip_time.add_sample(round_trip_time)  # update the moving-average RTT
    sd = ServerDescription(
        address=address,
        ismaster=response,
        round_trip_time=self._avg_round_trip_time.get())
    return sd

def _check_with_socket(self, sock_info, metadata=None):
    """Return (IsMaster, round_trip_time).

    Can raise ConnectionFailure or OperationFailure.
    """
    cmd = SON([('ismaster', 1)])
    if metadata is not None:
        cmd['client'] = metadata
    if self._server_description.max_wire_version >= 6:
        cluster_time = self._topology.max_cluster_time()
        if cluster_time is not None:
            cmd['$clusterTime'] = cluster_time
    start = _time()
    request_id, msg, max_doc_size = message.query(
        0, 'admin.$cmd', 0, -1, cmd,
        None, DEFAULT_CODEC_OPTIONS)

    # TODO: use sock_info.command()
    sock_info.send_message(msg, max_doc_size)
    reply = sock_info.receive_message(request_id)
    return IsMaster(reply.command_response()), _time() - start

  The IsMaster class wraps the ismaster command response. Its key properties include:

  • replica_set_name: the replica set's name, as seen by this mongod node
  • primary: which node this mongod believes is the primary
  • all_hosts: all replica set members, as seen by this mongod node
  • last_write_date: the time of this mongod node's last write, used to judge a secondary's staleness
  • set_version: the config version
  • election_id: set only when this mongod is a primary; identifies the most recent primary election

  When a server's monitor has fetched the replica set information from its mongod node, it calls Topology.on_change to update the replica set's topology information:

def on_change(self, server_description):
    """Process a new ServerDescription after an ismaster call completes."""
    if self._description.has_server(server_description.address):
        self._description = updated_topology_description(
            self._description, server_description)
        
        self._update_servers()  # connect to newly added nodes; remove (disconnect) nodes that no longer exist
        self._receive_cluster_time_no_lock(
            server_description.cluster_time)
        
        # Wake waiters in select_servers().
        self._condition.notify_all()

  The core is updated_topology_description, which adjusts the locally recorded topology based on the received server_description (an IsMaster, i.e. the ismaster command response). Take one case as an example: when an ismaster response arrives in which the node claims to be primary, the following function is called, regardless of whether the current topology already has a primary:

def _update_rs_from_primary(
        sds,
        replica_set_name,
        server_description,
        max_set_version,
        max_election_id):
    """Update topology description from a primary's ismaster response.

    Pass in a dict of ServerDescriptions, current replica set name, the
    ServerDescription we are processing, and the TopologyDescription's
    max_set_version and max_election_id if any.

    Returns (new topology type, new replica_set_name, new max_set_version,
    new max_election_id).
    """
    if replica_set_name is None:
        replica_set_name = server_description.replica_set_name

    elif replica_set_name != server_description.replica_set_name:   # not from the same replica set
        # We found a primary but it doesn't have the replica_set_name
        # provided by the user.
        sds.pop(server_description.address)
        return (_check_has_primary(sds),
                replica_set_name,
                max_set_version,
                max_election_id)

    max_election_tuple = max_set_version, max_election_id
    if None not in server_description.election_tuple:
        if (None not in max_election_tuple and
                max_election_tuple > server_description.election_tuple):  # the node claims to be primary, but is older than what the topology records

            # Stale primary, set to type Unknown.
            address = server_description.address
            sds[address] = ServerDescription(address)   # with no ismaster doc, the server type becomes Unknown
            return (_check_has_primary(sds),
                    replica_set_name,
                    max_set_version,
                    max_election_id)

        max_election_id = server_description.election_id

    if (server_description.set_version is not None and         # the node has a newer config version
        (max_set_version is None or
            server_description.set_version > max_set_version)):

        max_set_version = server_description.set_version

    # We've heard from the primary. Is it the same primary as before?
    for server in sds.values():
        if (server.server_type is SERVER_TYPE.RSPrimary
                and server.address != server_description.address):

            # Reset old primary's type to Unknown.
            sds[server.address] = ServerDescription(server.address)

            # There can be only one prior primary.
            break

    # Discover new hosts from this primary's response.
    for new_address in server_description.all_hosts:
        if new_address not in sds:
            sds[new_address] = ServerDescription(new_address)

    # Remove hosts not in the response.
    for addr in set(sds) - server_description.all_hosts:
        sds.pop(addr)

    # If the host list differs from the seed list, we may not have a primary
    # after all.
    return (_check_has_primary(sds),
            replica_set_name,
            max_set_version,
            max_election_id)

  Note the Returns section of the docstring: the function always returns the new replica set state.

  The function checks, from top to bottom:

  • whether the response comes from the same replica set
  • whether the new node (which claims to be primary) is newer than the primary recorded in the topology, by comparing (set_version, election_id)
  • whether the node's set_version is higher than the recorded maximum
  • if the topology contains a stale primary, its server type is reset to Unknown
  • newly added members are discovered from the primary's all_hosts
  • members no longer part of the replica set are removed
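The second check, comparing (set_version, election_id) tuples, can be illustrated with a toy function. It is hypothetical and uses plain ints, whereas PyMongo compares an ObjectId for election_id; the point is only the lexicographic tuple comparison.

```python
# Toy version of the stale-primary check: a claimed primary is ignored when
# its (set_version, election_id) tuple is older than the maximum the
# topology has already seen.
def is_stale_primary(max_set_version, max_election_id,
                     set_version, election_id):
    max_tuple = (max_set_version, max_election_id)
    new_tuple = (set_version, election_id)
    if None in max_tuple or None in new_tuple:
        return False          # not enough information to call it stale
    return max_tuple > new_tuple

# The topology has seen (set_version=2, election_id=7); a node reporting
# (2, 5) lost a more recent election and must not be treated as primary.
print(is_stale_primary(2, 7, 2, 5))  # True
print(is_stale_primary(2, 7, 2, 8))  # False
```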

  All of PyMongo's knowledge of the replica set state comes from the members' ismaster responses. The source of truth is the replica set itself, and that truth comes from a majority of nodes. A single node may therefore return stale or incorrect information to the driver; the driver judges the replica set's state from limited information, and if it judges wrongly, for example by sending a write to a stale primary, the replica set checks again on its side, guaranteeing correctness.

PyMongo read preference

  The previous sections described in detail how PyMongo maintains the replica set information; this section looks at how, given that topology, a read preference routes an operation to a specific node.

  Starting from Collection.find and following the call chain, we arrive at MongoClient._send_message_with_response:

def _send_message_with_response(self, operation, read_preference=None,
                                exhaust=False, address=None):
    topology = self._get_topology()
    if address:
        server = topology.select_server_by_address(address)
        if not server:
            raise AutoReconnect('server %s:%d no longer available'
                                % address)
    else:
        selector = read_preference or writable_server_selector
        server = topology.select_server(selector)

    return self._reset_on_error(
        server,
        server.send_message_with_response,
        operation,
        set_slave_ok,  # computed earlier in the real method (elided here)
        self.__all_credentials,
        self._event_listeners,
        exhaust)

  The code is clear: depending on the given address or read_preference, a server is selected, and the request is sent through that server, which then waits for the reply. topology.select_server eventually calls down into the following function:

def _select_servers_loop(self, selector, timeout, address):
    """select_servers() guts. Hold the lock when calling this."""
    now = _time()
    end_time = now + timeout
    server_descriptions = self._description.apply_selector(  # _description is a TopologyDescription
        selector, address)

    while not server_descriptions:
        # No suitable servers.
        if timeout == 0 or now > end_time:
            raise ServerSelectionTimeoutError(
                self._error_message(selector))

        self._ensure_opened()
        self._request_check_all()

        # Release the lock and wait for the topology description to
        # change, or for a timeout. We won't miss any changes that
        # came after our most recent apply_selector call, since we've
        # held the lock until now.
        self._condition.wait(common.MIN_HEARTBEAT_INTERVAL) # threading.Condition.wait
        self._description.check_compatible()
        now = _time()
        server_descriptions = self._description.apply_selector(
            selector, address)

    self._description.check_compatible()
    return server_descriptions

  As you can see, selection does not necessarily succeed on the first try. If no server can be selected, the client has not yet connected to enough mongod nodes, so it waits a while (_condition.wait) and retries. As shown in Topology.on_change above, _condition.notify_all is called to wake the waiters.

def apply_selector(self, selector, address):

    def apply_local_threshold(selection):
        if not selection:
            return []

        settings = self._topology_settings

        # Round trip time in seconds.
        fastest = min(
            s.round_trip_time for s in selection.server_descriptions)
        threshold = settings.local_threshold_ms / 1000.0
        return [s for s in selection.server_descriptions
                if (s.round_trip_time - fastest) <= threshold]

    # unrelated code omitted ...
    return apply_local_threshold(
        selector(Selection.from_topology_description(self)))

  The selector above is one of the subclasses of read_preference._ServerMode. Take Nearest as an example:

class Nearest(_ServerMode):
    def __call__(self, selection):
        """Apply this read preference to Selection."""
        return member_with_tags_server_selector(
            self.tag_sets,
            max_staleness_selectors.select(
                self.max_staleness, selection))

  The selection is first constrained by maxStalenessSeconds and then filtered once more by tag sets; here we focus on the former.
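Though the focus here is on maxStalenessSeconds, the tag-set filtering step can be sketched briefly. This is a toy version (not PyMongo's member_with_tags_server_selector): tag sets are tried in order, the first set that matches at least one member wins, and {} matches every member.

```python
# Toy tag-set filter: each member carries a "tags" dict, e.g. {"dc": "east"}.
def filter_by_tag_sets(tag_sets, servers):
    for tag_set in tag_sets:
        matched = [s for s in servers
                   if all(s["tags"].get(k) == v for k, v in tag_set.items())]
        if matched:
            return matched
    return []

members = [{"name": "a", "tags": {"dc": "east"}},
           {"name": "b", "tags": {"dc": "west"}}]
# Prefer the east datacenter, fall back to any member.
print([s["name"] for s in filter_by_tag_sets([{"dc": "east"}, {}], members)])
# → ['a']
```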
About maxStalenessSeconds

The read preference maxStalenessSeconds option lets you specify a maximum replication lag, or 「staleness」, for reads from secondaries. When a secondary’s estimated staleness exceeds maxStalenessSeconds, the client stops using it for read operations.

  How is staleness computed? If the replica set has a primary, the following function is called:

def _with_primary(max_staleness, selection):
    """Apply max_staleness, in seconds, to a Selection with a known primary."""
    primary = selection.primary
    sds = []

    for s in selection.server_descriptions:
        if s.server_type == SERVER_TYPE.RSSecondary:
            # See max-staleness.rst for explanation of this formula.
            staleness = (
                (s.last_update_time - s.last_write_date) -
                (primary.last_update_time - primary.last_write_date) +
                selection.heartbeat_frequency)

            if staleness <= max_staleness:
                sds.append(s)
        else:
            sds.append(s)

    return selection.with_server_descriptions(sds)

  The code above uses IsMaster's last_write_date property; that is exactly what is used to judge staleness.

  For an explanation of the formula, see max-staleness.rst.

  Here is my own way of understanding it: assuming uniform network latency, if the heartbeat replies from both nodes arrived at the same moment, P.lastWriteDate - S.lastWriteDate alone would suffice; but the heartbeats happen at different times, so the time difference has to be accounted for. I would write it as (P.lastWriteDate - S.lastWriteDate) + (S.lastUpdateTime - P.lastUpdateTime). Adding the heartbeat interval on top is a pessimistic assumption: if the secondary stops replicating right after a heartbeat, then by the next heartbeat its staleness may have grown by up to one heartbeat interval.
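A worked example of the formula, with hypothetical timestamps (in seconds): the secondary's write point trails the primary's by enough that, with the pessimistic heartbeat cushion, its estimated staleness comes out to 30 seconds.

```python
# Same expression as in _with_primary above, with scalar inputs:
# (S.lastUpdateTime - S.lastWriteDate) - (P.lastUpdateTime - P.lastWriteDate)
# + heartbeatFrequency
def staleness_with_primary(s_last_update, s_last_write,
                           p_last_update, p_last_write,
                           heartbeat_frequency):
    return ((s_last_update - s_last_write) -
            (p_last_update - p_last_write) +
            heartbeat_frequency)

staleness = staleness_with_primary(
    s_last_update=1000.0, s_last_write=950.0,   # secondary heard at t=1000, last write t=950
    p_last_update=1005.0, p_last_write=975.0,   # primary heard at t=1005, last write t=975
    heartbeat_frequency=10.0)
print(staleness)  # (1000-950) - (1005-975) + 10 = 30.0
```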

  As the code shows, Nearest first finds all readable members, and then apply_local_threshold filters them down to the nearest ones.
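The latency window applied by apply_local_threshold can be sketched like this (localThresholdMS defaults to 15 ms): every member whose averaged RTT is within the threshold of the fastest member is kept, rather than only the single fastest one.

```python
# Toy version of apply_local_threshold over a name -> RTT (seconds) mapping.
def apply_local_threshold(servers, local_threshold_ms=15):
    if not servers:
        return []
    fastest = min(servers.values())
    threshold = local_threshold_ms / 1000.0
    return sorted(name for name, rtt in servers.items()
                  if rtt - fastest <= threshold)

rtts = {"a": 0.010, "b": 0.020, "c": 0.080}
print(apply_local_threshold(rtts))  # ['a', 'b'] — c is 70ms slower than a
```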

references

Read preference

PyMongo 3.6.0 Documentation
