說實話,最近仍是比較忙的,手上素材卻是一大把,可是大多隻是初步整理了。可是博客這種東西仍是要寫的,果真後面仍是要放低一下排版要求(擴展性的一些東西也少提一些)。java
消息隊列這個東西,其實網上的資料仍是不少的。我就簡單說一些本身的認識與源代碼哈。spring
我是很喜歡瞭解技術演進的,由於演進的過程展示了前輩們的智慧。框架
最先的程序串行執行就不說了。異步
程序調用中的方法調用,每每調用方與被調用方都存在與同一內存空間(從Java角度說,都是在同一JVM中),因此方法調用的邏輯不會太複雜。簡單來講,就是調用方(Java中其實就是目標對象)將被調用方壓入Java虛擬機棧,從而執行(詳見JVM)。或者等我何時,把我有關JVM的筆記貼出來(嘿嘿)。分佈式
後來呢,就是出現了對非本地JVM方法調用的需求(舉個例子,我須要調用第三方的方法,若是每次都要雙方都寫一個專門的處理服務(在當時,也許接口更爲準確),比較麻煩),那麼就有了RPC與RMI的一個須要。那麼在Java中就出現了一個stub的技術,定義好後,相關方法就像調用本地同樣(詳見《Head First Java》相關章節)。固然了,這個時候已經有了中間件的概念了,因此也就有了CORBA等框架。談到中間件,感興趣的,能夠去查詢一下當時主流的中間件分類(如RPC,RMI,MOM,TPM,ORB)。ide
那麼到了如今呢,分佈式系統的通訊能夠按照同步與異步分爲兩大支柱。之因此這麼理解,是由於分佈式系統每每同步通訊與異步通訊都是須要的。簡單提一下,同步通訊業務邏輯相對簡單,實現快速,能夠實時得到迴應,但耦合度較高。異步通訊耦合度低,並能夠進行消息堆積,消峯,但沒法實時獲取迴應,業務邏輯複雜,從而提升系統複雜度(尤爲當一條業務線與多層異步邏輯)等。以後有機會,我會舉例細述。函數
固然了,在本篇中,只簡單談一下異步通訊的主流實現-消息隊列。學習
選擇方面,我就很少說了,目前只用過RabbitMq,RocketMq,Kafka。網上有關消息隊列選擇的文章不少,很細緻,我就不贅述了。fetch
這裏貼出來的都是實際生產代碼(若是內部版本也算的話,嘿嘿),因此若是有一些不是很熟悉的類,請查看import,是不是項目自身的類。或者也能夠直接詢問我。this
這裏的初步實現,是根據RabbitMq的原生方法進行編寫(詳細參考:《RabbitMQ實戰指南》第一章的兩個代碼清單及第二章的相關解釋)。
package com.renewable.gateway.rabbitmq.producer; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.MessageProperties; import com.renewable.gateway.pojo.Terminal; import com.renewable.gateway.util.JsonUtil; import com.renewable.gateway.util.PropertiesUtil; import org.springframework.stereotype.Component; import java.io.IOException; import java.util.concurrent.TimeoutException; import static com.renewable.gateway.common.constant.RabbitmqConstant.*; /** * @Description: * @Author: jarry */ @Component("TerminalProducer") public class TerminalProducer { private static final String IP_ADDRESS = PropertiesUtil.getProperty(RABBITMQ_HOST); private static final int PORT = Integer.parseInt(PropertiesUtil.getProperty(RABBITMQ_PORT)); private static final String USER_NAME = PropertiesUtil.getProperty(RABBITMQ_USER_NAME); private static final String USER_PASSWORD = PropertiesUtil.getProperty(RABBITMQ_USER_PASSWORD); private static final String TERMINAL_CONFIG_TERMINAL2CENTCONTROL_EXCHANGE = "exchange-terminal-config-terminal2centcontrol"; private static final String TERMINAL_CONFIG_TERMINAL2CENTCONTROL_QUEUE = "queue-terminal-config-terminal2centcontrol"; private static final String TERMINAL_CONFIG_TERMINAL2CENTCONTROL_ROUTINETYPE = "topic"; private static final String TERMINAL_CONFIG_TERMINAL2CENTCONTROL_BINDINGKEY = "terminal.config.terminal2centcontrol"; private static final String TERMINAL_CONFIG_TERMINAL2CENTCONTROL_ROUTINGKEY = "terminal.config.terminal2centcontrol"; public static void sendTerminalConfig(Terminal terminal) throws IOException, TimeoutException, InterruptedException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(IP_ADDRESS); factory.setPort(PORT); factory.setUsername(USER_NAME); factory.setPassword(USER_PASSWORD); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(TERMINAL_CONFIG_TERMINAL2CENTCONTROL_EXCHANGE, TERMINAL_CONFIG_TERMINAL2CENTCONTROL_ROUTINETYPE, true, false, null); channel.queueDeclare(TERMINAL_CONFIG_TERMINAL2CENTCONTROL_QUEUE, true, false, false, null); channel.queueBind(TERMINAL_CONFIG_TERMINAL2CENTCONTROL_QUEUE, TERMINAL_CONFIG_TERMINAL2CENTCONTROL_EXCHANGE, TERMINAL_CONFIG_TERMINAL2CENTCONTROL_BINDINGKEY); String terminalStr = JsonUtil.obj2String(terminal); channel.basicPublish(TERMINAL_CONFIG_TERMINAL2CENTCONTROL_EXCHANGE, TERMINAL_CONFIG_TERMINAL2CENTCONTROL_ROUTINGKEY, MessageProperties.PERSISTENT_TEXT_PLAIN, terminalStr.getBytes()); channel.close(); connection.close(); } }
package com.renewable.gateway.rabbitmq.consumer; import com.rabbitmq.client.*; import com.renewable.gateway.common.GuavaCache; import com.renewable.gateway.common.ServerResponse; import com.renewable.gateway.pojo.Terminal; import com.renewable.gateway.service.ITerminalService; import com.renewable.gateway.util.JsonUtil; import com.renewable.gateway.util.PropertiesUtil; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; import java.io.IOException; import java.util.concurrent.TimeoutException; import static com.renewable.gateway.common.constant.CacheConstant.TERMINAL_MAC; import static com.renewable.gateway.common.constant.RabbitmqConstant.*; /** * @Description: * @Author: jarry */ @Component public class TerminalConsumer { @Autowired private ITerminalService iTerminalService; private static final String TERMINAL_CONFIG_CENTCONTROL2TERMINAL_EXCHANGE = "exchange-terminal-config-centcontrol2terminal"; private static final String TERMINAL_CONFIG_CENTCONTROL2TERMINAL_QUEUE = "queue-terminal-config-centcontrol2terminal"; private static final String TERMINAL_CONFIG_CENTCONTROL2TERMINAL_ROUTINETYPE = "topic"; private static final String TERMINAL_CONFIG_CENTCONTROL2TERMINAL_BINDINGKEY = "terminal.config.centcontrol2terminal"; @PostConstruct public void messageOnTerminal() throws IOException, TimeoutException, InterruptedException { Address[] addresses = new Address[]{ new Address(PropertiesUtil.getProperty(RABBITMQ_HOST)) }; ConnectionFactory factory = new ConnectionFactory(); factory.setUsername(PropertiesUtil.getProperty(RABBITMQ_USER_NAME)); factory.setPassword(PropertiesUtil.getProperty(RABBITMQ_USER_PASSWORD)); Connection connection = factory.newConnection(addresses); final Channel channel = connection.createChannel(); channel.basicQos(64); // 設置客戶端最多接收未ack的消息個數,避免客戶端被沖垮(經常使用於限流) Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { // 1.接收數據,並反序列化出對象 Terminal receiveTerminalConfig = JsonUtil.string2Obj(new String(body), Terminal.class); // 2.驗證是不是該終端的消息的消息 // 避免ACK其餘終端的消息 if (receiveTerminalConfig.getMac() == GuavaCache.getKey(TERMINAL_MAC)) { // 業務代碼 ServerResponse response = iTerminalService.receiveTerminalFromRabbitmq(receiveTerminalConfig); if (response.isSuccess()) { channel.basicAck(envelope.getDeliveryTag(), false); } } } }; channel.basicConsume(TERMINAL_CONFIG_CENTCONTROL2TERMINAL_QUEUE, consumer); // 等回調函數執行完畢後,關閉資源 // 想了想仍是不關閉資源,保持一個監聽的狀態,從而確保配置的實時更新 // TimeUnit.SECONDS.sleep(5); // channel.close(); // connection.close(); } }
這是早期寫的一個demo代碼,是直接參照源碼的。若是是學習RabbitMq的話,仍是建議手寫一下這種比較原始的程序,瞭解其中每一個方法的做用,從而理解RabbitMq的思路。若是條件容許的話,還能夠查看一下RabbitMq的底層通訊協議-AMQP(若是不方便下載,也能夠私聊我)。
固然,此處能夠經過@Value直接導入相關配置(乃至到了SpringCloud後,能夠經過@Refreshscope等實現配置自動更新)。
package com.renewable.terminal.rabbitmq.producer; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.MessageProperties; import com.renewable.terminal.pojo.Terminal; import com.renewable.terminal.util.JsonUtil; import org.springframework.stereotype.Component; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * @Description: * @Author: jarry */ @Component("TerminalProducer") public class TerminalProducer { private static String rabbitmqHost = "47.92.249.250"; private static String rabbitmqUser = "admin"; private static String rabbitmqPassword = "123456"; private static String rabbitmqPort = "5672"; private static final String IP_ADDRESS = rabbitmqHost; private static final int PORT = Integer.parseInt(rabbitmqPort); private static final String USER_NAME = rabbitmqUser; private static final String USER_PASSWORD = rabbitmqPassword; private static final String TERMINAL_CONFIG_TERMINAL2CENTCONTROL_EXCHANGE = "exchange-terminal-config-terminal2centcontrol"; private static final String TERMINAL_CONFIG_TERMINAL2CENTCONTROL_QUEUE = "queue-terminal-config-terminal2centcontrol"; private static final String TERMINAL_CONFIG_TERMINAL2CENTCONTROL_ROUTINETYPE = "topic"; private static final String TERMINAL_CONFIG_TERMINAL2CENTCONTROL_BINDINGKEY = "terminal.config.terminal2centcontrol"; private static final String TERMINAL_CONFIG_TERMINAL2CENTCONTROL_ROUTINGKEY = "terminal.config.terminal2centcontrol"; public static void sendTerminalConfig(Terminal terminal) throws IOException, TimeoutException, InterruptedException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(IP_ADDRESS); factory.setPort(PORT); factory.setUsername(USER_NAME); factory.setPassword(USER_PASSWORD); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(TERMINAL_CONFIG_TERMINAL2CENTCONTROL_EXCHANGE, TERMINAL_CONFIG_TERMINAL2CENTCONTROL_ROUTINETYPE, true, false, null); channel.queueDeclare(TERMINAL_CONFIG_TERMINAL2CENTCONTROL_QUEUE, true, false, false, null); channel.queueBind(TERMINAL_CONFIG_TERMINAL2CENTCONTROL_QUEUE, TERMINAL_CONFIG_TERMINAL2CENTCONTROL_EXCHANGE, TERMINAL_CONFIG_TERMINAL2CENTCONTROL_BINDINGKEY); String terminalStr = JsonUtil.obj2String(terminal); channel.basicPublish(TERMINAL_CONFIG_TERMINAL2CENTCONTROL_EXCHANGE, TERMINAL_CONFIG_TERMINAL2CENTCONTROL_ROUTINGKEY, MessageProperties.PERSISTENT_TEXT_PLAIN, terminalStr.getBytes()); channel.close(); connection.close(); } }
package com.renewable.terminal.rabbitmq.consumer; import com.rabbitmq.client.*; import com.renewable.terminal.Init.SerialSensorInit; import com.renewable.terminal.Init.TerminalInit; import com.renewable.terminal.common.GuavaCache; import com.renewable.terminal.common.ServerResponse; import com.renewable.terminal.pojo.Terminal; import com.renewable.terminal.service.ITerminalService; import com.renewable.terminal.util.JsonUtil; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.annotation.*; import org.springframework.amqp.support.AmqpHeaders; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.messaging.handler.annotation.Headers; import org.springframework.messaging.handler.annotation.Payload; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; import java.io.IOException; import java.util.Map; import java.util.concurrent.TimeoutException; import static com.renewable.terminal.common.constant.CacheConstant.TERMINAL_ID; import static com.renewable.terminal.common.constant.CacheConstant.TERMINAL_MAC; /** * @Description: * @Author: jarry */ @Component @Slf4j public class TerminalConsumer { @Autowired private ITerminalService iTerminalService; @Autowired private SerialSensorInit serialSensorInit; private static final String TERMINAL_CONFIG_TERMINAL2CENTCONTROL_EXCHANGE = "exchange-terminal-config-centcontrol2terminal"; private static final String TERMINAL_CONFIG_TERMINAL2CENTCONTROL_QUEUE = "queue-terminal-config-centcontrol2terminal"; private static final String TERMINAL_CONFIG_TERMINAL2CENTCONTROL_ROUTINETYPE = "topic"; private static final String TERMINAL_CONFIG_TERMINAL2CENTCONTROL_BINDINGKEY = "terminal.config.centcontrol2terminal"; //TODO_FINISHED 2019.05.16 完成終端機TerminalConfig的接收與判斷(ID是否爲長隨機數,是否須要從新分配) @RabbitListener(bindings = @QueueBinding( value = @Queue(value = TERMINAL_CONFIG_TERMINAL2CENTCONTROL_QUEUE, declare = "true"), exchange = @Exchange(value = TERMINAL_CONFIG_TERMINAL2CENTCONTROL_EXCHANGE, declare = "true", type = TERMINAL_CONFIG_TERMINAL2CENTCONTROL_ROUTINETYPE), key = TERMINAL_CONFIG_TERMINAL2CENTCONTROL_BINDINGKEY )) @RabbitHandler public void messageOnTerminal(@Payload String terminalStr, @Headers Map<String, Object> headers, Channel channel) throws IOException { Terminal terminal = JsonUtil.string2Obj(terminalStr, Terminal.class); if (terminal == null){ log.info("consume the null terminal config !"); Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG); channel.basicAck(deliveryTag, false); } if (!GuavaCache.getKey(TERMINAL_MAC).equals(terminal.getMac())){ log.info("refuse target terminal with mac({}) configure to this terminal with mac({}).",terminal.getMac(), GuavaCache.getKey(TERMINAL_MAC)); return; } // 2.業務邏輯 ServerResponse response = iTerminalService.receiveTerminalFromRabbitmq(terminal); log.info("start serialSensorInit"); serialSensorInit.init(); // 3.確認 if (response.isSuccess()) { Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG); channel.basicAck(deliveryTag, false); } } }
# rabbitmq 消費端配置 spring: rabbitmq: listener: simple: concurrency: 5 max-concurrency: 10 acknowledge-mode: manual # 限流 prefetch: 1 host: "localhost" port: 5672 username: "admin" password: "123456" virtual-host: "/" connection-timeout: 15000
這裏不得不讚一下Spring,它經過提供RabbitMq地封裝API-ampq,極大地簡化了消息隊列的代碼。其實上述方法就是經過ampq的註解與yml配置來迅速實現RabbitMq的使用。
固然,這裏還有不少的提高空間。好比說,經過@Bean註解(創建目標配置)與公用方法提取,能夠有效提升代碼複用性。
這段代碼並非線上的代碼,而是慕課網學習時留下的代碼。主要實際生產中並無使用SpringStream,但這確實是認識事件驅動模型的要給很好途徑。
```java
package com.imooc.order.message; import org.springframework.cloud.stream.annotation.Input; import org.springframework.cloud.stream.annotation.Output; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.SubscribableChannel; /** * @Description: * @Author: jarry */ public interface StreamClient { String INPUT = "myMessage"; String INPUT2 = "myMessageACK"; @Input(StreamClient.INPUT) SubscribableChannel input(); @Output(StreamClient.INPUT) MessageChannel output(); @Input(StreamClient.INPUT2) SubscribableChannel input2(); @Output(StreamClient.INPUT2) MessageChannel output2(); }
```
package com.imooc.order; import org.junit.Assert; import org.junit.Test; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.util.Date; /** * @Description: * @Author: jarry */ @Component public class MqSenderTest extends OrderApplicationTests{ @Autowired private AmqpTemplate amqpTemplate; @Test public void send(){ amqpTemplate.convertAndSend("myQueue", "now: " + new Date()); Assert.assertNotNull(new Date()); } }
package com.imooc.order.message; import com.imooc.order.dto.OrderDTO; import lombok.extern.slf4j.Slf4j; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.messaging.handler.annotation.SendTo; import org.springframework.stereotype.Component; /** * @Description: * @Author: jarry */ @Component @EnableBinding(StreamClient.class) @Slf4j public class StreamReceiver { // @StreamListener(StreamClient.INPUT) // public void process(Object message){ // log.info("StreamReceiver: {}", message); // } @StreamListener(StreamClient.INPUT) // 增長如下註解,能夠在INPUT消息消費後,返回一個消息。說白了就是RabbitMq對消息消費後的確認回調函數(貌似叫這個,意思就這樣,以後細查) @SendTo(StreamClient.INPUT2) public String process(OrderDTO message){ log.info("StreamReceiver: {}", message); return "received."; } @StreamListener(StreamClient.INPUT2) public void process2(String message){ log.info("StreamReceiver2: {}", message); } }
在學習技術的過程當中,一方面不斷地感覺到本身對技術瞭解的不足,另外一方面則是發現更重要的是系統設計中技術選型的權衡。