Ceph內部定義了ThreadPool,ThreadPool內部有不少工作線程,Ceph通過一種保活(心跳)機制來監控所有worker線程的健康狀況。
ThreadPool worker的工作流程如下:
/**
 * Main loop executed by a ThreadPool worker thread.
 *
 * The worker registers itself in the heartbeat map, then loops until `_stop`
 * is set. On each iteration it walks the registered work queues, starting
 * after the queue it served last, and processes at most one dequeued item.
 * When no item was processed it refreshes its heartbeat timeout and waits on
 * `_cond` for a bounded interval. On exit it removes itself from the
 * heartbeat map.
 *
 * `_lock` is taken on entry and released on exit; it is explicitly dropped
 * only around the call to `_void_process()`.
 *
 * @param wt  the WorkThread object for this worker (not referenced in the
 *            portion of the function shown in this excerpt).
 */
void ThreadPool::worker(WorkThread *wt)
{
  _lock.Lock();
  ldout(cct,10) << "worker start" << dendl;

  std::stringstream ss;
  char name[16] = {0};
  ceph_pthread_getname(pthread_self(), name, sizeof(name));
  // NOTE(review): the thread name is streamed twice here; confirm this is intended.
  ss << name << " thread " << name;

  // Register this worker in the heartbeat map; `hb` is the handle used for
  // every later timeout reset and for the final removal.
  heartbeat_handle_d *hb = cct->get_heartbeat_map()->add_worker(ss.str(), pthread_self());

  while (!_stop)
  {
    // ... (code elided in the original excerpt) ...

    if (!_pause && !work_queues.empty())
    {
      WorkQueue_* wq;
      int tries = work_queues.size();
      bool did = false;

      // Try each queue at most once, advancing the shared cursor so that
      // successive iterations start from a different queue.
      while (tries--)
      {
        last_work_queue++;
        last_work_queue %= work_queues.size();
        wq = work_queues[last_work_queue];

        void *item = wq->_void_dequeue();
        // There is a queue item to process.
        if (item)
        {
          processing++;
          ldout(cct,12) << "worker wq " << wq->name << " start processing " << item
                        << " (" << processing << " active)" << dendl;

          TPHandle tp_handle(cct, hb, wq->timeout_interval, wq->suicide_interval);
          tp_handle.reset_tp_timeout();  // re-arm this worker's timeout before processing

          // The pool lock is not held while the item is being processed.
          _lock.Unlock();
          wq->_void_process(item, tp_handle);  // process the item
          _lock.Lock();

          wq->_void_process_finish(item);
          processing--;
          ldout(cct,15) << "worker wq " << wq->name << " done processing " << item
                        << " (" << processing << " active)" << dendl;

          // Presumably wakes a thread waiting for the pool to pause or drain
          // -- TODO confirm against the waiter on _wait_cond.
          if (_pause || _draining)
            _wait_cond.Signal();

          did = true;
          break;
        }
      }

      // An item was processed: go straight to the next iteration without waiting.
      if (did)
        continue;
    }

    ldout(cct,20) << "worker waiting" << dendl;

    // The timeout is refreshed even when there is no work. The idle wait is
    // bounded by threadpool_empty_queue_max_wait seconds, so the thread wakes
    // up on its own (the original note says after 2 seconds -- presumably the
    // option's default; verify).
    cct->get_heartbeat_map()->reset_timeout(
      hb,
      cct->_conf->threadpool_default_timeout,
      0);
    _cond.WaitInterval(cct, _lock,
      utime_t(
        cct->_conf->threadpool_empty_queue_max_wait, 0));
  }

  ldout(cct,1) << "worker finish" << dendl;

  cct->get_heartbeat_map()->remove_worker(hb);

  _lock.Unlock();
}
而tp_osd_tp線程會定時檢查是否有超時的worker
/**
 * Check one heartbeat handle against the current time.
 *
 * @param h    handle whose `timeout` and `suicide_timeout` values are read.
 * @param who  label placed at the start of the log messages.
 * @param now  time both stored values are compared against.
 * @return false if the stored timeout is non-zero and earlier than `now`,
 *         true otherwise. If the stored suicide timeout is non-zero and
 *         earlier than `now`, SIGABRT is sent to the handle's thread and an
 *         assertion fails.
 */
bool HeartbeatMap::_check(const heartbeat_handle_d *h, const char *who, time_t now)
{
  // Ordinary timeout: a stored value of 0 is skipped; otherwise the handle is
  // unhealthy once that value lies before `now`.
  const time_t grace_deadline = h->timeout.read();
  const bool grace_expired = (grace_deadline != 0) && (now > grace_deadline);
  if (grace_expired) {
    ldout(m_cct, 1) << who << " '" << h->name << "'"
                    << " had timed out after " << h->grace << dendl;
  }

  // Suicide timeout: once exceeded, signal the owning thread with SIGABRT,
  // pause for a second, then fail an assertion.
  const time_t suicide_deadline = h->suicide_timeout.read();
  if (suicide_deadline != 0 && now > suicide_deadline) {
    ldout(m_cct, 1) << who << " '" << h->name << "'"
                    << " had suicide timed out after " << h->suicide_grace << dendl;
    pthread_kill(h->thread_id, SIGABRT);
    sleep(1);
    assert(0 == "hit suicide timeout");
  }

  return !grace_expired;
}
void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb )
{
uint32_t shard_index = thread_index % num_shards;
ShardData* sdata = shard_list[shard_index];
assert(NULL != sdata);
sdata->sdata_op_ordering_lock.Lock();
if (sdata->pqueue->empty())
{
sdata->sdata_op_ordering_lock.Unlock();
osd->cct->get_heartbeat_map()->reset_timeout(hb, // reset內部會調用check
osd->cct->_conf->threadpool_default_timeout, 0);
sdata->sdata_lock.Lock();
sdata->sdata_cond.WaitInterval(osd->cct, sdata->sdata_lock,
utime_t(osd->cct->_conf->threadpool_empty_queue_max_wait, 0));
sdata->sdata_lock.Unlock();
sdata->sdata_op_ordering_lock.Lock();
if(sdata->pqueue->empty())
{
sdata->sdata_op_ordering_lock.Unlock();
return;
}
}