Citus7.4-Citus 9.3新特性解析

最近開始着手Citus7.4到Citus 9.3的升級,因此比較全面地瀏覽了這期間的Citus變動。 從Citus7.4到Citus 9.3不少方面的改進,本文只列出一些比較重要的部分。html

如下用到了一些示例,示例的驗證環境以下node

軟件git

  • PostreSQL 12
  • Citus 9.3

集羣成員github

  • CN
    • 127.0.0.1:9000
  • Worker
    • 127.0.0.1:9001
    • 127.0.0.1:9002

SQL支持加強類

1.支持非分區列的count distinct

這個Citus 7.4應該已經支持了,不知道是Citus的Changelog更新延誤,仍是Citus 7.5支持得更完善了。算法

表定義sql

create table tb1(id int,c1 int);
select create_distributed_table('tb1','id');

非分區列的count distinct的執行計劃數據庫

postgres=# explain select count(distinct c1) from tb1;
                                        QUERY PLAN                                        
------------------------------------------------------------------------------------------
 Aggregate  (cost=250.00..250.01 rows=1 width=8)
   ->  Custom Scan (Citus Adaptive)  (cost=0.00..0.00 rows=100000 width=4)
         Task Count: 32
         Tasks Shown: One of 32
         ->  Task
               Node: host=127.0.0.1 port=9001 dbname=postgres
               ->  HashAggregate  (cost=38.25..40.25 rows=200 width=4)
                     Group Key: c1
                     ->  Seq Scan on tb1_102339 tb1  (cost=0.00..32.60 rows=2260 width=4)
(9 rows)

2.支持UPSERT

支持UPSERT,即支持INSERT INTO SELECT..ON CONFLICT/RETURNINGjson

表定義api

create table tb1(id int, c1 int);
select create_distributed_table('tb1','id');
create table tb2(id int primary key, c1 int);
select create_distributed_table('tb2','id');

UPSERT SQL執行緩存

postgres=# INSERT INTO tb2
  SELECT * from tb1
  ON CONFLICT(id) DO UPDATE SET c1 = EXCLUDED.c1;
INSERT 0 1

3.支持GENERATED ALWAYS AS STORED

使用示例以下:

create table tbgenstore(id int, c1 int GENERATE ALWAYS AS (id+1)STORED);
select create_distributed_table('tbgenstore','id');

4.支持用戶定義的分佈式函數

支持用戶自定義分佈式函數。Citus會把分佈式函數(包括聚合函數)以及依賴的對象定義下發到全部Worker上。 後續在執行SQL的時候也能夠合理的把分佈式函數的執行下推到Worker。

分佈式函數還能夠和某個分佈表綁定"親和"關係,這一個特性的使用場景以下:

在多租戶類型的業務中,把單個租戶的一個事務中的多個SQL打包成一個「分佈式函數」下發到Worker上。 CN只須要下推一次分佈式函數的調用,分佈式函數內部的多個SQL的執行所有在Worker節點內部完成。 避免CN和Worker之間來回交互,能夠大大提高OLTP的性能(利用這個特性去跑TPCC,簡直太溜了!)。

下面看下手冊裏的例子。

https://docs.citusdata.com/en/v9.3/develop/api_udf.html?highlight=distributed%20function#create-distributed-function

-- an example function which updates a hypothetical
-- event_responses table which itself is distributed by event_id
CREATE OR REPLACE FUNCTION
  register_for_event(p_event_id int, p_user_id int)
RETURNS void LANGUAGE plpgsql AS $fn$
BEGIN
  INSERT INTO event_responses VALUES ($1, $2, 'yes')
  ON CONFLICT (event_id, user_id)
  DO UPDATE SET response = EXCLUDED.response;
END;
$fn$;

-- distribute the function to workers, using the p_event_id argument
-- to determine which shard each invocation affects, and explicitly
-- colocating with event_responses which the function updates
SELECT create_distributed_function(
  'register_for_event(int, int)', 'p_event_id',
  colocate_with := 'event_responses'
);

5.徹底支持聚合函數

Citus中對聚合函數有3種不一樣的執行方式

  1. 按照分片字段分組的聚合,直接下推到Worker執行聚合
  2. 對部分Citus可以識別的聚合函數,Citus執行兩階段聚合,如今Worker執行部分聚合,再把結果彙總到CN上進行最終聚合。
  3. 對其餘的聚合函數,Citus把數據拉到CN上,在CN上執行聚合。

詳細參考,https://docs.citusdata.com/en/v9.3/develop/reference_sql.html?highlight=Aggregation#aggregate-functions

顯然第3種方式性能會比較差,對不按分片字段分組的聚合,怎麼讓它按第2種方式執行呢?

Citus中預約義了一部分聚合函數能夠按第2中方式執行。

citus-9.3.0/src/include/distributed/multi_logical_optimizer.h:

static const char *const AggregateNames[] = {
	"invalid", "avg", "min", "max",
	"sum", "count", "array_agg",
	"jsonb_agg", "jsonb_object_agg",
	"json_agg", "json_object_agg",
	"bit_and", "bit_or", "bool_and", "bool_or", "every",
	"hll_add_agg", "hll_union_agg",
	"topn_add_agg", "topn_union_agg",
	"any_value"
};

對不在上面白名單的聚合函數,好比用戶自定義的聚合函數,能夠經過create_distributed_function()添加。 示例以下:

citus-9.3.0/src/test/regress/expected/aggregate_support.out:

create function sum2_sfunc(state int, x int)
returns int immutable language plpgsql as $$
begin return state + x;
end;
$$;
create function sum2_finalfunc(state int)
returns int immutable language plpgsql as $$
begin return state * 2;
end;
$$;
create aggregate sum2 (int) (
    sfunc = sum2_sfunc,
    stype = int,
    finalfunc = sum2_finalfunc,
    combinefunc = sum2_sfunc,
    initcond = '0'
);

select create_distributed_function('sum2(int)');

執行這個自定義的聚合函數的執行計劃以下

postgres=# explain select sum2(c1) from tb1;
                                        QUERY PLAN                                        
------------------------------------------------------------------------------------------
 Aggregate  (cost=250.00..250.01 rows=1 width=4)
   ->  Custom Scan (Citus Adaptive)  (cost=0.00..0.00 rows=100000 width=32)
         Task Count: 32
         Tasks Shown: One of 32
         ->  Task
               Node: host=127.0.0.1 port=9001 dbname=postgres
               ->  Aggregate  (cost=38.25..38.26 rows=1 width=32)
                     ->  Seq Scan on tb1_102339 tb1  (cost=0.00..32.60 rows=2260 width=4)
(8 rows)

可是當前這種方式不支持stype = internal的自定義聚合函數。 Citus社區已經在對應這個問題,詳細參考https://github.com/citusdata/citus/issues/3916

6.徹底支持窗口函數

對不按分片字段分組的聚合函數,Citus支持把數據拉到CN上再執行,和聚合函數類型。 須要注意這種執行方式對性能的影響,特別是包含多個不一樣分組字段的窗口函數時, Worker拉到CN上結果集是這些字段組合的笛卡爾積。

7.支持在事務塊中傳播LOCAL參數

當在CN的事務塊中設置LOCAL參數時,能夠把這個參數傳播到Worker節點。

前提條件是citus.propagate_set_commands參數必須爲local

set citus.propagate_set_commands TO local;

事務塊中設置LOCAL參數

postgres=# begin;
BEGIN
postgres=*# set local enable_hashagg to off;
SET
postgres=*# SELECT current_setting('enable_hashagg') FROM tb1 WHERE id = 3;
 current_setting 
-----------------
 off
(1 row)

8. 支持本地表和參考表Join

若是一個數據庫須要用到本地表,而本地表和以參考表的形式部署的維表又有Join的需求,改如何處理?

原來咱們只能在CN上再建立一套本地的維表,而後由應用或者經過觸發器維護兩套維表之間的數據同步。

如今能夠用更簡單的方式實現。 具體就是把CN節點也能夠做爲一個Worker加到Citus集羣裏,groupid必定要設置爲0。

SELECT master_add_node('127.0.0.1', 9001, groupid => 0);

這樣CN上也就和其餘Worker同樣擁有了參考表的一個副本,本地表和參考表Join的時候就直接在本地執行了。

DDL支持加強

9.支持把SCHEMA的賦權廣播到Worker上

GRANT USAGE ON SCHEMA dist_schema TO role1;

10.支持修改表SCHEMA廣播到Worker上

ALTER TABLE ... SET SCHEMA

11.支持建立索引時指定INCLUDE選項

create index tb1_idx_id on tb1(id) include (c1);

12. 支持使用CONCURRENTLY選項建立索引

create index CONCURRENTLY tb1_idx_id2 on tb1(id);

13. 支持傳播REINDEX到worker節點上

以前版本reindex不能傳播到Worker節點,還須要到每一個worker分別執行reindex。 新版的Citus支持了。

reindex index tb1_idx_id;

Citus MX功能加強

14.支持在MX 節點上對參考表執行DML

表定義

create table tbref(id int, c1 int);
select create_refence_table('tbref');

在MX worker(即擴展worker)上修改參考表

postgres=# insert into tbref values(1,1),(2,2);
INSERT 0 2
postgres=# update tbref set c1=10;
UPDATE 2
postgres=# delete from tbref where id=1;
DELETE 1
postgres=# select * from tbref;
 id | c1 
----+----
  2 | 10
(1 row)

15.支持在MX節點上執行TRUNCATE

以前MX節點上是不支持對分佈表和參考表執行truncate操做的。如今也支持了

postgres=# truncate tb1;
TRUNCATE TABLE
postgres=# truncate tbref;
TRUNCATE TABLE

16.支持在Citus MX架構下使用serial和smallserial

以前在Citus MX(即多CN部署)環境下,自增序列只能使用bigserial類型,如今也能夠支持serial和smallserial了。

表定義

create table tbserial(id int,c1 int);
select create_distributed_table('tbserial','id');

Citus中,自增字段經過CN和MX節點上邏輯表上的序列對象實現。

postgres=# \d tbserial
                            Table "public.tbserial"
 Column |  Type   | Collation | Nullable |               Default                
--------+---------+-----------+----------+--------------------------------------
 id     | integer |           | not null | nextval('tbserial_id_seq'::regclass)
 c1     | integer |           |          |

爲了防止多個MX節點產生的序列衝突。在Citus MX環境下,序列值的開頭部分是產生序列的節點的groupid,後面纔是順序累加的值。 這等於按groupid把序列值分紅了不一樣的範圍,互不重疊。

即:

全局序列值 = groupid,節點內的順序遞增值

對不一樣serial的數據類型,groupid佔的位數是不同的。具體以下

  • bigserial:16bit
  • serial:4bit
  • smallserial:4bit

根據上groupid佔的長度,咱們須要注意

  1. 單個節點(CN或擴展Worker)上,能產生的序列值的數量變少了,要防止溢出。
  2. 若是使用了serial或smallserial,最多部署7個擴展Worker節點。

序列對象的定義

上面提到的全局序列的實現具體體現爲:在不一樣節點上,序列對象定義的範圍不同。以下

CN節點上的序列對象定義(CN節點的groupid固定爲0)

postgres=# \d tbserial_id_seq
                  Sequence "public.tbserial_id_seq"
  Type   | Start | Minimum |  Maximum   | Increment | Cycles? | Cache 
---------+-------+---------+------------+-----------+---------+-------
 integer |     1 |       1 | 2147483647 |         1 | no      |     1
Owned by: public.tbserial.id

MX Worker節點上的序列對象定義(groupid=1)

postgres=# \d tbserial_id_seq
                    Sequence "public.tbserial_id_seq"
  Type  |   Start   |  Minimum  |  Maximum  | Increment | Cycles? | Cache 
--------+-----------+-----------+-----------+-----------+---------+-------
 bigint | 268435457 | 268435457 | 536870913 |         1 | no      |     1

如何知道每一個Worker節點的groupid?

每一個Worker節點的groupid能夠從pg_dist_node獲取。

postgres=# select * from pg_dist_node;
 nodeid | groupid | nodename  | nodeport | noderack | hasmetadata | isactive | noderole | nodecluster | metadatasynced | shouldhaveshards 
--------+---------+-----------+----------+----------+-------------+----------+----------+-------------+----------------+------------------
      2 |       2 | 127.0.0.1 |     9002 | default  | t           | t        | primary  | default     | t              | t
      1 |       1 | 127.0.0.1 |     9001 | default  | t           | t        | primary  | default     | t              | t
(2 rows)

也能夠在每一個節點本地查詢pg_dist_local_group得到本節點的groupid。

postgres=# select * from pg_dist_local_group;
 groupid 
---------
       1
(1 row)

CN節點和普通的Worker節點(非MX Worker)的pg_dist_local_group中查詢到的groupid都爲0.

17.在Citus MX經過本地執行提高性能

以前測試Citus MX架構的時候發現,當Citus MX節點上放分片時,性能比不放分片差一倍。 新版的Citus在這方面作了優化,當在Citus MX節點上訪問本節點上的分片時,再也不走新建一個到本地的數據庫鏈接再讀寫分片的常規執行方式。 而是直接用當前鏈接訪問分片。根據下面的測試數據,性能能夠提高一倍。

https://github.com/citusdata/citus/pull/2938

- Test 1: HammerDB test with 250 users, 1,000,000 transactions per. 8 Node Citus MX 
          - (a) With local execution: `System achieved 116473 PostgreSQL TPM at 160355 NOPM`   
          - (b) without local execution: ` System achieved 61392 PostgreSQL TPM at 100503 NOPM`

  - Test 2: HammerDB test with 250 users, 10,000,000 transactions per. 8 Node Citus MX 
           - (a) With local execution: `System achieved 91921 PostgreSQL TPM at 174557 NOPM`   
           - (b) without local execution: ` System achieved 84186 PostgreSQL TPM at 98408 NOPM`

- Test 3: Pgbench, 1 worker node, -c64 -c256 -T 120
            - (a) Local execution enabled (tps): `select-only`: 56202   `simple-update`:  11771 `tpcb-like`: 7796
            - (a) Local execution disabled (tps): `select-only`:  24524 `simple-update`: 5077  `tpcb-like`:   3510 (some connection errors for tpcb-like)

在我司的多CN部署方式下,擴展Worker上是不放分片的。因此這個優化和咱們無關。

性能加強

18.替換real-time爲新的執行器Adaptive Executor

Adaptive Executor是一個新的執行器,它和real-time的差別主要體如今能夠經過參數對CN到worker的鏈接數進行控制。具體以下:

  1. citus.max_shared_pool_size 能夠經過citus.max_shared_pool_size控制CN(或MX Worker)在單個Worker上可同時創建的最大鏈接數,默認值等於CN的max_connections。 達到鏈接數使用上限後,新的SQL請求可能等待,有些操做不受限制,好比COPY和重分區的Join。 Citus MX架構下,單個Worker上同時接受到鏈接數最大多是 max_shared_pool_size * (1 + MX Worker節點數)

  2. citus.max_adaptive_executor_pool_size 能夠經過citus.max_adaptive_executor_pool_size控制CN(或MX Worker)上的單個會話在單個Worker上可同時創建的最大鏈接數,默認值等於16。

  3. citus.max_cached_conns_per_worker 能夠經過citus.max_cached_conns_per_worker控制CN(或MX Worker)上的單個會話在事務結束後對每一個Worker緩存的鏈接數,默認值等於1。

  4. citus.executor_slow_start_interval 對於執行時間很短的多shard的SQL,併發開多個鏈接,不只頻繁建立銷燬鏈接的消耗很高,也極大的消耗了worker上有限的鏈接資源。 adaptive執行器,在執行多shard的SQL時,不是一次就建立出全部須要的鏈接數,而是先建立一部分,隔一段時間再建立一部分。 中途若是有shard的任務提早完成了,它的鏈接能夠被複用,就能夠減小對新建鏈接的需求。 所以執行多shard的SQL最少只須要一個鏈接,最多不超過max_adaptive_executor_pool_size,固然也不會超過目標worker上的shard數。

    這個算法叫"慢啓動",慢啓動的間隔由參數citus.executor_slow_start_interval控制,默認值爲10ms。 初始建立的鏈接數是:max(1,citus.max_cached_conns_per_worker),以後每批新建的鏈接數都在前一批的基礎上加1。 即默認狀況下,每批新建的鏈接數依次爲1,2,3,4,5,6...

    "慢啓動"主要優化了短查詢,對長查詢(手冊上給的標準是大於500ms),會增長必定的響應時間。

下面看幾個例子

citus.max_shared_pool_size的使用示例

postgres=# alter system set citus.max_shared_pool_size to 4;
ALTER SYSTEM
postgres=# select pg_reload_conf();
 pg_reload_conf 
----------------
 t
(1 row)
postgres=# begin;
BEGIN
postgres=*# update tb1 set c1=11;
UPDATE 1
postgres=*# select * from citus_remote_connection_stats();
 hostname  | port | database_name | connection_count_to_node 
-----------+------+---------------+--------------------------
 127.0.0.1 | 9002 | postgres      |                        4
 127.0.0.1 | 9001 | postgres      |                        4
(2 rows)

citus.executor_slow_start_interval的使用示例

tb1總共有32個分片,每一個worker上有16個分片。 初始每一個worker上保持2個鏈接

postgres=# select * from citus_remote_connection_stats();
 hostname  | port | database_name | connection_count_to_node 
-----------+------+---------------+--------------------------
 127.0.0.1 | 9002 | postgres      |                        2
 127.0.0.1 | 9001 | postgres      |                        2
(2 rows)

citus.executor_slow_start_interval = '10ms'時,執行一個空表的update,只額外建立了2個新鏈接。

postgres=# set citus.executor_slow_start_interval='10ms';
SET
postgres=# begin;
BEGIN
postgres=*# update tb1 set c1=100;
UPDATE 0
postgres=*# select * from citus_remote_connection_stats();
 hostname  | port | database_name | connection_count_to_node 
-----------+------+---------------+--------------------------
 127.0.0.1 | 9002 | postgres      |                        4
 127.0.0.1 | 9001 | postgres      |                        4
(2 rows)

citus.executor_slow_start_interval = '500ms'時,沒有建立新的鏈接,都複用了一個緩存的鏈接

postgres=# set citus.executor_slow_start_interval='500ms';
SET
postgres=# begin;
BEGIN
postgres=*# update tb1 set c1=100;
UPDATE 0
postgres=*# select * from citus_remote_connection_stats();
 hostname  | port | database_name | connection_count_to_node 
-----------+------+---------------+--------------------------
 127.0.0.1 | 9002 | postgres      |                        2
 127.0.0.1 | 9001 | postgres      |                        2
(2 rows)

citus.executor_slow_start_interval = '0ms'時,建立了比較多的新鏈接。

postgres=# set citus.executor_slow_start_interval = '0ms';
SET
postgres=# begin;
BEGIN
postgres=*# update tb1 set c1=100;
UPDATE 0
postgres=*# select * from citus_remote_connection_stats();
 hostname  | port | database_name | connection_count_to_node 
-----------+------+---------------+--------------------------
 127.0.0.1 | 9002 | postgres      |                        5
 127.0.0.1 | 9001 | postgres      |                       14
(2 rows)

參考

adaptive執行器鏈接建立"慢啓動"的代碼參考:

citus-9.3.0/src/backend/distributed/executor/adaptive_executor.c:

static void
ManageWorkerPool(WorkerPool *workerPool)
{
...
		/* cannot open more than targetPoolSize connections */
		int maxNewConnectionCount = targetPoolSize - initiatedConnectionCount;//targetPoolSize的值爲max(1,`citus.max_cached_conns_per_worker`)

		/* total number of connections that are (almost) available for tasks */
		int usableConnectionCount = UsableConnectionCount(workerPool);

		/*
		 * Number of additional connections we would need to run all ready tasks in
		 * parallel.
		 */
		int newConnectionsForReadyTasks = readyTaskCount - usableConnectionCount;

		/*
		 * Open enough connections to handle all tasks that are ready, but no more
		 * than the target pool size.
		 */
		newConnectionCount = Min(newConnectionsForReadyTasks, maxNewConnectionCount);

		if (newConnectionCount > 0 && ExecutorSlowStartInterval != SLOW_START_DISABLED)
		{
			if (MillisecondsPassedSince(workerPool->lastConnectionOpenTime) >=
				ExecutorSlowStartInterval)
			{
				newConnectionCount = Min(newConnectionCount,
										 workerPool->maxNewConnectionsPerCycle);

				/* increase the open rate every cycle (like TCP slow start) */
				workerPool->maxNewConnectionsPerCycle += 1;
			}
			else
			{
				/* wait a bit until opening more connections */
				return;
			}
		}

19.經過adaptive執行器執行重分佈的Join

citus.enable_repartition_joins=on時,Citus支持經過數據重分佈的方式執行非親和Inner Join, 以前版本Citus會自動切換到task-tracker執行器執行重分佈的Join,可是使用task-tracker執行器須要CN節點給Worker下發任務再不斷檢查任務完成狀態,其額外消耗很大,響應時間很是長。

新版Citus改進後,能夠經過adaptive執行器執行重分佈的Join。

根據官網博客,1000w如下數據的重分佈Join,性能提高了10倍。 詳細參考:https://www.citusdata.com/blog/2020/03/02/citus-9-2-speeds-up-large-scale-htap/

咱們本身的簡單測試中,2張空表的重分佈Join,以前須要16秒,如今只須要2秒。

20.支持重分佈的方式執行INSERT...SELECT

表定義

create table tb1(id int, c1 int);
select create_distributed_table('tb1','id');
set citus.shard_count to 16;
create table tb2(id int primary key, c1 int);
select create_distributed_table('tb2','id');

tb1和tb2的分片數不同,即它們不是親和的。 此前,Citus必須把數據全拉到CN節點上中轉。 新版Citus能夠經過重分佈的方式執行這個SQL,各個Worker之間直接互相傳送數據,CN節點只執行工具函數驅動任務執行,性能可大幅提高。

postgres=# explain INSERT INTO tb2
  SELECT * from tb1
  ON CONFLICT(id) DO UPDATE SET c1 = EXCLUDED.c1;
                                     QUERY PLAN                                     
------------------------------------------------------------------------------------
 Custom Scan (Citus INSERT ... SELECT)  (cost=0.00..0.00 rows=0 width=0)
   INSERT/SELECT method: repartition
   ->  Custom Scan (Citus Adaptive)  (cost=0.00..0.00 rows=100000 width=8)
         Task Count: 32
         Tasks Shown: One of 32
         ->  Task
               Node: host=127.0.0.1 port=9001 dbname=postgres
               ->  Seq Scan on tb1_102339 tb1  (cost=0.00..32.60 rows=2260 width=8)
(8 rows)

根據官網博客,這項優化使性能提高了5倍。 詳細參考:https://www.citusdata.com/blog/2020/03/02/citus-9-2-speeds-up-large-scale-htap/

須要注意的是,若是插入時,須要在目標表上自動生成自增字段,Citus會退回到原來的執行方式,數據都會通過CN中轉一下。

21.支持以輪詢的方式訪問參考表的多個副本

以前Citus查詢參考表時,始終只訪問參考表的第一個副本,新版Citus能夠經過參數設置,在參考表多個副本輪詢訪問,均衡負載。

postgres=# set citus.task_assignment_policy TO "round-robin";
SET
postgres=# explain select * from tbref;
                                    QUERY PLAN                                    
----------------------------------------------------------------------------------
 Custom Scan (Citus Adaptive)  (cost=0.00..0.00 rows=0 width=0)
   Task Count: 1
   Tasks Shown: All
   ->  Task
         Node: host=127.0.0.1 port=9001 dbname=postgres
         ->  Seq Scan on tbref_102371 tbref  (cost=0.00..32.60 rows=2260 width=8)
(6 rows)

postgres=# explain select * from tbref;
                                    QUERY PLAN                                    
----------------------------------------------------------------------------------
 Custom Scan (Citus Adaptive)  (cost=0.00..0.00 rows=0 width=0)
   Task Count: 1
   Tasks Shown: All
   ->  Task
         Node: host=127.0.0.1 port=9002 dbname=postgres
         ->  Seq Scan on tbref_102371 tbref  (cost=0.00..32.60 rows=2260 width=8)
(6 rows)

citus.task_assignment_policy的默認值是greedy。greedy比較適合多副本的分佈表。 對於涉及多個shard的SQL,每一個shard都有多個可選的副本,在greedy策略下, Citus會盡可能確保每一個worker分配到任務數相同。

具體實現時Citus一次輪詢全部Worker,直到把全部shard任務都分配完。 所以對參考表這種只有一個shard的場景,greedy會致使其始終把任務分配給第一個worker。 詳細能夠參考GreedyAssignTaskList()函數的代碼。

22.表數據導出優化

Citus導出數據時,中間結果會寫到在CN上,並且CN從Worker拉數據是並行拉的,不過Worker仍是CN負載都會很高。 新版Citus優化了COPY導出處理,依次從每一個Worker上抽出數返回給客戶端,中途數據不落盤。

可是這一優化只適用於下面這種固定形式的全表COPY到STDOUT的場景

COPY table tb1 to STDOUT

這能夠大大優化pg_dump,延遲更低,內存使用更少。

集羣管理加強

23.支持控制worker不分配shard

能夠經過設置節點的shouldhaveshards屬性控制某個節點不放分片。

SELECT master_set_node_property('127.0.0.1', 9002, 'shouldhaveshards', false);

shouldhaveshards屬性會對後續建立新的分佈表和參考表生效。 也會對後續執行的企業版Citus的rebalance功能生效,社區版不支持rebalance,但若是自研Citus部署和維護工具也能夠利用這個參數。

  • 擴展Worker的實現邏輯改成使用這個參數,簡化處理邏輯,不用先建好分佈表後再挪分片。
  • 擴縮容腳本也可使用這個參數決定Worker上是否放置分片,不須要區分是否是所有是擴展Worker的部署架構

24.支持使用master_update_node實施failover

採用主備流複製實現Worker高可用時,通常CN經過VIP訪問Worker,worker主備切換時只須要漂移vip到新的主節點便可。 新版Citus提供了一個新的可選方案,經過master_update_node()函數修改某個worker的IP和Port。 這提供了一種新的不依賴VIP的Worker HA實現方案。

postgres=# \df master_update_node
List of functions
-[ RECORD 1 ]-------+-----------------------------------------------------------------------------------------------------------------------------
Schema              | pg_catalog
Name                | master_update_node
Result data type    | void
Argument data types | node_id integer, new_node_name text, new_node_port integer, force boolean DEFAULT false, lock_cooldown integer DEFAULT 10000
Type                | func

25.支持變動親和定義

新版Citus能夠在分佈表建立後,修改親和關係。

表定義

create table tba(id int,c1 int);
select create_distributed_table('tba','id');
create table tbb(id int,c1 int);
select create_distributed_table('tbb','id');
create table tbc(id text,c1 int);
select create_distributed_table('tbc','id');

tba和tbb這兩個表是親和的

postgres=# select * from pg_dist_partition where logicalrelid in ('tba'::regclass,'tbb'::regclass);
 logicalrelid | partmethod |                                                        partkey                                                         | colocationid | repmodel 
--------------+------------+------------------------------------------------------------------------------------------------------------------------+--------------+----------
 tba          | h          | {VAR :varno 1 :varattno 1 :vartype 23 :vartypmod -1 :varcollid 0 :varlevelsup 0 :varnoold 1 :varoattno 1 :location -1} |            3 | s
 tbb          | h          | {VAR :varno 1 :varattno 1 :vartype 23 :vartypmod -1 :varcollid 0 :varlevelsup 0 :varnoold 1 :varoattno 1 :location -1} |            3 | s
(2 rows)

將tbb設置爲新的親和ID,打破它們的親和關係

postgres=# SELECT update_distributed_table_colocation('tbb', colocate_with => 'none');
 update_distributed_table_colocation 
-------------------------------------
 
(1 row)

postgres=# select * from pg_dist_partition where logicalrelid in ('tba'::regclass,'tbb'::regclass);
 logicalrelid | partmethod |                                                        partkey                                                         | colocationid | repmodel 
--------------+------------+------------------------------------------------------------------------------------------------------------------------+--------------+----------
 tba          | h          | {VAR :varno 1 :varattno 1 :vartype 23 :vartypmod -1 :varcollid 0 :varlevelsup 0 :varnoold 1 :varoattno 1 :location -1} |            3 | s
 tbb          | h          | {VAR :varno 1 :varattno 1 :vartype 23 :vartypmod -1 :varcollid 0 :varlevelsup 0 :varnoold 1 :varoattno 1 :location -1} |           14 | s
(2 rows)

從新設置它們親和

postgres=# SELECT update_distributed_table_colocation('tbb', colocate_with => 'tba');
 update_distributed_table_colocation 
-------------------------------------
 
(1 row)

postgres=# select * from pg_dist_partition where logicalrelid in ('tba'::regclass,'tbb'::regclass);
 logicalrelid | partmethod |                                                        partkey                                                         | colocationid | repmodel 
--------------+------------+------------------------------------------------------------------------------------------------------------------------+--------------+----------
 tba          | h          | {VAR :varno 1 :varattno 1 :vartype 23 :vartypmod -1 :varcollid 0 :varlevelsup 0 :varnoold 1 :varoattno 1 :location -1} |            3 | s
 tbb          | h          | {VAR :varno 1 :varattno 1 :vartype 23 :vartypmod -1 :varcollid 0 :varlevelsup 0 :varnoold 1 :varoattno 1 :location -1} |            3 | s
(2 rows)

也能夠用批量將一組表設置爲和某一個表親和

postgres=# SELECT mark_tables_colocated('tba', ARRAY['tbb', 'tbc']);
ERROR:  cannot colocate tables tba and tbc
DETAIL:  Distribution column types don't match for tba and tbc.

tbc的分片字段類型不一致,不能親和,去掉tbc再次執行成功。

postgres=# SELECT mark_tables_colocated('tba', ARRAY['tbb']);
 mark_tables_colocated 
-----------------------
 
(1 row)

26.支持truncate分佈表的本地數據

把一個原來就有數據的本地表建立成分佈表,會把原來的數據拷貝到各個shard上,但原始本地表上的數據不會刪除,只是對用戶不可見。

原來沒有直接的辦法刪掉這些不須要的本地數據(能夠經過臨時篡改元數據的方式刪),如今能夠用一個函數實現。

SELECT truncate_local_data_after_distributing_table('tb1');

27. 延遲複製參考表副本

當新的Worker節點添加到Citus集羣的時候,會同步參考表的副本到上面。 若是集羣中存在比較大參考表,會致使添加Worker節點的時間不可控。 這可能使得用戶不敢在業務高峯期擴容節點。

如今Citus能夠支持把參考表的同步延遲到下次建立分片的的時候。 方法就是設置下面這個參數爲off,它的默認值爲on。

citus.replicate_reference_tables_on_activate = off

這樣咱們能夠在白天擴容,夜裏在後臺同步數據。

28.建立集羣範圍一致的恢復點

以前咱們備份Citus集羣的時候,都是各個節點各自備份恢復,真發生故障,沒辦法恢復到一個集羣範圍的一致點。

如今可使用下面的函數,建立一個全局的恢復點實行全局一致性備份。 使用方法相似於PG的pg_create_restore_point(),詳細可參考手冊。

select citus_create_restore_point('foo');

29.支持設置Citus集羣節點間互聯的鏈接選項

能夠經過citus.node_conninfo參數設置Citus內節點間互連的一些非敏感的鏈接選項。 支持鏈接選項下面的libpq的一個子集。

  • application_name
  • connect_timeout
  • gsslib
  • keepalives
  • keepalives_count
  • keepalives_idle
  • keepalives_interval
  • krbsrvname
  • sslcompression
  • sslcrl
  • sslmode (defaults to 「require」 as of Citus 8.1)
  • sslrootcert

Citus 8.1之後,在支持SSL的PostgreSQL上,citus.node_conninfo的默認值爲'sslmode=require'。 即默認開啓了SSL。這是Citus出於安全的考慮,可是啓用SSL後部署和維護會比較麻煩。 所以咱們的部署環境下,須要將其修改成sslmode=prefer

postgres=# show citus.node_conninfo;
 citus.node_conninfo 
---------------------
 sslmode=prefer
(1 row)

30.默認關閉Citus統計收集

以前Citus的守護進程默認會收集Citus集羣的一些元數據信息上報到CitusData公司的服務上(明顯有安全問題)。 新版本把這個功能默認關閉了。固然更完全的作法是在編譯Citus的時候就把這個功能屏蔽掉。

postgres=# show citus.enable_statistics_collection;
 citus.enable_statistics_collection 
------------------------------------
 off
(1 row)

31. 增長查看集羣範圍活動的函數和視圖

新版Citus提供了幾個函數和視圖,能夠在CN上很是方便的查看總體Citus的當前活動情況

  • citus_remote_connection_stats()

查看全部worker上的來自CN節點和MX Worker節點的遠程鏈接數。

postgres=# select * from citus_remote_connection_stats();
 hostname  | port | database_name | connection_count_to_node 
-----------+------+---------------+--------------------------
 127.0.0.1 | 9002 | postgres      |                        3
 127.0.0.1 | 9001 | postgres      |                        3
(2 rows)
  • citus_dist_stat_activity

查看從本CN節點或MX worker上發起的活動。這個視圖在pg_stat_activity上附加了一些Citus相關的信息。

postgres=# select * from citus_dist_stat_activity;
-[ RECORD 1 ]----------+------------------------------
query_hostname         | coordinator_host
query_hostport         | 9000
master_query_host_name | coordinator_host
master_query_host_port | 9000
transaction_number     | 57
transaction_stamp      | 2020-06-19 15:05:22.142242+08
datid                  | 13593
datname                | postgres
pid                    | 2574
usesysid               | 10
usename                | postgres
application_name       | psql
client_addr            | 
client_hostname        | 
client_port            | -1
backend_start          | 2020-06-19 10:57:58.472994+08
xact_start             | 2020-06-19 15:05:17.45487+08
query_start            | 2020-06-19 15:05:22.140954+08
state_change           | 2020-06-19 15:05:22.140957+08
wait_event_type        | Client
wait_event             | ClientRead
state                  | active
backend_xid            | 
backend_xmin           | 5114
query                  | select * from tb1;
backend_type           | client backend

注意上面的transaction_number,它表明一個事務號。 涉及更新的SQL,事務塊中查詢和push-pull方式執行的查詢都會分配一個非0的事務號。 經過這個事務號,咱們能夠很容易地識別出全部worker上來自同一SQL(或事務)的活動。

詳細參考下面的註釋。(這段註釋應該寫錯了,下面2類SQL的區別不是是否能被'show',而是transaction_number是否非0) citus-9.3.0/src/backend/distributed/transaction/citus_dist_stat_activity.c

*  An important note on this views is that they only show the activity
 *  that are inside distributed transactions. Distributed transactions
 *  cover the following:
 *     - All multi-shard modifications (DDLs, COPY, UPDATE, DELETE, INSERT .. SELECT)
 *     - All multi-shard queries with CTEs (modifying CTEs, read-only CTEs)
 *     - All recursively planned subqueries
 *     - All queries within transaction blocks (BEGIN; query; COMMMIT;)
 *
 *  In other words, the following types of queries won't be observed in these
 *  views:
 *      - Single-shard queries that are not inside transaction blocks
 *      - Multi-shard select queries that are not inside transaction blocks
 *      - Task-tracker queries
  • citus_worker_stat_activity

查看全部worker上的活動。排除非citus會話,即不通過CN或MX worker直連worker的會話。

咱們能夠指定transaction_number查看特定SQL在worker上的活動。

postgres=# select * from citus_worker_stat_activity where transaction_number = 57;
-[ RECORD 1 ]----------+---------------------------------------------
query_hostname         | 127.0.0.1
query_hostport         | 9001
master_query_host_name | coordinator_host
master_query_host_port | 9000
transaction_number     | 57
transaction_stamp      | 2020-06-19 15:05:22.142242+08
datid                  | 13593
datname                | postgres
pid                    | 4108
usesysid               | 10
usename                | postgres
application_name       | citus
client_addr            | 127.0.0.1
client_hostname        | 
client_port            | 33676
backend_start          | 2020-06-19 15:05:22.162829+08
xact_start             | 2020-06-19 15:05:22.168811+08
query_start            | 2020-06-19 15:05:22.171398+08
state_change           | 2020-06-19 15:05:22.172237+08
wait_event_type        | Client
wait_event             | ClientRead
state                  | idle in transaction
backend_xid            | 
backend_xmin           | 
query                  | SELECT id, c1 FROM tb1_102369 tb1 WHERE true
backend_type           | client backend
...
  • citus_lock_waits

查看Citus集羣內的被阻塞的查詢。下面引用Ciuts手冊上的例子

表定義

CREATE TABLE numbers AS
  SELECT i, 0 AS j FROM generate_series(1,10) AS i;
SELECT create_distributed_table('numbers', 'i');

使用2個會話終端,順序執行下面的SQL。

-- session 1                           -- session 2
-------------------------------------  -------------------------------------
BEGIN;
UPDATE numbers SET j = 2 WHERE i = 1;
                                       BEGIN;
                                       UPDATE numbers SET j = 3 WHERE i = 1;
                                       -- (this blocks)

經過citus_lock_waits能夠看到,這2個查詢是阻塞狀態。

SELECT * FROM citus_lock_waits;
-[ RECORD 1 ]-------------------------+----------------------------------------
waiting_pid                           | 88624
blocking_pid                          | 88615
blocked_statement                     | UPDATE numbers SET j = 3 WHERE i = 1;
current_statement_in_blocking_process | UPDATE numbers SET j = 2 WHERE i = 1;
waiting_node_id                       | 0
blocking_node_id                      | 0
waiting_node_name                     | coordinator_host
blocking_node_name                    | coordinator_host
waiting_node_port                     | 5432
blocking_node_port                    | 5432

這個視圖只能在CN節點查看,MX worker節點查不到數據。可是並不要求阻塞所涉及的SQL必須從CN節點發起。

詳細參考:https://docs.citusdata.com/en/v9.3/develop/api_metadata.html?highlight=citus_worker_stat_activity#distributed-query-activity

32. 增長查看錶元數據的函數和視圖

  • master_get_table_metadata()

查看分佈表的元數據

postgres=# select * from master_get_table_metadata('tb1');
-[ RECORD 1 ]---------+-----------
logical_relid         | 17148
part_storage_type     | t
part_method           | h
part_key              | id
part_replica_count    | 1
part_max_size         | 1073741824
part_placement_policy | 2
  • get_shard_id_for_distribution_column()

查看某個分佈列值對應的shardid

postgres=# SELECT get_shard_id_for_distribution_column('tb1', 4);
 get_shard_id_for_distribution_column 
--------------------------------------
                               102347
(1 row)

其餘

33. 容許在CN備庫執行簡單的DML

經過設置citus.writable_standby_coordinator參數爲on,能夠在CN的備庫上執行部分簡單的DML。 看下下面的例子

表定義

create table tbl(id int,c1 int);
select create_distributed_table('tbserial','id');

在CN備節點上能夠執行帶分片字段的DML

postgres=# insert into tb1 values(3,3);
ERROR:  writing to worker nodes is not currently allowed
DETAIL:  the database is in recovery mode 
postgres=# set citus.writable_standby_coordinator TO ON;
SET
postgres=# insert into tb1 values(3,3);
INSERT 0 1
postgres=# update tb1 set c1=20 where id=3;
UPDATE 1
postgres=# delete from tb1 where id=3;
DELETE 1

不支持不帶分片字段的UPDATE和DELETE

postgres=# update tb1 set c1=20;
ERROR:  cannot assign TransactionIds during recovery
postgres=# delete from tb1 where c1=20;
ERROR:  cannot assign TransactionIds during recovery

也不支持跨節點的事務

postgres=# begin;
BEGIN
postgres=*# insert into tb1 values(3,3);
INSERT 0 1
postgres=*# insert into tb1 values(4,4);
INSERT 0 1
postgres=*# commit;
ERROR:  cannot assign TransactionIds during recovery

對於2pc的分佈式事務,Citus須要將事務信息記錄到事務表pg_dist_transaction中。 因此,Citus也沒法在CN備節點上支持2pc的分佈式事務。

可是若是切換成1pc提交模式,仍是能夠支持跨節點事務的。

postgres=# set citus.multi_shard_commit_protocol TO '1pc';
SET
postgres=# begin;
BEGIN
postgres=*# insert into tb1 values(4,4);
INSERT 0 1
postgres=*# insert into tb1 values(5,5);
INSERT 0 1
postgres=*# commit;

而且在1pc提交模式下,跨多個分片的SQL也是支持的。

postgres=# set citus.multi_shard_commit_protocol TO '1pc';
SET
postgres=# update tb1 set c1=10;
UPDATE 3
相關文章
相關標籤/搜索