FPM master 進程啓動後,會進入函數fpm_event_loop,無限循環.
處理事件.linux
master 進程所作的的事,總的來講就是兩類:windows
簡稱timer事件,需按時運行,主要有3個:網絡
簡稱fd事件,需從文件句柄(file descriptor)讀取到指令後,依指令運行.
重複一下,unix 下一切IO, 皆文件,socket ,socketpair,pipe 都返回文件句柄(fd) 用於通訊.
主要的fd有:socket
//fpm_children.c int fpm_children_create_initial(struct fpm_worker_pool_s *wp) { ... memset(wp->ondemand_event, 0, sizeof(struct fpm_event_s)); fpm_event_set(wp->ondemand_event, wp->listening_socket, FPM_EV_READ | FPM_EV_EDGE, fpm_pctl_on_socket_accept, wp); wp->socket_event_set = 1; fpm_event_add(wp->ondemand_event, 0); ... }
worker進程標準錯誤輸出stderr (需配置catch_workers_output = yes)
默認狀況下worker進程標準輸出stdout和標準錯誤輸出stderr,爲/dev/null,不記錄.
開啓catch_workers_output,會經過pipe管道導到master 進程,寫到日誌裏.
開啓catch_workers_output,有助於排查錯誤
事件添加代碼:函數
//fpm_stdio.c int fpm_stdio_parent_use_pipes(struct fpm_child_s *child) { ... child->fd_stdout = fd_stdout[0]; child->fd_stderr = fd_stderr[0]; fpm_event_set(&child->ev_stdout, child->fd_stdout, FPM_EV_READ, fpm_stdio_child_said, child); fpm_event_add(&child->ev_stdout, 0); fpm_event_set(&child->ev_stderr, child->fd_stderr, FPM_EV_READ, fpm_stdio_child_said, child); fpm_event_add(&child->ev_stderr, 0); return 0; }
對於timer事件,多個事件在事件軸上是依次排列的,只需反覆檢查,到時運行.
對於fd事件,需監聽多個fd,需用到咱們第二篇講的IO多路複用技術.oop
若是知足事件條件,則處理事件內容.操作系統
FPM設計上,兩類事件使用同一個結構,而且事件觸發條件和事件處理邏輯放到同一個事件對象裏(C語言對象就是結構體).
舉個例子,打鈴下課,打鈴是觸發條件,下課是事件內容,兩個同時放到一個事件對象,這是一個很好的設計.設計
//fpm_event.h struct fpm_event_s { int fd; /* 沒設置,表示定時事件*/ struct timeval timeout; /* timer事件觸發時間點*/ struct timeval frequency; /* timer事件觸發事件間隔*/ void (*callback)(struct fpm_event_s *, short, void *); /* 回調函數 */ void *arg; /* 回調函數的參數 */ int flags; int index; short which; /* 事件類型 */ };
fd值: -1
flags值: FPM_EV_PERSIST
which值: FPM_EV_TIMEOUTunix
fd值: 獲取觸發指令的文件fd
flags值: FPM_EV_EDGE(fd事件底層的邊緣觸發標誌,需系統支持)
which值: FPM_EV_READ指針
兩類事件分別放在兩個事件隊列
static struct fpm_event_queue_s fpm_event_queue_timer = NULL;
static struct fpm_event_queue_s fpm_event_queue_fd = NULL;
事件隊列的結構很常見,雙向隊列:
typedef struct fpm_event_queue_s {
struct fpm_event_queue_s prev;
struct fpm_event_queue_s next;
struct fpm_event_s *ev;
} fpm_event_queue;
//fpm_events.c int fpm_event_set(struct fpm_event_s *ev, int fd, int flags, void (*callback)(struct fpm_event_s *, short, void *), void *arg) { if (!ev || !callback || fd < -1) { return -1; } memset(ev, 0, sizeof(struct fpm_event_s)); ev->fd = fd; ev->callback = callback; ev->arg = arg; ev->flags = flags; return 0; }
建立timer事件對象函數fpm_event_set_timer,
fd 值爲-1,其餘和fpm_event_set一致.
#define fpm_event_set_timer(ev, flags, cb, arg) fpm_event_set((ev), -1, (flags), (cb), (arg))
添加事件.(fpm_event_add -> fpm_event_queue_add)
static int fpm_event_queue_add(struct fpm_event_queue_s *queue, struct fpm_event_s ev)
簡單的入列操做:
對於fd事件,需加到底層事件輪詢機制裏(如:epoll).
if (*queue == fpm_event_queue_fd && module->add) { module->add(ev); }
4移除事件 (fpm_event_del -> fpm_event_queue_del)
簡單的出列操做:
static int fpm_event_queue_del(struct fpm_event_queue_s *queue, struct fpm_event_s ev)
對於fd事件,需在底層事件輪詢機制裏移除(如:epoll)
if (*queue == fpm_event_queue_fd && module->remove) { module->remove(ev); }
5,運行事件回調函數:
void fpm_event_fire(struct fpm_event_s *ev) { if (!ev || !ev->callback) { return; } (*ev->callback)( (struct fpm_event_s *) ev, ev->which, ev->arg); }
6, 底層事件輪詢模塊結構
struct fpm_event_module_s { const char *name; int support_edge_trigger; int (*init)(int max_fd); /*初始外化函數*/ int (*clean)(void); int (*wait)(struct fpm_event_queue_s *queue, unsigned long int timeout); int (*add)(struct fpm_event_s *ev); int (*remove)(struct fpm_event_s *ev); };
不一樣的操做系統,支持不一樣的IO事件機制,linux 支持epoll,
windows支持select, freebsd 支持kqueue,這個結構統一操做接口
在函數fpm_event_init_main裏 調用module->init初始化
fpm 裏對應的配置
events.mechanism = epoll
master進程在fpm_event_loop函數裏無限循環,處理定時任務和fd事件.
期間會在module->wait阻塞片刻,對於epoll機制,就是epoll_wait.
void fpm_event_loop(int err) /* {{{ */ { static struct fpm_event_s signal_fd_event; ... //添加信號處理fd事件 fpm_event_set(&signal_fd_event, fpm_signals_get_fd(), FPM_EV_READ, &fpm_got_signal, NULL); fpm_event_add(&signal_fd_event, 0); //添加檢查超時進程timer事件 if (fpm_globals.heartbeat > 0) { fpm_pctl_heartbeat(NULL, 0, NULL); } if (!err) { //添加閒時服務維護timer事件 fpm_pctl_perform_idle_server_maintenance_heartbeat(NULL, 0, NULL); ... #ifdef HAVE_SYSTEMD //添加報告systemd timer事件 fpm_systemd_heartbeat(NULL, 0, NULL); #endif } while (1) { struct fpm_event_queue_s *q, *q2; struct timeval ms; struct timeval tmp; struct timeval now; unsigned long int timeout; /*這個timeout是等待事件,事件對象的timeout是標準時間點,同名不一樣義*/ int ret; ... fpm_clock_get(&now); timerclear(&ms); /*timer時隊列裏查找應該運行的最近標準時間*/ q = fpm_event_queue_timer; while (q) { if (!timerisset(&ms)) { ms = q->ev->timeout; } else { if (timercmp(&q->ev->timeout, &ms, <)) { ms = q->ev->timeout; } } q = q->next; } /* 沒設置,默認1秒*/ if (!timerisset(&ms) || timercmp(&ms, &now, <) || timercmp(&ms, &now, ==)) { timeout = 1000; } else { /* 事件timeout值與當前時間相減,計算等待時間*/ timersub(&ms, &now, &tmp); timeout = (tmp.tv_sec * 1000) + (tmp.tv_usec / 1000) + 1; } /* 程序阻塞在這裏,設置阻塞timeout,是爲了及時處理timer事件*/ ret = module->wait(fpm_event_queue_fd, timeout); ... /* trigger timers */ q = fpm_event_queue_timer; while (q) { fpm_clock_get(&now); if (q->ev) { /* 若是事件過時或到期,運行事件回調*/ if (timercmp(&now, &q->ev->timeout, >) || timercmp(&now, &q->ev->timeout, ==)) { fpm_event_fire(q->ev); ... /*若是是連續運行timer事件 重設事件ev->timeout= ev->frequency+now */ if (q->ev->flags & FPM_EV_PERSIST) { fpm_event_set_timeout(q->ev, now); } else { /*若是是運行一次的timer事件,移除隊列*/ q2 = q; if (q->prev) { q->prev->next = q->next; } if (q->next) { q->next->prev = q->prev; } if (q == fpm_event_queue_timer) { fpm_event_queue_timer = q->next; if (fpm_event_queue_timer) { fpm_event_queue_timer->prev = NULL; } } q = q->next; free(q2); continue; } } } q = q->next; } } }