有些項目須要大量時間才能運行特定功能:電子商務場景或系統,當付款提供商確認付款過程後,該系統須要發送電子郵件。做爲開發人員,咱們知道讓用戶等待是不可行的。html
在付款的狀況下,應用程序須要在付款完成後發送電子郵件。建立要異步執行的任務隊列是一種處理大量數據而又不影響用戶並使用戶滿意的絕佳方法。這篇文章的目的是討論如何使用Spring和RabbitMQ使用Java建立這些異步調用。java
RabbitMQ是一種開源消息代理軟件,它將消息從發送者的正式消息傳遞協議轉換爲接收者的正式消息傳遞協議。換句話說,RabbitMQ是生產者-消費者實現,生產者處理消息,而消費者是運行該過程的客戶。mysql
爲了展現RabbitMQ的工做原理,咱們將建立一個平滑的示例來管理具備如下三種狀態的汽車:新車什麼時候,該車什麼時候售出以及該車被識別爲垃圾車。咱們但願將其存儲在關係數據庫中,並有兩個表:一個用於放置當前汽車狀態,第二個用於存儲有關汽車的歷史信息。所以,對於每一個新事件,咱們都會向RabbitMQ觸發一個事件,以便異步地對新客戶端執行該事件。git
該項目演示將是一個Maven項目。所以,第一步是定義項目相關性,例如 將Spring Boot,Spring Data,MySQL驅動程序和RabbitMQ客戶端插入pom.xml文件。web
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>sh.platform.start</groupId>
<artifactId>spring-boot-jms</artifactId>
<version>0.0.1</version>
<properties>
<java.version>1.8</java.version>
</properties>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.0.RELEASE</version>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.messaginghub</groupId>
<artifactId>pooled-jms</artifactId>
</dependency>
<dependency>
<groupId>com.rabbitmq.jms</groupId>
<artifactId>rabbitmq-jms</artifactId>
<version>1.11.2</version>
</dependency>
<dependency>
<groupId>sh.platform</groupId>
<artifactId>config</artifactId>
<version>2.2.2</version>
</dependency>
</dependencies>
<build>
<finalName>spring-boot-jms</finalName>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
<repositories>
<repository>
<id>oss.sonatype.org-snapshot</id>
<url>http://oss.sonatype.org/content/repositories/snapshots</url>
</repository>
</repositories>
</project>
下一步是配置類 這些類負責提供數據源以鏈接到數據庫,並提供鏈接工廠供客戶端用來與JMS提供程序建立鏈接。spring
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import sh.platform.config.Config;
import sh.platform.config.MySQL;
import javax.sql.DataSource;
@Configuration
public class DataSourceConfig {
@Bean(name = "dataSource")
public DataSource getDataSource() {
Config config = new Config();
MySQL database = config.getCredential("database", MySQL::new);
return database.get();
}
}
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.connection.CachingConnectionFactory;
import org.springframework.jms.support.converter.MappingJackson2MessageConverter;
import org.springframework.jms.support.converter.MessageConverter;
import org.springframework.jms.support.converter.MessageType;
import sh.platform.config.Config;
import sh.platform.config.RabbitMQ;
import javax.jms.ConnectionFactory;
@Configuration
@EnableJms
public class JMSConfig {
private ConnectionFactory getConnectionFactory() {
Config config = new Config();
final RabbitMQ rabbitMQ = config.getCredential("rabbitmq", RabbitMQ::new);
return rabbitMQ.get();
}
@Bean
public MessageConverter jacksonJmsMessageConverter() {
MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
converter.setTargetType(MessageType.TEXT);
converter.setTypeIdPropertyName("_type");
return converter;
}
@Bean
public CachingConnectionFactory cachingConnectionFactory() {
ConnectionFactory connectionFactory = getConnectionFactory();
return new CachingConnectionFactory(connectionFactory);
}
}
建立配置後,下一步是定義實體。這些實體是業務的核心,將表明咱們將從數據庫建立/寫入並集成到隊列中的實例。在此示例中,有兩個實體:Car實體(其中咱們擁有汽車的當前狀態)和保存操做狀態的實體CarLog
。sql
import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
import javax.persistence.GenerationType;
import javax.persistence.Id;
import java.util.Objects;
@Entity
public class Car {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
@Column
private String plate;
@Column
private String model;
@Column
private Integer age;
@Column
private String color;
public Long getId() {
return id;
}
public String getModel() {
return model;
}
public Integer getAge() {
return age;
}
public String getColor() {
return color;
}
public String getPlate() {
return plate;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
Car car = (Car) o;
return Objects.equals(id, car.id);
}
@Override
public int hashCode() {
return Objects.hashCode(id);
}
@Override
public String toString() {
return "Car{" +
"id=" + id +
", plate='" + plate + '\'' +
", model='" + model + '\'' +
", age=" + age +
", color='" + color + '\'' +
'}';
}
}
public enum CarStatus {
NEW, JUNK, SOLD;
}
import javax.persistence.*;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.Objects;
@Entity
public class CarLog {
private static final ZoneId UTC = ZoneId.of("UTC");
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
@Column
private String plate;
@Column
private String model;
@Column
private LocalDateTime date = LocalDateTime.now(UTC);
@Column
@Enumerated(value = EnumType.STRING)
private CarStatus status;
public Long getId() {
return id;
}
public String getPlate() {
return plate;
}
public String getModel() {
return model;
}
public CarStatus getStatus() {
return status;
}
public LocalDateTime getDate() {
return date;
}
public static CarLog newCar(Car car) {
return of(car, CarStatus.NEW);
}
public static CarLog junk(Car car) {
return of(car, CarStatus.JUNK);
}
public static CarLog sold(Car car) {
return of(car, CarStatus.SOLD);
}
private static CarLog of(Car car, CarStatus status) {
Objects.requireNonNull(car, "car is required");
CarLog log = new CarLog();
log.plate = car.getPlate();
log.model = car.getModel();
log.status = status;
return log;
}
}
以後,一旦定義了Spring Data實體,下一步就是建立存儲庫的接口。Spring數據存儲庫抽象的目標是顯着減小實現各類持久性存儲的數據訪問層所需的樣板代碼量。數據庫
import org.springframework.data.repository.PagingAndSortingRepository;
public interface CarRepository extends PagingAndSortingRepository<Car, Long> {
}
import org.springframework.data.repository.PagingAndSortingRepository;
import java.util.List;
public interface CarLogRepository extends PagingAndSortingRepository<CarLog, Long> {
List<CarLog> findByPlate(String plate);
List<CarLog> findByModel(String model);
List<CarLog> findByStatus(CarStatus status);
}
在MVC模式中,控制器是模型和視圖之間的層,這就是咱們接下來要建立的控制器類。在CarController層中,有一個JmsTemplate,能夠很是輕鬆地將消息發送到JMS目標。apache
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.web.bind.annotation.*;
@RestController
@RequestMapping("cars")
public class CarController {
@Autowired
private CarRepository repository;
@Autowired
private JmsTemplate template;
@PostMapping
@ResponseStatus(code = HttpStatus.CREATED)
public String save(@RequestBody Car car) {
repository.save(car);
template.convertAndSend("new", car);
return "Saved- " + car.getModel();
}
@GetMapping(value = "/{id}", produces = "application/json")
public Car get(@PathVariable("id") long id) {
return repository.findById(id).orElseThrow(() -> new RuntimeException("Not found"));
}
@GetMapping(produces = "application/json")
public Iterable<Car> get() {
return repository.findAll();
}
@PutMapping(value = "/{id}", produces = "application/json")
public Car update(@PathVariable("id") long id, @RequestBody Car car) {
repository.save(car);
return car;
}
@DeleteMapping(value = "junk/{id}", produces = "application/json")
public Car junk(@PathVariable("id") long id) {
Car car = repository.findById(id).orElseThrow(() -> new RuntimeException("Not found"));
repository.deleteById(id);
template.convertAndSend("junk", car);
return car;
}
@DeleteMapping(value = "sold/{id}", produces = "application/json")
public Car sold(@PathVariable("id") long id) {
Car car = repository.findById(id).orElseThrow(() -> new RuntimeException("Not found"));
repository.deleteById(id);
template.convertAndSend("sold", car);
return car;
}
}
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.web.bind.annotation.*;
@RestController
@RequestMapping("logs")
public class CarLogController {
@Autowired
private CarLogRepository repository;
@GetMapping(produces = "application/json")
public Iterable<CarLog> get() {
return repository.findAll();
}
@GetMapping(value = "{plate}", produces = "application/json")
public Iterable<CarLog> getHistoric(@PathVariable("plate") String plate) {
return repository.findByPlate(plate);
}
@GetMapping(value = "models/{model}", produces = "application/json")
public Iterable<CarLog> get(@PathVariable("model") String model) {
return repository.findByModel(model);
}
@GetMapping(value = "status/{status}", produces = "application/json")
public Iterable<CarLog> get(@PathVariable("status") CarStatus status) {
return repository.findByStatus(status);
}
}
在CarLogController
層中,咱們僅看到GET動詞,這意味着它是隻讀控制器。可是信息將如何進入數據庫?在CarController
層中,客戶端將消息發送到RabbitMQ隊列。json
接下來,是時候討論將讀取此隊列的類了。CarEventReceiver
類爲JmsListener
批註提供了幾種方法,其屬性表示該方法的隊列。 它會偵聽並等待消息讀取和處理。若是再看一下如何在CarController
層類中使用JmsTemplate
,則第一個參數是一個String
,它提供將信息發送到的隊列名稱做爲第二個參數。Spring JMS經過容許生產和註釋的模板輕鬆地鏈接消費者和生產者,從而使信息更易使用。
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
@Component
public class CarEventReceiver {
@Autowired
private CarLogRepository repository;
@JmsListener(destination = "new")
public void newCar(Car car) {
CarLog log = CarLog.newCar(car);
repository.save(log);
}
@JmsListener(destination = "junk")
public void junk(Car car) {
CarLog log = CarLog.junk(car);
repository.save(log);
}
@JmsListener(destination = "sold")
public void sold(Car car) {
CarLog log = CarLog.sold(car);
repository.save(log);
}
}
Java應用程序已準備就緒!下一步是設置管理和部署應用程序所需的Platform.sh的文件。在咱們的第一篇Java文章中,咱們深刻研究了這三個文件的每一個細節:
一臺路由器(.platform/routes.yaml). Platform.sh容許你定義路由。
零個或多個服務容器(.platform/services.yaml). Platform.sh容許你徹底定義和配置要在項目上使用的拓撲和服務。
一個或多個應用程序容器(.platform.app.yaml)。 你能夠經過一個配置文件控制應用程序以及在Platform.sh上構建和部署應用程序的方式。
在這篇文章中將要更改的文件是服務文件,使你能夠定義數據庫,搜索引擎,緩存等。在此項目中,咱們將設置MariaDB和RabbitMQ。
db:
type: mariadb:10.4
disk: 512
queuerabbit:
type: rabbitmq:3.7
disk: 512
在應用程序文件中,咱們將更改關係以容許咱們的應用程序訪問服務。要指出的是,從安全角度來看,這種訪問是一項基本功能。所以,在微服務場景中,咱們能夠確保金融應用程序訪問金融服務等等。
# This file describes an application. You can have multiple applications
# in the same project.
#
# See https://docs.platform.sh/user_guide/reference/platform-app-yaml.html
# The name of this app. Must be unique within a project.
name: app
# The runtime the application uses.
type: "java:8"
disk: 1024
# The hooks executed at various points in the lifecycle of the application.
hooks:
build: mvn clean install
# The relationships of the application with services or other applications.
#
# The left-hand side is the name of the relationship as it will be exposed
# to the application in the PLATFORM_RELATIONSHIPS variable. The right-hand
# side is in the form `<service name>:<endpoint name>`.
relationships:
database: "db:mysql"
rabbitmq: "queuerabbit:rabbitmq"
# The configuration of app when it is exposed to the web.
web:
commands:
start: java -jar -Xmx512m target/spring-boot-jms.jar --server.port=$PORT
如今,該應用程序已經準備就緒,如今能夠經過如下步驟使用Platform.sh將其移至雲中:
建立一個新的免費試用賬戶。
使用新的用戶名和密碼註冊,或使用當前的GitHub,Bitbucket或Google賬戶登陸。若是你使用第三方登陸,則之後能夠爲Platform.sh賬戶設置密碼。
選擇你的網站應居住的世界區域。
選擇空白模板。
使用該向導後,Platform.sh將爲你提供整個基礎結構,併爲你的項目提供一個遠程Git存儲庫。Platform.sh Git驅動的基礎結構意味着它將自動管理你的應用程序將其推送到主遠程存儲庫所需的一切。設置SSH密鑰後,只需編寫代碼(包括一些用於指定所需基礎結構的YAML文件),而後將其提交到Git並推送便可。
git remote add platform <platform.sh@gitrepository>
git commit -m "Initial project"
git push -u platform master
推送的代碼將建立Java應用程序和服務實例,並在完成後將IP地址返回給服務。讓咱們測試一下該應用程序。
curl -X POST -k -H 'Content-Type: application/json' -i 'https://<host_addresss>/cars' --data '{"id":1,"plate":"AB-0001-AB","model":"Vogel","age":2012,"color":"green"}'
curl -X POST -k -H 'Content-Type: application/json' -i 'https://<host_addresss>/cars' --data '{"id":2,"plate":"AB-0003-AB","model":"Renault","age":2018,"color":"red"}'
curl -X POST -k -H 'Content-Type: application/json' -i 'https://<host_addresss>/cars' --data '{"id":3,"plate":"AB-0006-AB","model":"Peugeot","age":2019"color":"black"}'
curl -X GET -i 'https://<host_address>/logs'
在本文中,咱們學習瞭如何使用RabbitMQ和Spring優化整個系統的異步通訊。這種策略將使你的應用程序具備更大的可伸縮性,並防止用戶等待過久才能得到隊列/主題使用者的答案。具備任何異步通訊的體系結構容許(例如)第二個應用程序從代理讀取和處理信息,或者在系統須要的狀況下擁有多個消費者。
感謝閱讀!
另外近期整理了一套完整的【java架構思惟導圖】,分享給一樣正在認真學習的每位朋友~還有更多JVM、Mysql、Tomcat、Spring Boot、Spring Cloud、Zookeeper、Kafka、RabbitMQ、RockerMQ、Redis、ELK、Git等Java乾貨,歡迎私信交流!