Ceph Threadpool Worker保活機制代碼分析

Ceph內部定義了ThreadPool,ThreadPool內部有不少工作線程,Ceph通過一種保活機制來監控所有worker線程的健康狀況。

ThreadPool worker的工作流程如下:

void ThreadPool::worker(WorkThread *wt)線程

{隊列

    _lock.Lock();ci

    ldout(cct,10) << "worker start" << dendl;rem

    std::stringstream ss;get

    char name[16] = {0};string

    ceph_pthread_getname(pthread_self(), name, sizeof(name));工作流

    ss << name << " thread " << name;it

    heartbeat_handle_d *hb = cct->get_heartbeat_map()->add_worker(ss.str(), pthread_self());

    // heartbeatmap中添加worker

    while (!_stop)

    {

 

……

        if (!_pause && !work_queues.empty())

        {

            WorkQueue_* wq;

            int tries = work_queues.size();

            bool did = false;

            while (tries--)

            {

                last_work_queue++;

                last_work_queue %= work_queues.size();

                wq = work_queues[last_work_queue];

 

                void *item = wq->_void_dequeue();

                 // 有要處理的隊列項

                if (item)

                {

                    processing++;

                    ldout(cct,12) << "worker wq " << wq->name << " start processing " << item

                                  << " (" << processing << " active)" << dendl;

                    TPHandle tp_handle(cct, hb, wq->timeout_interval, wq->suicide_interval);

                    tp_handle.reset_tp_timeout();    // 重設worker線程的超時時間

                    _lock.Unlock();

                    wq->_void_process(item, tp_handle);  // 處理item

                    _lock.Lock();

                    wq->_void_process_finish(item); 

                    processing--;

                    ldout(cct,15) << "worker wq " << wq->name << " done processing " << item

                                  << " (" << processing << " active)" << dendl;

                    if (_pause || _draining)

                        _wait_cond.Signal();

                    did = true;

                    break;

                }

            }

            if (did)

                continue;

        }

 

        ldout(cct,20) << "worker waiting" << dendl;

        // 沒有消息的時候也會設置timeout,同時沒有消息的狀況下線程會在2秒自動喚醒

        cct->get_heartbeat_map()->reset_timeout(

            hb,

            cct->_conf->threadpool_default_timeout,

            0);

        _cond.WaitInterval(cct, _lock,

                           utime_t(

                               cct->_conf->threadpool_empty_queue_max_wait, 0));

    }

    ldout(cct,1) << "worker finish" << dendl;

 

    cct->get_heartbeat_map()->remove_worker(hb);

 

    _lock.Unlock();

}

 

而tp_osd_tp線程會定時檢查是否有超時的worker,檢查邏輯如下:

bool HeartbeatMap::_check(const heartbeat_handle_d *h, const char *who, time_t now)

{

  bool healthy = true;

  time_t was;

 

  was = h->timeout.read();

  // 超時返回unhealthy

  if (was && was < now) {

    ldout(m_cct, 1) << who << " '" << h->name << "'"

    << " had timed out after " << h->grace << dendl;

    healthy = false;

  }

// 退出超時返回線程退出並assert

  was = h->suicide_timeout.read();

  if (was && was < now) {

    ldout(m_cct, 1) << who << " '" << h->name << "'"

    << " had suicide timed out after " << h->suicide_grace << dendl;

    pthread_kill(h->thread_id, SIGABRT);

    sleep(1);

    assert(0 == "hit suicide timeout");

  }

  return healthy;

}

 

// Per-thread body of the sharded OSD op work queue. This is an excerpt; the
// original article truncates it.
//
// Visible behaviour:
//  - The calling thread is mapped to shard `thread_index % num_shards`.
//  - If that shard's pqueue is empty, the thread first re-arms its heartbeat
//    deadline via reset_timeout() with threadpool_default_timeout (and 0 as the
//    last argument).
//  - It then waits on the shard's sdata_cond for at most
//    threadpool_empty_queue_max_wait seconds.
//  - If the queue is still empty after waking, it returns without doing any
//    work.
//
// @param thread_index  index of the calling worker thread; selects the shard
// @param hb            heartbeat handle of the calling worker thread
void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb )
{
    uint32_t shard_index = thread_index % num_shards;

    ShardData* sdata = shard_list[shard_index];
    assert(NULL != sdata);
    sdata->sdata_op_ordering_lock.Lock();
    if (sdata->pqueue->empty())
    {
        // Nothing queued for this shard. Drop the ordering lock before sleeping,
        // presumably so enqueuers are not blocked while this thread waits.
        sdata->sdata_op_ordering_lock.Unlock();
        // Re-arm the heartbeat even while idle. Per the original note,
        // reset_timeout() internally calls the heartbeat check.
        osd->cct->get_heartbeat_map()->reset_timeout(hb,
                osd->cct->_conf->threadpool_default_timeout, 0);
        sdata->sdata_lock.Lock();
        sdata->sdata_cond.WaitInterval(osd->cct, sdata->sdata_lock,
                                       utime_t(osd->cct->_conf->threadpool_empty_queue_max_wait, 0));
        sdata->sdata_lock.Unlock();
        // Re-check the queue under the ordering lock after waking.
        sdata->sdata_op_ordering_lock.Lock();
        if(sdata->pqueue->empty())
        {
            sdata->sdata_op_ordering_lock.Unlock();
            return;
        }
    }
    // NOTE: the article's excerpt ends here; the rest of _process is not shown.

相關文章
相關標籤/搜索