1 Kafka
Kafka是一個開源分佈式的流處理平臺,一種高吞吐量的分佈式發佈訂閱消息系統,它能夠處理消費者在網站中的全部動做流數據。Kafka由Scala和Java編寫,2012年成爲Apache基金會下頂級項目。前端
2 Kafka優勢
- 低延遲:Kafka支持低延遲消息傳遞,速度極快,能達到200w寫/秒
- 高性能:Kafka對於消息的分佈,訂閱都有高吞吐量。即便存儲了TB級的信息,依然可以保證穩定的性能
- 可靠性:Kafka是分佈式,分區,複製和容錯的,保證零停機和零數據丟失
- 可擴展:用戶能夠從但個代理Broker開始做POC,而後慢慢擴展到由三個Broker組成的小型開發集羣,接着擴展到數十個甚至數百個Broker集羣進入生產階段,能夠在集羣聯機時進行擴展,而不會影響整個系統的可用性
- 多個生產者:不管這些客戶使用相同Topic仍是多個Topic,Kafka都能無縫處理多個生產者,使得系統能夠很是容易聚合來自許多前端系統的數據並使其保持一致
- 多個消費者:Kafka具備多個消費者設計,能夠讀取任何但個消息流而不會相互干擾。多個Kafka消費者能夠組成一個消費組進行操做並共享消息流,從而確保每一條消息只被整個消費組處理一次
- 基於磁盤的保留:Kafka使用分佈式提交日誌,消息可以快速持久化到磁盤上。消息持久化意味着若是消費者落後,不管是因爲處理速度緩慢仍是忽然的消息涌入都不會有丟失數據的危險,也意味着消費者能夠被中止。消息將保留在Kafka中,容許消費者從新啓動而且從中斷處獲取處理信息而不會丟失數據
3 Kafka相關術語
- Broker:Kafka集羣包含一個或多個服務器,這種服務器稱爲Broker
- Topic:每條發佈到Kafka的消息都有一個類別,這個類別稱爲Topic。物理上不一樣Topic的消息分開存儲,邏輯上Topic的消息雖然保存在一個或多個Broker上,但用戶只需指定消息的Topic便可生產或消費數據而沒必要關心數據存放於何處
- Partition:每一個Topic包含一個或多個Partition
- Producer:生產者,負責發佈消息到Broker
- Consumer:消費者,向Broker讀取消息的客戶端
- Consumer Group:每一個Consumer屬於一個特定的Consumer Group,能夠爲每一個Consumer指定Group Name,不然屬於默認Group
4 動手幹活
4.1 環境
- Spring Boot 2.3.1
- IDEA 2020.1.1
- OpenJDK 11.0.7
- Kafka 2.5.0
- Kotlin 1.3.72
4.2 下載Kafka
官網戳這裏。java
下載並解壓(注意須要Kafka與Spring Boot版本對應,能夠參考這裏):git
tar -xvf kafka_2.12-2.5.0.tgz cd kafka_2.12-2.5.0
接着啓動ZooKeeper與Kafka:github
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties bin/kafka-server-start.sh config/server.properties
Kafka須要用到ZooKeeper,須要在啓動Kafka以前啓動ZooKeeper(ZooKeeper是一個開源的分佈式應用程序協調服務,是Hadoop的組件,主要用於解決分佈式應用中的一些數據管理問題)。spring
Kafka默認使用9092端口,部署在服務器上須要注意防火牆以及安全組的處理。apache
4.3 新建工程
考慮到Spring Boot在2.3.0M1中(截至本文寫做日期2020.07.14Spring Boot已更新到2.4.0M1)首次採用Gradle而不是Maven來構建項目,換句話說往後Spring Boot的構建工具將從Maven遷移到Gradle,Spring Boot團隊給出的主要緣由是能夠減小項目構建所花費的時間,詳情能夠戳這裏瞧瞧。bootstrap
另外因爲另外一個基於JVM的語言Kotlin的日漸崛起,後端開始逐漸有人採用Kotlin(儘管很少,不過語法糖真的香,JetBrains家的語言配合IDE,爽得飛起),所以本示例項目將採用兩種方式搭建:後端
- Java+Maven
- Kotlin+Gradle
選擇的依賴以下(固然您喜歡的話能夠在pom.xml
或者build.gradle.kts
裏面加,對於Kotlin不須要Lombok
):安全
4.4 項目結構
Java版:springboot
Kotlin版:
serialize
:序列化/反序列化實體類Constant.java
/Constant.kt
:常量類Consumer.java
/Consumer.kt
:消費者類Entity.java
/Entity.kt
:實體類Producer.java
/Product.kt
:生產者類TestApplicationTests
:測試類
4.5 常量類
包含Topic與GroupId,Java版:
public class Constants { public static final String TOPIC = "TestTopic"; public static final String GROUP_ID = "TestGroupId"; }
Kotlin版:
object Constants { const val TOPIC = "TestTopic" const val GROUP_ID = "TestGroupId" }
4.6 實體類
@AllArgsConstructor @NoArgsConstructor @Data @Builder public class Entity { private long id; private String name; private int num; }
說一下Lombok的幾個註解:
@AllArgsConstructor
/@NoArgsConstructor
:生成全部參數/無參數構造方法@Data
:等價於@Setter+@Getter+@RequiredArgsConstrucotr+@ToString+@EqualAndHashCode
,自動生成Setter+Getter+toString()+equals()+hashCode()
,還有@RequireArgsConstructor
爲類的每個final
或非空字段生成一個構造方法@Builder
:能夠經過建造者模式建立對象
Kotlin版:
class Entity { var id: Long = 0 var name: String = "" var num: Int = 0 constructor() constructor(id:Long,name:String,num:Int) { this.id = id this.name = name this.num = num } }
4.7 生產者
@Component @Slf4j //防止出現Field injection not recommended警告,代替了原來的直接在字段上@Autowired @RequiredArgsConstructor(onConstructor = @__(@Autowired)) public class Producer { private final KafkaTemplate<String, Entity> kafkaTemplate; public void send(Entity entity) { //發送消息 //類型通常爲String+自定義消息內容,String表明消息Topic,這裏消息內容用Entity表示 ListenableFuture<SendResult<String, Entity>> future = kafkaTemplate.send(Constants.TOPIC, entity); //回調函數 future.addCallback(new ListenableFutureCallback<>() { @Override public void onFailure(Throwable throwable) { log.info("Send message failed"); } @Override public void onSuccess(SendResult<String, Entity> stringEntitySendResult) { log.info("Send message success"); } }); } }
這裏的send
有兩個參數,對應於sendResult<>
中的參數類型,第一個爲消息的Topic,第二個爲消息體,通常使用String或者Json。
Kotlin版:
@Component class Producer { @Autowired private var kafkaTemplate:KafkaTemplate<String,Entity> ? = null private val log = LoggerFactory.getLogger(this.javaClass) fun send(entity: Entity) { val future = kafkaTemplate!!.send(Constants.TOPIC,entity); future.addCallback(object : ListenableFutureCallback<SendResult<String?, Entity?>?>{ override fun onSuccess(result : SendResult<String?,Entity?>?) { log.info("Send success"); } override fun onFailure(e:Throwable) { log.info("Send failed"); } }) } }
4.8 消費者
@Component @Slf4j public class Consumer { @KafkaListener(topics = Constants.TOPIC,groupId = Constants.GROUP_ID) public void consume(Entity entity) { log.info("Consume a entity, id is "+entity.getId()); } }
使用@KafkaListener
註解,第一個參數表示須要消費的消息的Topic,能夠是String []
,第二個是消費者組的id。生產者的消息Topic必須與消費者的Topic保持一致不然不能消費,這裏簡單處理打印日誌。
Kotlin版:
@Component class Consumer { private val log = LoggerFactory.getLogger(this.javaClass) @KafkaListener(topics = [Constants.TOPIC],groupId = Constants.GROUP_ID) fun consume(entity: Entity) { log.info("Consume a entity, id is "+entity.id.toString()) } }
4.9 序列化/反序列化
這裏自定義了序列化/反序列化類,序列化/反序列化類須要實現org.apache.kafka.common.serialization.Serializer<T>/Deserializer<T>
接口,其中T
是想要序列化的類型,這裏是Entity
。序列化接口反編譯以下:
public interface Serializer<T> extends Closeable { default void configure(Map<String, ?> configs, boolean isKey) { } byte[] serialize(String var1, T var2); default byte[] serialize(String topic, Headers headers, T data) { return this.serialize(topic, data); } default void close() { } }
反序列化反編譯接口以下:
public interface Deserializer<T> extends Closeable { default void configure(Map<String, ?> configs, boolean isKey) { } T deserialize(String var1, byte[] var2); default T deserialize(String topic, Headers headers, byte[] data) { return this.deserialize(topic, data); } default void close() { } }
也就是隻須要實現其中的serialize/deserialize
方法便可。這裏序列化/反序列化用到了自帶的Jackson:
@Slf4j public class Serializer implements org.apache.kafka.common.serialization.Serializer<Entity> { public byte [] serialize(String topic, Entity entity) { try { return entity == null ? null : new ObjectMapper().writeValueAsBytes(entity); } catch (JsonProcessingException e) { e.printStackTrace(); log.error("Can not serialize entity in Serializer"); } return null; } }
反序列化:
@Slf4j public class Deserializer implements org.apache.kafka.common.serialization.Deserializer<Entity> { public Entity deserialize(String topic,byte [] data) { try { return data == null ? null : new ObjectMapper().readValue(data,Entity.class); } catch (IOException e) { e.printStackTrace(); log.error("Can not deserialize entity in Deserializer"); } return null; } }
Kotlin版:
class Serializer : org.apache.kafka.common.serialization.Serializer<Entity?> { private val log = LoggerFactory.getLogger(this.javaClass) override fun serialize(topic: String?, data: Entity?): ByteArray? { try { return if (data == null) null else ObjectMapper().writeValueAsBytes(data) } catch (e:JsonProcessingException) { e.printStackTrace() log.error("Can not serialize entity in Serializer") } return null } }
class Deserializer : org.apache.kafka.common.serialization.Deserializer<Entity?> { private val log = LoggerFactory.getLogger(this.javaClass) override fun deserialize(topic: String?, data: ByteArray?): Entity? { try { return ObjectMapper().readValue(data, Entity::class.java) } catch (e:IOException) { e.printStackTrace() log.error("Can not deserialize entity in Deserializer") } return null } }
4.10 配置文件
application.properties
:
# 地址,本地直接localhost,部署可使用公網ip spring.kafka.bootstrap-servers=localhost:9092 # 消費者組id spring.kafka.consumer.group-id=TestGroupId spring.kafka.consumer.auto-offset-reset=earliest # 消費者鍵反序列化類 spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer # 消費者值反序列化類 spring.kafka.consumer.value-deserializer=com.test.serialize.Deserializer # 生產者鍵序列化類 spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer # 生產者值序列化類 spring.kafka.producer.value-serializer=com.test.serialize.Serializer
對於auto-offest-rest
,該屬性指定了消費者在讀取一個沒有偏移量的分區或者偏移量無效的狀況下怎麼處理,有四個取值:
earliest
:當各分區有已提交的offest
時,從提交的offest
開始消費,無提交的offest
時,從頭開始消費latest
(默認):當各分區有已提交的offest
時,從提交的offest
開始消費,無提交的offest
時,消費新產生的該分區下的數據none
:各分區都存在已提交的offest
時,從offest
後消費,只要有一個分區不存在已提交的offest
,則拋出異常exception
:其餘狀況將拋出異常給消費者
對於序列化/反序列化,String可使用自帶的序列化/反序列化類:
org.apache.kafka.common.serialization.StringSerializer org.apache.kafka.common.serialization.StringDeserializer
至於Json可使用:
org.springframework.kafka.support.serializer.JsonSerializer org.springframework.kafka.support.serializer.JsonDeserializer
其餘自定義的請實現org.apache.kafka.common.serialization.Serializer<T>/Deserializer<T>
接口。
yml版:
spring: kafka: bootstrap-servers: localhost:9092 consumer: group-id: TestGroupId auto-offset-reset: earliest key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: com.test.serialize.Deserializer producer: key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: com.test.serialize.Serializer
5 測試
5.1 測試類
@SpringBootTest @Slf4j @RequiredArgsConstructor(onConstructor = @__(@Autowired)) class TestApplicationTests { private final Producer producer; @Test void contextLoads() { Random random = new Random(); for (int i = 0; i < 1000; i++) { long id = i+1; String name = UUID.randomUUID().toString(); int num = random.nextInt(); producer.send(Entity.builder().id(id).name(name).num(num).build()); } } }
生產者發送1000條消息。 Kotlin版:
@SpringBootTest class TestApplicationTests { @Autowired private val producer:Producer? = null @Test fun contextLoads() { for(i in 0..1000) { val id = (i + 1).toLong() val name = java.util.UUID.randomUUID().toString() val num = (0..100000).random() producer!!.send(Entity(id,name,num)) } } }
5.2 測試
控制檯輸出以下:
全部消息被成功發送而且被成功消費。
最後能夠去驗證一下Kafka的Topic列表,能夠看到配置文件中的Topic的值(TestTopic
),進入Kafka目錄:
bin/kafka-topics.sh --list --zookepper localhost:2181
6 源碼
7 參考
一、CSDN-Kafka優勢 二、簡書-Spring Boot 2.x 快速集成整合消息中間件 Kafka 三、簡書-springboot 之集成kafka
若是以爲文章好看,歡迎點贊。
同時歡迎關注微信公衆號:氷泠之路。