1. Memory reuse with placement new
In boost/asio/detail/reactive_socket_service_base.hpp, async_receive needs to create a reactive_socket_recv_op. That object is not obtained directly from the system with new; instead, a free list is searched first for a block of memory big enough to hold the op object (boost_asio_handler_alloc_helpers::allocate), and the object is then constructed in that memory with placement new.
// Allocate and construct an operation to wrap the handler.
typedef reactive_socket_recv_op<MutableBufferSequence, Handler> op;
typename op::ptr p = { boost::asio::detail::addressof(handler),
  boost_asio_handler_alloc_helpers::allocate(
    sizeof(op), handler), 0 };
p.p = new (p.v) op(impl.socket_, impl.state_, buffers, flags, handler);
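As a rough illustration (a hypothetical sketch, not asio's actual allocator; simple_pool and make_op are made-up names), the pattern amounts to: ask a pool for a block, reuse a previously released block if it is big enough, and then construct the operation in that raw memory with placement new.

#include <cstddef>
#include <cstdlib>
#include <new>

// Hypothetical one-slot pool: keeps the most recently released block for reuse.
class simple_pool
{
public:
  simple_pool() : block_(0), size_(0) {}
  ~simple_pool() { std::free(block_); }

  void* allocate(std::size_t n)
  {
    if (block_ && size_ >= n)     // cached block is big enough: hand it back out
    {
      void* p = block_;
      block_ = 0;
      return p;
    }
    return std::malloc(n);        // otherwise fall back to the heap
  }

  void deallocate(void* p, std::size_t n)
  {
    if (!block_) { block_ = p; size_ = n; }  // cache the block for the next op
    else std::free(p);
  }

private:
  void* block_;
  std::size_t size_;
};

// Construct an op object inside pool memory with placement new.
template <typename Op, typename Arg>
Op* make_op(simple_pool& pool, const Arg& arg)
{
  void* mem = pool.allocate(sizeof(Op));
  return new (mem) Op(arg);       // placement new: no second heap allocation
}

Destruction is then the mirror image: run the destructor explicitly (op->~Op()) and hand the raw memory back via deallocate so the next operation can reuse it.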
2. The hash_map implementation
In boost/asio/detail/hash_map.hpp, hash_map is implemented on top of std::list. The author does not give each bucket its own container for the chained values; instead a single std::list holds all values, and each bucket only stores the begin and end iterators of its elements within that list. hash_map is used in select_reactor.ipp to store the mapping from sockets to their corresponding ops.
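A stripped-down, hypothetical sketch of that layout (tiny_hash_map is a made-up name; the real code is in asio's detail/hash_map.hpp): all key/value pairs live in one std::list, and each bucket only remembers where its chain begins and ends inside that list. Integer-like keys are assumed for the hash.

#include <cstddef>
#include <list>
#include <utility>
#include <vector>

template <typename K, typename V>
class tiny_hash_map
{
  typedef std::list<std::pair<K, V> > value_list;
  typedef typename value_list::iterator iterator;

  struct bucket { iterator first, last; };  // range of this bucket inside values_

  value_list values_;               // one list holds every element
  std::vector<bucket> buckets_;

public:
  explicit tiny_hash_map(std::size_t n) : buckets_(n)
  {
    for (std::size_t i = 0; i < n; ++i)
      buckets_[i].first = buckets_[i].last = values_.end();
  }

  void insert(const K& k, const V& v)
  {
    bucket& b = buckets_[hash(k) % buckets_.size()];
    if (b.first == values_.end())   // empty bucket: start a new chain at the front
    {
      values_.push_front(std::make_pair(k, v));
      b.first = b.last = values_.begin();
    }
    else                            // non-empty: insert right after the chain end
    {
      iterator pos = b.last;
      ++pos;
      b.last = values_.insert(pos, std::make_pair(k, v));
    }
  }

  iterator find(const K& k)
  {
    bucket& b = buckets_[hash(k) % buckets_.size()];
    if (b.first == values_.end())
      return values_.end();
    iterator it = b.first, end = b.last;
    ++end;
    for (; it != end; ++it)
      if (it->first == k) return it;
    return values_.end();
  }

private:
  static std::size_t hash(const K& k) { return static_cast<std::size_t>(k); }
};

Because std::list insertion never invalidates existing iterators, growing one bucket's chain leaves every other bucket's (first, last) range intact.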
3. The buffers passed to async_send/async_receive
template <typename ConstBufferSequence, typename Handler>
void async_send(base_implementation_type& impl,
    const ConstBufferSequence& buffers,
    socket_base::message_flags flags, Handler& handler)
async_send/async_receive are template methods; what gets sent is a sequence of buffers, and the underlying call ultimately ends up in
ssize_t sendmsg(int sockfd, const struct msghdr *msg, int flags);
or
int WSAAPI WSASend(
    SOCKET s,
    LPWSABUF lpBuffers,
    DWORD dwBufferCount,
    LPDWORD lpNumberOfBytesSent,
    DWORD dwFlags,
    LPWSAOVERLAPPED lpOverlapped,
    LPWSAOVERLAPPED_COMPLETION_ROUTINE lpCompletionRoutine);
This way, multiple buffers do not have to be merged into one big buffer first; they are handed directly to the lower layer for sending, similar to writev/readv.
The BufferSequence here can be the simple const_buffers_1/mutable_buffers_1, which underneath contain just a single const_buffer or mutable_buffer; boost::asio::buffer wraps a char* and a length into a const_buffers_1 or mutable_buffers_1. The BufferSequence can also be a std::vector<const_buffer>, a boost::array<mutable_buffer, 3>, and so on. All of these BufferSequence types must support begin() and end().
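For example (a hedged sketch: sock, header/body/trailer and on_sent are made-up names, and keeping the strings alive until the handler runs is the caller's responsibility, since asio stores only pointer + length), several pieces can be handed over in one call without first copying them into a single block:

#include <boost/asio.hpp>
#include <string>
#include <vector>

// Hypothetical completion handler; n counts bytes gathered from all buffers.
void on_sent(const boost::system::error_code& ec, std::size_t n)
{
  // ... react to the result of the gathered send ...
}

// sock is assumed to be a connected tcp::socket.
void send_three_parts(boost::asio::ip::tcp::socket& sock,
    const std::string& header, const std::string& body,
    const std::string& trailer)
{
  std::vector<boost::asio::const_buffer> bufs;   // a BufferSequence: has begin()/end()
  bufs.push_back(boost::asio::buffer(header));
  bufs.push_back(boost::asio::buffer(body));
  bufs.push_back(boost::asio::buffer(trailer));
  sock.async_send(bufs, on_sent);                // no copying into one big buffer
}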
Since typedef reactive_socket_send_op<ConstBufferSequence, Handler> op is itself a template definition, different ConstBufferSequence types instantiate different reactive_socket_send_op types. The reactive_socket_send_op stores the ConstBufferSequence, and when the socket becomes writable, reactive_socket_send_op_base's perform is called back.
static bool do_perform(reactor_op* base)
{
  reactive_socket_send_op_base* o(
      static_cast<reactive_socket_send_op_base*>(base));
  buffer_sequence_adapter<boost::asio::const_buffer,
      ConstBufferSequence> bufs(o->buffers_);
  return socket_ops::non_blocking_send(o->socket_,
      bufs.buffers(), bufs.count(), o->flags_,
      o->ec_, o->bytes_transferred_);
}
The template class buffer_sequence_adapter uses partial specialization to copy each buffer's pointer and length from the BufferSequence into an LPWSABUF array or a struct iovec*, and non_blocking_send then calls WSASend or sendmsg to send the data.
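A simplified, hypothetical sketch of what that adaptation boils down to on POSIX (send_sequence is a made-up helper; asio's real buffer_sequence_adapter is more general and does not have this fixed 64-entry limit): copy each buffer's pointer and length into an iovec array and hand the whole array to sendmsg in one call.

#include <cstddef>
#include <sys/socket.h>
#include <sys/uio.h>
#include <boost/asio/buffer.hpp>

// Fill an iovec array from any ConstBufferSequence and send it with one sendmsg.
// Error handling and partial sends are omitted for brevity.
template <typename ConstBufferSequence>
ssize_t send_sequence(int fd, const ConstBufferSequence& buffers, int flags)
{
  iovec iov[64];
  std::size_t count = 0;
  for (typename ConstBufferSequence::const_iterator it = buffers.begin();
      it != buffers.end() && count < 64; ++it, ++count)
  {
    boost::asio::const_buffer b(*it);
    iov[count].iov_base =
        const_cast<void*>(boost::asio::buffer_cast<const void*>(b));
    iov[count].iov_len = boost::asio::buffer_size(b);
  }

  msghdr msg = msghdr();        // zero-initialize the message header
  msg.msg_iov = iov;
  msg.msg_iovlen = count;
  return ::sendmsg(fd, &msg, flags);
}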
4. Correspondence between the Windows IOCP implementation and the reactor-based implementation:
win_iocp_io_service -------- task_io_service
win_iocp_socket_service -------- reactive_socket_service
Here reactive_socket_service uses a reactor to emulate the proactor model.
(The main reactors are epoll_reactor/select_reactor/dev_poll_reactor/kqueue_reactor.)
5. Timeouts in asio
Neither async_send nor async_receive can take a timeout; the only way is to use a separate deadline_timer, which forces the timeout code to be written apart from the send/receive callback code, which is quite inconvenient. Adding a timeout on top of a reactor would actually be fairly easy, but Windows IOCP offers no good way to do it: an individual operation cannot be freely cancelled on IOCP; only all operations on a socket can be cancelled, or the socket closed, so asio has to settle for the intersection of what all platforms support.
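The usual workaround looks roughly like this (a hedged sketch using C++11 lambdas for brevity; sock, timer and buf are assumed to outlive both asynchronous operations): pair every async_receive with a deadline_timer and cancel whichever side loses the race.

#include <boost/asio.hpp>
#include <vector>

void start_read_with_timeout(boost::asio::ip::tcp::socket& sock,
    boost::asio::deadline_timer& timer, std::vector<char>& buf)
{
  timer.expires_from_now(boost::posix_time::seconds(5));
  timer.async_wait([&sock](const boost::system::error_code& ec)
  {
    if (!ec)          // the timer really expired (it was not cancelled below)
      sock.cancel();  // the pending receive completes with operation_aborted
  });

  sock.async_receive(boost::asio::buffer(buf),
      [&timer](const boost::system::error_code& ec, std::size_t n)
      {
        timer.cancel();   // data or an error arrived first: stop the timer
        if (ec == boost::asio::error::operation_aborted)
          return;         // timed out: the timeout path is separate from the data path
        // ... handle the n received bytes ...
      });
}

This shows exactly the inconvenience described above: the timeout logic and the data-handling logic end up in two separate handlers.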
Timeouts on Windows are implemented with CreateWaitableTimer/SetWaitableTimer plus a dedicated thread. (Could this instead be done with RegisterWaitForSingleObject and UnregisterWaitEx, or with timeSetEvent/timeKillEvent?)
6. per_descriptor_data in epoll_reactor
Every socket is registered with the epoll_reactor: allocate_descriptor_state allocates a per_descriptor_data, which is kept in object_pool<descriptor_state> registered_descriptors_. The object_pool holds two lists: live_list_ stores the currently allocated descriptor_state objects, and free_list_ stores released ones so they can be recycled.
int epoll_reactor::register_descriptor(socket_type descriptor,
    epoll_reactor::per_descriptor_data& descriptor_data)
{
  descriptor_data = allocate_descriptor_state();

  {
    mutex::scoped_lock descriptor_lock(descriptor_data->mutex_);
    descriptor_data->reactor_ = this;
    descriptor_data->descriptor_ = descriptor;
    descriptor_data->shutdown_ = false;
  }

  epoll_event ev = { 0, { 0 } };
  ev.events = EPOLLIN | EPOLLERR | EPOLLHUP | EPOLLPRI | EPOLLET;
  descriptor_data->registered_events_ = ev.events;
  ev.data.ptr = descriptor_data;
  int result = epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, descriptor, &ev);
  if (result != 0)
    return errno;

  return 0;
}
epoll_reactor::descriptor_state* epoll_reactor::allocate_descriptor_state()
{
  mutex::scoped_lock descriptors_lock(registered_descriptors_mutex_);
  return registered_descriptors_.alloc();
}
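registered_descriptors_.alloc() comes from asio's internal object_pool. As a rough, hypothetical sketch of that recycling idea (simple_object_pool is a made-up name; the real object_pool also keeps a live_list_ so it can destroy everything on shutdown, and T is assumed to carry a next_ pointer usable as an intrusive link):

#include <cstddef>

template <typename T>
class simple_object_pool
{
public:
  simple_object_pool() : free_(0) {}

  ~simple_object_pool()
  {
    while (T* obj = free_) { free_ = obj->next_; delete obj; }
  }

  T* alloc()
  {
    if (T* obj = free_)           // reuse a released object if one is cached
    {
      free_ = obj->next_;
      return obj;
    }
    return new T();               // otherwise allocate a fresh one
  }

  void free(T* obj)
  {
    obj->next_ = free_;           // keep the object around for later reuse
    free_ = obj;
  }

private:
  T* free_;                       // singly-linked list of released objects
};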
Later, when an operation needs write readiness (in epoll_reactor::start_op), the registration is modified to add EPOLLOUT:

epoll_event ev = { 0, { 0 } };
ev.events = descriptor_data->registered_events_ | EPOLLOUT;
ev.data.ptr = descriptor_data;
if (epoll_ctl(epoll_fd_, EPOLL_CTL_MOD, descriptor, &ev) == 0)
{
  descriptor_data->registered_events_ |= ev.events;
}
When epoll_wait returns, ev.data.ptr is used directly as the descriptor_data and then processed. In a single-threaded setting this is fine. But in a multi-threaded environment, while epoll_wait is in progress, that descriptor_data may first be released (e.g. by calling tcp::socket::close) and put onto free_list_, and then handed out again to another tcp::socket; this would cause a wrong callback to fire for that other socket. This looks like a bug...
7. boost.asio.write and async_write_some
boost::asio::write and boost::asio::async_write send data by calling write_some/async_write_some in a loop. The write_op code in asio/impl/write.hpp is the callback used by async_write; in operator(), start is 1 on the first call and 0 on every later call. The code looks a little odd: the switch's default label sits inside the for loop.
void operator()(const boost::system::error_code& ec,
    std::size_t bytes_transferred, int start = 0)
{
  switch (start_ = start)
  {
    case 1:
    buffers_.prepare(this->check_for_completion(ec, total_transferred_));
    for (;;)
    {
      stream_.async_write_some(buffers_,
          BOOST_ASIO_MOVE_CAST(write_op)(*this));
      return; default:
      total_transferred_ += bytes_transferred;
      buffers_.consume(bytes_transferred);
      buffers_.prepare(this->check_for_completion(ec, total_transferred_));
      if ((!ec && bytes_transferred == 0)
          || buffers_.begin() == buffers_.end())
        break;
    }

    handler_(ec, static_cast<const std::size_t&>(total_transferred_));
  }
}
If I were writing it myself, it would probably look like this (the boost version saves one line of async_write_some; the masters really do think differently):
void operator()(const boost::system::error_code& ec,
    std::size_t bytes_transferred, int start = 0)
{
  switch (start_ = start)
  {
    case 1:
    buffers_.prepare(this->check_for_completion(ec, total_transferred_));
    stream_.async_write_some(buffers_, BOOST_ASIO_MOVE_CAST(write_op)(*this));
    return;
    default:
    total_transferred_ += bytes_transferred;
    buffers_.consume(bytes_transferred);
    buffers_.prepare(this->check_for_completion(ec, total_transferred_));
    if ((!ec && bytes_transferred == 0)
        || buffers_.begin() == buffers_.end())
      break;
    stream_.async_write_some(buffers_, BOOST_ASIO_MOVE_CAST(write_op)(*this));
    return;
  }
  // invoke the completion handler once the whole sequence is written (or on error)
  handler_(ec, static_cast<const std::size_t&>(total_transferred_));
}
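For reference, this is roughly how the composed operation is driven from user code (a hedged sketch; sock and msg are made-up names, and msg must stay alive until the handler runs): async_write keeps calling async_write_some through write_op until the whole buffer has been sent, then invokes the final handler exactly once with the total byte count.

#include <boost/asio.hpp>
#include <string>

void send_all(boost::asio::ip::tcp::socket& sock, const std::string& msg)
{
  boost::asio::async_write(sock, boost::asio::buffer(msg),
      [](const boost::system::error_code& ec, std::size_t total)
      {
        // Called once: either everything was written (total == msg.size())
        // or an error stopped the write_op loop early.
      });
}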
[to be continued]