Interaction mechanisms between the VPP_MAIN thread and the VPP_WORK threads

VPP uses a multi-threaded model with a shared address space, so the fastest way for threads to communicate is to access each other's data directly. VPP implements its own simple thread-safety mechanism to protect these critical sections.

Synchronization between VPP threads uses something similar to a spin lock with signaling and a timeout, built around three operations: check, sync and release.
Overall it provides roughly what pthread_cond_timedwait provides, with the mutex replaced by a spin lock; if a wait exceeds BARRIER_SYNC_TIMEOUT, a deadlock is assumed and the process panics.
Specifically:

  • vlib_worker_thread_barrier_check is analogous to pthread_cond_wait: a worker waits on the vlib_worker_threads->wait_at_barrier condition.
  • vlib_worker_thread_barrier_sync is analogous to spin_lock: it sets vlib_worker_threads->wait_at_barrier and waits until every worker has checked in at workers_at_barrier. Only the main thread may call it, to tell the other threads to prepare for synchronization. A minimal usage sketch follows this list.
  • vlib_worker_thread_barrier_release is analogous to spin_unlock: it clears vlib_worker_threads->wait_at_barrier and waits for workers_at_barrier to drain back to zero. Only the main thread may call it, to tell the other threads that synchronization has ended.
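
The typical main-thread pattern, as a minimal sketch (shared_counter and main_thread_update are hypothetical names, not part of VPP): close the barrier, touch data normally owned by the worker threads, then reopen it.

static u32 shared_counter;               /* hypothetical data shared with workers */

static void
main_thread_update (vlib_main_t * vm)
{
    /* Only the main thread may do this; workers spin inside
       vlib_worker_thread_barrier_check () until the release. */
    vlib_worker_thread_barrier_sync (vm);

    shared_counter++;                    /* critical section: workers are parked */

    vlib_worker_thread_barrier_release (vm);
}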

How the vpp_main thread protects its access to vpp_worker thread data

Data structures

vlib_worker_thread_t

typedef struct
{   
    ......
        
    volatile u32 *wait_at_barrier;/* flag telling workers to park at the barrier: main sets it to 1 when a sync starts and back to 0 on release */
    volatile u32 *workers_at_barrier;/* number of worker threads currently parked at the barrier; each worker increments it by 1 */
    i64 recursion_level;/* current barrier_sync recursion depth */
    u64 barrier_sync_count;/* number of barrier syncs initiated (incremented once per non-recursive sync) */
    u8 barrier_elog_enabled;
    const char *barrier_caller;/* name of the function that initiated the current sync */
    const char *barrier_context;
} vlib_worker_thread_t;

vlib_main_t

typedef struct vlib_main_t
{
    ......
    
    /* debugging */
    volatile int parked_at_barrier;

    /*
     * Barrier epoch - Set to current time, each time barrier_sync or
     * barrier_release is called with zero recursion.
     * Used to compute how long each sync lasted.
     */
    f64 barrier_epoch;

    /* Earliest barrier can be closed again */
    /* a new sync may not start while the current time is earlier than barrier_no_close_before */
    f64 barrier_no_close_before;
    ......
} vlib_main_t;

Function walkthrough

  • vlib_worker_thread_barrier_sync

The main thread calls this function to tell the worker threads that a sync is starting; once every worker has entered the sync state, the main thread performs its critical-section work.

#define vlib_worker_thread_barrier_sync(X) {vlib_worker_thread_barrier_sync_int(X, __FUNCTION__);}

void
vlib_worker_thread_barrier_sync_int (vlib_main_t * vm, const char *func_name)
{
    f64 deadline;
    f64 now;
    f64 t_entry;
    f64 t_open;
    f64 t_closed;
    u32 count;

    if (vec_len (vlib_mains) < 2)
        return;
    /* Only the main thread may call this function */
    ASSERT (vlib_get_thread_index () == 0);
    /* vlib_worker_threads[0] is the main thread; record the caller's name */
    vlib_worker_threads[0].barrier_caller = func_name;
    count = vec_len (vlib_mains) - 1;/* number of worker threads */

    /* Record entry relative to last close */
    now = vlib_time_now (vm);
    t_entry = now - vm->barrier_epoch;

    /* Tolerate recursive calls: track the recursion depth; nested calls return immediately */
    if (++vlib_worker_threads[0].recursion_level > 1)
    {
        barrier_trace_sync_rec (t_entry);
        return;
    }
    /* count the number of syncs initiated */
    vlib_worker_threads[0].barrier_sync_count++;

    /* Enforce minimum barrier open time to minimize packet loss */
    /* enforce the hold-down window: after a sync completes, another one may not start until the minimum open time has elapsed */
    ASSERT (vm->barrier_no_close_before <= (now + BARRIER_MINIMUM_OPEN_LIMIT));

    while (1)
    {
        now = vlib_time_now (vm);
        /* Barrier hold-down timer expired? */
        if (now >= vm->barrier_no_close_before)
            break;
        if ((vm->barrier_no_close_before - now)
                > (2.0 * BARRIER_MINIMUM_OPEN_LIMIT))
        {
            clib_warning ("clock change: would have waited for %.4f seconds",
                          (vm->barrier_no_close_before - now));
            break;
        }
    }
    
    /* Record time of closure */
    /* interval since the previous sync, i.e. how long the barrier was open */
    t_open = now - vm->barrier_epoch;
    vm->barrier_epoch = now;
    /* maximum wait: 600 seconds in debug builds, 1 second otherwise */
    deadline = now + BARRIER_SYNC_TIMEOUT;
    /* set wait_at_barrier to 1 to notify the workers */
    *vlib_worker_threads->wait_at_barrier = 1;
    /* wait until every worker thread has reached the barrier */
    while (*vlib_worker_threads->workers_at_barrier != count)
    {
        /* on timeout, report the deadlock and panic */
        if ((now = vlib_time_now (vm)) > deadline)
        {
            fformat (stderr, "%s: worker thread deadlock\n", __FUNCTION__);
            os_panic ();
        }
    }
    /* time from starting the sync until every worker thread acknowledged it */
    t_closed = now - vm->barrier_epoch;

    barrier_trace_sync (t_entry, t_open, t_closed);

}
  • vlib_worker_thread_barrier_release

After finishing its critical-section work, the main thread calls this function to tell the worker threads that the sync is over.

/* ends the sync phase */
void
vlib_worker_thread_barrier_release (vlib_main_t * vm)
{
    f64 deadline;
    f64 now;
    f64 minimum_open;
    f64 t_entry;
    f64 t_closed_total;
    f64 t_update_main = 0.0;
    int refork_needed = 0;

    if (vec_len (vlib_mains) < 2)
        return;

    ASSERT (vlib_get_thread_index () == 0);


    now = vlib_time_now (vm);
    /* elapsed time within this sync/release pair */
    t_entry = now - vm->barrier_epoch;
    /* decrement the recursion depth; if it is still above 0, the outermost sync has not ended yet */
    if (--vlib_worker_threads[0].recursion_level > 0)
    {
        barrier_trace_release_rec (t_entry);
        return;
    }

    ......

    deadline = now + BARRIER_SYNC_TIMEOUT;

    /*
     * Note when we let go of the barrier.
     * Workers can use this to derive a reasonably accurate
     * time offset. See vlib_time_now(...)
     */
    vm->time_last_barrier_release = vlib_time_now (vm);
    CLIB_MEMORY_STORE_BARRIER ();
    /* clear the wait flag */
    *vlib_worker_threads->wait_at_barrier = 0;
    /* wait until every worker thread has left the barrier */
    while (*vlib_worker_threads->workers_at_barrier > 0)
    {
        /* on timeout, report the deadlock and panic */
        if ((now = vlib_time_now (vm)) > deadline)
        {
            fformat (stderr, "%s: worker thread deadlock\n", __FUNCTION__);
            os_panic ();
        }
    }

    ......
        
    /* total time the barrier was closed */
    t_closed_total = now - vm->barrier_epoch;
    /* compute the minimum open time before the next sync may start; it scales with how long this sync took */
    minimum_open = t_closed_total * BARRIER_MINIMUM_OPEN_FACTOR;

    if (minimum_open > BARRIER_MINIMUM_OPEN_LIMIT)
    {
        minimum_open = BARRIER_MINIMUM_OPEN_LIMIT;
    }
    /* earliest time at which the next sync may start */
    vm->barrier_no_close_before = now + minimum_open;

    /* Record barrier epoch (used to enforce minimum open time) */
    /* update the barrier epoch */
    vm->barrier_epoch = now;

    barrier_trace_release (t_entry, t_closed_total, t_update_main);

}

vlib_worker_thread_barrier_sync and vlib_worker_thread_barrier_release may only be used, as a pair, by the main thread; nested calls are supported. They exist so that the main thread can access worker-thread data, at the cost of stalling the workers, so the mechanism is relatively inefficient.
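
Because the recursion depth is tracked, nesting is safe: a helper that itself takes the barrier can be called from code that already holds it, and only the outermost release actually reopens the barrier. A rough sketch (outer_operation and helper_locked are hypothetical names):

static void
helper_locked (vlib_main_t * vm)
{
    vlib_worker_thread_barrier_sync (vm);    /* recursion_level 1 -> 2: returns immediately */
    /* ... modify worker-owned state ... */
    vlib_worker_thread_barrier_release (vm); /* recursion_level 2 -> 1: barrier stays closed */
}

static void
outer_operation (vlib_main_t * vm)
{
    vlib_worker_thread_barrier_sync (vm);    /* recursion_level 0 -> 1: workers park */
    helper_locked (vm);
    vlib_worker_thread_barrier_release (vm); /* recursion_level 1 -> 0: workers resume */
}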

  • vlib_worker_thread_barrier_check

After the vpp_main thread starts a sync, each worker thread must call this function and wait in it.

static inline void
vlib_worker_thread_barrier_check (void)
{
    /* if the main thread has started a sync, this thread must park at the barrier */
    if (PREDICT_FALSE (*vlib_worker_threads->wait_at_barrier))
    {
        vlib_main_t *vm = vlib_get_main ();
        u32 thread_index = vm->thread_index;
        f64 t = vlib_time_now (vm);
        ......
        /* one more worker has reached the barrier */
        clib_atomic_fetch_add (vlib_worker_threads->workers_at_barrier, 1);
        if (CLIB_DEBUG > 0)
        {
            vm = vlib_get_main ();
            vm->parked_at_barrier = 1;
        }
        /* spin until the sync ends */
        while (*vlib_worker_threads->wait_at_barrier);

        /*
         * Recompute the offset from thread-0 time.
         * Note that vlib_time_now adds vm->time_offset, so
         * clear it first. Save the resulting idea of "now", to
         * see how well we're doing. See show_clock_command_fn(...)
         */
        {
            f64 now;
            vm->time_offset = 0.0;
            now = vlib_time_now (vm);
            vm->time_offset = vlib_global_main.time_last_barrier_release - now;
            vm->time_last_barrier_release = vlib_time_now (vm);
        }

        if (CLIB_DEBUG > 0)
            vm->parked_at_barrier = 0;
        /* the sync is over; decrement the count of parked workers */
        clib_atomic_fetch_add (vlib_worker_threads->workers_at_barrier, -1);
        ......
    }
}

Example of using the barrier mechanism

As an example, consider vnet_hw_interface_assign_rx_thread, the main function behind the "set interface rx-placement" command:

Main thread

/* once the main thread has received the CLI command, it eventually calls this function */
void
vnet_hw_interface_assign_rx_thread (vnet_main_t * vnm, u32 hw_if_index,
                                    u16 queue_id, uword thread_index)
{
    vnet_device_main_t *vdm = &vnet_device_main;
    vlib_main_t *vm, *vm0;
    vnet_device_input_runtime_t *rt;
    vnet_device_and_queue_t *dq;
    vnet_hw_interface_t *hw = vnet_get_hw_interface (vnm, hw_if_index);

    ASSERT (hw->input_node_index > 0);

    if (vdm->first_worker_thread_index == 0)
        thread_index = 0;

    if (thread_index != 0 &&
            (thread_index < vdm->first_worker_thread_index ||
             thread_index > vdm->last_worker_thread_index))
    {
        thread_index = vdm->next_worker_thread_index++;
        if (vdm->next_worker_thread_index > vdm->last_worker_thread_index)
            vdm->next_worker_thread_index = vdm->first_worker_thread_index;
    }

    vm = vlib_mains[thread_index];
    vm0 = vlib_get_main ();/* the calling thread, normally the main thread */
    /* tell the worker threads that a sync is starting */
    vlib_worker_thread_barrier_sync (vm0);

    rt = vlib_node_get_runtime_data (vm, hw->input_node_index);

    vec_add2 (rt->devices_and_queues, dq, 1);
    dq->hw_if_index = hw_if_index;
    dq->dev_instance = hw->dev_instance;
    dq->queue_id = queue_id;
    dq->mode = VNET_HW_INTERFACE_RX_MODE_POLLING;
    rt->enabled_node_state = VLIB_NODE_STATE_POLLING;

    vnet_device_queue_update (vnm, rt);
    vec_validate (hw->input_node_thread_index_by_queue, queue_id);
    vec_validate (hw->rx_mode_by_queue, queue_id);
    hw->input_node_thread_index_by_queue[queue_id] = thread_index;
    hw->rx_mode_by_queue[queue_id] = VNET_HW_INTERFACE_RX_MODE_POLLING;
    /* tell the worker threads that the sync has ended */
    vlib_worker_thread_barrier_release (vm0);

    vlib_node_set_state (vm, hw->input_node_index, rt->enabled_node_state);
}

Worker thread

/* the is_main parameter selects between the main thread and a worker thread */
static_always_inline void
vlib_main_or_worker_loop (vlib_main_t * vm, int is_main)
{
    ......
    
    while (1)
    {
        vlib_node_runtime_t *n;

        /* if there are pending RPC requests, hand them off */
        if (PREDICT_FALSE (_vec_len (vm->pending_rpc_requests) > 0))
        {
            if (!is_main)/* only worker threads forward pending RPC requests */
                vl_api_send_pending_rpc_requests (vm);
        }

        if (!is_main)/* worker thread */
        {
            /* mutual exclusion with the main thread: if it has entered the critical section, spin here until it is done */
            vlib_worker_thread_barrier_check ();
            ......
        }
        ......
        
        vlib_increment_main_loop_counter (vm);

        /* Record time stamp in case there are no enabled nodes and above
            calls do not update time stamp. */
        cpu_time_now = clib_cpu_time_now ();
    }
}

How a vpp_worker thread asks the vpp_main thread to process data: RPC

VPP's RPC mechanism is implemented on top of the binary API machinery; two API messages are registered for it:

#define foreach_rpc_api_msg                     \
_(RPC_CALL,rpc_call)                            \
_(RPC_CALL_REPLY,rpc_call_reply)
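
For reference, these two messages are hooked up through the ordinary binary-API registration path; the expansion looks roughly like the following (a sketch only: the exact vl_msg_api_set_handlers argument list varies between VPP versions, so treat it as illustrative):

/* rough sketch of the API hookup for the two RPC messages */
#define _(N, n)                                             \
    vl_msg_api_set_handlers (VL_API_##N, #n,                \
                             vl_api_##n##_t_handler,        \
                             vl_noop_handler,               \
                             vl_noop_handler,               \
                             vl_api_##n##_t_print,          \
                             sizeof (vl_api_##n##_t), 0 /* do not trace */);
foreach_rpc_api_msg;
#undef _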

Data structures

  • vlib_main_t
typedef struct vlib_main_t
{
    ......
    /* RPC requests, main thread only */
    uword *pending_rpc_requests;      /* RPCs this thread has queued for the vpp_main thread to process */
    uword *processing_rpc_requests;   /* RPCs the vpp_main thread is currently processing */
    clib_spinlock_t pending_rpc_lock; /* spin lock protecting the two vectors above */
} vlib_main_t;
  • vl_api_rpc_call_t

The request message carried by the RPC API:

#ifndef _vl_api_defined_rpc_call
#define _vl_api_defined_rpc_call
typedef VL_API_PACKED(struct _vl_api_rpc_call {
    u16 _vl_msg_id;/* message id */
    u32 client_index;/* unused: this API is internal only */
    u32 context;
    u64 function;/* function to run via RPC */
    u8 multicast;
    u8 need_barrier_sync;/* whether to take the barrier around the call */
    u8 send_reply;/* whether to send a reply; usually none is sent */
    u32 data_len;
    u8 data[0];
}) vl_api_rpc_call_t;
#endif

Function walkthrough

The RPC API handler

static void
vl_api_rpc_call_t_handler (vl_api_rpc_call_t * mp)
{
    vl_api_rpc_call_reply_t *rmp;
    int (*fp) (void *);
    i32 rv = 0;
    vlib_main_t *vm = vlib_get_main ();

    if (mp->function == 0)/* the user's RPC function is NULL: warn */
    {
        rv = -1;
        clib_warning ("rpc NULL function pointer");
    }

    else
    {
        if (mp->need_barrier_sync)/* take the barrier if the caller asked for mutual exclusion */
            vlib_worker_thread_barrier_sync (vm);

        fp = uword_to_pointer (mp->function, int (*)(void *));/* convert back to a function pointer */
        rv = fp (mp->data);/* run the function */

        if (mp->need_barrier_sync)
            vlib_worker_thread_barrier_release (vm);
    }

    if (mp->send_reply)/* send a reply to the client if requested; normally no reply is needed */
    {
        svm_queue_t *q = vl_api_client_index_to_input_queue (mp->client_index);
        if (q)
        {
            rmp = vl_msg_api_alloc_as_if_client (sizeof (*rmp));
            rmp->_vl_msg_id = ntohs (VL_API_RPC_CALL_REPLY);
            rmp->context = mp->context;
            rmp->retval = rv;
            vl_msg_api_send_shmem (q, (u8 *) & rmp);
        }
    }
    if (mp->multicast)
    {
        clib_warning ("multicast not yet implemented...");
    }
}
/* reply handler; not implemented */
static void
vl_api_rpc_call_reply_t_handler (vl_api_rpc_call_reply_t * mp)
{
    clib_warning ("unimplemented");
}

Issuing an RPC

/* Ask the main thread to run a function on our behalf; the caller may be a worker thread or the main thread itself.
** force_rpc: force the RPC path, i.e. do not call the function directly but let the API process node execute it.
**            Worker threads must set it to 1; the main thread may set it either way.
*/
always_inline void
vl_api_rpc_call_main_thread_inline (void *fp, u8 * data, u32 data_length,
                                    u8 force_rpc)
{
    vl_api_rpc_call_t *mp;
    vlib_main_t *vm_global = &vlib_global_main;
    vlib_main_t *vm = vlib_get_main ();

    /* Main thread and not a forced RPC: call the function directly */
    /* the main thread, with force_rpc not set: run the function directly instead of queueing it for the API process node */
    if ((force_rpc == 0) && (vlib_get_thread_index () == 0))
    {
        void (*call_fp) (void *);

        vlib_worker_thread_barrier_sync (vm);

        call_fp = fp;
        call_fp (data);

        vlib_worker_thread_barrier_release (vm);
        return;
    }

    /* Otherwise, actually do an RPC */
    /* perform an actual RPC: allocate the RPC message from shared memory */
    mp = vl_msg_api_alloc_as_if_client (sizeof (*mp) + data_length);

    clib_memset (mp, 0, sizeof (*mp));
    clib_memcpy_fast (mp->data, data, data_length);
    /* The first member must be the message id; the API machinery relies on this.
     * This is a built-in (non-plugin) message, so no per-module base message id is needed.
     */
    mp->_vl_msg_id = ntohs (VL_API_RPC_CALL);
    mp->function = pointer_to_uword (fp);
    mp->need_barrier_sync = 1;

    /* Add to the pending vector. Thread 0 requires locking. */
    /* The main thread's pending_rpc_requests vector is shared state and must be locked.
    ** Each worker's own pending_rpc_requests vector is private to that worker and needs no protection.
    */
    if (vm == vm_global)
        clib_spinlock_lock_if_init (&vm_global->pending_rpc_lock);
    vec_add1 (vm->pending_rpc_requests, (uword) mp);
    if (vm == vm_global)
        clib_spinlock_unlock_if_init (&vm_global->pending_rpc_lock);
}
/*
 * Check if called from worker threads.
 * If so, make rpc call of fp through shmem.
 * Otherwise, call fp directly
 */
void
vl_api_rpc_call_main_thread (void *fp, u8 * data, u32 data_length)
{
    vl_api_rpc_call_main_thread_inline (fp, data, data_length,    /*force_rpc */
                                        0);
}

/*
 * Always make rpc call of fp through shmem, useful for calling from threads
 * not setup as worker threads, such as DPDK callback thread
 * Forces the call to reach the main thread through shared memory instead of calling fp directly.
 */
void
vl_api_force_rpc_call_main_thread (void *fp, u8 * data, u32 data_length)
{
    vl_api_rpc_call_main_thread_inline (fp, data, data_length,    /*force_rpc */
                                        1);
}
A process node running on the main thread can also issue an RPC through vlib_rpc_call_main_thread:
void *rpc_call_main_thread_cb_fn;

void
vlib_rpc_call_main_thread (void *callback, u8 * args, u32 arg_size)
{
    /* global function pointer; during initialization it is set to the address of vl_api_rpc_call_main_thread */
    if (rpc_call_main_thread_cb_fn)
    {
        void (*fp) (void *, u8 *, u32) = rpc_call_main_thread_cb_fn;
        (*fp) (callback, args, arg_size);
    }
    else
        clib_warning ("BUG: rpc_call_main_thread_cb_fn NULL!");
}
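
A minimal usage sketch from the worker side (my_rpc_args_t, my_rpc_callback, and worker_defer_to_main are hypothetical names, not part of VPP): the worker packs its arguments into a flat buffer, and the main thread later runs the callback in the API process node.

typedef struct
{
    u32 sw_if_index;
} my_rpc_args_t;                          /* hypothetical argument struct */

static int
my_rpc_callback (void *arg)               /* runs on the main thread, under barrier sync */
{
    my_rpc_args_t *a = arg;
    clib_warning ("main thread handling sw_if_index %u", a->sw_if_index);
    return 0;
}

static void
worker_defer_to_main (u32 sw_if_index)
{
    my_rpc_args_t args = { .sw_if_index = sw_if_index };
    /* From a worker thread this lands on the thread-local pending_rpc_requests
       vector; the worker loop later hands it to the main thread, which runs
       my_rpc_callback in the api-rx-from-ring process node. The argument
       buffer is copied into the RPC message, so a stack variable is fine. */
    vl_api_rpc_call_main_thread ((void *) my_rpc_callback, (u8 *) &args, sizeof (args));
}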

How a worker thread hands its queued RPCs to the main thread

/* a worker moves the RPC requests it has collected from its own pending_rpc_requests vector onto the main thread's pending_rpc_requests vector */
void
vl_api_send_pending_rpc_requests (vlib_main_t * vm)
{
    vlib_main_t *vm_global = &vlib_global_main;

    ASSERT (vm != vm_global);

    clib_spinlock_lock_if_init (&vm_global->pending_rpc_lock);
    vec_append (vm_global->pending_rpc_requests, vm->pending_rpc_requests);
    vec_reset_length (vm->pending_rpc_requests);
    clib_spinlock_unlock_if_init (&vm_global->pending_rpc_lock);
}
  • vlib_main_or_worker_loop

Only worker threads need to move their RPC requests over to the main thread.

/* the is_main parameter selects between the main thread and a worker thread */
static_always_inline void
vlib_main_or_worker_loop (vlib_main_t * vm, int is_main)
{
    while (1)
    {
        vlib_node_runtime_t *n;

        /* a worker thread hands the RPC requests it has collected to the main thread */
        if (PREDICT_FALSE (_vec_len (vm->pending_rpc_requests) > 0))
        {
            if (!is_main)/* only worker threads move their own RPC requests to the main thread */
                vl_api_send_pending_rpc_requests (vm);
        }
        ......
        
        vlib_increment_main_loop_counter (vm);

        /* Record time stamp in case there are no enabled nodes and above
            calls do not update time stamp. */
        cpu_time_now = clib_cpu_time_now ();
    }
}

Handling RPCs in the API process node

RPCs are handled in the "api-rx-from-ring" process node, the same process node that handles binary API messages.

/* *INDENT-OFF* */
VLIB_REGISTER_NODE (vl_api_clnt_node) =
{
    .function = vl_api_clnt_process,
    .type = VLIB_NODE_TYPE_PROCESS,
    .name = "api-rx-from-ring",
    .state = VLIB_NODE_STATE_DISABLED,
};

The process node's main function, vl_api_clnt_process

static uword
vl_api_clnt_process (vlib_main_t * vm, vlib_node_runtime_t * node,
                     vlib_frame_t * f)
{
    ......
    /* $$$ pay attention to frame size, control CPU usage */
    while (1)
    {
        /*
         * There's a reason for checking the queue before
         * sleeping. If the vlib application crashes, it's entirely
         * possible for a client to enqueue a connect request
         * during the process restart interval.
         *
         * Unless some force of physics causes the new incarnation
         * of the application to process the request, the client will
         * sit and wait for Godot...
         */
        vector_rate = vlib_last_vector_length_per_node (vm);
        start_time = vlib_time_now (vm);
        while (1)
        {
            if (vl_mem_api_handle_rpc (vm, node)/* handle pending RPCs */
                    || vl_mem_api_handle_msg_main (vm, node))/* handle binary API requests */
            {
                vm->api_queue_nonempty = 0;
                VL_MEM_API_LOG_Q_LEN ("q-underflow: len %d", 0);
                sleep_time = 20.0;
                break;
            }
            ......
        }
        ......
    }

    return 0;
}

int
vl_mem_api_handle_rpc (vlib_main_t * vm, vlib_node_runtime_t * node)
{
    api_main_t *am = &api_main;
    int i;
    uword *tmp, mp;

    /*
     * Swap pending and processing vectors, then process the RPCs
     * Avoid deadlock conditions by construction.
     * Move the pending RPC requests into the local variable tmp to keep the critical section short.
     */
    clib_spinlock_lock_if_init (&vm->pending_rpc_lock);
    tmp = vm->processing_rpc_requests;
    vec_reset_length (tmp);
    vm->processing_rpc_requests = vm->pending_rpc_requests;
    vm->pending_rpc_requests = tmp;
    clib_spinlock_unlock_if_init (&vm->pending_rpc_lock);

    /*
     * RPCs are used to reflect function calls to thread 0
     * when the underlying code is not thread-safe.
     *
     * Grabbing the thread barrier across a set of RPCs
     * greatly increases efficiency, and avoids
     * running afoul of the barrier sync holddown timer.
     * The barrier sync code supports recursive locking.
     *
     * We really need to rewrite RPC-based code...
     */
    if (PREDICT_TRUE (vec_len (vm->processing_rpc_requests)))
    {
        vl_msg_api_barrier_sync ();
        for (i = 0; i < vec_len (vm->processing_rpc_requests); i++)/* process each RPC in turn */
        {
            mp = vm->processing_rpc_requests[i];
            vl_msg_api_handler_with_vm_node (am, (void *) mp, vm, node);
        }
        vl_msg_api_barrier_release ();
    }

    return 0;
}

/* This is only to be called from a vlib/vnet app */
void
vl_msg_api_handler_with_vm_node (api_main_t * am,
                                 void *the_msg, vlib_main_t * vm,
                                 vlib_node_runtime_t * node)
{
    u16 id = ntohs (*((u16 *) the_msg));/* fetch the message id; the first member of every message is its id */
    u8 *(*handler) (void *, void *, void *);
    u8 *(*print_fp) (void *, void *);

    ......
    /* look up the handler for this message id; for VL_API_RPC_CALL it is vl_api_rpc_call_t_handler */
    if (id < vec_len (am->msg_handlers) && am->msg_handlers[id])
    {
        handler = (void *) am->msg_handlers[id];

        if (PREDICT_FALSE (am->rx_trace && am->rx_trace->enabled))
            vl_msg_api_trace (am, am->rx_trace, the_msg);

        if (PREDICT_FALSE (am->msg_print_flag))
        {
            fformat (stdout, "[%d]: %s\n", id, am->msg_names[id]);
            print_fp = (void *) am->msg_print_handlers[id];
            if (print_fp == 0)
            {
                fformat (stdout, "  [no registered print fn for msg %d]\n", id);
            }
            else
            {
                (*print_fp) (the_msg, vm);
            }
        }

        if (!am->is_mp_safe[id])
        {
            vl_msg_api_barrier_trace_context (am->msg_names[id]);
            vl_msg_api_barrier_sync ();
        }
        /* invoke the handler, e.g. vl_api_rpc_call_t_handler */
        (*handler) (the_msg, vm, node);
        if (!am->is_mp_safe[id])
            vl_msg_api_barrier_release ();
    }
    else
    {
        clib_warning ("no handler for msg id %d", id);
    }

    /*
     * Special-case, so we can e.g. bounce messages off the vnet
     * main thread without copying them...
     */
    if (!(am->message_bounce[id]))
        vl_msg_api_free (the_msg);

    ......
}