[轉]ceph網絡通訊 數據結構分析1

對於一個分佈式存儲系統,須要一個穩定的底層網絡通訊模塊,用於各個節點的之間的互聯互通。緩存

對於一個網絡通訊系統,要求:服務器

  • 高性能

          性能評價的兩個指標: 帶寬和延遲網絡

  • 穩定可靠

          在網絡中斷時,實現重連。數據不丟包session


在msg的子目錄下, 分別對應三種不一樣的實現方式:Simple, Async, XIO 數據結構

Simple是相對比較簡單,目前能夠在生產環境中使用的模式。 它最大的特色是,每個連接,都建立兩個線程,一個專門用於接收,一個專門用於發送。less

這樣的模式實現起來比較簡單,可是對於大規模的集羣部署,大量的連接會產生大量的線程,佔用太多資源,影響網絡的性能。異步

Async模式使用了基於事件的IO 多路複用模式。這是目前比較通用的方式,沒有用第三方庫,實現起來比較複雜。目前還處於試驗階段。socket

XIO模式:使用了開源的的網絡通訊模塊accelio 來實現。這種須要依賴第三方庫。實現起來能夠簡單一些。可是須要對accelio的使用方式熟悉,accelio出問題要有bug fix的能力。目前也處於試驗階段。async

特別注意的是,前兩種方式只支持 tcp/ip 協議,XIO 能夠支持 Infiniband網絡。tcp

網絡通訊的接口

在src/msg 的目錄裏,定義了網絡模塊的接口。
在源代碼src/msg 裏實現了ceph 的網絡通訊模塊。在msg目錄下,定義了網絡通訊的抽象接口。

msg/Message.cc
msg/Message.h    定義了message

msg/Connection.h  關於connection 的接口定義
msg/Dispatcher.h   關於Dispatcher

msg/Messenger.cc  定義了Messenger
msg/Messenger.h

msg/msg_types.cc   定義了消息的類型
msg/msg_types.h

 

Message

Message 是全部消息的基類,任何要發送的消息,都要繼承該類。對於消息,其發送格式以下:

1  header   + user_data  +  footer

header是消息頭,相似一個消息的信封(envelope),保存消息的一些描述信息。user_data 是用於要發送的實際數據, footer是一個消息尾,保存了消息的效驗信息和結束標誌。

user_data

1    payload  + middle  + data

用戶數據在Message裏 有三部分組成: payload, 通常保存ceph操做的元數據;middle 預留,目前沒有使用到;data 通常爲讀寫的數據。

其對應的數據結構以下:

ceph_msg_header

 1 struct ceph_msg_header {
 2     __le64 seq;       /* message seq# for this session */ 
 3                    當前session內 消息的惟一 序號
 4     __le64 tid;        /* transaction id */
 5                    消息的全局惟一的 id
 6     __le16 type;      /* message type */
 7                    消息類型
 8     __le16 priority;    /* priority.  higher value == higher priority */
 9                    優先級
10     __le16 version;    /* version of message encoding */
11                    版本
12     __le32 front_len;   /* bytes in main payload */
13                      payload 的長度
14     __le32 middle_len;  /* bytes in middle payload */
15                      middle 的長度
16     __le32 data_len;    /* bytes of data payload */
17                      data 的 長度
18     __le16 data_off;    /* sender: include full offset;
19                      對象的數據偏移量
20                      receiver: mask against ~PAGE_MASK */
21     struct ceph_entity_name src;
22                      //消息源
23     /* oldest code we think can decode this.  unknown if zero. */
24     __le16 compat_version;
25     __le16 reserved;
26     __le32 crc;       /* header crc32c */
27 } __attribute__ ((packed));

ceph_msg_footer

1 struct ceph_msg_footer {
2     __le32 front_crc, middle_crc, data_crc;
3                      //分別對應crc 效驗碼
4     __le64  sig;      // 消息的64位  signature
5     __u8 flags;       //結束標誌
6 } __attribute__ ((packed));

Message

 1 class  Message{
 2   ceph_msg_header  header;      // 消息頭
 3   ceph_msg_footer  footer;       // 消息尾
 4   bufferlist         payload;       // "front" unaligned blob
 5   bufferlist         middle;        // "middle" unaligned blob
 6   bufferlist       data;     // data payload (page-alignment will be preserved where possible)
 7   /* recv_stamp is set when the Messenger starts reading the
 8    * Message off the wire */
 9   utime_t recv_stamp;   //開始接收數據的時間戳
10   /* dispatch_stamp is set when the Messenger starts calling dispatch() on
11    * its endpoints */
12   utime_t dispatch_stamp;  // dispatch 的時間戳
13   /* throttle_stamp is the point at which we got throttle */
14   utime_t throttle_stamp;
15   /* time at which message was fully read */
16   utime_t recv_complete_stamp;  //接收完成的時間戳
17   ConnectionRef connection;  //連接
18   uint32_t magic;
19   bi::list_member_hook<> dispatch_q;  
20    //boost::intrusive list 的 member
21 }

Connection

類Connection 就對應的一個連接,它是socket的port 對port 連接的封裝。其最重要的接口,就是能夠發送消息

 1 struct Connection : public RefCountedObject {
 2   mutable Mutex lock;    //鎖包括 Connection的全部字段
 3   Messenger *msgr;     
 4   RefCountedObject *priv;  //私有數據
 5   int peer_type;             //連接的類型
 6   entity_addr_t peer_addr;    //對方的地址
 7   utime_t last_keepalive, last_keepalive_ack;  //最後一次發送keeplive的時間 和最後一次接受keepalive的時間
 8 private:
 9   uint64_t features;           //一些feature的標誌位
10 public:
11   bool failed;       // true if we are a lossy connection that has failed.
12   int rx_buffers_version;  //接收緩存區的版本
13   map<ceph_tid_t,pair<bufferlist,int> > rx_buffers;  //接收緩衝區
14          ceph_tid --> (buffer, rx_buffers_version)
15 }

其最重要的功能,就是發送消息的接口

  virtual int send_message(Message *m) = 0;

Dispatcher

類Dispatcher 是接收消息的接口。 其接收消息的接口爲:

1 virtual bool ms_dispatch(Message *m) = 0;
2 virtual void ms_fast_dispatch(Message *m);

不管是Server端,仍是Client 端, 都須要實現一個Dispatcher 函數,對於Server 來接收請求,對應client 端,來接收ack應答。

Messenger

Messenger 是整個網絡模塊功能類的抽象。其定義了網絡模塊的基本功能接口。網絡模塊對外提供的基本的功能,就是能在節點之間發送消息。

向一個節點發送消息

virtual int send_message(Message *m, const entity_inst_t& dest) = 0;

註冊一個,用來接收消息

void add_dispatcher_head(Dispatcher *d)

網絡模塊的使用

經過下面的最基本的服務器和客戶端的程序的展現,瞭解如何調用網絡通訊來完成收發請求的功能。

Server 程序

其源代碼在 test/simple_server.cc裏,這裏只展現有關網絡部分的核心流程。

1.調用 Messenger的函數create 建立一個Messenger的實例,g_conf->ms_type爲實現的類型,目前有三種方式,simple,async,xio.

1  messenger = Messenger::create(g_ceph_context, g_conf->ms_type,
2                   entity_name_t::MON(-1),
3                   "simple_server",
4                   0 /* nonce */);

2.設置 messenger 的屬性

1 messenger->set_magic(MSG_MAGIC_TRACE_CTR);
2 messenger->set_default_policy(
3   Messenger::Policy::stateless_server(CEPH_FEATURES_ALL, 0));

3.對於 server,須要bind 服務端地址

1 r = messenger->bind(bind_addr);
2 if (r < 0)
3     goto out;
4 common_init_finish(g_ceph_context);

4.建立一個Dispatcher,並添加到Messenger

1 dispatcher = new SimpleDispatcher(messenger);
2 messenger->add_dispatcher_head(dispatcher); 

5.啓動messenger

1 messenger->start();
2 messenger->wait(); // can't be called until ready()

SimpleDispatcher 函數裏實現了ms_dispatch,用於處理接收到的各類請求消息。

Client 程序分析

1.調用 Messenger的函數create 建立一個Messenger的實例

1 messenger = Messenger::create(g_ceph_context, g_conf->ms_type,
2                       entity_name_t::MON(-1),
3                       "client",
4                       getpid());
5 
6 messenger->set_magic(MSG_MAGIC_TRACE_CTR);
7 messenger->set_default_policy(Messenger::Policy::lossy_client(0, 0));

3.建立Dispatcher 類並添加,用於接收消息

1 dispatcher = new SimpleDispatcher(messenger);
2 messenger->add_dispatcher_head(dispatcher);
3 dispatcher->set_active(); // this side is the pinger

4.啓動消息

1 r = messenger->start();
2 if (r < 0)
3     goto out;

5.下面開始發送請求,先獲取目標server 的連接

1 conn = messenger->get_connection(dest_server);

6.經過Connection來發送請求消息。須要注意的是,這裏的消息發送都是異步發送,請求的ack應對消息回來後在Dispatcher的 ms_dispatch或者ms_fast_dispatch裏處理。

 1 Message *m;
 2 for (msg_ix = 0; msg_ix < n_msgs; ++msg_ix) {
 3   /* add a data payload if asked */
 4   if (! n_dsize) {
 5     m = new MPing();
 6   } else {
 7     m = new_simple_ping_with_data("simple_client", n_dsize);
 8   }
 9   conn->send_message(m);
10 }

Simple

Simple 是ceph裏比較早,目前也比較穩定,在生產環境中使用的網絡通訊模塊。

SimpleMessager

 1 Accepter accepter;    
 2  用於接受客戶端的連接請求
 3 DispatchQueue dispatch_queue; 
 4  接收到的請求的消息分發隊列
 5  bool did_bind;   
 6    是否綁定
 7   /// counter for the global seq our connection protocol uses
 8   __u32 global_seq;
 9   /// lock to protect the global_seq
10   ceph_spinlock_t global_seq_lock;
11   /**
12    * hash map of addresses to Pipes
13    *
14    * NOTE: a Pipe* with state CLOSED may still be in the map but is considered
15    * invalid and can be replaced by anyone holding the msgr lock
16    */
17   ceph::unordered_map<entity_addr_t, Pipe*> rank_pipe;
18   /**
19    * list of pipes are in teh process of accepting
20    *
21    * These are not yet in the rank_pipe map.
22    */
23   set<Pipe*> accepting_pipes;
24   /// a set of all the Pipes we have which are somehow active
25   set<Pipe*>      pipes;
26   /// a list of Pipes we want to tear down
27   list<Pipe*>     pipe_reap_queue;
28   /// internal cluster protocol version, if any, for talking to entities of the same type.
29   int cluster_protocol;

Accepter

類Accepter 用來在server端監聽,接受連接。其繼承了Thread類,自己是一個線程,能夠不斷的監聽server 的端口。

 

DispatchQueue

DispatchQueue 類用於把接收到的請求保存在內部, 經過其內部的線程,調用SimpleMessenger 類註冊的 Dispatch 類的處理函數來處理相應的消息。

class DispatchQueue {
  ......
  mutable Mutex lock;
  Cond cond;
  class QueueItem {
    int type;
    ConnectionRef con;
MessageRef m;
......
  };
  PrioritizedQueue<QueueItem, uint64_t> mqueue;    //基於優先級的 優先隊列
  set<pair<double, Message*> > marrival;  
     集合 (recv_time, message) 
  map<Message *, set<pair<double, Message*> >::iterator> marrival_map;
    消息 到  所在集合位置的 映射
};

其內部的mqueue 爲優先級隊列,用來保存消息, marriaval 保存了接收到的消息。marrival_map 保存消息在 集合中的位置。

函數DispatchQueue::enqueue 用來把接收到的消息添加到 隊列中,函數DispatchQueue::entry 爲線程的處理函數,用於調用用戶註冊的Dispatcher類相應的處理函數來處理消息 。

Pipe

類Pipe 是PipeConnection的具體實現類。其實現了兩個端口之間相似管道的通訊接口。

對於每個pipe,內部有一個Reader線程 和 一個Writer 線程,分別用來處理有關這個Pipe的消息接收和發送。線程DelayedDelivery用於故障注入測試使用。

類Pipe的數據結構介紹以下:

 1 SimpleMessenger *msgr;   //   msgr的指針
 2 uint64_t conn_id;     //分配給Pipe 本身惟一的id
 3 char *recv_buf;         //接收緩存區
 4 int recv_max_prefetch;   //接收緩衝區一次預期的最大值
 5 int recv_ofs;            //接收的偏移量
 6 int recv_len;            //接收的長度
 7 int sd;               //pipe 對應的 socked fd
 8 struct iovec msgvec[IOV_MAX];   //發送消息的 iovec 結構
 9 int port;       //連接短褲
10 int peer_type;  //連接對方的類型
11 entity_addr_t peer_addr;  //對方地址
12 Messenger::Policy policy;   //策略
13 Mutex pipe_lock;
14 int state;            //當前連接的狀態
15 atomic_t state_closed;    // non-zero iff state = STATE_CLOSED
16 PipeConnectionRef connection_state;   //PipeConnection 的引用
17 utime_t backoff;         // backoff time
18 bool reader_running, reader_needs_join;
19 bool reader_dispatching;    /// reader thread is dispatching without pipe_lock
20 bool notify_on_dispatch_done;   /// something wants a signal when dispatch done
21 bool writer_running;
22 map<int, list<Message*> > out_q;  // priority queue for outbound msgs
23    準備發送的消息 優先隊列
24 DispatchQueue *in_q;   //接收消息的DispatchQueue
25 list<Message*> sent;   //要發送的消息
26 Cond cond;
27 bool send_keepalive;
28 bool send_keepalive_ack;
29 utime_t keepalive_ack_stamp;
30 bool halt_delivery;     //if a pipe's queue is destroyed, stop adding to it
31 __u32 connect_seq, peer_global_seq;
32 uint64_t out_seq;      發送消息的序列號 
33 uint64_t in_seq, in_seq_acked;  接收到消息序號和 應對的序號

 

消息的發送

1.當發送一個消息時,首先要經過Messenger類,獲取對應的 Connection
  conn = messenger->get_connection(dest_server);

具體到 SimpleMessenger的實現:


首先比較,若是dest.addr 是my_inst.addr,就直接返回 local_connection
調用函數_lookup_pipe 在已經存在的Pipe中查找,若是找到,就直接返回pipe->connection_state,不然調用函數connect_rank 建立一個Pipe,並加入到msgr的register_pipe 裏


2.當得到一個Connection以後,就能夠調用 Connection 的 發送函數,發送消息
      conn->send_message(m);

其最終調用了SimpleMessenger::submit_message 函數


若是Pipe 不爲空,而且狀態不是Pipe::STATE_CLOSED 狀態,調用函數pipe->_send 把發送的消息添加到out_q 發送隊列裏,觸發發送線程
若是Pipe 爲空,就調用connect_rank 建立Pipe,並把消息添加到out_q 中


3.發送線程writer把消息發送出去
經過步驟2,要發送的消息Messae已經保存在相應Pipe的out_q隊列裏。並觸發了發送線程。每一個Pipe的Writer 線程負責發送out_q 的消息,其線程入口函數爲Pipe::writer, 實現功能:


調用函數_get_next_outgoing 從out_q 中獲取消息
調用函數 write_message(header, footer, blist) 把消息的header,footer,數據blist 發送出去

消息的接收

1.接收消息,每一個Pipe對應的線程 Reader 用於接收消息

  其入口函數爲  Pipe::reader, 其功能以下:

       1)判斷當前的state,若是爲STATE_ACCEPTING, 就調用函數Pipe::accept 來接受鏈接,若是不是STATE_CLOSED,而且不是     STATE_CONNECTING 狀態,就接收消息
       2)先調用函數tcp_read 來接收一個tag
       3)根據tag ,來接收不一樣類型的消息

1 CEPH_MSGR_TAG_KEEPALIVE 消
2 CEPH_MSGR_TAG_KEEPALIVE2, 在CEPH_MSGR_TAG_KEEPALIVE的基礎上,添加了時間
3 CEPH_MSGR_TAG_KEEPALIVE2_ACK
4 CEPH_MSGR_TAG_ACK
5 CEPH_MSGR_TAG_MSG   這裏纔是接收的消息
6 CEPH_MSGR_TAG_CLOSE

2.調用函數read_message 來接收消息, 當本函數返回後,接完成了接收消息。

3.調用函數in_q->fast_preprocess(m) 預處理消息
4.調用函數in_q->can_fast_dispatch(m),若是能夠fast dispatch,  就in_q->fast_dispatch(m)處理。 特別注意的是,fast_dispatch 並不把消息加入到 mqueue裏,而是直接調用msgr->ms_fast_dispatch 函數,並最終調用註冊的fast_dispatcher 函數處理。
5.不然調用函數in_q->enqueue(m, m->get_priority(), conn_id) , 本函數把接收到的消息加入到DispatchQueue的mqueue 隊列裏, 由DispatchQueue的線程調用ms_dispatch處理。


在這裏,須要注意的是 ms_fast_dispath 和 ms_dispatch 兩種處理的區別。ms_dispatch 是由DispatchQueue的線程處理的,它是一個單線程;ms_fast_dispatch的調用是由Pipe的接收線程直接處理的,所以性能比前者要好。

錯誤處理

網絡模塊,最重要的是如何處理網絡錯誤。不管是在接收消息仍是發送消息的過程當中,都會出現各類異常錯誤,包括返回異常錯誤碼,接收數據的magic驗證不一致,接收的數據的效驗驗證不一致等等。 錯誤的緣由主要是網絡自己的錯誤(物理鏈路等),或者字節跳變引發的。

處理的方法比較簡單:

  • 關閉鏈接
  • 從新創建鏈接
  • 從新發送沒有接受到ack應對的消息

函數 Pipe::fault 用來處理錯誤

  1. 調用shutdown_socket 關閉Pipe 的socket
  2. 調用函數requeue_sent  把沒有收到ack的消息從新加入發送隊列,當發送隊列有請求時,發送線程會不斷的嘗試從新鏈接
相關文章
相關標籤/搜索