kafka介紹及使用

1、MAC環境下安裝啓動kafka

一、安裝kafka

brew install kafka複製代碼

安裝詳情java

安裝詳情裏面包含一些使用介紹,主要包括幾個部分:
web

安裝kafka前默認安裝了zookeeper,說明kafka依賴zookeeper,爲何依賴,下一部分會講到。正則表達式


這部分介紹了zookeeper和kafka的啓動命令,要麼用brew services start命令設置自啓或重啓(macOS 使用launchtl命令加載開機自動運行的服務,brew serviceslaunchctl的一個子集),或者直接使用工具自帶的命令啓動。spring

二、啓動kafka

啓動kafka
express

zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties & kafka-server-start /usr/local/etc/kafka/server.properties複製代碼

建立topicapache

kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test複製代碼

查看全部topic編程

kafka-topics --list --zookeeper localhost:2181複製代碼

生產者發送消息bootstrap

kafka-console-producer --broker-list localhost:9092 --topic test
>第一條消息
>第二條消息複製代碼

消費者消費消息 緩存

kafka-console-consumer --bootstrap-server localhost:9092 --topic test --from-beginning
第一條消息複製代碼

2、kafka原理介紹


Producer:消息生產者。bash

Broker:kafka集羣中的服務器。

Topic:消息的主題,能夠理解爲消息的分類,kafka的數據就保存在topic。在每一個broker上均可以建立多個topic。

Partition:Topic的分區,每一個topic能夠有多個分區,分區的做用是作負載,提升kafka的吞吐量。

Replication:每個分區都有多個副本,副本的做用是作備胎。當主分區(Leader)故障的時候會選擇一個備胎(Follower)上位,成爲Leader。在kafka中默認副本的最大數量是10個,且副本的數量不能大於Broker的數量,follower和leader絕對是在不一樣的機器,同一機器對同一個分區也只可能存放一個副本(包括本身)。

Consumer:消息消費者。

Consumer Group:咱們能夠將多個消費組組成一個消費者組,在kafka的設計中同一個分區的數據只能被消費者組中的某一個消費者消費。同一個消費者組的消費者能夠消費同一個topic的不一樣分區的數據,這也是爲了提升kafka的吞吐量!

Zookeeper:kafka集羣依賴zookeeper來保存集羣的的元信息,來保證系統的可用性。

一、生產消息


kafka的數據,其實是以文件的形式存儲在文件系統的。topic下有partition,partition下有segment,segment是實際的一個個文件,topic和partition都是抽象概念。在目錄/${topicName}-{$partitionid}/下,存儲着實際的log文件(即segment),還有對應的索引文件。每一個segment文件大小相等,文件名以這個segment中最小的offset命名,文件擴展名是.log;segment對應的索引的文件名字同樣,擴展名是.index。


二、消費消息

訂閱topic是以一個消費組來訂閱的,一個消費組裏面能夠有多個消費者。同一個消費組中的兩個消費者,不會同時消費一個partition。換句話來講,就是一個partition,只能被消費組裏的一個消費者消費,可是能夠同時被多個消費組消費。所以,若是消費組內的消費者若是比partition多的話,那麼就會有個別消費者一直空閒。
一個消費組消費partition,須要保存offset記錄消費到哪,之前保存在zk中,因爲zk的寫性能很差,之前的解決方法都是consumer每隔一分鐘上報一次。這裏zk的性能嚴重影響了消費的速度,並且很容易出現重複消費。
在0.10版本後,kafka把這個offset的保存,從zk總剝離,保存在一個名叫__consumeroffsets topic的topic中。寫進消息的key由groupid、topic、partition組成,value是偏移量offset。topic配置的清理策略是compact。老是保留最新的key,其他刪掉。通常狀況下,每一個key的offset都是緩存在內存中,查詢的時候不用遍歷partition,若是沒有緩存,第一次就會遍歷partition創建緩存,而後查詢返回。


三、kafka中消息具體是怎麼被存儲的

Kafka以Partition做爲存儲單元,一個partition是一個有序的,不變的消息隊列,消息老是被追加到尾部。一個partition不能被切分紅多個散落在多個broker上或者多個磁盤上。

Partition是由多個Segment組成,當Kafka要寫數據到一個partition時,它會寫入到狀態爲active的segment中。若是該segment被寫滿,則一個新的segment將會被新建,而後變成新的"active" segment。Segment以該segment的base offset做爲本身的名稱。


在磁盤上,一個partition就是一個目錄,而後每一個segment由一個index文件和一個log文件組成。以下:


Segment下的log文件就是存儲消息的地方,每一個消息都會包含消息體、offset、timestamp、key、size、壓縮編碼器、校驗和、消息版本號等。在磁盤上的數據格式和producer發送到broker的數據格式如出一轍,也和consumer收到的數據格式如出一轍。因爲磁盤格式與consumer以及producer的數據格式如出一轍,這樣就使得Kafka能夠經過零拷貝(zero-copy)技術來提升傳輸效率。

Segment下的index負責映射消息offset到某個消息在log文件中的位置。以下:


索引文件是內存映射(memory mapped)的,offset查找使用二分查找來查找小於或等於目標offset的最近offset。

索引文件由8個字節的條目組成,4個字節用來存儲相對於base offset的偏移量,另外4個字節用來存儲position。這個偏移量因爲是相對於base offset的,所以只須要4個字節來存儲。好比base offset是10000000000000000000,那麼接下來就不用存儲爲10000000000000000001 和10000000000000000002了,而是僅存儲爲1和2。

Kafka存儲內部文件工做總結:

• Partition被分紅多個segment。

• Segment包含兩個文件:index和log文件。

• Index負責映射每一個offset到消息的在log文件中的具體位置,主要用來查找消息。

• Indexes 保存的是當前segment的base offset的相對偏移量。

• 壓縮消息批量發送是被包裝一個wrapper message來發送。

• 保存在磁盤上的數據格式和broker從producer收到的以及發送給consumer的數據格式如出一轍,這樣就可以實現領拷貝(zero-copy)。

摘自cloud.tencent.com/developer/a…

四、kafka爲何會依賴zookeeper

一、在Kafka的設計中,選擇了使用Zookeeper來進行全部Broker的管理,體如今zookeeper上會有一個專門用來進行Broker服務器列表記錄的點,節點路徑爲/brokers/ids 每一個Broker服務器在啓動時,都會到Zookeeper上進行註冊,即建立/brokers/ids/[0-N]的節點,而後寫入IP,端口等信息,Broker建立的是臨時節點,全部一旦Broker上線或者下線,對應Broker節點也就被刪除了,所以咱們能夠經過zookeeper上Broker節點的變化來動態表徵Broker服務器的可用性,Kafka的Topic也相似於這種方式。

二、生產者負載均衡。生產者須要將消息合理的發送到分佈式Broker上,這就面臨如何進行生產者負載均衡問題。 對於生產者的負載均衡,Kafka支持傳統的4層負載均衡,zookeeper同時也支持zookeeper方式來實現負載均衡。 (1)傳統的4層負載均衡 根據生產者的IP地址和端口來爲其定一個相關聯的Broker,一般一個生產者只會對應單個Broker,只須要維護單個TCP連接。這樣的方案有不少弊端,由於在系統實際運行過程當中,每一個生產者生成的消息量,以及每一個Broker的消息存儲量都不同,那麼會致使不一樣的Broker接收到的消息量很是不均勻,並且生產者也沒法感知Broker的新增與刪除。 (2)使用zookeeper進行負載均衡很簡單,生產者經過監聽zookeeper上Broker節點感知Broker,Topic的狀態,變動,來實現動態負載均衡機制,固然這個機制Kafka已經結合zookeeper實現了。

三、消費者的負載均衡和生產負載均衡相似

四、記錄消息分區於消費者的關係,都是經過建立修改zookeeper上相應的節點實現

五、記錄消息消費進度Offset記錄,都是經過建立修改zookeeper上相應的節點實現 。

摘自blog.csdn.net/u011311291/…

更詳細解釋請參考www.jianshu.com/p/a036405f9…

3、spring-boot-kafka對接

maven依賴

<dependency>   
    <groupId>org.springframework.kafka</groupId>  
    <artifactId>spring-kafka</artifactId>   
    <version>2.2.7.RELEASE</version>
</dependency>複製代碼

properties配置

spring.kafka.producer.bootstrap-servers=192.168.41.140:9092
spring.kafka.consumer.bootstrap-servers=192.168.41.140:9092
spring.kafka.consumer.group-id=kafka
spring.kafka.consumer.auto-offset-reset=latest
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer複製代碼

一、生產消息

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;

/** * kafka生產者 * * @author blackupper * @version $Id: KafkaProducer, v0.1 * @company * @date 2019年08月02日 9:57 AM blackupper Exp $ */
@Component
@Slf4j
public class KafkaProducer {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void send(String topic, String msg){
        log.info("send data:{}, {}", topic, msg);
        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, msg);
        future.addCallback(
                success -> log.info("KafkaMessageProducer 發送消息成功!"),
                fail -> log.error("KafkaMessageProducer 發送消息失敗!"));
    }
}複製代碼

import io.swagger.annotations.*;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;

/** * 測試controller * * @author blackupper * @version $Id: KafkaSendController, v0.1 * @company * @date 2019年08月02日 10:02 AM blackupper Exp $ */
@RestController
@Slf4j
@Api(description = "kafka測試接口")
public class KafkaSendController {
    @Autowired
    private KafkaProducer kafkaProducer;

    @ApiOperation(value = "發送消息")
    @RequestMapping(value = "/send", method = RequestMethod.GET)
    @ResponseBody
    public void queryBalance( @ApiParam(value = "topic", name = "topic") @RequestParam(value = "topic") String topic, @ApiParam(value = "消息內容", name = "msg") @RequestParam(value = "msg") String msg) {
        kafkaProducer.send(topic, msg);
    }
}
複製代碼

二、消費消息

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.TopicPartition;
import org.springframework.stereotype.Component;

/**
 * kafka消費者
 *
 * @author blackupper
 * @version $Id: KafKaConsumer, v0.1
 * @company 
 * @date 2019年08月02日 10:34 AM blackupper Exp $ */
@Component
@Slf4j
public class KafKaConsumer {
    @KafkaListener(id = "kafka", topicPartitions = {@TopicPartition(topic = "test1", partitions = { "0", "1" })})
    public void listen (ConsumerRecord<?, ?> record) {
        log.info("start consume");
        log.info("topic-{}, offset-{}, value-{}", record.topic(), record.offset(), record.value());
    }
}複製代碼


三、生產者經常使用調用方式

ListenableFuture<SendResult<K, V>> sendDefault(V data);複製代碼

KafkaTemplate中有defaultTopic這個屬性,當調用sendDefault方法時,kafka會自動把消息發送到defaultTopic屬性指定的topic中。

ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, V data);複製代碼

將消息發送到指定的topic和partition中

ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key, V data);複製代碼

將消息發送到指定的topic和partition中,並在消息上帶上時間戳

ListenableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record);複製代碼

將消息內容封裝成ProducerRecord進行發送

其實上述幾個方法,最終都是分裝成ProducerRecord,調用doSend方法傳遞消息的,咱們下面看下doSend方法的源碼:

protected ListenableFuture<SendResult<K, V>> doSend(final ProducerRecord<K, V> producerRecord) {
        if (this.transactional) {
            Assert.state(inTransaction(),
                    "No transaction is in process; "
                            + "possible solutions: run the template operation within the scope of a "
                            + "template.executeInTransaction() operation, start a transaction with @Transactional "
                            + "before invoking the template method, "
                            + "run in a transaction started by a listener container when consuming a record");
        }
        final Producer<K, V> producer = getTheProducer();
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("Sending: " + producerRecord);
        }
        final SettableListenableFuture<SendResult<K, V>> future = new SettableListenableFuture<>();
        producer.send(producerRecord, buildCallback(producerRecord, producer, future));
        if (this.autoFlush) {
            flush();
        }
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("Sent: " + producerRecord);
        }
        return future;
    }複製代碼

從上述代碼能夠看到,doSend內部首先判斷是否開啓了事務,而後調用KafkaProducer的send方法發送消息,SettableListenableFuture接收返回值,SettableListenableFuture實現了ListenableFuture接口,ListenableFuture則實現了Future接口,Future是Java自帶的實現異步編程的接口,支持返回值的異步。因而可知上述的幾個方法都是異步發送消息的。若是想要同步獲取結果,能夠調用Future的get方法,該方法會阻塞直到任務返回結果。

四、@KafkaListener屬性詳解

@Target({ ElementType.TYPE, ElementType.METHOD, ElementType.ANNOTATION_TYPE })
@Retention(RetentionPolicy.RUNTIME)
@MessageMapping
@Documented
@Repeatable(KafkaListeners.class)
public @interface KafkaListener {

    /** * The unique identifier of the container managing for this endpoint. * <p>If none is specified an auto-generated one is provided. * <p>Note: When provided, this value will override the group id property * in the consumer factory configuration, unless {@link #idIsGroup()} * is set to false. * <p>SpEL {@code #{...}} and property place holders {@code ${...}} are supported. * @return the {@code id} for the container managing for this endpoint. * @see org.springframework.kafka.config.KafkaListenerEndpointRegistry#getListenerContainer(String) */
    String id() default "";

    /** * The bean name of the {@link org.springframework.kafka.config.KafkaListenerContainerFactory} * to use to create the message listener container responsible to serve this endpoint. * <p>If not specified, the default container factory is used, if any. * @return the container factory bean name. */
    String containerFactory() default "";

    /** * The topics for this listener. * The entries can be 'topic name', 'property-placeholder keys' or 'expressions'. * An expression must be resolved to the topic name. * <p> * Mutually exclusive with {@link #topicPattern()} and {@link #topicPartitions()}. * @return the topic names or expressions (SpEL) to listen to. */
    String[] topics() default {};

    /** * The topic pattern for this listener. The entries can be 'topic pattern', a * 'property-placeholder key' or an 'expression'. The framework will create a * container that subscribes to all topics matching the specified pattern to get * dynamically assigned partitions. The pattern matching will be performed * periodically against topics existing at the time of check. An expression must * be resolved to the topic pattern (String or Pattern result types are supported). * <p> * Mutually exclusive with {@link #topics()} and {@link #topicPartitions()}. * @return the topic pattern or expression (SpEL). * @see org.apache.kafka.clients.CommonClientConfigs#METADATA_MAX_AGE_CONFIG */
    String topicPattern() default "";

    /** * The topicPartitions for this listener. * <p> * Mutually exclusive with {@link #topicPattern()} and {@link #topics()}. * @return the topic names or expressions (SpEL) to listen to. */
    TopicPartition[] topicPartitions() default {};

    /** * If provided, the listener container for this listener will be added to a bean * with this value as its name, of type {@code Collection<MessageListenerContainer>}. * This allows, for example, iteration over the collection to start/stop a subset * of containers. * <p>SpEL {@code #{...}} and property place holders {@code ${...}} are supported. * @return the bean name for the group. */
    String containerGroup() default "";

    /** * Set an {@link org.springframework.kafka.listener.KafkaListenerErrorHandler} bean * name to invoke if the listener method throws an exception. * @return the error handler. * @since 1.3 */
    String errorHandler() default "";

    /** * Override the {@code group.id} property for the consumer factory with this value * for this listener only. * <p>SpEL {@code #{...}} and property place holders {@code ${...}} are supported. * @return the group id. * @since 1.3 */
    String groupId() default "";

    /** * When {@link #groupId() groupId} is not provided, use the {@link #id() id} (if * provided) as the {@code group.id} property for the consumer. Set to false, to use * the {@code group.id} from the consumer factory. * @return false to disable. * @since 1.3 */
    boolean idIsGroup() default true;

    /** * When provided, overrides the client id property in the consumer factory * configuration. A suffix ('-n') is added for each container instance to ensure * uniqueness when concurrency is used. * <p>SpEL {@code #{...}} and property place holders {@code ${...}} are supported. * @return the client id prefix. * @since 2.1.1 */
    String clientIdPrefix() default "";

    /** * A pseudo bean name used in SpEL expressions within this annotation to reference * the current bean within which this listener is defined. This allows access to * properties and methods within the enclosing bean. * Default '__listener'. * <p> * Example: {@code topics = "#{__listener.topicList}"}. * @return the pseudo bean name. * @since 2.1.2 */
    String beanRef() default "__listener";

    /** * Override the container factory's {@code concurrency} setting for this listener. May * be a property placeholder or SpEL expression that evaluates to a {@link Number}, in * which case {@link Number#intValue()} is used to obtain the value. * <p>SpEL {@code #{...}} and property place holders {@code ${...}} are supported. * @return the concurrency. * @since 2.2 */
    String concurrency() default "";

    /** * Set to true or false, to override the default setting in the container factory. May * be a property placeholder or SpEL expression that evaluates to a {@link Boolean} or * a {@link String}, in which case the {@link Boolean#parseBoolean(String)} is used to * obtain the value. * <p>SpEL {@code #{...}} and property place holders {@code ${...}} are supported. * @return true to auto start, false to not auto start. * @since 2.2 */
    String autoStartup() default "";

    /** * Kafka consumer properties; they will supersede any properties with the same name * defined in the consumer factory (if the consumer factory supports property overrides). * <h3>Supported Syntax</h3> * <p>The supported syntax for key-value pairs is the same as the * syntax defined for entries in a Java * {@linkplain java.util.Properties#load(java.io.Reader) properties file}: * <ul> * <li>{@code key=value}</li> * <li>{@code key:value}</li> * <li>{@code key value}</li> * </ul> * {@code group.id} and {@code client.id} are ignored. * @return the properties. * @since 2.2.4 * @see org.apache.kafka.clients.consumer.ConsumerConfig * @see #groupId() * @see #clientIdPrefix() */
    String[] properties() default {};

}複製代碼
  • id:表明當前節點的惟一標識,不配置的話會自動分配一個id,主動配置的話,groupId會被設置成id的值(前提是idIsGroup這個屬性值沒有被設置成false)。
  • containerFactory:設置監聽容器工廠類。
  • topics:須要監聽的Topic,可監聽多個。
  • topicsPattern:Topic主題,支持屬性佔位符,或者是正則表達式。
  • topicPartitions:能夠設置更加詳細的監聽信息,包括topic、partitions和partitionOffsets。
  • containerGroup:設置了這個屬性,當前的監聽器會被加進設置的這個容器組裏面,後面你能夠經過遍歷這個集合來啓動或終止一組監聽器集合。
  • errorHandler:異常處理器,若是監聽器處理方法拋出異常,你能夠設置一個實現了KafkaListenerErrorHandler的異常處理類來處理拋出的異常。
  • groupId:設置當前消費者組id,支持SpEL表達式{@code #{...}}和屬性佔位符{@code ${...}}
  • idIsGroup:id是否能用做groupId
  • clientIdPrefix:clientId前綴,後綴會默認加上-n來保證併發時該id的惟一性,支持SpEL表達式{@code #{...}}和屬性佔位符{@code ${...}}
  • beanRef:此註解中SpEL表達式中使用的僞bean名,用於指向此監聽器的當前bean,從而容許訪問封裝bean中的屬性和方法。
  • concurrency:用於覆蓋容器工廠中的併發屬性,支持SpEL表達式{@code #{...}}和屬性佔位符{@code ${...}}
  • autoStartup:是否自動啓動
  • properties:消費者屬性,將替換在消費者工廠中定義的具備相同名稱的任何屬性(若是消費者工廠支持屬性覆蓋)。

五、ReplyingKafkaTemplate簡介

在分析KafkaTemplate方法的時候,發現其實現的接口類KafkaOperations,還有另一個實現類ReplyingKafkaTemplate,簡單的描述處理流程就是:生產者經過TopicA發送消息,監聽器A從TopicA中獲取到消息,進行業務處理後將響應內容轉發到TopicB,監聽器B從TopicB獲取消息再次進行處理。

經過分析源碼,發現ReplyingKafkaTemplate是利用了請求響應模式,經過設置ProducerRecord.topic屬性能夠設置發送topic,經過設置ProducerRecord.Headers屬性能夠設置轉發topic,固然也能夠在new ReplyingKafkaTemplate()的時候,在GenericMessageListenerContainer中設置轉發topic。

@Configuration
@EnableKafka
public class ReplyKafkaTemplateConfiguration {
    @Value("${spring.kafka.producer.bootstrap-servers}")
    private String producer;

    @Value("${spring.kafka.consumer.bootstrap-servers}")
    private String consumer;

    @Bean
    public KafkaMessageListenerContainer<String, String> replyContainer(@Autowired ConsumerFactory consumerFactory) {
        ContainerProperties containerProperties = new ContainerProperties("topic.reply");
        return new KafkaMessageListenerContainer<>(consumerFactory, containerProperties);
    }

    @Bean
    public ReplyingKafkaTemplate<String, String, String> replyingKafkaTemplate(@Autowired ProducerFactory producerFactory, KafkaMessageListenerContainer replyContainer) {
        ReplyingKafkaTemplate template = new ReplyingKafkaTemplate<>(producerFactory, replyContainer);
        template.setReplyTimeout(10000);
        return template;
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setReplyTemplate(kafkaTemplate());
        return factory;
    }

    @Bean
    @Primary
    public KafkaTemplate<Integer, String> kafkaTemplate() {
        KafkaTemplate template = new KafkaTemplate<>(producerFactory());
        return template;
    }

    @Bean
    public ProducerFactory<Integer, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(senderProps());
    }

    @Bean
    public ConsumerFactory<Integer, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerProps());
    }


    //消費者配置參數
    private Map<String, Object> consumerProps() {
        Map<String, Object> props = new HashMap<>();
        //鏈接地址
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, consumer);
        //GroupID
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "replyTest");
        //是否自動提交
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        //自動提交的頻率
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
        //Session超時設置
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
        //鍵的反序列化方式
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
        //值的反序列化方式
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }

    //生產者配置
    private Map<String, Object> senderProps (){
        Map<String, Object> props = new HashMap<>();
        //鏈接地址
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, producer);
        //重試,0爲不啓用重試機制
        props.put(ProducerConfig.RETRIES_CONFIG, 1);
        //控制批處理大小,單位爲字節
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        //批量發送,延遲爲1毫秒,啓用該功能能有效減小生產者發送消息次數,從而提升併發量
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        //生產者可使用的總內存字節來緩衝等待發送到服務器的記錄
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 1024000);
        //鍵的序列化方式
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
        //值的序列化方式
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }
}複製代碼

@Component
@Slf4j
public class ReplyKafkaTemplateProducer {
    @Autowired
    private ReplyingKafkaTemplate replyingKafkaTemplate;

    public void send() throws Exception {
        ProducerRecord<String, String> record = new ProducerRecord<>("topic.request", "request message");
        record.headers().add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, "topic.reply".getBytes()));
        RequestReplyFuture<String, String, String> replyFuture = replyingKafkaTemplate.sendAndReceive(record);
        SendResult<String, String> sendResult = replyFuture.getSendFuture().get();
        log.info("send request msg result: " + sendResult.getRecordMetadata());
        ConsumerRecord<String, String> consumerRecord = replyFuture.get();
        log.info("receive reply result: " + consumerRecord.value());
    }
}複製代碼

@Component
@Slf4j
public class ReplyKafkaTemplateConsumer {
    @KafkaListener(id = "replyConsumer", topics = "topic.request",containerFactory = "kafkaListenerContainerFactory")
    @SendTo
    public String replyListen(ConsumerRecord<?, ?> record){
        log.info("topic-{}, offset-{}, value-{}", record.topic(), record.offset(), record.value());
        return "reply message";
    }
}複製代碼

相關文章
相關標籤/搜索