Spring AMQP 實戰 - 整合 RabbitMQ 發送郵件

這篇文章,咱們開始 Spring AMQP 項目實戰旅程。javascript

原文地址:Spring AMQP 實戰 - 整合 RabbitMQ 發送郵件
博客地址:blog.720ui.com/html

介紹

經過這個項目實戰旅程,你會學習到如何使用 Spring Boot 整合 Spring AMQP,而且使用 RabbitMQ 的消息隊列機制發送郵件。其中,消息生產者負責將用戶的郵件消息發送至消息隊列,而消息消費者從消息隊列中獲取郵件消息進行發送。這個過程,你能夠理解成郵局:當你將要發佈的郵件放在郵箱中時,您能夠確信郵差最終會將郵件發送給收件人。java

準備

本教程假定 RabbitMQ 已在標準端口(5672) 的 localhost 上安裝並運行。若是使用不一樣的主機,端口,鏈接設置將須要調整。git

host = localhost
username = guest
password = guest
port = 5672
vhost = /複製代碼

實戰旅程

準備工做

這個實戰教程會構建兩個工程項目:email-server-producer 與 email-server-consumer。其中,email-server-producer 是消息生產者工程,email-server-consumer 是消息消費者工程。github

在教程的最後,我會將完整的代碼提交至 github 上面,你能夠結合源碼來閱讀這個教程,會有更好的效果。spring

如今開始旅程吧。咱們使用 Spring Boot 整合 Spring AMQP,並經過 Maven 構建依賴關係。(因爲篇幅的問題,我並不會粘貼完整的 pom.xml 配置信息,你能夠在 github 源碼中查看完整的配置文件)編程

<dependencies>
    <!-- spring boot--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> <exclusions> <exclusion> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-logging</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-context-support</artifactId> </dependency> <dependency> <groupId>javax.mail</groupId> <artifactId>mail</artifactId> <version>${javax.mail.version}</version> </dependency> </dependencies>複製代碼

構建消息生產者

咱們使用 Java Config 的方式配置消息生產者。json

@Configuration
@ComponentScan(basePackages = {"com.lianggzone.rabbitmq"})
@PropertySource(value = {"classpath:application.properties"})
public class RabbitMQConfig {
    @Autowired
    private Environment env;

    @Bean
    public ConnectionFactory connectionFactory() throws Exception {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(env.getProperty("mq.host").trim());
        connectionFactory.setPort(Integer.parseInt(env.getProperty("mq.port").trim()));
        connectionFactory.setVirtualHost(env.getProperty("mq.vhost").trim());
        connectionFactory.setUsername(env.getProperty("mq.username").trim());
        connectionFactory.setPassword(env.getProperty("mq.password").trim());
        return connectionFactory;
    }

    @Bean
    public CachingConnectionFactory cachingConnectionFactory() throws Exception {
        return new CachingConnectionFactory(connectionFactory());
    }

    @Bean
    public RabbitTemplate rabbitTemplate() throws Exception {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(cachingConnectionFactory());
        rabbitTemplate.setChannelTransacted(true);
        return rabbitTemplate;
    }

    @Bean
    public AmqpAdmin amqpAdmin() throws Exception {
        return new RabbitAdmin(cachingConnectionFactory());
    }

    @Bean
    Queue queue() {
        String name = env.getProperty("mq.queue").trim();
        // 是否持久化
        boolean durable = StringUtils.isNotBlank(env.getProperty("mq.queue.durable").trim())?
                Boolean.valueOf(env.getProperty("mq.queue.durable").trim()) : true; 
        // 僅建立者可使用的私有隊列,斷開後自動刪除
        boolean exclusive = StringUtils.isNotBlank(env.getProperty("mq.queue.exclusive").trim())?
                Boolean.valueOf(env.getProperty("mq.queue.exclusive").trim()) : false; 
        // 當全部消費客戶端鏈接斷開後,是否自動刪除隊列
        boolean autoDelete = StringUtils.isNotBlank(env.getProperty("mq.queue.autoDelete").trim())?
                Boolean.valueOf(env.getProperty("mq.queue.autoDelete").trim()) : false; 
        return new Queue(name, durable, exclusive, autoDelete);
    }

    @Bean
    TopicExchange exchange() {
        String name = env.getProperty("mq.exchange").trim();
        // 是否持久化
        boolean durable = StringUtils.isNotBlank(env.getProperty("mq.exchange.durable").trim())?
                Boolean.valueOf(env.getProperty("mq.exchange.durable").trim()) : true;
        // 當全部消費客戶端鏈接斷開後,是否自動刪除隊列
        boolean autoDelete = StringUtils.isNotBlank(env.getProperty("mq.exchange.autoDelete").trim())?
                Boolean.valueOf(env.getProperty("mq.exchange.autoDelete").trim()) : false;
        return new TopicExchange(name, durable, autoDelete);
    }

    @Bean
    Binding binding() {
        String routekey = env.getProperty("mq.routekey").trim();
        return BindingBuilder.bind(queue()).to(exchange()).with(routekey);
    }
}複製代碼

其中,定義了隊列、交換器,以及綁定。事實上,經過這種方式當隊列或交換器不存在的時候,Spring AMQP 會自動建立它。(若是你不但願自動建立,能夠在 RabbitMQ 的管理後臺開通隊列和交換器,並註釋掉 queue() 方法和 exchange() 方法)。此外,咱們爲了更好地擴展,將建立隊列或交換器的配置信息抽離到了配置文件 application.properties。其中,還包括 RabbitMQ 的配置信息。服務器

mq.host=localhost
mq.username=guest
mq.password=guest
mq.port=5672
mq.vhost=/

mq.exchange=email_exchange
mq.exchange.durable=true
mq.exchange.autoDelete=false

mq.queue=email_queue
mq.queue.durable=true
mq.queue.exclusive=false
mq.queue.autoDelete=false

mq.routekey=email_routekey複製代碼

此外,假設一個生產者發送到一個交換器,而一個消費者從一個隊列接收消息。此時,將隊列綁定到交換器對於鏈接這些生產者和消費者相當重要。在 Spring AMQP 中,咱們定義一個 Binding 類來表示這些鏈接。咱們使用 BindingBuilder 來構建 「流式的 API」 風格。微信

BindingBuilder.bind(queue()).to(exchange()).with(routekey);複製代碼

如今,咱們離大功告成已經很近了,須要再定義一個發送郵件任務存入消息隊列的方法。此時,爲了更好地擴展,咱們定義一個接口和一個實現類,基於接口編程嘛。

public interface EmailService {
    /** * 發送郵件任務存入消息隊列 * @param message * @throws Exception */
    void sendEmail(String message) throws Exception;
}複製代碼

它的實現類中重寫 sendEmail() 方法,將消息轉碼並寫入到消息隊列中。

@Service
public class EmailServiceImpl implements EmailService{
    private static Logger logger = LoggerFactory.getLogger(EmailServiceImpl.class);

    @Resource( name = "rabbitTemplate" )
    private RabbitTemplate rabbitTemplate;

    @Value("${mq.exchange}")
    private String exchange;

    @Value("${mq.routekey}")
    private String routeKey;

    @Override
    public void sendEmail(String message) throws Exception {
        try {
            rabbitTemplate.convertAndSend(exchange, routeKey, message);
        }catch (Exception e){
            logger.error("EmailServiceImpl.sendEmail", ExceptionUtils.getMessage(e));
        }
    }
}複製代碼

那麼,咱們再模擬一個 RESTful API 接口調用的場景,來模擬真實的場景。

@RestController()
@RequestMapping(value = "/v1/emails")
public class EmailController {

    @Resource
    private EmailService emailService;

    @RequestMapping(method = RequestMethod.POST)
    public JSONObject add(@RequestBody JSONObject jsonObject) throws Exception {
        emailService.sendEmail(jsonObject.toJSONString());
        return jsonObject;
    }
}複製代碼

最後,再寫一個 main 方法,將 Spring Boot 服務運行起來吧。

@RestController
@EnableAutoConfiguration
@ComponentScan(basePackages = {"com.lianggzone.rabbitmq"})
public class WebMain {

    public static void main(String[] args) throws Exception {
        SpringApplication.run(WebMain.class, args);
    }
}複製代碼

至此,已經大功告成了。咱們能夠經過 Postman 發送一個 HTTP 請求。(Postman是一款功能強大的網頁調試與發送網頁HTTP請求的Chrome插件。)

{
    "to":"lianggzone@163.com",
    "subject":"email-server-producer",
    "text":"<html><head></head><body><h1>郵件測試</h1><p>hello!this is mail test。</p></body></html>"
}複製代碼

請參見圖示。

來看看 RabbitMQ 的管理後臺吧,它會出現一個未處理的消息。(地址:http://localhost:15672/#/queues)

注意的是,千萬別向個人郵箱發測試消息喲,否則個人郵箱會郵件爆炸的/(ㄒoㄒ)/~~。

構建消息消費者

完成消息生產者以後,咱們再來構建一個消息消費者的工程。一樣地,咱們使用 Java Config 的方式配置消息消費者。

@Configuration
@ComponentScan(basePackages = {"com.lianggzone.rabbitmq"})
@PropertySource(value = {"classpath:application.properties"})
public class RabbitMQConfig {
    @Autowired
    private Environment env;

    @Bean
    public ConnectionFactory connectionFactory() throws Exception {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(env.getProperty("mq.host").trim());
        connectionFactory.setPort(Integer.parseInt(env.getProperty("mq.port").trim()));
        connectionFactory.setVirtualHost(env.getProperty("mq.vhost").trim());
        connectionFactory.setUsername(env.getProperty("mq.username").trim());
        connectionFactory.setPassword(env.getProperty("mq.password").trim());
        return connectionFactory;
    }

    @Bean
    public CachingConnectionFactory cachingConnectionFactory() throws Exception {
        return new CachingConnectionFactory(connectionFactory());
    }

    @Bean
    public RabbitTemplate rabbitTemplate() throws Exception {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(cachingConnectionFactory());
        rabbitTemplate.setChannelTransacted(true);
        return rabbitTemplate;
    }

    @Bean
    public AmqpAdmin amqpAdmin() throws Exception {
        return new RabbitAdmin(cachingConnectionFactory());
    }

    @Bean
    public SimpleMessageListenerContainer listenerContainer(
            @Qualifier("mailMessageListenerAdapter") MailMessageListenerAdapter mailMessageListenerAdapter) throws Exception {
        String queueName = env.getProperty("mq.queue").trim();

        SimpleMessageListenerContainer simpleMessageListenerContainer =
                new SimpleMessageListenerContainer(cachingConnectionFactory());
        simpleMessageListenerContainer.setQueueNames(queueName);
        simpleMessageListenerContainer.setMessageListener(mailMessageListenerAdapter);
        // 設置手動 ACK
        simpleMessageListenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        return simpleMessageListenerContainer;
    }
}複製代碼

聰明的你,應該發現了其中的不一樣。這個代碼中多了一個 listenerContainer() 方法。是的,它是一個監聽器容器,用來監聽消息隊列進行消息處理的。注意的是,咱們這裏設置手動 ACK 的方式。默認的狀況下,它採用自動應答,這種方式中消息隊列會發送消息後當即從消息隊列中刪除該消息。此時,咱們經過手動 ACK 方式,若是消費者因宕機或連接失敗等緣由沒有發送 ACK,RabbitMQ 會將消息從新發送給其餘監聽在隊列的下一個消費者,保證消息的可靠性。

固然,咱們也定義 application.properties 配置文件。

mq.host=localhost
mq.username=guest
mq.password=guest
mq.port=5672
mq.vhost=/

mq.queue=email_queue複製代碼

此外,咱們建立了一個 MailMessageListenerAdapter 類來消費消息。

@Component("mailMessageListenerAdapter")
public class MailMessageListenerAdapter extends MessageListenerAdapter {

    @Resource
    private JavaMailSender mailSender;

    @Value("${mail.username}")
    private String mailUsername;

    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        try {
            // 解析RabbitMQ消息體
            String messageBody = new String(message.getBody());
            MailMessageModel mailMessageModel = JSONObject.toJavaObject(JSONObject.parseObject(messageBody), MailMessageModel.class);
            // 發送郵件
            String to =  mailMessageModel.getTo();
            String subject = mailMessageModel.getSubject();
            String text = mailMessageModel.getText();
            sendHtmlMail(to, subject, text);
            // 手動ACK
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        }catch (Exception e){
            e.printStackTrace();
        }
    }

    /** * 發送郵件 * @param to * @param subject * @param text * @throws Exception */
    private void sendHtmlMail(String to, String subject, String text) throws Exception {
        MimeMessage mimeMessage = mailSender.createMimeMessage();
        MimeMessageHelper mimeMessageHelper = new MimeMessageHelper(mimeMessage);
        mimeMessageHelper.setFrom(mailUsername);
        mimeMessageHelper.setTo(to);
        mimeMessageHelper.setSubject(subject);
        mimeMessageHelper.setText(text, true);
        // 發送郵件
        mailSender.send(mimeMessage);
    }
}複製代碼

在 onMessage() 方法中,咱們完成了三件事情:

  1. 從 RabbitMQ 的消息隊列中解析消息體。
  2. 根據消息體的內容,發送郵件給目標的郵箱。
  3. 手動應答 ACK,讓消息隊列刪除該消息。

這裏,JSONObject.toJavaObject() 方法使用 fastjson 將 json 字符串轉換成實體對象 MailMessageModel。注意的是,@Data 是 lombok 類庫的一個註解。

@Data
public class MailMessageModel {
    @JSONField(name = "from")
    private String from;

    @JSONField(name = "to")
    private String to;

    @JSONField(name = "subject")
    private String subject;

    @JSONField(name = "text")
    private String text;

    @Override
    public String toString() {
        StringBuffer sb = new StringBuffer();
        sb.append("Email{from:").append(this.from).append(", ");
        sb.append("to:").append(this.to).append(", ");
        sb.append("subject:").append(this.subject).append(", ");
        sb.append("text:").append(this.text).append("}");
        return sb.toString();
    }
}複製代碼

Spring 對 Java Mail 有很好的支持。其中,郵件包括幾種類型:簡單文本的郵件、 HTML 文本的郵件、 內嵌圖片的郵件、 包含附件的郵件。這裏,咱們封裝了一個簡單的 sendHtmlMail() 進行郵件發送。

對了,咱們還少了一個郵件的配置類。

@Configuration
@PropertySource(value = {"classpath:mail.properties"})
@ComponentScan(basePackages = {"com.lianggzone.rabbitmq"})
public class EmailConfig {
    @Autowired
    private Environment env;

    @Bean(name = "mailSender")
    public JavaMailSender mailSender() {
        // 建立郵件發送器, 主要提供了郵件發送接口、透明建立Java Mail的MimeMessage、及郵件發送的配置
        JavaMailSenderImpl mailSender = new JavaMailSenderImpl();
        // 若是爲普通郵箱, 非ssl認證等
        mailSender.setHost(env.getProperty("mail.host").trim());
        mailSender.setPort(Integer.parseInt(env.getProperty("mail.port").trim()));
        mailSender.setUsername(env.getProperty("mail.username").trim());
        mailSender.setPassword(env.getProperty("mail.password").trim());
        mailSender.setDefaultEncoding("utf-8");
        // 配置郵件服務器
        Properties props = new Properties();
        // 讓服務器進行認證,認證用戶名和密碼是否正確 
        props.put("mail.smtp.auth", "true");
        props.put("mail.smtp.timeout", "25000");
        mailSender.setJavaMailProperties(props);
        return mailSender;
    }
}複製代碼

這些配置信息,咱們在配置文件 mail.properties 中維護。

mail.host=smtp.163.com
mail.port=25
mail.username=用戶名
mail.password=密碼複製代碼

最後,咱們寫一個 main 方法,將 Spring Boot 服務運行起來吧。

至此,咱們也完成了一個消息消費者的工程,它將不斷地從消息隊列中處理郵件消息。

源代碼

相關示例完整代碼: github.com/lianggzone/…

(完)

更多精彩文章,盡在「服務端思惟」微信公衆號!

相關文章
相關標籤/搜索