一、實現多線程方法:ios
其實就是多個線程同時調用io_service::runwindows
for (int i = 0; i != m_nThreads; ++i)
{
boost::shared_ptr<boost::thread> pTh(new boost::thread(
boost::bind(&boost::asio::io_service::run,&m_ioService)));
m_listThread.push_back(pTh);
}安全
二、多線程調度狀況:服務器
asio規定:只能在調用io_service::run的線程中才能調用事件完成處理器。多線程
注:事件完成處理器就是你async_accept、async_write等註冊的句柄,相似於回調的東西。併發
單線程:socket
若是隻有一個線程調用io_service::run,根據asio的規定,事件完成處理器也只能在這個線程中執行。也就是說,你全部代碼都在同一個線程中運行,所以變量的訪問是安全的。async
多線程:tcp
若是有多個線程同時調用io_service::run以實現多線程併發處理。對於asio來講,這些線程都是平等的,沒有主次之分。若是你投遞的一個請求好比async_write完成時,asio將隨機的激活調用io_service::run的線程。並在這個線程中調用事件完成處理器(async_write當時註冊的句柄)。若是你的代碼耗時較長,這個時候你投遞的另外一個async_write請求完成時,asio將不等待你的代碼處理完成,它將在另外的一個調用io_service::run線程中,調用async_write當時註冊的句柄。也就是說,你註冊的事件完成處理器有可能同時在多個線程中調用。測試
固然你可使用 boost::asio::io_service::strand讓完成事件處理器的調用,在同一時間只有一個, 好比下面的的代碼:
socket_.async_read_some(boost::asio::buffer(buffer_),
strand_.wrap(
boost::bind(&connection::handle_read, shared_from_this(),
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred)));
...
boost::asio::io_service::strand strand_;
此時async_read_som完成後掉用handle_read時,必須等待其它handle_read調用完成時才能被執行(async_read_som引發的handle_read調用)。
多線程調用時,還有一個重要的問題,那就是無序化。好比說,你短期內投遞多個async_write,那麼完成處理器的調用並非按照你投遞async_write的順序調用的。asio第一次調用完成事件處理器,有多是第二次async_write返回的結果,也有多是第3次的。使用strand也是這樣的。strand只是保證同一時間只運行一個完成處理器,但它並不保證順序。
代碼測試:
服務器:
將下面的代碼編譯之後,使用cmd命令提示符下傳人蔘數<IP> <port> <threads>調用
好比:test.exe 0.0.0.0 3005 10
客服端 使用windows自帶的telnet
cmd命令提示符:
telnet 127.0.0.1 3005
原理:客戶端鏈接成功後,同一時間調用100次boost::asio::async_write給客戶端發送數據,而且在完成事件處理器中打印調用序號,和線程ID。
核心代碼:
void start()
{
for (int i = 0; i != 100; ++i)
{
boost::shared_ptr<string> pStr(new string);
*pStr = boost::lexical_cast<string>(boost::this_thread::get_id());
*pStr += "\r\n";
boost::asio::async_write(m_nSocket,boost::asio::buffer(*pStr),
boost::bind(&CMyTcpConnection::HandleWrite,shared_from_this(),
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred,
pStr,i)
);
}
}
//去掉 boost::mutex::scoped_lock lk(m_ioMutex); 效果更明顯。
void HandleWrite(const boost::system::error_code& error
,std::size_t bytes_transferred
,boost::shared_ptr<string> pStr,int nIndex)
{
if (!error)
{
boost::mutex::scoped_lock lk(m_ioMutex);
cout << "發送序號=" << nIndex << ",線程id=" << boost::this_thread::get_id() << endl;
}
else
{
cout << "鏈接斷開" << endl;
}
}
完整代碼:
#include <boost/bind.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/enable_shared_from_this.hpp>
#include <boost/asio.hpp>
#include <boost/lexical_cast.hpp>
#include <boost/thread.hpp>
#include <boost/thread/mutex.hpp>
#include <string>
#include <iostream>
using std::cout;
using std::endl;
using std::string;
using boost::asio::ip::tcp;
class CMyTcpConnection
: public boost::enable_shared_from_this<CMyTcpConnection>
{
public:
CMyTcpConnection(boost::asio::io_service &ser)
:m_nSocket(ser)
{
}
typedef boost::shared_ptr<CMyTcpConnection> CPMyTcpCon;
static CPMyTcpCon CreateNew(boost::asio::io_service& io_service)
{
return CPMyTcpCon(new CMyTcpConnection(io_service));
}
public:
void start()
{
for (int i = 0; i != 100; ++i)
{
boost::shared_ptr<string> pStr(new string);
*pStr = boost::lexical_cast<string>(boost::this_thread::get_id());
*pStr += "\r\n";
boost::asio::async_write(m_nSocket,boost::asio::buffer(*pStr),
boost::bind(&CMyTcpConnection::HandleWrite,shared_from_this(),
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred,
pStr,i)
);
}
}
tcp::socket& socket()
{
return m_nSocket;
}
private:
void HandleWrite(const boost::system::error_code& error
,std::size_t bytes_transferred
,boost::shared_ptr<string> pStr,int nIndex)
{
if (!error)
{
boost::mutex::scoped_lock lk(m_ioMutex);
cout << "發送序號=" << nIndex << ",線程id=" << boost::this_thread::get_id() << endl;
}
else
{
cout << "鏈接斷開" << endl;
}
}
private:
tcp::socket m_nSocket;
boost::mutex m_ioMutex;
};
class CMyService
: private boost::noncopyable
{
public:
CMyService(string const &strIP,string const &strPort,int nThreads)
:m_tcpAcceptor(m_ioService)
,m_nThreads(nThreads)
{
tcp::resolver resolver(m_ioService);
tcp::resolver::query query(strIP,strPort);
tcp::resolver::iterator endpoint_iterator = resolver.resolve(query);
boost::asio::ip::tcp::endpoint endpoint = *resolver.resolve(query);
m_tcpAcceptor.open(endpoint.protocol());
m_tcpAcceptor.set_option(boost::asio::ip::tcp::acceptor::reuse_address(true));
m_tcpAcceptor.bind(endpoint);
m_tcpAcceptor.listen();
StartAccept();
}
~CMyService(){Stop();}
public:
void Stop()
{
m_ioService.stop();
for (std::vector<boost::shared_ptr<boost::thread>>::const_iterator it = m_listThread.cbegin();
it != m_listThread.cend(); ++ it)
{
(*it)->join();
}
}
void Start()
{
for (int i = 0; i != m_nThreads; ++i)
{
boost::shared_ptr<boost::thread> pTh(new boost::thread(
boost::bind(&boost::asio::io_service::run,&m_ioService)));
m_listThread.push_back(pTh);
}
}
private:
void HandleAccept(const boost::system::error_code& error
,boost::shared_ptr<CMyTcpConnection> newConnect)
{
if (!error)
{
newConnect->start();
}
StartAccept();
}
void StartAccept()
{
CMyTcpConnection::CPMyTcpCon newConnect = CMyTcpConnection::CreateNew(m_tcpAcceptor.get_io_service());
m_tcpAcceptor.async_accept(newConnect->socket(),
boost::bind(&CMyService::HandleAccept, this,
boost::asio::placeholders::error,newConnect));
}
private:
boost::asio::io_service m_ioService;
boost::asio::ip::tcp::acceptor m_tcpAcceptor;
std::vector<boost::shared_ptr<boost::thread>> m_listThread;
std::size_t m_nThreads;
};
int main(int argc, char* argv[])
{
try
{
if (argc != 4)
{
std::cerr << "<IP> <port> <threads>\n";
return 1;
}
int nThreads = boost::lexical_cast<int>(argv[3]);
CMyService mySer(argv[1],argv[2],nThreads);
mySer.Start();
getchar();
mySer.Stop();
}
catch (std::exception& e)
{
std::cerr << "Exception: " << e.what() << "\n";
}
return 0;
}
測試發現和上面的理論是一致的,發送序號是亂的,線程ID也不是同一個。
asio多線程中線程的合理個數:
做爲服務器,在不考慮省電的狀況下,應該儘量的使用cpu。也就是說,爲了讓cpu都忙起來,你的線程個數應該大於等於你電腦的cpu核心數(一個核心運行一個線程)。具體的值沒有最優方案,大多數人使用cpu核心數*2 + 2的這種方案,但它不必定適合你的狀況。
asio在windows xp等系統中的實現:
asio在windows下使用完成端口,若是你投遞的請求沒有完成,那麼這些線程都在等待GetQueuedCompletionStatus的返回,也就是等待內核對象,此時線程是不佔用cpu時間的。