#include "postgres.h" #include #include "fmgr.h" #include "access/xlog.h" #include "replication/walreceiver.h" #include "utils/elog.h" #include "utils/builtins.h" #include "utils/timestamp.h" #include "funcapi.h" #include "access/htup_details.h" #include "catalog/pg_type.h" #include "utils/pg_lsn.h" #ifdef PG_MODULE_MAGIC PG_MODULE_MAGIC; #endif PG_FUNCTION_INFO_V1(get_rcv_replication_stat); Datum get_rcv_replication_stat(PG_FUNCTION_ARGS) { Assert(PG_NARGS() == 0); // 表示沒有輸入參數 if (!RecoveryInProgress()) // 在數據庫處於恢復狀態下時運行,不然不容許 ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("recovery is not in progress"), errhint("This functions can only be executed during recovery."))); /* use volatile pointer to prevent code rearrangement */ volatile WalRcvData *walrcv = WalRcv; // 共享內存中用於管理流複製的數據結構 src/include/replication/walreceiver.h TupleDesc tupdesc; // 建立一個行描述變量 Datum DatumValue[8]; // 建立一個存儲值的Datum數組, 須要返回幾個字段, 建立相應長度的數組 bool nulls[8]; // 元組中的屬性數 typedef struct TupleDescData -> natts; /* Initialise DatumValue and NULL flags arrays 初始化 */ MemSet(DatumValue, 0, sizeof(DatumValue)); MemSet(nulls, 0, sizeof(nulls)); /* Initialise attributes information in the tuple descriptor 定義字段類型和字段名, 到相應的頭文件src/include/catalog/pg_type.h找到對應的類型 */ /* CreateTemplateTupleDesc 該函數分配一個空的元組描述符結構。 元組類型的ID信息被初始設置爲一個匿名記錄類型; 若是須要調用者能夠覆蓋此。 TupleDescInitEntry: 此函數在先前分配的元組描述符中初始化單個屬性結構。 若是attributeName爲NULL,則attname字段將設置爲空字符串(這是在咱們不知道或不須要該字段名稱的狀況下)。 另外,某些調用者使用此功能來更改現有tupdesc中與數據類型相關的字段;例如, 它們傳遞attributeName = NameStr(att-> attname)來指示不該修改attname字段。 請注意,attcollation設置爲指定數據類型的默認值。 若是須要非默認排序規則,請稍後使用TupleDescInitEntryCollation插入它。 */ tupdesc = CreateTemplateTupleDesc(8, false); TupleDescInitEntry(tupdesc, (AttrNumber) 1, "last_walend_time", TIMESTAMPTZOID, -1, 0); TupleDescInitEntry(tupdesc, (AttrNumber) 2, "last_recv_lsn", LSNOID, -1, 0); TupleDescInitEntry(tupdesc, (AttrNumber) 3, "last_apply_lsn", LSNOID, -1, 0); TupleDescInitEntry(tupdesc, (AttrNumber) 4, "last_apply_delay_ms", INT4OID, -1, 0); TupleDescInitEntry(tupdesc, (AttrNumber) 5, "receiver_pid", INT4OID, -1, 0); TupleDescInitEntry(tupdesc, (AttrNumber) 6, "receiver_state", INT4OID, -1, 0); TupleDescInitEntry(tupdesc, (AttrNumber) 7, "receiver_start_time", INT8OID, -1, 0); TupleDescInitEntry(tupdesc, (AttrNumber) 8, "receiver_conninfo", TEXTOID, -1, 0); BlessTupleDesc(tupdesc); // 完成對返回類型的構造, 參考src/include/funcapi.h // 接下來將每一個值轉換爲對應的Datum存儲到DatumValue數組, 對應的nulls數組僅當值爲空時設置爲true. TimestampTz receipttime; receipttime = walrcv->latestWalEndTime; // TimestampTz latestWalEndTime; DatumValue[0] = TimestampTzGetDatum(receipttime); // #define TimestampTzGetDatum(X) Int64GetDatum(X) XLogRecPtr recvPtr; // src/include/access/xlogdefs.h 指向XLOG中的位置的指針。 這些指針爲64位寬,由於咱們不但願它們溢出。 /* src/backend/replication/walreceiverfuncs.c: GetWalRcvWriteRecPtr 返回walreceiver已寫入的最後1個字節位置。 (可選)返回上一個塊開始,即在最近的walreceiver刷新週期中寫入的第一個字節。 對該值不感興趣的調用者能夠爲lastChunkStart傳遞NULL。 與receiveTLI相同。 */ recvPtr = GetWalRcvWriteRecPtr(NULL, NULL); // XLogRecPtr recptr; if (recvPtr == 0) nulls[1] = true; else DatumValue[1] = LSNGetDatum(recvPtr); // #define LSNGetDatum(X) (Int64GetDatum((int64) (X))) /* xlog.c: GetXLogReplayRecPtr 獲取最新的重作apply position。 導出以容許WALReceiver直接讀取指針。 */ XLogRecPtr applyPtr; applyPtr = GetXLogReplayRecPtr(NULL); if (recvPtr == 0) nulls[2] = true; else DatumValue[2] = LSNGetDatum(applyPtr); int apply_delay_ms; /* walreceiverfuncs.c: GetReplicationApplyDelay 若是沒有應用延遲信息,則返回複製應用延遲(以毫秒爲單位)或-1 */ apply_delay_ms = GetReplicationApplyDelay(); if (apply_delay_ms == -1) nulls[3] = true; else DatumValue[3] = Int32GetDatum(apply_delay_ms); DatumValue[4] = Int32GetDatum(walrcv->pid); DatumValue[5] = Int32GetDatum(walrcv->walRcvState); DatumValue[6] = Int64GetDatum(walrcv->startTime); DatumValue[7] = PointerGetDatum(cstring_to_text((char *)walrcv->conninfo)); // 返回 /* Returns the record as Datum 把元組變成 datum */ PG_RETURN_DATUM(HeapTupleGetDatum( heap_form_tuple(tupdesc, DatumValue, nulls))); // heap_form_tuple 構造一個tuple 返回 HeapTupleHeader , HeapTupleGetDatum 將HeapTupleHeader指針轉換爲Datum。 }