對於一個分佈式存儲系統,須要一個穩定的底層網絡通訊模塊,用於各個節點的之間的互聯互通。緩存
對於一個網絡通訊系統,要求:服務器
性能評價的兩個指標: 帶寬和延遲網絡
在網絡中斷時,實現重連。數據不丟包session
在msg的子目錄下, 分別對應三種不一樣的實現方式:Simple, Async, XIO 數據結構
Simple是相對比較簡單,目前能夠在生產環境中使用的模式。 它最大的特色是,每個連接,都建立兩個線程,一個專門用於接收,一個專門用於發送。less
這樣的模式實現起來比較簡單,可是對於大規模的集羣部署,大量的連接會產生大量的線程,佔用太多資源,影響網絡的性能。異步
Async模式使用了基於事件的IO 多路複用模式。這是目前比較通用的方式,沒有用第三方庫,實現起來比較複雜。目前還處於試驗階段。socket
XIO模式:使用了開源的的網絡通訊模塊accelio 來實現。這種須要依賴第三方庫。實現起來能夠簡單一些。可是須要對accelio的使用方式熟悉,accelio出問題要有bug fix的能力。目前也處於試驗階段。async
特別注意的是,前兩種方式只支持 tcp/ip 協議,XIO 能夠支持 Infiniband網絡。tcp
在src/msg 的目錄裏,定義了網絡模塊的接口。
在源代碼src/msg 裏實現了ceph 的網絡通訊模塊。在msg目錄下,定義了網絡通訊的抽象接口。
msg/Message.cc
msg/Message.h 定義了message
msg/Connection.h 關於connection 的接口定義
msg/Dispatcher.h 關於Dispatcher
msg/Messenger.cc 定義了Messenger
msg/Messenger.h
msg/msg_types.cc 定義了消息的類型
msg/msg_types.h
Message 是全部消息的基類,任何要發送的消息,都要繼承該類。對於消息,其發送格式以下:
1 header + user_data + footer
header是消息頭,相似一個消息的信封(envelope),保存消息的一些描述信息。user_data 是用於要發送的實際數據, footer是一個消息尾,保存了消息的效驗信息和結束標誌。
user_data
1 payload + middle + data
用戶數據在Message裏 有三部分組成: payload, 通常保存ceph操做的元數據;middle 預留,目前沒有使用到;data 通常爲讀寫的數據。
其對應的數據結構以下:
ceph_msg_header
1 struct ceph_msg_header { 2 __le64 seq; /* message seq# for this session */ 3 當前session內 消息的惟一 序號 4 __le64 tid; /* transaction id */ 5 消息的全局惟一的 id 6 __le16 type; /* message type */ 7 消息類型 8 __le16 priority; /* priority. higher value == higher priority */ 9 優先級 10 __le16 version; /* version of message encoding */ 11 版本 12 __le32 front_len; /* bytes in main payload */ 13 payload 的長度 14 __le32 middle_len; /* bytes in middle payload */ 15 middle 的長度 16 __le32 data_len; /* bytes of data payload */ 17 data 的 長度 18 __le16 data_off; /* sender: include full offset; 19 對象的數據偏移量 20 receiver: mask against ~PAGE_MASK */ 21 struct ceph_entity_name src; 22 //消息源 23 /* oldest code we think can decode this. unknown if zero. */ 24 __le16 compat_version; 25 __le16 reserved; 26 __le32 crc; /* header crc32c */ 27 } __attribute__ ((packed));
ceph_msg_footer
1 struct ceph_msg_footer { 2 __le32 front_crc, middle_crc, data_crc; 3 //分別對應crc 效驗碼 4 __le64 sig; // 消息的64位 signature 5 __u8 flags; //結束標誌 6 } __attribute__ ((packed));
Message
1 class Message{ 2 ceph_msg_header header; // 消息頭 3 ceph_msg_footer footer; // 消息尾 4 bufferlist payload; // "front" unaligned blob 5 bufferlist middle; // "middle" unaligned blob 6 bufferlist data; // data payload (page-alignment will be preserved where possible) 7 /* recv_stamp is set when the Messenger starts reading the 8 * Message off the wire */ 9 utime_t recv_stamp; //開始接收數據的時間戳 10 /* dispatch_stamp is set when the Messenger starts calling dispatch() on 11 * its endpoints */ 12 utime_t dispatch_stamp; // dispatch 的時間戳 13 /* throttle_stamp is the point at which we got throttle */ 14 utime_t throttle_stamp; 15 /* time at which message was fully read */ 16 utime_t recv_complete_stamp; //接收完成的時間戳 17 ConnectionRef connection; //連接 18 uint32_t magic; 19 bi::list_member_hook<> dispatch_q; 20 //boost::intrusive list 的 member 21 }
類Connection 就對應的一個連接,它是socket的port 對port 連接的封裝。其最重要的接口,就是能夠發送消息
1 struct Connection : public RefCountedObject { 2 mutable Mutex lock; //鎖包括 Connection的全部字段 3 Messenger *msgr; 4 RefCountedObject *priv; //私有數據 5 int peer_type; //連接的類型 6 entity_addr_t peer_addr; //對方的地址 7 utime_t last_keepalive, last_keepalive_ack; //最後一次發送keeplive的時間 和最後一次接受keepalive的時間 8 private: 9 uint64_t features; //一些feature的標誌位 10 public: 11 bool failed; // true if we are a lossy connection that has failed. 12 int rx_buffers_version; //接收緩存區的版本 13 map<ceph_tid_t,pair<bufferlist,int> > rx_buffers; //接收緩衝區 14 ceph_tid --> (buffer, rx_buffers_version) 15 }
其最重要的功能,就是發送消息的接口
virtual int send_message(Message *m) = 0;
類Dispatcher 是接收消息的接口。 其接收消息的接口爲:
1 virtual bool ms_dispatch(Message *m) = 0; 2 virtual void ms_fast_dispatch(Message *m);
不管是Server端,仍是Client 端, 都須要實現一個Dispatcher 函數,對於Server 來接收請求,對應client 端,來接收ack應答。
Messenger 是整個網絡模塊功能類的抽象。其定義了網絡模塊的基本功能接口。網絡模塊對外提供的基本的功能,就是能在節點之間發送消息。
向一個節點發送消息
virtual int send_message(Message *m, const entity_inst_t& dest) = 0;
註冊一個,用來接收消息
void add_dispatcher_head(Dispatcher *d)
經過下面的最基本的服務器和客戶端的程序的展現,瞭解如何調用網絡通訊來完成收發請求的功能。
其源代碼在 test/simple_server.cc裏,這裏只展現有關網絡部分的核心流程。
1.調用 Messenger的函數create 建立一個Messenger的實例,g_conf->ms_type爲實現的類型,目前有三種方式,simple,async,xio.
1 messenger = Messenger::create(g_ceph_context, g_conf->ms_type, 2 entity_name_t::MON(-1), 3 "simple_server", 4 0 /* nonce */);
2.設置 messenger 的屬性
1 messenger->set_magic(MSG_MAGIC_TRACE_CTR); 2 messenger->set_default_policy( 3 Messenger::Policy::stateless_server(CEPH_FEATURES_ALL, 0));
3.對於 server,須要bind 服務端地址
1 r = messenger->bind(bind_addr); 2 if (r < 0) 3 goto out; 4 common_init_finish(g_ceph_context);
4.建立一個Dispatcher,並添加到Messenger
1 dispatcher = new SimpleDispatcher(messenger); 2 messenger->add_dispatcher_head(dispatcher);
5.啓動messenger
1 messenger->start(); 2 messenger->wait(); // can't be called until ready()
SimpleDispatcher 函數裏實現了ms_dispatch,用於處理接收到的各類請求消息。
1.調用 Messenger的函數create 建立一個Messenger的實例
1 messenger = Messenger::create(g_ceph_context, g_conf->ms_type, 2 entity_name_t::MON(-1), 3 "client", 4 getpid()); 5 6 messenger->set_magic(MSG_MAGIC_TRACE_CTR); 7 messenger->set_default_policy(Messenger::Policy::lossy_client(0, 0));
3.建立Dispatcher 類並添加,用於接收消息
1 dispatcher = new SimpleDispatcher(messenger); 2 messenger->add_dispatcher_head(dispatcher); 3 dispatcher->set_active(); // this side is the pinger
4.啓動消息
1 r = messenger->start(); 2 if (r < 0) 3 goto out;
5.下面開始發送請求,先獲取目標server 的連接
1 conn = messenger->get_connection(dest_server);
6.經過Connection來發送請求消息。須要注意的是,這裏的消息發送都是異步發送,請求的ack應對消息回來後在Dispatcher的 ms_dispatch或者ms_fast_dispatch裏處理。
1 Message *m; 2 for (msg_ix = 0; msg_ix < n_msgs; ++msg_ix) { 3 /* add a data payload if asked */ 4 if (! n_dsize) { 5 m = new MPing(); 6 } else { 7 m = new_simple_ping_with_data("simple_client", n_dsize); 8 } 9 conn->send_message(m); 10 }
Simple 是ceph裏比較早,目前也比較穩定,在生產環境中使用的網絡通訊模塊。
1 Accepter accepter; 2 用於接受客戶端的連接請求 3 DispatchQueue dispatch_queue; 4 接收到的請求的消息分發隊列 5 bool did_bind; 6 是否綁定 7 /// counter for the global seq our connection protocol uses 8 __u32 global_seq; 9 /// lock to protect the global_seq 10 ceph_spinlock_t global_seq_lock; 11 /** 12 * hash map of addresses to Pipes 13 * 14 * NOTE: a Pipe* with state CLOSED may still be in the map but is considered 15 * invalid and can be replaced by anyone holding the msgr lock 16 */ 17 ceph::unordered_map<entity_addr_t, Pipe*> rank_pipe; 18 /** 19 * list of pipes are in teh process of accepting 20 * 21 * These are not yet in the rank_pipe map. 22 */ 23 set<Pipe*> accepting_pipes; 24 /// a set of all the Pipes we have which are somehow active 25 set<Pipe*> pipes; 26 /// a list of Pipes we want to tear down 27 list<Pipe*> pipe_reap_queue; 28 /// internal cluster protocol version, if any, for talking to entities of the same type. 29 int cluster_protocol;
類Accepter 用來在server端監聽,接受連接。其繼承了Thread類,自己是一個線程,能夠不斷的監聽server 的端口。
DispatchQueue 類用於把接收到的請求保存在內部, 經過其內部的線程,調用SimpleMessenger 類註冊的 Dispatch 類的處理函數來處理相應的消息。
class DispatchQueue { ...... mutable Mutex lock; Cond cond; class QueueItem { int type; ConnectionRef con; MessageRef m; ...... }; PrioritizedQueue<QueueItem, uint64_t> mqueue; //基於優先級的 優先隊列 set<pair<double, Message*> > marrival; 集合 (recv_time, message) map<Message *, set<pair<double, Message*> >::iterator> marrival_map; 消息 到 所在集合位置的 映射 };
其內部的mqueue 爲優先級隊列,用來保存消息, marriaval 保存了接收到的消息。marrival_map 保存消息在 集合中的位置。
函數DispatchQueue::enqueue 用來把接收到的消息添加到 隊列中,函數DispatchQueue::entry 爲線程的處理函數,用於調用用戶註冊的Dispatcher類相應的處理函數來處理消息 。
類Pipe 是PipeConnection的具體實現類。其實現了兩個端口之間相似管道的通訊接口。
對於每個pipe,內部有一個Reader線程 和 一個Writer 線程,分別用來處理有關這個Pipe的消息接收和發送。線程DelayedDelivery用於故障注入測試使用。
類Pipe的數據結構介紹以下:
1 SimpleMessenger *msgr; // msgr的指針 2 uint64_t conn_id; //分配給Pipe 本身惟一的id 3 char *recv_buf; //接收緩存區 4 int recv_max_prefetch; //接收緩衝區一次預期的最大值 5 int recv_ofs; //接收的偏移量 6 int recv_len; //接收的長度 7 int sd; //pipe 對應的 socked fd 8 struct iovec msgvec[IOV_MAX]; //發送消息的 iovec 結構 9 int port; //連接短褲 10 int peer_type; //連接對方的類型 11 entity_addr_t peer_addr; //對方地址 12 Messenger::Policy policy; //策略 13 Mutex pipe_lock; 14 int state; //當前連接的狀態 15 atomic_t state_closed; // non-zero iff state = STATE_CLOSED 16 PipeConnectionRef connection_state; //PipeConnection 的引用 17 utime_t backoff; // backoff time 18 bool reader_running, reader_needs_join; 19 bool reader_dispatching; /// reader thread is dispatching without pipe_lock 20 bool notify_on_dispatch_done; /// something wants a signal when dispatch done 21 bool writer_running; 22 map<int, list<Message*> > out_q; // priority queue for outbound msgs 23 準備發送的消息 優先隊列 24 DispatchQueue *in_q; //接收消息的DispatchQueue 25 list<Message*> sent; //要發送的消息 26 Cond cond; 27 bool send_keepalive; 28 bool send_keepalive_ack; 29 utime_t keepalive_ack_stamp; 30 bool halt_delivery; //if a pipe's queue is destroyed, stop adding to it 31 __u32 connect_seq, peer_global_seq; 32 uint64_t out_seq; 發送消息的序列號 33 uint64_t in_seq, in_seq_acked; 接收到消息序號和 應對的序號
1.當發送一個消息時,首先要經過Messenger類,獲取對應的 Connection
conn = messenger->get_connection(dest_server);
具體到 SimpleMessenger的實現:
首先比較,若是dest.addr 是my_inst.addr,就直接返回 local_connection
調用函數_lookup_pipe 在已經存在的Pipe中查找,若是找到,就直接返回pipe->connection_state,不然調用函數connect_rank 建立一個Pipe,並加入到msgr的register_pipe 裏
2.當得到一個Connection以後,就能夠調用 Connection 的 發送函數,發送消息
conn->send_message(m);
其最終調用了SimpleMessenger::submit_message 函數
若是Pipe 不爲空,而且狀態不是Pipe::STATE_CLOSED 狀態,調用函數pipe->_send 把發送的消息添加到out_q 發送隊列裏,觸發發送線程
若是Pipe 爲空,就調用connect_rank 建立Pipe,並把消息添加到out_q 中
3.發送線程writer把消息發送出去
經過步驟2,要發送的消息Messae已經保存在相應Pipe的out_q隊列裏。並觸發了發送線程。每一個Pipe的Writer 線程負責發送out_q 的消息,其線程入口函數爲Pipe::writer, 實現功能:
調用函數_get_next_outgoing 從out_q 中獲取消息
調用函數 write_message(header, footer, blist) 把消息的header,footer,數據blist 發送出去
1.接收消息,每一個Pipe對應的線程 Reader 用於接收消息
其入口函數爲 Pipe::reader, 其功能以下:
1)判斷當前的state,若是爲STATE_ACCEPTING, 就調用函數Pipe::accept 來接受鏈接,若是不是STATE_CLOSED,而且不是 STATE_CONNECTING 狀態,就接收消息
2)先調用函數tcp_read 來接收一個tag
3)根據tag ,來接收不一樣類型的消息
1 CEPH_MSGR_TAG_KEEPALIVE 消 2 CEPH_MSGR_TAG_KEEPALIVE2, 在CEPH_MSGR_TAG_KEEPALIVE的基礎上,添加了時間 3 CEPH_MSGR_TAG_KEEPALIVE2_ACK 4 CEPH_MSGR_TAG_ACK 5 CEPH_MSGR_TAG_MSG 這裏纔是接收的消息 6 CEPH_MSGR_TAG_CLOSE
2.調用函數read_message 來接收消息, 當本函數返回後,接完成了接收消息。
3.調用函數in_q->fast_preprocess(m) 預處理消息
4.調用函數in_q->can_fast_dispatch(m),若是能夠fast dispatch, 就in_q->fast_dispatch(m)處理。 特別注意的是,fast_dispatch 並不把消息加入到 mqueue裏,而是直接調用msgr->ms_fast_dispatch 函數,並最終調用註冊的fast_dispatcher 函數處理。
5.不然調用函數in_q->enqueue(m, m->get_priority(), conn_id) , 本函數把接收到的消息加入到DispatchQueue的mqueue 隊列裏, 由DispatchQueue的線程調用ms_dispatch處理。
在這裏,須要注意的是 ms_fast_dispath 和 ms_dispatch 兩種處理的區別。ms_dispatch 是由DispatchQueue的線程處理的,它是一個單線程;ms_fast_dispatch的調用是由Pipe的接收線程直接處理的,所以性能比前者要好。
網絡模塊,最重要的是如何處理網絡錯誤。不管是在接收消息仍是發送消息的過程當中,都會出現各類異常錯誤,包括返回異常錯誤碼,接收數據的magic驗證不一致,接收的數據的效驗驗證不一致等等。 錯誤的緣由主要是網絡自己的錯誤(物理鏈路等),或者字節跳變引發的。
處理的方法比較簡單:
函數 Pipe::fault 用來處理錯誤