安裝kafka並設置經過外網ip訪問

一、kafka 安裝

安裝JDK

tar xvf jdk1.8.0_231.tar.gz  -C /usr/local && cd   /usr/local
ln -sv jdk1.8.0_231 jdk 

vim /etc/profile.d/java.sh
    JAVA_HOME=/usr/local/jdk
    PATH=$JAVA_HOME/bin:$PATH

zookeeper安裝(或使用kafka自帶的)

vim /usr/local/kafka/zookeeper/conf/zoo.cfg   
   tickTime=2000
   initLimit=10
   syncLimit=5
   dataDir=/data/zookeeper
   clientPort=2181
   maxClientCnxns=0

   # 集羣版的zookeeper添加以下配置
   # server.1=ip1:2888:3888
   # server.2=ip2:2888:3888
   # server.3=ip3:28888:3888

下載kafka和安裝kakfa

wget https://archive.apache.org/dist/kafka/0.10.2.1/kafka_2.11-0.10.2.1.tgz

tar xvf kafka_2.11-0.10.2.1.tgz -C /usr/local && cd   /usr/local
ln -sv kafka_2.11-0.10.2.1.tgz kafka

修改kafka啓動內存

vim /usr/local/kafka/bin/kafka-server-start.sh
  export KAFKA_HEAP_OPTS="-Xmx2G -Xms2G"

kafka啓動和中止

/usr/local/kafka/bin/zookeeper-server-start.sh -deamon /usr/local/kafka/conf/zookeeper.properties

/usr/local/kafka/bin/kafka-server-start.sh -deamon /usr/local/kafka/conf/server.properties

/usr/local/kafka/bin/kafka-server-stop.sh /usr/local/kafka/conf/server.properties
/usr/local/kafka/bin/zookeeper-server-stop.sh /usr/local/kafka/conf/zookeeper.properties

單獨安裝kafka的啓停方式

/usr/local/zookeeper/bin/zkServer.sh stop|stop

二、 kafka 設置外網訪問(若有須要提供外網訪問)

前提條件

須要一個解析到內網ip地址的域名,內網環境也能夠設置/etc/hostsjava

參數設置

host.name=kafka.test.com(對應的域名解析須要解到內網ip)
高版本已棄用。低版本0.10.2.1能夠用, 僅當listeners屬性未配置時被使用,已用listeners屬性代替。表示broker的hostnameapache

advertised.listeners=PLAINTEXT://kafka.test.com:9092(高版本用,替代host.name,設置了advertised.listeners不用設置host.name)
註冊到zookeeper上並提供給客戶端的監聽器,若是沒有配置則使用listeners。bootstrap

advertised.host.name(不須要設置,僅做參考)
已棄用。僅當advertised.listeners或者listeners屬性未配置時被使用。官網建議使用advertised.listenersvim

listeners(不須要設置,僅做參考)
須要監聽的URL和協議,如:PLAINTEXT://myhost:9092,SSL://:9091 CLIENT://0.0.0.0:9092,REPLICATION://localhost:9093。若是未指定該配置,則使用java.net.InetAddress.getCanonicalHostName()函數的的返回值ruby

修改上broker的/etc/hosts文件

[內網ip] kafka.test.com服務器

修改外網訪問服務器上的/etc/hosts文件

[外網ip] kafka.test.comide

三、kafka消費調試

生產者

/usr/local/kafka/bin/kafka-console-producer.sh --broker-list IP:9092 --topic TOPIC

消費者

/usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server IP:9092 --topic TOPIC--from-beginning --max-messages 1
/usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server 外網IP:9092 --topic TOPIC --from-beginning --max-messages 1

四、logstash調試

output {
  stdout { codec => rubydebug { metadata => true }  }
}

五、logstash沒法消費kakfa日誌的問題排查

a、topics_pattern 通配問題".* ","."必定不能少函數

topics_pattern=>"prefix-.*"

b、filter中匹配規則,注意要能匹配到kafka中topic,不一樣的filebeat和不一樣的logstash版本對應的topic元數據可能不太同樣,這點須要注意.net

if [type] =~ "prefix-*" {
          grok { match =>["[type]","^prefix-(?<index_name>)"] }
       }

      if [kafka][topic] =~ "prefix-*" {
        grok { match => [ "[kafka][topic]", "^prefix-(?<index_name>.*$)" ]}
      }

       if [@metadata][topic] =~ "prefix-*" {
          grok { match =>["[@metadata][topic]","^prefix-(?<index_name>)"] }
       }

      if [@metadata][kafka][topic] =~ "prefix-*" {
        grok { match => [ "[@metadata][kafka][topic]", "^prefix-(?<index_name>.*$)" ]}
      }

外網kakfa消費參考:https://www.maiyewang.com/archives/17993debug

相關文章
相關標籤/搜索