/** * 初始化事件循環 */ int uloop_init(void) /** * 事件循環主處理入口 */ void uloop_run(void) /** * 銷燬事件循環 */ void uloop_done(void)
/** * 註冊一個新描述符到事件處理循環 */ int uloop_fd_add(struct uloop_fd *sock, unsigned int flags) /** * 從事件處理循環中銷燬指定描述符 */ int uloop_fd_delete(struct uloop_fd *sock)
/** * 註冊一個新定時器 */ int uloop_timeout_add(struct uloop_timeout *timeout) /** * 設置定時器超時時間(毫秒),並添加 */ int uloop_timeout_set(struct uloop_timeout *timeout, int msecs) /** * 銷燬指定定時器 */ int uloop_timeout_cancel(struct uloop_timeout *timeout) /** * 獲取定時器還剩多長時間超時 */ int uloop_timeout_remaining(struct uloop_timeout *timeout)
/** * 註冊新進程到事件處理循環 */ int uloop_process_add(struct uloop_process *p) /** * 從事件處理循環中銷燬指定進程 */ int uloop_process_delete(struct uloop_process *p)
struct uloop_fd { uloop_fd_handler cb; /** 文件描述符,調用者初始化 */ int fd; /** 文件描述符,調用者初始化 */ bool eof; bool error; bool registered; /** 是否已註冊到uloop中 */ uint8_t flags; };
struct uloop_timeout { struct list_head list; bool pending; uloop_timeout_handler cb; /** 文件描述符, 調用者初始化 */ struct timeval time; /** 文件描述符, 調用者初始化 */ };
struct uloop_process { struct list_head list; bool pending; uloop_process_handler cb; /** 文件描述符, 調用者初始化 */ pid_t pid; /** 文件描述符, 調用者初始化 */ };
typedef void (*uloop_fd_handler)(struct uloop_fd *u, unsigned int events)
typedef void (*uloop_timeout_handler)(struct uloop_timeout *t)
typedef void (*uloop_process_handler)(struct uloop_process *c, int ret)
#define ULOOP_READ (1 << 0) #define ULOOP_WRITE (1 << 1) #define ULOOP_EDGE_TRIGGER (1 << 2) #define ULOOP_BLOCKING (1 << 3) #define ULOOP_EVENT_MASK (ULOOP_READ | ULOOP_WRITE)
任務隊列是經過uloop定時器實現,把定時器超時時間設置爲1,經過uloop事件循環來處理定時器就會處理任務隊列中的task。進程任務在任務隊列基本上實現,加入子進程退出監控數據結構
struct runqueue { struct safe_list tasks_active; /** 活動任務隊列 */ struct safe_list tasks_inactive; /** 不活動任務隊列 */ struct uloop_timeout timeout; int running_tasks; /** 當前活動任務數目 */ int max_running_tasks; /** 容許最大活動任務數目 */ bool stopped; /** 是否中止任務隊列 */ bool empty; /** 任務隊列(包括活動和不活動)是否爲空 */ /* called when the runqueue is emptied */ void (*empty_cb)(struct runqueue *q); }; struct runqueue_task_type { const char *name; /* * called when a task is requested to run * * The task is removed from the list before this callback is run. It * can re-arm itself using runqueue_task_add. */ void (*run)(struct runqueue *q, struct runqueue_task *t); /* * called to request cancelling a task * * int type is used as an optional hint for the method to be used when * cancelling the task, e.g. a signal number for processes. Calls * runqueue_task_complete when done. */ void (*cancel)(struct runqueue *q, struct runqueue_task *t, int type); /* * called to kill a task. must not make any calls to runqueue_task_complete, * it has already been removed from the list. */ void (*kill)(struct runqueue *q, struct runqueue_task *t); }; struct runqueue_task { struct safe_list list; const struct runqueue_task_type *type; struct runqueue *q; void (*complete)(struct runqueue *q, struct runqueue_task *t); struct uloop_timeout timeout; int run_timeout; /** >0表示規定此任務執行只有run_timeout毫秒 */ int cancel_timeout; /** >0表示規則任務延取消操做執行只有run_timeout毫秒*/ int cancel_type; bool queued; /** 此任務是否已加入任務隊列中 */ bool running; /** 此任務是否活動,即已在活動隊列中 */ bool cancelled; /** 此任務是否已被取消 */ }; struct runqueue_process { struct runqueue_task task; struct uloop_process proc; };
/** * 初始化任務隊列 */ void runqueue_init(struct runqueue *q) /** * 取消全部任務隊列 */ void runqueue_cancel(struct runqueue *q); /** * 取消活動中的任務 */ void runqueue_cancel_active(struct runqueue *q); /** * 取消不活動的任務 */ void runqueue_cancel_pending(struct runqueue *q); /** * 殺死全部任務 */ void runqueue_kill(struct runqueue *q); /** * 中止全部任務 */ void runqueue_stop(struct runqueue *q); /** * 從新開始任務 */ void runqueue_resume(struct runqueue *q);
/** * 添加新任務到隊列尾 * * @running true-加入活動隊列;false-加入不活動隊列 */ void runqueue_task_add(struct runqueue *q, struct runqueue_task *t, bool running); /** * 添加新任務到隊列頭 * * @running true-加入活動隊列;false-加入不活動隊列 */ void runqueue_task_add_first(struct runqueue *q, struct runqueue_task *t, bool running); /** * 徹底任務 */ void runqueue_task_complete(struct runqueue_task *t); /** * 取消任務 */ void runqueue_task_cancel(struct runqueue_task *t, int type); /** * 殺死任務 */ void runqueue_task_kill(struct runqueue_task *t);
void runqueue_process_add(struct runqueue *q, struct runqueue_process *p, pid_t pid); /** * to be used only from runqueue_process callbacks */ void runqueue_process_cancel_cb(struct runqueue *q, struct runqueue_task *t, int type); void runqueue_process_kill_cb(struct runqueue *q, struct runqueue_task *t);
struct ustream_buf { struct ustream_buf *next; char *data; /** 指向上次操做buff開始地址 */ char *tail; /** 指向未使用buff開始地址 */ char *end; /** 指向buf結束地址 */ char head[]; /** 指向buf開始地址 */ }; struct ustream_buf_list { struct ustream_buf *head; /** 指向第1塊ustream_buf */ struct ustream_buf *data_tail; /** 指向未使用的ustream_buf */ struct ustream_buf *tail; /** 指向最後的ustream_buf */ int (*alloc)(struct ustream *s, struct ustream_buf_list *l); int data_bytes; /** 已用存儲空間大小 */ int min_buffers; /** 可存儲最小的ustream_buf塊個數 */ int max_buffers; /** 可存儲最大的ustream_buf塊個數 */ int buffer_len; /** 每塊ustream_buf塊存儲空間大小 */ int buffers; /** ustream_buf塊個數 */ }; struct ustream { struct ustream_buf_list r, w; struct uloop_timeout state_change; struct ustream *next; /* * notify_read: (optional) * called by the ustream core to notify that new data is available * for reading. * must not free the ustream from this callback */ void (*notify_read)(struct ustream *s, int bytes_new); /* * notify_write: (optional) * called by the ustream core to notify that some buffered data has * been written to the stream. * must not free the ustream from this callback */ void (*notify_write)(struct ustream *s, int bytes); /* * notify_state: (optional) * called by the ustream implementation to notify that the read * side of the stream is closed (eof is set) or there was a write * error (write_error is set). * will be called again after the write buffer has been emptied when * the read side has hit EOF. */ void (*notify_state)(struct ustream *s); /* * write: * must be defined by ustream implementation, accepts new write data. * 'more' is used to indicate that a subsequent call will provide more * data (useful for aggregating writes) * returns the number of bytes accepted, or -1 if no more writes can * be accepted (link error) */ int (*write)(struct ustream *s, const char *buf, int len, bool more); /* * free: (optional) * defined by ustream implementation, tears down the ustream and frees data */ void (*free)(struct ustream *s); /* * set_read_blocked: (optional) * defined by ustream implementation, called when the read_blocked flag * changes */ void (*set_read_blocked)(struct ustream *s); /* * poll: (optional) * defined by the upstream implementation, called to request polling for * available data. * returns true if data was fetched. */ bool (*poll)(struct ustream *s); /* * ustream user should set this if the input stream is expected * to contain string data. the core will keep all data 0-terminated. */ bool string_data; /** 此ustream是否爲字符串,true-是;false-否 */ bool write_error; /** 寫出錯,true-是;false-否 */ bool eof, eof_write_done; enum read_blocked_reason read_blocked; }; struct ustream_fd { struct ustream stream; struct uloop_fd fd; };
/** * ustream_fd_init: create a file descriptor ustream (uses uloop) */ void ustream_fd_init(struct ustream_fd *s, int fd) /** * ustream_init_defaults: fill default callbacks and options */ void ustream_init_defaults(struct ustream *s) /** * ustream_free: free all buffers and data associated with a ustream */ void ustream_free(struct ustream *s)
/* * ustream_reserve: allocate rx buffer space * 分配len大小的read buffer可用內存空間,與ustream_fill_read()配合使用 * * len: hint for how much space is needed (not guaranteed to be met) * maxlen: pointer to where the actual buffer size is going to be stored */ char *ustream_reserve(struct ustream *s, int len, int *maxlen) /** * ustream_fill_read: mark rx buffer space as filled * 設置被ustream_reseve()分配read buffer後寫入的數據大小, * 回調notify_read()接口,表示有數據可讀 */ void ustream_fill_read(struct ustream *s, int len)
通常在notify_read()回調接口使用框架
/* * ustream_get_read_buf: get a pointer to the next read buffer data * 獲取新一次寫入的內容,與ustream_consume()配置使用 */ char *ustream_get_read_buf(struct ustream *s, int *buflen) /** * ustream_consume: remove data from the head of the read buffer */ void ustream_consume(struct ustream *s, int len)
盡最大能力調用write()回調用接口寫入,若是超出能力將把未寫入的數據存儲在write buffer中ide
/* * ustream_write: add data to the write buffer */ int ustream_write(struct ustream *s, const char *buf, int len, bool more) int ustream_printf(struct ustream *s, const char *format, ...) int ustream_vprintf(struct ustream *s, const char *format, va_list arg)
把在write buffer中的數據寫入實際地方,調用write()回調接口和notify_write()回調接口。通常在描述符的poll操做中調用,表示當描述符變爲可寫時當即把上一次未寫入的內容進行寫入操做。函數
/* * ustream_write_pending: attempt to write more data from write buffers * returns true if all write buffers have been emptied. */ bool ustream_write_pending(struct ustream *s)
初始化:oop
static struct runqueue q; static void q_empty(struct runnqueue *q) { } static void task_init(void) { runqueue_init(&q); q.empty_cb = q_empty; q.max_running_tasks = 1; /** 每次只能執行一個任務 */ }
定義任務:fetch
struct task { struct runqueue_process proc; char arg[128]; }; static void task_run(struct runqueue *q, struct runqueue_task *t) { struct task *tk = container_of(t, struct task, proc.task); pid_t pid; pid = fork(); if (pid < 0) return; if (pid) { /** * 由於此task正在運行,實際上只是把此task子進程加入到 * 進程監聽uloop中,當此task子進程運行完成時回調自定義接口, * 並執行一個任務循環 「runqueue_start_next」 */ runqueue_process_add(q, &tk->proc, pid); return; } /** 子進程使用sleep命令代替 */ execlp("sleep", "sleep", tk->arg, NULL); exit(1); } struct const struct runqueue_task_type task_type = { .run = task_run, /** 須要實現 */ .cancel = runqueue_process_cancel_cb, /** 自帶接口 */ .kill = runqueue_process_kill_cb, /** 自帶接口 */ };
添加任務:ui
static void task_complete(struct runqueue *q, struct runqueue_task *t) { struct task *tk = container_of(t, struct task, proc.task); free(tk); } void task_add(char *arg) { struct task *tk; tk = calloc(1, sizeof(struct task)); tk->proc.task.type = &task_type; /** 指定任務類型 */ tk->proc.task.complete = task_complete; /** 任務執行完成後hook接口 */ tk->proc.task.run_timeout = timeout; /** 任務執行超時時間 */ strcpy(tk->arg, arg); /** 加入到任務隊列中 */ runqueue_task_add(&q, &tk->proc.task, false); }