java B2B2C源碼電子商城系統-Kafka快速入門

你們對Kafka有了一些基本瞭解以後,下面咱們來嘗試構建一個Kafka服務端,並體驗一下基於Kafka的消息生產與消費。
須要JAVA Spring Cloud大型企業分佈式微服務雲構建的B2B2C電子商務平臺源碼 一零三八七七四六二六
環境安裝
首先,咱們須要從官網上下載安裝介質。
在解壓Kafka的安裝包以後,能夠看到其目錄結構以下:
 java

kafka
  +-bin
    +-windows
  +-config
  +-libs
  +-logs
  +-site-docs

因爲Kafka的設計中依賴了ZooKeeper,因此咱們能夠在bin和config目錄中除了看到Kafka相關的內容以外,還有ZooKeeper相關的內容。其中bin目錄存放了Kafka和ZooKeeper的命令行工具,bin根目錄下是適用於Linux/Unix的shell,而bin/windows下的則是適用於windows下的bat。咱們能夠根據實際的系統來設置環境變量,以方便後續的使用和操做。而在config目錄中,則是用來存放了關於Kafka與ZooKeeper的配置信息。spring

啓動測試
下面咱們來嘗試啓動ZooKeeper和Kafka來進行消息的生產和消費。示例中全部的命令均已配置了Kafka的環境變量爲例。shell

啓動ZooKeeper,執行命令:zookeeper-server-start config/zookeeper.properties,該命令須要指定zookeeper的配置文件位置才能正確啓動,kafka的壓縮包中包含了其默認配置,開發與測試環境不須要修改。apache

[2016-09-28 08:05:34,849] INFO Reading configuration from: config\zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[2016-09-28 08:05:34,850] INFO autopurge.snapRetainCount set to 3 (org.apache.zookeeper.server.DatadirCleanupManager)
[2016-09-28 08:05:34,851] INFO autopurge.purgeInterval set to 0 (org.apache.zookeeper.server.DatadirCleanupManager)
[2016-09-28 08:05:34,851] INFO Purge task is not scheduled. (org.apache.zookeeper.server.DatadirCleanupManager)
...
[2016-09-28 08:05:34,940] INFO binding to port 0.0.0.0/0.0.0.0:2181 (org.apache.zookeeper.server.NIOServerCnxnFactory)


從控制檯信息中,咱們能夠看到ZooKeeper從指定的config/zookeeper.properties配置文件中讀取信息並綁定2181端口啓動服務。有時候啓動失敗,可查看一下端口是否被佔用,能夠殺掉佔用進程或經過修改config/zookeeper.properties配置文件中的clientPort內容以綁定其餘端口號來啓動ZooKeeper。bootstrap

啓動Kafka,執行命令:kafka-server-start config/server.properties,該命令也須要指定Kafka配置文件的正確位置,如上命令中指向瞭解壓目錄包含的默認配置。若在測試時,使用外部集中環境的ZooKeeper的話,咱們能夠在該配置文件中經過zookeeper.connect參數來設置ZooKeeper的地址和端口,它默認會鏈接本地2181端口的ZooKeeper;若是須要設置多個ZooKeeper節點,能夠爲這個參數配置多個ZooKeeper地址,並用逗號分割。好比:zookeeper.connect=127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002。windows

建立Topic,執行命令:kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test,經過該命令,建立一個名爲「test」的Topic,該Topic包含一個分區一個Replica。在建立完成後,可使用kafka-topics --list --zookeeper localhost:2181命令來查看當前的Topic。分佈式

另外,若是咱們不使用kafka-topics命令來手工建立,直接進行下面的內容進行消息建立時也會自動建立Topics來使用。微服務

建立消息生產者,執行命令:kafka-console-producer --broker-list localhost:9092 --topic test。kafka-console-producer命令能夠啓動Kafka基於命令行的消息生產客戶端,啓動後能夠直接在控制檯中輸入消息來發送,控制檯中的每一行數據都會被視爲一條消息來發送。咱們能夠嘗試輸入幾行消息,因爲此時並無消費者,因此這些輸入的消息都會被阻塞在名爲test的Topics中,直到有消費者將其消費掉位置。工具

建立消息消費者,執行命令:kafka-console-consumer --zookeeper localhost:2181 --topic test --from-beginning。kafka-console-consumer命令啓動的是Kafka基於命令行的消息消費客戶端,在啓動以後,咱們立刻能夠在控制檯中看到輸出了以前咱們在消息生產客戶端中發送的消息。咱們能夠再次打開以前的消息生產客戶端來發送消息,並觀察消費者這邊對消息的輸出來體驗Kafka對消息的基礎處理。測試

整合Spring Cloud Bus
在上一篇使用Rabbit實現消息總線的案例中,咱們已經經過引入spring-cloud-starter-bus-amqp模塊,完成了使用RabbitMQ來實現的消息總線。若咱們要使用Kafka來實現消息總線時,只須要把spring-cloud-starter-bus-amqp替換成spring-cloud-starter-bus-kafka模塊,在pom.xml的dependenies節點中進行修改,具體以下:
 

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-bus-kafka</artifactId>
</dependency>

若是咱們在啓動Kafka時均採用了默認配置,那麼咱們不須要再作任何其餘配置就能在本地實現從RabbitMQ到Kafka的切換。咱們能夠嘗試把剛剛搭建的ZooKeeper、Kafka啓動起來,並將修改成spring-cloud-starter-bus-kafka模塊的config-server和config-client啓動起來。

在config-server啓動時,咱們能夠在控制檯中看到以下輸出:
 

2016-09-28 22:11:29.627  INFO 15144 --- [           main] o.s.c.s.b.k.KafkaMessageChannelBinder    : Using kafka topic for outbound: springCloudBus
2016-09-28 22:11:29.642  INFO 15144 --- [-localhost:2181] org.I0Itec.zkclient.ZkEventThread        : Starting ZkClient event thread.
...
016-09-28 22:11:30.290  INFO 15144 --- [           main] o.s.i.kafka.support.ProducerFactoryBean  : Using producer properties => {bootstrap.servers=localhost:9092, linger.ms=0, acks=1, compression.type=none, batch.size=16384}
2016-09-28 22:11:30.298  INFO 15144 --- [           main] o.a.k.clients.producer.ProducerConfig    : ProducerConfig values:
...
2016-09-28 22:11:30.322  INFO 15144 --- [           main] o.s.c.s.b.k.KafkaMessageChannelBinder$1  : Adding {message-handler:outbound.springCloudBus} as a subscriber to the 'springCloudBusOutput' channel
2016-09-28 22:11:31.467  INFO 15144 --- [           main] o.s.c.s.b.k.KafkaMessageChannelBinder$7  : Adding {message-handler:inbound.springCloudBus.anonymous.8b9e6c7b-6a50-48c5-b981-8282a0d5a30b} as a subscriber to the 'bridge.springCloudBus' channel
2016-09-28 22:11:31.467  INFO 15144 --- [           main] o.s.c.s.b.k.KafkaMessageChannelBinder$7  : started inbound.springCloudBus.anonymous.8b9e6c7b-6a50-48c5-b981-8282a0d5a30b


從控制檯的輸出內容,咱們能夠看到config-server鏈接到了Kafka中,並使用了名爲springCloudBus的Topic。

此時,咱們可使用kafka-topics --list --zookeeper localhost:2181命令來查看當前Kafka中的Topic,若已成功啓動了config-server並配置正確,咱們就能夠在Kafka中看到已經多了一個名爲springCloudBus的Topic。

咱們再啓動配置了spring-cloud-starter-bus-kafka模塊的config-client,能夠看到控制檯中輸出以下內容:

2016-09-28 22:43:55.067  INFO 6136 --- [           main] o.s.c.s.b.k.KafkaMessageChannelBinder    : Using kafka topic for outbound: springCloudBus
2016-09-28 22:43:55.078  INFO 6136 --- [-localhost:2181] org.I0Itec.zkclient.ZkEventThread        : Starting ZkClient event thread.
...
2016-09-28 22:50:38.584  INFO 828 --- [           main] o.s.i.kafka.support.ProducerFactoryBean  : Using producer properties => {bootstrap.servers=localhost:9092, linger.ms=0, acks=1, compression.type=none, batch.size=16384}
2016-09-28 22:50:38.592  INFO 828 --- [           main] o.a.k.clients.producer.ProducerConfig    : ProducerConfig values: 
...
2016-09-28 22:50:38.615  INFO 828 --- [           main] o.s.c.s.b.k.KafkaMessageChannelBinder$1  : Adding {message-handler:outbound.springCloudBus} as a subscriber to the 'springCloudBusOutput' channel
2016-09-28 22:50:38.616  INFO 828 --- [           main] o.s.integration.channel.DirectChannel    : Channel 'didispace:7002.springCloudBusOutput' has 1 subscriber(s).
2016-09-28 22:50:38.616  INFO 828 --- [           main] o.s.c.s.b.k.KafkaMessageChannelBinder$1  : started outbound.springCloudBus
...
2016-09-28 22:50:39.162  INFO 828 --- [           main] s.i.k.i.KafkaMessageDrivenChannelAdapter : started org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter@60cf855e
2016-09-28 22:50:39.162  INFO 828 --- [           main] o.s.c.s.b.k.KafkaMessageChannelBinder$7  : Adding {message-handler:inbound.springCloudBus.anonymous.f8fc9c0c-ccd3-46dd-9537-07198f4ee216} as a subscriber to the 'bridge.springCloudBus' channel
2016-09-28 22:50:39.163  INFO 828 --- [           main] o.s.c.s.b.k.KafkaMessageChannelBinder$7  : started inbound.springCloudBus.anonymous.f8fc9c0c-ccd3-46dd-9537-07198f4ee216

 
能夠看到,config-client啓動時輸出了相似的內容,他們都訂閱了名爲springCloudBus的Topic。

在啓動了config-server和config-client以後,爲了更明顯地觀察消息總線刷新配置的效果,咱們能夠在本地啓動多個不一樣端口的config-client。此時,咱們的config-server以及多個config-client都已經鏈接到了由Kafka實現的消息總線上。咱們能夠先訪問各個config-client上的/from請求,查看他獲取到的配置內容。而後,修改Git中對應的參數內容,再訪問各個config-client上的/from請求,能夠看到配置內容並無改變。最後,咱們向config-server發送POST請求:/bus/refresh,此時咱們再去訪問各個config-client上的/from請求,就能得到到最新的配置信息,各客戶端上的配置都已經加載爲最新的Git配置內容。

從config-client的控制檯中,咱們能夠看到以下內容:

2016-09-29 08:20:34.361  INFO 21256 --- [ kafka-binder-1] o.s.cloud.bus.event.RefreshListener      :
 Received remote refresh request. Keys refreshed [from]

RefreshListener監聽類記錄了收到遠程刷新請求,並刷新了from屬性的日誌。 在上面的例子中,因爲Kafka、ZooKeeper均運行於本地,因此咱們沒有在測試程序中經過配置信息來指定Kafka和ZooKeeper的配置信息,就完成了本地消息總線的試驗。可是咱們實際應用中,Kafka和ZooKeeper通常都會獨立部署,因此在應用中都須要來爲Kafka和ZooKeeper配置一些鏈接信息等。Kafka的整合與RabbitMQ不一樣,在Spring Boot 1.3.7中並無直接提供的Starter模塊,而是採用了Spring Cloud Stream的Kafka模塊,因此對於Kafka的配置均採用了spring.cloud.stream.kafka的前綴。

相關文章
相關標籤/搜索