聊聊Postgres中的IPC之SI Message Queue

在 PostgreSQL中,每個進程都有屬於本身的共享緩存(shared cache)。例如,同一個系統表在不一樣的進程中都有對應的Cache來緩存它的元組(對於RelCache來講緩存的是一個RelationData結構)。同一個系統表的元組可能同時被多個進程的Cache所緩存,當其中某個Cache中的一個元組被刪除或更新時 ,須要通知其餘進程對其Cache進行同步。在 PostgreSQL的實現中,會記錄下已被刪除的無效元組 ,並經過SI Message方式(即共享消息隊列方式)在進程之間傳遞這一消息。收到無效消息的進程將同步地把無效元組(或RelationData結構)從本身的Cache中刪除。前端


1.無效消息(Invalid Message)概述

當前系統支持傳遞6種無效消息:
第一種是使給定的catcache中的一個元組無效;
第二種是使給定的系統表的全部catcache結構所有失效;
第三種是使給定的邏輯表的Relcache中RelationData結構無效;
第四種是使給定的物理表的SMGR無效(表物理位置發生變化時,須要通知SMGR關閉表文件);
第五種是使給定的數據庫的mapped-relation失效;
第六種是使一個已保存的快照失效。node

能夠看出這六種消息對應的影響範圍愈來愈大。sql

PostgreSQL使用如下所示的結構體來存儲無效消息。數據庫

typedef union
{
    int8        id;             /* type field --- must be first */
    SharedInvalCatcacheMsg cc;
    SharedInvalCatalogMsg cat;
    SharedInvalRelcacheMsg rc;
    SharedInvalSmgrMsg sm;
    SharedInvalRelmapMsg rm;
    SharedInvalSnapshotMsg sn;
} SharedInvalidationMessage;

其中,id爲:swift

  • 0或正數表示一個CatCache元組;
  • -1表示整個CatCahe緩存;
  • -2表示RelCache;
  • -3表示SMGR;
  • -4表示mapped-relation mapping;
  • -5表示Snapshot

當id爲0或正數時 ,它同時也表示產生該Invalid Message的CatCache的編號。後端

具體咱們能夠看註釋:數組

src/include/storage/sinval.h
 *  * invalidate a specific tuple in a specific catcache
 *  * invalidate all catcache entries from a given system catalog
 *  * invalidate a relcache entry for a specific logical relation
 *  * invalidate an smgr cache entry for a specific physical relation
 *  * invalidate the mapped-relation mapping for a given database
 *  * invalidate any saved snapshot that might be used to scan a given relation

進程經過調用函數CachelnvalidateHeapTuple()對Invalid Message進行註冊,主要包括如下幾步:緩存

  • 1) 註冊SysCache無效消息。
  • 2) 若是是對pg_class系統表元組進行的更新/刪除操做,其 relfilenode或 reltablespace可能發生變化,即該表物理位置發生變化,須要通知其餘進程關閉相應的SMGR。這時首先設置relationid和databaseid,而後註冊SMGR無效消息;不然轉而執行步驟3。
  • 3) 若是是對pg_attribute或者pg_index系統表元組進行的更新/刪除操做,則設置relationid和 dalabaseid,不然返回。
  • 4) 註冊RelCache無效消息(若是有的話)。
  • 5) 事務結束時註冊mapped-relation mapping和snapshot無效消息(若是有的話)。

當一個元組被刪除或者更新時,在同一個SQL命令的後續執行步驟中咱們依然認爲該元組是有效的,直到下一個命令開始或者亊務提交時改動才生效。在命令的邊界,舊元組變爲失效,同時新元組置爲有效。所以當執行heap_delete或者heap_update時,不能簡單地刷新Cache。並且,即便刷新了,也可能因爲同一個命令中的請求把該元組再次加載到Cache中。服務器

所以正確的方法是保持一個無效鏈表用於記錄元組的delete/update操做。事務完成後,根據前述的無效鏈表中的信息廣播該事務過程當中產生的Invalid Message,其餘進程經過SI Message隊列讀取Invalid Message對各自的Cache進行刷新。當子事務提交時,只須要將該事務產生的Invalid Message提交到父事務,最後由最上層的事務廣播Invalid Message。數據結構

須要注意的是,若涉及對系統表結構的改變,還須要從新加載pg_internal.init文件,由於該文件記錄了全部系統表的結構。


2.SI Message全景

如下是相關的函數,寫在前面,先混個臉熟:

CreateSharedInvalidationState()  /* Create and initialize the SI message buffer

SharedInvalBackendInit()  /* 每一個backend初始化時要初始化在 SI message buffer 中的Per-backend invalidation state,procState[MaxBackends]

CleanupInvalidationState() /*每一個backend shutdown時在調用on_shmem_exit()函數清空對應的procState[i]

SICleanupQueue()  /* Remove messages that have been consumed by all active backends
                 * Possible side effects of this routine include marking one or more
                * backends as "reset" in the array, and sending PROCSIG_CATCHUP_INTERRUPT
                * to some backend that seems to be getting too far behind.  We signal at
                * most one backend at a time, for reasons explained at the top of the file.
                
 SendSharedInvalidMessages() /* Add shared-cache-invalidation message(s) to the global SI message queue.

那麼整個SI Message隊列工做的流程大體以下:

  1. SI message 隊列的初始化。這個是由postmaster在啓動服務器時作的,做爲共享內存的一部分,由postmaster初始化。此時,SI message爲空,由於此時尚未Invalid Message產生。
  2. 每一個backend初始化(咱們知道這些Invalid Message是因爲我執行了SQL文對數據庫進行了修改才產生的,那麼很顯然咱們執行SQL文的途徑是前端發送SQL文,後端啓動一個backend進程去處理)時,須要初始化本身的共享內存而且向SI message註冊本身。註冊的目的有兩個,一個是聲明本身做爲Invalid Message的生產者的身份,另外一個表示本身也須要接受其餘backend的Invalid Message。
  3. 每一個backend執行SQL文,產生Invalid Message,其餘backend接收該Invalid Message,固然,這個過程複雜點,會在後面細說。那麼每一個backend接收和發送Invalid Message的時機是什麼呢?

固然啦,你每次執行SQL的時候,是一個好時機,在執行SQL文的開頭和結尾,backend都會去check SI message隊列中的無效消息。如下是調用棧:

exec_simple_query
    ->start_xact_command
        ->StartTransactionCommand         /* 事務開始
            ->StartTransaction
                ->AtStart_Cache
                    ->AcceptInvalidationMessages
                        ->ReceiveSharedInvalidMessages /* consume SI message
                            ->SIGetDataEntries
                        
    -> do query
    
    ->finish_xact_command
        ->CommitTransactionCommand         /* 事務結束
            ->CommitTransaction
                ->AtEOXact_Inval
                    ->SendSharedInvalidMessages       /*  send SI message
                        ->SIInsertDataEntries   
                            ->SICleanupQueue

那麼,難道我不執行SQL文,個人backend就不刷新無效消息麼?

咱們看一段註釋:

/*
 * Because backends sitting idle will not be reading sinval events, we
 * need a way to give an idle backend a swift kick in the rear and make
 * it catch up before the sinval queue overflows and forces it to go
 * through a cache reset exercise.  This is done by sending
 * PROCSIG_CATCHUP_INTERRUPT to any backend that gets too far behind.
 *
 * The signal handler will set an interrupt pending flag and will set the
 * processes latch. Whenever starting to read from the client, or when
 * interrupted while doing so, ProcessClientReadInterrupt() will call
 * ProcessCatchupEvent().
 */

沒有錯,要是某個backend長時間不讀取SI Message或者backend落後太多,超過了SI Message隊列能夠接受的最大長度,那麼就向該backend發送SIGUSR1,喚醒該backend讓其作適當的操做。


3.實現細節

爲了實現SI Message的這一功能,PostgreSQL在共享內存中開闢了shmInvalBuffer記錄系統中所發出的全部Invalid Message以及全部進程處理無消息的進度。shmInvalBuffer是一個全局變量,其數據結構以下:

typedef struct SISeg
{
    /*
     * General state information
     */
    int         minMsgNum;      /* oldest message still needed */
    int         maxMsgNum;      /* next message number to be assigned */
    int         nextThreshold;  /* # of messages to call SICleanupQueue */
    int         lastBackend;    /* index of last active procState entry, +1 */
    int         maxBackends;    /* size of procState array */

    slock_t     msgnumLock;     /* spinlock protecting maxMsgNum */

    /*
     * Circular buffer holding shared-inval messages
     */
    SharedInvalidationMessage buffer[MAXNUMMESSAGES];

    /*
     * Per-backend invalidation state info (has MaxBackends entries).
     */
    ProcState   procState[FLEXIBLE_ARRAY_MEMBER];
} SISeg;

在shmInvalBuffer中,Invalid Message存儲在由Buffer字段指定的定長數組中(其長度MAXNUMMESSAGES預約義爲4096),該數組中每個元素存儲一個Invalid Message,也能夠稱該數組爲無效消息隊列。無效消息隊列實際是一個環狀結構,最初數組爲空時,新來的無效消息從前向後依次存放在數組中,當數組被放滿以後,新的無效消息將回到Buffer數組的頭部開始插人。minMsgNum字段記錄Buffer中還未被全部進程處理的無效消息編號中的最小值,maxMsgNum字段記錄下一個能夠用於存放新無效消息的數組元素下標。實際上,minMsgNum指出了Buffer中尚未被全部進程處理的無效消息的下界,而maxMsgNum則指出了上界,即編號比minMsgNmn小的無效消息是已經被全部進程處理完的,而編號大於等於maxMsgNum的無效消息是尚未產生的,而二者之間的無效消息則是至少還有一個進程沒有對其進行處理。所以在無效消息隊列構成的環中,除了 minMsgNum和maxMsgNum之間的位置以外,其餘位置均可以用來存放新增長的無效消息。

PostgreSQL在shmInvalBuffer中用一個ProcState數組(procState字段)來存儲正在讀取無效消息的進程的讀取進度,該數組的大小與系統容許的最大進程數MaxBackends有關,在默認狀況下這個
數組的大小爲100 (系統的默認最大進程數爲100,可在postgresql.conf中修改)。ProcState的結構如數據結構以下所示。

/* Per-backend state in shared invalidation structure */
typedef struct ProcState
{
    /* procPid is zero in an inactive ProcState array entry. */
    pid_t       procPid;        /* PID of backend, for signaling */
    PGPROC     *proc;           /* PGPROC of backend */
    /* nextMsgNum is meaningless if procPid == 0 or resetState is true. */
    int         nextMsgNum;     /* next message number to read */
    bool        resetState;     /* backend needs to reset its state */
    bool        signaled;       /* backend has been sent catchup signal */
    bool        hasMessages;    /* backend has unread messages */

    /*
     * Backend only sends invalidations, never receives them. This only makes
     * sense for Startup process during recovery because it doesn't maintain a
     * relcache, yet it fires inval messages to allow query backends to see
     * schema changes.
     */
    bool        sendOnly;       /* backend only sends, never receives */

    /*
     * Next LocalTransactionId to use for each idle backend slot.  We keep
     * this here because it is indexed by BackendId and it is convenient to
     * copy the value to and from local memory when MyBackendId is set. It's
     * meaningless in an active ProcState entry.
     */
    LocalTransactionId nextLXID;
} ProcState;

在ProcSlate結構中記錄了PID爲procPid的進程讀取無效消息的狀態,其中nextMsgNum的值介於 shmlnvalBuffer 的 minMsgNum 值和 maxMsgNum 值之間。

以下圖所示,minMsgmun和MaxMsgmim就像兩個指針,它們區分出了哪些無效消息已經被全部的進程讀取以及哪些消息還在等待某些進程讀取。在minMsgnum以前的消息已經被全部進程讀完;maxMsgnum以後的區域還沒有使用;二者之間的消息是尚未被全部進程讀完的。當有進程調用函數SendSharedlnvalidMessage將其產生的無效消息添加到shmInvalBuffer中時,maxMsgnum就開始向後移動。SendSharedlnvalidMessage中將調用SIInsertDataEntries來完成無效消息的插人。

在向SI Message隊列中插入無效消息時,可能出現可用空間不夠的狀況(此時隊列中全是沒有徹底被讀取完畢的無效消息),須要清空一部分未處理無效消息,這個操做稱爲清理無效消息隊列,只有噹噹前消息數與將要插人消息數之和超過shmInvalBuffer中nextThreshold時纔會進行清理操做。這時,那些尚未處理完SI Message隊列中無效消息的進程將收到清理通知,而後這些進程將拋棄其Cache中的全部元組(至關於從新載人Cache的內容)。

顯然,讓全部進程重載Cache會致使較高的I/O次數。爲了減小重載Cache的次數,PostgreSQL會在無效消息隊列中設置兩個界限值lowbound和minsig,其計算方式以下:

• lowbound=maxMsgNum-MAXNUMMESSAGES+minFree,其中 minFree 爲須要釋放的隊列空間的最小值(minFree指出了須要在無效消息隊列中清理出多少個空位用於容納新的無效消息)。

• minsig = maxMsgNum-MAXNUMMESSAGES/2,這裏給出的是minsig的初始值,在進程重載過程當中minsig會進行調整。
SICleanupQueue

/*
     * Recompute minMsgNum = minimum of all backends' nextMsgNum, identify the
     * furthest-back backend that needs signaling (if any), and reset any
     * backends that are too far back.  Note that because we ignore sendOnly
     * backends here it is possible for them to keep sending messages without
     * a problem even when they are the only active backend.
     */
    min = segP->maxMsgNum;
    minsig = min - SIG_THRESHOLD;
    lowbound = min - MAXNUMMESSAGES + minFree;

能夠看到,lowbound實際上給出了這次清理過程當中必需要釋放的空間的位置,這是一個強制性的限制,nextMsgNum值低於lowbound的進程都將其resetState字段置爲真,這些進程將會自動進行重載Cache的工做。對於那些nextMsgNum值介於lowbound和minaig之間的進程,雖然它們並不影響本次淸理,可是爲了儘可能避免常常進行清理操做,會要求這些進程加快處理無效消息的進度(CatchUp)。淸理操做會找出這些進程中進度最慢的一個,向它發送SIGUSR1信號。該進程接收到SIGUSR1後會一次性處理完全部的無效消息,而後繼續向下一個進度最慢的進程發送SIGUSR1讓它也加快處理進度。

清理無效消息隊列的工做由函數SICleanupQueue實現,該函數的minFree參數給出了這一次淸理操做至少須要釋放出的空間大小。該函數的流程以下:

SICleanupQueue
    ->SendProcSignal

1)計算 lowbound 和 minsig 的值。

2) 對每個進程的ProcState結構進行檢査,將nextMsgNum低於lowbound的進程resetState字段設置爲true,並在nextMsgNum介於lowboumi和minsig之間的進程中找出進度最慢的一個。

3) 從新計算nextThreshoW參數。

4) 向步驟2中找到的進度最慢的進程發送SIGUSR1信號。

Postgres進程經過函數ProcessCatchupInterrupt來處理SIGUSR1信號,該函數最終將調用ReceiveSharedlnvalidMessages來處理全部未處理的無效消息,最後調用SICleanupQueue (minFree參數爲0)向下一個進度最慢的進程發送SIGUSR1信號(調用棧以下)。

ProcessCatchupInterrupt
    ->AcceptInvalidationMessages
        ->ReceiveSharedInvalidMessages
            ->SICleanupQueue

每一個進程在須要刷新其Cache時也會調用ReceiveSharedInvalidMessages函數用於讀取並處理無效消息,函數參數爲兩個函數指針:

1) invalFunction:用於處理一條無效消息。

2) resetFunction:將該後臺進程的Cache元組所有拋棄。

對於resetState設置爲真的進程,函數ReceiveSharedInvalidMessages會調用resetFunction拋棄其全部的Cache元組。不然,ReceiveSharedInvalidMessages將從消息隊列中讀取每條無效消息並調用invalFunction對消息進行處理。若是該進程是根據SIGUSR1信號調用該函數,那麼還將調用SICleanupQueue函數將這個信號傳給比它進度慢的進程。


4.其餘

在PMsignal.c中,包含後臺進程向Postmaster發送信號的相關函數。在實現中,後臺進程是這樣通知Postmaster的:

1) 首先在共享內存中開闢一個數組PMSignalFlags(PMsignal.c),數組中的每一位對應一個信號。

2) 而後若是後臺進程但願向Postmaster發送一個信號,那麼後臺首先將信號在數組PMSignalFlags中相應的元素置1 (邏輯真),而後調用kill函數向Postmaster發送SIGUSR1信號。

3) 當Postmaster收到SIGUSR1後首先檢測共享存儲中PMSignalFlags,確認具體的信號是什麼。同時將信號在數組PMSignalFlags中相應的元素置0 (邏輯假)而後做出相應反應。

每個後臺進程都有一個結構PGPROC存儲在共享內存中。Procarray.c在共享內存中分配ProcArrayStruct類型的數組procArray,統一管理這些PGPROC結構。PGPROC結構中包含不少的信息,Procarray.c中的函數主要處理 PGPROC中的 pid、databaseld、roleld、xmin、xid、subxids 等字段。這些函數的功能或是統計事務的信息,或是經過databaseId統計有多少個pid (也就是多少個後臺進程)與指定數據庫相鏈接等統計信息。

IPC負責的清除工做有兩個方面:一個是與共享內存相關的清除,另外一個是與各個後臺進程相關的清除工做。與共享內存相關的淸除並非將共享內存丟棄,而是從新設置共享內存。清除工做的流程能夠描述以下:首先在申請資源的時候,系統會同時爲該資源註冊一個清除函數,當要求作清除操做時,系統將會調用對應的淸除函數。


IPC的內容還有很多,本次只是大體說了下關於SI Message共享隊列的處理,其它的之後有時間再去寫寫吧。

相關文章
相關標籤/搜索