VPP處理報文時是沿着一個有向圖進行處理的,每個功能單元稱之爲節點(node)。node
typedef struct { /* Public nodes. */ /* 節點指針數組,使用下標做爲索引 */ vlib_node_t **nodes; /* Node index hashed by node name. */ /* 根據節點名字進行hash,能夠根據節點名字進行hash表查找 * 只有main線程纔會委會該hash表 */ uword *node_by_name; u32 flags; /* 該標誌表示Runtime信息已經被初始化過了 */ #define VLIB_NODE_MAIN_RUNTIME_STARTED (1 << 0) /* Nodes segregated by type for cache locality. Does not apply to nodes of type VLIB_NODE_TYPE_INTERNAL. */ vlib_node_runtime_t *nodes_by_type[VLIB_N_NODE_TYPE]; /* Node runtime indices for input nodes with pending interrupts. */ u32 *pending_interrupt_node_runtime_indices; clib_spinlock_t pending_interrupt_lock; /* Input nodes are switched from/to interrupt to/from polling mode when average vector length goes above/below polling/interrupt thresholds. * 輸入節點在中斷模式和輪詢模式之間進行切換,當向量的平均長度高於輪詢長度閾值時 * 將會從中斷模式切換到輪詢模式(這種狀況說明報文很是多),當長度低於中斷閾值時,從 * 輪詢模式切換到中斷模式(壓力變小了) */ u32 polling_threshold_vector_length; u32 interrupt_threshold_vector_length; /* Vector of next frames. */ /* 幀數組,由內部節點組成,其中n1是節點的下一跳個節點的個數,元素是節點運行索引 * node_runtime_index與幀數據索引構成的幀。 */ /* |----node 1的n1個元素|----node 2的n2個元素|......| ----node n的n個元素| */ /* 只針對內部節點 */ vlib_next_frame_t *next_frames; /* Vector of internal node's frames waiting to be called. * 等待被調用的內部節點,一般是上一個節點的報文處理後指向的下一個節點 */ vlib_pending_frame_t *pending_frames; /* Timing wheel for scheduling time-based node dispatch. */ void *timing_wheel; vlib_signal_timed_event_data_t *signal_timed_event_data_pool; /* Opaque data vector added via timing_wheel_advance. */ u32 *data_from_advancing_timing_wheel; /* CPU time of next process to be ready on timing wheel. */ f64 time_next_process_ready; /* Vector of process nodes. One for each node of type VLIB_NODE_TYPE_PROCESS. */ vlib_process_t **processes; /* Current running process or ~0 if no process running. */ u32 current_process_index; /* Pool of pending process frames. */ vlib_pending_frame_t *suspended_process_frames; /* Vector of event data vectors pending recycle. */ void **recycled_event_data_vectors; /* Current counts of nodes in each state. */ u32 input_node_counts_by_state[VLIB_N_NODE_STATE]; /* Hash of (scalar_size,vector_size) to frame_sizes index. */ uword *frame_size_hash; /* Per-size frame allocation information. */ /* 不一樣大小的幀的分配信息,是一個數組,與上面的hash表是兩種索引方式 */ vlib_frame_size_t *frame_sizes; /* Time of last node runtime stats clear. */ f64 time_last_runtime_stats_clear; /* Node registrations added by constructors */ vlib_node_registration_t *node_registrations; } vlib_node_main_t;
typedef enum { /* An internal node on the call graph (could be output). */ VLIB_NODE_TYPE_INTERNAL, /* Nodes which input data into the processing graph. Input nodes are called for each iteration of main loop. 輸入節點,報文流轉入口 */ VLIB_NODE_TYPE_INPUT, /* Nodes to be called before all input nodes. Used, for example, to clean out driver TX rings before processing input. 輸入節點以前處理的節點,用於處理一些在處理輸入報文以前的任務。 好比清除發送緩衝區(好像沒有註冊該功能的節點)。目前只註冊了兩個該 類型的節點:epoll和session */ VLIB_NODE_TYPE_PRE_INPUT, /* "Process" nodes which can be suspended and later resumed. */ /* vpp的協程節點,用於處理能夠掛起的任務,好比命令行,api等業務 */ VLIB_NODE_TYPE_PROCESS, VLIB_N_NODE_TYPE, } vlib_node_type_t;
typedef struct _vlib_node_fn_registration { vlib_node_function_t *function; /* 功能函數 */ int priority; /* 優先級,同一節點能夠註冊多個處理函數,選擇優先級最高的,值越大優先級越高 */ struct _vlib_node_fn_registration *next_registration;/* 造成鏈表 */ char *name;/* 名字,必需要和其所屬的節點一致,不然註冊會失敗 */ } vlib_node_fn_registration_t;
typedef struct _vlib_node_registration { /* Vector processing function for this node. 節點的功能函數,從下面註冊的功能函數鏈表中選擇一個優先級最高的最爲該成員的值 */ vlib_node_function_t *function; /* Node function candidate registration with priority 節點功能函數鏈表 */ vlib_node_fn_registration_t *node_fn_registrations; /* Node name. 節點名字 */ char *name; /* Name of sibling (if applicable). */ /* 兄弟節點名字 */ char *sibling_of; /* Node index filled in by registration. 節點索引 */ u32 index; /* Type of this node. 節點類型 */ vlib_node_type_t type; /* Error strings indexed by error code for this node. 節點錯誤碼映射表 */ char **error_strings; /* Buffer format/unformat for this node. */ format_function_t *format_buffer; unformat_function_t *unformat_buffer; /* Trace format/unformat for this node. */ format_function_t *format_trace; unformat_function_t *unformat_trace; /* Function to validate incoming frames. */ u8 *(*validate_frame) (struct vlib_main_t * vm, struct vlib_node_runtime_t *, struct vlib_frame_t * f); /* Per-node runtime data. 節點運行時數據,私有數據存儲位置 */ void *runtime_data; /* Process stack size. */ u16 process_log2_n_stack_bytes; /* Number of bytes of per-node run time data. */ u8 runtime_data_bytes; /* State for input nodes. */ u8 state; /* Node flags. */ u16 flags; /* protocol at b->data[b->current_data] upon entry to the dispatch fn */ u8 protocol_hint; /* Size of scalar and vector arguments in bytes. */ u16 scalar_size, vector_size; /* Number of error codes used by this node. */ u16 n_errors; /* Number of next node names that follow. 該節點指向的下一個節點個數 */ u16 n_next_nodes; /* Constructor link-list, don't ask... 全部節點經過該成員造成鏈表 */ struct _vlib_node_registration *next_registration; /* Names of next nodes which this node feeds into. 下一個節點數組,存儲的是名字、 */ char *next_nodes[]; } vlib_node_registration_t;
#ifndef CLIB_MARCH_VARIANT #define VLIB_REGISTER_NODE(x,...) \ __VA_ARGS__ vlib_node_registration_t x; \ //聲明一個須要註冊的節點 static void __vlib_add_node_registration_##x (void) \ //聲明一個靜態的添加一個節點的函數,有constructor屬性,在main函數以前執行 __attribute__((__constructor__)) ; \ static void __vlib_add_node_registration_##x (void) \ { \ //定義添加節點函數,即將節點x連接到vm->node_main.node_registrations鏈表中 vlib_main_t * vm = vlib_get_main(); \ x.next_registration = vm->node_main.node_registrations; \ vm->node_main.node_registrations = &x; \ } \ static void __vlib_rm_node_registration_##x (void) \ //從鏈表中移除節點 __attribute__((__destructor__)) ; \ static void __vlib_rm_node_registration_##x (void) \ { \ vlib_main_t * vm = vlib_get_main(); \ VLIB_REMOVE_FROM_LINKED_LIST (vm->node_main.node_registrations, \ &x, next_registration); \ } \ __VA_ARGS__ vlib_node_registration_t x // 定義一個須要註冊的節點,這裏沒有分號,是由於使用這個宏的時候有分號,而且初始化該變量。 #else #define VLIB_REGISTER_NODE(x,...) \ static __clib_unused vlib_node_registration_t __clib_unused_##x #endif
咱們以DPDK類型的輸入節點來進行分析。api
/* *INDENT-OFF* */ VLIB_REGISTER_NODE (dpdk_input_node) = { .type = VLIB_NODE_TYPE_INPUT, .name = "dpdk-input", .sibling_of = "device-input", /* Will be enabled if/when hardware is detected. */ .state = VLIB_NODE_STATE_DISABLED, .format_buffer = format_ethernet_header_with_length, .format_trace = format_dpdk_rx_trace, .n_errors = DPDK_N_ERROR, .error_strings = dpdk_error_strings, };
#define VLIB_NODE_FN(node) \ uword CLIB_MARCH_SFX (node##_fn)(); \ static vlib_node_fn_registration_t \ CLIB_MARCH_SFX(node##_fn_registration) = \ { .function = &CLIB_MARCH_SFX (node##_fn), }; \ \ static void __clib_constructor \ CLIB_MARCH_SFX (node##_multiarch_register) (void) \ { \ extern vlib_node_registration_t node; \ //這裏引用了一個node節點,其名字爲宏的輸入參數,說明在定義節點和其處理函數的時候要求它們有同樣的名字。 vlib_node_fn_registration_t *r; \ r = & CLIB_MARCH_SFX (node##_fn_registration); \ r->priority = CLIB_MARCH_FN_PRIORITY(); \//處理函數優先級,根據優先級選擇最高優先級的處理函數 r->name = CLIB_MARCH_VARIANT_STR; \ r->next_registration = node.node_fn_registrations; \//將函數添加到其對應的節點鏈表中,從這裏能夠看出一個節點能夠有多個處理函數,在函數register_node中會選擇一個優先級最高的函數做爲節點的最終處理函數。 node.node_fn_registrations = r; \ } \ uword CLIB_CPU_OPTIMIZED CLIB_MARCH_SFX (node##_fn)
咱們以DPDK輸入節點爲例。數組
VLIB_NODE_FN (dpdk_input_node) (vlib_main_t * vm, vlib_node_runtime_t * node, vlib_frame_t * f) { dpdk_main_t *dm = &dpdk_main; dpdk_device_t *xd; uword n_rx_packets = 0; /* 獲取輸入節點的運行信息,其中的devices_and_queues包含了該線程在該輸入節點須要處理的隊列信息,動態增長該類設備時,會在修改其中的信息 */ vnet_device_input_runtime_t *rt = (void *) node->runtime_data; vnet_device_and_queue_t *dq;/* */ u32 thread_index = node->thread_index; /* * Poll all devices on this cpu for input/interrupts. */ /* *INDENT-OFF* 遍歷該線程接管的每個設備的每個隊列 */ foreach_device_and_queue (dq, rt->devices_and_queues) { xd = vec_elt_at_index(dm->devices, dq->dev_instance); if (PREDICT_FALSE (xd->flags & DPDK_DEVICE_FLAG_BOND_SLAVE)) continue; /* Do not poll slave to a bonded interface */ n_rx_packets += dpdk_device_input (vm, dm, xd, node, thread_index, dq->queue_id); } /* *INDENT-ON* */ return n_rx_packets; }
/* 運行時幀索引,這些幀根據節點類型進行分類的 */ typedef struct vlib_node_runtime_t { CLIB_CACHE_LINE_ALIGN_MARK (cacheline0); /**< cacheline mark */ /* 運行函數 */ vlib_node_function_t *function; /**< Node function to call. */ vlib_error_t *errors; /**< Vector of errors for this node. */ #if __SIZEOF_POINTER__ == 4 u8 pad[8]; #endif u32 clocks_since_last_overflow; /**< Number of clock cycles. */ u32 max_clock; /**< Maximum clock cycle for an invocation. */ u32 max_clock_n; /**< Number of vectors in the recorded max_clock. */ u32 calls_since_last_overflow; /**< Number of calls. */ u32 vectors_since_last_overflow; /**< Number of vector elements processed by this node. */ u32 perf_counter0_ticks_since_last_overflow; /**< Perf counter 0 ticks */ u32 perf_counter1_ticks_since_last_overflow; /**< Perf counter 1 ticks */ u32 perf_counter_vectors_since_last_overflow; /**< Perf counter vectors */ /* 起始的下一幀索引 */ u32 next_frame_index; /**< Start of next frames for this node. */ /* 節點索引 */ u32 node_index; /**< Node index. */ u32 input_main_loops_per_call; /**< For input nodes: decremented on each main loop interation until it reaches zero and function is called. Allows some input nodes to be called more than others. */ u32 main_loop_count_last_dispatch; /**< Saved main loop counter of last ** dispatch of this node. ** 上一次進入該節點時,主循環調用次數 */ u32 main_loop_vector_stats[2];/* 分組報文統計數組,兩個元素交替統計 */ u16 flags; /**< Copy of main node flags. */ u16 state; /**< Input node state. */ /* 運行時下一個節點的個數 */ u16 n_next_nodes;/* 多少個下一個節點 */ /* 該節點上一次使用的下一個幀的索引編號,緩存的用於加速 */ u16 cached_next_index; /**< Next frame index that vector arguments were last enqueued to last time this node ran. Set to zero before first run of this node. */ /* 節點所屬線程 */ u16 thread_index; /**< thread this node runs on */ u8 runtime_data[0]; /**< Function dependent node-runtime data. This data is thread local, and it is not cloned from main thread. It needs to be initialized for each thread before it is used unless runtime_data template exists in vlib_node_t. */ } vlib_node_runtime_t; /* 運行時節點描述結構體 */
typedef struct { /* Frame index. */ /* 幀數據索引*/ u32 frame_index; /* Node runtime for this next. */ /* 運行節點索引 */ u32 node_runtime_index; /* Next frame flags. */ u32 flags; /* Reflects node frame-used flag for this next. */ #define VLIB_FRAME_NO_FREE_AFTER_DISPATCH \ VLIB_NODE_FLAG_FRAME_NO_FREE_AFTER_DISPATCH /* Don't append this frame */ #define VLIB_FRAME_NO_APPEND (1 << 14) /* This next frame owns enqueue to node corresponding to node_runtime_index. */ #define VLIB_FRAME_OWNER (1 << 15) /* Set when frame has been allocated for this next. */ #define VLIB_FRAME_IS_ALLOCATED VLIB_NODE_FLAG_IS_OUTPUT /* Set when frame has been added to pending vector. */ #define VLIB_FRAME_PENDING VLIB_NODE_FLAG_IS_DROP /* Set when frame is to be freed after dispatch. */ #define VLIB_FRAME_FREE_AFTER_DISPATCH VLIB_NODE_FLAG_IS_PUNT /* Set when frame has traced packets. */ #define VLIB_FRAME_TRACE VLIB_NODE_FLAG_TRACE /* Number of vectors enqueue to this next since last overflow. */ u32 vectors_since_last_overflow; } vlib_next_frame_t;
/* A frame pending dispatch by main loop. */ typedef struct { /* Node and runtime for this frame. */ /* 能夠經過該索引在幀數組中找到對應的vlib_node_runtime_t結構 */ u32 node_runtime_index; /* Frame index (in the heap). */ u32 frame_index; /* Start of next frames for this node. */ u32 next_frame_index; /* Special value for next_frame_index when there is no next frame. */ #define VLIB_PENDING_FRAME_NO_NEXT_FRAME ((u32) ~0) } vlib_pending_frame_t;
/* Max number of vector elements to process at once per node. */ #define VLIB_FRAME_SIZE 256 #define VLIB_FRAME_ALIGN CLIB_CACHE_LINE_BYTES /* Calling frame (think stack frame) for a node. * 一個節點的調用棧幀 */ typedef struct vlib_frame_t { /* Frame flags. */ u16 frame_flags; /* User flags. Used for sending hints to the next node. */ u16 flags; /* 數組arguments中的標量字節數Number of scalar bytes in arguments. */ u8 scalar_size; /* Number of bytes per vector argument. */ u8 vector_size; /* Number of vector elements currently in frame. */ /* 在該幀中的向量元素的個數 */ u16 n_vectors; /* Scalar and vector arguments to next node. */ u8 arguments[0]; } vlib_frame_t;
vpp節點有兩種註冊方式,第一種是採用上面的宏進行定義。這些宏帶有__constructor__屬性,都是在main函數自動執行的,造成相應的鏈表。還能夠動態定義,而後進行加工。下面咱們分析一下節點的加工過程。緩存
vlib_main函數調用vlib_node_main_init函數進行node初始化。session
/* Main function. */ int vlib_main (vlib_main_t * volatile vm, unformat_input_t * input) { clib_error_t *volatile error; vlib_node_main_t *nm = &vm->node_main; ...... /* Register static nodes so that init functions may use them. */ /* 註冊全部靜態節點 */ vlib_register_all_static_nodes (vm); ...... /* Initialize node graph. */ /* 初始化節點圖 */ if ((error = vlib_node_main_init (vm))) { /* Arrange for graph hook up error to not be fatal when debugging. */ if (CLIB_DEBUG > 0) clib_error_report (error); else goto done; } ...... vlib_main_loop (vm); ...... }
void vlib_register_all_static_nodes (vlib_main_t * vm) { vlib_node_registration_t *r; static char *null_node_error_strings[] = { "blackholed packets", }; /* 定義一個null節點,做爲第一個節點,其編號爲0 */ static vlib_node_registration_t null_node_reg = { .function = null_node_fn, .vector_size = sizeof (u32), .name = "null-node", .n_errors = 1, .error_strings = null_node_error_strings, }; /* make sure that node index 0 is not used by real node */ register_node (vm, &null_node_reg); /* 遍歷全部的靜態節點,進行註冊 */ r = vm->node_main.node_registrations; while (r) { register_node (vm, r); r = r->next_registration; } }
該函數分配一個vlib_node_t結構,用vlib_node_registration_t信息對其進行初始化,讓後將其添加到vm->node_main->nodes指針數組中,其在數組中的下標爲其節點索引n->index。數據結構
static void register_node (vlib_main_t * vm, vlib_node_registration_t * r) { vlib_node_main_t *nm = &vm->node_main; vlib_node_t *n; u32 page_size = clib_mem_get_page_size (); int i; if (CLIB_DEBUG > 0) { /* Default (0) type should match INTERNAL. */ vlib_node_t zero = { 0 }; ASSERT (VLIB_NODE_TYPE_INTERNAL == zero.type); } /* 從節點的多個函數中選擇一個最高的優先級的函數做爲節點的最終處理函數 */ if (r->node_fn_registrations) { vlib_node_fn_registration_t *fnr = r->node_fn_registrations; int priority = -1; /* to avoid confusion, please remove ".function " statiement from CLIB_NODE_REGISTRATION() if using function function candidates */ ASSERT (r->function == 0); while (fnr) { if (fnr->priority > priority) { priority = fnr->priority; r->function = fnr->function; } fnr = fnr->next_registration; } } ASSERT (r->function != 0); /* 分配節點內存 */ n = clib_mem_alloc_no_fail (sizeof (n[0])); clib_memset (n, 0, sizeof (n[0])); /* 設置索引 */ n->index = vec_len (nm->nodes); n->node_fn_registrations = r->node_fn_registrations; n->protocol_hint = r->protocol_hint; /* 將節點地址添加到數組中 */ vec_add1 (nm->nodes, n); /* Name is always a vector so it can be formatted with %v. */ if (clib_mem_is_heap_object (vec_header (r->name, 0))) n->name = vec_dup ((u8 *) r->name); else n->name = format (0, "%s", r->name); /* 構建節點名字與節點索引hash表 */ if (!nm->node_by_name) nm->node_by_name = hash_create_vec ( /* size */ 32, sizeof (n->name[0]), sizeof (uword)); /* Node names must be unique. */ { vlib_node_t *o = vlib_get_node_by_name (vm, n->name); if (o) clib_error ("more than one node named `%v'", n->name); } hash_set (nm->node_by_name, n->name, n->index); r->index = n->index; /* save index in registration */ n->function = r->function; /* Node index of next sibling will be filled in by vlib_node_main_init. */ n->sibling_of = r->sibling_of; if (r->sibling_of && r->n_next_nodes > 0) clib_error ("sibling node should not have any next nodes `%v'", n->name); if (r->type == VLIB_NODE_TYPE_INTERNAL) ASSERT (r->vector_size > 0); #define _(f) n->f = r->f _(type); _(flags); _(state); _(scalar_size); _(vector_size); _(format_buffer); _(unformat_buffer); _(format_trace); _(validate_frame); /* Register error counters. */ vlib_register_errors (vm, n->index, r->n_errors, r->error_strings); node_elog_init (vm, n->index); _(runtime_data_bytes); if (r->runtime_data_bytes > 0) { vec_resize (n->runtime_data, r->runtime_data_bytes); if (r->runtime_data) clib_memcpy (n->runtime_data, r->runtime_data, r->runtime_data_bytes); } /* 初始化節點的下一跳數組 */ vec_resize (n->next_node_names, r->n_next_nodes); for (i = 0; i < r->n_next_nodes; i++) n->next_node_names[i] = r->next_nodes[i]; vec_validate_init_empty (n->next_nodes, r->n_next_nodes - 1, ~0); vec_validate (n->n_vectors_by_next_node, r->n_next_nodes - 1); n->owner_node_index = n->owner_next_index = ~0; /* Initialize node runtime. */ /* 初始化節點運行數據,主要是對節點按類型進行分類 */ { vlib_node_runtime_t *rt; u32 i; if (n->type == VLIB_NODE_TYPE_PROCESS) { vlib_process_t *p; uword log2_n_stack_bytes; log2_n_stack_bytes = clib_max (r->process_log2_n_stack_bytes, 15); #ifdef CLIB_UNIX /* * Bump the stack size if running over a kernel with a large page size, * and the stack isn't any too big to begin with. Otherwise, we'll * trip over the stack guard page for sure. */ if ((page_size > (4 << 10)) && log2_n_stack_bytes < 19) { if ((1 << log2_n_stack_bytes) <= page_size) log2_n_stack_bytes = min_log2 (page_size) + 1; else log2_n_stack_bytes++; } #endif p = clib_mem_alloc_aligned_at_offset (sizeof (p[0]) + (1 << log2_n_stack_bytes), STACK_ALIGN, STRUCT_OFFSET_OF (vlib_process_t, stack), 0 /* no, don't call os_out_of_memory */ ); if (p == 0) clib_panic ("failed to allocate process stack (%d bytes)", 1 << log2_n_stack_bytes); clib_memset (p, 0, sizeof (p[0])); p->log2_n_stack_bytes = log2_n_stack_bytes; /* Process node's runtime index is really index into process pointer vector. */ n->runtime_index = vec_len (nm->processes); vec_add1 (nm->processes, p); /* Paint first stack word with magic number so we can at least detect process stack overruns. */ p->stack[0] = VLIB_PROCESS_STACK_MAGIC; /* Node runtime is stored inside of process. */ rt = &p->node_runtime; #ifdef CLIB_UNIX /* * Disallow writes to the bottom page of the stack, to * catch stack overflows. */ if (mprotect (p->stack, page_size, PROT_READ) < 0) clib_unix_warning ("process stack"); #endif } else { /* 根據類型進行分類 */ vec_add2_aligned (nm->nodes_by_type[n->type], rt, 1, /* align */ CLIB_CACHE_LINE_BYTES); n->runtime_index = rt - nm->nodes_by_type[n->type]; } /* 統計輸入節點狀態個數 */ if (n->type == VLIB_NODE_TYPE_INPUT) nm->input_node_counts_by_state[n->state] += 1; rt->function = n->function; rt->flags = n->flags; rt->state = n->state; rt->node_index = n->index; rt->n_next_nodes = r->n_next_nodes; rt->next_frame_index = vec_len (nm->next_frames); /* 爲該節點在nm->next_frames中申請一塊rt->n_next_nodes元素的內存 * 該內存用於存儲該節點運行的下一幀 */ vec_resize (nm->next_frames, rt->n_next_nodes); for (i = 0; i < rt->n_next_nodes; i++) vlib_next_frame_init (nm->next_frames + rt->next_frame_index + i); vec_resize (rt->errors, r->n_errors); for (i = 0; i < vec_len (rt->errors); i++) rt->errors[i] = vlib_error_set (n->index, i); STATIC_ASSERT_SIZEOF (vlib_node_runtime_t, 128); ASSERT (vec_len (n->runtime_data) <= VLIB_NODE_RUNTIME_DATA_SIZE); if (vec_len (n->runtime_data) > 0) clib_memcpy (rt->runtime_data, n->runtime_data, vec_len (n->runtime_data)); vec_free (n->runtime_data); } }
clib_error_t * vlib_node_main_init (vlib_main_t * vm) { vlib_node_main_t *nm = &vm->node_main; clib_error_t *error = 0; vlib_node_t *n; uword ni; /* 建立frame內存分配器 */ nm->frame_sizes = vec_new (vlib_frame_size_t, 1); #ifdef VLIB_SUPPORTS_ARBITRARY_SCALAR_SIZES nm->frame_size_hash = hash_create (0, sizeof (uword)); #endif /* 設置已經初始化標誌 */ nm->flags |= VLIB_NODE_MAIN_RUNTIME_STARTED; /* Generate sibling relationships */ /* 處理全部節點的兄弟關係,好比不一樣類型的輸入節點大可能是兄弟節點,他們會指向相同的 * 下一跳節點。好比dpdk-input節點與af-packet-input幾點就是互爲兄弟節點。兄弟的兄弟 * 也是我兄弟 */ { vlib_node_t *n, *sib; uword si; /* 遍歷每個節點 */ for (ni = 0; ni < vec_len (nm->nodes); ni++) { n = vec_elt (nm->nodes, ni); if (!n->sibling_of) continue; /* 獲取兄弟名字 */ sib = vlib_get_node_by_name (vm, (u8 *) n->sibling_of); if (!sib) { error = clib_error_create ("sibling `%s' not found for node `%v'", n->sibling_of, n->name); goto done; } /* *INDENT-OFF* */ /* 遍歷兄弟節點的每個兄弟掩碼,它的兄弟都是個人兄弟 */ clib_bitmap_foreach (si, sib->sibling_bitmap, ( { /* 獲取兄弟的兄弟節點 */ vlib_node_t * m = vec_elt (nm->nodes, si); /* Connect all of sibling's siblings to us. */ /* 加本節點加入到兄弟的兄的的兄弟掩碼圖中 */ m->sibling_bitmap = clib_bitmap_ori (m->sibling_bitmap, n->index); /* Connect us to all of sibling's siblings. */ /* 將兄弟的兄弟加入到本身的掩碼圖中 */ n->sibling_bitmap = clib_bitmap_ori (n->sibling_bitmap, si); })); /* *INDENT-ON* */ /* Connect sibling to us. */ sib->sibling_bitmap = clib_bitmap_ori (sib->sibling_bitmap, n->index); /* Connect us to sibling. */ /* 將兄弟設置到本身的掩碼圖中 */ n->sibling_bitmap = clib_bitmap_ori (n->sibling_bitmap, sib->index); } } /* Resolve next names into next indices. */ /* 根據下一跳名字數組構建下一跳掩碼數組 */ for (ni = 0; ni < vec_len (nm->nodes); ni++) { uword i; n = vec_elt (nm->nodes, ni); for (i = 0; i < vec_len (n->next_node_names); i++) { char *a = n->next_node_names[i]; if (!a) continue; /* 構建下一跳索引數組 */ if (~0 == vlib_node_add_named_next_with_slot (vm, n->index, a, i)) { error = clib_error_create ("node `%v' refers to unknown node `%s'", n->name, a); goto done; } } vec_free (n->next_node_names); } /* Set previous node pointers. */ /* 將下一跳節點指向本身,即構建前驅關係 */ for (ni = 0; ni < vec_len (nm->nodes); ni++) { vlib_node_t *n_next; uword i; n = vec_elt (nm->nodes, ni); for (i = 0; i < vec_len (n->next_nodes); i++) { if (n->next_nodes[i] >= vec_len (nm->nodes)) continue; n_next = vec_elt (nm->nodes, n->next_nodes[i]); n_next->prev_node_bitmap = clib_bitmap_ori (n_next->prev_node_bitmap, n->index); } } /* 初始化每個內部節點,構建起下一跳節點的運行信息 */ { vlib_next_frame_t *nf; vlib_node_runtime_t *r; vlib_node_t *next; uword i; vec_foreach (r, nm->nodes_by_type[VLIB_NODE_TYPE_INTERNAL]) { if (r->n_next_nodes == 0) continue; n = vlib_get_node (vm, r->node_index); /* 根據運行索引獲取其在next_frames的起始地址 */ nf = vec_elt_at_index (nm->next_frames, r->next_frame_index); /* 遍歷每個下一跳 */ for (i = 0; i < vec_len (n->next_nodes); i++) { next = vlib_get_node (vm, n->next_nodes[i]); /* Validate node runtime indices are correctly initialized. */ ASSERT (nf[i].node_runtime_index == next->runtime_index); nf[i].flags = 0; if (next->flags & VLIB_NODE_FLAG_FRAME_NO_FREE_AFTER_DISPATCH) nf[i].flags |= VLIB_FRAME_NO_FREE_AFTER_DISPATCH; } } } done: return error; }
/* Add named next node to given node in given slot. */ /* 添加一個命名的下一跳到節點node指定的slot中,若是slot沒有指定, * 則分配。 */ uword vlib_node_add_named_next_with_slot (vlib_main_t * vm, uword node, char *name, uword slot) { vlib_node_main_t *nm; vlib_node_t *n, *n_next; nm = &vm->node_main; n = vlib_get_node (vm, node); n_next = vlib_get_node_by_name (vm, (u8 *) name); if (!n_next) { if (nm->flags & VLIB_NODE_MAIN_RUNTIME_STARTED) return ~0; if (slot == ~0) slot = clib_max (vec_len (n->next_node_names), vec_len (n->next_nodes)); vec_validate (n->next_node_names, slot); n->next_node_names[slot] = name; return slot; } return vlib_node_add_next_with_slot (vm, node, n_next->index, slot); }
/* Add next node to given node in given slot. */ uword vlib_node_add_next_with_slot (vlib_main_t * vm, uword node_index, uword next_node_index, uword slot) { vlib_node_main_t *nm = &vm->node_main; vlib_node_t *node, *next; uword *p; node = vec_elt (nm->nodes, node_index); next = vec_elt (nm->nodes, next_node_index); /* Runtime has to be initialized. */ ASSERT (nm->flags & VLIB_NODE_MAIN_RUNTIME_STARTED); /* 根據下一跳節點索引快速判斷該節點是否在本節點的下一跳數組中 */ if ((p = hash_get (node->next_slot_by_node, next_node_index))) { /* Next already exists: slot must match. */ /* 已經存在,返回該slot */ if (slot != ~0) ASSERT (slot == p[0]); return p[0]; } /* 不存在的話,將下一個可用位置分給該next_node_index節點 */ if (slot == ~0) slot = vec_len (node->next_nodes); vec_validate_init_empty (node->next_nodes, slot, ~0); vec_validate (node->n_vectors_by_next_node, slot); /* 添加一個下一跳索引 */ node->next_nodes[slot] = next_node_index; hash_set (node->next_slot_by_node, next_node_index, slot); /* 構建運行信息 */ vlib_node_runtime_update (vm, node_index, slot); /* 創建反向關係,設置next_node_index節點的位數組prev_node_bitmap中node_index爲1 */ next->prev_node_bitmap = clib_bitmap_ori (next->prev_node_bitmap, node_index); /* Siblings all get same node structure. */ /* 處理本節點的兄弟節點,兄弟節點都指向該next_node_index節點 * 存在深度的遞歸調用該函數。最差狀況下,一個兄弟節點遞歸一次。 */ { uword sib_node_index, sib_slot; vlib_node_t *sib_node; /* *INDENT-OFF* */ clib_bitmap_foreach (sib_node_index, node->sibling_bitmap, ( { sib_node = vec_elt (nm->nodes, sib_node_index); if (sib_node != node) { sib_slot = vlib_node_add_next_with_slot (vm, sib_node_index, next_node_index, slot); ASSERT (sib_slot == slot); } })); /* *INDENT-ON* */ } return slot; }
/* 增長了節點,須要更新運行時數據,next_index不是節點索引,而是槽位號slot */ static void vlib_node_runtime_update (vlib_main_t * vm, u32 node_index, u32 next_index) { vlib_node_main_t *nm = &vm->node_main; vlib_node_runtime_t *r, *s; vlib_node_t *node, *next_node; vlib_next_frame_t *nf; vlib_pending_frame_t *pf; i32 i, j, n_insert; ASSERT (vlib_get_thread_index () == 0); /* 開啓sync過程 */ vlib_worker_thread_barrier_sync (vm); node = vec_elt (nm->nodes, node_index); r = vlib_node_get_runtime (vm, node_index); /* 新增多少個下一跳節點 */ n_insert = vec_len (node->next_nodes) - r->n_next_nodes; if (n_insert > 0) { i = r->next_frame_index + r->n_next_nodes; /* 在數組中間插入n_insert個節點 */ vec_insert (nm->next_frames, n_insert, i); /* Initialize newly inserted next frames. */ for (j = 0; j < n_insert; j++) vlib_next_frame_init (nm->next_frames + i + j); /* Relocate other next frames at higher indices. */ for (j = 0; j < vec_len (nm->nodes); j++) { s = vlib_node_get_runtime (vm, j); if (j != node_index && s->next_frame_index >= i) s->next_frame_index += n_insert; } /* Pending frames may need to be relocated also. */ /* 修改正在運行的幀的索引 */ vec_foreach (pf, nm->pending_frames) { if (pf->next_frame_index != VLIB_PENDING_FRAME_NO_NEXT_FRAME && pf->next_frame_index >= i) pf->next_frame_index += n_insert; } /* *INDENT-OFF* */ pool_foreach (pf, nm->suspended_process_frames, ( { if (pf->next_frame_index != ~0 && pf->next_frame_index >= i) pf->next_frame_index += n_insert; })); /* *INDENT-ON* */ r->n_next_nodes = vec_len (node->next_nodes); } /* Set frame's node runtime index. */ /* 設置節點的運行時索引,next_index是槽位號,不是索引 */ next_node = vlib_get_node (vm, node->next_nodes[next_index]); nf = nm->next_frames + r->next_frame_index + next_index; nf->node_runtime_index = next_node->runtime_index; vlib_worker_thread_node_runtime_update (); vlib_worker_thread_barrier_release (vm); }
除了使用node註冊宏進行節點的註冊外,還可使用以下函數按需註冊,註冊一個新的VLIB_NODE_TYPE_INTERNAL節點後須要調用vlib_worker_thread_node_runtime_update或者vlib_node_add_next_with_slot開啓一輪新的節點編排工做,全部的線程都要進行。註冊VLIB_NODE_TYPE_PROCESS節點後,須要調用vlib_start_process函數啓動協程。app
/* Register new packet processing node. */ /* 動態註冊一個新的節點 */ u32 vlib_register_node (vlib_main_t * vm, vlib_node_registration_t * r) { register_node (vm, r); return r->index; }
該函數會更新node graph以及runtime 信息,還會通知其它線程進行sync同步來完成消息的變動。less
/* Add next node to given node in given slot. */ uword vlib_node_add_next_with_slot (vlib_main_t * vm, uword node_index, uword next_node_index, uword slot) { vlib_node_main_t *nm = &vm->node_main; vlib_node_t *node, *next; uword *p; node = vec_elt (nm->nodes, node_index); next = vec_elt (nm->nodes, next_node_index); /* Runtime has to be initialized. */ ASSERT (nm->flags & VLIB_NODE_MAIN_RUNTIME_STARTED); /* 根據下一跳節點索引快速判斷該節點是否在本節點的下一跳數組中 */ if ((p = hash_get (node->next_slot_by_node, next_node_index))) { /* Next already exists: slot must match. */ /* 已經存在,返回該slot */ if (slot != ~0) ASSERT (slot == p[0]); return p[0]; } /* 不存在的話,將下一個可用位置分給該next_node_index節點 */ if (slot == ~0) slot = vec_len (node->next_nodes); vec_validate_init_empty (node->next_nodes, slot, ~0); vec_validate (node->n_vectors_by_next_node, slot); /* 添加一個下一跳索引 */ node->next_nodes[slot] = next_node_index; hash_set (node->next_slot_by_node, next_node_index, slot); /* 通知其它線程開始進行運行狀態重建 */ vlib_node_runtime_update (vm, node_index, slot); /* 創建反向關係,設置next_node_index節點的位數組prev_node_bitmap中node_index爲1 */ next->prev_node_bitmap = clib_bitmap_ori (next->prev_node_bitmap, node_index); /* Siblings all get same node structure. */ /* 處理本節點的兄弟節點,兄弟節點都指向該next_node_index節點 * 存在深度的遞歸調用該函數。最差狀況下,一個兄弟節點遞歸一次。 */ { uword sib_node_index, sib_slot; vlib_node_t *sib_node; /* *INDENT-OFF* */ clib_bitmap_foreach (sib_node_index, node->sibling_bitmap, ( { sib_node = vec_elt (nm->nodes, sib_node_index); if (sib_node != node) { sib_slot = vlib_node_add_next_with_slot (vm, sib_node_index, next_node_index, slot); ASSERT (sib_slot == slot); } })); /* *INDENT-ON* */ } return slot; }
/* 增長了節點,須要更新運行時數據,next_index不是節點索引,而是槽位號slot */ static void vlib_node_runtime_update (vlib_main_t * vm, u32 node_index, u32 next_index) { vlib_node_main_t *nm = &vm->node_main; vlib_node_runtime_t *r, *s; vlib_node_t *node, *next_node; vlib_next_frame_t *nf; vlib_pending_frame_t *pf; i32 i, j, n_insert; ASSERT (vlib_get_thread_index () == 0); /* 開啓sync過程 */ vlib_worker_thread_barrier_sync (vm); node = vec_elt (nm->nodes, node_index); r = vlib_node_get_runtime (vm, node_index); /* 新增多少個下一跳節點 */ n_insert = vec_len (node->next_nodes) - r->n_next_nodes; if (n_insert > 0) { i = r->next_frame_index + r->n_next_nodes; /* 在數組中間插入n_insert個節點 */ vec_insert (nm->next_frames, n_insert, i); /* Initialize newly inserted next frames. */ for (j = 0; j < n_insert; j++) vlib_next_frame_init (nm->next_frames + i + j); /* Relocate other next frames at higher indices. */ for (j = 0; j < vec_len (nm->nodes); j++) { s = vlib_node_get_runtime (vm, j); if (j != node_index && s->next_frame_index >= i) s->next_frame_index += n_insert; } /* Pending frames may need to be relocated also. */ /* 修改正在運行的幀的索引 */ vec_foreach (pf, nm->pending_frames) { if (pf->next_frame_index != VLIB_PENDING_FRAME_NO_NEXT_FRAME && pf->next_frame_index >= i) pf->next_frame_index += n_insert; } /* *INDENT-OFF* */ pool_foreach (pf, nm->suspended_process_frames, ( { if (pf->next_frame_index != ~0 && pf->next_frame_index >= i) pf->next_frame_index += n_insert; })); /* *INDENT-ON* */ r->n_next_nodes = vec_len (node->next_nodes); } /* Set frame's node runtime index. */ /* 設置節點的運行時索引,next_index是槽位號,不是索引 */ next_node = vlib_get_node (vm, node->next_nodes[next_index]); nf = nm->next_frames + r->next_frame_index + next_index; nf->node_runtime_index = next_node->runtime_index; vlib_worker_thread_node_runtime_update (); vlib_worker_thread_barrier_release (vm); }
/* 當有新的節點添加時,須要通知worker線程進行重建運行環境 */ void vlib_worker_thread_node_runtime_update (void) { /* * Make a note that we need to do a node runtime update * prior to releasing the barrier. */ vlib_global_main.need_vlib_worker_thread_node_runtime_update = 1; }
node信息發生變化後,main線程會通知其它線程進入sync狀態,need_vlib_worker_thread_node_runtime_update標誌被設置後,會進行runtime信息重建。ide
/* sync過程結束函數*/ void vlib_worker_thread_barrier_release (vlib_main_t * vm) { f64 deadline; f64 now; f64 minimum_open; f64 t_entry; f64 t_closed_total; f64 t_update_main = 0.0; int refork_needed = 0; if (vec_len (vlib_mains) < 2) return; ASSERT (vlib_get_thread_index () == 0); now = vlib_time_now (vm); /* 一對sync與release調用時間段 */ t_entry = now - vm->barrier_epoch; /* 減小遞歸深度,若是大於0表示sync還沒結束 */ if (--vlib_worker_threads[0].recursion_level > 0) { barrier_trace_release_rec (t_entry); return; } /* Update (all) node runtimes before releasing the barrier, if needed */ /* 設置了運行數據統計收集標誌,將worker線程的運行信息同步到main線程中,同時通知worker線程進行重建 */ if (vm->need_vlib_worker_thread_node_runtime_update) { /* * Lock stat segment here, so we's safe when * rebuilding the stat segment node clones from the * stat thread... */ vlib_stat_segment_lock (); /* Do stats elements on main thread */ /* 在mian線程中進行統計信息同步 */ worker_thread_node_runtime_update_internal (); vm->need_vlib_worker_thread_node_runtime_update = 0; /* Do per thread rebuilds in parallel */ refork_needed = 1; /* 設置vlib_worker_threads->node_reforks_required通知 worker線程進行runtime信息重建 */ clib_atomic_fetch_add (vlib_worker_threads->node_reforks_required, (vec_len (vlib_mains) - 1)); now = vlib_time_now (vm); t_update_main = now - vm->barrier_epoch; } ...... /* Wait for reforks before continuing */ /* 等待worker線程重建 */ if (refork_needed) { now = vlib_time_now (vm); deadline = now + BARRIER_SYNC_TIMEOUT; while (*vlib_worker_threads->node_reforks_required > 0) { if ((now = vlib_time_now (vm)) > deadline) { fformat (stderr, "%s: worker thread refork deadlock\n", __FUNCTION__); os_panic (); } } vlib_stat_segment_unlock (); } ...... }
worker線程調用函數在sync期間進行runtime信息重建。函數
static inline void vlib_worker_thread_barrier_check (void) { /* 若是main線程已經啓動了sync過程,則本線程須要進入sync狀態 */ if (PREDICT_FALSE (*vlib_worker_threads->wait_at_barrier)) { ...... if (PREDICT_FALSE (*vlib_worker_threads->node_reforks_required)) { ...... /* 進行本線程runtime信息重建 */ vlib_worker_thread_node_refork (); clib_atomic_fetch_add (vlib_worker_threads->node_reforks_required,-1); while (*vlib_worker_threads->node_reforks_required); } ...... } }
/* 重建全部的worker線程運行信息 */ void vlib_worker_thread_node_refork (void) { vlib_main_t *vm, *vm_clone; vlib_node_main_t *nm, *nm_clone; vlib_node_t **old_nodes_clone; vlib_node_runtime_t *rt, *old_rt; vlib_node_t *new_n_clone; int j; vm = vlib_mains[0]; nm = &vm->node_main; vm_clone = vlib_get_main (); nm_clone = &vm_clone->node_main; /* Re-clone error heap */ u64 *old_counters = vm_clone->error_main.counters; u64 *old_counters_all_clear = vm_clone->error_main.counters_last_clear; clib_memcpy_fast (&vm_clone->error_main, &vm->error_main, sizeof (vm->error_main)); j = vec_len (vm->error_main.counters) - 1; vec_validate_aligned (old_counters, j, CLIB_CACHE_LINE_BYTES); vec_validate_aligned (old_counters_all_clear, j, CLIB_CACHE_LINE_BYTES); vm_clone->error_main.counters = old_counters; vm_clone->error_main.counters_last_clear = old_counters_all_clear; nm_clone = &vm_clone->node_main; /* 刪除全部等待運行的幀,重建,通常來講,該向量爲空,由於只有處理完全部的幀以後纔會進入臨界區 */ vec_free (nm_clone->next_frames); nm_clone->next_frames = vec_dup_aligned (nm->next_frames, CLIB_CACHE_LINE_BYTES); for (j = 0; j < vec_len (nm_clone->next_frames); j++) { vlib_next_frame_t *nf = &nm_clone->next_frames[j]; u32 save_node_runtime_index; u32 save_flags; save_node_runtime_index = nf->node_runtime_index; save_flags = nf->flags & VLIB_FRAME_NO_FREE_AFTER_DISPATCH; vlib_next_frame_init (nf); nf->node_runtime_index = save_node_runtime_index; nf->flags = save_flags; } old_nodes_clone = nm_clone->nodes; nm_clone->nodes = 0; /* re-fork nodes */ /* Allocate all nodes in single block for speed */ new_n_clone = clib_mem_alloc_no_fail (vec_len (nm->nodes) * sizeof (*new_n_clone)); for (j = 0; j < vec_len (nm->nodes); j++) { vlib_node_t *old_n_clone; vlib_node_t *new_n; new_n = nm->nodes[j]; old_n_clone = old_nodes_clone[j]; clib_memcpy_fast (new_n_clone, new_n, sizeof (*new_n)); /* none of the copied nodes have enqueue rights given out */ new_n_clone->owner_node_index = VLIB_INVALID_NODE_INDEX; if (j >= vec_len (old_nodes_clone)) { /* new node, set to zero */ clib_memset (&new_n_clone->stats_total, 0, sizeof (new_n_clone->stats_total)); clib_memset (&new_n_clone->stats_last_clear, 0, sizeof (new_n_clone->stats_last_clear)); } else { /* Copy stats if the old data is valid */ clib_memcpy_fast (&new_n_clone->stats_total, &old_n_clone->stats_total, sizeof (new_n_clone->stats_total)); clib_memcpy_fast (&new_n_clone->stats_last_clear, &old_n_clone->stats_last_clear, sizeof (new_n_clone->stats_last_clear)); /* keep previous node state */ new_n_clone->state = old_n_clone->state; } vec_add1 (nm_clone->nodes, new_n_clone); new_n_clone++; } /* Free the old node clones */ clib_mem_free (old_nodes_clone[0]); vec_free (old_nodes_clone); /* re-clone internal nodes */ old_rt = nm_clone->nodes_by_type[VLIB_NODE_TYPE_INTERNAL]; nm_clone->nodes_by_type[VLIB_NODE_TYPE_INTERNAL] = vec_dup_aligned (nm->nodes_by_type[VLIB_NODE_TYPE_INTERNAL], CLIB_CACHE_LINE_BYTES); vec_foreach (rt, nm_clone->nodes_by_type[VLIB_NODE_TYPE_INTERNAL]) { vlib_node_t *n = vlib_get_node (vm, rt->node_index); rt->thread_index = vm_clone->thread_index; /* copy runtime_data, will be overwritten later for existing rt */ if (n->runtime_data && n->runtime_data_bytes > 0) clib_memcpy_fast (rt->runtime_data, n->runtime_data, clib_min (VLIB_NODE_RUNTIME_DATA_SIZE, n->runtime_data_bytes)); } for (j = 0; j < vec_len (old_rt); j++) { rt = vlib_node_get_runtime (vm_clone, old_rt[j].node_index); rt->state = old_rt[j].state; clib_memcpy_fast (rt->runtime_data, old_rt[j].runtime_data, VLIB_NODE_RUNTIME_DATA_SIZE); } vec_free (old_rt); /* re-clone input nodes */ old_rt = nm_clone->nodes_by_type[VLIB_NODE_TYPE_INPUT]; nm_clone->nodes_by_type[VLIB_NODE_TYPE_INPUT] = vec_dup_aligned (nm->nodes_by_type[VLIB_NODE_TYPE_INPUT], CLIB_CACHE_LINE_BYTES); vec_foreach (rt, nm_clone->nodes_by_type[VLIB_NODE_TYPE_INPUT]) { vlib_node_t *n = vlib_get_node (vm, rt->node_index); rt->thread_index = vm_clone->thread_index; /* copy runtime_data, will be overwritten later for existing rt */ if (n->runtime_data && n->runtime_data_bytes > 0) clib_memcpy_fast (rt->runtime_data, n->runtime_data, clib_min (VLIB_NODE_RUNTIME_DATA_SIZE, n->runtime_data_bytes)); } for (j = 0; j < vec_len (old_rt); j++) { rt = vlib_node_get_runtime (vm_clone, old_rt[j].node_index); rt->state = old_rt[j].state; clib_memcpy_fast (rt->runtime_data, old_rt[j].runtime_data, VLIB_NODE_RUNTIME_DATA_SIZE); } vec_free (old_rt); /* re-clone pre-input nodes */ old_rt = nm_clone->nodes_by_type[VLIB_NODE_TYPE_PRE_INPUT]; nm_clone->nodes_by_type[VLIB_NODE_TYPE_PRE_INPUT] = vec_dup_aligned (nm->nodes_by_type[VLIB_NODE_TYPE_PRE_INPUT], CLIB_CACHE_LINE_BYTES); vec_foreach (rt, nm_clone->nodes_by_type[VLIB_NODE_TYPE_PRE_INPUT]) { vlib_node_t *n = vlib_get_node (vm, rt->node_index); rt->thread_index = vm_clone->thread_index; /* copy runtime_data, will be overwritten later for existing rt */ if (n->runtime_data && n->runtime_data_bytes > 0) clib_memcpy_fast (rt->runtime_data, n->runtime_data, clib_min (VLIB_NODE_RUNTIME_DATA_SIZE, n->runtime_data_bytes)); } for (j = 0; j < vec_len (old_rt); j++) { rt = vlib_node_get_runtime (vm_clone, old_rt[j].node_index); rt->state = old_rt[j].state; clib_memcpy_fast (rt->runtime_data, old_rt[j].runtime_data, VLIB_NODE_RUNTIME_DATA_SIZE); } vec_free (old_rt); nm_clone->processes = vec_dup_aligned (nm->processes, CLIB_CACHE_LINE_BYTES); }