Kafka Notes -- Specifying the Partition Rule for Messages

For parameter settings, see the reference material below.

Useful resources: http://blog.csdn.net/honglei915/article/details/37697655

http://developer.51cto.com/art/201501/464491.htm

Note: the configuration file server.properties specifies the number of partitions via num.partitions. This is the total partition count for a single topic: if there are multiple brokers, the partitions may be spread across different nodes, and the partitions of that topic on all brokers together add up to num.partitions.
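For example, a minimal server.properties entry might look like this (the value 4 is only an illustration, not a recommendation):

# server.properties -- default number of partitions for newly created topics
num.partitions=4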

In Kafka 0.7, several of the producer configuration options are mutually exclusive: if you set one, you cannot set the other.
For example:
  broker.list and zk.connect cannot both be set
  broker.list and partitioner.class cannot both be set
If you do set both, compilation succeeds, but an exception is thrown at runtime.

1. Specifying the broker

props.put("broker.list", "0:10.10.10.10:9092");//直接鏈接kafka
設置這項後,就不能設置partitioner.class了,但是我在運行的時候發現,此時全部的數據都發往10.10.10.10的4個分區,並無只發給一個分區。我換了syncproducer裏的send(topic,partitionid,list)都沒用。

2. Specifying the partition
props.put("partitioner.class", "com.kafka.myparitioner.CidPartitioner");
props.put("zk.connect", "10.10.10.10:2181"); // connect to ZooKeeper

The com.kafka.myparitioner.CidPartitioner above is a class you implement yourself; note that you must give its fully qualified name, including the package.
CidPartitioner implements the Partitioner interface; the partition method it implements defines how the target partition is computed from the key.

package com.kafka.myparitioner;

import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;

// Defines the rule that decides, based on the key, which partition the current message is sent to
public class CidPartitioner implements Partitioner {
    public CidPartitioner(VerifiableProperties props) {
        // Note: the constructor body is empty, but this constructor taking
        // VerifiableProperties must exist, since Kafka instantiates the partitioner with it
    }
    
    @Override
    public int partition(Object key, int numPartitions) {
        try {
            // Numeric key: use the key value modulo the number of partitions
            long partitionNum = Long.parseLong((String) key);
            return (int) Math.abs(partitionNum % numPartitions);
        } catch (Exception e) {
            // Non-numeric key: fall back to the key's hashCode
            return Math.abs(key.hashCode() % numPartitions);
        }
    }
}
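As a quick illustration of the rule above, here is a minimal sketch (not part of the original code; the key values and the partition count of 4 are made-up assumptions) that calls the partitioner directly:

package com.kafka.myparitioner;

// A small sketch for trying out CidPartitioner locally; keys and partition
// count are chosen purely for illustration.
public class CidPartitionerSketch {
    public static void main(String[] args) {
        // The constructor body is empty, so passing null properties is fine here
        CidPartitioner partitioner = new CidPartitioner(null);
        // Numeric key: partition = key % numPartitions -> prints 1
        System.out.println(partitioner.partition("123456789", 4));
        // Non-numeric key: falls back to hashCode() % numPartitions
        System.out.println(partitioner.partition("not-a-number", 4));
    }
}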

 

To have partitions assigned based on the key, you must specify a key when sending each message.

import java.io.IOException;
import java.io.InputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Properties;
import java.util.regex.Pattern;

import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

// Differs from KafkaReceiverLTELogSocket in that it specifies the partition assignment rule for the messages
public class KafkaReceiveLTELogSocketPartition extends Thread{
    // Sends the LTE signalling data on at regular intervals
    String regEx ="[^0-9.\\+\\-\\s+\\,E]"; 
    Pattern p = Pattern.compile(regEx); 
        
    // The first type parameter is the key type, the second is the message type
    private final kafka.javaapi.producer.Producer<String, String> producer;
    private final String topic;
    private final Properties props = new Properties();
    
    private final int port = 12345; 
    
    public KafkaReceiveLTELogSocketPartition(String topic) {
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("metadata.broker.list", "192.168.1.164:9093"); // 配置kafka端口        
        props.put("partitioner.class","com.kafka.myparitioner.CidPartitioner");
        //props.put("zk.connect", "192.168.1.164:2181");//鏈接zk,新的版本好像不須要
        
        producer = new kafka.javaapi.producer.Producer<String, String>(new ProducerConfig(props));
        this.topic = topic;
    }
    
    public void receiveAndWrite2(String outputFileName , int port) throws IOException{
        ServerSocket serverSocket = new ServerSocket(port);
        Socket socket = serverSocket.accept();
        StringBuilder sb = new StringBuilder();
        try{
            while(true){                
                InputStream istream = socket.getInputStream();
                int count = 0;
                while (count == 0) {
                    count = istream.available();
                }
                byte[] b = new byte[count];
                istream.read(b);
                for(int i = 0 ; i < count ; i ++){
                    if(b[i]=='\n'){ // A newline in the stream marks the end of a complete record, so send it
                        String str = sb.toString();
                        
                        // Extract key_cid_str: the text between the first ':' and the first ','
                        String key_cid_str = str.substring(str.indexOf(":")+1, str.indexOf(","));
                        
                        System.out.println("接收長度:"+str.length());
                        System.out.println(str);
                        // The first type argument is the key type, the second is the message type
                        producer.send(new KeyedMessage<String, String>(topic,key_cid_str,str));
                        
                        sb = new StringBuilder();
                    }else{        
                        sb.append(Character.toChars(b[i]));
                    }
                }
            }
            
        }finally{
            // Close the sockets here, not inside the while loop; otherwise the sender would have to re-establish the connection for every send
            socket.close();
            serverSocket.close();
        }
    }
    
    @Override
    public void run() {
        String filename = "JSON1_Yanming_DriveTesting_09-04.16-17.16-27_TIME.json";
        String outputFileName  = ""+filename;
        
        try {
            receiveAndWrite2(outputFileName,port);
        } catch (IOException e) {    
            e.printStackTrace();
        }        
    }    
    public static void main(String[] args) {
        String topic = "kafka_flume_topic";
        new KafkaReceiveLTELogSocketPartition(topic).start();
    }
}
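To feed the receiver above, any client that writes newline-terminated records of the form "...:<key>,..." to port 12345 will do. A minimal sketch of such a sender (the host, the record content and the key value are assumptions chosen to match the key-extraction logic above):

import java.io.OutputStream;
import java.net.Socket;

// A small sender sketch for the receiver above; the receiver extracts the key
// from the text between the first ':' and the first ','.
public class LteLogSocketSenderSketch {
    public static void main(String[] args) throws Exception {
        try (Socket socket = new Socket("localhost", 12345)) {
            OutputStream out = socket.getOutputStream();
            // One newline-terminated record; key "42" will be used by CidPartitioner
            out.write("cid:42,{\"signal\":-95}\n".getBytes());
            out.flush();
        }
    }
}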

 

 

Printing the messages with KafkaConsumer (this uses the high-level consumer)

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;


public class KafkaConsumer extends Thread {
    private final ConsumerConnector consumer;
    private final String topic;

    public KafkaConsumer(String topic) {
        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig());
        this.topic = topic;
    }

    private static ConsumerConfig createConsumerConfig() {
        Properties props = new Properties();
        props.put("zookeeper.connect", "192.168.1.164:2181"); // zookeeper的地址
        props.put("group.id", "group2"); // 組ID

        // ZooKeeper connection timeout and related settings
        props.put("zookeeper.session.timeout.ms", "40000");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
        
        return new ConsumerConfig(props);
    }

    @Override
    public void run() {
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        // Number of consumer threads (streams) to open per topic
        topicCountMap.put(topic, new Integer(1));
        
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
        
        KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        while (it.hasNext()) {
            MessageAndMetadata<byte[], byte[]> message = it.next();  
            String topic = message.topic();  
            int partition = message.partition();  
            long offset = message.offset();  
            String key = new String(message.key());  
            String msg = new String(message.message());  
            // Process the message here; this example simply prints it  
            // If consumption fails, the information above can be written to a log, or sent out as an alert SMS/email for later handling  
            System.out.println( " thread : " + Thread.currentThread().getName()  
                    + ", topic : " + topic + ", partition : " + partition + ", offset : " + offset + " , key : "  
                    + key + " , mess : " + msg);              
        }
    }
}
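The class above has no main method; here is a minimal sketch of starting it (this refers to the custom KafkaConsumer class above, not the official client; the topic name is the one used by the producer example earlier):

// A small sketch, assuming the KafkaConsumer class above is on the classpath.
public class KafkaConsumerMain {
    public static void main(String[] args) {
        new KafkaConsumer("kafka_flume_topic").start();
    }
}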

 

Appendix: a Kafka low-level consumer (SimpleConsumer)

package com.cuicui.kafkademon;  
  
  
import java.nio.ByteBuffer;  
import java.util.Collections;  
import java.util.HashMap;  
import java.util.List;  
import java.util.Map;  
  
  
import kafka.api.FetchRequest;  
import kafka.api.FetchRequestBuilder;  
import kafka.api.PartitionOffsetRequestInfo;  
import kafka.cluster.Broker;  
import kafka.common.TopicAndPartition;  
import kafka.javaapi.FetchResponse;  
import kafka.javaapi.OffsetRequest;  
import kafka.javaapi.OffsetResponse;  
import kafka.javaapi.PartitionMetadata;  
import kafka.javaapi.TopicMetadata;  
import kafka.javaapi.TopicMetadataRequest;  
import kafka.javaapi.TopicMetadataResponse;  
import kafka.javaapi.consumer.SimpleConsumer;  
import kafka.javaapi.message.ByteBufferMessageSet;  
import kafka.message.Message;  
import kafka.message.MessageAndOffset;  
  
  
/** 
 * Offsets are maintained by the caller; the target topic and partition are also chosen by the caller.
 *  
 * @author <a href="mailto:leicui001@126.com">崔磊</a> 
 * @date 2015-11-04 11:44:15 
 * 
 */  
public class MySimpleConsumer {  
  
  
    public static void main(String[] args) {  
        new MySimpleConsumer().consume();  
    }  
  
  
    /** 
     * Consume messages 
     */  
    public void consume() {  
        int partition = 0;  
  
  
        // Find the leader for the partition  
        Broker leaderBroker = findLeader(KafkaProperties.BROKER_CONNECT, KafkaProperties.TOPIC, partition);  
  
  
        // Consume from the leader  
        SimpleConsumer simpleConsumer =  
                new SimpleConsumer(leaderBroker.host(), leaderBroker.port(), 20000, 10000, "mySimpleConsumer");  
        long startOffet = 1;  
        int fetchSize = 1000;  
  
  
        while (true) {  
            long offset = startOffet;  
            // addFetch specifies the target topic, partition, starting offset and fetchSize (in bytes); multiple fetches can be added  
            FetchRequest req =  
                    new FetchRequestBuilder().addFetch(KafkaProperties.TOPIC, 0, startOffet, fetchSize).build();  
  
  
            // Pull the messages  
            FetchResponse fetchResponse = simpleConsumer.fetch(req);  
  
  
            ByteBufferMessageSet messageSet = fetchResponse.messageSet(KafkaProperties.TOPIC, partition);  
            for (MessageAndOffset messageAndOffset : messageSet) {  
                Message mess = messageAndOffset.message();  
                ByteBuffer payload = mess.payload();  
                byte[] bytes = new byte[payload.limit()];  
                payload.get(bytes);  
                String msg = new String(bytes);  
  
  
                offset = messageAndOffset.offset();  
                System.out.println("partition : " + 3 + ", offset : " + offset + "  mess : " + msg);  
            }  
            // Continue with the next batch  
            startOffet = offset + 1;  
        }  
    }  
  
  
    /** 
     * Find the leader broker for the specified partition 
     *  
     * @param brokerHosts broker addresses, in the form "host1:port1,host2:port2,host3:port3" 
     * @param topic topic 
     * @param partition partition 
     * @return the leader broker 
     */  
    public Broker findLeader(String brokerHosts, String topic, int partition) {  
        Broker leader = findPartitionMetadata(brokerHosts, topic, partition).leader();  
        System.out.println(String.format("Leader for topic %s, partition %d is %s:%d", topic, partition, leader.host(),  
                leader.port()));  
        return leader;  
    }  
  
  
    /** 
     * Find the metadata for the specified partition 
     *  
     * @param brokerHosts broker addresses, in the form "host1:port1,host2:port2,host3:port3" 
     * @param topic topic 
     * @param partition partition 
     * @return the partition metadata 
     */  
    private PartitionMetadata findPartitionMetadata(String brokerHosts, String topic, int partition) {  
        PartitionMetadata returnMetaData = null;  
        for (String brokerHost : brokerHosts.split(",")) {  
            SimpleConsumer consumer = null;  
            String[] splits = brokerHost.split(":");  
            consumer = new SimpleConsumer(splits[0], Integer.valueOf(splits[1]), 100000, 64 * 1024, "leaderLookup");  
            List<String> topics = Collections.singletonList(topic);  
            TopicMetadataRequest request = new TopicMetadataRequest(topics);  
            TopicMetadataResponse response = consumer.send(request);  
            List<TopicMetadata> topicMetadatas = response.topicsMetadata();  
            for (TopicMetadata topicMetadata : topicMetadatas) {  
                for (PartitionMetadata partitionMetadata : topicMetadata.partitionsMetadata()) {  
                    if (partitionMetadata.partitionId() == partition) {  
                        returnMetaData = partitionMetadata;  
                    }  
                }  
            }  
            if (consumer != null)  
                consumer.close();  
        }  
        return returnMetaData;  
    }  
  
  
    /** 
     * Find the offset consumed by a given client, based on a timestamp 
     *  
     * @param consumer SimpleConsumer 
     * @param topic topic 
     * @param partition partition 
     * @param clientID the client's ID 
     * @param whichTime timestamp 
     * @return offset 
     */  
    public long getLastOffset(SimpleConsumer consumer, String topic, int partition, String clientID, long whichTime) {  
        TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);  
        Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo =  
                new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();  
        requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));  
        OffsetRequest request = new OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientID);  
        OffsetResponse response = consumer.getOffsetsBefore(request);  
        long[] offsets = response.offsets(topic, partition);  
        return offsets[0];  
    }  
}  
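Note that getLastOffset is defined but never called in consume() above. A minimal sketch of how it could be used to look up the earliest and latest offsets instead of hard-coding startOffet = 1 (the host, port and client id are assumptions reused from the earlier examples; EarliestTime()/LatestTime() are the standard constants of this old API, and KafkaProperties is the same helper class referenced above):

package com.cuicui.kafkademon;

import kafka.javaapi.consumer.SimpleConsumer;

// A small sketch, not part of the original code.
public class OffsetLookupSketch {
    public static void main(String[] args) {
        SimpleConsumer consumer =
                new SimpleConsumer("192.168.1.164", 9093, 20000, 10000, "offsetLookup");
        MySimpleConsumer helper = new MySimpleConsumer();
        // Earliest and latest available offsets for partition 0 of the topic
        long earliest = helper.getLastOffset(consumer, KafkaProperties.TOPIC, 0,
                "offsetLookup", kafka.api.OffsetRequest.EarliestTime());
        long latest = helper.getLastOffset(consumer, KafkaProperties.TOPIC, 0,
                "offsetLookup", kafka.api.OffsetRequest.LatestTime());
        System.out.println("earliest offset = " + earliest + ", latest offset = " + latest);
        consumer.close();
    }
}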