ubus

openwrt提供了一個系統總線ubus,相似linux桌面操做系統的d-bus,目標是提供系統級的進程間通訊(IPC)功能。node

爲了提供各類後臺進程和應用程序之間的通訊機制,ubus被開發出來,由3部分組成:精靈進程,接口庫和實用工具。linux

工程的核心是ubusd精靈進程,它提供了一個總線層,在系統啓動時運行,負責進程間的消息路由和傳遞。其餘進程註冊到ubusd進程進行消息的發送和接收。這個接口是用linux文件socket和TLV手法消息實現的。每個進程在指定命名空間下注冊本身的路徑。每個路徑均可以提供帶有各類參數的多個函數處理過程。函數處理過程程序能夠在完成處理後返回消息。git

接口庫名稱爲libubus.so,其餘進程能夠經過該動態連接庫來簡化對ubus總線的訪問。編程

實用工具ubus是提供命令行的接口調用工具。能夠基於該工具來進行腳本編程,也能夠使用ubus來診斷問題。json

ubus是爲進程間發送消息而設計的,不適合傳輸大量數據(進程間傳輸大量數據應採用共享內存)。ubuntu

網址:http://git.openwrt.org/project/ubus.gitsegmentfault

$ ls ubus-2015-05-25
cli.c           libubus.h           libubus-req.c  ubus_common.h  ubusd_id.c   ubusd_proto.c
CMakeLists.txt  libubus-internal.h  libubus-sub.c  ubusd.c        ubusd_id.h   ubusmsg.h
examples        libubus-io.c        lua            ubusd_event.c  ubusd_obj.c
libubus.c       libubus-obj.c       systemd        ubusd.h        ubusd_obj.h
ubusd: ubusd.c ubusd_id.c ubusd_obj.c ubusd_proto.c ubusd_event.c
libubus: libubus.c libubus-io.c libubus-obj.c libubus-sub.c libubus-req.c
cli/ubus: cli.c

 1. ubusd

/etc/init.d/ubus中提供ubusd進程的啓動,在系統進程啓動完成後當即啓動。他是在網絡進程netifd以前啓動的,該進程提供一個文件套接字接口和其餘應用程序通訊。數組

ubusd採用SOCK_STREAM的Unix域套接字實現與服務進程間通信。服務器

提供的功能:網絡

1)提供註冊對象和方法供其餘實體調用。

2)調用其餘應用程序提供的註冊對象的控制接口。

3)在特定對象上註冊監聽事件。

4)向特定對象發送事件消息。

ubus將消息處理抽象爲對象object和方法method的概念。一個對象中包含多個方法。對象和方法都有本身的名字,發送請求方在消息中指定要調用的對象和方法名字便可。

ubus的另一個概念就是訂閱(subscriber)。客戶端須要向服務器註冊收到特定消息時的處理方法。這樣當服務器在狀態發生改變時會經過ubus總線來通知客戶端。

ubus可用於兩個進程之間通訊,進程之間以TLV格式傳遞消息。ubus可以以json格式和用戶進行數據交換。常見場景(應用詳見ubus實現進程間通訊舉例):

1)客戶端/服務器模式。

2)訂閱通知模式(struct ubus_subscriber)。

3)事件模式(ubus_event_handler)。

ubusd是一種總線消息服務器,任何消息均經過ubusd進程傳遞,所以多個進程在相互通訊時,均經過ubus收發消息。

編譯安裝

1)去除lua支持

#ADD_SUBDIRECTORY(lua)

2)編譯

mkdir build; cd build; cmake ..; make ;make install
[ 30%] Built target ubus
[ 60%] Built target ubusd
[ 70%] Built target cli
[ 85%] Built target server
[100%] Built target client
Install the project...
-- Install configuration: ""
-- Installing: /usr/local/lib/libubus.so
-- Installing: /usr/local/bin/ubus
-- Set runtime path of "/usr/local/bin/ubus" to ""
-- Installing: /usr/local/sbin/ubusd
-- Up-to-date: /usr/local/include/ubusmsg.h
-- Up-to-date: /usr/local/include/ubus_common.h
-- Up-to-date: /usr/local/include/libubus.h
-- Installing: /usr/lib/systemd/system/ubus.socket
-- Installing: /usr/lib/systemd/system/ubus.service

2. libubus

1) 數據結構

struct ubus_event_handler {
    struct ubus_object obj;
    ubus_event_handler_t cb;
};

struct ubus_context {
    struct list_head requests;
    struct avl_tree objects;    /** client端object鏈表頭 */
    struct list_head pending;

    struct uloop_fd sock;       /** client端sock對象 */

    uint32_t local_id;          /** ubusd分配的client id */
    uint16_t request_seq;
    int stack_depth;

/** 斷開鏈接回調函數 */
    void (*connection_lost)(struct ubus_context *ctx);

    struct {
        struct ubus_msghdr hdr;
        char data[UBUS_MAX_MSGLEN];
    } msgbuf;                   /** 通訊報文 */
};

struct ubus_object_data {
    uint32_t id;
    uint32_t type_id;
    const char *path;
    struct blob_attr *signature;
};

struct ubus_request_data {
    uint32_t object;
    uint32_t peer;
    uint16_t seq;

    /* internal use */
    bool deferred;
    int fd;
};

struct ubus_request {
    struct list_head list;

    struct list_head pending;
    int status_code;
    bool status_msg;
    bool blocked;
    bool cancelled;
    bool notify;

    uint32_t peer;
    uint16_t seq;

    ubus_data_handler_t raw_data_cb;
    ubus_data_handler_t data_cb;
    ubus_fd_handler_t fd_cb;
    ubus_complete_handler_t complete_cb;

    struct ubus_context *ctx;
    void *priv;
};

struct ubus_notify_request {
    struct ubus_request req;

    ubus_notify_complete_handler_t status_cb;
    ubus_notify_complete_handler_t complete_cb;

    uint32_t pending;
    uint32_t id[UBUS_MAX_NOTIFY_PEERS + 1];
};

struct ubus_auto_conn {
    struct ubus_context ctx;
    struct uloop_timeout timer;
    const char *path;
    ubus_connect_handler_t cb;
};

2)接口函數

* 初始化client端context結構,並鏈接ubusd
struct ubus_context *ubus_connect(const char *path)
 * 與ubus_connect()函數基本功能相同,但此函數在鏈接斷開後會自動進行重連
void ubus_auto_connect(struct ubus_auto_conn *conn)
 * 註冊新事件
int ubus_register_event_handler(struct ubus_context *ctx, struct ubus_event_handler *ev, const char *pattern)
 * 發出事件消息
int ubus_send_event(struct ubus_context *ctx, const char *id, struct blob_attr *data)

 * 向ubusd查詢指定UBUS_ATTR_OBJPATH對應對象信息內容
 * 內容經過輸入回調函數ubus_lookup_handler_t由調用者自行處理
int ubus_lookup(struct ubus_context *ctx, const char *path, ubus_lookup_handler_t cb, void *priv)

 * 向ubusd查詢指定UBUS_ATTR_OBJPATH對應的ID號
int ubus_lookup_id(struct ubus_context *ctx, const char *path, uint32_t *id)

* 向uloop註冊fd
static inline void ubus_add_uloop(struct ubus_context *ctx)
{
    uloop_fd_add(&ctx->sock, ULOOP_BLOCKING | ULOOP_READ);
}

3. libubus-obj

數據結構

struct ubus_object {
    struct avl_node avl;  /** 關係到struct ubus_context的objects */

    const char *name;     /** UBUS_ATTR_OBJPATH */
    uint32_t id;          /** 由ubusd server分配的obj id */

    const char *path;
    struct ubus_object_type *type;

/** 第1次被訂閱或最後1次補退訂時被調用 */
    ubus_state_handler_t subscribe_cb;
    bool has_subscribers;    /** 此對象是否被訂閱 */

    const struct ubus_method *methods;  /** 方法數組 */
    int n_methods;                      /** 方法數組個數 */
};

struct ubus_object_type {
    const char *name;
    uint32_t id;            /** 由ubusd server分配的obj type id */

    const struct ubus_method *methods;  /** 方法數組 */
    int n_methods;                      /** 方法數組個數 */
};

struct ubus_method {
    const char *name;         /** 方法名稱 */
    ubus_handler_t handler;   /** 方法處理回調函數 */

    unsigned long mask;                   /** 參數過濾掩碼 */
    const struct blobmsg_policy *policy;  /** 參數過濾列表 */
    int n_policy;                         /** 參數過濾列表個數 */
};
#define UBUS_OBJECT_TYPE(_name, _methods)       \
    {                       \
        .name = _name,              \
        .id = 0,                \
        .n_methods = ARRAY_SIZE(_methods),  \
        .methods = _methods         \
    }

#define __UBUS_METHOD_NOARG(_name, _handler)        \
    .name = _name,                  \
    .handler = _handler

#define __UBUS_METHOD(_name, _handler, _policy)     \
    __UBUS_METHOD_NOARG(_name, _handler),       \
    .policy = _policy,              \
    .n_policy = ARRAY_SIZE(_policy)

#define UBUS_METHOD(_name, _handler, _policy)       \
    { __UBUS_METHOD(_name, _handler, _policy) }

#define UBUS_METHOD_MASK(_name, _handler, _policy, _mask) \
    {                       \
        __UBUS_METHOD(_name, _handler, _policy),\
        .mask = _mask               \
    }

#define UBUS_METHOD_NOARG(_name, _handler)      \
    { __UBUS_METHOD_NOARG(_name, _handler) }

接口函數

* client端向ubusd server請求增長一個新object
int ubus_add_object(struct ubus_context *ctx, struct ubus_object *obj)

 * client端向ubusd server請求刪除一個object
int ubus_remove_object(struct ubus_context *ctx, struct ubus_object *obj)

 * 處理收到與object相關報文
void __hidden ubus_process_obj_msg(struct ubus_context *ctx, struct ubus_msghdr *hdr)

 * 處理UBUS_MSG_INVOKE報文
static void
ubus_process_invoke(struct ubus_context *ctx, struct ubus_msghdr *hdr,
            struct ubus_object *obj, struct blob_attr **attrbuf)

 * 處理UBUS_MSG_UNSUBSCRIBE報文
static void
ubus_process_unsubscribe(struct ubus_context *ctx, struct ubus_msghdr *hdr,
             struct ubus_object *obj, struct blob_attr **attrbuf)

 * 處理UBUS_MSG_NOTIFY報文
static void
ubus_process_notify(struct ubus_context *ctx, struct ubus_msghdr *hdr,
            struct ubus_object *obj, struct blob_attr **attrbuf)

4. 示例

1) 下面看下logd中ubus操做,ubos的logd註冊了一個log對象,兩個方法read和write。

static const struct ubus_method log_methods[] = { 
    { .name = "read", .handler = read_log, .policy = &read_policy, .n_policy = 1 },
    { .name = "write", .handler = write_log, .policy = &write_policy, .n_policy = 1 },
};

static struct ubus_object_type log_object_type =
    UBUS_OBJECT_TYPE("log", log_methods);

static struct ubus_object log_object = { 
    .name = "log",
    .type = &log_object_type,
    .methods = log_methods,
    .n_methods = ARRAY_SIZE(log_methods),
};
static const struct blobmsg_policy read_policy =
    { .name = "lines", .type = BLOBMSG_TYPE_INT32 };

static const struct blobmsg_policy write_policy =
    { .name = "event", .type = BLOBMSG_TYPE_STRING };

看下兩個方法的處理函數(根據policy解析blobmsg):

static int
read_log(struct ubus_context *ctx, struct ubus_object *obj,
        struct ubus_request_data *req, const char *method,
        struct blob_attr *msg)
{
    struct client *cl;
    struct blob_attr *tb;
    struct log_head *l;
    int count = 0;
    int fds[2];
    int ret;

    if (msg) {   // 解析blobmsg
        blobmsg_parse(&read_policy, 1, &tb, blob_data(msg), blob_len(msg));
        if (tb)
            count = blobmsg_get_u32(tb);
    }
// 具體業務處理
    if (pipe(fds) == -1) {
        fprintf(stderr, "logd: failed to create pipe: %s\n", strerror(errno));
        return -1;
    }
    ubus_request_set_fd(ctx, req, fds[0]);
    cl = calloc(1, sizeof(*cl));
    cl->s.stream.notify_state = client_notify_state;
    cl->fd = fds[1];
    ustream_fd_init(&cl->s, cl->fd);
    list_add(&cl->list, &clients);
    l = log_list(count, NULL);
    while ((!tb || count) && l) {
        blob_buf_init(&b, 0);
        blobmsg_add_string(&b, "msg", l->data);
        blobmsg_add_u32(&b, "id", l->id);
        blobmsg_add_u32(&b, "priority", l->priority);
        blobmsg_add_u32(&b, "source", l->source);
        blobmsg_add_u64(&b, "time", l->ts.tv_sec * 1000LL);
        l = log_list(count, l);
        ret = ustream_write(&cl->s.stream, (void *) b.head, blob_len(b.head) + sizeof(struct blob_attr), false);
        blob_buf_free(&b);
        if (ret < 0)
            break;
    }
    return 0;
}
static int
write_log(struct ubus_context *ctx, struct ubus_object *obj,
        struct ubus_request_data *req, const char *method,
        struct blob_attr *msg)
{
    struct blob_attr *tb;
    char *event;

    if (msg) {  //解析blobmsg
        blobmsg_parse(&write_policy, 1, &tb, blob_data(msg), blob_len(msg));
        if (tb) {
            event = blobmsg_get_string(tb);
            log_add(event, strlen(event) + 1, SOURCE_SYSLOG);
        }
    }

    return 0;
}

上層調度

static struct ubus_auto_conn conn;

uloop_init(); conn.cb
= ubus_connect_handler; ubus_auto_connect(&conn);
uloop_run();
static void ubus_connect_handler(struct ubus_context *ctx) { int ret; ret = ubus_add_object(ctx, &log_object); if (ret) { fprintf(stderr, "Failed to add object: %s\n", ubus_strerror(ret)); exit(1); } fprintf(stderr, "log: connected to ubus\n"); }

logd是守護進程,永不退出,因此不必remove object。

2) 客戶端/服務器模式

/*
ubus_invoke()的聲明以下:
int ubus_invoke(struct ubus_context *ctx, uint32_t obj, const char *method,
                struct blob_attr *msg, ubus_data_handler_t cb, void *priv, int timeout);
                
其中cb參數是消息回調函數,其函數類型定義爲:
typedef void (*ubus_data_handler_t)(struct ubus_request *req,
                    int type, struct blob_attr *msg);
參數type是請求消息的類型,如UBUS_MSG_INVOKE,UBUS_MSG_DATA等。
參數msg存放從服務端獲得的回覆消息,struct blob_attr類型,一樣用blobmsg_parse()來解析。
參數req保存了請求方的消息屬性,其中req->priv即ubus_invoke()中的priv參數,
用這個參數能夠零活的傳遞一些額外的數據。
*/
// ser.c
#include <stdio.h>
#include <unistd.h>
#include <libubox/uloop.h>
#include <libubox/ustream.h>
#include <libubox/utils.h>
#include <libubox/blobmsg_json.h>
#include <libubus.h>

static int stu_no = 0;
static char *ubus_socket = NULL;
struct ubus_context *ctx = NULL;
static struct blob_buf b;

enum {
    STU_NO,
    __STU_MAX
};

static const struct blobmsg_policy stu_policy[__STU_MAX] ={
 [STU_NO] = {.name = "no", .type = BLOBMSG_TYPE_INT32 }
};

static int stu_add(struct ubus_context *ctx, struct ubus_object *obj,
                struct ubus_request_data *req, const char *method,
                struct blob_attr *msg)
{
    struct blob_attr *tb[__STU_MAX];

    blobmsg_parse(stu_policy, ARRAY_SIZE(stu_policy), tb, blob_data(msg), blob_len(msg));

    if(tb[STU_NO])
        stu_no += blobmsg_get_u32(tb[STU_NO]);

    blob_buf_init(&b, 0);
    blobmsg_add_u32(&b, "no", stu_no);
    ubus_send_reply(ctx, req, b.head);

    return 0;
}

static int stu_sub(struct ubus_context *ctx, struct ubus_object *obj,
                struct ubus_request_data *req, const char *method,
                struct blob_attr *msg)
{
    struct blob_attr *tb[__STU_MAX];

    blobmsg_parse(stu_policy, ARRAY_SIZE(stu_policy), tb, blob_data(msg), blob_len(msg));

    if(tb[STU_NO])
        stu_no -= blobmsg_get_u32(tb[STU_NO]);

    blob_buf_init(&b, 0);
    blobmsg_add_u32(&b, "no", stu_no);
    ubus_send_reply(ctx, req, b.head);

    return 0;
}
static const struct ubus_method stu_methods[] = {
/*
    { .name = "add", .handler = stu_add, .policy = stu_policy, .n_policy = 1 },
    { .name = "sub", .handler = stu_sub, .policy = stu_policy, .n_policy = 1 }
*/
    UBUS_METHOD("add", stu_add, stu_policy),
    UBUS_METHOD("sub", stu_sub, stu_policy),
};

static struct ubus_object_type stu_object_type =
    UBUS_OBJECT_TYPE("stu", stu_methods);

static struct ubus_object stu_object = {
    .name = "stu",
    .type = &stu_object_type,
    .methods = stu_methods,
    .n_methods = ARRAY_SIZE(stu_methods)
};

static void ubus_add_fd(void)
{
    ubus_add_uloop(ctx);

#ifdef FD_CLOEXEC
    fcntl(ctx->sock.fd, F_SETFD,
        fcntl(ctx->sock.fd, F_GETFD) | FD_CLOEXEC);
#endif
}

static void ubus_reconn_timer(struct uloop_timeout *timeout)
{
    static struct uloop_timeout retry =
    { .cb = ubus_reconn_timer,};
    int t = 2;

    if (ubus_reconnect(ctx, ubus_socket) != 0) {
        uloop_timeout_set(&retry, t * 1000);
        return;
    }

    ubus_add_fd();
}

static void ubus_connection_lost(struct ubus_context *ctx)
{
    ubus_reconn_timer(NULL);
}
int main(int argc, char **argv)
{
    char ch;
    int ret = 0;

    while((ch = getopt(argc, argv, "s:"))!= -1){
        switch(ch){
            case 's':
                ubus_socket = optarg;
                break;
            default:  /* ? */
                break;
        }
    }

    uloop_init();

    ctx = ubus_connect(ubus_socket);
    if(!ctx){
        printf("ubus connect error.\n");
        return -1;
    }
    ctx->connection_lost = ubus_connection_lost;

    ret = ubus_add_object(ctx, &stu_object);
    if(ret){
        printf("Failed to add object to ubus:%s\n", ubus_strerror(ret));
        return 0;
    }

    //ubus_add_uloop(ctx);
    ubus_add_fd();
    uloop_run();

    if(ctx)
        ubus_free(ctx);

    uloop_done();
    return 0;
}
#include <stdio.h>
#include <unistd.h>
#include <libubox/uloop.h>
#include <libubox/ustream.h>
#include <libubox/utils.h>
#include <libubox/blobmsg_json.h>
#include <libubus.h>

static struct blob_buf b;

static void receive_call_result_data(struct ubus_request *req, int type, struct blob_attr *msg)
{
    char *str;
    if(!msg)
        return;
    
    str = blobmsg_format_json_indent(msg, true, 0); 
    printf("%s\n", str);

    free(str);
}

static int client_main(struct ubus_context *ctx, int argc, char**argv)
{
    uint32_t id; 
    int ret;

    printf("argc =%d, argv[0] =%s\n", argc, argv[0]);
    if(argc != 3)
        return -2; 

    blob_buf_init(&b, 0); 
    if(!blobmsg_add_json_from_string(&b, argv[2])){
        printf("Failed to parse message data\n");
        return -1; 
    }   

    ret = ubus_lookup_id(ctx, argv[0], &id);
    if(ret)
        return ret;

    return ubus_invoke(ctx, id, argv[1], b.head, receive_call_result_data, NULL, 30*1000);
}

int main(int argc, char **argv)
{
    struct ubus_context *ctx = NULL;
    char ch;
    char *ubus_socket = NULL;
    int ret = 0;

    while((ch = getopt(argc, argv, "s:"))!= -1){
        switch(ch){
            case 's':
                ubus_socket = optarg;
                break;
            default:  /* ? */
                break;
        }
    }

    argc -= optind;
    argv += optind;

    ctx = ubus_connect(ubus_socket);
    if(!ctx){
        printf("ubus connect error.\n");
        return -1;
    }

    ret = client_main(ctx, argc, argv);
    if(ret < 0)
        return ret;

    if(ctx)
        ubus_free(ctx);

    return 0;
}

 編譯:

gcc cli.c -Wall -lubox -lubus -lblobmsg_json -o cli
gcc ser.c -Wall -lubox -lubus -lblobmsg_json -o ser

執行:

ubuntu:~/test/ubus/test$ sudo ./cli stu add '{"no":20}'
argc =3, argv[0] =stu
{
    "no": 20
}
ubuntu:~/test/ubus/test$ sudo ./cli stu add '{"no":20}'
argc =3, argv[0] =stu
{
    "no": 40
}
ubuntu:~/test/ubus/test$ sudo ./cli stu add '{"no":20}'
argc =3, argv[0] =stu
{
    "no": 60
}

 

參考文檔:

openwrt中使用ubus實現進程通訊

ubus實現進程間通訊舉例

openwrt ubus簡介以及libubus開發說明

ubus [1] - ubusd

ubus [2] - libubus

ubus [3] - cli

在 Linux 上實現基於 Socket 的多進程實時通訊 IBM

相關文章
相關標籤/搜索