Kafka - advertised.listeners and listeners

listeners

Listener List - Comma-separated list of URIs we will listen on and their protocols.
Specify hostname as 0.0.0.0 to bind to all interfaces.
Leave hostname empty to bind to default interface.
Examples of legal listener lists: PLAINTEXT://myhost:9092,TRACE://:9091 PLAINTEXT://0.0.0.0:9092, TRACE://localhost:9093

 

advertised.listeners

Listeners to publish to ZooKeeper for clients to use, if different than the listeners above.
In IaaS environments, this may need to be different from the interface to which the broker binds. If this is not set, the value for `listeners` will be used.
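
For example, a typical IaaS setup (hostnames illustrative) binds on all interfaces but advertises the address clients can actually reach:

listeners=PLAINTEXT://0.0.0.0:9092
advertised.listeners=PLAINTEXT://broker1.example.com:9092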

 

listeners

This is the address that Kafka actually binds to:

/**
 * An NIO socket server. The threading model is
 *   1 Acceptor thread that handles new connections
 *   Acceptor has N Processor threads that each have their own selector and read requests from sockets
 *   M Handler threads that handle requests and produce responses back to the processor threads for writing.
 */
class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time) extends Logging with KafkaMetricsGroup {

  private val endpoints = config.listeners
    
  /**
   * Start the socket server
   */
  def startup() {
    
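      // (elided here: the surrounding this.synchronized block, ConnectionQuotas,
      //  socket send/receive buffer sizes and the processorBeginIndex counter)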
    
      endpoints.values.foreach { endpoint =>
        val protocol = endpoint.protocolType
        val processorEndIndex = processorBeginIndex + numProcessorThreads

        for (i <- processorBeginIndex until processorEndIndex)
          processors(i) = newProcessor(i, connectionQuotas, protocol)

        val acceptor = new Acceptor(endpoint, sendBufferSize, recvBufferSize, brokerId,
          processors.slice(processorBeginIndex, processorEndIndex), connectionQuotas)
        acceptors.put(endpoint, acceptor)
        Utils.newThread("kafka-socket-acceptor-%s-%d".format(protocol.toString, endpoint.port), acceptor, false).start()
        acceptor.awaitStartup()

        processorBeginIndex = processorEndIndex
      }
  }

In SocketServer you can see that what the server actually binds and accepts connections on is indeed `listeners`.

For each endpoint, an Acceptor and its Processor threads are created.
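
Each endpoint comes from parsing one entry of the `listeners` list (the real parser is EndPoint.createEndPoint, which also handles empty hosts and IPv6 brackets). A minimal sketch of that decomposition:

case class EndPointSketch(host: String, port: Int, protocol: String)

// "PLAINTEXT://0.0.0.0:9092" -> EndPointSketch("0.0.0.0", 9092, "PLAINTEXT")
def parseListener(listener: String): EndPointSketch = {
  val Array(protocol, hostPort) = listener.split("://", 2)
  val colon = hostPort.lastIndexOf(':')
  EndPointSketch(hostPort.substring(0, colon),      // may be empty or 0.0.0.0
    hostPort.substring(colon + 1).toInt, protocol)
}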

 

advertised.listeners

These are the listeners exposed to the outside; if they are not set, `listeners` is used.

KafkaServer.startup

        /* tell everyone we are alive */
        val listeners = config.advertisedListeners.map {case(protocol, endpoint) =>
          if (endpoint.port == 0)
            (protocol, EndPoint(endpoint.host, socketServer.boundPort(protocol), endpoint.protocolType))
          else
            (protocol, endpoint)
        }
        kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, listeners, zkUtils, config.rack,
          config.interBrokerProtocolVersion)
        kafkaHealthcheck.startup()

Here the advertisedListeners are read (replacing port 0 with the actual bound port) and passed to KafkaHealthcheck.

/**
 * This class registers the broker in zookeeper to allow 
 * other brokers and consumers to detect failures. It uses an ephemeral znode with the path:
 *   /brokers/ids/[0...N] --> advertisedHost:advertisedPort
 *   
 * Right now our definition of health is fairly naive. If we register in zk we are healthy, otherwise
 * we are dead.
 */
class KafkaHealthcheck(brokerId: Int,
                       advertisedEndpoints: Map[SecurityProtocol, EndPoint],
                       zkUtils: ZkUtils,
                       rack: Option[String],
                       interBrokerProtocolVersion: ApiVersion) extends Logging {

As the comment says,

KafkaHealthcheck registers the broker info in an ephemeral znode in ZooKeeper; when the znode disappears, others know the broker is dead.

So what gets registered in ZooKeeper must be the advertisedListeners.

/**
   * Register this broker as "alive" in zookeeper
   */
  def register() {
    val jmxPort = System.getProperty("com.sun.management.jmxremote.port", "-1").toInt
    val updatedEndpoints = advertisedEndpoints.mapValues(endpoint =>
      if (endpoint.host == null || endpoint.host.trim.isEmpty)
        EndPoint(InetAddress.getLocalHost.getCanonicalHostName, endpoint.port, endpoint.protocolType) // if no host is configured, fall back to InetAddress.getLocalHost.getCanonicalHostName
      else
        endpoint
    )

    // the default host and port are here for compatibility with older client
    // only PLAINTEXT is supported as default
    // if the broker doesn't listen on PLAINTEXT protocol, an empty endpoint will be registered and older clients will be unable to connect
    val plaintextEndpoint = updatedEndpoints.getOrElse(SecurityProtocol.PLAINTEXT, new EndPoint(null,-1,null)) // build the PLAINTEXT endpoint for compatibility with older clients
    zkUtils.registerBrokerInZk(brokerId, plaintextEndpoint.host, plaintextEndpoint.port, updatedEndpoints, jmxPort, rack, // newer clients only read updatedEndpoints
      interBrokerProtocolVersion)
  }
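
For example, if broker 1 advertises PLAINTEXT://broker1.example.com:9092, the ephemeral znode would hold roughly the following (values illustrative; the exact JSON schema is shown in the Broker section below):

/brokers/ids/1 -->
{"version":3,"host":"broker1.example.com","port":9092,"jmx_port":-1,
 "timestamp":"1490000000000",
 "endpoints":["PLAINTEXT://broker1.example.com:9092"]}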

 

 

The remaining question is which listener brokers use when they sync with each other.

ReplicaManager.makeFollowers

creates the replica fetcher threads:

        val partitionsToMakeFollowerWithLeaderAndOffset = partitionsToMakeFollower.map(partition =>
          new TopicAndPartition(partition) -> BrokerAndInitialOffset(
            metadataCache.getAliveBrokers.find(_.id == partition.leaderReplicaIdOpt.get).get.getBrokerEndPoint(config.interBrokerSecurityProtocol),
            partition.getReplica().get.logEndOffset.messageOffset)).toMap
        replicaFetcherManager.addFetcherForPartitions(partitionsToMakeFollowerWithLeaderAndOffset)

This is the logic that runs when a broker becomes a follower and creates fetcher threads to sync from the leader.

As you can see, the broker info is still taken from metadataCache:

the leader broker is looked up in metadataCache, and getBrokerEndPoint(config.interBrokerSecurityProtocol) returns the endpoint matching the inter-broker security protocol.

security.inter.broker.protocol - Security protocol used to communicate between brokers. Valid values are: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL.
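
So the endpoint used for replication is whichever advertised endpoint matches that protocol. A minimal sketch of the lookup (simplified types, not the actual kafka.cluster.Broker class):

case class EndPointSketch(host: String, port: Int, protocol: String)

case class BrokerSketch(id: Int, endpoints: Map[String, EndPointSketch]) {
  // return the endpoint advertised for the given security protocol;
  // fail if the broker does not expose one for it
  def getBrokerEndPoint(protocol: String): EndPointSketch =
    endpoints.getOrElse(protocol, throw new IllegalArgumentException(
      s"End point with security protocol $protocol not found for broker $id"))
}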

 

As for the broker info that clients get:

KafkaApis.handleTopicMetadataRequest

val brokers = metadataCache.getAliveBrokers
    
    val responseBody = new MetadataResponse(
      brokers.map(_.getNode(request.securityProtocol)).asJava,
      metadataCache.getControllerId.getOrElse(MetadataResponse.NO_CONTROLLER_ID),
      completeTopicMetadata.asJava,
      requestVersion
    )

The address returned depends on the security protocol of the request, request.securityProtocol:

public enum SecurityProtocol {
    /** Un-authenticated, non-encrypted channel */
    PLAINTEXT(0, "PLAINTEXT", false),
    /** SSL channel */
    SSL(1, "SSL", false),
    /** SASL authenticated, non-encrypted channel */
    SASL_PLAINTEXT(2, "SASL_PLAINTEXT", false),
    /** SASL authenticated, SSL channel */
    SASL_SSL(3, "SASL_SSL", false),
    /** Currently identical to PLAINTEXT and used for testing only. We may implement extra instrumentation when testing channel code. */
    TRACE(Short.MAX_VALUE, "TRACE", true);

So different protocols can have different addresses.
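
For example (hostnames illustrative), a broker can advertise a different host per protocol, and each client gets back the address matching the protocol it connected with:

val advertised = Map(
  "PLAINTEXT" -> "broker1.internal:9092",
  "SSL"       -> "broker1.example.com:9093")

println(advertised("SSL"))   // an SSL client is told broker1.example.com:9093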

 

Broker

/**
    * Create a broker object from id and JSON string.
    *
    * @param id
    * @param brokerInfoString
    *
    * Version 1 JSON schema for a broker is:
    * {
    *   "version":1,
    *   "host":"localhost",
    *   "port":9092
    *   "jmx_port":9999,
    *   "timestamp":"2233345666"
    * }
    *
    * Version 2 JSON schema for a broker is:
    * {
    *   "version":2,
    *   "host":"localhost",
    *   "port":9092
    *   "jmx_port":9999,
    *   "timestamp":"2233345666",
    *   "endpoints":["PLAINTEXT://host1:9092", "SSL://host1:9093"]
    * }
    *
    * Version 3 (current) JSON schema for a broker is:
    * {
    *   "version":3,
    *   "host":"localhost",
    *   "port":9092
    *   "jmx_port":9999,
    *   "timestamp":"2233345666",
    *   "endpoints":["PLAINTEXT://host1:9092", "SSL://host1:9093"],
    *   "rack":"dc1"
    * }
    */
  def createBroker(id: Int, brokerInfoString: String): Broker = {
    if (brokerInfoString == null)
      throw new BrokerNotAvailableException(s"Broker id $id does not exist")
    try {
      Json.parseFull(brokerInfoString) match {
        case Some(m) =>
          val brokerInfo = m.asInstanceOf[Map[String, Any]]
          val version = brokerInfo("version").asInstanceOf[Int]
          val endpoints =
            if (version < 1)
              throw new KafkaException(s"Unsupported version of broker registration: $brokerInfoString")
            else if (version == 1) {
              val host = brokerInfo("host").asInstanceOf[String]
              val port = brokerInfo("port").asInstanceOf[Int]
              Map(SecurityProtocol.PLAINTEXT -> new EndPoint(host, port, SecurityProtocol.PLAINTEXT))
            }
            else {
              val listeners = brokerInfo("endpoints").asInstanceOf[List[String]]
              listeners.map { listener =>
                val ep = EndPoint.createEndPoint(listener)
                (ep.protocolType, ep)
              }.toMap
            }
          val rack = brokerInfo.get("rack").filter(_ != null).map(_.asInstanceOf[String])
          new Broker(id, endpoints, rack)
        case None =>
          throw new BrokerNotAvailableException(s"Broker id $id does not exist")
      }
    } catch {
      case t: Throwable =>
        throw new KafkaException(s"Failed to parse the broker info from zookeeper: $brokerInfoString", t)
    }
  }
}

As you can see, old versions only have host and port,

while new versions use endpoints, which can define listeners for each protocol.

 

ZkUtils

/**
   * This API takes in a broker id, queries zookeeper for the broker metadata and returns the metadata for that broker
   * or throws an exception if the broker dies before the query to zookeeper finishes
   *
   * @param brokerId The broker id
   * @return An optional Broker object encapsulating the broker metadata
   */
  def getBrokerInfo(brokerId: Int): Option[Broker] = {
    readDataMaybeNull(BrokerIdsPath + "/" + brokerId)._1 match {
      case Some(brokerInfo) => Some(Broker.createBroker(brokerId, brokerInfo))
      case None => None
    }
  }

ZkUtils just reads the corresponding content from ZooKeeper and calls createBroker.

 

Conclusion:

listeners is what the server actually binds to;

advertised.listeners is what is exposed to clients; if it is not set, listeners is used directly.

 

Kafka currently does not distinguish internal from external traffic: once advertised.listeners is set, all traffic uses that configuration, which is clearly unreasonable.

https://cwiki.apache.org/confluence/display/KAFKA/KIP-103%3A+Separation+of+Internal+and+External+traffic

will solve this problem.
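
KIP-103 decouples listener names from security protocols, so internal and external traffic can be advertised on different addresses even when they use the same protocol. A sketch of the proposed configuration style (names illustrative):

listeners=INTERNAL://0.0.0.0:9092,EXTERNAL://0.0.0.0:9093
advertised.listeners=INTERNAL://broker1.internal:9092,EXTERNAL://broker1.example.com:9093
listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
inter.broker.listener.name=INTERNAL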
