用現代 C++ 寫一個高性能的服務器

本文由 伯樂在線 - 袁欣 翻譯,艾凌風 校稿。未經許可,禁止轉載!
英文出處:James Perry。歡迎加入翻譯組git

首先感謝你們對上一篇博文《用 C++ 開啓技術創業之旅》的反饋。在上篇文章中我提到我曾在一天內就憑藉 Facebook 的 Wangle 搭建起一個數據庫引擎的原型,在這裏我會解釋我是如何作到的。到本文最後,你將能夠用 Wangle 編寫出一個高性能的C++服務器。本文也將做爲教程整合進 Wangle 的 ReadMe.md 中。github

我將展現如何使用現代C++編寫一個Echo服務器,至關於分佈式系統開發中的「Hello World」。這個服務器會將接收的消息直接返回。咱們同時須要一個能夠向咱們的服務器發動消息的客戶端,在這裏能夠發現客戶端的源碼數據庫

Wangle是一個用來搭建事件驅動的現代異步C++服務的C/S應用框架。Wangle最基本的抽象概念就是Pipeline(管線)。可以理解這種抽象,將會很容易寫出各類複雜的現代C++服務,另外一個重要的概念是Service(服務),其能夠看做一種更高級的Pipeline,不過超出了本文咱們關注的範疇。bootstrap

PipeLine

pipeline 是 Wangle 中最重要也是最強大的抽象,可讓用戶在定製 request 和 response 的實現時擁有很大的自由。一個pipeline就是一系列request/response控制程序的嵌套。我試圖尋找一個真實世界中pipeline的類比,惟一我能想到的就是現實世界工廠中的生產線。一條生產線工做在一種順序模式下,全部的工人取得一個物體,而且只添加一種修改,再將其發送給上游的工人直到整個產品製造完成。這可能不是一個特別好的比喻,由於流水線上產品的流動是單向的,而一個pipeline能控制反方向的數據流動--就好像將成品分解成原材料。緩存

一個Wangle handler能夠同時掌控上游和下游的兩個方向的數據流動。當你把全部的handler鏈接在一塊兒,就能夠用一種靈活的方式將原始數據組裝爲想要的數據類型或者將已有的數據拆分。安全

在咱們的服務器的pipeline中大體將會有下面幾種handler:服務器

1.Handler 1 (下文的上游下游是指對一同個handler而言,根據其在pipeline中的位置不一樣,輸入輸出相反) 上游:將從socket中接收的二進制數據流寫入一個零拷貝(zero-copy,指省略了Applicaion context和Kernel context之間的上下文切換,避免了CPU對Buffer的冗餘拷貝,直接在Kernel級別進行數據傳輸的技術,詳情請參閱維基百科)的字節緩存中,發送給handler2數據結構

下游:接收一個零拷貝的字節緩存,將其內容寫入socket中app

2.Handler2 上游:接收handler1的緩存對象,解碼爲一個string對象傳遞給handler3 下游:接收handler3的string對象,將其轉碼爲一個零拷貝的字節緩存,發送給handler1框架

3.Handler3 上游:接收handler2中的string對象,再向下發送至pipeline等待寫回客戶端。string會發回handler2 下游:接收上游的string對象,傳遞給handler2

須要注意的一點是,每個handler應當只作一件事而且只有一件,若是你有一個handler裏作了多項任務,好比從二進制流離直接解碼出string,那麼你須要學會將它拆分。這對提高代碼的可維護性和擴展性很是重要。

另外,沒錯,handler不是線程安全的,因此不要輕易的在其中使用任何沒有通過mutex,atomic lock保護的數據,若是你確實須要一個線程安全的環境,Folly提供了一種免於加鎖的數據結構, Folly依賴於Wangle,你能夠很容易的在項目中引入並使用它。

若是你還不是很明白全部的步驟,不用着急,在看到下面的具體實現時你會更加清楚。

Echo Server

下面我會展現服務器的具體實現。我假定您已經安裝好Wangle。須要注意的是截至目前Wangle還不能在Mac OS上安裝,我建議您能夠安裝虛擬機,使用Ubuntu來安裝Wangle。

這就是echo handler:接收一個string,打印到stdout中,再發送回pipeline。要注意write語句中的定界符不能夠省略,由於pipeline會按照字節解碼。

C++

 

1

2

3

4

5

6

7

8

9

// the main logic of our echo server; receives a string and writes it straight

// back

class EchoHandler : public HandlerAdapter {

public:

  virtual void read(Context* ctx, std::string msg) override {

    std::cout << "handling " << msg << std::endl;

    write(ctx, msg + "rn");

  }

};

Echohandler實際上是咱們pipeline的最後一個handler,如今咱們須要建立一個PipelineFactory來控制全部的request和response。

C++

 

1

2

3

4

5

6

7

8

9

10

11

12

13

// where we define the chain of handlers for each messeage received

class EchoPipelineFactory : public PipelineFactory {

public:

  EchoPipeline::Ptr newPipeline(std::shared_ptr sock) {

    auto pipeline = EchoPipeline::create();

    pipeline->addBack(AsyncSocketHandler(sock));

    pipeline->addBack(LineBasedFrameDecoder(8192));

    pipeline->addBack(StringCodec());

    pipeline->addBack(EchoHandler());

    pipeline->finalize();

    return pipeline;

  }

};

pipeline中每個handler的插入順序都須要嚴格注意,由於它們是按照前後排序的,此處咱們有4個handler

1.AsyncSocketHandler: 上游:讀取scoket中的二進制流轉換成零拷貝字節緩存 下游:將字節緩存內容寫入底層socket 2. LineBasedFrameDecoder: 上游:接收字節緩存,按行分割數據 下游:將字節緩存發送給AsyncSocketHandler 3. StringCodec: 上游:接收字節緩存,解碼爲std:string傳遞給EchoHandler 下游:接收std:string, 編碼爲字節緩存,傳遞給LineBasedFrameDecoder 4. EchoHandler: 上游:接收std:string對象,將其寫入pipeline-將消息返回給Echohandler。 下游:接收一個std:string對象,轉發給StringCodec Handler。 如今咱們所須要作的就是將pipeline factory關聯到ServerBootstrap,綁定一個端口,這樣咱們已經完成了 基本上全部的工做。

C++

 

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

#include <gflags/gflags.h>

 

#include <wangle/bootstrap/ServerBootstrap.h>

#include <wangle/channel/AsyncSocketHandler.h>

#include <wangle/codec/LineBasedFrameDecoder.h>

#include <wangle/codec/StringCodec.h>

 

using namespace folly;

using namespace wangle;

 

DEFINE_int32(port, 8080, "echo server port");

 

typedef Pipeline<IOBufQueue&, std::string> EchoPipeline;

 

// the main logic of our echo server; receives a string and writes it straight

// back

class EchoHandler : public HandlerAdapter<std::string> {

public:

  virtual void read(Context* ctx, std::string msg) override {

    std::cout << "handling " << msg << std::endl;

    write(ctx, msg + "\r\n");

  }

};

 

// where we define the chain of handlers for each messeage received

class EchoPipelineFactory : public PipelineFactory<EchoPipeline> {

public:

  EchoPipeline::Ptr newPipeline(std::shared_ptr<AsyncTransportWrapper> sock) {

    auto pipeline = EchoPipeline::create();

    pipeline->addBack(AsyncSocketHandler(sock));

    pipeline->addBack(LineBasedFrameDecoder(8192));

    pipeline->addBack(StringCodec());

    pipeline->addBack(EchoHandler());

    pipeline->finalize();

    return pipeline;

  }

};

 

int main(int argc, char** argv) {

  google::ParseCommandLineFlags(&argc, &argv, true);

 

  ServerBootstrap<EchoPipeline> server;

  server.childPipeline(std::make_shared<EchoPipelineFactory>());

  server.bind(FLAGS_port);

  server.waitForStop();

 

  return 0;

}

至此咱們一共只寫了48行代碼就完成了一個高性能的異步C++服務器。

Echo Client

echo客戶端的實現與咱們的服務端很是相似:

C++

 

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

// the handler for receiving messages back from the server

class EchoHandler : public HandlerAdapter {

public:

  virtual void read(Context* ctx, std::string msg) override {

    std::cout << "received back: " << msg;

  }

  virtual void readException(Context* ctx, exception_wrapper e) override {

    std::cout << exceptionStr(e) << std::endl;

    close(ctx);

  }

  virtual void readEOF(Context* ctx) override {

    std::cout << "EOF received :(" << std::endl;

    close(ctx);

  }

};

注意咱們重載了readException和readEOF兩個方法,還有其餘一些方法能夠被重載。若是你須要控制某個特別的事件,只須要重載對應的虛函數便可。

這是客戶端的pipeline factory的實現,與咱們的服務端結構基本一致,只有EventBaseHandler這個handler在服務端代碼中未曾出現,它能夠確保咱們能夠從任意一個線程寫入數據。

C++

 

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

// the handler for receiving messages back from the server

class EchoHandler : public HandlerAdapter {

public:

  virtual void read(Context* ctx, std::string msg) override {

    std::cout << "received back: " << msg;

  }

  virtual void readException(Context* ctx, exception_wrapper e) override {

    std::cout << exceptionStr(e) << std::endl;

    close(ctx);

  }

  virtual void readEOF(Context* ctx) override {

    std::cout << "EOF received :(" << std::endl;

    close(ctx);

  }

};

客戶端全部的代碼以下圖所示

C++

 

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

#include <gflags/gflags.h>

#include

 

#include <wangle/bootstrap/ClientBootstrap.h>

#include <wangle/channel/AsyncSocketHandler.h>

#include <wangle/channel/EventBaseHandler.h>

#include <wangle/codec/LineBasedFrameDecoder.h>

#include <wangle/codec/StringCodec.h>

 

using namespace folly;

using namespace wangle;

 

DEFINE_int32(port, 8080, "echo server port");

DEFINE_string(host, "::1", "echo server address");

 

typedef Pipeline<folly::IOBufQueue&amp;, std::string> EchoPipeline;

 

// the handler for receiving messages back from the server

class EchoHandler : public HandlerAdapter {

public:

  virtual void read(Context* ctx, std::string msg) override {

    std::cout << "received back: " << msg;

  }

  virtual void readException(Context* ctx, exception_wrapper e) override {

    std::cout << exceptionStr(e) << std::endl;

    close(ctx);

  }

  virtual void readEOF(Context* ctx) override {

    std::cout << "EOF received :(" << std::endl;

    close(ctx);

  }

};

 

// chains the handlers together to define the response pipeline

class EchoPipelineFactory : public PipelineFactory {

public:

  EchoPipeline::Ptr newPipeline(std::shared_ptr sock) {

    auto pipeline = EchoPipeline::create();

    pipeline->addBack(AsyncSocketHandler(sock));

    pipeline->addBack(

        EventBaseHandler()); // ensure we can write from any thread

    pipeline->addBack(LineBasedFrameDecoder(8192, false));

    pipeline->addBack(StringCodec());

    pipeline->addBack(EchoHandler());

    pipeline->finalize();

    return pipeline;

  }

};

 

int main(int argc, char** argv) {

  google::ParseCommandLineFlags(&amp;argc, &amp;argv, true);

 

  ClientBootstrap client;

  client.group(std::make_shared(1));

  client.pipelineFactory(std::make_shared());

  auto pipeline = client.connect(SocketAddress(FLAGS_host, FLAGS_port)).get();

 

  try {

    while (true) {

      std::string line;

      std::getline(std::cin, line);

      if (line == "") {

        break;

      }

 

      pipeline->write(line + "rn").get();

      if (line == "bye") {

        pipeline->close();

        break;

      }

    }

  } catch (const std::exception&amp; e) {

    std::cout << exceptionStr(e) << std::endl;

  }

 

  return 0;

}

程序用一個While循環不斷監測用戶的輸入,而且依靠調用.get() 來同步等待一直到請求被響應。

總結

本文我展現瞭如何用Wangle來編寫一個簡易的高性能C++服務器。您應該已經掌握了Wangle的一些基本知識,而且有信心寫出本身的C++服務器。我建議您深刻了解Wangle中的Service概念,它會有助您開發出更增強大的服務器。

相關文章
相關標籤/搜索