openwrt提供了一個系統總線ubus,相似linux桌面操做系統的d-bus,目標是提供系統級的進程間通訊(IPC)功能。node
爲了提供各類後臺進程和應用程序之間的通訊機制,ubus被開發出來,由3部分組成:精靈進程,接口庫和實用工具。linux
工程的核心是ubusd精靈進程,它提供了一個總線層,在系統啓動時運行,負責進程間的消息路由和傳遞。其餘進程註冊到ubusd進程進行消息的發送和接收。這個接口是用linux文件socket和TLV手法消息實現的。每個進程在指定命名空間下注冊本身的路徑。每個路徑均可以提供帶有各類參數的多個函數處理過程。函數處理過程程序能夠在完成處理後返回消息。git
接口庫名稱爲libubus.so,其餘進程能夠經過該動態連接庫來簡化對ubus總線的訪問。編程
實用工具ubus是提供命令行的接口調用工具。能夠基於該工具來進行腳本編程,也能夠使用ubus來診斷問題。json
ubus是爲進程間發送消息而設計的,不適合傳輸大量數據(進程間傳輸大量數據應採用共享內存)。ubuntu
網址:http://git.openwrt.org/project/ubus.gitsegmentfault
$ ls ubus-2015-05-25 cli.c libubus.h libubus-req.c ubus_common.h ubusd_id.c ubusd_proto.c CMakeLists.txt libubus-internal.h libubus-sub.c ubusd.c ubusd_id.h ubusmsg.h examples libubus-io.c lua ubusd_event.c ubusd_obj.c libubus.c libubus-obj.c systemd ubusd.h ubusd_obj.h
ubusd: ubusd.c ubusd_id.c ubusd_obj.c ubusd_proto.c ubusd_event.c libubus: libubus.c libubus-io.c libubus-obj.c libubus-sub.c libubus-req.c cli/ubus: cli.c
/etc/init.d/ubus中提供ubusd進程的啓動,在系統進程啓動完成後當即啓動。他是在網絡進程netifd以前啓動的,該進程提供一個文件套接字接口和其餘應用程序通訊。數組
ubusd採用SOCK_STREAM的Unix域套接字實現與服務進程間通信。服務器
提供的功能:網絡
1)提供註冊對象和方法供其餘實體調用。
2)調用其餘應用程序提供的註冊對象的控制接口。
3)在特定對象上註冊監聽事件。
4)向特定對象發送事件消息。
ubus將消息處理抽象爲對象object和方法method的概念。一個對象中包含多個方法。對象和方法都有本身的名字,發送請求方在消息中指定要調用的對象和方法名字便可。
ubus的另一個概念就是訂閱(subscriber)。客戶端須要向服務器註冊收到特定消息時的處理方法。這樣當服務器在狀態發生改變時會經過ubus總線來通知客戶端。
ubus可用於兩個進程之間通訊,進程之間以TLV格式傳遞消息。ubus可以以json格式和用戶進行數據交換。常見場景(應用詳見ubus實現進程間通訊舉例):
1)客戶端/服務器模式。
2)訂閱通知模式(struct ubus_subscriber)。
3)事件模式(ubus_event_handler)。
ubusd是一種總線消息服務器,任何消息均經過ubusd進程傳遞,所以多個進程在相互通訊時,均經過ubus收發消息。
編譯安裝
1)去除lua支持
#ADD_SUBDIRECTORY(lua)
2)編譯
mkdir build; cd build; cmake ..; make ;make install
[ 30%] Built target ubus [ 60%] Built target ubusd [ 70%] Built target cli [ 85%] Built target server [100%] Built target client Install the project... -- Install configuration: "" -- Installing: /usr/local/lib/libubus.so -- Installing: /usr/local/bin/ubus -- Set runtime path of "/usr/local/bin/ubus" to "" -- Installing: /usr/local/sbin/ubusd -- Up-to-date: /usr/local/include/ubusmsg.h -- Up-to-date: /usr/local/include/ubus_common.h -- Up-to-date: /usr/local/include/libubus.h -- Installing: /usr/lib/systemd/system/ubus.socket -- Installing: /usr/lib/systemd/system/ubus.service
1) 數據結構
struct ubus_event_handler { struct ubus_object obj; ubus_event_handler_t cb; }; struct ubus_context { struct list_head requests; struct avl_tree objects; /** client端object鏈表頭 */ struct list_head pending; struct uloop_fd sock; /** client端sock對象 */ uint32_t local_id; /** ubusd分配的client id */ uint16_t request_seq; int stack_depth; /** 斷開鏈接回調函數 */ void (*connection_lost)(struct ubus_context *ctx); struct { struct ubus_msghdr hdr; char data[UBUS_MAX_MSGLEN]; } msgbuf; /** 通訊報文 */ }; struct ubus_object_data { uint32_t id; uint32_t type_id; const char *path; struct blob_attr *signature; }; struct ubus_request_data { uint32_t object; uint32_t peer; uint16_t seq; /* internal use */ bool deferred; int fd; }; struct ubus_request { struct list_head list; struct list_head pending; int status_code; bool status_msg; bool blocked; bool cancelled; bool notify; uint32_t peer; uint16_t seq; ubus_data_handler_t raw_data_cb; ubus_data_handler_t data_cb; ubus_fd_handler_t fd_cb; ubus_complete_handler_t complete_cb; struct ubus_context *ctx; void *priv; }; struct ubus_notify_request { struct ubus_request req; ubus_notify_complete_handler_t status_cb; ubus_notify_complete_handler_t complete_cb; uint32_t pending; uint32_t id[UBUS_MAX_NOTIFY_PEERS + 1]; }; struct ubus_auto_conn { struct ubus_context ctx; struct uloop_timeout timer; const char *path; ubus_connect_handler_t cb; };
2)接口函數
* 初始化client端context結構,並鏈接ubusd struct ubus_context *ubus_connect(const char *path) * 與ubus_connect()函數基本功能相同,但此函數在鏈接斷開後會自動進行重連 void ubus_auto_connect(struct ubus_auto_conn *conn) * 註冊新事件 int ubus_register_event_handler(struct ubus_context *ctx, struct ubus_event_handler *ev, const char *pattern) * 發出事件消息 int ubus_send_event(struct ubus_context *ctx, const char *id, struct blob_attr *data) * 向ubusd查詢指定UBUS_ATTR_OBJPATH對應對象信息內容 * 內容經過輸入回調函數ubus_lookup_handler_t由調用者自行處理 int ubus_lookup(struct ubus_context *ctx, const char *path, ubus_lookup_handler_t cb, void *priv) * 向ubusd查詢指定UBUS_ATTR_OBJPATH對應的ID號 int ubus_lookup_id(struct ubus_context *ctx, const char *path, uint32_t *id)
* 向uloop註冊fd
static inline void ubus_add_uloop(struct ubus_context *ctx)
{
uloop_fd_add(&ctx->sock, ULOOP_BLOCKING | ULOOP_READ);
}
數據結構
struct ubus_object { struct avl_node avl; /** 關係到struct ubus_context的objects */ const char *name; /** UBUS_ATTR_OBJPATH */ uint32_t id; /** 由ubusd server分配的obj id */ const char *path; struct ubus_object_type *type; /** 第1次被訂閱或最後1次補退訂時被調用 */ ubus_state_handler_t subscribe_cb; bool has_subscribers; /** 此對象是否被訂閱 */ const struct ubus_method *methods; /** 方法數組 */ int n_methods; /** 方法數組個數 */ }; struct ubus_object_type { const char *name; uint32_t id; /** 由ubusd server分配的obj type id */ const struct ubus_method *methods; /** 方法數組 */ int n_methods; /** 方法數組個數 */ }; struct ubus_method { const char *name; /** 方法名稱 */ ubus_handler_t handler; /** 方法處理回調函數 */ unsigned long mask; /** 參數過濾掩碼 */ const struct blobmsg_policy *policy; /** 參數過濾列表 */ int n_policy; /** 參數過濾列表個數 */ };
#define UBUS_OBJECT_TYPE(_name, _methods) \ { \ .name = _name, \ .id = 0, \ .n_methods = ARRAY_SIZE(_methods), \ .methods = _methods \ } #define __UBUS_METHOD_NOARG(_name, _handler) \ .name = _name, \ .handler = _handler #define __UBUS_METHOD(_name, _handler, _policy) \ __UBUS_METHOD_NOARG(_name, _handler), \ .policy = _policy, \ .n_policy = ARRAY_SIZE(_policy) #define UBUS_METHOD(_name, _handler, _policy) \ { __UBUS_METHOD(_name, _handler, _policy) } #define UBUS_METHOD_MASK(_name, _handler, _policy, _mask) \ { \ __UBUS_METHOD(_name, _handler, _policy),\ .mask = _mask \ } #define UBUS_METHOD_NOARG(_name, _handler) \ { __UBUS_METHOD_NOARG(_name, _handler) }
接口函數
* client端向ubusd server請求增長一個新object int ubus_add_object(struct ubus_context *ctx, struct ubus_object *obj) * client端向ubusd server請求刪除一個object int ubus_remove_object(struct ubus_context *ctx, struct ubus_object *obj) * 處理收到與object相關報文 void __hidden ubus_process_obj_msg(struct ubus_context *ctx, struct ubus_msghdr *hdr) * 處理UBUS_MSG_INVOKE報文 static void ubus_process_invoke(struct ubus_context *ctx, struct ubus_msghdr *hdr, struct ubus_object *obj, struct blob_attr **attrbuf) * 處理UBUS_MSG_UNSUBSCRIBE報文 static void ubus_process_unsubscribe(struct ubus_context *ctx, struct ubus_msghdr *hdr, struct ubus_object *obj, struct blob_attr **attrbuf) * 處理UBUS_MSG_NOTIFY報文 static void ubus_process_notify(struct ubus_context *ctx, struct ubus_msghdr *hdr, struct ubus_object *obj, struct blob_attr **attrbuf)
1) 下面看下logd中ubus操做,ubos的logd註冊了一個log對象,兩個方法read和write。
static const struct ubus_method log_methods[] = { { .name = "read", .handler = read_log, .policy = &read_policy, .n_policy = 1 }, { .name = "write", .handler = write_log, .policy = &write_policy, .n_policy = 1 }, }; static struct ubus_object_type log_object_type = UBUS_OBJECT_TYPE("log", log_methods); static struct ubus_object log_object = { .name = "log", .type = &log_object_type, .methods = log_methods, .n_methods = ARRAY_SIZE(log_methods), };
static const struct blobmsg_policy read_policy = { .name = "lines", .type = BLOBMSG_TYPE_INT32 }; static const struct blobmsg_policy write_policy = { .name = "event", .type = BLOBMSG_TYPE_STRING };
看下兩個方法的處理函數(根據policy解析blobmsg):
static int read_log(struct ubus_context *ctx, struct ubus_object *obj, struct ubus_request_data *req, const char *method, struct blob_attr *msg) { struct client *cl; struct blob_attr *tb; struct log_head *l; int count = 0; int fds[2]; int ret; if (msg) { // 解析blobmsg blobmsg_parse(&read_policy, 1, &tb, blob_data(msg), blob_len(msg)); if (tb) count = blobmsg_get_u32(tb); } // 具體業務處理 if (pipe(fds) == -1) { fprintf(stderr, "logd: failed to create pipe: %s\n", strerror(errno)); return -1; } ubus_request_set_fd(ctx, req, fds[0]); cl = calloc(1, sizeof(*cl)); cl->s.stream.notify_state = client_notify_state; cl->fd = fds[1]; ustream_fd_init(&cl->s, cl->fd); list_add(&cl->list, &clients); l = log_list(count, NULL); while ((!tb || count) && l) { blob_buf_init(&b, 0); blobmsg_add_string(&b, "msg", l->data); blobmsg_add_u32(&b, "id", l->id); blobmsg_add_u32(&b, "priority", l->priority); blobmsg_add_u32(&b, "source", l->source); blobmsg_add_u64(&b, "time", l->ts.tv_sec * 1000LL); l = log_list(count, l); ret = ustream_write(&cl->s.stream, (void *) b.head, blob_len(b.head) + sizeof(struct blob_attr), false); blob_buf_free(&b); if (ret < 0) break; } return 0; }
static int write_log(struct ubus_context *ctx, struct ubus_object *obj, struct ubus_request_data *req, const char *method, struct blob_attr *msg) { struct blob_attr *tb; char *event; if (msg) { //解析blobmsg blobmsg_parse(&write_policy, 1, &tb, blob_data(msg), blob_len(msg)); if (tb) { event = blobmsg_get_string(tb); log_add(event, strlen(event) + 1, SOURCE_SYSLOG); } } return 0; }
上層調度
static struct ubus_auto_conn conn;
uloop_init(); conn.cb = ubus_connect_handler; ubus_auto_connect(&conn);
uloop_run(); static void ubus_connect_handler(struct ubus_context *ctx) { int ret; ret = ubus_add_object(ctx, &log_object); if (ret) { fprintf(stderr, "Failed to add object: %s\n", ubus_strerror(ret)); exit(1); } fprintf(stderr, "log: connected to ubus\n"); }
logd是守護進程,永不退出,因此不必remove object。
2) 客戶端/服務器模式
/* ubus_invoke()的聲明以下: int ubus_invoke(struct ubus_context *ctx, uint32_t obj, const char *method, struct blob_attr *msg, ubus_data_handler_t cb, void *priv, int timeout); 其中cb參數是消息回調函數,其函數類型定義爲: typedef void (*ubus_data_handler_t)(struct ubus_request *req, int type, struct blob_attr *msg); 參數type是請求消息的類型,如UBUS_MSG_INVOKE,UBUS_MSG_DATA等。 參數msg存放從服務端獲得的回覆消息,struct blob_attr類型,一樣用blobmsg_parse()來解析。 參數req保存了請求方的消息屬性,其中req->priv即ubus_invoke()中的priv參數, 用這個參數能夠零活的傳遞一些額外的數據。 */
// ser.c #include <stdio.h> #include <unistd.h> #include <libubox/uloop.h> #include <libubox/ustream.h> #include <libubox/utils.h> #include <libubox/blobmsg_json.h> #include <libubus.h> static int stu_no = 0; static char *ubus_socket = NULL; struct ubus_context *ctx = NULL; static struct blob_buf b; enum { STU_NO, __STU_MAX }; static const struct blobmsg_policy stu_policy[__STU_MAX] ={ [STU_NO] = {.name = "no", .type = BLOBMSG_TYPE_INT32 } }; static int stu_add(struct ubus_context *ctx, struct ubus_object *obj, struct ubus_request_data *req, const char *method, struct blob_attr *msg) { struct blob_attr *tb[__STU_MAX]; blobmsg_parse(stu_policy, ARRAY_SIZE(stu_policy), tb, blob_data(msg), blob_len(msg)); if(tb[STU_NO]) stu_no += blobmsg_get_u32(tb[STU_NO]); blob_buf_init(&b, 0); blobmsg_add_u32(&b, "no", stu_no); ubus_send_reply(ctx, req, b.head); return 0; } static int stu_sub(struct ubus_context *ctx, struct ubus_object *obj, struct ubus_request_data *req, const char *method, struct blob_attr *msg) { struct blob_attr *tb[__STU_MAX]; blobmsg_parse(stu_policy, ARRAY_SIZE(stu_policy), tb, blob_data(msg), blob_len(msg)); if(tb[STU_NO]) stu_no -= blobmsg_get_u32(tb[STU_NO]); blob_buf_init(&b, 0); blobmsg_add_u32(&b, "no", stu_no); ubus_send_reply(ctx, req, b.head); return 0; } static const struct ubus_method stu_methods[] = { /* { .name = "add", .handler = stu_add, .policy = stu_policy, .n_policy = 1 }, { .name = "sub", .handler = stu_sub, .policy = stu_policy, .n_policy = 1 } */ UBUS_METHOD("add", stu_add, stu_policy), UBUS_METHOD("sub", stu_sub, stu_policy), }; static struct ubus_object_type stu_object_type = UBUS_OBJECT_TYPE("stu", stu_methods); static struct ubus_object stu_object = { .name = "stu", .type = &stu_object_type, .methods = stu_methods, .n_methods = ARRAY_SIZE(stu_methods) }; static void ubus_add_fd(void) { ubus_add_uloop(ctx); #ifdef FD_CLOEXEC fcntl(ctx->sock.fd, F_SETFD, fcntl(ctx->sock.fd, F_GETFD) | FD_CLOEXEC); #endif } static void ubus_reconn_timer(struct uloop_timeout *timeout) { static struct uloop_timeout retry = { .cb = ubus_reconn_timer,}; int t = 2; if (ubus_reconnect(ctx, ubus_socket) != 0) { uloop_timeout_set(&retry, t * 1000); return; } ubus_add_fd(); } static void ubus_connection_lost(struct ubus_context *ctx) { ubus_reconn_timer(NULL); } int main(int argc, char **argv) { char ch; int ret = 0; while((ch = getopt(argc, argv, "s:"))!= -1){ switch(ch){ case 's': ubus_socket = optarg; break; default: /* ? */ break; } } uloop_init(); ctx = ubus_connect(ubus_socket); if(!ctx){ printf("ubus connect error.\n"); return -1; } ctx->connection_lost = ubus_connection_lost; ret = ubus_add_object(ctx, &stu_object); if(ret){ printf("Failed to add object to ubus:%s\n", ubus_strerror(ret)); return 0; } //ubus_add_uloop(ctx); ubus_add_fd(); uloop_run(); if(ctx) ubus_free(ctx); uloop_done(); return 0; }
#include <stdio.h> #include <unistd.h> #include <libubox/uloop.h> #include <libubox/ustream.h> #include <libubox/utils.h> #include <libubox/blobmsg_json.h> #include <libubus.h> static struct blob_buf b; static void receive_call_result_data(struct ubus_request *req, int type, struct blob_attr *msg) { char *str; if(!msg) return; str = blobmsg_format_json_indent(msg, true, 0); printf("%s\n", str); free(str); } static int client_main(struct ubus_context *ctx, int argc, char**argv) { uint32_t id; int ret; printf("argc =%d, argv[0] =%s\n", argc, argv[0]); if(argc != 3) return -2; blob_buf_init(&b, 0); if(!blobmsg_add_json_from_string(&b, argv[2])){ printf("Failed to parse message data\n"); return -1; } ret = ubus_lookup_id(ctx, argv[0], &id); if(ret) return ret; return ubus_invoke(ctx, id, argv[1], b.head, receive_call_result_data, NULL, 30*1000); } int main(int argc, char **argv) { struct ubus_context *ctx = NULL; char ch; char *ubus_socket = NULL; int ret = 0; while((ch = getopt(argc, argv, "s:"))!= -1){ switch(ch){ case 's': ubus_socket = optarg; break; default: /* ? */ break; } } argc -= optind; argv += optind; ctx = ubus_connect(ubus_socket); if(!ctx){ printf("ubus connect error.\n"); return -1; } ret = client_main(ctx, argc, argv); if(ret < 0) return ret; if(ctx) ubus_free(ctx); return 0; }
編譯:
gcc cli.c -Wall -lubox -lubus -lblobmsg_json -o cli
gcc ser.c -Wall -lubox -lubus -lblobmsg_json -o ser
執行:
ubuntu:~/test/ubus/test$ sudo ./cli stu add '{"no":20}' argc =3, argv[0] =stu { "no": 20 } ubuntu:~/test/ubus/test$ sudo ./cli stu add '{"no":20}' argc =3, argv[0] =stu { "no": 40 } ubuntu:~/test/ubus/test$ sudo ./cli stu add '{"no":20}' argc =3, argv[0] =stu { "no": 60 }
參考文檔: