brew install kafka
Installation details
The installation output contains some usage notes, which break down into a few parts:
Installing kafka installs zookeeper by default, which shows that kafka depends on zookeeper; why it does is covered in a later part.
This part describes the startup commands for zookeeper and kafka: either use brew services start to have them start automatically or restart them (macOS uses the launchctl command to load services that run at boot; brew services is a wrapper around launchctl), or start them directly with the commands the tools ship with.
Start kafka
zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties & kafka-server-start /usr/local/etc/kafka/server.properties
Create a topic
kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
List all topics
kafka-topics --list --zookeeper localhost:2181
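The console tools above ship with kafka; the same operations can be scripted from Java through the AdminClient. A minimal sketch, assuming a broker at localhost:9092 (the class name is illustrative):

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class TopicAdmin {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // same as the CLI call above: topic "test", 1 partition, replication factor 1
            admin.createTopics(Collections.singleton(new NewTopic("test", 1, (short) 1))).all().get();
            // print every topic in the cluster
            admin.listTopics().names().get().forEach(System.out::println);
        }
    }
}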
Produce messages
kafka-console-producer --broker-list localhost:9092 --topic test
>first message
>second message
Consume messages
kafka-console-consumer --bootstrap-server localhost:9092 --topic test --from-beginning
first message
Producer: the message producer.
Broker: a server in the kafka cluster.
Topic: the subject of a message, which can be understood as a category of messages. Kafka's data is stored in topics, and multiple topics can be created on each broker.
Partition: a partition of a topic. Each topic can have multiple partitions; partitions spread the load and raise kafka's throughput.
Replication: every partition has multiple replicas, which serve as standbys. When the leader partition fails, a follower is chosen to take over as the new Leader. In kafka the default maximum number of replicas is 10, the number of replicas cannot exceed the number of Brokers, a follower and its leader are always on different machines, and a single machine holds at most one replica of a given partition (the leader included).
Consumer: the message consumer.
Consumer Group: multiple consumers can be grouped into a consumer group. In kafka's design, the data of a partition can only be consumed by one consumer within a consumer group, while consumers in the same group can consume different partitions of the same topic, which again raises kafka's throughput!
Zookeeper: the kafka cluster relies on zookeeper to store the cluster's metadata and keep the system available.
Kafka's data is actually stored as files on the file system. A topic contains partitions and a partition contains segments; the segments are the actual files, while topic and partition are abstract concepts. The directory /${topicName}-{$partitionid}/ holds the actual log files (the segments) and their corresponding index files. Segment files are all the same size and are named after the smallest offset contained in the segment, with the extension .log; the corresponding index file has the same name with the extension .index.
Kafka uses the partition as its storage unit. A partition is an ordered, immutable message queue to which messages are always appended at the tail. A partition cannot be split across multiple brokers or multiple disks.
A partition is made up of multiple segments. When Kafka writes data to a partition, it writes into the segment whose state is active. When that segment fills up, a new segment is created and becomes the new "active" segment. A segment is named after its base offset.
On disk, a partition is a directory, and each segment consists of an index file and a log file; for example, a partition directory such as test-0 would contain pairs like 00000000000000000000.index and 00000000000000000000.log.
The log file under a segment is where messages are stored; each message carries the message body, offset, timestamp, key, size, compression codec, checksum, message format version, and so on. The data format on disk is exactly the same as the format the producer sends to the broker, and the same as the format the consumer receives. Because the disk format matches the producer's and consumer's wire format, Kafka can use zero-copy to raise transfer efficiency.
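In Java, zero-copy is exposed through FileChannel.transferTo, which lets the kernel hand file bytes straight to a socket without copying them through user space. A minimal sketch of the idea (file name and destination address are illustrative; this is not Kafka's actual code):

import java.io.FileInputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.FileChannel;
import java.nio.channels.SocketChannel;

public class ZeroCopyDemo {
    public static void main(String[] args) throws IOException {
        try (FileChannel log = new FileInputStream("00000000000000000000.log").getChannel();
             SocketChannel socket = SocketChannel.open(new InetSocketAddress("localhost", 9999))) {
            long position = 0;
            long remaining = log.size();
            // transferTo asks the kernel to copy file bytes directly to the socket,
            // skipping the usual read()/write() round trip through user space
            while (remaining > 0) {
                long sent = log.transferTo(position, remaining, socket);
                position += sent;
                remaining -= sent;
            }
        }
    }
}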
The index file under a segment maps a message offset to that message's position in the log file.
The index file is memory mapped, and an offset lookup uses binary search to find the closest offset less than or equal to the target offset.
The index file consists of 8-byte entries: 4 bytes store the offset relative to the base offset and 4 bytes store the position. Because the offset is relative to the base offset, 4 bytes are enough to store it. For instance, if the base offset is 10000000000000000000, the subsequent entries need not be stored as 10000000000000000001 and 10000000000000000002 but simply as 1 and 2.
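A sketch of that lookup, assuming the index is a memory-mapped buffer of 8-byte entries (a 4-byte relative offset followed by a 4-byte position); class and method names are illustrative, not Kafka's actual implementation:

import java.nio.ByteBuffer;

public class OffsetIndex {
    private final ByteBuffer index; // memory-mapped contents of the .index file
    private final long baseOffset;  // the offset the segment file is named after

    public OffsetIndex(ByteBuffer index, long baseOffset) {
        this.index = index;
        this.baseOffset = baseOffset;
    }

    /** Binary search for the position of the last entry whose offset is <= targetOffset. */
    public int lookupPosition(long targetOffset) {
        int rel = (int) (targetOffset - baseOffset);
        int lo = 0, hi = index.limit() / 8 - 1, position = -1;
        while (lo <= hi) {
            int mid = (lo + hi) >>> 1;
            int entryOffset = index.getInt(mid * 8);   // 4-byte relative offset
            if (entryOffset <= rel) {
                position = index.getInt(mid * 8 + 4);  // 4-byte position in the log file
                lo = mid + 1;
            } else {
                hi = mid - 1;
            }
        }
        return position; // -1 if the target precedes every indexed entry
    }
}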
A summary of how Kafka stores its files internally:
• A partition is divided into multiple segments.
• A segment consists of two files: an index file and a log file.
• The index maps each offset to the message's exact position in the log file; it is used mainly to look messages up.
• Index entries store offsets relative to the segment's base offset.
• Compressed messages are batched and wrapped in a wrapper message for sending.
• The data format on disk is exactly the same as the format the broker receives from producers and sends to consumers, which is what makes zero-copy possible.
Adapted from cloud.tencent.com/developer/a…
1. Kafka's design uses Zookeeper to manage all the Brokers. This shows up on zookeeper as a node dedicated to recording the list of Broker servers, at the path /brokers/ids. When a Broker server starts, it registers itself on Zookeeper by creating the node /brokers/ids/[0-N] and writing in its IP, port, and other information. Brokers create ephemeral nodes, so once a Broker goes offline its node is removed, and we can track Broker availability dynamically through changes to the Broker nodes on zookeeper. Kafka handles Topics in a similar way.
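Those registrations are easy to inspect with the plain ZooKeeper Java client; a minimal sketch, assuming zookeeper at localhost:2181 (the class name is illustrative):

import java.nio.charset.StandardCharsets;
import org.apache.zookeeper.ZooKeeper;

public class BrokerIds {
    public static void main(String[] args) throws Exception {
        ZooKeeper zk = new ZooKeeper("localhost:2181", 15000, null); // 15s session timeout, no watcher
        // each child is an ephemeral node created by a broker at startup
        for (String id : zk.getChildren("/brokers/ids", false)) {
            byte[] data = zk.getData("/brokers/ids/" + id, false, null);
            System.out.println(id + " -> " + new String(data, StandardCharsets.UTF_8));
        }
        zk.close();
    }
}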
2. Producer load balancing. A producer needs to distribute its messages sensibly across the distributed Brokers, which raises the question of producer load balancing. Kafka supports both traditional layer-4 load balancing and zookeeper-based load balancing. (1) Traditional layer-4 load balancing assigns each producer an associated Broker based on the producer's IP address and port, so a producer usually talks to a single Broker and maintains just one TCP connection. This scheme has many drawbacks: in a running system the volume of messages each producer generates and the volume each Broker stores vary, so different Brokers receive very uneven amounts of traffic, and producers cannot sense when Brokers are added or removed. (2) Load balancing with zookeeper is straightforward: producers watch the Broker nodes on zookeeper to sense changes in Broker and Topic state and rebalance dynamically; Kafka already implements this mechanism on top of zookeeper.
3. Consumer load balancing works much like producer load balancing.
4. The mapping between message partitions and consumers is recorded by creating and updating the corresponding zookeeper nodes.
5. Consumption progress (the Offset) is likewise recorded by creating and updating the corresponding zookeeper nodes.
See www.jianshu.com/p/a036405f9… for a more detailed explanation.
Maven dependency
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.2.7.RELEASE</version>
</dependency>
Properties configuration
spring.kafka.producer.bootstrap-servers=192.168.41.140:9092
spring.kafka.consumer.bootstrap-servers=192.168.41.140:9092
spring.kafka.consumer.group-id=kafka
spring.kafka.consumer.auto-offset-reset=latest
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
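These properties feed Spring Boot's Kafka auto-configuration; a plain Spring Boot entry point is all that is needed to get the auto-configured KafkaTemplate injected into the classes below (the class name is illustrative):

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class KafkaDemoApplication {
    public static void main(String[] args) {
        // boots the context, which auto-configures KafkaTemplate and the listener containers
        SpringApplication.run(KafkaDemoApplication.class, args);
    }
}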
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
/**
 * kafka producer
 *
 * @author blackupper
 * @version $Id: KafkaProducer, v0.1
 * @company
 * @date 2019-08-02 9:57 AM blackupper Exp $
 */
@Component
@Slf4j
public class KafkaProducer {
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void send(String topic, String msg) {
        log.info("send data:{}, {}", topic, msg);
        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, msg);
        future.addCallback(
                success -> log.info("KafkaMessageProducer sent message successfully!"),
                fail -> log.error("KafkaMessageProducer failed to send message!"));
    }
}
import io.swagger.annotations.*;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
/**
 * test controller
 *
 * @author blackupper
 * @version $Id: KafkaSendController, v0.1
 * @company
 * @date 2019-08-02 10:02 AM blackupper Exp $
 */
@RestController
@Slf4j
@Api(description = "kafka test endpoints")
public class KafkaSendController {
    @Autowired
    private KafkaProducer kafkaProducer;

    @ApiOperation(value = "send message")
    @RequestMapping(value = "/send", method = RequestMethod.GET)
    @ResponseBody
    public void queryBalance(
            @ApiParam(value = "topic", name = "topic") @RequestParam(value = "topic") String topic,
            @ApiParam(value = "message content", name = "msg") @RequestParam(value = "msg") String msg) {
        kafkaProducer.send(topic, msg);
    }
}
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.TopicPartition;
import org.springframework.stereotype.Component;
/**
 * kafka consumer
 *
 * @author blackupper
 * @version $Id: KafKaConsumer, v0.1
 * @company
 * @date 2019-08-02 10:34 AM blackupper Exp $
 */
@Component
@Slf4j
public class KafKaConsumer {
    @KafkaListener(id = "kafka", topicPartitions = {@TopicPartition(topic = "test1", partitions = { "0", "1" })})
    public void listen(ConsumerRecord<?, ?> record) {
        log.info("start consume");
        log.info("topic-{}, offset-{}, value-{}", record.topic(), record.offset(), record.value());
    }
}
ListenableFuture<SendResult<K, V>> sendDefault(V data);
KafkaTemplate has a defaultTopic property; when sendDefault is called, kafka automatically sends the message to the topic named by defaultTopic.
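For example, reusing the kafkaTemplate field from the producer above (the topic name is illustrative):

kafkaTemplate.setDefaultTopic("test");
// no topic argument, so the record goes to the defaultTopic, "test"
kafkaTemplate.sendDefault("hello default topic");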
ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, V data);
Sends the message to the given topic and partition.
ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key, V data);
Sends the message to the given topic and partition, attaching a timestamp to the message.
ListenableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record);
Wraps the message content in a ProducerRecord before sending.
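For example (topic, partition, and key values are illustrative):

// topic "test", partition 0, key "k1"
ProducerRecord<String, String> record = new ProducerRecord<>("test", 0, "k1", "hello kafka");
kafkaTemplate.send(record);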
All of the methods above ultimately wrap the message in a ProducerRecord and hand it to doSend; here is the source of doSend:
protected ListenableFuture<SendResult<K, V>> doSend(final ProducerRecord<K, V> producerRecord) {
    if (this.transactional) {
        Assert.state(inTransaction(),
                "No transaction is in process; "
                + "possible solutions: run the template operation within the scope of a "
                + "template.executeInTransaction() operation, start a transaction with @Transactional "
                + "before invoking the template method, "
                + "run in a transaction started by a listener container when consuming a record");
    }
    final Producer<K, V> producer = getTheProducer();
    if (this.logger.isTraceEnabled()) {
        this.logger.trace("Sending: " + producerRecord);
    }
    final SettableListenableFuture<SendResult<K, V>> future = new SettableListenableFuture<>();
    producer.send(producerRecord, buildCallback(producerRecord, producer, future));
    if (this.autoFlush) {
        flush();
    }
    if (this.logger.isTraceEnabled()) {
        this.logger.trace("Sent: " + producerRecord);
    }
    return future;
}
As the code above shows, doSend first checks whether a transaction has been opened, then calls KafkaProducer's send method to send the message, with a SettableListenableFuture receiving the return value. SettableListenableFuture implements the ListenableFuture interface, and ListenableFuture extends Future, Java's built-in interface for asynchronous programming that supports returning a value. So all of the methods above send messages asynchronously. To get the result synchronously, call Future's get method, which blocks until the task returns a result.
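A sketch of the synchronous style, again inside the KafkaProducer above (topic name and timeout are illustrative):

try {
    // get(...) blocks until the broker acknowledges the send or the timeout expires
    SendResult<String, String> result =
            kafkaTemplate.send("test", "sync message").get(10, TimeUnit.SECONDS);
    log.info("sent to partition {} at offset {}",
            result.getRecordMetadata().partition(), result.getRecordMetadata().offset());
} catch (InterruptedException | ExecutionException | TimeoutException e) {
    log.error("synchronous send failed", e);
}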
@Target({ ElementType.TYPE, ElementType.METHOD, ElementType.ANNOTATION_TYPE })
@Retention(RetentionPolicy.RUNTIME)
@MessageMapping
@Documented
@Repeatable(KafkaListeners.class)
public @interface KafkaListener {

    /**
     * The unique identifier of the container managing for this endpoint.
     * <p>If none is specified an auto-generated one is provided.
     * <p>Note: When provided, this value will override the group id property
     * in the consumer factory configuration, unless {@link #idIsGroup()}
     * is set to false.
     * <p>SpEL {@code #{...}} and property place holders {@code ${...}} are supported.
     * @return the {@code id} for the container managing for this endpoint.
     * @see org.springframework.kafka.config.KafkaListenerEndpointRegistry#getListenerContainer(String)
     */
    String id() default "";

    /**
     * The bean name of the {@link org.springframework.kafka.config.KafkaListenerContainerFactory}
     * to use to create the message listener container responsible to serve this endpoint.
     * <p>If not specified, the default container factory is used, if any.
     * @return the container factory bean name.
     */
    String containerFactory() default "";

    /**
     * The topics for this listener.
     * The entries can be 'topic name', 'property-placeholder keys' or 'expressions'.
     * An expression must be resolved to the topic name.
     * <p>
     * Mutually exclusive with {@link #topicPattern()} and {@link #topicPartitions()}.
     * @return the topic names or expressions (SpEL) to listen to.
     */
    String[] topics() default {};

    /**
     * The topic pattern for this listener. The entries can be 'topic pattern', a
     * 'property-placeholder key' or an 'expression'. The framework will create a
     * container that subscribes to all topics matching the specified pattern to get
     * dynamically assigned partitions. The pattern matching will be performed
     * periodically against topics existing at the time of check. An expression must
     * be resolved to the topic pattern (String or Pattern result types are supported).
     * <p>
     * Mutually exclusive with {@link #topics()} and {@link #topicPartitions()}.
     * @return the topic pattern or expression (SpEL).
     * @see org.apache.kafka.clients.CommonClientConfigs#METADATA_MAX_AGE_CONFIG
     */
    String topicPattern() default "";

    /**
     * The topicPartitions for this listener.
     * <p>
     * Mutually exclusive with {@link #topicPattern()} and {@link #topics()}.
     * @return the topic names or expressions (SpEL) to listen to.
     */
    TopicPartition[] topicPartitions() default {};

    /**
     * If provided, the listener container for this listener will be added to a bean
     * with this value as its name, of type {@code Collection<MessageListenerContainer>}.
     * This allows, for example, iteration over the collection to start/stop a subset
     * of containers.
     * <p>SpEL {@code #{...}} and property place holders {@code ${...}} are supported.
     * @return the bean name for the group.
     */
    String containerGroup() default "";

    /**
     * Set an {@link org.springframework.kafka.listener.KafkaListenerErrorHandler} bean
     * name to invoke if the listener method throws an exception.
     * @return the error handler.
     * @since 1.3
     */
    String errorHandler() default "";

    /**
     * Override the {@code group.id} property for the consumer factory with this value
     * for this listener only.
     * <p>SpEL {@code #{...}} and property place holders {@code ${...}} are supported.
     * @return the group id.
     * @since 1.3
     */
    String groupId() default "";

    /**
     * When {@link #groupId() groupId} is not provided, use the {@link #id() id} (if
     * provided) as the {@code group.id} property for the consumer. Set to false, to use
     * the {@code group.id} from the consumer factory.
     * @return false to disable.
     * @since 1.3
     */
    boolean idIsGroup() default true;

    /**
     * When provided, overrides the client id property in the consumer factory
     * configuration. A suffix ('-n') is added for each container instance to ensure
     * uniqueness when concurrency is used.
     * <p>SpEL {@code #{...}} and property place holders {@code ${...}} are supported.
     * @return the client id prefix.
     * @since 2.1.1
     */
    String clientIdPrefix() default "";

    /**
     * A pseudo bean name used in SpEL expressions within this annotation to reference
     * the current bean within which this listener is defined. This allows access to
     * properties and methods within the enclosing bean.
     * Default '__listener'.
     * <p>
     * Example: {@code topics = "#{__listener.topicList}"}.
     * @return the pseudo bean name.
     * @since 2.1.2
     */
    String beanRef() default "__listener";

    /**
     * Override the container factory's {@code concurrency} setting for this listener. May
     * be a property placeholder or SpEL expression that evaluates to a {@link Number}, in
     * which case {@link Number#intValue()} is used to obtain the value.
     * <p>SpEL {@code #{...}} and property place holders {@code ${...}} are supported.
     * @return the concurrency.
     * @since 2.2
     */
    String concurrency() default "";

    /**
     * Set to true or false, to override the default setting in the container factory. May
     * be a property placeholder or SpEL expression that evaluates to a {@link Boolean} or
     * a {@link String}, in which case the {@link Boolean#parseBoolean(String)} is used to
     * obtain the value.
     * <p>SpEL {@code #{...}} and property place holders {@code ${...}} are supported.
     * @return true to auto start, false to not auto start.
     * @since 2.2
     */
    String autoStartup() default "";

    /**
     * Kafka consumer properties; they will supersede any properties with the same name
     * defined in the consumer factory (if the consumer factory supports property overrides).
     * <h3>Supported Syntax</h3>
     * <p>The supported syntax for key-value pairs is the same as the
     * syntax defined for entries in a Java
     * {@linkplain java.util.Properties#load(java.io.Reader) properties file}:
     * <ul>
     * <li>{@code key=value}</li>
     * <li>{@code key:value}</li>
     * <li>{@code key value}</li>
     * </ul>
     * {@code group.id} and {@code client.id} are ignored.
     * @return the properties.
     * @since 2.2.4
     * @see org.apache.kafka.clients.consumer.ConsumerConfig
     * @see #groupId()
     * @see #clientIdPrefix()
     */
    String[] properties() default {};
}
While analyzing KafkaTemplate, I noticed that the interface it implements, KafkaOperations, has another implementation: ReplyingKafkaTemplate. Its flow, briefly: a producer sends a message via TopicA, listener A picks the message up from TopicA, does its business processing, and forwards the response to TopicB, where listener B picks it up and processes it further.
Reading the source shows that ReplyingKafkaTemplate implements a request-reply pattern: the ProducerRecord.topic property sets the send topic, and the ProducerRecord.Headers property sets the reply topic; alternatively, the reply topic can be set in the GenericMessageListenerContainer when constructing the ReplyingKafkaTemplate with new ReplyingKafkaTemplate().
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.KafkaMessageListenerContainer;
import org.springframework.kafka.requestreply.ReplyingKafkaTemplate;

@Configuration
@EnableKafka
public class ReplyKafkaTemplateConfiguration {

    @Value("${spring.kafka.producer.bootstrap-servers}")
    private String producer;

    @Value("${spring.kafka.consumer.bootstrap-servers}")
    private String consumer;

    @Bean
    public KafkaMessageListenerContainer<String, String> replyContainer(@Autowired ConsumerFactory consumerFactory) {
        ContainerProperties containerProperties = new ContainerProperties("topic.reply");
        return new KafkaMessageListenerContainer<>(consumerFactory, containerProperties);
    }

    @Bean
    public ReplyingKafkaTemplate<String, String, String> replyingKafkaTemplate(@Autowired ProducerFactory producerFactory, KafkaMessageListenerContainer replyContainer) {
        ReplyingKafkaTemplate template = new ReplyingKafkaTemplate<>(producerFactory, replyContainer);
        template.setReplyTimeout(10000);
        return template;
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setReplyTemplate(kafkaTemplate());
        return factory;
    }

    @Bean
    @Primary
    public KafkaTemplate<Integer, String> kafkaTemplate() {
        KafkaTemplate template = new KafkaTemplate<>(producerFactory());
        return template;
    }

    @Bean
    public ProducerFactory<Integer, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(senderProps());
    }

    @Bean
    public ConsumerFactory<Integer, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerProps());
    }

    // consumer configuration
    private Map<String, Object> consumerProps() {
        Map<String, Object> props = new HashMap<>();
        // broker addresses
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, consumer);
        // group id
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "replyTest");
        // whether to auto-commit offsets
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        // auto-commit interval
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
        // session timeout
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
        // key deserializer
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
        // value deserializer
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }

    // producer configuration
    private Map<String, Object> senderProps() {
        Map<String, Object> props = new HashMap<>();
        // broker addresses
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, producer);
        // retries; 0 disables the retry mechanism
        props.put(ProducerConfig.RETRIES_CONFIG, 1);
        // batch size in bytes
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        // batching delay of 1 ms; enabling this cuts the number of sends and raises throughput
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        // total memory in bytes the producer may use to buffer records waiting to be sent
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 1024000);
        // key serializer
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
        // value serializer
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }
}
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.requestreply.ReplyingKafkaTemplate;
import org.springframework.kafka.requestreply.RequestReplyFuture;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class ReplyKafkaTemplateProducer {

    @Autowired
    private ReplyingKafkaTemplate replyingKafkaTemplate;

    public void send() throws Exception {
        ProducerRecord<String, String> record = new ProducerRecord<>("topic.request", "request message");
        record.headers().add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, "topic.reply".getBytes()));
        RequestReplyFuture<String, String, String> replyFuture = replyingKafkaTemplate.sendAndReceive(record);
        SendResult<String, String> sendResult = replyFuture.getSendFuture().get();
        log.info("send request msg result: " + sendResult.getRecordMetadata());
        ConsumerRecord<String, String> consumerRecord = replyFuture.get();
        log.info("receive reply result: " + consumerRecord.value());
    }
}
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class ReplyKafkaTemplateConsumer {

    @KafkaListener(id = "replyConsumer", topics = "topic.request", containerFactory = "kafkaListenerContainerFactory")
    @SendTo
    public String replyListen(ConsumerRecord<?, ?> record) {
        log.info("topic-{}, offset-{}, value-{}", record.topic(), record.offset(), record.value());
        return "reply message";
    }
}