在 PostgreSQL中,每個進程都有屬於本身的共享緩存(shared cache)。例如,同一個系統表在不一樣的進程中都有對應的Cache來緩存它的元組(對於RelCache來講緩存的是一個RelationData結構)。同一個系統表的元組可能同時被多個進程的Cache所緩存,當其中某個Cache中的一個元組被刪除或更新時 ,須要通知其餘進程對其Cache進行同步。在 PostgreSQL的實現中,會記錄下已被刪除的無效元組 ,並經過SI Message方式(即共享消息隊列方式)在進程之間傳遞這一消息。收到無效消息的進程將同步地把無效元組(或RelationData結構)從本身的Cache中刪除。前端
typedef union { int8 id; /* type field --- must be first */ SharedInvalCatcacheMsg cc; SharedInvalCatalogMsg cat; SharedInvalRelcacheMsg rc; SharedInvalSmgrMsg sm; SharedInvalRelmapMsg rm; SharedInvalSnapshotMsg sn; } SharedInvalidationMessage;
當id爲0或正數時 ,它同時也表示產生該Invalid Message的CatCache的編號。後端
src/include/storage/sinval.h * * invalidate a specific tuple in a specific catcache * * invalidate all catcache entries from a given system catalog * * invalidate a relcache entry for a specific logical relation * * invalidate an smgr cache entry for a specific physical relation * * invalidate the mapped-relation mapping for a given database * * invalidate any saved snapshot that might be used to scan a given relation
進程經過調用函數CachelnvalidateHeapTuple()對Invalid Message進行註冊,主要包括如下幾步:緩存
所以正確的方法是保持一個無效鏈表用於記錄元組的delete/update操做。事務完成後,根據前述的無效鏈表中的信息廣播該事務過程當中產生的Invalid Message,其餘進程經過SI Message隊列讀取Invalid Message對各自的Cache進行刷新。當子事務提交時,只須要將該事務產生的Invalid Message提交到父事務,最後由最上層的事務廣播Invalid Message。數據結構
CreateSharedInvalidationState() /* Create and initialize the SI message buffer SharedInvalBackendInit() /* 每一個backend初始化時要初始化在 SI message buffer 中的Per-backend invalidation state,procState[MaxBackends] CleanupInvalidationState() /*每一個backend shutdown時在調用on_shmem_exit()函數清空對應的procState[i] SICleanupQueue() /* Remove messages that have been consumed by all active backends * Possible side effects of this routine include marking one or more * backends as "reset" in the array, and sending PROCSIG_CATCHUP_INTERRUPT * to some backend that seems to be getting too far behind. We signal at * most one backend at a time, for reasons explained at the top of the file. SendSharedInvalidMessages() /* Add shared-cache-invalidation message(s) to the global SI message queue.
那麼整個SI Message隊列工做的流程大體以下:
固然啦,你每次執行SQL的時候,是一個好時機,在執行SQL文的開頭和結尾,backend都會去check SI message隊列中的無效消息。如下是調用棧:
exec_simple_query ->start_xact_command ->StartTransactionCommand /* 事務開始 ->StartTransaction ->AtStart_Cache ->AcceptInvalidationMessages ->ReceiveSharedInvalidMessages /* consume SI message ->SIGetDataEntries -> do query ->finish_xact_command ->CommitTransactionCommand /* 事務結束 ->CommitTransaction ->AtEOXact_Inval ->SendSharedInvalidMessages /* send SI message ->SIInsertDataEntries ->SICleanupQueue
/* * Because backends sitting idle will not be reading sinval events, we * need a way to give an idle backend a swift kick in the rear and make * it catch up before the sinval queue overflows and forces it to go * through a cache reset exercise. This is done by sending * PROCSIG_CATCHUP_INTERRUPT to any backend that gets too far behind. * * The signal handler will set an interrupt pending flag and will set the * processes latch. Whenever starting to read from the client, or when * interrupted while doing so, ProcessClientReadInterrupt() will call * ProcessCatchupEvent(). */
沒有錯,要是某個backend長時間不讀取SI Message或者backend落後太多,超過了SI Message隊列能夠接受的最大長度,那麼就向該backend發送SIGUSR1,喚醒該backend讓其作適當的操做。
爲了實現SI Message的這一功能,PostgreSQL在共享內存中開闢了shmInvalBuffer記錄系統中所發出的全部Invalid Message以及全部進程處理無消息的進度。shmInvalBuffer是一個全局變量,其數據結構以下:
typedef struct SISeg { /* * General state information */ int minMsgNum; /* oldest message still needed */ int maxMsgNum; /* next message number to be assigned */ int nextThreshold; /* # of messages to call SICleanupQueue */ int lastBackend; /* index of last active procState entry, +1 */ int maxBackends; /* size of procState array */ slock_t msgnumLock; /* spinlock protecting maxMsgNum */ /* * Circular buffer holding shared-inval messages */ SharedInvalidationMessage buffer[MAXNUMMESSAGES]; /* * Per-backend invalidation state info (has MaxBackends entries). */ ProcState procState[FLEXIBLE_ARRAY_MEMBER]; } SISeg;
在shmInvalBuffer中,Invalid Message存儲在由Buffer字段指定的定長數組中(其長度MAXNUMMESSAGES預約義爲4096),該數組中每個元素存儲一個Invalid Message,也能夠稱該數組爲無效消息隊列。無效消息隊列實際是一個環狀結構,最初數組爲空時,新來的無效消息從前向後依次存放在數組中,當數組被放滿以後,新的無效消息將回到Buffer數組的頭部開始插人。minMsgNum字段記錄Buffer中還未被全部進程處理的無效消息編號中的最小值,maxMsgNum字段記錄下一個能夠用於存放新無效消息的數組元素下標。實際上,minMsgNum指出了Buffer中尚未被全部進程處理的無效消息的下界,而maxMsgNum則指出了上界,即編號比minMsgNmn小的無效消息是已經被全部進程處理完的,而編號大於等於maxMsgNum的無效消息是尚未產生的,而二者之間的無效消息則是至少還有一個進程沒有對其進行處理。所以在無效消息隊列構成的環中,除了 minMsgNum和maxMsgNum之間的位置以外,其餘位置均可以用來存放新增長的無效消息。
數組的大小爲100 (系統的默認最大進程數爲100,可在postgresql.conf中修改)。ProcState的結構如數據結構以下所示。
/* Per-backend state in shared invalidation structure */ typedef struct ProcState { /* procPid is zero in an inactive ProcState array entry. */ pid_t procPid; /* PID of backend, for signaling */ PGPROC *proc; /* PGPROC of backend */ /* nextMsgNum is meaningless if procPid == 0 or resetState is true. */ int nextMsgNum; /* next message number to read */ bool resetState; /* backend needs to reset its state */ bool signaled; /* backend has been sent catchup signal */ bool hasMessages; /* backend has unread messages */ /* * Backend only sends invalidations, never receives them. This only makes * sense for Startup process during recovery because it doesn't maintain a * relcache, yet it fires inval messages to allow query backends to see * schema changes. */ bool sendOnly; /* backend only sends, never receives */ /* * Next LocalTransactionId to use for each idle backend slot. We keep * this here because it is indexed by BackendId and it is convenient to * copy the value to and from local memory when MyBackendId is set. It's * meaningless in an active ProcState entry. */ LocalTransactionId nextLXID; } ProcState;
在ProcSlate結構中記錄了PID爲procPid的進程讀取無效消息的狀態,其中nextMsgNum的值介於 shmlnvalBuffer 的 minMsgNum 值和 maxMsgNum 值之間。
在向SI Message隊列中插入無效消息時,可能出現可用空間不夠的狀況(此時隊列中全是沒有徹底被讀取完畢的無效消息),須要清空一部分未處理無效消息,這個操做稱爲清理無效消息隊列,只有噹噹前消息數與將要插人消息數之和超過shmInvalBuffer中nextThreshold時纔會進行清理操做。這時,那些尚未處理完SI Message隊列中無效消息的進程將收到清理通知,而後這些進程將拋棄其Cache中的全部元組(至關於從新載人Cache的內容)。
• lowbound=maxMsgNum-MAXNUMMESSAGES+minFree,其中 minFree 爲須要釋放的隊列空間的最小值(minFree指出了須要在無效消息隊列中清理出多少個空位用於容納新的無效消息)。
• minsig = maxMsgNum-MAXNUMMESSAGES/2,這裏給出的是minsig的初始值,在進程重載過程當中minsig會進行調整。
/* * Recompute minMsgNum = minimum of all backends' nextMsgNum, identify the * furthest-back backend that needs signaling (if any), and reset any * backends that are too far back. Note that because we ignore sendOnly * backends here it is possible for them to keep sending messages without * a problem even when they are the only active backend. */ min = segP->maxMsgNum; minsig = min - SIG_THRESHOLD; lowbound = min - MAXNUMMESSAGES + minFree;
SICleanupQueue ->SendProcSignal
1)計算 lowbound 和 minsig 的值。
2) 對每個進程的ProcState結構進行檢査,將nextMsgNum低於lowbound的進程resetState字段設置爲true,並在nextMsgNum介於lowboumi和minsig之間的進程中找出進度最慢的一個。
3) 從新計算nextThreshoW參數。
4) 向步驟2中找到的進度最慢的進程發送SIGUSR1信號。
Postgres進程經過函數ProcessCatchupInterrupt來處理SIGUSR1信號,該函數最終將調用ReceiveSharedlnvalidMessages來處理全部未處理的無效消息,最後調用SICleanupQueue (minFree參數爲0)向下一個進度最慢的進程發送SIGUSR1信號(調用棧以下)。
ProcessCatchupInterrupt ->AcceptInvalidationMessages ->ReceiveSharedInvalidMessages ->SICleanupQueue
1) invalFunction:用於處理一條無效消息。
2) resetFunction:將該後臺進程的Cache元組所有拋棄。
1) 首先在共享內存中開闢一個數組PMSignalFlags(PMsignal.c),數組中的每一位對應一個信號。
2) 而後若是後臺進程但願向Postmaster發送一個信號,那麼後臺首先將信號在數組PMSignalFlags中相應的元素置1 (邏輯真),而後調用kill函數向Postmaster發送SIGUSR1信號。
3) 當Postmaster收到SIGUSR1後首先檢測共享存儲中PMSignalFlags,確認具體的信號是什麼。同時將信號在數組PMSignalFlags中相應的元素置0 (邏輯假)而後做出相應反應。
每個後臺進程都有一個結構PGPROC存儲在共享內存中。Procarray.c在共享內存中分配ProcArrayStruct類型的數組procArray,統一管理這些PGPROC結構。PGPROC結構中包含不少的信息,Procarray.c中的函數主要處理 PGPROC中的 pid、databaseld、roleld、xmin、xid、subxids 等字段。這些函數的功能或是統計事務的信息,或是經過databaseId統計有多少個pid (也就是多少個後臺進程)與指定數據庫相鏈接等統計信息。
IPC的內容還有很多,本次只是大體說了下關於SI Message共享隊列的處理,其它的之後有時間再去寫寫吧。