[Repost] Greenplum Resource Isolation: Principles and Source Code Analysis


Background

Greenplum is an MPP data warehouse system. Its greatest strength is horizontal scalability, and a single query can push the hardware's capabilities to their limit.

But this is also a point some users criticize: because a single query can consume all of the hardware resources, resource contention between queries becomes severe once concurrency rises.

Greenplum's approach to resource isolation

To reduce resource contention between concurrent queries, Greenplum provides a resource management mechanism built around resource queues.

Each resource queue defines a usage or limit profile for resources. Users are assigned to resource queues according to their workload, which is how resource management is achieved.

For example, analysts, report jobs, and ETL jobs can be split into three classes of users. Based on the expected resource usage and task priority of each class, three resource queues are planned, and each class of users is bound to its queue, thereby achieving resource control.

Syntax for creating a resource queue

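The original post shows a screenshot of the syntax here, which is not reproduced. As a rough sketch (exact options and value forms vary across Greenplum versions, and not all attributes need to be combined):

CREATE RESOURCE QUEUE queue_name WITH (
    ACTIVE_STATEMENTS=integer,          -- maximum number of concurrently running queries
    MAX_COST=float,                     -- maximum combined estimated cost of running queries
    COST_OVERCOMMIT={TRUE|FALSE},       -- allow exceeding MAX_COST when the system is idle
    MIN_COST=float,                     -- queries cheaper than this bypass the queue
    PRIORITY={MIN|LOW|MEDIUM|HIGH|MAX}, -- CPU priority level
    MEMORY_LIMIT='<amount>'             -- per-segment memory available to the queue's statements
);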

Supported resource limit types

  • active_statements: the number of queries that the queue may run at the same time.
  • max_cost: the maximum combined estimated (planner) cost of all queries currently running in the queue.
  • cost_overcommit: whether, when the system is idle, the total cost of the queue's queries may exceed the configured max_cost.
  • min_cost: queries whose estimated cost is below this value are not counted toward the queue's cost and do not wait in the queue; they execute immediately.
  • priority: balances CPU contention between queues. There are five levels, each with a corresponding weight; at a fixed interval each backend checks whether its resource usage has reached the weight-derived target, and the queue's queries are throttled with pg_usleep if necessary.
  • memory_limit: the maximum amount of memory, per segment, available to the statements running through the queue.

When creating a resource queue, at least one of active_statements and max_cost must be set.

Only superusers can create or alter resource queues.
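For instance, a cost-based queue combining several of the parameters above might be created like this (a sketch only; the cost figures are arbitrary and the queue name etl_queue is hypothetical):

CREATE RESOURCE QUEUE etl_queue WITH (
    MAX_COST=100000.0,        -- total estimated cost allowed to run concurrently
    COST_OVERCOMMIT=TRUE,     -- may exceed MAX_COST when the system is otherwise idle
    MIN_COST=500.0,           -- trivial queries skip the queue entirely
    PRIORITY=LOW
);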

Binding a role to a resource queue
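The screenshot from the original post is omitted; binding an existing role is ordinary role DDL. A minimal sketch (role and queue names are hypothetical):

ALTER ROLE analyst RESOURCE QUEUE report_queue;   -- assign the role to the queue
ALTER ROLE analyst RESOURCE QUEUE NONE;           -- detach it again (back to the default queue)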

Resource queue usage example

Create two resource queues and assign them to two users (a resource queue can be assigned to multiple users).

postgres=# create resource queue min with (active_statements=3, priority=min);
CREATE QUEUE
postgres=# create resource queue max with (active_statements=1, priority=max);
CREATE QUEUE
postgres=# create role max login encrypted password '123' resource queue max;
CREATE ROLE
postgres=# create role min login encrypted password '123' resource queue min;
CREATE ROLE
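To double-check the queue definitions, the pg_resqueue catalog can be queried directly (a quick sanity check; the exact column set differs between Greenplum versions):

select rsqname, rsqcountlimit, rsqcostlimit, rsqovercommit from pg_resqueue;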

Greenplum source code related to resource isolation

src/include/catalog/pg_resqueue.h

#define PG_RESRCTYPE_ACTIVE_STATEMENTS 1  /* rsqcountlimit: count */
#define PG_RESRCTYPE_MAX_COST          2  /* rsqcostlimit: max_cost */
#define PG_RESRCTYPE_MIN_COST          3  /* rsqignorecostlimit: min_cost */
#define PG_RESRCTYPE_COST_OVERCOMMIT   4  /* rsqovercommit: cost_overcommit */
/* start of "pg_resourcetype" entries... */
#define PG_RESRCTYPE_PRIORITY          5  /* backoff.c: priority queue */
#define PG_RESRCTYPE_MEMORY_LIMIT      6  /* memquota.c: memory quota */

In what follows I pick the CPU scheduling part for source code analysis; the other resource types are not analyzed in this article.

CPU resource isolation

src/backend/postmaster/backoff.c
The five CPU priority levels and their corresponding weights (the weight of a currently running query can be adjusted via the gp_adjust_priority function).

typedef struct PriorityMapping
{
    const char *priorityVal;
    int         weight;
} PriorityMapping;

const struct PriorityMapping priority_map[] = {
    {"MAX", 1000000},
    {"HIGH", 1000},
    {"MEDIUM", 500},
    {"LOW", 200},
    {"MIN", 100},
    /* End of list marker */
    {NULL, 0}
};

Data structure for a single process's resource usage statistics (local to each backend)

/**
 * This is information that only the current backend ever needs to see.
 */
typedef struct BackoffBackendLocalEntry
{
    int            processId;           /* Process Id of backend */
    struct rusage  startUsage;          /* Usage when current statement began. To account for caching of backends. */
    struct rusage  lastUsage;           /* Usage statistics when backend process performed local backoff action */
    double         lastSleepTime;       /* Last sleep time when local backing-off action was performed */
    int            counter;             /* Local counter is used as an approx measure of time */
    bool           inTick;              /* Is backend currently performing tick? - to prevent nested calls */
    bool           groupingTimeExpired; /* Should backend try to find better leader? */
} BackoffBackendLocalEntry;

Data structure for resource usage statistics shared by all processes within a single segment or the master

/**
 * There is a backend entry for every backend with a valid backendid on the master and segments.
 */
typedef struct BackoffBackendSharedEntry
{
    struct StatementId statementId;  /* A statement Id. Can be invalid. */
    int     groupLeaderIndex;        /* Who is my leader? */
    int     groupSize;               /* How many in my group? */
    int     numFollowers;            /* How many followers do I have? */

    /* These fields are written by backend and read by sweeper process */
    struct timeval lastCheckTime;    /* Last time the backend process performed local back-off action.
                                        Used to determine inactive backends. */

    /* These fields are written to by sweeper and read by backend */
    bool    noBackoff;               /* If set, then no backoff to be performed by this backend */
    double  targetUsage;             /* Current target CPU usage as calculated by sweeper */
    bool    earlyBackoffExit;        /* Sweeper asking backend to stop backing off */

    /* These fields are written to and read by sweeper */
    bool    isActive;                /* Sweeper marking backend as active based on lastCheckTime */
    int     numFollowersActive;      /* If backend is a leader, this represents number of followers that are active */

    /* These fields are written by backend during init and by manual adjustment */
    int     weight;                  /* Weight of this statement */
} BackoffBackendSharedEntry;

/* In ms */
#define MIN_SLEEP_THRESHOLD  5000
/* In ms */
#define DEFAULT_SLEEP_TIME   100.0

Obtaining the process's resource usage via the getrusage() system call

/* Provide tracing information */
PG_TRACE1(backoff__localcheck, MyBackendId);

if (gettimeofday(&currentTime, NULL) < 0)
{
    elog(ERROR, "Unable to execute gettimeofday(). Please disable query prioritization.");
}

if (getrusage(RUSAGE_SELF, &currentUsage) < 0)
{
    elog(ERROR, "Unable to execute getrusage(). Please disable query prioritization.");
}

Converting the measured resource usage into a sleep time

if (!se->noBackoff)
{
    /* How much did the cpu work on behalf of this process - incl user and sys time */
    thisProcessTime = TIMEVAL_DIFF_USEC(currentUsage.ru_utime, le->lastUsage.ru_utime)
                    + TIMEVAL_DIFF_USEC(currentUsage.ru_stime, le->lastUsage.ru_stime);

    /* Absolute cpu time since the last check. This accounts for multiple procs per segment */
    totalTime = TIMEVAL_DIFF_USEC(currentTime, se->lastCheckTime);

    cpuRatio = thisProcessTime / totalTime;
    cpuRatio = Min(cpuRatio, 1.0);

    /*
     * targetUsage depends on the priority weight and on the
     * gp_resqueue_priority_cpucores_per_segment parameter
     * (double CPUAvailable = numProcsPerSegment();). The sweeper computes it as:
     *   se->targetUsage = CPUAvailable * se->weight / activeWeight / gl->numFollowersActive;
     */
    changeFactor = cpuRatio / se->targetUsage;

    /* decide how long this backend should sleep */
    le->lastSleepTime *= changeFactor;
    if (le->lastSleepTime < DEFAULT_SLEEP_TIME)
        le->lastSleepTime = DEFAULT_SLEEP_TIME;
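To make the targetUsage formula concrete (illustrative numbers only, and assuming activeWeight is the sum of the weights of the currently active statements): with gp_resqueue_priority_cpucores_per_segment = 4, a HIGH statement (weight 1000) and a LOW statement (weight 200) running as single-process groups get targetUsage = 4 * 1000 / 1200 ≈ 3.33 and 4 * 200 / 1200 ≈ 0.67 respectively. A LOW backend that measures cpuRatio = 1.0 therefore computes changeFactor ≈ 1.5, so its lastSleepTime grows by roughly 50% at every check until its CPU share falls back toward the target.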

If the computed sleep time exceeds MIN_SLEEP_THRESHOLD, the backend goes to sleep

memcpy(&le->lastUsage, &currentUsage, sizeof(currentUsage));
memcpy(&se->lastCheckTime, &currentTime, sizeof(currentTime));

if (le->lastSleepTime > MIN_SLEEP_THRESHOLD)    /* is a real sleep warranted? */
{
    /*
     * Sleeping happens in chunks so that the backend may exit early from its sleep
     * if the sweeper requests it to.
     */
    int    j = 0;
    long   sleepInterval = ((long) gp_resqueue_priority_sweeper_interval) * 1000L;
    int    numIterations = (int) (le->lastSleepTime / sleepInterval);
    double leftOver = (double) ((long) le->lastSleepTime % sleepInterval);

    for (j = 0; j < numIterations; j++)
    {
        /* Sleep a chunk */
        pg_usleep(sleepInterval);               /* the actual throttling */

        /* Check for early backoff exit */
        if (se->earlyBackoffExit)
        {
            le->lastSleepTime = DEFAULT_SLEEP_TIME; /* Minimize sleep time since we may need to recompute from scratch */
            break;
        }
    }
    if (j == numIterations)
        pg_usleep(leftOver);
}
}

Besides the sleep-based throttling above, the database's resources should be used as fully as possible when the system is idle. So under which conditions is the back-off skipped?

/**
 * Under certain conditions, we want to avoid backoff. Cases are:
 * 1. A statement just entered or exited
 * 2. A statement's weight changed due to user intervention via gp_adjust_priority()
 * 3. There is no active backend
 * 4. There is exactly one statement
 * 5. Total number of valid backends <= number of procs per segment
 *    (i.e. the gp_resqueue_priority_cpucores_per_segment setting)
 * Case 1 and 2 are approximated by checking if total statement weight changed since last sweeper loop.
 */

How to adjust the weight of a running query

If a query that is already executing turns out to be hogging resources, its weight can be adjusted dynamically.

While a query is running, its priority can be changed:

postgres=# set gp_debug_resqueue_priority=on;
postgres=# set client_min_messages ='debug';

Check the current resource queue priority of each statement:

postgres=# select * from gp_toolkit.gp_resq_priority_statement;
 rqpdatname | rqpusename | rqpsession | rqpcommand | rqppriority | rqpweight |                        rqpquery
------------+------------+------------+------------+-------------+-----------+--------------------------------------------------------
 postgres   | digoal     |         21 |          1 | MAX         |   1000000 | select pg_sleep(1000000) from gp_dist_random('gp_id');
 postgres   | digoal     |         22 |          1 | MAX         |   1000000 | select pg_sleep(1000000) from gp_dist_random('gp_id');
 postgres   | digoal     |         23 |          1 | MAX         |   1000000 | select pg_sleep(1000000) from gp_dist_random('gp_id');
 postgres   | digoal     |         24 |          1 | MAX         |   1000000 | select pg_sleep(1000000) from gp_dist_random('gp_id');
 postgres   | digoal     |         25 |          1 | MAX         |   1000000 | select pg_sleep(1000000) from gp_dist_random('gp_id');
 postgres   | digoal     |         26 |         65 | MAX         |   1000000 | select * from gp_toolkit.gp_resq_priority_statement;
(6 rows)

To change a statement's priority, either one of the named levels (MIN, MAX, LOW, HIGH, MEDIUM) or a numeric weight can be passed:

postgres=# select gp_adjust_priority(21,1,'MIN');
LOG:  changing weight of (21:1) from 1000000 to 100
 gp_adjust_priority
--------------------
                  1
(1 row)

postgres=# select * from gp_toolkit.gp_resq_priority_statement;
 rqpdatname | rqpusename | rqpsession | rqpcommand | rqppriority | rqpweight |                        rqpquery
------------+------------+------------+------------+-------------+-----------+--------------------------------------------------------
 postgres   | digoal     |         21 |          1 | MIN         |       100 | select pg_sleep(1000000) from gp_dist_random('gp_id');

600 is not one of the standard priorities, so it is displayed as NON-STANDARD:

postgres=# select gp_adjust_priority(21,1,600);
postgres=# select * from gp_toolkit.gp_resq_priority_statement;
 rqpdatname | rqpusename | rqpsession | rqpcommand | rqppriority  | rqpweight |                        rqpquery
------------+------------+------------+------------+--------------+-----------+--------------------------------------------------------
 postgres   | digoal     |         21 |          1 | NON-STANDARD |       600 | select pg_sleep(1000000) from gp_dist_random('gp_id');

The corresponding code:

/**
 * An interface to re-weigh an existing session on the master and all backends.
 * Input:
 *   session id - what session is statement on?
 *   command count - what is the command count of statement.
 *   priority value - text, what should be the new priority of this statement.
 * Output:
 *   number of backends whose weights were changed by this call.
 */
Datum
gp_adjust_priority_value(PG_FUNCTION_ARGS)
{
    int32   session_id = PG_GETARG_INT32(0);
    int32   command_count = PG_GETARG_INT32(1);
    Datum   dVal = PG_GETARG_DATUM(2);
    char   *priorityVal = NULL;
    int     wt = 0;

    priorityVal = DatumGetCString(DirectFunctionCall1(textout, dVal));
    if (!priorityVal)
    {
        elog(ERROR, "Invalid priority value specified.");
    }

    wt = BackoffPriorityValueToInt(priorityVal);
    Assert(wt > 0);
    pfree(priorityVal);

    return DirectFunctionCall3(gp_adjust_priority_int,
                               Int32GetDatum(session_id),
                               Int32GetDatum(command_count),
                               Int32GetDatum(wt));
}

Fine-grained control of a query's resource usage via cgroups

The preceding sections described how Greenplum controls resource usage with its built-in resource queues, but the kinds of resources Greenplum can control are limited. Is there a more fine-grained method?

For finer-grained control, cgroups can be used to isolate the resource usage of individual queries.

This allows fine-grained control over CPU, memory, IOPS, and network.

The approach is straightforward.
First, create the corresponding cgroups on every physical host, for example defining several tiers for each resource:

  • cpu: several tiers
  • memory: several tiers
  • iops: several tiers
  • network: several tiers


Then obtain the session's backend PIDs on all nodes and move those PIDs into the corresponding cgroup.
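As a minimal sketch of the first step (this assumes the Greenplum-specific sess_id column of pg_stat_activity; session id 21 and the column name procpid, which is pid in newer releases, are only examples), the master backend PID of a session can be located with:

select procpid from pg_stat_activity where sess_id = 21;

The segment backends belonging to the same session can then be found on each segment host, since their process titles carry the same session id (e.g. "con21"), and their PIDs can be written into the tasks file of the chosen cgroup.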

Have fun, and feel free to drop by Alibaba Cloud any time to discuss your business needs; you are always welcome.

Keep it up, Alibaba Cloud team: let's keep building the most practical, down-to-earth cloud databases.

 
(Original article: https://yq.aliyun.com/articles/57763)