libubox [4] - uloop runqueue ustream

事件處理循環(uloop.c/h)

接口說明

主框架

/**
 * 初始化事件循環
 */
int uloop_init(void)

/**
 * 事件循環主處理入口
 */
void uloop_run(void)

/**
 * 銷燬事件循環
 */
void uloop_done(void)

描述符事件

/**
 * 註冊一個新描述符到事件處理循環
 */
int uloop_fd_add(struct uloop_fd *sock, unsigned int flags)

/** 
 * 從事件處理循環中銷燬指定描述符
 */
int uloop_fd_delete(struct uloop_fd *sock)

定時器事件

/**
 * 註冊一個新定時器
 */
int uloop_timeout_add(struct uloop_timeout *timeout)

/**
 * 設置定時器超時時間(毫秒),並添加
 */
int uloop_timeout_set(struct uloop_timeout *timeout, int msecs)

/**
 * 銷燬指定定時器
 */
int uloop_timeout_cancel(struct uloop_timeout *timeout)

/**
 * 獲取定時器還剩多長時間超時
 */
int uloop_timeout_remaining(struct uloop_timeout *timeout)

進程事件

/**
 * 註冊新進程到事件處理循環
 */
int uloop_process_add(struct uloop_process *p)

/**
 * 從事件處理循環中銷燬指定進程
 */
int uloop_process_delete(struct uloop_process *p)

數據結構

描述符

struct uloop_fd {
    uloop_fd_handler cb;    /** 文件描述符,調用者初始化 */
    int fd;                 /** 文件描述符,調用者初始化 */
    bool eof;                       
    bool error;                     
    bool registered;        /** 是否已註冊到uloop中 */     
    uint8_t flags;                      
};

定時器

struct uloop_timeout {
    struct list_head list;              
    bool pending;               
    uloop_timeout_handler cb; /** 文件描述符, 調用者初始化 */
    struct timeval time;      /** 文件描述符, 調用者初始化 */
};

進程

struct uloop_process {
    struct list_head list;              
    bool pending;                   
    uloop_process_handler cb;       /** 文件描述符, 調用者初始化 */
    pid_t pid;                      /** 文件描述符, 調用者初始化 */
};

事件回調函數

描述符

typedef void (*uloop_fd_handler)(struct uloop_fd *u, unsigned int events)

定時器

typedef void (*uloop_timeout_handler)(struct uloop_timeout *t)

進程

typedef void (*uloop_process_handler)(struct uloop_process *c, int ret)

事件標誌

#define ULOOP_READ          (1 << 0)
#define ULOOP_WRITE         (1 << 1)
#define ULOOP_EDGE_TRIGGER  (1 << 2)
#define ULOOP_BLOCKING      (1 << 3)
#define ULOOP_EVENT_MASK    (ULOOP_READ | ULOOP_WRITE)

任務隊列(runqueue.c/h)

任務隊列是經過uloop定時器實現,把定時器超時時間設置爲1,經過uloop事件循環來處理定時器就會處理任務隊列中的task。進程任務在任務隊列基本上實現,加入子進程退出監控數據結構

數據結構

struct runqueue {
    struct safe_list tasks_active;      /** 活動任務隊列 */
    struct safe_list tasks_inactive;    /** 不活動任務隊列 */
    struct uloop_timeout timeout;

    int running_tasks;      /** 當前活動任務數目 */
    int max_running_tasks;  /** 容許最大活動任務數目 */
    bool stopped;           /** 是否中止任務隊列 */
    bool empty;             /** 任務隊列(包括活動和不活動)是否爲空 */

    /* called when the runqueue is emptied */
    void (*empty_cb)(struct runqueue *q);
};

struct runqueue_task_type {
    const char *name;

    /*
     * called when a task is requested to run
     *
     * The task is removed from the list before this callback is run. It
     * can re-arm itself using runqueue_task_add.
     */
    void (*run)(struct runqueue *q, struct runqueue_task *t);

    /*
     * called to request cancelling a task
     *
     * int type is used as an optional hint for the method to be used when
     * cancelling the task, e.g. a signal number for processes. Calls
     * runqueue_task_complete when done.
     */
    void (*cancel)(struct runqueue *q, struct runqueue_task *t, int type);

    /*
     * called to kill a task. must not make any calls to runqueue_task_complete,
     * it has already been removed from the list.
     */
    void (*kill)(struct runqueue *q, struct runqueue_task *t);
};

struct runqueue_task {
    struct safe_list list;
    const struct runqueue_task_type *type;
    struct runqueue *q;

    void (*complete)(struct runqueue *q, struct runqueue_task *t);

    struct uloop_timeout timeout;
    int run_timeout;    /** >0表示規定此任務執行只有run_timeout毫秒 */
    int cancel_timeout; /** >0表示規則任務延取消操做執行只有run_timeout毫秒*/
    int cancel_type;

    bool queued;        /** 此任務是否已加入任務隊列中 */
    bool running;       /** 此任務是否活動,即已在活動隊列中 */
    bool cancelled;     /** 此任務是否已被取消 */
};

struct runqueue_process {
    struct runqueue_task task;
    struct uloop_process proc;
};

接口說明

任務隊列

/**
 * 初始化任務隊列
 */
void runqueue_init(struct runqueue *q)

/** 
 * 取消全部任務隊列
 */
void runqueue_cancel(struct runqueue *q);

/** 
 * 取消活動中的任務
 */
void runqueue_cancel_active(struct runqueue *q);

/** 
 * 取消不活動的任務 
 */
void runqueue_cancel_pending(struct runqueue *q);

/**
 * 殺死全部任務
 */
void runqueue_kill(struct runqueue *q);

/** 
 * 中止全部任務
 */
void runqueue_stop(struct runqueue *q);

/**
 * 從新開始任務
 */
void runqueue_resume(struct runqueue *q);

任務操做

/**
 * 添加新任務到隊列尾
 *
 * @running true-加入活動隊列;false-加入不活動隊列
 */
void runqueue_task_add(struct runqueue *q, struct runqueue_task *t, bool running);

/**
 * 添加新任務到隊列頭
 *
 * @running true-加入活動隊列;false-加入不活動隊列
 */
void runqueue_task_add_first(struct runqueue *q, struct runqueue_task *t, 
bool running);

/**
 * 徹底任務
 */
void runqueue_task_complete(struct runqueue_task *t);

/**
 * 取消任務
 */
void runqueue_task_cancel(struct runqueue_task *t, int type);

/**
 * 殺死任務
 */
void runqueue_task_kill(struct runqueue_task *t);

進程任務

void runqueue_process_add(struct runqueue *q, struct runqueue_process *p, 
pid_t pid);

/**
 * to be used only from runqueue_process callbacks 
 */
void runqueue_process_cancel_cb(struct runqueue *q, struct runqueue_task *t, 
int type);
void runqueue_process_kill_cb(struct runqueue *q, struct runqueue_task *t);

流緩衝管理(ustream.c/h/ustream-fd.c)

數據結構

struct ustream_buf {
    struct ustream_buf *next;

    char *data;     /** 指向上次操做buff開始地址 */
    char *tail;     /** 指向未使用buff開始地址 */
    char *end;      /** 指向buf結束地址 */

    char head[];    /** 指向buf開始地址 */
};

struct ustream_buf_list {
    struct ustream_buf *head;       /** 指向第1塊ustream_buf */
    struct ustream_buf *data_tail;  /** 指向未使用的ustream_buf */
    struct ustream_buf *tail;       /** 指向最後的ustream_buf */

    int (*alloc)(struct ustream *s, struct ustream_buf_list *l);

    int data_bytes;    /** 已用存儲空間大小 */

    int min_buffers;   /** 可存儲最小的ustream_buf塊個數 */
    int max_buffers;   /** 可存儲最大的ustream_buf塊個數 */
    int buffer_len;    /** 每塊ustream_buf塊存儲空間大小 */

    int buffers;       /** ustream_buf塊個數 */
};

struct ustream {
    struct ustream_buf_list r, w;
    struct uloop_timeout state_change;
    struct ustream *next;

    /*
     * notify_read: (optional)
     * called by the ustream core to notify that new data is available
     * for reading.
     * must not free the ustream from this callback
     */
    void (*notify_read)(struct ustream *s, int bytes_new);

    /*
     * notify_write: (optional)
     * called by the ustream core to notify that some buffered data has
     * been written to the stream.
     * must not free the ustream from this callback
     */
    void (*notify_write)(struct ustream *s, int bytes);

    /*
     * notify_state: (optional)
     * called by the ustream implementation to notify that the read
     * side of the stream is closed (eof is set) or there was a write
     * error (write_error is set).
     * will be called again after the write buffer has been emptied when
     * the read side has hit EOF.
     */
    void (*notify_state)(struct ustream *s);

    /*
     * write:
     * must be defined by ustream implementation, accepts new write data.
     * 'more' is used to indicate that a subsequent call will provide more
     * data (useful for aggregating writes)
     * returns the number of bytes accepted, or -1 if no more writes can
     * be accepted (link error)
     */
    int (*write)(struct ustream *s, const char *buf, int len, bool more);

    /*
     * free: (optional)
     * defined by ustream implementation, tears down the ustream and frees data
     */
    void (*free)(struct ustream *s);

    /*
     * set_read_blocked: (optional)
     * defined by ustream implementation, called when the read_blocked flag
     * changes
     */
    void (*set_read_blocked)(struct ustream *s);

    /*
     * poll: (optional)
     * defined by the upstream implementation, called to request polling for
     * available data.
     * returns true if data was fetched.
     */
    bool (*poll)(struct ustream *s);

    /*
     * ustream user should set this if the input stream is expected
     * to contain string data. the core will keep all data 0-terminated.
     */
    bool string_data;     /** 此ustream是否爲字符串,true-是;false-否 */
    bool write_error;     /** 寫出錯,true-是;false-否 */
    bool eof, eof_write_done;

    enum read_blocked_reason read_blocked;
};

struct ustream_fd {
    struct ustream stream;
    struct uloop_fd fd;
};

存儲結構

ustream存儲結構

接口說明

初始/銷燬

/**
 * ustream_fd_init: create a file descriptor ustream (uses uloop) 
 */
void ustream_fd_init(struct ustream_fd *s, int fd)

/**
 * ustream_init_defaults: fill default callbacks and options 
 */
void ustream_init_defaults(struct ustream *s)

/**
 * ustream_free: free all buffers and data associated with a ustream 
 */
void ustream_free(struct ustream *s)

寫入read buffer

/*
 * ustream_reserve: allocate rx buffer space
 *      分配len大小的read buffer可用內存空間,與ustream_fill_read()配合使用
 *
 * len: hint for how much space is needed (not guaranteed to be met)
 * maxlen: pointer to where the actual buffer size is going to be stored
 */
char *ustream_reserve(struct ustream *s, int len, int *maxlen)

/**
 * ustream_fill_read: mark rx buffer space as filled 
 *      設置被ustream_reseve()分配read buffer後寫入的數據大小,
 *      回調notify_read()接口,表示有數據可讀
 */
void ustream_fill_read(struct ustream *s, int len)

讀出read buffer

通常在notify_read()回調接口使用框架

/* 
 * ustream_get_read_buf: get a pointer to the next read buffer data 
 *      獲取新一次寫入的內容,與ustream_consume()配置使用
 */
char *ustream_get_read_buf(struct ustream *s, int *buflen)

/**
 * ustream_consume: remove data from the head of the read buffer 
 */
void ustream_consume(struct ustream *s, int len)

操做write buffer

盡最大能力調用write()回調用接口寫入,若是超出能力將把未寫入的數據存儲在write buffer中ide

/* 
 * ustream_write: add data to the write buffer 
 */
int ustream_write(struct ustream *s, const char *buf, int len, bool more)
int ustream_printf(struct ustream *s, const char *format, ...)
int ustream_vprintf(struct ustream *s, const char *format, va_list arg)

把在write buffer中的數據寫入實際地方,調用write()回調接口和notify_write()回調接口。通常在描述符的poll操做中調用,表示當描述符變爲可寫時當即把上一次未寫入的內容進行寫入操做。函數

/*
 * ustream_write_pending: attempt to write more data from write buffers
 * returns true if all write buffers have been emptied.
 */
bool ustream_write_pending(struct ustream *s)

例子

進程任務隊列

初始化:oop

static struct runqueue q;

static void
q_empty(struct runnqueue *q)
{
}

static void
task_init(void)
{
    runqueue_init(&q);
    q.empty_cb = q_empty;
    q.max_running_tasks = 1;  /** 每次只能執行一個任務 */
}

定義任務:fetch

struct task {
    struct runqueue_process proc;
    char arg[128];
};

static void
task_run(struct runqueue *q, struct runqueue_task *t)
{
    struct task *tk = container_of(t, struct task, proc.task);
    pid_t pid;

    pid = fork();
    if (pid < 0)
        return;
    if (pid) {
        /** 
         * 由於此task正在運行,實際上只是把此task子進程加入到
         * 進程監聽uloop中,當此task子進程運行完成時回調自定義接口,
         * 並執行一個任務循環 「runqueue_start_next」
         */
        runqueue_process_add(q, &tk->proc, pid);
        return;
    }
    /** 子進程使用sleep命令代替 */
    execlp("sleep", "sleep", tk->arg, NULL);
    exit(1);
}

struct const struct runqueue_task_type task_type = {
    .run = task_run,                      /** 須要實現 */
    .cancel = runqueue_process_cancel_cb, /** 自帶接口 */
    .kill = runqueue_process_kill_cb,     /** 自帶接口 */
};

添加任務:ui

static void
task_complete(struct runqueue *q, struct runqueue_task *t)
{
    struct task *tk = container_of(t, struct task, proc.task);
    free(tk);
}

void
task_add(char *arg)
{
    struct task *tk;

    tk = calloc(1, sizeof(struct task));
    tk->proc.task.type = &task_type;            /** 指定任務類型 */
    tk->proc.task.complete = task_complete;     /** 任務執行完成後hook接口 */
    tk->proc.task.run_timeout = timeout;        /** 任務執行超時時間 */

    strcpy(tk->arg, arg);

    /** 加入到任務隊列中 */
    runqueue_task_add(&q, &tk->proc.task, false);
}
相關文章
相關標籤/搜索