BOOST ASIO 學習專貼

本文已於20170903更新完畢,全部boost asio 代碼均爲本人手抄。編譯器爲vs2013,而且全部代碼已經上傳,本文下方可下載源碼ios

 

 

 

爲了學習boost asio庫,我是從boost的官方boost asio的教程學起的。緩存

每個示例我都抄寫了一遍以加深記憶,每個例子我都用本身的話歸納一遍,雖然歸納的不是很好,代碼以爲難懂的地方我都加註釋。服務器

1.同步使用Timersession


本便使用了boost::asio::deadline_timer,這個timer有兩種狀態:過時和不過時。wait函數調用一個過時的timer直接返回。
多線程

int _tmain(int argc, _TCHAR* argv[])
{
    boost::asio::io_service io;
    boost::asio::deadline_timer t(io,boost::posix_time::seconds(5));
    t.wait();
    std::cout<<"wait finished!"<<std::endl;
    return 0;
}

2.異步使用Timer併發

下在演示了使用deadline_timer的asyn_wati函數實現異步等待。但要注意的一點是異步等待必需要調用io.run才能夠。並且必須在io.run函數執行以前調用asyn_wait,不然io.run會當即返回,由於他沒有能夠作的事。這說明io.run必須至少有一個等待的,不然它會直接返回。asio函數保證回調函數執行和io.run所在的線程同樣!異步

//異步Timer
void print(const boost::system::error_code & )
{
    std::cout<<"Wait Finished"<<std::endl;
}
int _tmain(int argc, _TCHAR* argv[])
{
    boost::asio::io_service io;
    boost::asio::deadline_timer t(io,boost::posix_time::seconds(5));
    t.async_wait(&print);
    io.run();

    return 0;
}

3.爲回調函數綁定參數socket

這個例子一個是說明異步Timer的可持續性問題,也就是在回調中設置Time的超時時間。另外一個說明回調函數參數的綁定 。可是實際發現我官的代碼沒有發生那個重複回調的效果。緣由是我只是調用了expire_at而沒有調用再次等待函數async_wait。這讓我更加明白expires_at這個函數至關於下次觸發的時間。而async_wait提交一個等待申請。async

async_wait提交一次,回調函數執行一次,而expire_at設定下次回調函數調用的時間。tcp

#include <boost/bind.hpp>
void Print(const boost::system::error_code & ,
           boost::asio::deadline_timer * t,int * count)
{
    if(*count < 5)
    {
        std::cout<<*count<<std::endl;
        ++(*count);
        t->expires_at(t->expires_at() + boost::posix_time::seconds(1));
        t->async_wait(boost::bind(Print,boost::asio::placeholders::error,t,count));
    }
}
int _tmain(int argc, _TCHAR* argv[])
{
    boost::asio::io_service io;
    int count = 0;
    boost::asio::deadline_timer t(io,boost::posix_time::seconds(1));
    t.async_wait(boost::bind(Print,boost::asio::placeholders::error,&t,&count));
    io.run();
    return 0;
}

4.類成員作爲timer的回調函數

這個例子主要演示了,如何綁定一個類成員函數做爲一個回調

class Print
{
public:
    Print(boost::asio::io_service & io)
        :timer_(io,boost::posix_time::seconds(1)),count_(0)
    {
        timer_.async_wait(boost::bind(&Print::print,this));
    }
    ~Print()
    {
        std::cout<<"finnal count is "<<count_<<std::endl;
    }
    void print()
    {
        if(count_ < 5)
        {
            std::cout<<count_<<std::endl;
            ++count_;
            timer_.expires_at(timer_.expires_at() + boost::posix_time::seconds(1));
            timer_.async_wait(boost::bind(&Print::print,this));
        }
    }
protected:
    boost::asio::deadline_timer timer_;
    int count_;
};

int _tmain(int argc, _TCHAR* argv[])
{
    boost::asio::io_service io;
    Print p(io);
    io.run();
    return 0;
}

4.在多線程程序中的同步回調

先前的例子經過io_service.run和同步回調在同一個線程內,正如你所知的那樣,asio保證回調函數只能被在io_service.run()所在的線程調用 。所以,只有在一個線程內調用io_service::run保證回調函數不會併發執行。這樣在服務器程序中有兩個侷限性:

1.當回調函數執行時間比較長時響應太慢
2.沒有起到多處理器的優點

若是你注意到這個侷限性,一個可供選擇的方案是建立一個線程池去調用io_service.run()函數,這樣實現的回調的併發,咱們須要去同步一個共享變量。

下面的例子使用到了An boost::asio::strand ,他保證這些回調函數經過strans派遣,它能夠容許一個回調函數在另外一個回調函數執行以前完成。簡單點說,這裏的strand就是讓回調函數不會併發的執行。可是這裏的strand到底的意圖在哪裏?不是要演示多線程執行回調嗎?這裏又作了strand使回調又依次執行好想沒有達到多線程效果

#include <boost/thread/thread.hpp>

class printer
{
public:
    printer(boost::asio::io_service & io)
        :strand_(io),
        timer1_(io,boost::posix_time::seconds(1)),
        timer2_(io,boost::posix_time::seconds(1)),
        count_(0)
    {
        timer1_.async_wait(strand_.wrap(boost::bind(&printer::print1,this)));
        timer2_.async_wait(strand_.wrap(boost::bind(&printer::print2,this)));
    }
    void print1()
    {
        if(count_ < 10)
        {
            std::cout<<"Timer 1:"<<count_<<std::endl;
            ++count_;

            timer1_.expires_at(timer1_.expires_at() + boost::posix_time::seconds(1));
            timer1_.async_wait(strand_.wrap(boost::bind(&printer::print1,this)));
        }
    }

    void print2()
    {
        if(count_ < 10)
        {
            std::cout<<"Timer 2:"<<count_<<std::endl;
            ++count_;

            timer2_.expires_at(timer2_.expires_at() + boost::posix_time::seconds(1));
            timer2_.async_wait(strand_.wrap(boost::bind(&printer::print2,this)));
        }
    }
private:
    boost::asio::io_service::strand strand_;
    boost::asio::deadline_timer timer1_;
    boost::asio::deadline_timer timer2_;
    int count_;
};
int _tmain(int argc, _TCHAR* argv[])
{
    boost::asio::io_service io;
    printer p(io);
    boost::thread t(boost::bind(&boost::asio::io_service::run,&io));
    io.run();
    t.join();
    return 0;
}

下面有時間研究一下 boost::asio::strand的用法

5.簡單的一個TCP服務端

下面程序演示一個boost作的最簡單的一個服務端程序,客戶端鏈接以後服務器給客戶端發送一個當前時間的字符串

 下面值得一提的是tcp::acceptor,他被封裝爲socket的服務端接收器,構造他時須要一個io_service和一個tcp::endpoint。

std::string make_daytime_string()
{
    using namespace std;
    time_t now = time(0);
    return ctime(&now);
}

int _tmain(int argc, _TCHAR* argv[])
{
    try
    {
        boost::asio::io_service io_service;
        tcp::acceptor acceptor(io_service,tcp::endpoint(tcp::v4(),13));
        for(;;)
        {
            tcp::socket socket(io_service);
            acceptor.accept(socket);//接受一個客戶端socket
            std::string message = make_daytime_string();
            boost::system::error_code ignored_error;
            boost::asio::write(socket,boost::asio::buffer(message),ignored_error);
        }
    }
    catch(std::exception & e)
    {
        std::cerr<<e.what()<<std::endl;
    }

    return 0;
}

 

6.簡單的一個TCP客戶端

int _tmain(int argc, _TCHAR* argv[])
{
    try
    {
        if(argc != 2)
        {
            std::cerr<<"Usage:client <host>"<<std::endl;
            return 1;
        }

        boost::asio::io_service io_service;
        tcp::resolver resolver(io_service);
        tcp::resolver::query query(argv[1],"daytime");
        tcp::resolver::iterator endpoint_iterator = resolver.resolve(query);
        tcp::socket socket(io_service);
        boost::asio::connect(socket,endpoint_iterator);
        for(;;)
        {
            boost::array<char,128> buf;
            boost::system::error_code error;
            size_t len = socket.read_some(boost::asio::buffer((buf)),error);
            if(error == boost::asio::error::eof)
                break;
            else if(error)
                throw boost::system::system_error(error);
            std::cout.write(buf.data(),len);

        }
    }
    catch(std::exception & e)
    {
        std::cerr<<e.what()<<std::endl;
    }
    return 0;
}

 

7.TCP異步服務端

std::string make_daytime_string()
{
    using namespace std;
    time_t now = time(0);
    return ctime(&now);
}
class tcp_connection : public boost::enable_shared_from_this<tcp_connection>
{
public:
    typedef boost::shared_ptr<tcp_connection> tcp_connection_ptr;
    static tcp_connection_ptr Create(boost::asio::io_service & io)
    {
        return tcp_connection_ptr(new tcp_connection(io));
    }
    tcp::socket & Socket()
    {
        return socket_;
    }
    void Start()
    {
        message_ = make_daytime_string();
        boost::asio::async_write(socket_,boost::asio::buffer(message_),
            boost::bind(&tcp_connection::handle_write,shared_from_this(),
            boost::asio::placeholders::error,
            boost::asio::placeholders::bytes_transferred));

    }
private:
    tcp_connection(boost::asio::io_service & io) : socket_(io)    {}
    void handle_write(const boost::system::error_code & ,size_t) 
    {

    }
    tcp::socket socket_;
    std::string message_;
};

class TcpServer
{
public:
    TcpServer(boost::asio::io_service & io) : acceptor_(io,tcp::endpoint(tcp::v4(),13)) 
    {
        start_accept();
    }
protected:
    void start_accept()
    {
        tcp_connection::tcp_connection_ptr new_connection = tcp_connection::Create(acceptor_.get_io_service());
        acceptor_.async_accept(new_connection->Socket(),
            boost::bind(&TcpServer::handle_accept,this,new_connection,boost::asio::placeholders::error));
    }
    tcp::acceptor acceptor_;
    void handle_accept(tcp_connection::tcp_connection_ptr new_connection,
        const boost::system::error_code & error)
    {
        if(!error)
            new_connection->Start();
        start_accept();
    }
};

int _tmain(int argc, _TCHAR* argv[])
{
    try
    {
        boost::asio::io_service io_service;
        TcpServer server(io_service);
        io_service.run();
    }
    catch (std::exception & e)
    {
        std::cerr << e.what() << std::endl;
    }
    return 0;
}

以上代碼調用異步函數asyn_accept和asyn_write分別進行異步接受socket和異步socket發送。

以上代碼是官方tutorial的代碼,有幾點特別的地方值得學習:

  • 構造函數私有化
    通常本身寫代碼構造函數不可能給私有化,而類tcp_connection使用一個靜態類成員函數Create生產一個對象,而使得類的構造函數能夠私有。
  • 使用enable_shared_from_this
    boost類enable_shared_from_this的好處是避免在類成員函數中傳遞this而傳遞一個shared_ptr智能指針,這樣不用擔憂釋放的問題。而在這裏,若是傳指針則有可能所持有的指針指向的對象已經被釋放,若是用shared_ptr則能夠保證不被釋放,引用官方的一句話:We will use shared_ptr and enable_shared_from_this because we want to keep the tcp_connection object alive as long as there is an operation that refers to it.
  • 不指定沒有用的參數,有可能注意到handle_write()沒有error和byte_transfered參數,由於body中沒有用到這兩個參數,若是參數不使用可能以移除參數

8.Custom Allocation

 

 

// Async_Allocation.cpp : 定義控制檯應用程序的入口點。
//

#include "stdafx.h"


#include <array>
#include <cstdlib>
#include <iostream>
#include <memory>
#include <type_traits>
#include <utility>
#include <boost/asio.hpp>
#include <boost/bind.hpp>

using boost::asio::ip::tcp;



//這個類實現了一個分配函數,他的功能是不讓頻繁的分配和釋放。
class handler_allocator
{
public:
    handler_allocator() : in_use_(false){}
    handler_allocator(const handler_allocator &) = delete;
    handler_allocator& operator=(const handler_allocator &) = delete;

    //分配函數
    //若是當前沒有被使用,那標記爲已使用 並返回指針
    //若是當前正在使用,則分配新的內存 new
    void * allocate(std::size_t size)
    {
        if (!in_use_ && size < sizeof(storage_))
        {
            in_use_ = true;
            return &storage_;
        }
        else
            return ::operator new(size);
    }

    //釋放函數
    //若是當前要釋放的指針就是自己,則標記爲未使用
    //若是當前要釋放的指針不是自己,那進行默認釋放 delete
    void dealocate(void * pointer)
    {
        if (pointer == &storage_)
            in_use_ = false;
        else
            operator delete(pointer);
    }
private:
    std::aligned_storage<1024>  storage_;
    bool in_use_;
};


template<typename Handler>
class custom_alloc_handler
{
public:
    custom_alloc_handler(handler_allocator & a, Handler h) : allocator_(a), handler_(h){}

    //這個函數重置()運算符,使用可變參模板,調用handler_()
    template<typename ...Args>
    void operator()(Args&&... args)
    {
        handler_(std::forward<Args>(args)...);
    }

    //??
    friend void * asio_handler_allocate(std::size_t size, custom_alloc_handler<Handler>*this_handler)
    {
        return this_handler->allocator_.allocate(size);
    }
    //??
    friend void asio_handler_deallocate(void * pointer, std::size_t, custom_alloc_handler<Handler> * this_handler)
    {
        this_handler->allocator_.dealocate(pointer);
    }
    
private:
    handler_allocator & allocator_;
    Handler handler_;

};


//他返回一個Handle
template<typename Handler>
inline custom_alloc_handler<Handler> make_custom_alloc_handler(handler_allocator & a, Handler h)
{
    return custom_alloc_handler<Handler>(a, h);
}


class session : public std::enable_shared_from_this<session>
{
public:
    session(tcp::socket socket) : socket_(std::move(socket)){}
    void start()
    {
        do_read();
    }
private:
    void do_read()
    {
        auto self(shared_from_this());
        socket_.async_read_some(boost::asio::buffer(data_),
            make_custom_alloc_handler(allocator_, 
            [this, self](boost::system::error_code ec, std::size_t length)
        {
            if (!ec)
                do_write(length);
        }
        ));
    }
    void do_write(std::size_t length)
    {
        auto self(shared_from_this());
        boost::asio::async_write(socket_,boost::asio::buffer(data_,length),make_custom_alloc_handler(allocator_,
            [this, self](boost::system::error_code ec, std::size_t)
        {
            if (!ec)
                do_read();
        }));
    }
    tcp::socket socket_;
    std::array<char, 1024> data_;//存儲從客戶端接受來的數據
    handler_allocator allocator_;//自定義內存分配
};

class server
{
public:
    server(boost::asio::io_service & io,short port)
        : acceptor_(io, tcp::endpoint(tcp::v4(), port)),
        socket_(io)
    {
        do_accept();
    }
private:
    void do_accept()
    {
        acceptor_.async_accept(socket_, [this](boost::system::error_code  ec)
        {
            if (!ec)
                std::make_shared<session>(std::move(socket_))->start();
            else
                std::cerr << ec.value() << ec.message() << std::endl;
            do_accept();
        });
    }
    tcp::acceptor acceptor_;
    tcp::socket socket_;
};

int _tmain(int argc, _TCHAR* argv[])
{
    try
    {
        boost::asio::io_service io;
        server s(io, 13);
        io.run();
    }
    catch (std::exception & e)
    {
        std::cerr << "Exception " << e.what() << std::endl;
    }
    return 0;
}

 

以上代碼在調用socket.async_read_some的時候,第二個參數原來是一個Handler,原型以下:

void handler( const boost::system::error_code& error, // Result of operation. std::size_t bytes_transferred // Number of bytes read. );
函數make_custom_alloc_handler產生一個custom_alloc_handler<Handler>對象,custom_alloc_handler<Handler>重載括號運算符實現對回調的調用,這種方法對於我來講感受很厲害。總之這片代碼我看得不是很懂。

首先:回調用函數應該是一個執行體,也就是std::function,而這裏來一個custom_alloc_handler<Handler>對象,對象也能夠看成執行體?
其次:這個函數沒有用到asio_handler_allocate和asio_handler_deallocate,我也不知道如何使用。這個放到之後再研究 

通過學習和查詢信息得出的結果:

  1. 異步操做能夠增長一個臨時的分配對象asio_handler_allocate。由於異步操做有一個handler函數對象,這個臨時對象能夠堪稱是與handler函數對象相關聯的。本例中asio_handler_allocate爲handler類對象的一個友元成員函數。這樣在分配內存時,默認就調用此函數進行分配內存。任何與handler相關聯的臨時對象會在handler執行完以後被析構,而asio_handler_allocate這裏除了size參數能夠額外增長參數,例如本例中的this_handler參數同樣,因此這裏容許同一塊內存能夠被後來的異步操做重複利用,asio_handler_allocate原型以下:
    void * asio_handler_allocate( std::size_t size, ... );
  2.  Handler容許有多種形式存在
    1. 函數形式
      void read_handler( const boost::system::error_code& ec, std::size_t bytes_transferred) { ... }
      這種形式最爲普通,就是一個回調用函數而已
    2. 類對象(重載括號運算符)
      struct read_handler { ... void operator()( const boost::system::error_code& ec, std::size_t bytes_transferred) { ... } ... };
      本例中應該就是這種
    3. 類成員函數
      void my_class::read_handler( const boost::system::error_code& ec, std::size_t bytes_transferred) { ... } ... socket.async_read(..., boost::bind(&my_class::read_handler, this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred));
  3. 經過以上知識點,能夠清楚知道本例代碼是如何執行的了。

 

 

9.Buffers

// BB_CountedReferenceBuffer.cpp : 定義控制檯應用程序的入口點。
//

#include "stdafx.h"

#include <string>

#include <boost/asio.hpp>
#include <memory>

#include <utility>

#include <iostream>

using boost::asio::ip::tcp;

class shared_const_buffer
{
public:
    //構造函數用std::string 分別初始化buffer_放data_對象
    shared_const_buffer(const std::string & data)
        : data_(new std::vector<char>(data.begin(), data.end())),
        buffer_(boost::asio::buffer(*data_)) {}

    typedef boost::asio::const_buffer value_type;
    typedef const boost::asio::const_buffer* const_iterator;

    const boost::asio::const_buffer* begin() const { return &buffer_; }
    const boost::asio::const_buffer* end() const { return &buffer_ + 1; }

private:
    std::shared_ptr<std::vector<char>> data_;//vector char
    boost::asio::const_buffer buffer_;
};

class session
    : public std::enable_shared_from_this<session>
{
public:
    session(tcp::socket socket)
        :socket_(std::move(socket))
    {}

    void start()
    {
        do_write();
    }
private:
    void do_write()
    {
        std::time_t now = std::time(0);
        shared_const_buffer buffer(std::ctime(&now));

        auto self(shared_from_this());
        boost::asio::async_write(socket_, buffer, 
            [this, self](boost::system::error_code,std::size_t)
        {
        });
    }

    tcp::socket socket_;
};

class server
{
public:
    server(boost::asio::io_service & io,short port)
        : acceptor_(io, tcp::endpoint(tcp::v4(),port)),
        socket_(io)
    {
        do_accept();
    }

private:
    void do_accept()
    {
        acceptor_.async_accept(socket_,
            [this](boost::system::error_code ec)
        {
            if (!ec)
                std::make_shared<session>(std::move(socket_))->start();
            else
                std::cerr << ec.message() << std::endl;
            do_accept();
        });
    }

    tcp::acceptor acceptor_;
    tcp::socket socket_;

};

int _tmain(int argc, _TCHAR* argv[])
{
    try
    {
        boost::asio::io_service io_service;
        server s(io_service, 13);
        io_service.run();
    }
    catch (std::exception & e)
    {
        std::cerr << "Exception:  " << e.what() << std::endl;
    }
    return 0;
}

代碼解析:

本例主要演示了,異步操做中能夠自定義的buffer。

以上代碼自定義一個類shared_const_buffer,在調用async_write用這個類對象。async_write有多個重載,這裏主要說示例中用到的重載形式,即:

template<
    typename AsyncWriteStream,
    typename ConstBufferSequence,
    typename WriteHandler>
void-or-deduced async_write(
    AsyncWriteStream & s,
    const ConstBufferSequence & buffers,
    WriteHandler handler);

第二具模板參數ConstBufferSequence爲一個模板參數,自定義ConstBufferSequence模板類有一些要求以下 :

在下面要求列表中,X表示爲一個包含類型T對象的類。a表示表示一個類型爲X的值,u表示一個標識符
本例中T爲boost::asio::const_buffer buffer_,X爲本例中的shared_const_buffer。

  1. X::value_type 返回類型爲T,用於表示X實際表示的value_type爲T,本例中爲boost::asio::const_buffer
  2. X::const_iterator 指向T的迭代器類型,表示iterator類型實際爲哪一種類型,本例中爲 const boost::asio::const_buffer * 
  3. X(a)  構造函數
  4. X u(a) 暫時不知如何解釋
  5. (&a)->~X() 暫時不知如何解釋
  6. a.begin() 返回起始迭代器
  7. a.end() 返回終止迭代器

 

10.Chat_message數據包類

 

class chat_message
{
public:
    enum { header_length = 4 };
    enum { max_body_length = 512 };

    chat_message() : body_length_(0) {}

    const char * data() const
    {
        return data_;
    }
    char * data()
    {
        return data_;
    }
    std::size_t length() const
    {
        return header_length + body_length_;
    }

    const char * body() const
    {
        return data_ + header_length;
    }
    char * body()
    {
        return data_ + header_length;
    }

    std::size_t body_length() const
    {
        return body_length_;
    }
    void body_length(std::size_t new_length)
    {
        body_length_ = new_length;
        if (body_length_ > max_body_length)
            body_length_ = max_body_length;
    }

    bool decode_header()
    {
        char header[header_length + 1] = "";
        strncat_s(header, data_, header_length);
        body_length_ = std::atoi(header);
        if (body_length_ > max_body_length)
        {
            body_length_ = 0;
            return false;
        }
        return true;
    }
    void encode_header()
    {
        char header[header_length + 1] = "";
        sprintf_s(header, "%4d", static_cast<int>(body_length_));
        std::memcpy(data_, header, header_length);
    }
private:
    char data_[header_length + max_body_length];
    std::size_t body_length_;
};

 

這個類比較簡單,他把一個數據包定義爲頭和體。頭部是一個整形,表明body的大小。 

11.Chat_Server詳解

先上代碼

 

// CB_ChatServer.cpp : 定義控制檯應用程序的入口點。
//

#include "stdafx.h"

#include "..\CA_ChatMessage\chat_message.h"

#include <deque>
#include <set>
#include <memory>
#include <boost/asio.hpp>
#include <iostream>

using boost::asio::ip::tcp;

//deque相似於vector,但他能夠快速的在頭部和尾部插入元素
typedef std::deque<chat_message> chat_message_queue;


//聊天參與者
class chat_participant
{
public:
    //析構函數
    virtual ~chat_participant() {}

    //交付
    virtual void deliver(const chat_message & msg) = 0;
};

typedef std::shared_ptr<chat_participant> chat_participant_ptr;


/*
chat_room
對全部client進行管理 
*/
class chat_room
{
public:
    //有客戶端加入
    void join(chat_participant_ptr participant)
    {
        participants_.insert(participant);

        //當客戶端加入以後,先把最近消息給廣播一下
        for (auto msg : recent_msg_)
            participant->deliver(msg);
    }

    //有客戶端離開
    void leave(chat_participant_ptr participant)
    {
        participants_.erase(participant);
    }

    //廣播一條消息
    void deliver(const chat_message & msg)
    {
        recent_msg_.push_back(msg);//增長到最近消息列表
        while (recent_msg_.size() > max_recent_msgs)
            recent_msg_.pop_front();
        
        //給全部客戶端廣播這條消息
        for (auto participant : participants_)
            participant->deliver(msg);
    }
private:
    std::set<chat_participant_ptr> participants_;
    enum{max_recent_msgs = 100};
    chat_message_queue recent_msg_;
};

class chat_session
    : public chat_participant,
    public std::enable_shared_from_this<chat_session>
{
public:
    chat_session(tcp::socket socket,chat_room & room)
        :socket_(std::move(socket)),
        room_(room){}
    void start()
    {
        room_.join(shared_from_this());
        do_read_header();
    }

    //廣播一個消息,這裏最主要作的實際上是 write_msgs_.push_back(msg);
    //而do_write,只顯爲了驅動,大多數的write_msgs是在驅動後的on write裏面執行的。
    virtual void deliver(const chat_message & msg)
    {
        //爲何要這樣寫,由於到後面room 的recent_msg有可能有幾十個,例如是50個。則
        //Post write會 Post這麼屢次,而這裏直接Post進一個對隊列write_msgs,而後post一次
        //而其它的post只在OnPost裏面再次去調用post write
        bool write_in_process = !write_msgs_.empty();//先前write_msgs是否不爲空
        write_msgs_.push_back(msg);//添加要廣播的消息到write_msgs對列裏面
        if (!write_in_process)//若是先前write_msgs爲空的話,說明寫的消息正在投遞。
            do_write();
    }
private:
    //讀消息
    void do_read_header()
    {
        auto self(shared_from_this());
        boost::asio::async_read(socket_,
            boost::asio::buffer(read_msg_.data(),chat_message::header_length),
            [this, self](boost::system::error_code ec, std::size_t)
        {
            if (!ec && read_msg_.decode_header())
                do_read_body();
            else
                room_.leave(shared_from_this());
        });
    }
    void do_read_body()
    {
        auto self(shared_from_this());
        boost::asio::async_read(socket_,
            boost::asio::buffer(read_msg_.body(),read_msg_.body_length()),
            [this, self](boost::system::error_code ec, std::size_t)
        {
            if (!ec)
            {
                room_.deliver(read_msg_);//聊天室將消息保存到recent消息以後再將此消息廣播出去
                do_read_header();
            }
            else
                room_.leave(shared_from_this());
        });
    }

    //這裏是給客戶端發次全部的write_msgs_。直到write_msgs_爲空中止post write
    void do_write()
    {
        auto self(shared_from_this());
        boost::asio::async_write(socket_,
            boost::asio::buffer(write_msgs_.front().data(),write_msgs_.front().length()),
            [this, self](boost::system::error_code ec, std::size_t)
        {
            if (!ec)
            {
                write_msgs_.pop_front();
                if (!write_msgs_.empty())
                {
                    do_write();
                }
            }
            else
                room_.leave(shared_from_this());
        });
    }
    tcp::socket socket_;//通訊socket
    chat_room & room_;//房間引用
    chat_message read_msg_;//通訊用到的read_msg
    chat_message_queue write_msgs_;//這個是最近消息隊列
};


class chat_server
{
public:
    chat_server(boost::asio::io_service & io, const tcp::endpoint & endpoint)
        : acceptor_(io, endpoint),
        socket_(io)
    {
        do_accept();
    }
private:
    void do_accept()
    {
        acceptor_.async_accept(socket_,[this](boost::system::error_code ec)
        {
            if (!ec)
                std::make_shared<chat_session>(std::move(socket_), room_)->start();
            do_accept();
        });
    }
    tcp::acceptor acceptor_;
    tcp::socket socket_;
    chat_room room_;
};
int _tmain(int argc, _TCHAR* argv[])
{
    try
    {
        boost::asio::io_service io;
        chat_server  s(io, tcp::endpoint(boost::asio::ip::address::from_string("127.0.0.1"), 13));
        io.run();
    }
    catch (std::exception & e)
    {
        std::cerr << "Exception: " << e.what() << std::endl;
    }

    return 0;
}

 

 

 

服務端作異步監聽,當有客戶端到來,把這個客戶端(session) 放到聊天室對象裏,當這個客戶端斷開時,從聊天室客戶端列表裏刪除。
這個聊天室實現了一個廣播功能,當客戶端發送消息至服務器時,服務器給全部客戶端廣播這條消息,而且聊天室記錄最近客戶端發送到服務器的消息,當客戶端鏈接到服務器時,服務器主動把最近消息記錄發送給這個客戶端。
這裏要注意到一點的是,他的發送是相似消息驅動的形式,就是用一個對象保存要發送的消息,當發送成功回調OnSend裏發現有未發完的消息時,再駢PostSend。而不是主動發送。我暫時不知道這種作法的意圖。可是能夠注意到的一點是這種發送是依次的,也就是PostSend順序是這樣的 PostSend OnSend PostSend OnSend,而咱們常常的作法則是PostSend PostSend OnSend OnSend。這個好處不言而喻。提供了一種緩存機制。

12.Chat_Client詳解

先上代碼

 

// CC_ChatClient.cpp : 定義控制檯應用程序的入口點。
//

#include "stdafx.h"

#include <cstdlib>
#include <deque>
#include <iostream>
#include <thread>
#include <boost/asio.hpp>
#include "..\CA_ChatMessage\chat_message.h"
using boost::asio::ip::tcp;

typedef std::deque<chat_message> chat_message_queue;

class chat_client
{
public:
    chat_client(boost::asio::io_service & io,
        tcp::resolver::iterator endpoint_iterator) :
        io_service_(io),
        socket_(io)
    {
        do_connect(endpoint_iterator);
    }

    //這裏的作法與平時作法不太同樣,
    //平時咱們通常是write一條就去do_write一次,
    //而這裏是write一次把內容加到write_msgs裏面
    //當目前沒有正在post正在執行時去do_write一下,不然把do_write 的操做放到on_write裏面進行
    void write(const chat_message& msg)
    {
        io_service_.post(
            [this, msg]()
        {
            bool write_in_progress = !write_msgs_.empty();
            write_msgs_.push_back(msg);
            if (!write_in_progress)
            {
                do_write();
            }
        });
    }
private:
    void do_connect(tcp::resolver::iterator endpoint_iterator)
    {
        boost::asio::async_connect(socket_, endpoint_iterator,
            [this](boost::system::error_code ec, tcp::resolver::iterator)
        {
            if (!ec)
                do_read_header();
            else
                std::cerr << "connect failed:" << ec.message() << std::endl;
        });
    }

    void do_read_header()
    {
        boost::asio::async_read(socket_,
            boost::asio::buffer(read_msg_.data(),chat_message::header_length),
            [this](boost::system::error_code ec, std::size_t)
        {
            if (!ec && read_msg_.decode_header())
                do_read_body();
            else
                socket_.close();
        });
    }

    void do_read_body()
    {
        boost::asio::async_read(socket_,
            boost::asio::buffer(read_msg_.body(), read_msg_.body_length()),
            [this](boost::system::error_code ec, std::size_t)
        {
            if (!ec)
            {
                std::cout.write(read_msg_.body(), read_msg_.body_length());
                std::cout << "\n";
                do_read_header();
            }
            else
                socket_.close();
        });
    }

    void do_write()
    {
        boost::asio::async_write(socket_,
            boost::asio::buffer(write_msgs_.front().data(),
            write_msgs_.front().length()),
            [this](boost::system::error_code ec, std::size_t)
        {
            if (!ec)
            {
                write_msgs_.pop_front();
                if (!write_msgs_.empty())
                    do_write();
            }
            else
                socket_.close();
        });
    }

    boost::asio::io_service & io_service_;
    tcp::socket socket_;
    chat_message read_msg_;
    chat_message_queue write_msgs_;
};


int _tmain(int argc, _TCHAR* argv[])
{
    try
    {
        boost::asio::io_service io;
        tcp::resolver resolver(io);
        auto end_point_iter = resolver.resolve({ "127.0.0.1", "13" });
        chat_client c(io, end_point_iter);

        std::thread t([&io](){io.run(); });
        char line[chat_message::max_body_length + 1] = { 0 };
        while (std::cin.getline(line, chat_message::max_body_length + 1))
        {
            chat_message msg;
            msg.body_length(std::strlen(line));
            std::memcpy(msg.body(), line, msg.body_length());
            msg.encode_header();
            c.write(msg);
        }
    }
    catch (std::exception & e)
    {
        std::cerr << "Exception: " << e.what() << "\n";
    }
    return 0;
}

 

 

 

這個客戶端沒有什麼特色,最大的特別就是我上節在服務端說到的,消息回調Post機制。

13.echo

echo都是很是簡單的socket示例,暫時不作熬述

14.Futures

// EA_Futures.cpp : 定義控制檯應用程序的入口點。
//

#include "stdafx.h"


#include <array>
#include <future>
#include <iostream>
#include <thread>
#include <boost/asio/io_service.hpp>
#include <boost/asio/ip/udp.hpp>
#include <boost/asio/use_future.hpp>

using boost::asio::ip::udp;

void get_daytime(boost::asio::io_service & io, const char * host_name)
{
    try
    {
        udp::resolver resv(io);
        std::future<udp::resolver::iterator> iter = resv.async_resolve({ udp::v4(), host_name, "daytime" }, boost::asio::use_future);
        udp::socket sock(io, udp::v4());

        std::array<char, 1> send_buf = { { 0 } };
        std::future < std::size_t> send_length = sock.async_send_to(boost::asio::buffer(send_buf), *iter.get(),
            boost::asio::use_future);

        send_length.get();//阻塞,直到發送完成

        std::array<char, 128> recv_buf;
        udp::endpoint sender_endpoint;
        std::future<std::size_t> recv_length = sock.async_receive_from(boost::asio::buffer(recv_buf),
            sender_endpoint,
            boost::asio::use_future);

        //當接收完成去作其它事

        std::cout.write(recv_buf.data(), recv_length.get());

    }
    catch (std::exception &e)
    {
        std::cerr << e.what() << std::endl;
    }
}

int _tmain(int argc, _TCHAR* argv[])
{
    try
    {
        boost::asio::io_service io;
        boost::asio::io_service::work work(io);
        std::thread t([&io](){io.run(); });

        get_daytime(io, "127.0.0.1");
        io.stop();
        t.join();
    }
    catch (std::exception & e)
    {
        std::cerr << e.what() << std::endl;
    }
    return 0;
}

 

知識點:

  1.  io_service::work 這是一個很小的輔助類,只支持構造函數和析構函數。構造一個 work時,outstanding_work_+1,使得io.run在完成異步消息以後判斷outstanding_work_時不爲0,於是會使io.run()不至於返回。通俗的講它就是讓io.run一直運行不退出,只到work析構。
  2. std::future 他是獲取異步執行函數的返回值的,至關於你建立了一個線程線程在計算某個結果,你要獲得這個結果時,你得同步一下,還要看一下,結果算完了沒有。future就是作這件事的。關於這個std::future我會另外開一篇文章寫一下。這裏有一篇文件詳細介紹一下這個std::future幹了什麼http://blog.csdn.net/wangshubo1989/article/details/49872199
  3. io.stop 這個函數是告訴io_service要中止 。

 

 

18.HttpServer

本例用boost asio  寫了一個簡易http服務器,與前面的相比新的知識點很少。

下面提供源碼下載:

 

源碼下載

相關文章
相關標籤/搜索