The usage of the wurstmeister/kafka Docker image

The wurstmeister/kafka image is the most starred Kafka image on hub.docker.com, but its usage is rarely documented, so in this post I will take some time to talk about how to use it.

1. First, let's look at the command to start a container instance of this image in the Docker engine:

docker run -d --name kafka -p 9092:9092 \
  -e KAFKA_ADVERTISED_HOST_NAME=kafka \
  -e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 \
  -e KAFKA_ADVERTISED_PORT=9092 \
  -e KAFKA_BROKER_ID=1 \
  -e KAFKA_LISTENERS=PLAINTEXT://:9092 \
  -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://:9092 \
  -e KAFKA_CREATE_TOPICS="stream-in:2:1,stream-out:2:1" \
  --link zookeeper wurstmeister/kafka:1.1.0

In the command above, we can see the following features of this image:

a) Specify the advertised host name, which would regularly be the container name and is resolved to the actual IP of the container via an entry in the /etc/hosts file.

b) The ZooKeeper connection list that the Kafka cluster uses for coordination; at least one ZooKeeper node must already be running, otherwise the Kafka broker will fail to start (a sketch for starting ZooKeeper follows this list).

c) The port number through which the Kafka broker running in the container instance can be accessed.

d) KAFKA_LISTENERS and KAFKA_ADVERTISED_LISTENERS: these two environment variables should be defined, otherwise the container will fail to start.
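Note that the --link zookeeper flag in the run command assumes a container named zookeeper is already running. As a minimal sketch, using the wurstmeister/zookeeper image that is commonly paired with this one (the image choice is an assumption, not part of the original command), you could start ZooKeeper first and then verify the broker once both containers are up:

docker run -d --name zookeeper -p 2181:2181 wurstmeister/zookeeper

docker exec -it kafka bash -c '$KAFKA_HOME/bin/kafka-topics.sh --zookeeper $KAFKA_ZOOKEEPER_CONNECT --list'

The second command lists the topics created via KAFKA_CREATE_TOPICS (stream-in and stream-out), which is a quick way to confirm that the broker has registered itself in ZooKeeper.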

A few additional notes on the advertised listener settings:

1. advertised.listeners should be configured. If it is not set, Kafka falls back to the listeners property; if listeners is not set either, the value is obtained via java.net.InetAddress.getCanonicalHostName(), which is expected to return the hostname. However, host.name used to be bound only to the internal IP and cannot be reached from the external network interface, so you should avoid registering the Kafka broker machine's hostname in ZooKeeper.

2. Kafka's advertised.host.name parameter and external access. In Kafka's server.properties file, ```host.name``` was originally bound only to the internal IP and could not be reached from outside. If the value is left empty, Kafka binds its listening port on all network interfaces. However, when accessing from the external network, the client then hit a ```java.nio.channels.ClosedChannelException```. Analyzing the server side with tcpdump showed that the client was sending over the machine name of the Kafka host; setting a breakpoint in the client revealed that the metadata returned by findLeader contained the machine name rather than the IP. The client could not resolve that machine name, hence the exception. server.properties has another parameter to solve this: advertised.host.name configures the host.name value returned to clients, and setting it to the external IP address is enough. This parameter is not enabled by default; by default the value of java.net.InetAddress.getCanonicalHostName() is returned, which on my Mac is not the hostname but the IP, while on Linux it is the hostname. Besides the IP, the externally reachable PORT also needs to be adjusted. The relevant section of server.properties looks like this:

# Hostname the broker will advertise to producers and consumers. If not set, it uses the
# value for "host.name" if configured. Otherwise, it will use the value returned from
# java.net.InetAddress.getCanonicalHostName().
#advertised.host.name=<hostname routable by clients>

# The port to publish to ZooKeeper for clients to use. If this is not set,
# it will publish the same port that the broker binds to.
#advertised.port=<port accessible by clients>

When a Kafka broker starts, it registers its own IP and port in ZooKeeper, and clients connect through that IP and port. In an IaaS environment such as AWS, the hostname obtained from java.net.InetAddress.getCanonicalHostName() is something like ip-172-31-10-199, reachable only from the internal network, so the address registered in ZooKeeper by default is an internal IP. In that case you need to explicitly specify the advertised.host.name and advertised.listeners parameters so that the address registered in ZooKeeper is the external IP. For example, for the broker at 59.64.11.22, add the following three settings to server.properties: advertised.host.name, advertised.listeners and advertised.port. With the newer configuration style: advertised.listeners=PLAINTEXT://59.64.11.22:9092

Readers may be as confused as I was: why do we need three parameters to configure the IP and port when a single advertised.listeners should be enough? It turns out that the 0.10.x broker configuration deprecated advertised.host.name and advertised.port, so configuring advertised.listeners alone is sufficient.
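Applied back to the Docker image, the same idea can be expressed through environment variables. A minimal sketch, assuming the Docker host is reachable by clients at 59.64.11.22 (the example address above) and that port 9092 is published on that host:

docker run -d --name kafka -p 9092:9092 \
  -e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 \
  -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \
  -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://59.64.11.22:9092 \
  --link zookeeper wurstmeister/kafka:1.1.0

With this, the broker binds on all interfaces inside the container but registers the externally routable address in ZooKeeper, so external clients receive a resolvable endpoint from the broker metadata.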

 

 

2. A reminder about how to install a distributed Kafka cluster.

For details please refer to my earlier blog post.

In that post, there are several key points about installing a Kafka cluster:

a) The broker.id, which represents the unique id of the broker within the Kafka cluster.

b) zookeeper.connect, which points to the ZooKeeper ensemble that stores the cluster metadata; a Kafka cluster (even one with a single node) depends on ZooKeeper to function.

c) The host.name property, which refers to the actual IP address or hostname (this property has been deprecated since Kafka 0.11), so we can ignore it.

d) Set the values of advertised.host.name, advertised.listeners and advertised.port, which publish the service endpoint to clients, so that clients can access Kafka via the address and port configured by these parameters (see the sketch below).
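Putting these points together, a minimal sketch of server.properties for one broker of a three-node cluster might look as follows (the host names zk1/zk2/zk3 and broker1 and the log directory are placeholder values for illustration, not values from the original post):

# a) unique id of this broker within the cluster
broker.id=1
# b) ZooKeeper ensemble that stores the cluster metadata
zookeeper.connect=zk1:2181,zk2:2181,zk3:2181
# d) endpoint published to clients (newer style; replaces advertised.host.name/advertised.port)
advertised.listeners=PLAINTEXT://broker1:9092
# socket the broker actually binds to
listeners=PLAINTEXT://0.0.0.0:9092
# where the broker stores its log segments
log.dirs=/tmp/kafka-logs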

3. Internals of the wurstmeister/kafka Docker image.

Before we go deep inside the internals of the Kafka image, let's take a look at the entrypoint script of the image:

#!/bin/bash -e

# Store original IFS config, so we can restore it at various stages
ORIG_IFS=$IFS

if [[ -z "$KAFKA_ZOOKEEPER_CONNECT" ]]; then
    echo "ERROR: missing mandatory config: KAFKA_ZOOKEEPER_CONNECT"
    exit 1
fi
if [[ -z "$KAFKA_PORT" ]]; then
    export KAFKA_PORT=9092
fi

create-topics.sh &
unset KAFKA_CREATE_TOPICS

# DEPRECATED: but maintained for compatibility with older brokers pre 0.9.0 (https://issues.apache.org/jira/browse/KAFKA-1809)
if [[ -z "$KAFKA_ADVERTISED_PORT" && \
  -z "$KAFKA_LISTENERS" && \
  -z "$KAFKA_ADVERTISED_LISTENERS" && \
  -S /var/run/docker.sock ]]; then
    KAFKA_ADVERTISED_PORT=$(docker port "$(hostname)" $KAFKA_PORT | sed -r 's/.*:(.*)/\1/g')
    export KAFKA_ADVERTISED_PORT
fi

if [[ -z "$KAFKA_BROKER_ID" ]]; then
    if [[ -n "$BROKER_ID_COMMAND" ]]; then
        KAFKA_BROKER_ID=$(eval "$BROKER_ID_COMMAND")
        export KAFKA_BROKER_ID
    else
        # By default auto allocate broker ID
        export KAFKA_BROKER_ID=-1
    fi
fi
if [[ -z "$KAFKA_LOG_DIRS" ]]; then
    export KAFKA_LOG_DIRS="/kafka/kafka-logs-$HOSTNAME"
fi
if [[ -n "$KAFKA_HEAP_OPTS" ]]; then
    sed -r -i 's/(export KAFKA_HEAP_OPTS)="(.*)"/\1="'"$KAFKA_HEAP_OPTS"'"/g' "$KAFKA_HOME/bin/kafka-server-start.sh"
    unset KAFKA_HEAP_OPTS
fi

if [[ -n "$HOSTNAME_COMMAND" ]]; then
    HOSTNAME_VALUE=$(eval "$HOSTNAME_COMMAND")
    # Replace any occurences of _{HOSTNAME_COMMAND} with the value
    IFS=$'\n'
    for VAR in $(env); do
        if [[ $VAR =~ ^KAFKA_ && "$VAR" =~ "_{HOSTNAME_COMMAND}" ]]; then
            eval "export ${VAR//_\{HOSTNAME_COMMAND\}/$HOSTNAME_VALUE}"
        fi
    done
    IFS=$ORIG_IFS
fi
if [[ -n "$PORT_COMMAND" ]]; then
    PORT_VALUE=$(eval "$PORT_COMMAND")
    # Replace any occurences of _{PORT_COMMAND} with the value
    IFS=$'\n'
    for VAR in $(env); do
        if [[ $VAR =~ ^KAFKA_ && "$VAR" =~ "_{PORT_COMMAND}" ]]; then
            eval "export ${VAR//_\{PORT_COMMAND\}/$PORT_VALUE}"
        fi
    done
    IFS=$ORIG_IFS
fi
if [[ -n "$RACK_COMMAND" && -z "$KAFKA_BROKER_RACK" ]]; then
    KAFKA_BROKER_RACK=$(eval "$RACK_COMMAND")
    export KAFKA_BROKER_RACK
fi

# Try and configure minimal settings or exit with error if there isn't enough information
if [[ -z "$KAFKA_ADVERTISED_HOST_NAME$KAFKA_LISTENERS" ]]; then
    if [[ -n "$KAFKA_ADVERTISED_LISTENERS" ]]; then
        echo "ERROR: Missing environment variable KAFKA_LISTENERS. Must be specified when using KAFKA_ADVERTISED_LISTENERS"
        exit 1
    elif [[ -z "$HOSTNAME_VALUE" ]]; then
        echo "ERROR: No listener or advertised hostname configuration provided in environment."
        echo "       Please define KAFKA_LISTENERS / (deprecated) KAFKA_ADVERTISED_HOST_NAME"
        exit 1
    fi
    # Maintain existing behaviour
    # If HOSTNAME_COMMAND is provided, set that to the advertised.host.name value if listeners are not defined.
    export KAFKA_ADVERTISED_HOST_NAME="$HOSTNAME_VALUE"
fi

#Issue newline to config file in case there is not one already
echo "" >> "$KAFKA_HOME/config/server.properties"
(
    # Read in env as a new-line separated array. This handles the case of env variables have spaces and/or carriage returns. See #313
    IFS=$'\n'
    for VAR in $(env); do
      if [[ $VAR =~ ^KAFKA_ && ! $VAR =~ ^KAFKA_HOME ]]; then
        kafka_name=$(echo "$VAR" | sed -r 's/KAFKA_(.*)=.*/\1/g' | tr '[:upper:]' '[:lower:]' | tr _ .)
        env_var=$(echo "$VAR" | sed -r 's/(.*)=.*/\1/g')
        if grep -E -q '(^|^#)'"$kafka_name=" "$KAFKA_HOME/config/server.properties"; then
            sed -r -i 's@(^|^#)('"$kafka_name"')=(.*)@\2='"${!env_var}"'@g' "$KAFKA_HOME/config/server.properties" #note that no config values may contain an '@' char
        else
            echo "$kafka_name=${!env_var}" >> "$KAFKA_HOME/config/server.properties"
        fi
      fi
      if [[ $VAR =~ ^LOG4J_ ]]; then
        log4j_name=$(echo "$VAR" | sed -r 's/(LOG4J_.*)=.*/\1/g' | tr '[:upper:]' '[:lower:]' | tr _ .)
        log4j_env=$(echo "$VAR" | sed -r 's/(.*)=.*/\1/g')
        if grep -E -q '(^|^#)'"$log4j_name=" "$KAFKA_HOME/config/log4j.properties"; then
            sed -r -i 's@(^|^#)('"$log4j_name"')=(.*)@\2='"${!log4j_env}"'@g' "$KAFKA_HOME/config/log4j.properties" #note that no config values may contain an '@' char
        else
            echo "$log4j_name=${!log4j_env}" >> "$KAFKA_HOME/config/log4j.properties"
        fi
      fi
    done
)
if [[ -n "$CUSTOM_INIT_SCRIPT" ]]; then
  eval "$CUSTOM_INIT_SCRIPT"
fi
exec "$KAFKA_HOME/bin/kafka-server-start.sh" "$KAFKA_HOME/config/server.properties"
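The final loop is what turns KAFKA_* environment variables into server.properties entries: the KAFKA_ prefix is stripped (KAFKA_HOME is explicitly excluded), the remainder is lower-cased, and underscores become dots; existing keys are updated in place and unknown ones are appended. A few illustrative mappings (these particular variables are my own examples, not taken from the run command above):

KAFKA_LOG_RETENTION_HOURS=168                      ->  log.retention.hours=168
KAFKA_AUTO_CREATE_TOPICS_ENABLE=false              ->  auto.create.topics.enable=false
KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092  ->  advertised.listeners=PLAINTEXT://kafka:9092

The same mechanism applies to LOG4J_* variables against log4j.properties.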

 

3.1 KAFKA_ZOOKEEPER_CONNECT is a required environment variable for starting the container.

if [[ -z "$KAFKA_ZOOKEEPER_CONNECT" ]]; then
    echo "ERROR: missing mandatory config: KAFKA_ZOOKEEPER_CONNECT"
    exit 1
fi
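The value follows Kafka's zookeeper.connect format: a comma-separated list of host:port pairs, optionally followed by a chroot path. A hedged example (the host names are placeholders):

-e KAFKA_ZOOKEEPER_CONNECT=zk1:2181,zk2:2181,zk3:2181/kafka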

3.2 KAFKA_PORT is an optional parameter.

if [[ -z "$KAFKA_PORT" ]]; then
    export KAFKA_PORT=9092
fi

If the KAFKA_PORT environment variable is not provided, the default port number 9092 is used as the Kafka port.
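As a sketch, to run the broker on 9093 instead (the published port and the listeners must be kept consistent; the container and link names follow the earlier example):

docker run -d --name kafka -p 9093:9093 \
  -e KAFKA_PORT=9093 \
  -e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 \
  -e KAFKA_LISTENERS=PLAINTEXT://:9093 \
  -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9093 \
  --link zookeeper wurstmeister/kafka:1.1.0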

3.3 KAFKA_BROKER_ID is an optional parameter.

if [[ -z "$KAFKA_BROKER_ID" ]]; then
    if [[ -n "$BROKER_ID_COMMAND" ]]; then
        KAFKA_BROKER_ID=$(eval "$BROKER_ID_COMMAND")
        export KAFKA_BROKER_ID
    else
        # By default auto allocate broker ID
        export KAFKA_BROKER_ID=-1
    fi
fi

If it is not provided, -1 is used, which means the broker id is auto-generated. Alternatively, BROKER_ID_COMMAND can supply a command whose output becomes the broker id, as sketched below.
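A hedged example of deriving the broker id from the container's hostname (this assumes hostnames ending in a numeric suffix, e.g. kafka-1, kafka-2; the exact command is only an illustration):

-e BROKER_ID_COMMAND="hostname | awk -F'-' '{print \$NF}'"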

3.4 We can define JVM parameters using KAFKA_HEAP_OPTS, which takes effect by modifying the corresponding line in kafka-server-start.sh.

if [[ -n "$KAFKA_HEAP_OPTS" ]]; then
    sed -r -i 's/(export KAFKA_HEAP_OPTS)="(.*)"/\1="'"$KAFKA_HEAP_OPTS"'"/g' "$KAFKA_HOME/bin/kafka-server-start.sh"
    unset KAFKA_HEAP_OPTS
fi
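For example, to cap the broker's heap at 512 MB (a sketch; choose values appropriate to your host):

-e KAFKA_HEAP_OPTS="-Xmx512M -Xms512M"

Because this variable is consumed by rewriting kafka-server-start.sh and then unset, it is not written into server.properties like the other KAFKA_* variables.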

 

3.5 The configuration of KAFKA_ADVERTISED_HOST_NAME, KAFKA_LISTENERS, KAFKA_ADVERTISED_LISTENERS and HOSTNAME_VALUE

# Try and configure minimal settings or exit with error if there isn't enough information
if [[ -z "$KAFKA_ADVERTISED_HOST_NAME$KAFKA_LISTENERS" ]]; then
    if [[ -n "$KAFKA_ADVERTISED_LISTENERS" ]]; then
        echo "ERROR: Missing environment variable KAFKA_LISTENERS. Must be specified when using KAFKA_ADVERTISED_LISTENERS"
        exit 1
    elif [[ -z "$HOSTNAME_VALUE" ]]; then
        echo "ERROR: No listener or advertised hostname configuration provided in environment."
        echo "       Please define KAFKA_LISTENERS / (deprecated) KAFKA_ADVERTISED_HOST_NAME"
        exit 1
    fi
    # Maintain existing behaviour
    # If HOSTNAME_COMMAND is provided, set that to the advertised.host.name value if listeners are not defined.
    export KAFKA_ADVERTISED_HOST_NAME="$HOSTNAME_VALUE"
fi

The script above indicates the following:

a) We should configure either KAFKA_ADVERTISED_HOST_NAME or KAFKA_LISTENERS; if neither is set and no HOSTNAME_COMMAND is provided, the container exits with an error, so at least one of them is effectively required. The sketch below shows one way to satisfy this using HOSTNAME_COMMAND and the _{HOSTNAME_COMMAND} placeholder.
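HOSTNAME_VALUE comes from the HOSTNAME_COMMAND handling shown earlier in the entrypoint script, which also substitutes the literal placeholder _{HOSTNAME_COMMAND} inside any KAFKA_* variable. A hedged sketch that advertises the container's own IP address resolved at startup (the command used here is only an illustration; any command printing the desired address will do, and the advertised address must be reachable by your clients):

docker run -d --name kafka -p 9092:9092 \
  -e HOSTNAME_COMMAND="hostname -i" \
  -e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 \
  -e KAFKA_LISTENERS=PLAINTEXT://:9092 \
  -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://_{HOSTNAME_COMMAND}:9092 \
  --link zookeeper wurstmeister/kafka:1.1.0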
