storm集成kafka

kafkautil:java

 
 
import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.ProducerConfig;

import org.springframework.beans.factory.annotation.Value;

public class KafkaUtil {
    
    @Value("#{sys['connect']}")
    private static String zkConnect ;
    @Value("#{sys['metadata.broker.list']}")
    private static String brokerList;
    @Value("#{sys['request.required.acks']}")
    private static String ack;
    
    private static Producer<String, String> producer = null;
    
    /*static{
        Properties p = PropertiesUtil.getProperties("kafka.properties");
        zkConnect = (String) p.get("zk.connect");
        brokerList = (String) p.get("metadata.broker.list");
        ack = (String) p.get("request.required.acks");
        topic = (String) p.get("topic.imeidata");
    }
    */
    public static Producer<String,String> getProducer(){
        if(producer == null){
            Properties p = PropertiesUtil.getProperties("kafka.properties");
            zkConnect = (String) p.get("zk.connect");
            brokerList = (String) p.get("metadata.broker.list");
            ack = (String) p.get("request.required.acks");
            
            Properties props = new Properties();
            props.put("zk.connect", zkConnect);
            props.put("metadata.broker.list", brokerList);
            props.put("serializer.class", "kafka.serializer.StringEncoder");
            props.put("request.required.acks", ack);
            props.put("producer.type", "async");//是否同步 sync:同步   async:異步  
            props.put("partitioner.class", "com.kafka.SendPartitioner");//發送到多個分區進行分佈式存儲的分區算法類
            
            props.put("request.timeout.ms", "50000");
            props.put("queue.buffering.max.ms", "10000");//默認值5000  異步模式下,每隔此時間間隔會將緩衝的消息提交一次
            props.put("batch.num.messages", "1000");//默認值200  異步模式下,一次批量提交消息的條數,
                                                    //但若是間隔時間超過 queue.buffering.max.ms 的值,無論有沒有達到批量提交的設值,都會進行一次提交
            ProducerConfig config = new ProducerConfig(props);
            producer = new Producer<String, String>(config);
        }
        return producer;
    }
   
}
View Code
kafka消息發送類的屬性:
1:zk.connect:zk服務端鏈接地址

2:metadata.broker.list:zk客戶端地址
3:serializer.class:kafka消息發送序列化格式

4:request.required.acks:是否確認消息消費機制 它有三個選項:1,0,-1
0,意味着producer永遠不會等待一個來自broker的ack,這就是0.7版本的行爲。這個選項提供了最低的延遲,可是持久化的保證是最弱的,當server掛掉的時候會丟失一些數據。經測試,每10K消息大約會丟幾百條消息。

1,意味着在leader replica已經接收到數據後,producer會獲得一個ack。這個選項提供了更好的持久性,由於在server確認請求成功處理後,client纔會返回。若是剛寫到leader上,還沒來得及複製leader就掛了,那麼消息纔可能會 丟失。

     -1,意味着在全部的ISR都接收到數據後,producer才獲得一個ack。這個選項提供了最好的持久性,只要還有一個replica存活,那麼數據就不會丟失。經測試,100W條消息沒有丟消息。算法

  

5:request.timeout.ms:請求超時
6:producer.type 是否同步 它有兩個選項 sync:同步   async:異步  同步模式下,每發送一次消息完畢纔會返回 在異步模式下,能夠選擇異步參數。
7:queue.buffering.max.ms:默認值5000  異步模式下,每隔此時間間隔會將緩衝的消息提交一次
8:batch.num.messages:默認值200  異步模式下,一次批量提交消息的條數,但若是間隔時間超過 queue.buffering.max.ms 的值,無論有沒有達到批量提交的設值,都會進行一次提交
9:partitioner.class:自定義分區算法
在一個kafka集羣中,每個節點稱爲一個broker,因此進入zk經過/ls命令查看根目錄有個brokers目錄(kafka默認安裝配置文件是放在zk根目錄,我更喜歡入在自定義目錄下),這裏保存了當前kafka集羣在正在運行的節點名:
  
只有將全部消息最大限度平均的發送到每一個broker上去,才能達到最好的集羣效果。那麼kafka是經過什麼來保證這一點的呢。
 
kafka消息類KeyedMessae中有一個方法,參數分別爲將要發送消息的隊列,和消息KEY,VALUE。經過對KEY的HASH值求broker的個數求模,將會獲得broker值,它就是將接收消息的節點。
能夠自定義分區實現類,並在屬性中指明:

import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;

public class SendPartitioner implements Partitioner{

    public SendPartitioner(VerifiableProperties verifiableProperties) {}

    @Override
    public int partition(Object key, int numPartitions) {
        try {
            return Math.abs(key.hashCode() % numPartitions);
        } catch (Exception e) {
            return Math.abs(key.hashCode() % numPartitions);
        }
    }

}
View Code
numPartitions 指的是kafka集羣節點數,不用顯式指定,它能夠經過zk實時獲得此值。


以上屬性大均可以經過kafka的安裝配置文件來指定。但一個kafka集羣可能並不止服務一個隊列或者一個項目。不一樣的項目具體業務需求不一樣,因此最好是在各個項目提定具體的參數。


 Storm:spring

 storm與kafka集成有第三方框架,叫作storm-kafka.jar。簡而言之,它其實只作了一件事情。就是已經寫好了storm的spout,咱們只須要編寫bolt和提交topology便可實現storm.api

 它幫咱們實現了kafka消費端相對難把握的一件事情,就是偏移量offset的問題。若是你不想每次啓動都重頭讀取kafka消息,儘可能避免消息重複消費,那必需要保證良好的偏移量機制。特別是在多個用戶組和隊列的狀況下。app

 代碼:框架

 

import com.util.PropertiesUtil;

import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.TopologyBuilder;
    
public class Topology {
    
    private static String topic;
    private static String brokerZkStr;
    private static String brokerZkPath;
    private static String offset;
    private static String app;
    
    static{
        Properties p = PropertiesUtil.getProperties("storm.properties");
        brokerZkStr = (String) p.get("brokerZkStr");
        brokerZkPath = (String) p.get("brokerZkPath");
        topic = (String) p.get("kafka.topic");
        offset = (String) p.get("kafka.offset");
        app = (String) p.get("kafka.app");
         
    }
    
    
    public static void main(String[] args) throws InterruptedException {  
        ZkHosts zk = new ZkHosts(brokerZkStr,brokerZkPath);
        SpoutConfig spoutConf = new SpoutConfig(zk, topic, 
                offset,//偏移量 offset 的根目錄
                app);//對應一個應用
        
        List<String> zkServices = new ArrayList<>();
        
        for(String str : zk.brokerZkStr.split(",")){
            zkServices.add(str.split(":")[0]);
        }
        
        spoutConf.zkServers = zkServices;
        spoutConf.zkPort = 2181;
        spoutConf.forceFromStart = false;// true:從頭消費  false:從offset處消費
        spoutConf.socketTimeoutMs = 60 * 1000;
        spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
        
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("spout", new KafkaSpout(spoutConf),4);
        //builder.setSpout("spout", new TestSpout(),5);
        builder.setBolt("bolt1", new GetMsgBolt(),4).shuffleGrouping("spout");
        
        Config config = new Config();
        config.setDebug(false);
        config.setNumWorkers(4);
        if(args.length>0){
            try {
                StormSubmitter.submitTopology(args[0], config, builder.createTopology());
            } catch (Exception e) {
                e.printStackTrace();
            }
        }else{
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("MyTopology", config, builder.createTopology());
        }
    }  
}
View Code

 

屬性:
1:brokerZkPath kafka集羣在zk裏的根目錄,默認是brokers
2:kafka.offset kafka消息隊列偏移量記錄在zk中的位置
3:kafka.app 實際上就是
kafka.offset的子目錄,父級目錄規定了kafka集羣消息offset的總位置,子目錄是具體每一個隊列或者應用消息的偏移量,避免在多用戶組多隊列狀況下偏移量錯亂的狀況。

相關文章
相關標籤/搜索