Parameter settings: references
Useful references:
http://blog.csdn.net/honglei915/article/details/37697655
http://developer.51cto.com/art/201501/464491.htm
Note: the configuration file server.properties specifies the number of partitions via num.partitions. This refers to the total number of partitions for a single topic: if there are multiple brokers, the partitions may be spread across different nodes, and the partitions on all brokers together add up to num.partitions.
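For reference, this is a broker-side setting in server.properties; the value below is only an assumed example, not taken from the original setup:

# server.properties (example value, assumed)
num.partitions=4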
In 0.7, some producer settings are mutually exclusive: if you set one, you cannot set the other.
For example:
broker.list and zk.connect cannot both be set
broker.list and partitioner.class cannot both be set
If you set both anyway, it still compiles, but an exception is thrown at runtime.
1. Specify the broker directly
props.put("broker.list", "0:10.10.10.10:9092"); // connect to Kafka directly
Once this is set, partitioner.class can no longer be set. However, when I ran it I found that all the data was sent to all 4 partitions on 10.10.10.10, not to a single partition. Switching to SyncProducer's send(topic, partitionId, list) made no difference either.
2. Specify the partitioner
props.put("partitioner.class","com.kafka.myparitioner.CidPartitioner");
props.put("zk.connect", "10.10.10.10:2181");//鏈接zksocket
上面的 com.kafka.myparitioner.CidPartitioner 爲本身實現的類,注意要本身實現完整的包名
CidPartitioner繼承了Partitioner類,其中實現的partition方法指定了經過key計算partition的方法ide
package com.kafka.myparitioner;

import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;

// Rule that decides, based on the key, which partition the current message is sent to
public class CidPartitioner implements Partitioner {

    public CidPartitioner(VerifiableProperties props) {
        // Note: the constructor body is empty, but the constructor itself must exist
    }

    @Override
    public int partition(Object key, int numPartitions) {
        try {
            long partitionNum = Long.parseLong((String) key);
            return (int) Math.abs(partitionNum % numPartitions);
        } catch (Exception e) {
            return Math.abs(key.hashCode() % numPartitions);
        }
    }
}
To have partitions assigned based on the key, you need to specify a key when sending the message.
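In isolation, the keyed send looks like the minimal sketch below; the broker address, topic name and key value are assumed for illustration, and the complete receiver class that follows shows the same call in context:

import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

// Minimal keyed-send sketch (broker address, topic and key are assumed for illustration)
public class KeyedSendSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("metadata.broker.list", "192.168.1.164:9093");
        props.put("partitioner.class", "com.kafka.myparitioner.CidPartitioner");
        Producer<String, String> producer = new Producer<String, String>(new ProducerConfig(props));
        // The key ("12345" here) is what CidPartitioner uses to compute the target partition
        producer.send(new KeyedMessage<String, String>("kafka_flume_topic", "12345", "some message"));
        producer.close();
    }
}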
import java.io.IOException;
import java.io.InputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Properties;
import java.util.regex.Pattern;

import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

// Differs from KafkaReceiverLTELogSocket in that it specifies the partition assignment rule for messages
public class KafkaReceiveLTELogSocketPartition extends Thread {
    // Sends LTE signaling data at a fixed interval
    String regEx = "[^0-9.\\+\\-\\s+\\,E]";
    Pattern p = Pattern.compile(regEx);

    // The first type parameter is the key type, the second is the message type
    private final kafka.javaapi.producer.Producer<String, String> producer;
    private final String topic;
    private final Properties props = new Properties();
    private final int port = 12345;

    public KafkaReceiveLTELogSocketPartition(String topic) {
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("metadata.broker.list", "192.168.1.164:9093"); // Kafka broker address and port
        props.put("partitioner.class", "com.kafka.myparitioner.CidPartitioner");
        //props.put("zk.connect", "192.168.1.164:2181"); // connect to ZooKeeper; apparently not needed in newer versions
        producer = new kafka.javaapi.producer.Producer<String, String>(new ProducerConfig(props));
        this.topic = topic;
    }

    public void receiveAndWrite2(String outputFileName, int port) throws IOException {
        ServerSocket serverSocket = new ServerSocket(port);
        Socket socket = serverSocket.accept();
        StringBuilder sb = new StringBuilder();
        try {
            while (true) {
                InputStream istream = socket.getInputStream();
                int count = 0;
                while (count == 0) {
                    count = istream.available();
                }
                byte[] b = new byte[count];
                istream.read(b);
                for (int i = 0; i < count; i++) {
                    if (b[i] == '\n') {
                        // A newline in the stream means one complete record has been read; send it
                        String str = sb.toString();
                        // Extract key_cid_str to use as the message key
                        String key_cid_str = str.substring(str.indexOf(":") + 1, str.indexOf(","));
                        System.out.println("Received length: " + str.length());
                        System.out.println(str);
                        // The first type parameter is the key type, the second is the message type
                        producer.send(new KeyedMessage<String, String>(topic, key_cid_str, str));
                        sb = new StringBuilder();
                    } else {
                        sb.append(Character.toChars(b[i]));
                    }
                }
            }
        } finally {
            // Close the sockets here, not inside the while loop,
            // otherwise the sender has to re-establish the connection every time
            socket.close();
            serverSocket.close();
        }
    }

    @Override
    public void run() {
        String filename = "JSON1_Yanming_DriveTesting_09-04.16-17.16-27_TIME.json";
        String outputFileName = "" + filename;
        try {
            receiveAndWrite2(outputFileName, port);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        String topic = "kafka_flume_topic";
        new KafkaReceiveLTELogSocketPartition(topic).start();
    }
}
Output with KafkaConsumer (the high-level consumer is used here)
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

public class KafkaConsumer extends Thread {
    private final ConsumerConnector consumer;
    private final String topic;

    public KafkaConsumer(String topic) {
        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig());
        this.topic = topic;
    }

    private static ConsumerConfig createConsumerConfig() {
        Properties props = new Properties();
        props.put("zookeeper.connect", "192.168.1.164:2181"); // ZooKeeper address
        props.put("group.id", "group2");                      // consumer group ID
        // ZooKeeper connection timeouts
        props.put("zookeeper.session.timeout.ms", "40000");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
        return new ConsumerConfig(props);
    }

    @Override
    public void run() {
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        // Number of consumer threads to open for each topic
        topicCountMap.put(topic, new Integer(1));
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
        KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        while (it.hasNext()) {
            MessageAndMetadata<byte[], byte[]> message = it.next();
            String topic = message.topic();
            int partition = message.partition();
            long offset = message.offset();
            String key = new String(message.key());
            String msg = new String(message.message());
            // Process the message here; this example simply prints it.
            // If consumption fails, the information above can be written to a log
            // or sent out as an SMS/email alert for later handling.
            System.out.println(
                    " thread : " + Thread.currentThread().getName()
                    + ", topic : " + topic
                    + ", partition : " + partition
                    + ", offset : " + offset
                    + " , key : " + key
                    + " , mess : " + msg);
        }
    }
}
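The consumer class above has no main method. A minimal launcher (the class name is hypothetical; the topic name is the one used by the producer example above) might look like this:

// Hypothetical launcher for the high-level consumer above
public class KafkaConsumerMain {
    public static void main(String[] args) {
        new KafkaConsumer("kafka_flume_topic").start();
    }
}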
Appendix: Kafka low-level consumer
package com.cuicui.kafkademon;

import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.cluster.Broker;
import kafka.common.TopicAndPartition;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.OffsetRequest;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.TopicMetadataResponse;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.javaapi.message.ByteBufferMessageSet;
import kafka.message.Message;
import kafka.message.MessageAndOffset;

/**
 * Offsets are maintained by the caller; the target topic and partition are chosen by the caller as well.
 *
 * @author <a href="mailto:leicui001@126.com">崔磊</a>
 * @date 2015-11-04 11:44:15 AM
 */
public class MySimpleConsumer {

    public static void main(String[] args) {
        new MySimpleConsumer().consume();
    }

    /**
     * Consume messages.
     */
    public void consume() {
        int partition = 0;

        // Find the leader for this partition
        Broker leaderBroker = findLeader(KafkaProperties.BROKER_CONNECT, KafkaProperties.TOPIC, partition);

        // Consume from the leader
        SimpleConsumer simpleConsumer =
                new SimpleConsumer(leaderBroker.host(), leaderBroker.port(), 20000, 10000, "mySimpleConsumer");
        long startOffset = 1;
        int fetchSize = 1000;

        while (true) {
            long offset = startOffset;
            // addFetch specifies the target topic, partition, starting offset and fetch size (bytes);
            // multiple fetches can be added to one request
            FetchRequest req = new FetchRequestBuilder()
                    .addFetch(KafkaProperties.TOPIC, partition, startOffset, fetchSize).build();

            // Pull the messages
            FetchResponse fetchResponse = simpleConsumer.fetch(req);
            ByteBufferMessageSet messageSet = fetchResponse.messageSet(KafkaProperties.TOPIC, partition);
            for (MessageAndOffset messageAndOffset : messageSet) {
                Message mess = messageAndOffset.message();
                ByteBuffer payload = mess.payload();
                byte[] bytes = new byte[payload.limit()];
                payload.get(bytes);
                String msg = new String(bytes);

                offset = messageAndOffset.offset();
                System.out.println("partition : " + partition + ", offset : " + offset + " mess : " + msg);
            }
            // Continue with the next batch
            startOffset = offset + 1;
        }
    }

    /**
     * Find the leader broker of the given partition.
     *
     * @param brokerHosts broker addresses, in the form "host1:port1,host2:port2,host3:port3"
     * @param topic topic
     * @param partition partition
     * @return leader broker
     */
    public Broker findLeader(String brokerHosts, String topic, int partition) {
        Broker leader = findPartitionMetadata(brokerHosts, topic, partition).leader();
        System.out.println(String.format("Leader for topic %s, partition %d is %s:%d", topic, partition,
                leader.host(), leader.port()));
        return leader;
    }

    /**
     * Find the metadata of the given partition.
     *
     * @param brokerHosts broker addresses, in the form "host1:port1,host2:port2,host3:port3"
     * @param topic topic
     * @param partition partition
     * @return partition metadata
     */
    private PartitionMetadata findPartitionMetadata(String brokerHosts, String topic, int partition) {
        PartitionMetadata returnMetaData = null;
        for (String brokerHost : brokerHosts.split(",")) {
            SimpleConsumer consumer = null;
            String[] splits = brokerHost.split(":");
            consumer = new SimpleConsumer(splits[0], Integer.valueOf(splits[1]), 100000, 64 * 1024, "leaderLookup");
            List<String> topics = Collections.singletonList(topic);
            TopicMetadataRequest request = new TopicMetadataRequest(topics);
            TopicMetadataResponse response = consumer.send(request);
            List<TopicMetadata> topicMetadatas = response.topicsMetadata();
            for (TopicMetadata topicMetadata : topicMetadatas) {
                for (PartitionMetadata partitionMetadata : topicMetadata.partitionsMetadata()) {
                    if (partitionMetadata.partitionId() == partition) {
                        returnMetaData = partitionMetadata;
                    }
                }
            }
            if (consumer != null)
                consumer.close();
        }
        return returnMetaData;
    }

    /**
     * Find the offset for a given client at a given timestamp.
     *
     * @param consumer SimpleConsumer
     * @param topic topic
     * @param partition partition
     * @param clientID client ID
     * @param whichTime timestamp
     * @return offset
     */
    public long getLastOffset(SimpleConsumer consumer, String topic, int partition, String clientID, long whichTime) {
        TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
        Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo =
                new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
        requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
        OffsetRequest request = new OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientID);
        OffsetResponse response = consumer.getOffsetsBefore(request);
        long[] offsets = response.offsets(topic, partition);
        return offsets[0];
    }
}
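Note that consume() starts from a hard-coded offset of 1. The getLastOffset() helper above could instead be used to start from the earliest (or latest) available offset; the following is only a sketch meant to slot into consume(), reusing the simpleConsumer and partition variables already defined there:

// Sketch: initialize the starting offset with getLastOffset() instead of hard-coding it.
// EarliestTime()/LatestTime() are the standard constants of the 0.8 offset API.
long startOffset = getLastOffset(simpleConsumer, KafkaProperties.TOPIC, partition,
        "mySimpleConsumer", kafka.api.OffsetRequest.EarliestTime());
// or, to start from the end of the log:
// long startOffset = getLastOffset(simpleConsumer, KafkaProperties.TOPIC, partition,
//         "mySimpleConsumer", kafka.api.OffsetRequest.LatestTime());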