Kafka 是一種高吞吐的分佈式發佈訂閱消息系統,可以替代傳統的消息隊列用於解耦合數據處理,緩存未處理消息等,同時具備更高的吞吐率,支持分區、多副本、冗餘,所以被普遍用於大規模消息數據處理應用。Kafka 支持Java 及多種其它語言客戶端,可與Hadoop、Storm、Spark等其它大數據工具結合使用。java
代碼我已放到 Github ,導入spring-boot-kafka
項目github
github github.com/souyunku/sp…spring
在項目中添加 kafka-clients
依賴apache
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.10.2.0</version>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
複製代碼
@Configuration
@EnableKafka
public class KafkaConfiguration {
}
複製代碼
@Component
public class MsgProducer {
private static final Logger log = LoggerFactory.getLogger(MsgProducer.class);
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
public void sendMessage(String topicName, String jsonData) {
log.info("向kafka推送數據:[{}]", jsonData);
try {
kafkaTemplate.send(topicName, jsonData);
} catch (Exception e) {
log.error("發送數據出錯!!!{}{}", topicName, jsonData);
log.error("發送數據出錯=====>", e);
}
//消息發送的監聽器,用於回調返回信息
kafkaTemplate.setProducerListener(new ProducerListener<String, String>() {
@Override
public void onSuccess(String topic, Integer partition, String key, String value, RecordMetadata recordMetadata) {
}
@Override
public void onError(String topic, Integer partition, String key, String value, Exception exception) {
}
@Override
public boolean isInterestedInSuccess() {
log.info("數據發送完畢");
return false;
}
});
}
}
複製代碼
@Component
public class MsgConsumer {
@KafkaListener(topics = {"topic-1","topic-2"})
public void processMessage(String content) {
System.out.println("消息被消費"+content);
}
}
複製代碼
application.properties
json
#kafka
# 指定kafka 代理地址,能夠多個
spring.kafka.bootstrap-servers=YZ-PTEST-APP-HADOOP-02:9092,YZ-PTEST-APP-HADOOP-04:9092
# 指定listener 容器中的線程數,用於提升併發量
spring.kafka.listener.concurrency=3
# 每次批量發送消息的數量
spring.kafka.producer.batch-size=1000
# 指定默認消費者group id
spring.kafka.consumer.group-id=myGroup
# 指定默認topic id
spring.kafka.template.default-topic=topic-1
複製代碼
@SpringBootApplication
@ComponentScan(value = {"io.ymq.kafka"})
public class Startup {
public static void main(String[] args) {
SpringApplication.run(Startup.class, args);
}
}
複製代碼
import io.ymq.kafka.MsgProducer;
import io.ymq.kafka.run.Startup;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
/** * 描述: 測試 kafka * * @author yanpenglei * @create 2017-10-16 18:45 **/
@RunWith(SpringRunner.class)
@SpringBootTest(classes = Startup.class)
public class BaseTest {
@Autowired
private MsgProducer msgProducer;
@Test
public void test() throws Exception {
msgProducer.sendMessage("topic-1", "topic--------1");
msgProducer.sendMessage("topic-2", "topic--------2");
}
}
複製代碼
消息生產者,響應bootstrap
2017-10-17 15:54:44.814 INFO 2960 --- [ main] io.ymq.kafka.MsgProducer : 向kafka推送數據:[topic--------1]
2017-10-17 15:54:44.860 INFO 2960 --- [ main] io.ymq.kafka.MsgProducer : 向kafka推送數據:[topic--------2]
2017-10-17 15:54:44.878 INFO 2960 --- [ad | producer-1] io.ymq.kafka.MsgProducer : 數據發送完畢
2017-10-17 15:54:44.878 INFO 2960 --- [ad | producer-1] io.ymq.kafka.MsgProducer : 數據發送完畢
複製代碼
消息消費者,響應segmentfault
消息被消費topic--------1
消息被消費topic--------2
複製代碼
代碼我已放到 Github ,導入spring-boot-kafka
項目緩存
github github.com/souyunku/sp…bash
[2017-10-16 19:20:08.340] - 14884 嚴重 [main] --- org.springframework.kafka.support.LoggingProducerListener: Exception thrown when sending a message with key='null' and payload='topic--------2' to topic topic-2:
複製代碼
經調試發現 kafka 鏈接是用的主機名,因此修改 hosts
C:\Windows\System32\drivers\etc\hosts
10.32.32.149 YZ-PTEST-APP-HADOOP-02
10.32.32.154 YZ-PTEST-APP-HADOOP-04
複製代碼