根據官網的介紹,ApacheKafka®是一個分佈式流媒體平臺,它主要有3種功能:html
Kafka目前主要做爲一個分佈式的發佈訂閱式的消息系統使用 下圖爲消息傳輸流程java
在kafka官網 http://kafka.apache.org/downloads下載到最新的kafka安裝包,選擇下載二進制版本的tgz文件spring
kafka解壓目錄下下有一個config的文件夾,裏面放置的是咱們的配置文件apache
consumer.properites 消費者配置json
producer.properties 生產者配置bootstrap
server.properties kafka服務器的配置,此配置文件用來配置kafka服務器 目前僅介紹幾個最基礎的配置服務器
listeners=PLAINTEXT:// 192.168.180.128:9092
。並確保服務器的9092端口可以訪問zookeeper.connect=localhost:2181
啓動zookeeperapp
#前臺啓動 [root@CentOS124 home]# cd kafka2.11/ [root@CentOS124 kafka2.11]# bin/zookeeper-server-start.sh config/zookeeper.properties #後臺啓動 [root@CentOS124 kafka2.11]# bin/zookeeper-server-start.sh config/zookeeper.properties 1>/dev/null 2>&1 & [1] 18466 #查看是否啓動成功 [root@CentOS124 ~]# ps -ef|grep kafka
啓動kafka框架
[root@CentOS124 kafka2.11]# bin/kafka-server-start.sh config/server.properties #後臺啓動 [root@CentOS124 kafka2.11]# bin/kafka-server-start.sh config/server.properties 1>/dev/null 2>&1 & #建立 topic [root@CentOS124 kafka2.11]# bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test #查看Kafka 中的 topic 列表 bin/kafka-topics.sh --list --zookeeper localhost:2181
#複製server.properties配置文件爲三份,分別起名爲server.properties,server-2.properties,server-3.properties 三份配置中都要修改如下 #三個配置中分別修改成0,2,3 broker.id=0 #三個配置中分別修改成9092,9093,9094 port=9092 #kafka-logs,kafka-logs-2,kafka-logs-3 log.dirs=/tmp/kafka-logs #都設置爲3,即每一個topic默認三個partition num.partitions=3 #zookeeper集羣地址,外部能夠配置,這裏環境有限 使用默認既可 zookeeper.connect=localhost:2181 #分別進入kafka目錄下 執行以下命令啓動服務控制檯輸出日子完成了 bin/kafka-server-start.sh config/server.properties bin/kafka-server-start.sh config/server-2.properties bin/kafka-server-start.sh config/server-3.properties
首先建立一個springBoot項目 引入spring-kafka 分佈式
application.properties 配置
server.port=8080 #kafka地址 brokers集羣地址用,隔開 spring.kafka.bootstrap-servers=127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094 #生產者的配置,大部分咱們可使用默認的,這裏列出幾個比較重要的屬性 #每批次發送消息的數量 spring.kafka.producer.batch-size=16 #發送失敗重試次數 spring.kafka.producer.retries=0 #即32MB的批處理緩衝區 spring.kafka.producer.buffer-memory=33554432 #key序列化方式 spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer #消費者的配置 ##Kafka中沒有初始偏移或若是當前偏移在服務器上再也不存在時,默認區最新 ,有三個選項 【latest, earliest, none】 spring.kafka.consumer.auto-offset-reset=latest #是否開啓自動提交 spring.kafka.consumer.enable-auto-commit=true #自動提交的時間間隔 spring.kafka.consumer.auto-commit-interval=100 #key的解碼方式 spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer #value的解碼方式 spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer #在kafka/config文件的consumer.properties中有配置 spring.kafka.consumer.group-id=test-consumer-group
建立Producer生產者
package com.example.modules; import com.alibaba.fastjson.JSONObject; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Component; import java.util.Date; /** * 〈生產者〉 * @author qinxuewu * @create 18/8/4下午11:56 * @since 1.0.0 */ @Component public class Producer { @Autowired private KafkaTemplate kafkaTemplate; //發送消息方法 public void send() { JSONObject obj=new JSONObject(); obj.put("id",System.currentTimeMillis()); obj.put("name","生產者發送消息"); obj.put("date",new Date()); //這個 topic 在 Java 程序中是不須要提早在 Kafka 中設置的,由於它會在發送的時候自動建立你設置的 topic kafkaTemplate.send("qxw",obj.toString()); } }
建立消費者
@Component public class Consumer { private static final Logger logger = LoggerFactory.getLogger(Consumer.class); /** * 同時監聽兩個 topic 的消息了,可同時監聽多個topic * @param record * @throws Exception */ @KafkaListener(topics = {"test","qxw"}) public void listen (ConsumerRecord<?, ?> record) throws Exception { Optional<?> kafkaMessage = Optional.ofNullable(record.value()); if (kafkaMessage.isPresent()) { Object message = kafkaMessage.get(); logger.info("消費者開始消費message:" + message); } } }
運行後就能夠看到控制檯輸出了
@RunWith(SpringRunner.class) @SpringBootTest public class KafkaDemoApplicationTests { @Autowired private Producer producer; @Test public void contextLoads() { for (int i = 0; i <3 ; i++) { producer.send(); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } } }
https://www.cnblogs.com/alan319/p/8651434.html kafka的配置分爲 broker、producter、consumer三個不一樣的配置