Envoy 源碼分析--network

Envoy 源碼分析--network

申明:本文的 Envoy 源碼分析基於 Envoy1.10.0。api

Envoy 的服務是通用服務,所以它須要支持 TCPUDP,同時還需支持 IPV4IPV6 兩種網絡協議,因此網絡模塊有點複雜。本次分析的網絡模塊是底層的模塊,沒有一整個服務的啓動流程,有的地方可能還串不起來。如今先來看下UML類圖:服務器

類圖看上去略顯複雜,主要分爲4塊:addressocketlistenconnection網絡

  • address 是地址相關的,主要包括 IPV4IPV6PIPEDNScidr
  • socketsocket 相關的操做,主要包括 ListenSocketConnectionSocketTransportSocket 以及 option
  • listen 是網絡監聽操做,包括 TCP 監聽和 UDP 監聽。
  • connection 是鏈接相關操做。關於 L3/4 過濾的此次暫時不分析,後續再講。

address

InstanceBase 繼承自 Instance 是全部地址類型的基類。Ipv4InstanceIpv6InstancePipeInstance 三個地址類都是繼承 InstanceBaseDNS解析類使用 c-ares 庫,DnsResolverImpl 只是對 c-ares 的進一步封裝。 CidrRange 是對 cidr 操做相關。app

系統操做返回的值和錯誤信息封裝成一個公用的結構體。具體以下:less

template <typename T> struct SysCallResult {
  //系統返回值
  T rc_;
  //系統返回的錯誤信息
  int errno_;
};

Instance

Ipv4InstanceIpv6InstancePipeInstance 三個地址類都是繼承 InstanceBase。它們的實現基本都差很少,socket()bind()connect() 這三個基礎操做都屬於它們的成員。如今咱們主要來看下 Ipv4Instance 幾個主要的操做(其它兩個類相似就再也不分析)。curl

Ipv4Instance 的類裏有個私有結構體 IpHelper 。這結構體封裝着 IPV4 地址的具體內容,好比端口,版本等異步

struct IpHelper : public Ip {
  const std::string& addressAsString() const override { return friendly_address_; }
  bool isAnyAddress() const override { return ipv4_.address_.sin_addr.s_addr == INADDR_ANY; }
  bool isUnicastAddress() const override {
  return !isAnyAddress() && (ipv4_.address_.sin_addr.s_addr != INADDR_BROADCAST) &&
             // inlined IN_MULTICAST() to avoid byte swapping
             !((ipv4_.address_.sin_addr.s_addr & htonl(0xf0000000)) == htonl(0xe0000000));
  }
  const Ipv4* ipv4() const override { return &ipv4_; }
  const Ipv6* ipv6() const override { return nullptr; }
  uint32_t port() const override { return ntohs(ipv4_.address_.sin_port); }
  IpVersion version() const override { return IpVersion::v4; }

  Ipv4Helper ipv4_;
  std::string friendly_address_;
};

bind()socket()connect() 基本都是直接調的底層函數。socket

Api::SysCallIntResult Ipv6Instance::bind(int fd) const {
  const int rc = ::bind(fd, reinterpret_cast<const sockaddr*>(&ip_.ipv6_.address_),
                        sizeof(ip_.ipv6_.address_));
  return {rc, errno};
}

Api::SysCallIntResult Ipv6Instance::connect(int fd) const {
  const int rc = ::connect(fd, reinterpret_cast<const sockaddr*>(&ip_.ipv6_.address_),
                           sizeof(ip_.ipv6_.address_));
  return {rc, errno};
}

DNS

DNS 使用 c-ares 做爲底層庫。 c-ares 是個 c 實現的異步 DNS 解析庫,不少知名軟件(curl,Nodejs,gevent 等)都使用了該庫。ide

c-ares 在構造函數內初始化庫,初始化上下文,而後設置 DNS 服務器。

DnsResolverImpl::DnsResolverImpl(
    Event::Dispatcher& dispatcher,
    const std::vector<Network::Address::InstanceConstSharedPtr>& resolvers)
        : dispatcher_(dispatcher),
      timer_(dispatcher.createTimer([this] { onEventCallback(ARES_SOCKET_BAD, 0); })) {
  //初始化庫
  ares_library_init(ARES_LIB_INIT_ALL);
  ares_options options;
  //初始化上下文
  initializeChannel(&options, 0);
  ... ...
  const std::string resolvers_csv = StringUtil::join(resolver_addrs, ",");
  //設置 DNS 服務器
  int result = ares_set_servers_ports_csv(channel_, resolvers_csv.c_str());
}

使用時直接 resolve() 結果返回在 callback 裏。

ActiveDnsQuery* DnsResolverImpl::resolve(const std::string& dns_name,
                                         DnsLookupFamily dns_lookup_family, ResolveCb callback) {
  ... ...
  if (dns_lookup_family == DnsLookupFamily::V4Only) {
    pending_resolution->getHostByName(AF_INET);
  } else {
    pending_resolution->getHostByName(AF_INET6);
  }
  ... ...
}

void DnsResolverImpl::PendingResolution::getHostByName(int family) {
  ares_gethostbyname(channel_, dns_name_.c_str(), family,
                     [](void* arg, int status, int timeouts, hostent* hostent) {
                       static_cast<PendingResolution*>(arg)->onAresHostCallback(status, timeouts, hostent);
                     },
                     this);
}

void DnsResolverImpl::PendingResolution::onAresHostCallback(int status, int timeouts, hostent* hostent) {

  ... ...
  //解析內容加入address_list
  std::list<Address::InstanceConstSharedPtr> address_list;
  if (status == ARES_SUCCESS) {
    if (hostent->h_addrtype == AF_INET) {
      for (int i = 0; hostent->h_addr_list[i] != nullptr; ++i) {
        ASSERT(hostent->h_length == sizeof(in_addr));
        sockaddr_in address;
        memset(&address, 0, sizeof(address));
        address.sin_family = AF_INET;
        address.sin_port = 0;
        address.sin_addr = *reinterpret_cast<in_addr*>(hostent->h_addr_list[i]);
        address_list.emplace_back(new Address::Ipv4Instance(&address));
      }
      ... ...
  }

  if (completed_) {
    if (!cancelled_) {
      try {
        //調用回調
        callback_(std::move(address_list));
      } catch (const EnvoyException& e) {
      ... ...
}

cidr

cidr 的定義是形如 192.168.0.1/24 的 IP 段。想知道具體的定義和 IP 段 可看 cidr

CidrRangecidr 拆分紅兩字段地址和長度。下面是判斷地址是否屬於這個 IP 段。

bool CidrRange::isInRange(const Instance& address) const {
  ... ...
  //長度爲0,全匹配(length_初始值爲-1)
  if (length_ == 0) {
    return true;
  }

  switch (address.ip()->version()) {
  case IpVersion::v4:
    if (ntohl(address.ip()->ipv4()->address()) >> (32 - length_) ==
        ntohl(address_->ip()->ipv4()->address()) >> (32 - length_)) {
      return true;
    }
    break;
  case IpVersion::v6:
    if ((Utility::Ip6ntohl(address_->ip()->ipv6()->address()) >> (128 - length_)) ==
        (Utility::Ip6ntohl(address.ip()->ipv6()->address()) >> (128 - length_))) {
      return true;
    }
    break;
  }
  return false;
}

socket

咱們都知道,建立 TCP 服務時,監聽的 fd 和鏈接的 fd 是不同的,所以 socket 分爲 ListenSocketConnectionSocketsocket 裏有不少的配置(好比讀超時,寫超時等)都是調用setsockopt,全部須要一個 Option 來進行統一的封裝。

Option

Option 是對 setsockopt 這個函數操做的封裝。封裝後再用智能指針的方式進行操做。

typedef std::shared_ptr<const Option> OptionConstSharedPtr;
typedef std::vector<OptionConstSharedPtr> Options;
typedef std::shared_ptr<Options> OptionsSharedPtr;

Option 在所有設置完後,在 applyOptions後,最終仍是調用 setsockopt

static bool applyOptions(const OptionsSharedPtr& options, Socket& socket,
                       envoy::api::v2::core::SocketOption::SocketState state) {
  if (options == nullptr) {
    return true;
  }
  for (const auto& option : *options) {
    //對全部的option 進行設置
    if (!option->setOption(socket, state)) {
      return false;
    }
  }
  return true;
}

bool SocketOptionImpl::setOption(Socket& socket,
                                 envoy::api::v2::core::SocketOption::SocketState state) const {
  if (in_state_ == state) {
    //調用成員函數 setSocketOption
    const Api::SysCallIntResult result = SocketOptionImpl::setSocketOption(socket, optname_, value_);
  ... ...
  return true;
}

Api::SysCallIntResult SocketOptionImpl::setSocketOption(Socket& socket, Network::SocketOptionName optname, const absl::string_view value) {
  ... ...
  //最終調用系統函數setsockopt
  return os_syscalls.setsockopt(socket.ioHandle().fd(), optname.value().first,
                                optname.value().second, value.data(), value.size());
}

Socket

Socket 提供基本的 socket 操做。主要是 'Option' 操做(上面已分析過)和地址操做。代碼比較簡單。

//設置和獲取本地地址
const Address::InstanceConstSharedPtr& localAddress() const override { return local_address_; }
void setLocalAddress(const Address::InstanceConstSharedPtr& local_address) override {
  local_address_ = local_address;
}

ListenSocket

ListenSocket 是對監聽 fd 的封裝,繼承自 Socket。主要操做天然就是 bind()。bind 調用自地址類的 bind() 函數(看上面的 address)。

void ListenSocketImpl::doBind() {
  // 地址和handle 繼承自socket。調用地址類的 bind。
  const Api::SysCallIntResult result = local_address_->bind(io_handle_->fd());
  if (result.rc_ == -1) {
    close();
    throw SocketBindException(
        fmt::format("cannot bind '{}': {}", local_address_->asString(), strerror(result.errno_)),
        result.errno_);
  }
  if (local_address_->type() == Address::Type::Ip && local_address_->ip()->port() == 0) {
    // If the port we bind is zero, then the OS will pick a free port for us (assuming there are
    // any), and we need to find out the port number that the OS picked.
    local_address_ = Address::addressFromFd(io_handle_->fd());
  }

ConnectionSocket

ConnectionSocket 是對鏈接 fd 的封裝,除了 Socket 的基本操做外,還增長對遠程地址和協議的設置。

//設置和獲取遠程地址
const Address::InstanceConstSharedPtr& remoteAddress() const override { return remote_address_; }
void setRemoteAddress(const Address::InstanceConstSharedPtr& remote_address) override {
  remote_address_ = remote_address;
}

//協議相關
void setDetectedTransportProtocol(absl::string_view protocol) override {
 transport_protocol_ = std::string(protocol);
}
absl::string_view detectedTransportProtocol() const override { return transport_protocol_; }

TransportSocket

TransportSocket 是一個實際讀/寫的傳輸套接字。它能夠對數據進行一些轉換(好比TLS,TCP代理等)。 TransportSocket 提供了多個接口。

failureReason() 返回最後的一個錯誤,沒錯誤返回空值。

canFlushClose() socket 是否能刷新和關閉。

closeSocket() 關閉 socket。

doRead() 讀取數據。

doWrite() 寫數據。

onConnected() transport 鏈接時調用此函數。

Ssl::ConnectionInfo* ssl() Ssl鏈接數據。

listen

listen 是對監聽操做相關的類,分爲 TcpListenUdpListen。 Listen 抽象類只提供兩個接口 disableenabledisable 關閉接受新鏈接,enable開啓接受新鏈接。

ListenerImpl 實現那兩接口的同時,因爲它是 TCP 的監聽必然就有 listen 和 accept 操做。在構造函數時,調用 setupServerSocket 創造 listen,啓用回調

void ListenerImpl::
setupServerSocket(Event::DispatcherImpl& dispatcher, Socket& socket) {
  //建立監聽,完成後回調 listenCallback
  listener_.reset(
      evconnlistener_new(&dispatcher.base(), listenCallback, this, 0, -1, socket.ioHandle().fd()));
  ... ...
  //失敗回調errorCallback
  evconnlistener_set_error_cb(listener_.get(), errorCallback);
}

監聽完成後,調用 listenCallback。listenCallback 用回調函數調用 onAccept 接收鏈接。

void ListenerImpl::listenCallback(evconnlistener*, evutil_socket_t fd, sockaddr* remote_addr, int remote_addr_len, void* arg) {
  ListenerImpl* listener = static_cast<ListenerImpl*>(arg);

  IoHandlePtr io_handle = std::make_unique<IoSocketHandleImpl>(fd);

  // 獲取本地地址
  const Address::InstanceConstSharedPtr& local_address =
      listener->local_address_ ? listener->local_address_
                               : listener->getLocalAddress(io_handle->fd());
  // 獲取遠程地址
  const Address::InstanceConstSharedPtr& remote_address =
      (remote_addr->sa_family == AF_UNIX)
          ? Address::peerAddressFromFd(io_handle->fd())
          : Address::addressFromSockAddr(*reinterpret_cast<const sockaddr_storage*>(remote_addr),
                                         remote_addr_len,
                                         local_address->ip()->version() == Address::IpVersion::v6);
  //調用 onAccept,
  listener->cb_.onAccept(
      std::make_unique<AcceptedSocketImpl>(std::move(io_handle), local_address, remote_address),
      listener->hand_off_restored_destination_connections_);
}

connection

connection 是鏈接相關的操做,客戶端和服務端的鏈接都屬於這個類。 Connection 是針對原始鏈接的一個抽象,繼承自 DeferredDeletableFilterManager。關於 DeferredDeletable 延遲析構請看 Envoy 源碼分析--eventFilterManager 之後討論。

ConnectionImpl

ConnectionImplConnectionBufferSourceTransportSocketCallbacks 三個抽象類的實現類。Connection 是鏈接操做相關的類,BufferSource 是得到 StreamBuffer 的抽象類(包括讀和寫),TransportSocketCallbacks 是傳輸套接字實例與鏈接進行通訊的回調。

每一個 ConnectionImpl 實例都有一個惟一的全局ID。在構造時賦值。

std::atomic<uint64_t> ConnectionImpl::next_global_id_;

ConnectionImpl::ConnectionImpl(Event::Dispatcher& dispatcher, ConnectionSocketPtr&& socket, TransportSocketPtr&& transport_socket, bool connected) : id_(next_global_id_++) {
}

ConnectionImpl 事件由 dispatcher_ 建立。在構造函數時建立事件。
Event 使用邊緣觸發,減小內核通知,提升效率(水平觸發和邊緣觸發區別你們本身查閱相關文檔)。同時寫入讀寫事件。當有讀寫事件時,會觸發回調 onFileEvent

ConnectionImpl::ConnectionImpl(Event::Dispatcher& dispatcher, ConnectionSocketPtr&& socket,TransportSocketPtr&& transport_socket, bool connected) {
  ... ...
  file_event_ = dispatcher_.createFileEvent(
      ioHandle().fd(), [this](uint32_t events) -> void { onFileEvent(events); },
      Event::FileTriggerType::Edge, Event::FileReadyType::Read | Event::FileReadyType::Write);
  ... ...
}

onFileEvent 在收到事件後,對不一樣的事件進行不一樣的處理。

void ConnectionImpl::onFileEvent(uint32_t events) {
  ... ...
  // 寫事件
  if (events & Event::FileReadyType::Write) {
    onWriteReady();
  }

  // 讀事件
  if (ioHandle().isOpen() && (events & Event::FileReadyType::Read)) {
    onReadReady();
  }
}

對於讀事件,在鏈接調用 readDisable 後,若是是 enable 會觸發讀事件。

void ConnectionImpl::readDisable(bool disable) {
    ... ...
    read_enabled_ = true;
    file_event_->setEnabled(Event::FileReadyType::Read | Event::FileReadyType::Write);
    if (read_buffer_.length() > 0) {
      file_event_->activate(Event::FileReadyType::Read);
    }
}

讀事件調用 onReadReadyonReadReady 先從 buffer中讀取數據,同時更新統計數據。對返回的結果進行分析,已關閉直接關閉。正常讀到數據,判斷是否有數據,有數據會調用 onRead, onRead 內會調用 ReadFilter 進行下一步處理(L3/4過濾下次分析)。

void ConnectionImpl::onReadReady() {
  ... ...
  IoResult result = transport_socket_->doRead(read_buffer_);
  uint64_t new_buffer_size = read_buffer_.length();
  updateReadBufferStats(result.bytes_processed_, new_buffer_size);

  if ((!enable_half_close_ && result.end_stream_read_)) {
    result.end_stream_read_ = false;
    result.action_ = PostIoAction::Close;
  }

  read_end_stream_ |= result.end_stream_read_;
  //有讀到數據
  if (result.bytes_processed_ != 0 || result.end_stream_read_) 
    onRead(new_buffer_size);
  }

  // 關閉鏈接
  if (result.action_ == PostIoAction::Close || bothSidesHalfClosed()) {
    ENVOY_CONN_LOG(debug, "remote close", *this);
    closeSocket(ConnectionEvent::RemoteClose);
  }
}

對於寫事件,在鏈接寫入數據時,會將數據先進行過濾,而後寫入寫緩衝。以後調用寫事件觸發 onFileEvent

void ConnectionImpl::write(Buffer::Instance& data, bool end_stream) {
  ... ...
  // WriteFilter過濾
  current_write_buffer_ = &data;
  current_write_end_stream_ = end_stream;
  FilterStatus status = filter_manager_.onWrite();
  current_write_buffer_ = nullptr;

  if (FilterStatus::StopIteration == status) {
    return;
  }

  write_end_stream_ = end_stream;
  if (data.length() > 0 || end_stream) {
    // 寫入緩衝
    write_buffer_->move(data);
    if (!connecting_) {
      //觸發寫事件
      file_event_->activate(Event::FileReadyType::Write);
    }
  }
}

在寫入事件後會調用 onWriteReadyonWriteReady 先判斷是否已鏈接,未鏈接會調用 connect 鏈接事件。鏈接成功後發送數據並統計信息,鏈接失敗關閉 socket。

void ConnectionImpl::onWriteReady() {
  ... ...
  if (connecting_) {
    ... ...
    if (error == 0) {
      connecting_ = false;
      //socket 未鏈接,調用connect。
      transport_socket_->onConnected();
     ... ...

  // 發送數據
  IoResult result = transport_socket_->doWrite(*write_buffer_, write_end_stream_);
  uint64_t new_buffer_size = write_buffer_->length();
  //更新統計信息
  updateWriteBufferStats(result.bytes_processed_, new_buffer_size);
  ... ...
}

ClientConnectionImpl

ClientConnectionImpl 是客戶端的鏈接,其繼承自 ConnectionImplClientConnectionClientConnectionImpl 只是在 Connection 的基礎上只增長了一個 connect 的接口。

connect 函數內最主要作的就是調用 connect() 鏈接。

void ClientConnectionImpl::connect() {
  // 鏈接服務器
  const Api::SysCallIntResult result = socket_->remoteAddress()->connect(ioHandle().fd());
  if (result.rc_ == 0) {
    // write will become ready.
    ASSERT(connecting_);
  } else {
  ... ...
}
相關文章
相關標籤/搜索