ubus provides a mechanism for inter-process communication. A daemon, ubusd, sits in the middle: every process registers with ubusd, and ubusd handles receiving and dispatching messages.
Server side: ubus_send_reply(ctx, req, b.head);
uloop_init();
ctx = ubus_connect(NULL);
ubus_add_uloop(ctx);
static struct ubus_method scan_methods[] = {
    UBUS_METHOD("scan", ubus_start_scan, scan_policy),
};
static struct ubus_object scan_obj = {
    .name = "scan_prog", /* name of the object */
    .type = &scan_obj_type,
    .methods = scan_methods,
    .n_methods = ARRAY_SIZE(scan_methods),
};
ubus_add_object(ctx, &scan_obj);
/* inside the "scan" handler (ubus_start_scan), the reply is sent with: */
ubus_send_reply(ctx, req, b.head);
uloop_run();
ubus_free(ctx);
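The excerpt refers to scan_policy, scan_obj_type and the handler ubus_start_scan without showing them. A minimal sketch of what they could look like, assuming a single optional "channel" argument (the argument name and the reply field are illustrative, not from the original):

enum {
    SCAN_CHANNEL,
    __SCAN_MAX
};

static const struct blobmsg_policy scan_policy[__SCAN_MAX] = {
    [SCAN_CHANNEL] = { .name = "channel", .type = BLOBMSG_TYPE_INT32 },
};

static struct blob_buf b; /* the blob_buf referenced as b.head in the excerpt */

/* "scan" method handler: parse the request, do the work, send the reply */
static int ubus_start_scan(struct ubus_context *ctx, struct ubus_object *obj,
                           struct ubus_request_data *req, const char *method,
                           struct blob_attr *msg)
{
    struct blob_attr *tb[__SCAN_MAX];

    blobmsg_parse(scan_policy, __SCAN_MAX, tb, blob_data(msg), blob_len(msg));

    blob_buf_init(&b, 0);
    blobmsg_add_string(&b, "status", "scanning");
    /* this is where the ubus_send_reply() shown above actually lives */
    return ubus_send_reply(ctx, req, b.head);
}

static struct ubus_object_type scan_obj_type = UBUS_OBJECT_TYPE("scan_prog", scan_methods);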
Client side: ubus_invoke(ctx, id, "scan", b.head, scanreq_prog_cb, NULL, timeout * 1000);
uloop_init();
ctx = ubus_connect(path);
struct blob_buf b;
uint32_t id;
blob_buf_init(&b, 0);
ubus_lookup_id(ctx, "scan_prog", &id);
ubus_invoke(ctx, id, "scan", b.head, scanreq_prog_cb, NULL, timeout * 1000);
ubus_free(ctx);
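scanreq_prog_cb is the reply handler passed to ubus_invoke(). A minimal sketch that simply dumps the reply as JSON (using blobmsg_format_json() from libubox/blobmsg_json.h):

static void scanreq_prog_cb(struct ubus_request *req, int type, struct blob_attr *msg)
{
    char *str;

    if (!msg)
        return;

    str = blobmsg_format_json(msg, true);
    printf("scan reply: %s\n", str);
    free(str);
}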
Server side: ubus_notify(ctx, &test_object, "say Hi!", NULL, -1);
static struct ubus_object test_object = {
    .name = "test", /* name of the object */
    .type = &test_obj_type,
    .subscribe_cb = test_client_subscribe_cb,
};
static void test_client_subscribe_cb(struct ubus_context *ctx, struct ubus_object *obj) {
    fprintf(stderr, "Subscribers active: %d\n", obj->has_subscribers);
}
uloop_init();
ctx = ubus_connect(NULL);
ubus_add_uloop(ctx);
ret = ubus_add_object(ctx, &test_object);
while (1) {
    sleep(2);
    /* step 2: broadcast the notification */
    ubus_notify(ctx, &test_object, "say Hi!", NULL, -1);
}
/* note: the endless loop above never returns, so the cleanup below is not reached in this excerpt */
uloop_run();
ubus_free(ctx);
uloop_done();
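The notification above carries no payload (msg is NULL). A blobmsg table can be attached instead, for example (assuming a struct blob_buf b is in scope; the field is made up for illustration):

blob_buf_init(&b, 0);
blobmsg_add_string(&b, "greeting", "Hi"); /* hypothetical payload */
ubus_notify(ctx, &test_object, "say Hi!", b.head, -1);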
Client side: ret = ubus_register_subscriber(ctx, &test_event);
static int test_notify(struct ubus_context *ctx, struct ubus_object *obj, struct ubus_request_data *req, const char *method, struct blob_attr *msg) {
    printf("notify handler...\n");
    return 0;
}
static void test_handle_remove(struct ubus_context *ctx, struct ubus_subscriber *obj, uint32_t id) {
    printf("remove handler...\n");
}
uloop_init();
ctx = ubus_connect(NULL);
ubus_add_uloop(ctx);
struct ubus_subscriber test_event;
/* handler called when a notification arrives */
test_event.cb = test_notify;
test_event.remove_cb = test_handle_remove; /* called when the server side removes this client's subscription (e.g. when the server exits) */
/* register test_event */
ret = ubus_register_subscriber(ctx, &test_event);
uint32_t obj_id;
ret = ubus_lookup_id(ctx, "test", &obj_id);
ret = ubus_subscribe(ctx, &test_event, obj_id);
uloop_run();
ubus_free(ctx);
uloop_done();
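If the client later wants to stop receiving notifications, the subscription can be dropped with ubus_unsubscribe(ctx, &test_event, obj_id), and ubus_register_subscriber() has a matching ubus_unregister_subscriber().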
Server side: broadcast an event message, ubus_send_event(ctx, "add_device", b.head);
uloop_init();
ctx = ubus_connect(NULL);
blob_buf_init(&b, 0);
/* parameters to pass along with the event */
blobmsg_add_u32(&b, "major", 3);
blobmsg_add_u32(&b, "minor", 56);
blobmsg_add_string(&b, "name", "mmc01");
/* broadcast the event named "add_device" */
ret = ubus_send_event(ctx, "add_device", b.head);
ubus_free(ctx);
Client side: ret = ubus_register_event_handler(ctx, &listener, "add_device");
#define UBUS_EVENT_ADD_DEVICE "add_device"
#define UBUS_EVENT_REMOVE_DEVICE "rm_device"
static void ubus_probe_device_event(struct ubus_context *ctx, struct ubus_event_handler *ev, const char *type, struct blob_attr *msg) {
    char *str;
    if (!msg)
        return;
    str = blobmsg_format_json(msg, true);
    printf("{ \"%s\": %s }\n", type, str);
    free(str);
}
uloop_init();
ctx = ubus_connect(NULL);
ubus_add_uloop(ctx);
static struct ubus_event_handler listener;
memset(&listener, 0, sizeof(listener));
listener.cb = ubus_probe_device_event;
ret = ubus_register_event_handler(ctx, &listener, UBUS_EVENT_ADD_DEVICE);
ret = ubus_register_event_handler(ctx, &listener, UBUS_EVENT_REMOVE_DEVICE);
uloop_run();
ubus_free(ctx);
/**
 * Initialize the event loop.
 * The main work is poll_fd = epoll_create(32): create an epoll instance
 * (the size argument is only a historical hint on modern kernels).
 */
int uloop_init(void) {
    if (poll_fd >= 0)
        return 0;
    poll_fd = epoll_create(32); /* create the epoll instance */
    if (poll_fd < 0)
        return -1;
    fcntl(poll_fd, F_SETFD, fcntl(poll_fd, F_GETFD) | FD_CLOEXEC); /* set close-on-exec on the epoll fd */
    return 0;
}
/**
 * Main entry point of the event loop.
 * 1. On the first call to uloop_run() in a process, install the SIGCHLD and SIGINT handlers.
 * 2. In each iteration, get the current time and fire the expired timeouts, which are kept in a timeout list.
 * 3. Check whether a SIGCHLD was received; if so, reap the corresponding child, kept in a process list.
 * 4. Call epoll_wait() to handle the file descriptors whose events have fired.
 */
void uloop_run(void) {
    static int recursive_calls = 0; /* static value */
    struct timeval tv;

    /*
     * Signal handlers are only installed on the first call to uloop_run()
     * (and restored when that call is done).
     */
    if (!recursive_calls++)
        uloop_setup_signals(true);

    uloop_cancelled = false;
    while (!uloop_cancelled)
    {
        uloop_gettime(&tv);           /* get the current time */
        uloop_process_timeouts(&tv);  /* fire and remove expired timeouts */
        if (uloop_cancelled)
            break;

        if (do_sigchld)               /* a SIGCHLD was received */
            uloop_handle_processes(); /* reap the child and drop its uloop_process */

        uloop_gettime(&tv);
        uloop_run_events(uloop_get_next_timeout(&tv)); /* handle the fds whose events fired */
    }

    if (!--recursive_calls)
        uloop_setup_signals(false);
}
/**
 * Tear down the event loop:
 * close the epoll descriptor,
 * clear the child-process list,
 * clear the timeout list.
 */
void uloop_done(void) {
    if (poll_fd < 0)
        return;

    close(poll_fd);
    poll_fd = -1;

    uloop_clear_timeouts();
    uloop_clear_processes();
}
uloop_fd_add(uloop_fd, ULOOP_READ);
#include ""
struct uloop_fd ufd; // global uloop_fd variable
static void fd_handler(struct uloop_fd *u, unsigned int ev) {
    if (recvfrom(u->fd, ...) == -1) {
        // handle the error
    } else {
        // do your work
    }
}
int main()
{
    // create the socket
    int sock = socket(....);
    ufd.fd = sock;
    uloop_init(); // initialize before using the library
    ufd.cb = fd_handler;
    uloop_fd_add(&ufd, ULOOP_READ);
    uloop_run();
}
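The callback's second argument is a bitmask of the events that fired (ULOOP_READ, ULOOP_WRITE); when the descriptor is no longer needed it can be taken out of the loop with uloop_fd_delete(&ufd).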
uloop_timeout_set(uloop_timeout, freq);
#include ""
struct uloop_timeout timeout; // global uloop_timeout variable
int frequency = 5;            // fire every 5 seconds
static void timeout_cb(struct uloop_timeout *t) {
    // do your work
    uloop_timeout_set(t, frequency * 1000); // re-arm for the next timeout
}
int main() {
    uloop_init(); // initialize before using the library
    timeout.cb = timeout_cb;
    uloop_timeout_set(&timeout, frequency * 1000); // arm the first timeout
    uloop_run();
}
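A pending timer can be stopped with uloop_timeout_cancel(&timeout); periodic work is implemented exactly as above, by re-arming the timer from inside its own callback.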
uloop_process_add(uloop_process);
#include ""
static struct uloop_process rsync; // global uloop_process variable
static void rsync_complete(struct uloop_process *proc, int ret) {
    // called when the child exits
    printf("rsync work is complete\n");
}
static int fun(void) {
    char *argv[] = {"rsync", "-az", "rsync://XYZ@192.168.26.99/www", "/root/www/", "--password-file=/root/rsync.secrets", NULL};
    rsync.cb = rsync_complete;
    rsync.pid = fork();
    if (!rsync.pid) {
        /* this is the child process */
        execvp(argv[0], argv);
        fprintf(stderr, "execvp failed\n");
        exit(-1);
    }
    if (rsync.pid <= 0) {
        fprintf(stderr, "fork failed\n");
        return -1;
    }
    uloop_process_add(&rsync);
    return 0;
}
int main() {
    .....
    uloop_init(); // initialize before using the library
    fun();
    uloop_run();
}
Building the message (serialization):
void *json_uri, *json_list;

json_uri = blobmsg_open_array(&b, "prog_list");
for (idx = 0; idx < PROG_MAX; idx++)
{
    if ('\0' != uri_list[idx].name[0])
    {
        json_list = blobmsg_open_table(&b, NULL);
        blobmsg_add_string(&b, "name", uri_list[idx].name);
        blobmsg_add_u32(&b, "channel", uri_list[idx].chn_id);
        blobmsg_close_table(&b, json_list);
    }
}
blobmsg_close_array(&b, json_uri);
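To check what was built, the buffer can be dumped as JSON with blobmsg_format_json() from libubox/blobmsg_json.h (a sketch; the exact output depends on uri_list):

char *json = blobmsg_format_json(b.head, true);
printf("%s\n", json); /* e.g. { "prog_list": [ { "name": "...", "channel": 0 }, ... ] } */
free(json);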
Parsing:
Getting the attribute name: hdr = blob_data(attr); char *name = (char *)hdr->name;
Getting the data: blobmsg_get_u32(attr);
Getting the length: int len = blobmsg_data_len(tb[RSP_GET_STREAMINFO_ABILITY]);
struct blob_attr *tb[SCAN_POLICY_MAX];
blobmsg_parse(scan_policy, SCAN_POLICY_MAX, tb, blob_data(msg), blob_len(msg));

struct blob_attr *head = blobmsg_data(tb[RSP_GET_STREAMINFO_ABILITY]);
int len = blobmsg_data_len(tb[RSP_GET_STREAMINFO_ABILITY]);
struct blob_attr *attr;
struct blobmsg_hdr *hdr;

__blob_for_each_attr(attr, head, len) {
    hdr = blob_data(attr);
    struct blob_attr *head_temp;
    struct blob_attr *attr_temp;
    int len_temp;
    char *name = (char *)hdr->name;
    if (!strcmp(name, "fmt_number"))
        rsp->ability.fmt_number = blobmsg_get_u32(attr);
    else if (!strcmp(name, "frmival_num"))
        rsp->ability.frmival_num = blobmsg_get_u32(attr);
}
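Instead of casting to blobmsg_hdr by hand, libubox also provides blobmsg_for_each_attr() and blobmsg_name(); a minimal sketch of the same loop using them, with the same field names as above:

struct blob_attr *cur;
int rem;

blobmsg_for_each_attr(cur, tb[RSP_GET_STREAMINFO_ABILITY], rem) {
    const char *name = blobmsg_name(cur);
    if (!strcmp(name, "fmt_number"))
        rsp->ability.fmt_number = blobmsg_get_u32(cur);
    else if (!strcmp(name, "frmival_num"))
        rsp->ability.frmival_num = blobmsg_get_u32(cur);
}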
In practice we ran into a multithreading problem: many of ubus's internal variables are global, so its support for multiple threads is poor. For example, listening for broadcasts in one thread while sending messages from another leads to segmentation faults.
The workaround is to keep both operations in a single thread, for example by sending the messages from inside the listener's callback; the downside is that the callback's timeout then has to be tuned to how often messages need to be sent. A sketch of that workaround follows.
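For illustration, the ubus_probe_device_event handler from the event example above could do the sending itself, so everything stays on one thread and one context (the outgoing event name and payload here are placeholders, not from the original):

static void ubus_probe_device_event(struct ubus_context *ctx, struct ubus_event_handler *ev,
                                    const char *type, struct blob_attr *msg)
{
    static struct blob_buf reply;

    /* ... handle the received event ... */

    /* send from the same thread/context instead of from a second thread */
    blob_buf_init(&reply, 0);
    blobmsg_add_string(&reply, "seen", type);        /* hypothetical payload */
    ubus_send_event(ctx, "my_reaction", reply.head); /* "my_reaction" is a placeholder name */
}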