VPP是多線程模型,共享地址空間,最快的通訊機制就是直接訪問彼此之間的數據。VPP本身實現了一套簡單的線程安全機制,用於保護臨界區。
VPP多線程之間的同步採用的是類似於帶信號和超時機制的自旋鎖,主要有check、sync、release三種操作。
整體上類似於把pthread_cond_timedwait中的互斥體改爲自旋鎖後所提供的功能;若等待時間超過BARRIER_SYNC_TIMEOUT,說明可能發生了死鎖,故直接abort。
其中涉及的主要數據結構如下:
/*
 * Excerpt of the per-thread descriptor. Only the barrier-related members
 * are listed; the "......" line stands for members omitted from this
 * listing.
 */
typedef struct
{
  ......
  /* Barrier request flag. vlib_worker_thread_barrier_sync_int stores 1 to
     start a sync and vlib_worker_thread_barrier_release stores 0 to end it;
     workers poll it in vlib_worker_thread_barrier_check. */
  volatile u32 *wait_at_barrier;
  /* Number of workers currently parked at the barrier. Each worker
     atomically adds 1 when it parks and adds -1 when it resumes. */
  volatile u32 *workers_at_barrier;
  /* Nesting depth of barrier_sync calls: incremented by sync and
     decremented by release on vlib_worker_threads[0]. */
  i64 recursion_level;
  /* Count of barrier syncs started: incremented once per outermost
     (non-recursive) vlib_worker_thread_barrier_sync_int call. */
  u64 barrier_sync_count;
  /* NOTE(review): presumably enables event logging of barrier activity;
     no use is visible in this excerpt - confirm. */
  u8 barrier_elog_enabled;
  /* Name of the function that requested the sync (the func_name argument
     of vlib_worker_thread_barrier_sync_int). */
  const char *barrier_caller;
  /* NOTE(review): presumably a free-form context string for barrier
     tracing; it is not assigned anywhere in this excerpt - confirm. */
  const char *barrier_context;
} vlib_worker_thread_t;
/*
 * Excerpt of vlib_main_t showing only the barrier timing members; the
 * "......" lines stand for members omitted from this listing.
 */
typedef struct vlib_main_t
{
  ......
  /* debugging */
  /* Set to 1 by a worker while it spins at the barrier and back to 0 when
     it leaves (only when CLIB_DEBUG > 0). */
  volatile int parked_at_barrier;
  /*
   * Barrier epoch - Set to current time, each time barrier_sync or
   * barrier_release is called with zero recursion.
   * Used as the reference point for the open/closed durations computed by
   * the sync and release functions.
   */
  f64 barrier_epoch;
  /* Earliest barrier can be closed again */
  /* While the current time is below this value, barrier_sync busy-waits
     before raising the barrier. */
  f64 barrier_no_close_before;
  ......
} vlib_main_t;
main線程調用該函數通知worker線程開始sync,等待所有worker線程進入sync狀態後,再執行臨界區操作。
#define vlib_worker_thread_barrier_sync(X) {vlib_worker_thread_barrier_sync_int(X, __FUNCTION__);} void vlib_worker_thread_barrier_sync_int (vlib_main_t * vm, const char *func_name) { f64 deadline; f64 now; f64 t_entry; f64 t_open; f64 t_closed; u32 count; if (vec_len (vlib_mains) < 2) return; /* 只有主線程可以調用該函數 */ ASSERT (vlib_get_thread_index () == 0); /* vlib_worker_threads[0]爲主線程,記錄調用該函數的名字 */ vlib_worker_threads[0].barrier_caller = func_name; count = vec_len (vlib_mains) - 1;/* 工做線程個數 */ /* Record entry relative to last close */ now = vlib_time_now (vm); t_entry = now - vm->barrier_epoch; /* Tolerate recursive calls,遞歸深度,非首次調用直接返回 */ if (++vlib_worker_threads[0].recursion_level > 1) { barrier_trace_sync_rec (t_entry); return; } /* 發起sync次數統計 */ vlib_worker_threads[0].barrier_sync_count++; /* Enforce minimum barrier open time to minimize packet loss */ /* 再次發起sync,必須在禁止其外,每次sync完成後,在指定時間內不能發起第二次sync */ ASSERT (vm->barrier_no_close_before <= (now + BARRIER_MINIMUM_OPEN_LIMIT)); while (1) { now = vlib_time_now (vm); /* Barrier hold-down timer expired? */ if (now >= vm->barrier_no_close_before) break; if ((vm->barrier_no_close_before - now) > (2.0 * BARRIER_MINIMUM_OPEN_LIMIT)) { clib_warning ("clock change: would have waited for %.4f seconds", (vm->barrier_no_close_before - now)); break; } } /* Record time of closure */ /* 兩次啓動sync的間隔時間,即open時間 */ t_open = now - vm->barrier_epoch; vm->barrier_epoch = now; /* 最大時間,debug版本下600秒,其它狀況下1秒 */ deadline = now + BARRIER_SYNC_TIMEOUT; /* 設置wait_at_barrier值爲1,通知worker */ *vlib_worker_threads->wait_at_barrier = 1; /* 等待全部的工做者線程就緒 */ while (*vlib_worker_threads->workers_at_barrier != count) { /* 超時直接打印os panic */ if ((now = vlib_time_now (vm)) > deadline) { fformat (stderr, "%s: worker thread deadlock\n", __FUNCTION__); os_panic (); } } /* 從開始啓動sync過程到全部work線程接受sync的時間 */ t_closed = now - vm->barrier_epoch; barrier_trace_sync (t_entry, t_open, t_closed); }
main線程處理完臨界區操作後,調用該函數通知worker線程sync過程結束。
/*
 * Lower the worker barrier (excerpt; "......" marks code omitted from this
 * listing).
 *
 * vm - vlib main of the calling thread (asserted to be thread 0).
 *
 * Returns immediately when there are fewer than two vlib mains or when the
 * recursion level is still above zero after the decrement. Calls
 * os_panic() if the workers do not all leave the barrier within
 * BARRIER_SYNC_TIMEOUT. On success it computes the hold-down time before
 * the barrier may be raised again.
 */
void
vlib_worker_thread_barrier_release (vlib_main_t * vm)
{
  f64 deadline;
  f64 now;
  f64 minimum_open;
  f64 t_entry;
  f64 t_closed_total;
  f64 t_update_main = 0.0;
  /* NOTE(review): not used in the visible part; presumably consumed by the
     omitted code - confirm. */
  int refork_needed = 0;

  if (vec_len (vlib_mains) < 2)
    return;

  ASSERT (vlib_get_thread_index () == 0);

  now = vlib_time_now (vm);
  /* Time elapsed since the barrier epoch recorded by the matching sync. */
  t_entry = now - vm->barrier_epoch;

  /* Drop one nesting level; a value still above zero means an outer sync
     is in progress, so the barrier stays raised. */
  if (--vlib_worker_threads[0].recursion_level > 0)
    {
      barrier_trace_release_rec (t_entry);
      return;
    }
  ......
  deadline = now + BARRIER_SYNC_TIMEOUT;

  /*
   * Note when we let go of the barrier.
   * Workers can use this to derive a reasonably accurate
   * time offset. See vlib_time_now(...)
   */
  vm->time_last_barrier_release = vlib_time_now (vm);
  CLIB_MEMORY_STORE_BARRIER ();

  /* Clear the flag the parked workers are spinning on. */
  *vlib_worker_threads->wait_at_barrier = 0;

  /* Spin until every worker has decremented workers_at_barrier. */
  while (*vlib_worker_threads->workers_at_barrier > 0)
    {
      /* Timed out: report a deadlock and panic. */
      if ((now = vlib_time_now (vm)) > deadline)
	{
	  fformat (stderr, "%s: worker thread deadlock\n", __FUNCTION__);
	  os_panic ();
	}
    }
  ......
  /* Total time the barrier was held, measured from the epoch. */
  t_closed_total = now - vm->barrier_epoch;

  /* Minimum time the barrier must now stay open: proportional to how long
     it was closed, capped at BARRIER_MINIMUM_OPEN_LIMIT. */
  minimum_open = t_closed_total * BARRIER_MINIMUM_OPEN_FACTOR;

  if (minimum_open > BARRIER_MINIMUM_OPEN_LIMIT)
    {
      minimum_open = BARRIER_MINIMUM_OPEN_LIMIT;
    }

  /* Earliest time the next sync may raise the barrier. */
  vm->barrier_no_close_before = now + minimum_open;

  /* Record barrier epoch (used to enforce minimum open time) */
  vm->barrier_epoch = now;

  barrier_trace_release (t_entry, t_closed_total, t_update_main);
}
vlib_worker_thread_barrier_sync和vlib_worker_thread_barrier_release函數只能由main線程成對使用,可以支持嵌套調用。它們用於實現main線程訪問worker線程的數據,但由於sync期間所有worker線程都在自旋等待,效率較差。
vpp_main線程啓動sync後,worker線程需要調用該函數等待。
/*
 * Worker-side half of the barrier (excerpt; "......" marks code omitted
 * from this listing).
 *
 * If the barrier flag is raised, the caller registers itself in
 * workers_at_barrier, spins until the flag is cleared, re-derives its time
 * offset from vlib_global_main.time_last_barrier_release, and finally
 * unregisters itself. Does nothing when the flag is not raised.
 */
static inline void
vlib_worker_thread_barrier_check (void)
{
  /* Fast path: the barrier flag is expected to be clear. */
  if (PREDICT_FALSE (*vlib_worker_threads->wait_at_barrier))
    {
      vlib_main_t *vm = vlib_get_main ();
      /* NOTE(review): thread_index and t are not used in the visible part;
         presumably consumed by the omitted code - confirm. */
      u32 thread_index = vm->thread_index;
      f64 t = vlib_time_now (vm);
      ......
      /* Announce that this worker is parked. */
      clib_atomic_fetch_add (vlib_worker_threads->workers_at_barrier, 1);
      if (CLIB_DEBUG > 0)
	{
	  vm = vlib_get_main ();
	  vm->parked_at_barrier = 1;
	}
      /* Spin until the barrier flag is cleared by the release function. */
      while (*vlib_worker_threads->wait_at_barrier);

      /*
       * Recompute the offset from thread-0 time.
       * Note that vlib_time_now adds vm->time_offset, so
       * clear it first. Save the resulting idea of "now", to
       * see how well we're doing. See show_clock_command_fn(...)
       */
      {
	f64 now;
	vm->time_offset = 0.0;
	now = vlib_time_now (vm);
	vm->time_offset = vlib_global_main.time_last_barrier_release - now;
	vm->time_last_barrier_release = vlib_time_now (vm);
      }

      if (CLIB_DEBUG > 0)
	vm->parked_at_barrier = 0;
      /* The sync is over: withdraw this worker from the parked count. */
      clib_atomic_fetch_add (vlib_worker_threads->workers_at_barrier, -1);
      ......
    }
}
我們以命令「set interface rx-placement」的主要函數vnet_hw_interface_assign_rx_thread爲例進行展示:
/*
 * Attach RX queue `queue_id` of hardware interface `hw_if_index` to the
 * input node of one thread, in polling mode.
 *
 * vnm          - vnet main.
 * hw_if_index  - hardware interface index.
 * queue_id     - RX queue on that interface.
 * thread_index - requested thread. It is forced to 0 when
 *                first_worker_thread_index is 0; a non-zero value outside
 *                [first_worker_thread_index, last_worker_thread_index] is
 *                replaced by the next worker in round-robin order.
 *
 * The runtime data of the chosen thread is modified between
 * vlib_worker_thread_barrier_sync and vlib_worker_thread_barrier_release.
 */
void
vnet_hw_interface_assign_rx_thread (vnet_main_t * vnm, u32 hw_if_index,
				    u16 queue_id, uword thread_index)
{
  vnet_hw_interface_t *hw = vnet_get_hw_interface (vnm, hw_if_index);
  vnet_device_main_t *dev_main = &vnet_device_main;
  vnet_device_input_runtime_t *input_rt;
  vnet_device_and_queue_t *slot;
  vlib_main_t *target_vm;
  vlib_main_t *caller_vm;

  ASSERT (hw->input_node_index > 0);

  if (dev_main->first_worker_thread_index == 0)
    {
      /* first_worker_thread_index is 0: always use thread 0. */
      thread_index = 0;
    }
  else if (thread_index != 0)
    {
      int below = thread_index < dev_main->first_worker_thread_index;
      int above = thread_index > dev_main->last_worker_thread_index;

      if (below || above)
	{
	  /* Requested thread is not a worker: pick the next one and
	     advance the round-robin cursor, wrapping at the end. */
	  thread_index = dev_main->next_worker_thread_index;
	  dev_main->next_worker_thread_index += 1;
	  if (dev_main->next_worker_thread_index >
	      dev_main->last_worker_thread_index)
	    dev_main->next_worker_thread_index =
	      dev_main->first_worker_thread_index;
	}
    }

  target_vm = vlib_mains[thread_index];
  /* vlib main of the thread executing this call. */
  caller_vm = vlib_get_main ();

  /* Park the workers while the target thread's runtime is edited. */
  vlib_worker_thread_barrier_sync (caller_vm);

  input_rt = vlib_node_get_runtime_data (target_vm, hw->input_node_index);

  /* Append one (device, queue) entry and fill it in. */
  vec_add2 (input_rt->devices_and_queues, slot, 1);
  slot->mode = VNET_HW_INTERFACE_RX_MODE_POLLING;
  slot->queue_id = queue_id;
  slot->dev_instance = hw->dev_instance;
  slot->hw_if_index = hw_if_index;
  input_rt->enabled_node_state = VLIB_NODE_STATE_POLLING;

  vnet_device_queue_update (vnm, input_rt);

  /* Record the per-queue thread and RX mode on the interface. */
  vec_validate (hw->input_node_thread_index_by_queue, queue_id);
  hw->input_node_thread_index_by_queue[queue_id] = thread_index;
  vec_validate (hw->rx_mode_by_queue, queue_id);
  hw->rx_mode_by_queue[queue_id] = VNET_HW_INTERFACE_RX_MODE_POLLING;

  /* Let the workers run again. */
  vlib_worker_thread_barrier_release (caller_vm);

  vlib_node_set_state (target_vm, hw->input_node_index,
		       input_rt->enabled_node_state);
}
/*
 * Main dispatch loop shared by the main thread and the workers (excerpt;
 * "......" marks code omitted from this listing).
 *
 * vm      - vlib main of the running thread.
 * is_main - non-zero when running as the main thread, zero for a worker.
 */
static_always_inline void
vlib_main_or_worker_loop (vlib_main_t * vm, int is_main)
{
  ......
  while (1)
    {
      vlib_node_runtime_t *n;

      /* This thread has queued RPC requests. */
      if (PREDICT_FALSE (_vec_len (vm->pending_rpc_requests) > 0))
	{
	  /* Only workers forward their requests; see
	     vl_api_send_pending_rpc_requests. */
	  if (!is_main)
	    vl_api_send_pending_rpc_requests (vm);
	}

      /* Worker thread */
      if (!is_main)
	{
	  /* Park here for as long as the main thread holds the barrier. */
	  vlib_worker_thread_barrier_check ();
	  ......
	}
      ......
      vlib_increment_main_loop_counter (vm);

      /* Record time stamp in case there are no enabled nodes and above
         calls do not update time stamp. */
      cpu_time_now = clib_cpu_time_now ();
    }
}
VPP的rpc機制是通過API機制實現的,在api機制中註冊了兩個api:
/*
 * X-macro list of the RPC API messages. Each entry is
 * _(MESSAGE_ID_SUFFIX, handler_name_stem); the user supplies the
 * definition of _() before expanding the list.
 */
#define foreach_rpc_api_msg \
_(RPC_CALL,rpc_call) \
_(RPC_CALL_REPLY,rpc_call_reply)
/*
 * Excerpt of vlib_main_t showing only the RPC bookkeeping members; the
 * "......" line stands for members omitted from this listing.
 */
typedef struct vlib_main_t
{
  ......
  /* RPC requests, main thread only */
  /* Message pointers (stored as uword) queued for execution on the main
     thread. Workers collect requests in their own vector and append them
     to the main thread's vector in vl_api_send_pending_rpc_requests. */
  uword *pending_rpc_requests;
  /* Requests being executed; swapped with pending_rpc_requests at the
     start of vl_mem_api_handle_rpc. */
  uword *processing_rpc_requests;
  /* Spinlock taken around updates of the main thread's pending vector and
     around the pending/processing swap. */
  clib_spinlock_t pending_rpc_lock;
} vlib_main_t;
rpc的api傳遞的請求消息:
/* RPC request message carried over the API message path. */
#ifndef _vl_api_defined_rpc_call
#define _vl_api_defined_rpc_call
typedef VL_API_PACKED(struct _vl_api_rpc_call {
    /* Message id (VL_API_RPC_CALL, converted with ntohs by the sender). */
    u16 _vl_msg_id;
    /* Client index; vl_api_rpc_call_t_handler uses it to look up the
       reply queue when send_reply is set. */
    u32 client_index;
    /* Opaque value copied into the reply's context field. */
    u32 context;
    /* Address of the function to run, stored as an integer. */
    u64 function;
    /* Non-zero only triggers a "not yet implemented" warning. */
    u8 multicast;
    /* Non-zero: run the function between barrier sync and release. */
    u8 need_barrier_sync;
    /* Non-zero: send an RPC_CALL_REPLY carrying the return value. */
    u8 send_reply;
    /* NOTE(review): presumably the length of data[]; the sender shown in
       this file leaves it zero - confirm. */
    u32 data_len;
    /* Argument bytes handed to the function. */
    u8 data[0];
}) vl_api_rpc_call_t;
#endif
/*
 * Handler for VL_API_RPC_CALL: run the function whose address is carried
 * in the message, optionally under the worker barrier, and optionally
 * send the return value back to the requesting client.
 *
 * mp - the RPC request; mp->data is passed to the function as its only
 *      argument. A zero function address yields a warning and a return
 *      value of -1.
 */
static void
vl_api_rpc_call_t_handler (vl_api_rpc_call_t * mp)
{
  vlib_main_t *vm = vlib_get_main ();
  i32 rv;

  if (mp->function != 0)
    {
      int (*rpc_fn) (void *);

      /* Hold the barrier around the call when the request asks for it. */
      if (mp->need_barrier_sync)
	vlib_worker_thread_barrier_sync (vm);

      /* Turn the stored integer back into a function pointer and call. */
      rpc_fn = uword_to_pointer (mp->function, int (*)(void *));
      rv = rpc_fn (mp->data);

      if (mp->need_barrier_sync)
	vlib_worker_thread_barrier_release (vm);
    }
  else
    {
      /* No function supplied by the requester. */
      rv = -1;
      clib_warning ("rpc NULL function pointer");
    }

  /* Reply only on request, and only if the client's queue exists. */
  if (mp->send_reply)
    {
      svm_queue_t *reply_q =
	vl_api_client_index_to_input_queue (mp->client_index);

      if (reply_q != 0)
	{
	  vl_api_rpc_call_reply_t *reply =
	    vl_msg_api_alloc_as_if_client (sizeof (*reply));

	  reply->_vl_msg_id = ntohs (VL_API_RPC_CALL_REPLY);
	  reply->context = mp->context;
	  reply->retval = rv;
	  vl_msg_api_send_shmem (reply_q, (u8 *) & reply);
	}
    }

  if (mp->multicast)
    {
      clib_warning ("multicast not yet implemented...");
    }
}

/* Handler for VL_API_RPC_CALL_REPLY: not implemented, only warns. */
static void
vl_api_rpc_call_reply_t_handler (vl_api_rpc_call_reply_t * mp)
{
  clib_warning ("unimplemented");
}
/*
 * Ask the main thread to run fp(data).
 *
 * fp          - function to run; called as void (*)(void *) on the direct
 *               path, otherwise stored in the RPC message.
 * data        - argument bytes.
 * data_length - number of bytes at data (copied into the message).
 * force_rpc   - non-zero: always queue an RPC message, even when the
 *               caller is thread 0.
 *
 * When force_rpc is zero and the caller is thread 0, fp is invoked
 * immediately between barrier sync and release. In every other case an
 * RPC message with need_barrier_sync = 1 is allocated and appended to the
 * calling thread's pending_rpc_requests vector.
 */
always_inline void
vl_api_rpc_call_main_thread_inline (void *fp, u8 * data,
				    u32 data_length, u8 force_rpc)
{
  vl_api_rpc_call_t *msg;
  vlib_main_t *main_vm = &vlib_global_main;
  vlib_main_t *self_vm = vlib_get_main ();
  int on_main_vm;

  /* Direct path: thread 0 and RPC not forced. */
  if (!force_rpc && vlib_get_thread_index () == 0)
    {
      void (*direct_fn) (void *);

      vlib_worker_thread_barrier_sync (self_vm);
      direct_fn = fp;
      direct_fn (data);
      vlib_worker_thread_barrier_release (self_vm);
      return;
    }

  /* RPC path: build the request message (header zeroed, payload copied). */
  msg = vl_msg_api_alloc_as_if_client (sizeof (*msg) + data_length);
  clib_memset (msg, 0, sizeof (*msg));
  msg->_vl_msg_id = ntohs (VL_API_RPC_CALL);
  msg->function = pointer_to_uword (fp);
  msg->need_barrier_sync = 1;
  clib_memcpy_fast (msg->data, data, data_length);

  /* Only the global (thread 0) vector is locked here; any other thread
     appends to its own vector without taking the lock. */
  on_main_vm = (self_vm == main_vm);
  if (on_main_vm)
    clib_spinlock_lock_if_init (&main_vm->pending_rpc_lock);

  vec_add1 (self_vm->pending_rpc_requests, (uword) msg);

  if (on_main_vm)
    clib_spinlock_unlock_if_init (&main_vm->pending_rpc_lock);
}

/*
 * Run fp(data) on the main thread: called directly when the caller is
 * thread 0, otherwise queued as an RPC message.
 */
void
vl_api_rpc_call_main_thread (void *fp, u8 * data, u32 data_length)
{
  const u8 force_rpc = 0;

  vl_api_rpc_call_main_thread_inline (fp, data, data_length, force_rpc);
}

/*
 * Same as vl_api_rpc_call_main_thread, but the request is always queued
 * as an RPC message and never called directly. Useful for calling from
 * threads not set up as worker threads, such as a DPDK callback thread.
 */
void
vl_api_force_rpc_call_main_thread (void *fp, u8 * data, u32 data_length)
{
  const u8 force_rpc = 1;

  vl_api_rpc_call_main_thread_inline (fp, data, data_length, force_rpc);
}

main線程中的協程還可使用函數vlib_rpc_call_main_thread發起RPC

/*
 * Hook through which vlib issues main-thread RPCs.
 * NOTE(review): presumably set during initialisation to the address of
 * vl_api_rpc_call_main_thread; the assignment is not visible here.
 */
void *rpc_call_main_thread_cb_fn;

/*
 * Forward an RPC request through rpc_call_main_thread_cb_fn.
 *
 * callback - function to run on the main thread.
 * args     - argument bytes.
 * arg_size - number of bytes at args.
 *
 * Emits a warning and does nothing else when the hook is unset.
 */
void
vlib_rpc_call_main_thread (void *callback, u8 * args, u32 arg_size)
{
  void (*dispatch) (void *, u8 *, u32);

  if (!rpc_call_main_thread_cb_fn)
    {
      clib_warning ("BUG: rpc_call_main_thread_cb_fn NULL!");
      return;
    }

  dispatch = rpc_call_main_thread_cb_fn;
  (*dispatch) (callback, args, arg_size);
}
/*
 * Move every RPC request queued on a worker's vlib main to the main
 * thread's pending vector.
 *
 * vm - the calling worker's vlib main; asserted not to be the global
 *      (main thread) instance.
 *
 * The append to the global vector and the reset of the worker's vector
 * both happen while holding the global pending_rpc_lock.
 */
void
vl_api_send_pending_rpc_requests (vlib_main_t * vm)
{
  vlib_main_t *vm_global = &vlib_global_main;

  ASSERT (vm != vm_global);

  clib_spinlock_lock_if_init (&vm_global->pending_rpc_lock);
  /* Hand the requests over, then empty the worker's own vector. */
  vec_append (vm_global->pending_rpc_requests, vm->pending_rpc_requests);
  vec_reset_length (vm->pending_rpc_requests);
  clib_spinlock_unlock_if_init (&vm_global->pending_rpc_lock);
}
只有worker線程才需要將RPC請求轉移到main線程。
/*
 * Main dispatch loop shared by the main thread and the workers (excerpt;
 * "......" marks code omitted from this listing). Shown here for the RPC
 * hand-over step only.
 *
 * vm      - vlib main of the running thread.
 * is_main - non-zero when running as the main thread, zero for a worker.
 */
static_always_inline void
vlib_main_or_worker_loop (vlib_main_t * vm, int is_main)
{
  while (1)
    {
      vlib_node_runtime_t *n;

      /* A worker with queued RPC requests hands them to the main thread. */
      if (PREDICT_FALSE (_vec_len (vm->pending_rpc_requests) > 0))
	{
	  /* The main thread skips this step. */
	  if (!is_main)
	    vl_api_send_pending_rpc_requests (vm);
	}
      ......
      vlib_increment_main_loop_counter (vm);

      /* Record time stamp in case there are no enabled nodes and above
         calls do not update time stamp. */
      cpu_time_now = clib_cpu_time_now ();
    }
}
RPC的處理是在協程"api-rx-from-ring"中進行的,這個協程同時也是處理api的協程。
/*
 * Registration of the "api-rx-from-ring" process node. Its function is
 * vl_api_clnt_process and it is registered in the DISABLED state.
 */
/* *INDENT-OFF* */
VLIB_REGISTER_NODE (vl_api_clnt_node) =
{
  .function = vl_api_clnt_process,
  .type = VLIB_NODE_TYPE_PROCESS,
  .name = "api-rx-from-ring",
  .state = VLIB_NODE_STATE_DISABLED,
};
/*
 * Body of the "api-rx-from-ring" process node (excerpt; "......" marks
 * code omitted from this listing). Each pass of the inner loop first
 * handles queued RPC requests and then API messages.
 */
static uword
vl_api_clnt_process (vlib_main_t * vm, vlib_node_runtime_t * node,
		     vlib_frame_t * f)
{
  ......
  /* $$$ pay attention to frame size, control CPU usage */
  while (1)
    {
      /*
       * There's a reason for checking the queue before
       * sleeping. If the vlib application crashes, it's entirely
       * possible for a client to enqueue a connect request
       * during the process restart interval.
       *
       * Unless some force of physics causes the new incarnation
       * of the application to process the request, the client will
       * sit and wait for Godot...
       */
      vector_rate = vlib_last_vector_length_per_node (vm);
      start_time = vlib_time_now (vm);
      while (1)
	{
	  /* RPC requests first, then API messages. vl_mem_api_handle_rpc
	     as shown below always returns 0, so the second call is always
	     evaluated. */
	  if (vl_mem_api_handle_rpc (vm, node)
	      || vl_mem_api_handle_msg_main (vm, node))
	    {
	      vm->api_queue_nonempty = 0;
	      VL_MEM_API_LOG_Q_LEN ("q-underflow: len %d", 0);
	      sleep_time = 20.0;
	      break;
	    }
	  ......
	}
      ......
    }

  return 0;
}

/*
 * Execute every RPC request currently queued on vm.
 *
 * vm   - vlib main whose pending_rpc_requests are drained.
 * node - runtime of the calling node, passed through to the handler.
 *
 * Always returns 0.
 */
int
vl_mem_api_handle_rpc (vlib_main_t * vm, vlib_node_runtime_t * node)
{
  api_main_t *am = &api_main;
  int i;
  uword *tmp, mp;

  /*
   * Swap pending and processing vectors, then process the RPCs
   * Avoid deadlock conditions by construction.
   * The lock is held only for the swap: the emptied processing vector
   * becomes the new pending vector, so the handlers below run without
   * the lock.
   */
  clib_spinlock_lock_if_init (&vm->pending_rpc_lock);
  tmp = vm->processing_rpc_requests;
  vec_reset_length (tmp);
  vm->processing_rpc_requests = vm->pending_rpc_requests;
  vm->pending_rpc_requests = tmp;
  clib_spinlock_unlock_if_init (&vm->pending_rpc_lock);

  /*
   * RPCs are used to reflect function calls to thread 0
   * when the underlying code is not thread-safe.
   *
   * Grabbing the thread barrier across a set of RPCs
   * greatly increases efficiency, and avoids
   * running afoul of the barrier sync holddown timer.
   * The barrier sync code supports recursive locking.
   *
   * We really need to rewrite RPC-based code...
   */
  if (PREDICT_TRUE (vec_len (vm->processing_rpc_requests)))
    {
      vl_msg_api_barrier_sync ();
      /* Dispatch each queued message through the generic API handler. */
      for (i = 0; i < vec_len (vm->processing_rpc_requests); i++)
	{
	  mp = vm->processing_rpc_requests[i];
	  vl_msg_api_handler_with_vm_node (am, (void *) mp, vm, node);
	}
      vl_msg_api_barrier_release ();
    }

  return 0;
}

/* This is only to be called from a vlib/vnet app */
/*
 * Dispatch one API message to its registered handler (excerpt; "......"
 * marks code omitted from this listing).
 *
 * am      - API main holding the per-message-id tables.
 * the_msg - message buffer; its first 16 bits hold the message id.
 * vm      - vlib main passed to the handler.
 * node    - node runtime passed to the handler.
 */
void
vl_msg_api_handler_with_vm_node (api_main_t * am,
				 void *the_msg, vlib_main_t * vm,
				 vlib_node_runtime_t * node)
{
  /* The message id is the first member of every message. */
  u16 id = ntohs (*((u16 *) the_msg));
  u8 *(*handler) (void *, void *, void *);
  u8 *(*print_fp) (void *, void *);
  ......
  /* Look up the handler registered for this id; for VL_API_RPC_CALL that
     is vl_api_rpc_call_t_handler. */
  if (id < vec_len (am->msg_handlers) && am->msg_handlers[id])
    {
      handler = (void *) am->msg_handlers[id];

      if (PREDICT_FALSE (am->rx_trace && am->rx_trace->enabled))
	vl_msg_api_trace (am, am->rx_trace, the_msg);

      if (PREDICT_FALSE (am->msg_print_flag))
	{
	  fformat (stdout, "[%d]: %s\n", id, am->msg_names[id]);
	  print_fp = (void *) am->msg_print_handlers[id];
	  if (print_fp == 0)
	    {
	      fformat (stdout, " [no registered print fn for msg %d]\n", id);
	    }
	  else
	    {
	      (*print_fp) (the_msg, vm);
	    }
	}

      /* Messages not flagged mp-safe run under the barrier. */
      if (!am->is_mp_safe[id])
	{
	  vl_msg_api_barrier_trace_context (am->msg_names[id]);
	  vl_msg_api_barrier_sync ();
	}
      /* Invoke the registered handler. */
      (*handler) (the_msg, vm, node);
      if (!am->is_mp_safe[id])
	vl_msg_api_barrier_release ();
    }
  else
    {
      clib_warning ("no handler for msg id %d", id);
    }

  /*
   * Special-case, so we can e.g. bounce messages off the vnet
   * main thread without copying them...
   */
  /* NOTE(review): message_bounce[id] is read even when id failed the
     msg_handlers bounds check above; whether message_bounce is always
     long enough cannot be told from this excerpt - confirm. */
  if (!(am->message_bounce[id]))
    vl_msg_api_free (the_msg);
  ......
}