基於 Asio 的 C++ 網絡編程

環境: Boost v1.66, VS 2013 & 2015html

說明:
這篇教程造成於 Boost v1.62 時代,最近(2018/01)針對 v1.66 作了一次大的更新。
此外,在代碼風格上,C++11 用得更多了。git


概述

近期學習 Boost Asio,依葫蘆畫瓢,寫了很多例子,對這個「輕量級」的網絡庫算是有了必定理解。可是秉着理論與實踐結合的態度,決定寫一篇教程,把腦子裏只知其一;不知其二的東西,試圖說清楚。github

Asio,即「異步 IO」(Asynchronous Input/Output),本是一個 獨立的 C++ 網絡程序庫,彷佛並不爲人所知,後來由於被 Boost 相中,才聲名鵲起。segmentfault

從設計上來看,Asio 類似且重度依賴於 Boost,與 thread、bind、smart pointers 等結合時,體驗順滑。從使用上來看,依然是重組合而輕繼承,一向的 C++ 標準庫風格。緩存

什麼是「異步 IO」?服務器

簡單來講,就是你發起一個 IO 操做,卻不用等它結束,你能夠繼續作其餘事情,當它結束時,你會獲得通知。網絡

固然這種表述是不精確的,操做系統並無直接提供這樣的機制。以 Unix 爲例,有五種 IO 模型可用:異步

  • 阻塞 I/O
  • 非阻塞 I/O
  • I/O 多路複用(multiplexing)(selectpoll
  • 信號驅動 I/O(SIGIO
  • 異步 I/O(POSIX aio_ 系列函數)

這五種模型的定義和比較,詳見「Unix Network Programming, Volume 1: The Sockets Networking API」一書 6.2 節,或者可參考 這篇筆記socket

Asio 封裝的正是「I/O 多路複用」。具體一點,epoll 之於 Linux,kqueue 之於 Mac 和 BSD。epollkqueueselectpoll 更高效。固然在 Windows 上封裝的則是 IOCP(完成端口)。async

Asio 的「I/O 操做」,主要仍是指「網絡 IO」,好比 socket 讀寫。因爲網絡傳輸的特性,「網絡 IO」相對比較費時,設計良好的服務器,不可能同步等待一個 IO 操做的結束,這太浪費 CPU 了。

對於普通的「文件 IO」,操做系統並無提供「異步」讀寫機制,libuv 的作法是用線程模擬異步,爲網絡和文件提供了一致的接口。Asio 並無這樣作,它專一於網絡。提供機制而不是策略,這很符合 C++ 哲學。

下面以示例,由淺到深,由簡單到複雜,逐一介紹 Asio 的用法。
簡單起見,頭文件一概省略。

I/O Context

每一個 Asio 程序都至少有一個 io_context 對象,它表明了操做系統的 I/O 服務(io_context 在 Boost 1.66 以前一直叫 io_service),把你的程序和這些服務連接起來。

下面這個程序空有 io_context 對象,卻沒有任何異步操做,因此它其實什麼也沒作,也沒有任何輸出。

int main() {
  boost::asio::io_context ioc;
  ioc.run();
  return 0;
}

io_context.run 是一個阻塞(blocking)調用,姑且把它想象成一個 loop(事件循環),直到全部異步操做完成後,loop 才結束,run 才返回。可是這個程序沒有任何異步操做,因此 loop 直接就結束了。

Timer

有了 io_context 還不足以完成 I/O 操做,用戶通常也不跟 io_context 直接交互。

根據 I/O 操做的不一樣,Asio 提供了不一樣的 I/O 對象,好比 timer(定時器),socket,等等。
Timer 是最簡單的一種 I/O 對象,能夠用來實現異步調用的超時機制,下面是最簡單的用法:

void Print(boost::system::error_code ec) {
  std::cout << "Hello, world!" << std::endl;
}

int main() {
  boost::asio::io_context ioc;
  boost::asio::deadline_timer timer(ioc, boost::posix_time::seconds(3));
  timer.async_wait(&Print);
  ioc.run();
  return 0;
}

先建立一個 deadline_timer,指定時間 3 秒,而後異步等待這個 timer,3 秒後,timer 超時結束,Print 被調用。

如下幾點須要注意:

  • 全部 I/O 對象都依賴 io_context,通常在構造時指定。
  • async_wait 初始化了一個異步操做,可是這個異步操做的執行,要等到 io_context.run 時纔開始。
  • Timer 除了異步等待(async_wait),還能夠同步等待(wait)。同步等待是阻塞的,直到 timer 超時結束。基本上全部 I/O 對象的操做都有同步和異步兩個版本,也許是出於設計上的完整性。
  • async_wait 的參數是一個函數對象,異步操做完成時它會被調用,因此也叫 completion handler,簡稱 handler,能夠理解成回調函數。
  • 全部 I/O 對象的 async_xyz 函數都有 handler 參數,對於 handler 的簽名,不一樣的異步操做有不一樣的要求,除了官方文檔裏的說明,也能夠直接查看 Boost 源碼。

async_wait 的 handler 簽名爲 void (boost::system::error_code),若是要傳遞額外的參數,就得用 bind。不妨修改一下 Print,讓它每隔一秒打印一次計數,從 0 遞增到 3

void Print(boost::system::error_code ec,
           boost::asio::deadline_timer* timer,
           int* count) {
  if (*count < 3) {
    std::cout << *count << std::endl;
    ++(*count);

    timer->expires_at(timer->expires_at() + boost::posix_time::seconds(1));
    
    timer->async_wait(std::bind(&Print, std::placeholders::_1, timer, count));
  }
}

與前版相比,Print 多了兩個參數,以便訪問當前計數及重啓 timer。

int main() {
  boost::asio::io_context ioc;
  boost::asio::deadline_timer timer(ioc, boost::posix_time::seconds(1));
  int count = 0;
  timer.async_wait(std::bind(&Print, std::placeholders::_1, &timer, &count));

  ioc.run();
  return 0;
}

調用 bind 時,使用了佔位符(placeholder)std::placeholders::_1。數字佔位符共有 9 個,_1 - _9。佔位符也有不少種寫法,這裏就不詳述了。

Echo Server

Socket 也是一種 I/O 對象,這一點前面已經說起。相比於 timer,socket 更爲經常使用,畢竟 Asio 是一個網絡程序庫。

下面以經典的 Echo 程序爲例,實現一個 TCP Server。所謂 Echo,就是 Server 把 Client 發來的內容原封不動發回給 Client。

先從同步方式開始,異步太複雜,慢慢來。

同步方式

Session 表明會話,負責管理一個 client 的鏈接。參數 socket 傳的是值,可是會用到 move 語義來避免拷貝。

void Session(tcp::socket socket) {
  try {
    while (true) {
      boost::array<char, BUF_SIZE> data;

      boost::system::error_code ec;
      std::size_t length = socket.read_some(boost::asio::buffer(data), ec);

      if (ec == boost::asio::error::eof) {
        std::cout << "鏈接被 client 妥善的關閉了" << std::endl;
        break;
      } else if (ec) {
        // 其餘錯誤
        throw boost::system::system_error(ec);
      }

      boost::asio::write(socket, boost::asio::buffer(data, length));
    }
  } catch (const std::exception& e) {
    std::cerr << "Exception: " <<  e.what() << std::endl;
  }
}

其中,tcpboost::asio::ip::tcpBUF_SIZE 定義爲 enum { BUF_SIZE = 1024 };。這些都是細節,後面的例子再也不贅述。

int main(int argc, char* argv[]) {
  if (argc != 2) {
    std::cerr << "Usage: " << argv[0] << " <port>" << std::endl;
    return 1;
  }

  unsigned short port = std::atoi(argv[1]);

  boost::asio::io_context ioc;

  // 建立 Acceptor 偵聽新的鏈接
  tcp::acceptor acceptor(ioc, tcp::endpoint(tcp::v4(), port));

  try {
    // 一次處理一個鏈接
    while (true) {
      Session(acceptor.accept());
    }
  } catch (const std::exception& e) {
    std::cerr << "Exception: " <<  e.what() << std::endl;
  }

  return 0;
}

啓動時,經過命令行參數指定端口號,好比:

$ echo_server_sync 8080

由於 Client 部分還未實現,先用 netcat 測試一下:

$ nc localhost 8080
hello
hello

如下幾點須要注意:

  • tcp::acceptor 也是一種 I/O 對象,用來接收 TCP 鏈接,鏈接端口由 tcp::endpoint 指定。
  • 數據 buffer 以 boost::array<char, BUF_SIZE> 表示,也能夠用 char data[BUF_SIZE],或 std::vector<char> data(BUF_SIZE)。事實上,用 std::vector 是最推薦的,由於它不但能夠動態調整大小,還支持 Buffer Debugging
  • 同步方式下,沒有調用 io_context.run,由於 acceptread_somewrite 都是阻塞的。這也意味着一次只能處理一個 Client 鏈接,可是能夠連續 echo,除非 Client 斷開鏈接。
  • 寫回數據時,沒有直接調用 socket.write_some,由於它不能保證一次寫完全部數據,可是 boost::asio::write 能夠。我以爲這是 Asio 接口設計不周,應該提供 socket.write
  • acceptor.accept 返回一個新的 socket 對象,利用 move 語義,直接就轉移給了 Session 的參數,期間並無拷貝開銷。

異步方式

異步方式下,困難在於對象的生命週期,能夠用 shared_ptr 解決。

爲了同時處理多個 Client 鏈接,須要保留每一個鏈接的 socket 對象,因而抽象出一個表示鏈接會話的類,叫 Session

class Session : public std::enable_shared_from_this<Session> {
public:
  Session(tcp::socket socket) : socket_(std::move(socket)) {
  }

  void Start() {
    DoRead();
  }

  void DoRead() {
    auto self(shared_from_this());
    socket_.async_read_some(
        boost::asio::buffer(buffer_),
        [this, self](boost::system::error_code ec, std::size_t length) {
          if (!ec) {
            DoWrite(length);
          }
        });
  }

  void DoWrite(std::size_t length) {
    auto self(shared_from_this());
    boost::asio::async_write(
        socket_,
        boost::asio::buffer(buffer_, length),
        [this, self](boost::system::error_code ec, std::size_t length) {
          if (!ec) {
            DoRead();
          }
        });
  }

private:
  tcp::socket socket_;
  std::array<char, BUF_SIZE> buffer_;
};

就代碼風格來講,有如下幾點須要注意:

  • 優先使用 STL,好比 std::enable_shared_from_thisstd::bindstd::array,等等。
  • 定義 handler 時,儘可能使用匿名函數(lambda 表達式)。
  • 以 C++ std::size_t 替 C size_t

剛開始,你可能會不習慣,我也是這樣,過了很久才慢慢擁抱 C++11 乃至 C++14。

Session 有兩個成員變量,socket_ 與 Client 通訊,buffer_ 是接收 Client 數據的緩存。只要 Session 對象在,socket 就在,鏈接就不斷。Socket 對象是構造時傳進來的,並且是經過 move 語義轉移進來的。

雖然還沒看到 Session 對象是如何建立的,但能夠確定的是,它必須用 std::shared_ptr 進行封裝,這樣才能保證異步模式下對象的生命週期。

此外,在 Session::DoReadSession::DoWrite 中,由於讀寫都是異步的,一樣爲了防止當前 Session 不被銷燬(由於超出做用域),因此要增長它的引用計數,即 auto self(shared_from_this()); 這一句的做用。

至於讀寫的邏輯,基本上就是把 read_some 換成 async_read_some,把 write 換成 async_write,而後以匿名函數做爲 completion handler。

接收 Client 鏈接的代碼,提取出來,抽象成一個類 Server

class Server {
public:
  Server(boost::asio::io_context& ioc, std::uint16_t port)
      : acceptor_(ioc, tcp::endpoint(tcp::v4(), port)) {
    DoAccept();
  }

private:
  void DoAccept() {
    acceptor_.async_accept(
        [this](boost::system::error_code ec, tcp::socket socket) {
          if (!ec) {
            std::make_shared<Session>(std::move(socket))->Start();
          }
          DoAccept();
        });
  }

private:
  tcp::acceptor acceptor_;
};

一樣,async_accept 替換了 acceptasync_accept 再也不阻塞,DoAccept 即刻就會返回。
爲了保證 Session 對象繼續存在,使用 std::shared_ptr 代替普通的棧對象,同時把新接收的 socket 對象轉移過去。

最後是 main()

int main(int argc, char* argv[]) {
  if (argc != 2) {
    std::cerr << "Usage: " << argv[0] << " <port>" << std::endl;
    return 1;
  }

  std::uint16_t port = std::atoi(argv[1]);

  boost::asio::io_context ioc;
  Server server(ioc, port);

  ioc.run();
  return 0;
}

Echo Client

雖然用 netcat 測試 Echo Server 很是方便,可是本身動手寫一個 Echo Client 仍然十分必要。
仍是先考慮同步方式。

同步方式

首先經過 hostport 解析出 endpoints(對,是複數!):

tcp::resolver resolver(ioc);
auto endpoints = resolver.resolve(tcp::v4(), host, port);

resolve 返回的 endpoints 類型爲 tcp::resolver::results_type,代之以 auto 能夠簡化代碼。類型推導應適當使用,至於連 int 都用 auto 就沒有必要了。
hostport 經過命令行參數指定,好比 localhost8080

接着建立 socket,創建鏈接:

tcp::socket socket(ioc);
boost::asio::connect(socket, endpoints);

這裏沒有直接調用 socket.connect,由於 endpoints 可能會有多個,boost::asio::connect 會挨個嘗試,逐一調用 socket.connect 直到鏈接成功。

其實這樣說不太嚴謹,根據個人測試,resolve 在沒有指定 protocol 時,確實會返回多個 endpoints,一個是 IPv6,一個是 IPv4。可是咱們已經指定了 protocol 爲 tcp::v4()

resolver.resolve(tcp::v4(), host, port)

因此,應該只有一個 endpoint。

接下來,從標準輸入(std::cin)讀一行數據,而後經過 boost::asio::write 發送給 Server:

char request[BUF_SIZE];
    std::size_t request_length = 0;
    do {
      std::cout << "Enter message: ";
      std::cin.getline(request, BUF_SIZE);
      request_length = std::strlen(request);
    } while (request_length == 0);

    boost::asio::write(socket, boost::asio::buffer(request, request_length));

do...while 是爲了防止用戶直接 Enter 致使輸入爲空。boost::asio::write 是阻塞調用,發送完才返回。

從 Server 同步接收數據有兩種方式:

  • 使用 boost::asio::read(對應於 boost::asio::write);
  • 使用 socket.read_some

二者的差異是,boost::asio::read 讀到指定長度時,就會返回,你須要知道你想讀多少;而 socket.read_some 一旦讀到一些數據就會返回,因此必須放在循環裏,而後手動判斷是否已經讀到想要的長度,不然沒法退出循環。

下面分別是兩種實現的代碼。

使用 boost::asio::read

char reply[BUF_SIZE];
    std::size_t reply_length = boost::asio::read(
        socket,
        boost::asio::buffer(reply, request_length));

    std::cout.write(reply, reply_length);

使用 socket.read_some

std::size_t total_reply_length = 0;
    while (true) {
      std::array<char, BUF_SIZE> reply;
      std::size_t reply_length = socket.read_some(boost::asio::buffer(reply));

      std::cout.write(reply.data(), reply_length);

      total_reply_length += reply_length;
      if (total_reply_length >= request_length) {
        break;
      }
    }

不難看出,socket.read_some 用起來更爲複雜。
Echo 程序的特殊之處就是,你能夠假定 Server 會原封不動的把請求發回來,因此你知道 Client 要讀多少。
可是不少時候,咱們不知道要讀多少數據。
因此,socket.read_some 反倒更爲實用。

此外,在這個例子中,咱們沒有爲各函數指定輸出參數 boost::system::error_code,而是使用了異常,把整個代碼塊放在 try...catch 中。

try {
  // ...
} catch (const std::exception& e) {
  std::cerr << e.what() << std::endl;
}

Asio 的 API 基本都經過重載(overload),提供了 error_codeexception 兩種錯誤處理方式。使用異常更易於錯誤處理,也能夠簡化代碼,可是 try...catch 該包含多少代碼,並非那麼明顯,新手很容易誤用,什麼都往 try...catch 裏放。

通常來講,異步方式下,使用 error_code 更方便一些。因此 complete handler 的參數都有 error_code

異步方式

就 Client 來講,異步也許並不是必要,除非想同時鏈接多個 Server。

異步讀寫前面已經涉及,咱們就先看 async_resolveasync_connect

首先,抽取出一個類 Client

class Client {
public:
  Client(boost::asio::io_context& ioc,
         const std::string& host, const std::string& port)
      : socket_(ioc), resolver_(ioc) {
  }

private:
  tcp::socket socket_;
  tcp::resolver resolver_;

  char cin_buf_[BUF_SIZE];
  std::array<char, BUF_SIZE> buf_;
};

resolver_ 是爲了 async_resolve,做爲成員變量,生命週期便獲得了保證,不會由於函數結束而失效。

下面來看 async_resolve 實現(代碼在構造函數中):

Client(...) {
  resolver_.async_resolve(tcp::v4(), host, port,
                          std::bind(&Client::OnResolve, this,
                                    std::placeholders::_1,
                                    std::placeholders::_2));
}

async_resolve 的 handler:

void OnResolve(boost::system::error_code ec,
               tcp::resolver::results_type endpoints) {
  if (ec) {
    std::cerr << "Resolve: " << ec.message() << std::endl;
  } else {
    boost::asio::async_connect(socket_, endpoints,
                               std::bind(&Client::OnConnect, this,
                                         std::placeholders::_1,
                                         std::placeholders::_2));
  }
}

async_connect 的 handler:

void OnConnect(boost::system::error_code ec, tcp::endpoint endpoint) {
  if (ec) {
    std::cout << "Connect failed: " << ec.message() << std::endl;
    socket_.close();
  } else {
    DoWrite();
  }
}

鏈接成功後,調用 DoWrite,從標準輸入讀取一行數據,而後異步發送給 Server。
下面是異步讀寫相關的函數,一併給出:

void DoWrite() {
  std::size_t len = 0;
  do {
    std::cout << "Enter message: ";
    std::cin.getline(cin_buf_, BUF_SIZE);
    len = strlen(cin_buf_);
  } while (len == 0);

  boost::asio::async_write(socket_,
                           boost::asio::buffer(cin_buf_, len),
                           std::bind(&Client::OnWrite, this,
                                     std::placeholders::_1));
}

void OnWrite(boost::system::error_code ec) {
  if (!ec) {
    std::cout << "Reply is: ";

    socket_.async_read_some(boost::asio::buffer(buf_),
                            std::bind(&Client::OnRead, this,
                                      std::placeholders::_1,
                                      std::placeholders::_2));
  }
}

void OnRead(boost::system::error_code ec, std::size_t length) {
  if (!ec) {
    std::cout.write(buf_.data(), length);
    std::cout << std::endl;
    // 若是想繼續下一輪,能夠在這裏調用 DoWrite()。
  }
}

異步讀寫在異步 Server 那一節已經介紹過,這裏就再也不贅述了。

最後是 main()

int main(int argc, char* argv[]) {
  if (argc != 3) {
    std::cerr << "Usage: " << argv[0] << " <host> <port>" << std::endl;
    return 1;
  }

  const char* host = argv[1];
  const char* port = argv[2];

  boost::asio::io_context ioc;
  Client client(ioc, host, port);

  ioc.run();
  return 0;
}

至此,異步方式的 Echo Client 就算實現了。

爲了不文章太長,Asio 的介紹暫時先告一段落。如有補遺,會另行記錄。

完整及更加豐富的示例代碼,請移步 GitHub

相關文章
相關標籤/搜索