kafkautil:java
import java.util.Properties; import kafka.javaapi.producer.Producer; import kafka.producer.ProducerConfig; import org.springframework.beans.factory.annotation.Value; public class KafkaUtil { @Value("#{sys['connect']}") private static String zkConnect ; @Value("#{sys['metadata.broker.list']}") private static String brokerList; @Value("#{sys['request.required.acks']}") private static String ack; private static Producer<String, String> producer = null; /*static{ Properties p = PropertiesUtil.getProperties("kafka.properties"); zkConnect = (String) p.get("zk.connect"); brokerList = (String) p.get("metadata.broker.list"); ack = (String) p.get("request.required.acks"); topic = (String) p.get("topic.imeidata"); } */ public static Producer<String,String> getProducer(){ if(producer == null){ Properties p = PropertiesUtil.getProperties("kafka.properties"); zkConnect = (String) p.get("zk.connect"); brokerList = (String) p.get("metadata.broker.list"); ack = (String) p.get("request.required.acks"); Properties props = new Properties(); props.put("zk.connect", zkConnect); props.put("metadata.broker.list", brokerList); props.put("serializer.class", "kafka.serializer.StringEncoder"); props.put("request.required.acks", ack); props.put("producer.type", "async");//是否同步 sync:同步 async:異步 props.put("partitioner.class", "com.kafka.SendPartitioner");//發送到多個分區進行分佈式存儲的分區算法類 props.put("request.timeout.ms", "50000"); props.put("queue.buffering.max.ms", "10000");//默認值5000 異步模式下,每隔此時間間隔會將緩衝的消息提交一次 props.put("batch.num.messages", "1000");//默認值200 異步模式下,一次批量提交消息的條數, //但若是間隔時間超過 queue.buffering.max.ms 的值,無論有沒有達到批量提交的設值,都會進行一次提交 ProducerConfig config = new ProducerConfig(props); producer = new Producer<String, String>(config); } return producer; } }
kafka消息發送類的屬性:
1:zk.connect:zk服務端鏈接地址
2:metadata.broker.list:zk客戶端地址
3:serializer.class:kafka消息發送序列化格式
4:request.required.acks:是否確認消息消費機制 它有三個選項:1,0,-1
0,意味着producer永遠不會等待一個來自broker的ack,這就是0.7版本的行爲。這個選項提供了最低的延遲,可是持久化的保證是最弱的,當server掛掉的時候會丟失一些數據。經測試,每10K消息大約會丟幾百條消息。
1,意味着在leader replica已經接收到數據後,producer會獲得一個ack。這個選項提供了更好的持久性,由於在server確認請求成功處理後,client纔會返回。若是剛寫到leader上,還沒來得及複製leader就掛了,那麼消息纔可能會 丟失。
-1,意味着在全部的ISR都接收到數據後,producer才獲得一個ack。這個選項提供了最好的持久性,只要還有一個replica存活,那麼數據就不會丟失。經測試,100W條消息沒有丟消息。算法
5:request.timeout.ms:請求超時
6:producer.type 是否同步 它有兩個選項 sync:同步 async:異步 同步模式下,每發送一次消息完畢纔會返回 在異步模式下,能夠選擇異步參數。
7:queue.buffering.max.ms:默認值5000 異步模式下,每隔此時間間隔會將緩衝的消息提交一次
8:batch.num.messages:默認值200 異步模式下,一次批量提交消息的條數,但若是間隔時間超過 queue.buffering.max.ms 的值,無論有沒有達到批量提交的設值,都會進行一次提交
9:partitioner.class:自定義分區算法
在一個kafka集羣中,每個節點稱爲一個broker,因此進入zk經過/ls命令查看根目錄有個brokers目錄(kafka默認安裝配置文件是放在zk根目錄,我更喜歡入在自定義目錄下),這裏保存了當前kafka集羣在正在運行的節點名:
只有將全部消息最大限度平均的發送到每一個broker上去,才能達到最好的集羣效果。那麼kafka是經過什麼來保證這一點的呢。
kafka消息類KeyedMessae中有一個方法,參數分別爲將要發送消息的隊列,和消息KEY,VALUE。經過對KEY的HASH值求broker的個數求模,將會獲得broker值,它就是將接收消息的節點。
能夠自定義分區實現類,並在屬性中指明:
import kafka.producer.Partitioner; import kafka.utils.VerifiableProperties; public class SendPartitioner implements Partitioner{ public SendPartitioner(VerifiableProperties verifiableProperties) {} @Override public int partition(Object key, int numPartitions) { try { return Math.abs(key.hashCode() % numPartitions); } catch (Exception e) { return Math.abs(key.hashCode() % numPartitions); } } }
numPartitions 指的是kafka集羣節點數,不用顯式指定,它能夠經過zk實時獲得此值。
以上屬性大均可以經過kafka的安裝配置文件來指定。但一個kafka集羣可能並不止服務一個隊列或者一個項目。不一樣的項目具體業務需求不一樣,因此最好是在各個項目提定具體的參數。
Storm:spring
storm與kafka集成有第三方框架,叫作storm-kafka.jar。簡而言之,它其實只作了一件事情。就是已經寫好了storm的spout,咱們只須要編寫bolt和提交topology便可實現storm.api
它幫咱們實現了kafka消費端相對難把握的一件事情,就是偏移量offset的問題。若是你不想每次啓動都重頭讀取kafka消息,儘可能避免消息重複消費,那必需要保證良好的偏移量機制。特別是在多個用戶組和隊列的狀況下。app
代碼:框架
import com.util.PropertiesUtil; import storm.kafka.KafkaSpout; import storm.kafka.SpoutConfig; import storm.kafka.StringScheme; import storm.kafka.ZkHosts; import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.StormSubmitter; import backtype.storm.spout.SchemeAsMultiScheme; import backtype.storm.topology.TopologyBuilder; public class Topology { private static String topic; private static String brokerZkStr; private static String brokerZkPath; private static String offset; private static String app; static{ Properties p = PropertiesUtil.getProperties("storm.properties"); brokerZkStr = (String) p.get("brokerZkStr"); brokerZkPath = (String) p.get("brokerZkPath"); topic = (String) p.get("kafka.topic"); offset = (String) p.get("kafka.offset"); app = (String) p.get("kafka.app"); } public static void main(String[] args) throws InterruptedException { ZkHosts zk = new ZkHosts(brokerZkStr,brokerZkPath); SpoutConfig spoutConf = new SpoutConfig(zk, topic, offset,//偏移量 offset 的根目錄 app);//對應一個應用 List<String> zkServices = new ArrayList<>(); for(String str : zk.brokerZkStr.split(",")){ zkServices.add(str.split(":")[0]); } spoutConf.zkServers = zkServices; spoutConf.zkPort = 2181; spoutConf.forceFromStart = false;// true:從頭消費 false:從offset處消費 spoutConf.socketTimeoutMs = 60 * 1000; spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme()); TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("spout", new KafkaSpout(spoutConf),4); //builder.setSpout("spout", new TestSpout(),5); builder.setBolt("bolt1", new GetMsgBolt(),4).shuffleGrouping("spout"); Config config = new Config(); config.setDebug(false); config.setNumWorkers(4); if(args.length>0){ try { StormSubmitter.submitTopology(args[0], config, builder.createTopology()); } catch (Exception e) { e.printStackTrace(); } }else{ LocalCluster cluster = new LocalCluster(); cluster.submitTopology("MyTopology", config, builder.createTopology()); } } }
屬性:
1:brokerZkPath kafka集羣在zk裏的根目錄,默認是brokers
2:kafka.offset kafka消息隊列偏移量記錄在zk中的位置
3:kafka.app 實際上就是kafka.offset的子目錄,父級目錄規定了kafka集羣消息offset的總位置,子目錄是具體每一個隊列或者應用消息的偏移量,避免在多用戶組多隊列狀況下偏移量錯亂的狀況。