RabbitMQ持久化編碼注意事項

以Java語言,MQ客戶端爲amqp-client做爲示例前端

一、基本原則java

  direct模式,由生產者聲明隊列名,消費者也聲明隊列名數據庫

  topic模式,由生產者聲明交換器名,由消費者聲明隊列名+交換器名+綁定關係json

  即生產者只負責生產消息,至於消息要投遞到哪裏由消費者指定安全

二、隊列、交換器、消息的持久化配置網絡

    隊列聲明持久化  異步

public void queueDeclare(String queue) {
        try {
            if (conn == null) {
                conn = connectionFactory.newConnection();
            }
            Channel channel = conn.createChannel();

            // 聲明隊列,若是隊列不存在則建立之
            boolean durable = true;
            boolean exclusive = false;
            boolean autoDelete = false;
            Map<String, Object> arguments = null;
            channel.queueDeclare(queue, durable, exclusive, autoDelete, arguments);

            channel.close();
        } catch (IOException e) {
            logger.error("IOException:", e);
        } catch (TimeoutException e) {
            logger.error("TimeoutException:", e);
        }
    }

 

    交換器聲明持久化ide

         // 聲明topic交換器
    public void topicExchangeDeclare(String exchange) {
        String type = "topic";
        boolean durable = true;
        exchangeDeclare(exchange, type, durable);
    }

    private void exchangeDeclare(String exchange, String type, boolean durable) {
        try {
            if (conn == null) {
                conn = connectionFactory.newConnection();
            }
            Channel channel = conn.createChannel();

            // 聲明交換器
            channel.exchangeDeclare(exchange, type, durable);

            channel.close();
        } catch (IOException e) {
            logger.error("IOException:", e);
        } catch (TimeoutException e) {
            logger.error("TimeoutException:", e);
        }
    }    

   

 消息發送時指定持久化性能

   

       // 發送消息
    public void send(String exchange, String routingKey, JSONObject json) {
        try {
            if (conn == null) {
                conn = connectionFactory.newConnection();
            }
            Channel channel = conn.createChannel();

            String msg = json.toJSONString();
            channel.basicPublish(exchange, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes("utf-8"));
            channel.close();
        } catch (IOException e) {
            logger.error("IOException:", e);
        } catch (TimeoutException e) {
            logger.error("TimeoutException:", e);
        } 
 }  

 

三、網絡閃斷、RabbitMQ重啓時App的自恢復編碼this

  首先,必須已經指定了隊列和交換器的持久化,不然在自恢復時,因爲沒法找到隊列及交換器和綁定關係會報錯

須要注意的是,RabbitMQ推薦儘可能共用Connection,多個線程之間用不一樣的Channel 

<bean id="connectionFactory" class="com.rabbitmq.client.ConnectionFactory">
        <property name="automaticRecoveryEnabled" value="true"></property>
        <property name="host" value="${RABBITMQ.SERVER_IP}"></property>
        <property name="port" value="${RABBITMQ.SERVER_PORT}"></property>
        <property name="username" value="${RABBITMQ.USERNAME}"></property>
        <property name="password" value="${RABBITMQ.PASSWORD}"></property>
        <property name="virtualHost" value="${RABBITMQ.VIRTUAL_HOST}"></property> 
        </bean> 

設置automaticRecoveryEnabled爲true

 
public class MQConsumer implements Runnable, Consumer {

    static Logger logger = LoggerFactory.getLogger(MQConsumer.class);

    protected Connection connection;
    protected Channel channel;

    protected String queue;
    protected ConsumerExecutor executor;// 執行器
    private MQConfig config;

    public MQConsumer(MQConfig config, String queue, ConsumerExecutor executor) {
        this.config = config;
        this.queue = queue;
        this.executor = executor;
    }

    @Override
    public void run() {
        try {
            init();
            try {
                channel.basicConsume(queue, true, this);
            } catch (IOException e) {
                logger.error("MQ消費處理失敗:", e);
            }
        } catch (Exception e) {
            logger.error("mq init() error", e);
        }
    }

    protected void init() throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(config.getIp());
        factory.setPort(config.getPort());
        factory.setUsername(config.getUserName());
        factory.setPassword(config.getPassword());
        factory.setVirtualHost(config.getvHost());
        factory.setAutomaticRecoveryEnabled(true);
        
        connection = factory.newConnection();
        channel = connection.createChannel();
    }

    @Override
    public void handleDelivery(String consumerTag, Envelope env, BasicProperties props, byte[] body) throws IOException {
        String msg = new String(body, "utf-8");
        logger.debug("從隊列[" + queue + "] 接收消息: " + msg);
        try {
            executor.consume(msg);
        } catch (Exception e) {
            logger.error("handleDelivery error:", e);
        }
    }

    @Override
    public void handleCancel(String consumerTag) {
        logger.info("handleCancel:" + consumerTag);
    }

    @Override
    public void handleCancelOk(String consumerTag) {
        logger.info("handleCancelOk:" + consumerTag);
    }

    @Override
    public void handleConsumeOk(String consumerTag) {
        logger.info("handleConsumeOk:" + consumerTag);
    }

    @Override
    public void handleRecoverOk(String consumerTag) {
        logger.info("handleRecoverOk:" + consumerTag);
    }

    @Override
    public void handleShutdownSignal(String consumerTag, ShutdownSignalException e) {
        logger.info("handleShutdownSignal:" + consumerTag);
    }
}

消費者代碼示例,只要automaticRecoveryEnabled爲true,並且queue和exchange都是持久化的,可以自動恢復,不用手工處理。

四、auto_ack問題

在auto_ack爲true時,數據流是這樣的:

App從MQ取消息->刪除消息->App業務邏輯處理(包括讀寫數據庫等)->發送處理結果(若是有須要)

能夠看出當App業務邏輯處理失敗時,消息已經被刪除了,不少狀況下,這是不安全的,因此改成:

App從MQ取消息->App業務邏輯處理(包括讀寫數據庫等)->發送ACK刪除消息 ->發送處理結果(若是有須要)

可是因爲性能問題通常出如今業務邏輯部分,若是這部分處理慢又會形成擁塞,因此要自已權衡

      try {
                channel.basicConsume(queue, true, this);
                boolean autoAck = false;
                channel.basicConsume(queue, autoAck, this);
             } catch (IOException e) {
                 logger.error("MQ消費處理失敗:", e);
             }

 

         try{
                channel.basicAck(env.getDeliveryTag(), true);
            }catch(Exception e){
                logger.error("basicAck error:", e);
            }
		 	

五、超時處理

採用MQ解耦後系統之間雖然是異步處理,但正常狀況下響應速度跟同步處理接近。特殊狀況下響應慢時極可能消息從發送到被處理已通過去了很長一段時間,前端很可能已經重複提交併完成了業務,因此須要加個快速失敗機制。即消息生產者將消息的建立時間帶到消息體裏,消費者拿到消息後,判斷若是是已通過去了指定間隔的消息,則直接失敗返回。

相關文章
相關標籤/搜索