ubus

ubus

1、 介紹

ubus提供了一種多進程通訊的機制。存在一個守護進程ubusd,因此進程都註冊到ubusd,ubusd進行消息的接收、分發管理。html

  1. ubus依賴於ubox
  2. ubus啓動後會在後臺運行ubusd進程,該進程監聽一個unix套接字用於與其餘應用程序通訊。其餘應用程序可基於libubox提供的接口(或本身實現)與其通訊。
  3. ubus是爲發送消息而設計的,不合適傳輸大量數據。

ubusd
ubusd

2、三種實現方式:

1. invoke的方式實現端對端通訊

server端:ubus_send_reply(ctx, req, b.head);linux

uloop_init(); 
ctx = ubus_connect(NULL);
ubus_add_uloop(ctx);
static struct ubus_method scan_methods[] = {  
    UBUS_METHOD("scan", ubus_start_scan, scan_policy),  
};  
static struct ubus_object scan_obj = {  
    .name = "scan_prog", /* 對象的名字 */  
    .type = &scan_obj_type,  
    .methods = scan_methods,  
    .n_methods = ARRAY_SIZE(scan_methods),  
};  
ubus_add_object(ctx, scan_obj);
ubus_send_reply(ctx, req, b.head); 
uloop_run();
ubus_free(ctx);

client端:ubus_invoke(ctx, id, "scan", b.head, scanreq_prog_cb, NULL, timeout * 1000);json

uloop_init(); 
ctx = ubus_connect(path);
struct blob_buf b;
blob_buf_init(&b, 0);
ubus_lookup_id(ctx, "scan_prog", &id); 
ubus_invoke(ctx, id, "scan", b.head, scanreq_prog_cb, NULL, timeout * 1000); 
ubus_free(ctx);

2. subscribe/notify的方式實現訂閱

server端:ubus_notify(ctx, &test_object, "say Hi!", NULL, -1);多線程

static struct ubus_object test_object = {  
    .name = "test", /* object的名字 */  
    .type = &test_obj_type,  
    .subscribe_cb = test_client_subscribe_cb,  
}; 

static void test_client_subscribe_cb(struct ubus_context *ctx, struct ubus_object *obj) {  
    fprintf(stderr, "Subscribers active: %d\n", obj->has_subscribers);  
} 

uloop_init();
ctx = ubus_connect(NULL);  
ubus_add_uloop(ctx);  
ret = ubus_add_object(ctx, &test_object); 
while (1) {  
    sleep(2);  
    /* step2: 廣播notification消息。 */  
    ubus_notify(ctx,  &test_object, "say Hi!", NULL, -1);  
}  
uloop_run();  
ubus_free(ctx);  
uloop_done();

client端:ret = ubus_register_subscriber(ctx, &test_event);socket

static int test_notify(struct ubus_context *ctx, struct ubus_object *obj, struct ubus_request_data *req, const char *method, struct blob_attr *msg) {  
    printf("notify handler...\n");  
}

static void test_handle_remove(struct ubus_context *ctx, struct ubus_subscriber *obj, uint32_t id) {  
    printf("remove handler...\n");  
}


uloop_init(); 
ctx = ubus_connect(NULL); 
ubus_add_uloop(ctx);
struct ubus_subscriber test_event;
/* 通知到來時的處理函數。 */  
test_event.cb = test_notify;  
test_event.remove_cb = test_handle_remove; //server主動發起刪除該client的訂閱的cb函數(如server退出的時候) 
/* 註冊test_event */  
ret = ubus_register_subscriber(ctx, &test_event); 
uint32_t obj_id;
ret = ubus_lookup_id(ctx, "test", &obj_id); 
ret = ubus_subscribe(ctx, &test_event, obj_id); 

uloop_run();  
ubus_free(ctx);  
uloop_done();

3. 廣播事件

server端:發送事件廣播消息, ubus_send_event(ctx, "add_device", b.head);ide

uloop_init(); 
ctx = ubus_connect(NULL);
blob_buf_init(&b, 0);  
/* 須要傳遞的參數 */  
blobmsg_add_u32(&b, "major", 3);  
blobmsg_add_u32(&b, "minor", 56);  
blobmsg_add_string(&b, "name", "mmc01");  
/* 廣播名爲"add_device"的事件 */  
return ubus_send_event(ctx, "add_device", b.head); 
ubus_free(ctx);

client端:ret = ubus_register_event_handler(ctx, &listener, "add_device");函數

#define UBUS_EVENT_ADD_DEVICE "add_device" 
    #define UBUS_EVENT_REMOVE_DEVICE "rm_device" 
    static void ubus_probe_device_event(struct ubus_context *ctx, struct ubus_event_handler *ev, const char *type, struct blob_attr *msg) {  
        char *str;  

        if (!msg)  
            return;  

        str = blobmsg_format_json(msg, true);  
        printf("{ \"%s\": %s }\n", type, str);  
        free(str);  
    }  

    uloop_init();        
    ctx = ubus_connect(NULL); 
    ubus_add_fd();	
    ubus_add_uloop(ctx);
    
    static struct ubus_event_handler listener;   
    memset(&listener, 0, sizeof(listener));  
    listener.cb = ubus_probe_device_event;  
    ret = ubus_register_event_handler(ctx, &listener, UBUS_EVENT_ADD_DEVICE);
    ret = ubus_register_event_handler(ctx, &listener, UBUS_EVENT_REMOVE_DEVICE);
    
    uloop_run(); 
    ubus_free(ctx);

3、uloop源碼

1. uloop_init

/** *初始化事件循環 *主要工做是poll_fd = epoll_create(32);/* 建立一個epoll的文件描述符監控句柄。最多監控32個文件描述符 **/
int uloop_init(void) {
    if (poll_fd >= 0)
        return 0;
 
    poll_fd = epoll_create(32);/* 建立一個epoll的句柄。最多監控32個文件描述符 */
    if (poll_fd < 0)
        return -1;
 
    fcntl(poll_fd, F_SETFD, fcntl(poll_fd, F_GETFD) | FD_CLOEXEC); /* fd_cloexecs */
    return 0;
}

2. uloop_run

/** * 事件循環主處理入口 *1.當某一個進程第一次調用uloop_run時,註冊sigchld和sigint信號 *2.循環獲取當前時間,把超時的timeout處理掉,有一條timeout鏈表在維護 *3.循環檢測是否收到一個sigchld信號,若是收到,刪除對應的子進程,有一條process子進程鏈表在維護 *4.循環調用epoll_wait 監相應的觸發事件文件描述符fd **/
void uloop_run(void) {
    static int recursive_calls = 0; /* static value */
    struct timeval tv;
 
    /* * Handlers are only updated for the first call to uloop_run() (and restored * when this call is done). */
    if (!recursive_calls++) /* 第一次運行uloop_run時調用, 註冊信號處理函數 */
        uloop_setup_signals(true);
 
    uloop_cancelled = false;
    while(!uloop_cancelled)
    {
        uloop_gettime(&tv); /* 獲取當前時間 */
        uloop_process_timeouts(&tv); /* 把超時的timeout清理掉 */
        if (uloop_cancelled)
            break;
 
        if (do_sigchld) /* 收到一個sigchld的信號 */
            uloop_handle_processes(); /* 銷燬該進程的uloop_process */
        uloop_gettime(&tv);
        uloop_run_events(uloop_get_next_timeout(&tv));/* 處理相應的觸發事件fd */
    }
 
    if (!--recursive_calls)
        uloop_setup_signals(false);
}

3. uloop_done

/** * 銷燬事件循環 * 關閉epoll描述符 * 銷燬子進程鏈表 * 銷燬timeout鏈表 **/
void uloop_done(void) {
    if (poll_fd < 0)
        return;
 
    close(poll_fd);
    poll_fd = -1;
 
    uloop_clear_timeouts();
    uloop_clear_processes();
}

4、uloop三種使用

1. socket使用

uloop_fd_add(uloop_fd, ULOOP_READ);oop

#include "" 
  
struct uloop_fd ufd;    //建立uloop_fd全局變量 
  
static void fd_handler(struct uloop_fd *u, unsigned int ev) {  
    if(recvfrom(u->fd, ...)) == -1) {  
      
    } else {  
        //do your work 
    }  
}  
  
int main()  
{  
    // 
    int socket = socket(....);  
      
    ufd.fd = socket;  
      
    uloop_init();           //使用庫初始化 
      
    ufd.cb = fd_handler;  
  
    uloop_fd_add(&ufd, ULOOP_READ));  
  
    uloop_run();  
}

2. 定時器使用

uloop_timeout_set(uloop_timeout, freq);ui

#include "" 
  
struct uloop_timeout timeout;   //建立uloop_timeout全局變量 
  
int frequency = 5; //每隔5秒超時一次 
  
static void timeout_cb(struct uloop_timeout *t) {  
    //do your work 
      
    uloop_timeout_set(t, frequency * 1000);//設置下次的超時時間 
}  
  
int main() {     
    uloop_init();           //使用庫初始化 
      
    timeout.cb = timeout_cb;  
      
    uloop_timeout_set(&timeout, frequency * 1000);//設置下次的超時時間 
  
    uloop_run();  
}

3. 子進程使用

uloop_process_add(uloop_process);this

#include "" 
  
static struct uloop_process rsync;  //建立rsync全局變量 
  
static void rsync_complete(struct uloop_process *proc, int ret) {  
    //do something where child exit; 
    printf("rsync work is complete\n");  
}  
  
function fun() {  
    char *argv[]={"rsync", "-az", "rsync://XYZ@192.168.26.99/www","/root/www/","--password-file=/root/rsync.secrets", NULL};  
    rsync.cb = rsync_complete;  
    rsync.pid = fork();  
  
    if (!rsync.pid) {  
        /* This is child process*/  
        execvp(argv[0], argv);  
        fprintf(stderr, "fork failed\n");  
        exit(-1);  
    }  
  
    if (rsync.pid <=0) {  
        fprintf(stderr, "fork failed2\n");  
        return -1;  
    }  
  
    uloop_process_add(&rsync);  
  
}  
  
  
int main() {  
      
    .....  
      
    uloop_init();   //使用庫前進行初始化 
      
    fun();  
      
    uloop_run();  
      
}

5、數據傳輸

1. blobmsg

enter description here
enter description here

初始化:

json_uri = blobmsg_open_array(&b, "prog_list");  
for (idx = 0; idx < PROG_MAX; idx++)  
{  
    if ('\0' != uri_list[idx].name[0])  
    {  
        json_list = blobmsg_open_table(&b, NULL);  
        blobmsg_add_string(&b, "name", uri_list[idx].name);  
        blobmsg_add_u32(&b, "channel", uri_list[idx].chn_id);  
        blobmsg_close_table(&b, json_list);  
    }  
}  
blobmsg_close_array(&b, json_uri);

解析:
獲取索引: hdr = blob_data(attr); char *name = (char *)hdr->name;
獲取數據: blobmsg_get_u32(attr);
獲取長度: int len = blobmsg_data_len(tb[RSP_GET_STREAMINFO_ABILITY]);

struct blob_attr *tb[SCAN_POLICY_MAX];  
blobmsg_parse(scan_policy, SCAN_POLICY_MAX, tb, blob_data(msg), blob_len(msg));  
struct blob_attr *head = blobmsg_data(tb[RSP_GET_STREAMINFO_ABILITY]);
int len = blobmsg_data_len(tb[RSP_GET_STREAMINFO_ABILITY]);
struct blob_attr *attr;
struct blobmsg_hdr *hdr;

__blob_for_each_attr(attr, head, len) {
    hdr = blob_data(attr);
    struct blob_attr *head_temp;
    struct blob_attr *attr_temp;
    int len_temp;
    char *name = (char *)hdr->name;

    if (!strcmp(name, "fmt_number"))
        rsp->ability.fmt_number = blobmsg_get_u32(attr);
    else if (!strcmp(name, "frmival_num"))
        rsp->ability.frmival_num =	blobmsg_get_u32(attr);
}

6、Problem

如今使用中遇到了多線程的問題,因爲ubus許多變量都是全局變量,對多線程的支持並很差。好比同時在兩個線程中監聽廣播和發送消息,就會出現segment錯誤:
解決方法,最好能把兩個操做放到一個線程中,好比在監聽的回調函數中發送消息,很差的就是要根據發送消息的頻率去設置回調函數的timeout。

Reference

http://www.javashuo.com/article/p-rvyyyxrj-a.html
http://www.javashuo.com/article/p-zndbuhdf-m.html

相關文章
相關標籤/搜索