boost.asio代碼學習

一、placement new進行內存重用react

boost/asio/detail/reactive_socket_service_base.hpp中,async_receive須要建立一個reactive_socket_recv_op,該對象不是直接從系統new出來的,而是先查找空閒列表,找到一塊可以放得下該op對象的內存(boost_asio_handler_alloc_helper::allocate),而後對該內存進行placement new構造對象。windows

// Allocate and construct an operation to wrap the handler.
    typedef reactive_socket_recv_op<MutableBufferSequence, Handler> op;
    typename op::ptr p = { boost::asio::detail::addressof(handler),
      boost_asio_handler_alloc_helpers::allocate(
        sizeof(op), handler), 0 };
    p.p = new (p.v) op(impl.socket_, impl.state_, buffers, flags, handler);

二、hash_map的實現多線程

在boost/detail/hash_map.hpp中,利用std::list實現了hash_map。做者並無爲每一個bucket建一個容器來存放拉鍊的值,而是隻用了一個std::list用於存放全部的值,每一個bucket存放的是該bucket中元素在list中的起始與終止iterator。hash_map在select_reactor.ipp中被用到,用於存儲socket到對應op的映射。app

三、async_send/async_receive中的傳入的bufferssocket

template <typename ConstBufferSequence, typename Handler>
  void async_send(base_implementation_type& impl,
      const ConstBufferSequence& buffers,
      socket_base::message_flags flags, Handler& handler)

async_send/async_receive是模板方法,發送的內容是一個buffers的序列,最終底層調用async

ssize_t sendmsg(int sockfd, const struct msghdr *msg, int flags);

或者tcp

int WSAAPI WSASend (

  SOCKET s,

  LPWSABUF lpBuffers,

  DWORD dwBufferCount,

  LPDWORD lpNumberOfBytesSent,

  int iFlags,

  LPWSAOVERLAPPED lpOverlapped,

  LPWSAOVERLAPPED_COMPLETION_ROUTINE lpCompletionRoutine

  );

這樣多個buffer不用先合成一個大的buffer,需是直接交給底層發送,相似於writev/readv。函數

這裏的BufferSequence能夠是簡單的const_buffers_1/mutable_buffers_1,底層只包含一個const_buffer或mutable_buffer。用boost.asio.buffer對char*,len包裝生成const_buffers_1或mutable_buffers_1。BufferSequence也能夠是std::vector<const_buffer> 或是boost::array<mutable_buffer, 3>等。全部這些BufferSequence須要支持begin()與end()。this

因爲 typedef reactive_socket_send_op<ConstBufferSequence, Handler> op也是模板定義,不一樣的ConstBufferSequence會具現化不一樣的reactive_socket_sendop,在reactive_socket_sendop中會存放ConstBufferSequence,當socket可寫時,回調reactive_socket_send_op_base的perform。線程

static bool do_perform(reactor_op* base)
  {
    reactive_socket_send_op_base* o(
        static_cast<reactive_socket_send_op_base*>(base));

    buffer_sequence_adapter<boost::asio::const_buffer,
        ConstBufferSequence> bufs(o->buffers_);

    return socket_ops::non_blocking_send(o->socket_,
          bufs.buffers(), bufs.count(), o->flags_,
          o->ec_, o->bytes_transferred_);
  }

由模板類buffer_sequence_adapter使用偏特化機制,將BufferSequence中每一個Buffer的指針、長度信息寫入LPWSABUF或是struct iovec*中,再由non_blocking_send調用WSASend或sendmsg發送數據。

四、  win_iocp_io_service        --------  task_io_service

       win_iocp_socket_service --------  reactive_socket_service   

       其中reactive_socket_service使用reactor來模擬proactor效果。

       (reactor主要有epoll_reactor/select_reactor/dev_poll_reactor/kqueue_reactor)

五、asio中的超時

      async_send/async_receive中都不能帶超時,只能用另一個deadline_timer來實現,這樣形成超時的代碼與發送接收的回調代碼只能分開來寫,很不方便。 實際上,在reactor上加上超時仍是比較容易的,但多是windows的iocp卻沒有什麼好的辦法,咱們不能在iocp上面自由地取消一個操做,而只能取消一個socket上的全部操做或是關閉套節字,因此只能取交集了。

    windows下的超時使用CreateWaitableTimer/SetWaitableTimer並用獨立線程來實現超時機制。(是否能夠用RegisterWaitForSingleObject與UnregisterWaitEx函數來實現或者用timeSetEvent/timeKillEvent來實現?)

六、epoll_reactor中的per_descriptor_data

每一個套節字會在epoll_reactor中註冊,由allocate_descriptor_state分配一個per_descriptor_data,存放在object_pool<descriptor_state> registered_descriptors_中; 而object_pool中包含兩個list,live_list_一個用於存放當前已分配的descriptor_state,free_list_用於存放釋放的descriptor_state,實現循環利用。

int epoll_reactor::register_descriptor(socket_type descriptor,
    epoll_reactor::per_descriptor_data& descriptor_data)
{
  descriptor_data = allocate_descriptor_state();

  {
    mutex::scoped_lock descriptor_lock(descriptor_data->mutex_);

    descriptor_data->reactor_ = this;
    descriptor_data->descriptor_ = descriptor;
    descriptor_data->shutdown_ = false;
  }

  epoll_event ev = { 0, { 0 } };
  ev.events = EPOLLIN | EPOLLERR | EPOLLHUP | EPOLLPRI | EPOLLET;
  descriptor_data->registered_events_ = ev.events;
  ev.data.ptr = descriptor_data;
  int result = epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, descriptor, &ev);
  if (result != 0)
    return errno;

  return 0;
}
epoll_reactor::descriptor_state* epoll_reactor::allocate_descriptor_state()
{
  mutex::scoped_lock descriptors_lock(registered_descriptors_mutex_);
  return registered_descriptors_.alloc();
}
epoll_event ev = { 0, { 0 } };
          ev.events = descriptor_data->registered_events_ | EPOLLOUT;
          ev.data.ptr = descriptor_data;
          if (epoll_ctl(epoll_fd_, EPOLL_CTL_MOD, descriptor, &ev) == 0)
          {
            descriptor_data->registered_events_ |= ev.events;
          }

當epoll_wait返回時,直接將ev.data.ptr做爲descriptor_data,而後進行處理。 這在單線程下沒什麼問題。但若是在多線程環境下,epoll_wait期間,該descriptor_data可能先被被釋放(如調用tcp.socket.close)存入free_list_,而後再被另外一個tcp.socket分配,這樣會形成另外一個socket產生錯誤的回調。這應該是個bug吧。。。

七、boost.asio.write與async_write_some

boost.asio.write循環調用async_write_some實現數據發送. asio/impl/write.hpp中的write_op的代碼用於回調,operator()中第一次是start=1,之後的start都是0。這裏的代碼有點奇怪,switch裏的default在for循環中。

void operator()(const boost::system::error_code& ec,
        std::size_t bytes_transferred, int start = 0)
    {
      switch (start_ = start)
      {
        case 1:
        buffers_.prepare(this->check_for_completion(ec, total_transferred_));
        for (;;)
        {
          stream_.async_write_some(buffers_,
              BOOST_ASIO_MOVE_CAST(write_op)(*this));
          return; default:
          total_transferred_ += bytes_transferred;
          buffers_.consume(bytes_transferred);
          buffers_.prepare(this->check_for_completion(ec, total_transferred_));
          if ((!ec && bytes_transferred == 0)
              || buffers_.begin() == buffers_.end())
            break;
        }

若是是本身寫的話,估計是這樣(boost的代碼中能夠節省一行async_write_some代碼,大牛的思想果真不同凡響):

void operator()(const boost::system::error_code& ec,
        std::size_t bytes_transferred, int start = 0)
    {
      switch (start_ = start)
      {
	    case 1:
        	buffers_.prepare(this->check_for_completion(ec, total_transferred_));
        	stream_.async_write_some(buffers_,BOOST_ASIO_MOVE_CAST(write_op)(*this));
          	return; 
	    default:
        	total_transferred_ += bytes_transferred;
        	buffers_.consume(bytes_transferred);
        	buffers_.prepare(this->check_for_completion(ec, total_transferred_));
        	total_transferred_ += bytes_transferred;
          	if ((!ec && bytes_transferred == 0)|| buffers_.begin() == buffers_.end())
			    break;
        	stream_.async_write_some(buffers_,BOOST_ASIO_MOVE_CAST(write_op)(*this));
          	return; 
        }
    }

[to be continue]

相關文章
相關標籤/搜索