MySQL內核技術之「pthead局部變量」

MySQL使用了稱之爲psi/pfs的一系列文件和結構來進行performance監控。Psi全稱爲performance schema interface,pfs全稱爲performance storage。sql

該機制使用pthead來進行操做,其首先定義了pthread的線程存儲變量(pfs.cc):session

thread_local_key_t THR_PFS;
thread_local_key_t THR_PFS_VG;   // global_variables
thread_local_key_t THR_PFS_SV;   // session_variables
thread_local_key_t THR_PFS_VBT;  // variables_by_thread
thread_local_key_t THR_PFS_SG;   // global_status
thread_local_key_t THR_PFS_SS;   // session_status
thread_local_key_t THR_PFS_SBT;  // status_by_thread
thread_local_key_t THR_PFS_SBU;  // status_by_user
thread_local_key_t THR_PFS_SBH;  // status_by_host
thread_local_key_t THR_PFS_SBA;  // status_by_account

bool THR_PFS_initialized= false;

這裏的thread_local_key_t其實是pthread_key_t,即pthread線程存儲變量。pthread_key_t的使用就像一個全局變量,哪一個線程均可以用,可是實際上對應了線程內部的變量值,能夠參見該例:http://www.jianshu.com/p/d52c...。pthread規定,線程存儲變量thread_local_key_t必需要先初始化。MySQL在pfs_server.cc中對這些變量統一初始化:函數

void pre_initialize_performance_schema()
{
  pfs_initialized= false;

  init_all_builtin_memory_class();

  PFS_table_stat::g_reset_template.reset();
  global_idle_stat.reset();
  global_table_io_stat.reset();
  global_table_lock_stat.reset();

  if (my_create_thread_local_key(&THR_PFS, destroy_pfs_thread))
    return;
  if (my_create_thread_local_key(&THR_PFS_VG, NULL))  // global_variables
    return;
  if (my_create_thread_local_key(&THR_PFS_SV, NULL))  // session_variables
    return;
  if (my_create_thread_local_key(&THR_PFS_VBT, NULL)) // variables_by_thread
    return;
  if (my_create_thread_local_key(&THR_PFS_SG, NULL))  // global_status
    return;
  if (my_create_thread_local_key(&THR_PFS_SS, NULL))  // session_status
    return;
  if (my_create_thread_local_key(&THR_PFS_SBT, NULL)) // status_by_thread
    return;
  if (my_create_thread_local_key(&THR_PFS_SBU, NULL)) // status_by_user
    return;
  if (my_create_thread_local_key(&THR_PFS_SBH, NULL)) // status_by_host
    return;
  if (my_create_thread_local_key(&THR_PFS_SBA, NULL)) // status_by_account
    return;

  THR_PFS_initialized= true;
}

注意,這個初始化只作一次,之後建立線程時直接使用便可。上的第一個變量THR_PFS就是咱們要使用的。ui

如何使用

使用上的方式初始化,首先要set相應的value:線程

/**
  @brief Execute the JOIN generated by parallel

  @param join [in] JOIN structure
*/
void execute_join(parallel_execution_thread_arg* parallel_arg) {
    /*
     * Get the parameter:
     * 1. JOIN
     * 2. pfs
     */
    /// TODO: do we need to handle error?
    std::cout << "****************I am in worker thread*****************" << std::endl;

    /// Get join
    JOIN* join= parallel_arg->join;

    /// Get and Set pfs
    PSI_thread* pfs= parallel_arg->pfs;
    pfs_set_thread_v1(pfs);

    /// Delete
    delete parallel_arg;
    
    /// Set the new thread context
    my_thread_set_THR_THD(join->thd);

    /// Execute
    join->exec();
}

上面的函數是我在MySQL中新加入的代碼,其中使用pfs_set_thread_v1進行set操做,即把當前THR_PFS對應的值設置爲pfs。code

get操做。因爲咱們加入了boost線程庫,因此當啓動一個線程時須要把JOIN結構和pfs結構傳入。思路是首先經過THR_PFS得到pfs線程句柄,做爲參數傳入到新的線程中。再新線程執行函數中,把pfs線程句柄set進去。具體在sql_select.cc中,咱們加入了以下代碼:orm

/**
  Parallel execution.

  @details When a JOIN is parallel, its JOINs will execute parallelly.
  Put all JOINs into thread pool to execute.
*/
void JOIN::parallel_exec_joins() {
  for (uint i= 0; i < m_parallel_joins.size(); i ++) {
    /// Delete it in join->exec
    parallel_execution_thread_arg* parallel_arg= new parallel_execution_thread_arg();

    /// Set join
    JOIN* join= m_parallel_joins[i];
    parallel_arg->join= join;

    /// Set pfs
    PSI_thread* pfs= pfs_get_thread_v1();
    parallel_arg->pfs= pfs;

    /// Thread pool
    generic_thread_pool.SubmitTask(execute_join, (parallel_execution_thread_arg* &&)parallel_arg);
  }
}

能夠看到,咱們經過MySQL的pfs_get_thread_v1得到pfs線程句柄傳入到新的線程。server

上面的例子是針對pfs的線程。對於MySQL普通線程的例子在上面的execute_join也能找到。注意裏面有一行code:get

/// Set the new thread context
my_thread_set_THR_THD(join->thd);

這裏就是把當前的thd設置到pthread中。因此咱們看到,在MySQL中的不少地方都用到了這個東西,用法也已經明確了。it

相關文章
相關標籤/搜索