OSD服務端消息的接收起始於OSD::init()中的messenger::add_dispatcher_head(Dispatcher *d)函數。
|- 358 void add_dispatcher_head(Dispatcher *d) { || 359 bool first = dispatchers.empty(); || 360 dispatchers.push_front(d); || 361 if (d->ms_can_fast_dispatch_any()) || 362 fast_dispatchers.push_front(d); || 363 if (first) || 364 ready(); //若是dispatcher list空,啓動SimpleMessenger::ready,不爲空證實SimpleMessenger已經啓動了 || 365 }
在SimpleMessenger::ready()中,啓動DispatchQueue等待mqueue;如果已經綁定了端口(did_bind),就啓動accepter接收線程。
76 void SimpleMessenger::ready() - 77 { | 78 ldout(cct,10) << "ready " << get_myaddr() << dendl; | 79 dispatch_queue.start(); //啓動DispatchQueue,等待mqueue | 80 | 81 lock.Lock(); | 82 if (did_bind) | 83 accepter.start(); | 84 lock.Unlock(); | 85 }
Accepter是Thread的繼承類,Accepter::start()最終調用Accepter::entry(),在entry中accept新連接,並把接收到的sd加入到Pipe類中。
void *Accepter::entry() { ... struct pollfd pfd; pfd.fd = listen_sd; pfd.events = POLLIN | POLLERR | POLLNVAL | POLLHUP; while (!done) { int r = poll(&pfd, 1, -1); if (pfd.revents & (POLLERR | POLLNVAL | POLLHUP)) break; // accept entity_addr_t addr; socklen_t slen = sizeof(addr.ss_addr()); int sd = ::accept(listen_sd, (sockaddr*)&addr.ss_addr(), &slen); if (sd >= 0) { errors = 0; ldout(msgr->cct,10) << "accepted incoming on sd " << sd << dendl; msgr->add_accept_pipe(sd); //註冊一個pipe,啓動讀線程,從該sd中讀取數據 } else { ldout(msgr->cct,0) << "accepter no incoming connection? sd = " << sd << " errno " << errno << " " << cpp_strerror(errno) << dendl; if (++errors > 4) break; } } ... return 0;
在SimpleMessenger::add_accept_pipe(int sd)中,申請一個Pipe類並把sd加入到Pipe中,然後調用Pipe::start_reader()啓動讀線程。
340 Pipe *SimpleMessenger::add_accept_pipe(int sd) - 341 { | 342 lock.Lock(); | 343 Pipe *p = new Pipe(this, Pipe::STATE_ACCEPTING, NULL); | 344 p->sd = sd; | 345 p->pipe_lock.Lock(); | 346 p->start_reader(); | 347 p->pipe_lock.Unlock(); | 348 pipes.insert(p); | 349 accepting_pipes.insert(p); | 350 lock.Unlock(); | 351 return p; | 352 }
Pipe類內部有一個Reader和一個Writer線程類,Pipe::start_reader()啓動Pipe::Reader::entry(),最終進入Pipe::reader()函數。
134 void Pipe::start_reader() - 135 { | 136 assert(pipe_lock.is_locked()); | 137 assert(!reader_running); |- 138 if (reader_needs_join) { || 139 reader_thread.join(); || 140 reader_needs_join = false; || 141 } | 142 reader_running = true; | 143 reader_thread.create("ms_pipe_read", msgr->cct->_conf->ms_rwthread_stack_bytes); | 144 }
|- 48 class Reader : public Thread { || 49 Pipe *pipe; || 50 public: || 51 explicit Reader(Pipe *p) : pipe(p) {} || 52 void *entry() { pipe->reader(); return 0; } || 53 } reader_thread;
在Pipe::reader函數中根據tag接收不同類型的消息;如果是CEPH_MSGR_TAG_MSG類型的消息,則調用read_message接收消息,並把消息加入到mqueue中。
void Pipe::reader() { pipe_lock.Lock(); if (state == STATE_ACCEPTING) { accept(); //第一次進入此函數處理 assert(pipe_lock.is_locked()); } // loop. while (state != STATE_CLOSED && state != STATE_CONNECTING) { assert(pipe_lock.is_locked()); ...... ...... else if (tag == CEPH_MSGR_TAG_MSG) { ldout(msgr->cct,20) << "reader got MSG" << dendl; Message *m = 0; int r = read_message(&m, auth_handler.get()); pipe_lock.Lock(); if (!m) { if (r < 0) fault(true); continue; } ...... ...... ...... // note last received message. in_seq = m->get_seq(); cond.Signal(); // wake up writer, to ack this ldout(msgr->cct,10) << "reader got message " << m->get_seq() << " " << m << " " << *m << dendl; in_q->fast_preprocess(m); //mds 、mon不會進入此函數,預處理 if (delay_thread) { utime_t release; if (rand() % 10000 < msgr->cct->_conf->ms_inject_delay_probability * 10000.0) { release = m->get_recv_stamp(); release += msgr->cct->_conf->ms_inject_delay_max * (double)(rand() % 10000) / 10000.0; lsubdout(msgr->cct, ms, 1) << "queue_received will delay until " << release << " on " << m << " " << *m << dendl; } delay_thread->queue(release, m); } else { if (in_q->can_fast_dispatch(m)) { reader_dispatching = true; pipe_lock.Unlock(); in_q->fast_dispatch(m); pipe_lock.Lock(); reader_dispatching = false; if (state == STATE_CLOSED || notify_on_dispatch_done) { // there might be somebody waiting notify_on_dispatch_done = false; cond.Signal(); } } else { //mds進入此else in_q->enqueue(m, m->get_priority(), conn_id); //把接收到的messenger加入到mqueue中 } } } ...... ...... } // reap? reader_running = false; reader_needs_join = true; unlock_maybe_reap(); ldout(msgr->cct,10) << "reader done" << dendl; }
在DispatchQueue::enqueue函數中把消息加入到mqueue中。
/**
 * Queue a received message on mqueue and signal the queue's condition
 * variable.
 *
 * @param m        message to queue.
 * @param priority message priority; values >= CEPH_MSG_PRIO_LOW go through
 *                 enqueue_strict(), lower values through enqueue() with the
 *                 message cost.
 * @param id       identifier passed through to mqueue as the queue key.
 */
void DispatchQueue::enqueue(Message *m, int priority, uint64_t id)
{
  Mutex::Locker l(lock);
  ldout(cct,20) << "queue " << m << " prio " << priority << dendl;
  add_arrival(m);
  if (priority >= CEPH_MSG_PRIO_LOW) {
    mqueue.enqueue_strict(id, priority, QueueItem(m));
  } else {
    mqueue.enqueue(id, priority, m->get_cost(), QueueItem(m));
  }
  // Original author's note: this wakes the dispatch thread started by
  // dispatch_queue.start(), which then processes the queue in its entry().
  cond.Signal();
}