RabbitMQ整合Spring Booot【消費者補償冪等問題】

若是消費者 運行時候 報錯了java

package com.toov5.msg.SMS;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component 
@RabbitListener(queues="fanout_sms_queue")   
public class SMSConsumer {
    
    @RabbitHandler  
   public void process(String mString) {
       System.out.println("短信消費者獲取生產者消息msg"+mString);
       int i = 1/0;
   }
}

當生產者投遞消息後:web

消費者會不停的進行打印:redis

 

 消息一直沒有被消費spring

 

緣由 Rabbitmq 默認狀況下 若是消費者程序出現異常狀況 會自動實現補償機制  也就是 重試機制apache

 

@RabbitListener底層使用AOP進行攔截,若是程序沒有拋出異常,自動提交事務。 若是Aop使用異常通知 攔截獲取異常信息的話 , 自動實現補償機制,該消息會一直緩存在Rabbitmq服務器端進行重放,一直重試到不拋出異常爲準。json

 能夠修改重試策略緩存

 通常來講默認5s重試一次,springboot

消費者配置:服務器

 listener:
      simple:
        retry:
        ####開啓消費者重試
          enabled: true
         ####最大重試次數(默認無數次)
          max-attempts: 5
        ####重試間隔次數
          initial-interval: 3000

效果: 充實5次 不行就放棄了網絡

 

 

 

 

 MQ重試機制機制 須要注意的問題

  

如何合適選擇重試機制

 

狀況1:  消費者獲取到消息後,調用第三方接口,但接口暫時沒法訪問,是否須要重試?    

          須要重試   別人的問題不是我本身的問題

 

狀況2:  消費者獲取到消息後,拋出數據轉換異常,是否須要重試?   

       不須要重試   充實一億次也是如此 木有必要  須要發佈版本解決

 

總結:

  •            對於狀況2,若是消費者代碼拋出異常是須要發佈新版本才能解決的問題,那麼不須要重試,重試也無濟於事。應該採用 日誌記錄+定時任務job健康檢查+人工進行補償
  •            把錯誤記錄在日誌裏面,經過定時Job去自動的補償,或經過人工去補償。 

 

 傳統的HTTP請求 若是失敗了無法自動重試 ,固然本身能夠寫個循環實現。MQ徹底本身自帶的。

狀況2的拓展延申:

    將以前的案例改成   郵件消費者 調用郵件第三方接口 

 

僞代碼:

    在consumer 中 調用接口後 判斷返回值  因爲RabbitMQ 在消費者異常時候 會進行重試機制 進行補償 

    因此能夠拋出個異常 來實現

   

Consumer:

              String result   =   template.Email();

             if(result == null){

                   throw new Exception("調用第三方郵件服務器接口失敗!");

               }

 

 

producer:

 

pom:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.itmayiedu</groupId>
    <artifactId>rabbitmq_producer_springboot</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.0.RELEASE</version>
    </parent>
    <dependencies>

        <!-- springboot-web組件 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- 添加springboot對amqp的支持 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
        </dependency>
        <!--fastjson -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.49</version>
        </dependency>
    </dependencies>
</project>

config:

package com.itmayiedu.rabbitmq.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

//Fanout 類型 發佈訂閱模式
@Component
public class FanoutConfig {

    // 郵件隊列
    private String FANOUT_EMAIL_QUEUE = "fanout_email_queue";

    // 短信隊列
    private String FANOUT_SMS_QUEUE = "fanout_sms_queue";
    // fanout 交換機
    private String EXCHANGE_NAME = "fanoutExchange";

    // 1.定義郵件隊列
    @Bean
    public Queue fanOutEamilQueue() {
        return new Queue(FANOUT_EMAIL_QUEUE);
    }

    // 2.定義短信隊列
    @Bean
    public Queue fanOutSmsQueue() {
        return new Queue(FANOUT_SMS_QUEUE);
    }

    // 2.定義交換機
    @Bean
    FanoutExchange fanoutExchange() {
        return new FanoutExchange(EXCHANGE_NAME);
    }

    // 3.隊列與交換機綁定郵件隊列
    @Bean
    Binding bindingExchangeEamil(Queue fanOutEamilQueue, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(fanOutEamilQueue).to(fanoutExchange);
    }

    // 4.隊列與交換機綁定短信隊列
    @Bean
    Binding bindingExchangeSms(Queue fanOutSmsQueue, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(fanOutSmsQueue).to(fanoutExchange);
    }
}

 

 Producer:

package com.itmayiedu.rabbitmq;

import java.util.UUID;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import com.alibaba.fastjson.JSONObject;

@Component
public class FanoutProducer {
    @Autowired
    private AmqpTemplate amqpTemplate;

    public void send(String queueName) {
        JSONObject jsonObject = new JSONObject();
        jsonObject.put("email", "xx@163.com");
        jsonObject.put("timestamp", System.currentTimeMillis());
        String jsonString = jsonObject.toJSONString();
        System.out.println("jsonString:" + jsonString);
        // 設置消息惟一id 保證每次重試消息id惟一
        /*Message message = MessageBuilder.withBody(jsonString.getBytes())
                .setContentType(MessageProperties.CONTENT_TYPE_JSON).setContentEncoding("utf-8")
                .setMessageId(UUID.randomUUID() + "").build();*/
        amqpTemplate.convertAndSend(queueName, jsonString);
    }
}

Controller:

package com.itmayiedu.controller;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import com.itmayiedu.rabbitmq.FanoutProducer;

@RestController
public class ProducerController {
    @Autowired
    private FanoutProducer fanoutProducer;

    @RequestMapping("/sendFanout")
    public String sendFanout(String queueName) {
        fanoutProducer.send(queueName);
        return "success";
    }
}

yml:

spring:
  rabbitmq:
  ####鏈接地址
    host: 192.168.91.6
   ####端口號   
    port: 5672
   ####帳號 
    username: admin
   ####密碼  
    password: admin
   ### 地址
    virtual-host: /admin_toov5

啓動類:

package com.itmayiedu;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class AppProducer {

    public static void main(String[] args) {
        SpringApplication.run(AppProducer.class, args);
    }

}

 

Consumer:

pom:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.itmayiedu</groupId>
    <artifactId>rabbitmq_consumer_springboot</artifactId>
    <version>0.0.1-SNAPSHOT</version>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.0.RELEASE</version>
    </parent>
    <dependencies>

        <!-- springboot-web組件 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- 添加springboot對amqp的支持 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-mail</artifactId>
        </dependency>

        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
        </dependency>
        <!--fastjson -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.49</version>
        </dependency>

    </dependencies>
</project>

 

 utils:

package com.itmayiedu.rabbitmq.utils;

import com.alibaba.fastjson.JSONObject;
import org.apache.http.HttpEntity;
import org.apache.http.HttpStatus;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;

/**
 * HttpClient4.3工具類
 * 
 * @author hang.luo
 */
public class HttpClientUtils {
    private static Logger logger = LoggerFactory.getLogger(HttpClientUtils.class); // 日誌記錄

    private static RequestConfig requestConfig = null;

    static {
        // 設置請求和傳輸超時時間
        requestConfig = RequestConfig.custom().setSocketTimeout(2000).setConnectTimeout(2000).build();
    }

    /**
     * post請求傳輸json參數
     * 
     * @param url
     *            url地址
     * @param json
     *            參數
     * @return
     */
    public static JSONObject httpPost(String url, JSONObject jsonParam) {
        // post請求返回結果
        CloseableHttpClient httpClient = HttpClients.createDefault();
        JSONObject jsonResult = null;
        HttpPost httpPost = new HttpPost(url);
        // 設置請求和傳輸超時時間
        httpPost.setConfig(requestConfig);
        try {
            if (null != jsonParam) {
                // 解決中文亂碼問題
                StringEntity entity = new StringEntity(jsonParam.toString(), "utf-8");
                entity.setContentEncoding("UTF-8");
                entity.setContentType("application/json");
                httpPost.setEntity(entity);
            }
            CloseableHttpResponse result = httpClient.execute(httpPost);
            // 請求發送成功,並獲得響應
            if (result.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {
                String str = "";
                try {
                    // 讀取服務器返回過來的json字符串數據
                    str = EntityUtils.toString(result.getEntity(), "utf-8");
                    // 把json字符串轉換成json對象
                    jsonResult = JSONObject.parseObject(str);
                } catch (Exception e) {
                    logger.error("post請求提交失敗:" + url, e);
                }
            }
        } catch (IOException e) {
            logger.error("post請求提交失敗:" + url, e);
        } finally {
            httpPost.releaseConnection();
        }
        return jsonResult;
    }

    /**
     * post請求傳輸String參數 例如:name=Jack&sex=1&type=2
     * Content-type:application/x-www-form-urlencoded
     * 
     * @param url
     *            url地址
     * @param strParam
     *            參數
     * @return
     */
    public static JSONObject httpPost(String url, String strParam) {
        // post請求返回結果
        CloseableHttpClient httpClient = HttpClients.createDefault();
        JSONObject jsonResult = null;
        HttpPost httpPost = new HttpPost(url);
        httpPost.setConfig(requestConfig);
        try {
            if (null != strParam) {
                // 解決中文亂碼問題
                StringEntity entity = new StringEntity(strParam, "utf-8");
                entity.setContentEncoding("UTF-8");
                entity.setContentType("application/x-www-form-urlencoded");
                httpPost.setEntity(entity);
            }
            CloseableHttpResponse result = httpClient.execute(httpPost);
            // 請求發送成功,並獲得響應
            if (result.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {
                String str = "";
                try {
                    // 讀取服務器返回過來的json字符串數據
                    str = EntityUtils.toString(result.getEntity(), "utf-8");
                    // 把json字符串轉換成json對象
                    jsonResult = JSONObject.parseObject(str);
                } catch (Exception e) {
                    logger.error("post請求提交失敗:" + url, e);
                }
            }
        } catch (IOException e) {
            logger.error("post請求提交失敗:" + url, e);
        } finally {
            httpPost.releaseConnection();
        }
        return jsonResult;
    }

    /**
     * 發送get請求
     * 
     * @param url
     *            路徑
     * @return
     */
    public static JSONObject httpGet(String url) {
        // get請求返回結果
        JSONObject jsonResult = null;
        CloseableHttpClient client = HttpClients.createDefault();
        // 發送get請求
        HttpGet request = new HttpGet(url);
        request.setConfig(requestConfig);
        try {
            CloseableHttpResponse response = client.execute(request);

            // 請求發送成功,並獲得響應
            if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {
                // 讀取服務器返回過來的json字符串數據
                HttpEntity entity = response.getEntity();
                String strResult = EntityUtils.toString(entity, "utf-8");
                // 把json字符串轉換成json對象
                jsonResult = JSONObject.parseObject(strResult);
            } else {
                logger.error("get請求提交失敗:" + url);
            }
        } catch (IOException e) {
            logger.error("get請求提交失敗:" + url, e);
        } finally {
            request.releaseConnection();
        }
        return jsonResult;
    }

}

consumer:

package com.itmayiedu.rabbitmq;

import java.util.Map;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.stereotype.Component;

import com.alibaba.fastjson.JSONObject;
import com.itmayiedu.rabbitmq.utils.HttpClientUtils;
import com.rabbitmq.client.Channel;

//郵件隊列
@Component
public class FanoutEamilConsumer {
     @RabbitListener(queues = "fanout_email_queue")
     public void process(String msg) throws Exception {
     System.out.println("郵件消費者獲取生產者消息msg:" + msg);
     JSONObject jsonObject = JSONObject.parseObject(msg);
     // 獲取email參數
     String email = jsonObject.getString("email");
     // 請求地址
     String emailUrl = "http://127.0.0.1:8083/sendEmail?email=" + email;
     JSONObject result = HttpClientUtils.httpGet(emailUrl);
     if (result == null) {
     // 由於網絡緣由,形成沒法訪問,繼續重試
     throw new Exception("調用接口失敗!");
     }
     System.out.println("執行結束....");
    
     }
}

yml:

spring:
  rabbitmq:
  ####鏈接地址
    host: 192.168.91.6
   ####端口號   
    port: 5672
   ####帳號 
    username: admin
   ####密碼  
    password: admin
   ### 地址
    virtual-host: /admin_toov5
    listener: 
      simple:
        retry:
        ####開啓消費者異常重試
          enabled: true
         ####最大重試次數
          max-attempts: 5
        ####重試間隔次數
          initial-interval: 2000
       

server:
  port: 8081

 

啓動類:

package com.itmayiedu.rabbitmq;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class AppConsumer {

    public static void main(String[] args) {
        SpringApplication.run(AppConsumer.class, args);
    }

}

 

 郵件服務器:

package com.mayikt.controller;

import java.util.HashMap;
import java.util.Map;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@SpringBootApplication
@RestController
public class MsgController {

    // 模擬第三方發送郵件
    @RequestMapping("/sendEmail")
    public Map<String, Object> sendEmail(String email) {
        System.out.println("開始發送郵件:" + email);
        Map<String, Object> result = new HashMap<String, Object>();
        result.put("code", "200");
        result.put("msg", "發送郵件成功..");
        System.out.println("發送郵件成功");
        return result;
    }

    public static void main(String[] args) {
        SpringApplication.run(MsgController.class, args);
    }

}

yml:

server:
  port: 8083

pom:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.mayikt</groupId>
    <artifactId>mayikt_sms</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.0.RELEASE</version>
    </parent>
    <dependencies>

        <!-- springboot-web組件 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- 添加springboot對amqp的支持 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-mail</artifactId>
        </dependency>

        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
        </dependency>
        <!--fastjson -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.49</version>
        </dependency>

    </dependencies>
</project>

在沒有啓動郵件服務器時候,消費者調用接口失敗會一直重試,重試五次。

在此期間,若是啓動成功,則重試成功,再也不重試, 再也不進行補償機制。

消費者若是保證消息冪等性,不被重複消費

 

背景:

網絡延遲傳輸中,或者消費出現異常或者是消費延遲,會形成進行MQ重試進行重試補償機制,在重試過程當中,可能會形成重複消費。 

解決辦法:

使用全局MessageID判斷消費方使用同一個,解決冪等性。

  

   只要重試過程當中,判斷若是已經走完了 不能再繼續走 繼續執行了

   MQ消費者的冪等行的解決 通常使用全局ID  或者寫個惟一標識好比時間戳   或者UUID  或者訂單號

   

  改進:

  producer:

 添加:

 

// 設置消息惟一id 保證每次重試消息id惟一  
        Message message = MessageBuilder.withBody(jsonString.getBytes())
                .setContentType(MessageProperties.CONTENT_TYPE_JSON).setContentEncoding("utf-8")
                .setMessageId(UUID.randomUUID() + "").build(); //消息id設置在請求頭裏面 用UUID作全局ID 
        amqpTemplate.convertAndSend(queueName, message);

 

 所有代碼:

  

package com.itmayiedu.rabbitmq;

import java.util.UUID;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import com.alibaba.fastjson.JSONObject;

@Component
public class FanoutProducer {
    @Autowired
    private AmqpTemplate amqpTemplate;

    public void send(String queueName) {
        JSONObject jsonObject = new JSONObject();
        jsonObject.put("email", "xx@163.com");
        jsonObject.put("timestamp", System.currentTimeMillis());
        String jsonString = jsonObject.toJSONString();
        System.out.println("jsonString:" + jsonString);
        // 設置消息惟一id 保證每次重試消息id惟一  
        Message message = MessageBuilder.withBody(jsonString.getBytes())
                .setContentType(MessageProperties.CONTENT_TYPE_JSON).setContentEncoding("utf-8")
                .setMessageId(UUID.randomUUID() + "").build(); //消息id設置在請求頭裏面 用UUID作全局ID 
        amqpTemplate.convertAndSend(queueName, message);
    }
}

 

 一樣的 消費者也須要修改:

 方法參數類型爲 Message  而後能夠獲取這個ID 而後能夠進行業務邏輯操做

 @RabbitListener(queues = "fanout_email_queue")
     public void process(Message message) throws Exception {
     // 獲取消息Id
     String messageId = message.getMessageProperties().getMessageId();  //id獲取之
     String msg = new String(message.getBody(), "UTF-8"); //消息內容獲取之
     System.out.println("-----郵件消費者獲取生產者消息-----------------" + "messageId:" + messageId + ",消息內容:" +
     msg);
     if (messageId == null) {
            return;
        }
     JSONObject jsonObject = JSONObject.parseObject(msg);
     // 獲取email參數
     String email = jsonObject.getString("email");
     // 請求地址
     String emailUrl = "http://127.0.0.1:8083/sendEmail?email=" + email;
     JSONObject result = HttpClientUtils.httpGet(emailUrl);
     if (result == null) {
     // 由於網絡緣由,形成沒法訪問,繼續重試
     throw new Exception("調用接口失敗!");
     }
     System.out.println("執行結束....");
     //messId 的狀況寫入到redis 中  成功就修改成空
     }

 

 重試機制都是間隔性的  每次都是一個線程  單線程重試

 

 關於應答模式:

    Spring boot 中進行 AOP攔截 自動幫助作重試

    手動應答的話 ,若是不告訴服務器已經消費成功,則服務器不會刪除 消息。告訴消費成功了纔會刪除。

    

消費者的yml加入:

 acknowledge-mode: manual 

    

spring:
  rabbitmq:
  ####鏈接地址
    host: 192.168.91.6
   ####端口號   
    port: 5672
   ####帳號 
    username: admin
   ####密碼  
    password: admin
   ### 地址
    virtual-host: /admin_toov5
    listener: 
      simple:
        retry:
        ####開啓消費者異常重試
          enabled: true
         ####最大重試次數
          max-attempts: 5
        ####重試間隔次數
          initial-interval: 2000
        ####開啓手動ack  
        acknowledge-mode: manual 

server:
  port: 8081

 

 

 開啓模式以後:

   消費者參數須要加入:  @Headers Map<String, Object> headers, Channel channel

  代碼邏輯最後面加入:

// // 手動ack
Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
// 手動簽收  告訴RabbitMQ 消費成功了  消息能夠刪除了
channel.basicAck(deliveryTag, false);  

 

代碼以下:

@RabbitListener(queues = "fanout_email_queue")
    public void process(Message message, @Headers Map<String, Object> headers, Channel channel) throws Exception {
        // 獲取消息Id
        String messageId = message.getMessageProperties().getMessageId();
        String msg = new String(message.getBody(), "UTF-8");
        System.out.println("郵件消費者獲取生產者消息" + "messageId:" + messageId + ",消息內容:" + msg);
        JSONObject jsonObject = JSONObject.parseObject(msg);
        // 獲取email參數
        String email = jsonObject.getString("email");
        // 請求地址
        String emailUrl = "http://127.0.0.1:8083/sendEmail?email=" + email;
        JSONObject result = HttpClientUtils.httpGet(emailUrl);
        if (result == null) {
            // 由於網絡緣由,形成沒法訪問,繼續重試
            throw new Exception("調用接口失敗!");
        }
        // // 手動ack
        Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
        // 手動簽收
        channel.basicAck(deliveryTag, false);
        System.out.println("執行結束....");
    }
相關文章
相關標籤/搜索