Kafka Kerberos authentication with HDP Storm

1. Using the Apache storm-kafka plugin package

I had always used the Apache storm-kafka toolkit to consume from Kafka, so I simply configured the host/port of the corresponding ZooKeeper cluster and the host/port of the Kafka cluster.

I submitted the topology straight to the HDP Storm environment, and sure enough it failed immediately. Looking at the exception, the spout could not obtain the host/port of the machines hosting the Kafka partition leaders. Reading the Apache storm-kafka source shows that this information is read from the /brokers/ids/0 znode in ZooKeeper:

// Builds and maintains the host/port mapping for every partition of every topic.
// Kafka writes topic, partition and replica information into ZooKeeper and watches
// it for updates/deletions, so all of Kafka's state can be read back from ZooKeeper.
public List<GlobalPartitionInformation> getBrokerInfo() throws SocketTimeoutException {
    List<String> topics = getTopics(); // all topics, read from /brokers/topics in zk
    List<GlobalPartitionInformation> partitions = new ArrayList<GlobalPartitionInformation>();

    for (String topic : topics) {
        GlobalPartitionInformation globalPartitionInformation = new GlobalPartitionInformation(topic, this._isWildcardTopic);
        try {
            // number of partitions for this topic, read from /brokers/topics/distributedTopic/partitions/ in zk
            int numPartitionsForTopic = getNumPartitions(topic);
            String brokerInfoPath = brokerPath(); // the broker path, /brokers/ids by default
            // find the leader's host and port for each partition, for creating the consumer later
            for (int partition = 0; partition < numPartitionsForTopic; partition++) {
                // broker id of the partition leader; 0, 1, 2, ... by default, but 1001, 1002, 1003, ... on HDP
                int leader = getLeaderFor(topic, partition);
                String path = brokerInfoPath + "/" + leader; // e.g. /brokers/ids/1001
                try {
                    // read the data stored in the /brokers/ids/1001 znode
                    byte[] brokerData = _curator.getData().forPath(path);
                    Broker hp = getBrokerHost(brokerData); // extract host and port
                    // record the partition -> broker mapping, i.e. the host and port the partition lives on
                    globalPartitionInformation.addPartition(partition, hp);
                } catch (org.apache.zookeeper.KeeperException.NoNodeException e) {
                    LOG.error("Node {} does not exist ", path);
                }
            }
        } catch (SocketTimeoutException e) {
            throw e;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        LOG.info("Read partition info from zookeeper: " + globalPartitionInformation);
        partitions.add(globalPartitionInformation);
    }
    return partitions;
}
    /**
     * [zk: localhost:2181(CONNECTED) 56] get /brokers/ids/0
     * { "host":"localhost", "jmx_port":9999, "port":9092, "version":1 }
     *
     * @param contents
     * @return
     */
    private Broker getBrokerHost(byte[] contents) {
        try {
            Map<Object, Object> value = (Map<Object, Object>) JSONValue.parseWithException(new String(contents, "UTF-8"));
            String host = (String) value.get("host");
            Integer port = ((Long) value.get("port")).intValue();
            return new Broker(host, port);
        } catch (RuntimeException e) {
            throw e;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

But checking HDP's ZooKeeper, the /brokers/ids/1001 znode contains host:null and port:-1.

Tracing where the partition-to-host/port mapping is built, it does nothing more than read the JSON value of the corresponding znode and take host and port from it, even though under /brokers/ids/1001 host is null and port is -1. I don't know why HDP's Kafka sets host and port to null and -1, but I noticed the same information is available in the endpoints field of that znode: clearly host is woker11-cs.zuhu2.com and port is 6667.
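The change I ended up making boils down to falling back to the endpoints field when host is null or port is -1. Below is a minimal sketch of that idea (not the exact patch), written as a drop-in replacement for the getBrokerHost method shown above. It assumes each endpoints entry has the form protocol://host:port; the PLAINTEXTSASL prefix in the comments is only illustrative.

// Sketch only: getBrokerHost with a fallback to the "endpoints" array when the
// HDP znode carries host:null / port:-1. Assumes endpoint strings look like
// "PLAINTEXTSASL://woker11-cs.zuhu2.com:6667" (protocol://host:port).
private Broker getBrokerHost(byte[] contents) {
    try {
        Map<Object, Object> value = (Map<Object, Object>) JSONValue.parseWithException(new String(contents, "UTF-8"));
        String host = (String) value.get("host");
        Long port = (Long) value.get("port");
        if (host == null || port == null || port.intValue() == -1) {
            // e.g. "endpoints":["PLAINTEXTSASL://woker11-cs.zuhu2.com:6667"]
            List<Object> endpoints = (List<Object>) value.get("endpoints");
            String endpoint = (String) endpoints.get(0);
            String hostAndPort = endpoint.substring(endpoint.indexOf("://") + 3);
            host = hostAndPort.substring(0, hostAndPort.lastIndexOf(':'));
            port = Long.valueOf(hostAndPort.substring(hostAndPort.lastIndexOf(':') + 1));
        }
        return new Broker(host, port.intValue());
    } catch (RuntimeException e) {
        throw e;
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
}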

After making the change and repackaging with Maven, testing showed that host and port were indeed obtained, but consumption still failed.

Googling around, I found that a security-authentication API for storm-kafka exists only in HDP's reworked (forked) storm-kafka; Apache storm-kafka has no such API.

public SimpleConsumer register(Broker host, String topic, int partition) {
    if (!this._connections.containsKey(host)) {
        this._connections.put(host, new DynamicPartitionConnections.ConnectionInfo(
                new SimpleConsumer(host.host, host.port, this._config.socketTimeoutMs,
                        this._config.bufferSizeBytes, this._config.clientId,
                        this._config.securityProtocol))); // the extra securityProtocol argument is the HDP-specific addition
    }

    DynamicPartitionConnections.ConnectionInfo info = (DynamicPartitionConnections.ConnectionInfo) this._connections.get(host);
    info.partitions.add(this.getHashKey(topic, partition));
    return info.consumer;
}

register is called to obtain the SimpleConsumer; the HDP version takes one more argument than the Apache storm-kafka one, securityProtocol, whose default value is "PLAINTEXT".

In HDP's kafka_2.10, SimpleConsumer has authentication support built in by default, so even if you only pass host, port, soTimeout, bufferSize and clientId when creating it, securityProtocol is still given the default value "PLAINTEXT".

Compare this with SimpleConsumer in Apache's kafka_2.10.

The root cause is that the kafka_2.10 API storm-kafka uses when it creates the new SimpleConsumer has also been forked: the HDP version exposes a security-authentication API, whereas Apache's kafka_2.10 provides no such authentication API.
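In other words, the constructor the HDP fork calls has an extra argument that Apache's kafka_2.10 simply does not have. Roughly like the fragment below; the Apache signature is the standard kafka.javaapi.consumer.SimpleConsumer one, while the HDP signature is inferred from the register() call above and the "PLAINTEXTSASL" value is an assumption about the Kerberized listener, so treat both as a sketch rather than the exact API.

// Apache kafka_2.10: SimpleConsumer knows nothing about a security protocol.
SimpleConsumer apacheConsumer =
        new SimpleConsumer("woker11-cs.zuhu2.com", 6667, 10000, 64 * 1024, "storm-client");

// HDP kafka_2.10 (signature inferred from this._config.securityProtocol in register() above):
// the same arguments plus a securityProtocol; "PLAINTEXT" is the default, and a Kerberized
// HDP cluster needs the SASL variant instead.
SimpleConsumer hdpConsumer =
        new SimpleConsumer("woker11-cs.zuhu2.com", 6667, 10000, 64 * 1024, "storm-client", "PLAINTEXTSASL");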

In the end it was clear that patching Apache storm-kafka myself would not work: kafka_2.10 and kafka-clients have both been forked as well.

Because of network issues these packages could not be downloaded automatically, so I fetched them from the Hortonworks repository at http://repo.hortonworks.com/content/repositories/releases/ and installed them into my local Maven repository by hand:

mvn install:install-file -Dfile=F:\sougoDownload\storm-kafka-1.0.1.2.5.3.0-37.jar -DgroupId=org.apache.storm -DartifactId=storm-kafka -Dversion=1.0.1.2.5.3.0-37 -Dpackaging=jar

After repackaging against these jars, the topology could consume from and send data to Kafka.
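For reference, the spout-side wiring then only differs from plain storm-kafka in the security setting. A rough sketch is below; the securityProtocol field name is taken from the this._config.securityProtocol reference in register() above, while the "PLAINTEXTSASL" value, hostnames and topic are placeholder assumptions.

import org.apache.storm.kafka.BrokerHosts;
import org.apache.storm.kafka.KafkaSpout;
import org.apache.storm.kafka.SpoutConfig;
import org.apache.storm.kafka.ZkHosts;

public class SecureKafkaSpoutExample {
    public static KafkaSpout buildSpout() {
        // ZooKeeper connect string and topic are placeholders
        BrokerHosts hosts = new ZkHosts("zk-host1:2181,zk-host2:2181,zk-host3:2181");
        SpoutConfig spoutConfig = new SpoutConfig(hosts, "my-topic", "/kafka-spout", "my-spout-id");
        // assumption: the HDP fork exposes securityProtocol on the config object;
        // the default would be "PLAINTEXT", a Kerberized cluster needs the SASL listener protocol
        spoutConfig.securityProtocol = "PLAINTEXTSASL";
        return new KafkaSpout(spoutConfig);
    }
}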

Note that the HDP packages you download must match the versions in your environment exactly. HDP releases differ from one another and are not backward compatible the way Java is. I initially downloaded the latest storm-kafka, 1.1.0.3.0.1.3-1, whose KafkaBolt extends BaseTickTupleAwareRichBolt, but the environment's storm-core was 1.0.1.2.5.3.0-37, which does not contain the BaseTickTupleAwareRichBolt class at all.

In storm-kafka 1.0.1.2.5.3.0-37, KafkaBolt extends BaseRichBolt.
