玩轉Kafka的生產者——分區器與多線程

上篇文章學習kafka的基本安裝和基礎概念,本文主要是學習kafka的經常使用API。其中包括生產者和消費者,java

多線程生產者,多線程消費者,自定義分區等,固然還包括一些避坑指南。apache

 首發於我的網站:連接地址bootstrap

準備工做

kafka版本:2.11-1.1.1centos

操做系統:centos7api

java:jdk1.8緩存

有了以上這些條件就OK了,具體怎麼安裝和啓動Kafka這裏就不強調了,能夠看上一篇文章。服務器

新建一個maven工程,須要的依賴以下:網絡

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>1.1.1</version>
</dependency>

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>1.1.1</version>
</dependency>

 

 

主題管理

kafka的核心就是主題,學會使用kafka的腳本建立主題,也須要學習使用Java API來建立主題。session

Kafka將zookeeper的操做封裝成一個ZkUtils類,經過AdminUtils類來調用ZkUtils,來實現Kafka中元數據的操做。多線程

下面一個例子是使用AdminUtils來建立主題,並同時建立指定大小的分區數。

 1     // 鏈接配置
 2     private static final String ZK_CONNECT = "10.0.90.53:2181";
 3 
 4     // session過時時間
 5     private static final int SEESSION_TIMEOUT = 30 * 1000;
 6 
 7     // 鏈接超時時間
 8     private static final int CONNECT_TIMEOUT = 30 * 1000;
 9 
10     /**
11      * 建立主題
12      *
13      * @param topic 主題名稱
14      * @param partition 分區數
15      * @param repilca 副本數
16      * @param properties 配置信息
17      */
18     public static void createTopic(String topic, int partition, int repilca, Properties properties) {
19         ZkUtils zkUtils = null;
20         try {
21             // 建立zkutil
22             zkUtils = ZkUtils.apply(ZK_CONNECT, SEESSION_TIMEOUT, CONNECT_TIMEOUT, JaasUtils.isZkSecurityEnabled());
23             if (!AdminUtils.topicExists(zkUtils, topic)) {
24                 //主題不存在,則建立主題
25                 AdminUtils.createTopic(zkUtils, topic, partition, repilca, properties, AdminUtils.createTopic$default$6());
26             }
27         } catch (Exception e) {
28             e.printStackTrace();
29         } finally {
30             zkUtils.close();
31         }
32     }

 

 

執行該方法,建立主題,

在centos7中查看以前建立的主題:

bin/kafka-topics.sh --list --zookeeper localhost:2181  

 

刪除主題:

/**
 * 刪除主題
 *
 * @param topic
 */
public static void deleteTopic(String topic){
    ZkUtils zkUtils = null;
    try {
        zkUtils = ZkUtils.apply(ZK_CONNECT, SEESSION_TIMEOUT, CONNECT_TIMEOUT, JaasUtils.isZkSecurityEnabled());
        AdminUtils.deleteTopic(zkUtils,topic);
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        zkUtils.close();
    }
}

 

 

生產者API

在掌握了建立和刪除主題以後,接下來,學習Kafka的生產者API。

Kafka中的生產者,經過KafkaProducer這個類來實現的,在介紹這個類的使用以前,首先介紹kafka的配置項,這也是實際生產中比較關心的。

消息發送流程

實例化生產者時,有三個配置是必須指定的:

  • bootstrap.servers:配置鏈接代理列表,沒必要包含Kafka集羣的全部代理地址,當鏈接上一個代理後,會從集羣元數據信息中獲取其餘存活的代理信息。但爲了保證可以成功連上Kafka集羣,在多代理集羣的狀況下,建議至少配置兩個代理。
  • (因爲電腦配置有限,本文實驗的是單機狀況)
    key.serializer : 用於序列化消息Key的類
  • value.serializer :用於序列化消息值(Value)的類

向Kafka發送一個消息,基本上要通過如下的流程:

1.配置Properties對象,這個是必須的

2.實例化KafkaProducer對象

3.實例化ProducerRecord對象,每條消息對應一個ProducerRecord對象

4.調用KafkaProducer的send方法,發送消息。發送消息有兩種,一種是帶回調函數的(若是發送消息有異常,會在回調函數中返回),另外一種是不帶回調函數的。

KafkaProducer默認是異步發送消息,首先它會將消息緩存到消息緩衝區中,當緩存區累積到必定數量時,將消息封裝成一個

RecordBatch,統一發送消息。也就是說,發送消息實質上分爲兩個階段,第一將消息發送到消息緩衝區,第二執行網絡I/O操做

5.關閉KafkaProducer,釋放鏈接的資源。

瞭解以上的流程,那麼接下來就實現Java版本的API。

代碼實例

第一步:

新建一個消息實體類,模擬支付訂單消息,包含消息的ID,商家名稱,建立時間,備註。

public class OrderMessage {

    // 訂單ID
    private String id;

    // 商家名稱
    private String sName;

    // 建立時間
    private long createTime;

    // 備註
    private String remake;

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getsName() {
        return sName;
    }

    public void setsName(String sName) {
        this.sName = sName;
    }

    public long getCreateTime() {
        return createTime;
    }

    public void setCreateTime(long createTime) {
        this.createTime = createTime;
    }

    public String getRemake() {
        return remake;
    }

    public void setRemake(String remake) {
        this.remake = remake;
    }

    @Override
    public String toString() {
        return "OrderMessage{" +
                "id='" + id + '\'' +
                ", sName='" + sName + '\'' +
                ", createTime=" + createTime +
                ", remake='" + remake + '\'' +
                '}';
    }
}

 

第二步:

這裏簡單的發送一個消息demo,按照上面的流程,生產者例子以下:

package kafka.producer;

import kafka.OrderMessage;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;
import java.util.UUID;

/**
 * kafka生產者
 */
public class ProducerSimpleDemo {
    static Properties properties = new Properties();

    //主題名稱
    static String topic = "myTopic";

    //生產者
    static KafkaProducer<String, String> producer = null;

    //生產者配置
    static {
        properties.put(org.apache.kafka.clients.producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.0.90.53:9092");
        properties.put(org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        producer = new KafkaProducer<>(properties);
    }

    public static void main(String args[]) throws Exception {
        sendMsg();
    }

    /**
     * 發送消息
     *
     * @throws Exception
     */
    public static void sendMsg() throws Exception {
        ProducerRecord<String, String> record = null;
        try {
            // 循環發送一百條消息
            for (int i = 0; i < 10; i++) {
                // 構造待發送的消息
                OrderMessage orderMessage = new OrderMessage();
                orderMessage.setId(UUID.randomUUID().toString());
                long timestamp = System.nanoTime();
                orderMessage.setCreateTime(timestamp);
                orderMessage.setRemake("remind");
                orderMessage.setsName("test");
                // 實例化ProducerRecord
                record = new ProducerRecord<String, String>(topic, timestamp + "", orderMessage.toString());
                producer.send(record, (metadata, e) -> {
                    // 使用回調函數
                    if (null != e) {
                        e.printStackTrace();
                    }
                    if (null != metadata) {
                        System.out.println(String.format("offset: %s, partition:%s, topic:%s  timestamp:%s", metadata.offset(), metadata.partition(), metadata.topic(), metadata.timestamp()));
                    }
                });
            }

        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            producer.close();
        }
    }
}

 

運行,結果就出現了,異常。

異常記錄:

2018-07-30 18:05:10.755 DEBUG 10272 --- [ad | producer-1] o.apache.kafka.common.network.Selector   : Connection with localhost/127.0.0.1 disconnected
 
java.net.ConnectException: Connection refused: no further information
    at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) ~[na:1.8.0_111]
    at sun.nio.ch.SocketChannelImpl.finishConnect(Unknown Source) ~[na:1.8.0_111]
    at org.apache.kafka.common.network.PlaintextTransportLayer.finishConnect(PlaintextTransportLayer.java:51) ~[kafka-clients-0.10.1.1.jar:na]
    at org.apache.kafka.common.network.KafkaChannel.finishConnect(KafkaChannel.java:73) ~[kafka-clients-0.10.1.1.jar:na]
    at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:323) [kafka-clients-0.10.1.1.jar:na]
    at org.apache.kafka.common.network.Selector.poll(Selector.java:291) [kafka-clients-0.10.1.1.jar:na]
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:260) [kafka-clients-0.10.1.1.jar:na]
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:236) [kafka-clients-0.10.1.1.jar:na]
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:148) [kafka-clients-0.10.1.1.jar:na]
    at java.lang.Thread.run(Unknown Source) [na:1.8.0_111]

 

能夠看到報錯第一句顯示:Connection with localhost/127.0.0.1 disconnected
可是能夠看到本身的配置是正確的。
 
這裏須要在kafka中修改部分配置:

在配置kafka中,首先須要修改kafka的配置server.properties中的

 advertised.listeners=PLAINTEXT://:your.host.name:9092
翻譯過來就是hostname和端口是用來建議給生產者和消費者使用的。
若是沒有設置,將會使用listeners的配置,若是listeners也沒有配置,將使用 java.net.InetAddress.getCanonicalHostName ()來獲取這個hostname和port,對於ipv4,基本就是localhost了。
 
"PLAINTEXT"表示協議,可選的值有PLAINTEXT和SSL,hostname能夠指定IP地址,也能夠用"0.0.0.0"表示對全部的網絡接口有效,若是hostname爲空表示只對默認的網絡接口有效
也就是說若是你沒有配置advertised.listeners,就使用listeners的配置通告給消息的生產者和消費者,這個過程是在生產者和消費者獲取源數據(metadata)。
 
修改以後:
advertised.listeners=PLAINTEXT://10.0.90.53:9092

須要注意的是,若是Kafka有多個節點,那麼須要每一個節點都按照這個節點的實際hostname和port狀況進行設置。

修改完畢,重啓Kafka服務,開啓消費者,接受消息,在服務器中輸入:

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic myTopic --from-beginning

能夠看到服務器中的消費者:

 
成功接收到消息。以前提到過在生產者有回調函數,來看看回調函數的輸出:
offset: 0, partition:0, topic:myTopic  timestamp:1533199115840
offset: 1, partition:0, topic:myTopic  timestamp:1533199115850
offset: 2, partition:0, topic:myTopic  timestamp:1533199115850
offset: 3, partition:0, topic:myTopic  timestamp:1533199115850
offset: 4, partition:0, topic:myTopic  timestamp:1533199115850
offset: 5, partition:0, topic:myTopic  timestamp:1533199115850
offset: 6, partition:0, topic:myTopic  timestamp:1533199115850
offset: 7, partition:0, topic:myTopic  timestamp:1533199115852
offset: 8, partition:0, topic:myTopic  timestamp:1533199115852
offset: 9, partition:0, topic:myTopic  timestamp:1533199115852
打印出了偏移值,分區,主題,和時間戳。說明發送成功了。到此就完成第一個Helloworld操做了。
咱們能夠看到回調函數返回的消息,怎麼都在一個分區中呢?下面來研究分區器。
 

自定義分區器

Kafka在底層摒棄了Java堆緩存機制,採用了操做系統級別的頁緩存,同時將隨機寫操做改成順序寫,再結合Zero-Copy的特性極大地改善了IO性能。

這個在單機上的提升,對於集羣,Kafka使用了分區,將topic的消息分散到多個分區上,並保存在不一樣的機器上。

可是是否分區越多,效率越高呢?也不盡然!

1.每一個分區在底層文件系統都有屬於本身的一個目錄。該目錄下一般會有兩個文件: base_offset.log和base_offset.index。Kafak的controller和ReplicaManager會爲每一個broker都保存這兩個文件句柄(file handler)。很明顯,若是分區數越多,所須要保持打開狀態的文件句柄數也就越多,最終可能會突破你的ulimit -n的限制。

2.消費者和生產者都會爲分區緩存消息,分區越多,緩存的消息就越多,佔用的內存就越大。

3.下降高可用,Kafka是經過高可用來實現高可用性的。咱們知道在集羣中每每會有一個leader,假設集羣中有10個Kafka進程,1個leader,9個follwer,若是一個leader掛了,那麼就會從新選出一個leader,若是集羣中有10000個分區,那麼將要花費很長的時間,這對於高可用是有損耗的。

 

自己kafka有本身的分區策略的,若是未指定,就會使用默認的分區策略:

Kafka根據傳遞消息的key來進行分區的分配,即hash(key) % numPartitions。若是Key相同的話,那麼就會分配到統一分區。

 

Kafka提供了自定義的分區器,只要實現Partitioner接口便可,下面是自定義分區的例子:

package kafka.partition;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

import java.util.Map;

/**
* 自定義分區器
*/
public class PartitionUtil implements Partitioner {

// 分區數
private static final Integer PARTITION_NUM = 6;

@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
if (null == key){
return 0;
}
String keyValue = String.valueOf(key);
// key取模
int partitionId = (int) (Long.valueOf(key.toString())%PARTITION_NUM);
return partitionId;
}

@Override
public void close() {

}

@Override
public void configure(Map<String, ?> configs) {

}
}
 
仍是剛纔分區的代碼,只要在以前的配置中加上
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, PartitionUtil.class.getName());
運行生產者,回調函數打印以下:
offset: 3, partition:5, topic:MyOrder  timestamp:1533205894785
offset: 5, partition:3, topic:MyOrder  timestamp:1533205893202
offset: 6, partition:3, topic:MyOrder  timestamp:1533205894784
offset: 2, partition:2, topic:MyOrder  timestamp:1533205894785
offset: 4, partition:1, topic:MyOrder  timestamp:1533205894785
offset: 5, partition:1, topic:MyOrder  timestamp:1533205894785
offset: 5, partition:0, topic:MyOrder  timestamp:1533205894784
offset: 6, partition:0, topic:MyOrder  timestamp:1533205894784
offset: 7, partition:0, topic:MyOrder  timestamp:1533205894785
offset: 8, partition:0, topic:MyOrder  timestamp:1533205894786
分區成功了,在實際生產過程當中,能夠根據項目的實際須要進行分區設計。
 
 

線程池生產者

在實際生產過程當中,一般消息數量是比較多的,就能夠考慮使用線程池。

使用線程池發送消息時,要考慮兩點:1.須要結合實際狀況,合理設計線程池的大小;2.使用線程池時,消息的發送是無序的,若是對消息的順序有要求,不建議使用。

若是使用線程池,建議是隻實例化一個KafkaProducer對象,這樣性能最好。代碼以下:

首先寫一個線程類:

package kafka.producer;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

/**
 * 生產者線程
 */
public class ProducerThread implements Runnable {

    private KafkaProducer<String, String> producer = null;
    private ProducerRecord<String, String> record = null;

    public ProducerThread(KafkaProducer<String, String> producer, ProducerRecord<String, String> record) {
        this.producer = producer;
        this.record = record;
    }

    @Override
    public void run() {
        producer.send(record, (metadata, e) -> {
            if (null != e) {
                e.printStackTrace();
            }
            if (null != metadata) {
                System.out.println("消息發送成功 :         "+String.format("offset: %s, partition:%s, topic:%s  timestamp:%s",
                        metadata.offset(), metadata.partition(), metadata.topic(), metadata.timestamp()));
            }
        });
    }

}

 

接着完成啓動類,啓動類中自定義了一個線程池,這裏仍是有一些遐思,就是沒有自定義,線程建立工廠,沒有指定建立的線程名稱,在實際生產中,最好是自定義線程工廠。

代碼以下:

package kafka.producer;

import kafka.OrderMessage;
import kafka.partition.PartitionUtil;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.clients.producer.ProducerConfig;

import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.*;

/**
 * 線程池生產者
 *
 * @author tangj
 * @date 2018/7/29 20:15
 */
public class ProducerDemo {
    static Properties properties = new Properties();

    static String topic = "MyOrder";

    static KafkaProducer<String, String> producer = null;

    // 核心池大小
    static int corePoolSize = 5;

    // 最大值
    static int maximumPoolSize = 20;

    // 無任務時存活時間
    static long keepAliveTime = 60;

    // 時間單位
    static TimeUnit timeUnit = TimeUnit.SECONDS;

    // 阻塞隊列
    static BlockingQueue blockingQueue = new LinkedBlockingQueue();

    // 線程池
    static ExecutorService service = null;

    static {
        // 配置項
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.0.90.53:9092");
        properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, PartitionUtil.class.getName());
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        producer = new KafkaProducer<>(properties);
        // 初始化線程池
        service = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, timeUnit, blockingQueue);
    }

    public static void main(String args[]) throws Exception {
        for (int i = 0; i < 6; i++) {
            service.submit(createMsgTask());
        }
    }


    /**
     * 生產消息
     *
     * @return
     */
    public static ProducerThread createMsgTask() {
        OrderMessage orderMessage = new OrderMessage();
        orderMessage.setId(UUID.randomUUID().toString());
        long timestamp = System.nanoTime();
        orderMessage.setCreateTime(timestamp);
        orderMessage.setRemake("rem");
        orderMessage.setsName("test");
        ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, timestamp + "", orderMessage.toString());
        ProducerThread task = new ProducerThread(producer, record);
        return task;
    }

}

 

 總結

對於Kafka的分區器和多線程生成者,切記一點,必定要根據實際業務進行設計。

相關文章
相關標籤/搜索