RabbitMQ 更像是一個郵件服務器,用戶發送郵件(消息),到郵箱服務器(exchange),其餘用戶可以保證收到消息發送者的郵件(消息).java
AMQP 服務器相似與郵件服務器, 每一個交換器(exchange)都扮演了消息傳送代理,每一個消息隊列(queue)都做爲郵箱,而綁定(binding)則定義了每一個傳送代理中的路由表.發佈者(producer)發送消息給獨立的傳送代理,而後傳送代理(exchange)再路由(binding)消息到郵箱(queue)中.消費者(customer)從郵箱(queue)中收取消息.git
RabbitMQ 有三個主要概念,生產者,隊列,消費者github
生產者 單純的發送消息正則表達式
隊列 依賴主機的內存和磁盤,(這個能夠經過配置文件修改參數)能夠理解爲一個緩存.緩存
消費者 一直等待接收消息服務器
rabbitmq默認配置virtual host 爲 "/", exchange默認AMQP default,沒有默認queue. 若是exchange不指定,則exchange爲默認exchange.使用virtualHost和exchange能夠方便實現"分區"的概念. 好比我想有交易和會員兩個系統,我能夠建立兩個virtualhost,分別表示兩個系統,這兩個系統是相互隔離的. 跟activemq不一樣的是,rabbitmq更加靈活一點.網絡
建立rabbitmq的流程跟通常的模板同樣,經過ConnectionFactory--Connection--Channel--獲取客戶端通道. 拿到channel以後,就能夠進行發佈或消費.對應的jms中的點對點隊列.負載均衡
下面是消費端代碼,這個代碼中沒有exchange,默認爲類型爲direct的exchange是AMQP default,這裏綁定了一個隊列.異步
//1 工廠 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); //2 鏈接 Connection connection = factory.newConnection(); //3 渠道 Channel channel = connection.createChannel(); //4 建立隊列 channel.queueDeclare(QUEUE_NAME,false,false,false,null); //5 設置消費 channel.basicConsume(QUEUE_NAME,true,new DefaultConsumer(channel){ [@Override](https://my.oschina.net/u/1162528) public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body,"UTF-8"); System.out.println(name + " 接收到消息 msg = " + msg); } }); System.out.println("客戶端啓動."); latch.countDown(); } catch (TimeoutException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); }
的第二個參數,routingkey是消費端的queue的名稱. 這一點,其實讓人有點迷惑的.
//1 工廠 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); //2 鏈接 try (Connection connection = factory.newConnection() ) { //3 渠道 Channel channel = connection.createChannel(); //4 發佈消息 String msg = "hello rabbitmq"; channel.basicPublish("",QUEUE_NAME,null,msg.getBytes()); System.out.println(name + " 發送消息 msg = " + msg); channel.close(); } catch (TimeoutException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); }
若是按照生產者/消費者--exchange--routingkey綁定--queue 的標準模式, 在下面的例子中exchange爲hello.world,routingkey爲key-hello,隊列爲QUEUE_NAME
//1 工廠 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); //2 鏈接 try (Connection connection = factory.newConnection() ) { //3 渠道 Channel channel = connection.createChannel(); //4 發佈消息 String msg = "hello rabbitmq"; channel.basicPublish("hello.world","key-hello",null,msg.getBytes()); System.out.println(name + " 發送消息 msg = " + msg); channel.close(); } catch (TimeoutException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); }
try { //1 工廠 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); //2 鏈接 Connection connection = factory.newConnection(); //3 渠道 Channel channel = connection.createChannel(); channel.exchangeDeclare("hello.world","direct"); //4 建立隊列 channel.queueDeclare(QUEUE_NAME,false,false,false,null); channel.queueBind(QUEUE_NAME,"hello.world","key-hello"); //5 設置消費 channel.basicConsume(QUEUE_NAME,true,new DefaultConsumer(channel){ [@Override](https://my.oschina.net/u/1162528) public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body,"UTF-8"); System.out.println(name + " 接收到消息 msg = " + msg); } }); System.out.println("客戶端啓動."); latch.countDown();
用過jms的都知道topic訂閱隊列,在rabbitmq的對應的是exchange的"fanout". 實現原理,直接使用exchange,隊列自動建立,不在經過routingkey綁定exchange和queue,從而實如今exchange下的queue均可以接收到消息.
Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_LOG, ExchangeTypes.FANOUT); String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName,EXCHANGE_LOG,""); Consumer consumer = new DefaultConsumer(channel){ [@Override](https://my.oschina.net/u/1162528) public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String log = new String(body); System.out.println(name+"<<<< " + log); } }; channel.basicConsume(queueName,true,consumer); System.out.println(name +" 客戶端等待中....." ); latch.countDown();
Channel channel = connection.createChannel(); //channel.exchangeDeclare(EXCHANGE_LOG, ExchangeTypes.FANOUT); for (int i = 0; i < MSG_NUM; i++) { String logMsg =name +">>>> 日誌.... "+i; channel.basicPublish(EXCHANGE_LOG,"",null,logMsg.getBytes()); } channel.close();
首先區別正則表達式,只有2種通配符,*表示一個字(一個英文word,不是字母),#表示多個英文字. 這個實現是經過帶有通配符的綁定關係,經過綁定關係,將不一樣的消息分發到不一樣的queue.
try { Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(TOPIC,ExchangeTypes.TOPIC); String queueName = channel.queueDeclare().getQueue(); for (int i = 0; i < routingKeys.length; i++) { String routingKey = routingKeys[i]; channel.queueBind(queueName, TOPIC, routingKey); } channel.basicConsume(queueName,new DefaultConsumer(channel){ [@Override](https://my.oschina.net/u/1162528) public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println(name+ " : " +new String(body) ); } }); System.out.println(name+" 等待中....."); latch.countDown(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); }
try (Connection connection = factory.newConnection()){ Channel channel = connection.createChannel(); channel.exchangeDeclare(TOPIC, ExchangeTypes.TOPIC); for (String routingKey:routingKeys) { channel.basicPublish(TOPIC,routingKey,null,routingKey.getBytes()); } channel.close(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); }
看了上面的簡單,訂閱,主題,你會更加理解exchange,routingkey,queue之間的. 上面的都是一些比較特殊場景的應用.
try { Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(ROUNTING, ExchangeTypes.DIRECT); String queueNm = channel.queueDeclare().getQueue(); channel.queueBind(queueNm,ROUNTING,routingKey); if(routingKey.contains("info")){ channel.queueBind(queueNm,ROUNTING,"error"); } DefaultConsumer defaultConsumer = new DefaultConsumer(channel){ [@Override](https://my.oschina.net/u/1162528) public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body); System.out.println(routingKey+" : " + msg); } }; channel.basicConsume(queueNm,true,defaultConsumer); System.out.println(routingKey+"客戶端等待中...."); latch.countDown(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); }
try (Connection connection = factory.newConnection()){ Channel channel = connection.createChannel(); // channel.exchangeDeclare(ROUNTING, ExchangeTypes.DIRECT); if (name.contains("1")){ channel.basicPublish(ROUNTING,"error",null,(name+">>error").getBytes()); channel.basicPublish(ROUNTING,"warning",null,(name+">>warning").getBytes()); }else { channel.basicPublish(ROUNTING,"info",null,(name+">>infoinfo").getBytes()); } channel.close(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); }
在分佈式環境中,遠程調用rpc有不少實現方式,比較流行的,非跨語言速度極快的java RMI, google的基於protobuf/http2的GRPC ,facebook的IO多路複用/tcp的Thrift,使用WSDL的Web Service等. MQ一樣也能夠作RPC實現,這源於MQ的自然負載均衡,以及rpc的非實時性要求. 使用rabbitmq實現rpc,用到了三點,第一是connection屬性的BasicProperties,須要設置一個 應答隊列replyTo,這個是在publish時帶入的;第二 使用默認exchange,不須要設定exchange; 第三,應答隊列的屬性應當是排他自動刪除的,這個使用默認無數方法生成的隊列就能夠,默認爲排他, 自動刪除,非持久隊列.關於這點,能夠看源碼:
@Override public AMQP.Queue.DeclareOk queueDeclare() throws IOException { return queueDeclare("", false, true, true, null); }
try { Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.basicQos(1); channel.queueDeclare(REQUEST_QUEUE,false,false,true,null); System.out.println("RPC 服務器等待...."); channel.basicConsume(REQUEST_QUEUE,false,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String replyQueue = properties.getReplyTo(); AMQP.BasicProperties replyProp = new AMQP.BasicProperties().builder().correlationId(properties.getCorrelationId()).build(); String message = new String(body); int n = Integer.parseInt(message); String responseBody =String.valueOf(fibonacci(n)); channel.basicPublish("",replyQueue,replyProp,responseBody.getBytes()); channel.basicAck(envelope.getDeliveryTag(),false); System.out.println("計算 Fibonacci ["+message+"] = "+responseBody); } }); latch.countDown(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } private int fibonacci(int value){ if(value == 0 || value == 1){ return value; }else { return fibonacci(value-1)+fibonacci(value-2); } }
try { Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); //聲明應答隊列,默認是排他,自動刪除,非持久隊列,也就是說,當客戶端中止了,隊列就好消失 String queueName = channel.queueDeclare().getQueue(); String correlationId = UUID.randomUUID().toString(); AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().correlationId(correlationId).replyTo(queueName).build(); channel.basicPublish("",REQUEST_QUEUE,properties,message.getBytes()); BlockingQueue<String> response = new ArrayBlockingQueue<String>(1); channel.basicConsume(queueName,true,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { if (correlationId.equalsIgnoreCase(properties.getCorrelationId())){ response.offer(new String(body)); } } }); System.out.println("接收到消息:"+response.take()); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); }
上面的例子都是使用routingkey來進行綁定關係,在一些狀況下,可能仍是不能知足業務場景, 好比我想要"張三",電話"123456789"的全部消息,轉到一個特殊處理(僅舉例,無心義).
消費端代碼,一樣是建立一個exchange,類型headers,而後構建一個map,經過BasicProperties, 傳遞參數.注意這裏的map的value能夠爲java的一些基本類型(能夠查閱Frame.fieldValueSize()
), 可是不能是用戶自定義的類型.rabbitmq對於不存在queue,發送的消息會丟失,因此從消息持久化的角度, 服務端和客戶端都應當declare,可是隻有消費端declare,並不會報錯,若是消息比客戶端啓動更早到達,則會丟失消息.
try { Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(HEADERS_QUEUE,ExchangeTypes.HEADERS); String queueName = channel.queueDeclare().getQueue(); // channel.queueDeclare(HEADERS_QUEUE,false,false,true,null); Map<String,Object> headers = new HashMap<>(); if(name.endsWith("all")){ headers.put("x-match","all"); headers.put("name","張三"); headers.put("phone","123456789"); }else if (name.endsWith("any1")){ headers.put("x-match","any"); headers.put("name","張三"); headers.put("phone","0000"); }else{ headers.put("x-match","any"); headers.put("name","張三1"); headers.put("phone","123456789"); } channel.queueBind(queueName,HEADERS,"",headers); channel.basicConsume(queueName,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("接收消息>>"+new String(body)); } }); System.out.println("客戶啓動."); latch.countDown(); ; } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); }
try { Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); // channel.exchangeDeclare(HEADERS, ExchangeTypes.HEADERS); Map<String,Object> headers = new HashMap<>(); headers.put("name","張三"); headers.put("phone","123456789"); AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().headers(headers).build(); channel.basicPublish(HEADERS,"",properties,"hello rabbitmq ".getBytes()); channel.close(); connection.close(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } System.out.println("服務端啓動.");
事務幾乎無處不在,而如今談及事務毫不是簡單的事務,而是分佈式事務.遺憾的是這裏的事務跟分佈式事務沒有必然聯繫. 這裏單純的談及rabbitmq的事務.首先說一下,rabbitmq是基於tcp協議的,tcp三次握手四次揮手,這裏就涉及到消息的確認 機制.而rabbitmq的事務也是依賴這個確認機制的.再來講一下確認機制,咱們在使用rabbitmq或者jms默認都是 有確認機制的,只不過是默認實現,咱們能夠經過一些ack的參數或接口設置.通常都是默認批量自動ack, 何時ack呢?rabbitmq中沒有消息過時的概念,只有消息被正常處理了,客戶端發送了ack,纔會刪除. 批量ack,則是在ack到必定數量以後才一塊發送ack,減小帶寬,可是失敗則影響較大.傳統的事務,是先 開啓事務,進行操做,事務提交,事務回滾,速度將減慢到原來的2倍(通過本地測試,差很少這個數). rabbitmq提供了一個高級的Publisher Confirm機制,跟傳統不太同樣,其實是將事務的提交拆分了, 等全部事務提交完畢,在最終確認.速度介於並接近非事務速度(可能測試用例問題,跟傳統tx差很少?!). 當開啓publisher confirm時,該信道上會爲每個消息分配一個id,當消息被髮送到消費端時,rabbitmq就 會發確認到生產端,消息的發送和確認是異步.
static class NoPublisher implements Runnable { /** * When an object implementing interface <code>Runnable</code> is used * to create a thread, starting the thread causes the object's * <code>run</code> method to be called in that separately executing * thread. * <p> * The general contract of the method <code>run</code> is that it may * take any action whatsoever. * * @see Thread#run() */ @Override public void run() { try { try (Connection connection = factory.newConnection()) { Channel channel = connection.createChannel(); channel.queueDeclare(NO_TRANSACTION, false, false, false, null); long start = System.currentTimeMillis(); try { for (int i = 0; i < MSG_NUM; i++) { String msg = "rabbitmq msg!"; channel.basicPublish("", NO_TRANSACTION, null, msg.getBytes()); } channel.basicPublish("", NO_TRANSACTION, null, "end".getBytes()); } catch (Exception e) { e.printStackTrace(); } finally { channel.close(); } long end = System.currentTimeMillis(); System.out.println("[發送方]發送方耗時:" + (end - start)); } } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } } static class NoConsumer implements Runnable { /** * When an object implementing interface <code>Runnable</code> is used * to create a thread, starting the thread causes the object's * <code>run</code> method to be called in that separately executing * thread. * <p> * The general contract of the method <code>run</code> is that it may * take any action whatsoever. * * @see Thread#run() */ @Override public void run() { try { Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(NO_TRANSACTION, false, false, false, null); //每次1條 channel.basicQos(1); long start = System.currentTimeMillis(); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body); channel.basicAck(envelope.getDeliveryTag(),false); if (msg.equalsIgnoreCase("end")){ long end = System.currentTimeMillis(); System.out.println("[接收方]接收完畢"+(end-start)); try { channel.close(); connection.close(); } catch (TimeoutException e) { e.printStackTrace(); } } } }; //手動ack channel.basicConsume(NO_TRANSACTION, false, consumer); System.out.println("[接收方]客戶端等待中......"); latch.countDown(); } catch (TimeoutException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } } }
[接收方]客戶端等待中...... [發送方]發送方耗時:4080 [接收方]接收完畢16904
static class TranPublisher implements Runnable { /** * When an object implementing interface <code>Runnable</code> is used * to create a thread, starting the thread causes the object's * <code>run</code> method to be called in that separately executing * thread. * <p> * The general contract of the method <code>run</code> is that it may * take any action whatsoever. * * @see Thread#run() */ @Override public void run() { try { try (Connection connection = factory.newConnection()) { Channel channel = connection.createChannel(); channel.queueDeclare(TRANSACTION, false, false, false, null); long start = System.currentTimeMillis(); try { for (int i = 0; i < MSG_NUM;) { if (i%BATCH ==0){ //開啓事務 channel.txSelect(); for (int j = 0; j < BATCH; j++) { String msg = "rabbitmq msg!"; if(i + j != MSG_NUM -1 ){ channel.basicPublish("", TRANSACTION, null, msg.getBytes()); }else{ channel.basicPublish("", TRANSACTION, null, "end".getBytes()); } } i += BATCH; //commit channel.txCommit(); } } } catch (Exception e) { //回滾事務 channel.txRollback(); e.printStackTrace(); } finally { channel.close(); } long end = System.currentTimeMillis(); System.out.println("[tx發送方]發送方耗時:" + (end - start)+" 批量大小="+BATCH); } } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } } static class TranConsumer implements Runnable { /** * When an object implementing interface <code>Runnable</code> is used * to create a thread, starting the thread causes the object's * <code>run</code> method to be called in that separately executing * thread. * <p> * The general contract of the method <code>run</code> is that it may * take any action whatsoever. * * @see Thread#run() */ @Override public void run() { try { Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(TRANSACTION, false, false, false, null); //每次1條 channel.basicQos(1); long start = System.currentTimeMillis(); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body); //發送ack channel.basicAck(envelope.getDeliveryTag(), false); if (msg.equalsIgnoreCase("end")){ long end = System.currentTimeMillis(); System.out.println("[tx接收方]接收完畢"+(end-start)); try { channel.close(); connection.close(); } catch (TimeoutException e) { e.printStackTrace(); } } } }; //手動ack channel.basicConsume(TRANSACTION, false, consumer); System.out.println("[tx接收方]客戶端等待中......"); latch.countDown(); } catch (TimeoutException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } } }
[tx接收方]客戶端等待中...... [tx發送方]發送方耗時:8703 批量大小=100 [tx接收方]接收完畢22160
static class ConfirmPublisher implements Runnable { /** * When an object implementing interface <code>Runnable</code> is used * to create a thread, starting the thread causes the object's * <code>run</code> method to be called in that separately executing * thread. * <p> * The general contract of the method <code>run</code> is that it may * take any action whatsoever. * * @see Thread#run() */ @Override public void run() { try { try (Connection connection = factory.newConnection()) { Channel channel = connection.createChannel(); long start = System.currentTimeMillis(); try { for (int i = 0; i < MSG_NUM; ) { if (i%BATCH ==0){ //開啓confirm3 channel.confirmSelect(); for (int j = 0; j < BATCH; j++) { String msg = "rabbitmq msg!"; if(i + j != MSG_NUM -1){ channel.basicPublish("", CONFIRM, null, msg.getBytes()); }else{ channel.basicPublish("", CONFIRM, null, "end".getBytes()); } } i += BATCH; //confirm // waitForConfirmsOrDie 相對於 waitForConfirms 來講,只要有nack就好拋出異常,同時也是一種阻塞式 channel.waitForConfirmsOrDie(); //channel.addConfirmListener(new ConfirmListener() { // @Override // public void handleAck(long deliveryTag, boolean multiple) throws IOException { //// System.out.println("ack deliveryTag = " + deliveryTag); // } // // @Override // public void handleNack(long deliveryTag, boolean multiple) throws IOException { //// System.out.println("nack deliveryTag = " + deliveryTag); // } // }); } } } catch (Exception e) { e.printStackTrace(); } finally { channel.close(); } long end = System.currentTimeMillis(); System.out.println("[confirm發送方]發送方耗時:" + (end - start)+" 批量大小="+BATCH); } } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } } static class ConfirmConsumer implements Runnable { /** * When an object implementing interface <code>Runnable</code> is used * to create a thread, starting the thread causes the object's * <code>run</code> method to be called in that separately executing * thread. * <p> * The general contract of the method <code>run</code> is that it may * take any action whatsoever. * * @see Thread#run() */ @Override public void run() { try { Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(CONFIRM, false, false, false, null); //每次1條 channel.basicQos(1); long start = System.currentTimeMillis(); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body); //發送ack channel.basicAck(envelope.getDeliveryTag(), false); // System.out.println("確認"+msg); if (msg.equals("end")){ long end = System.currentTimeMillis(); System.out.println("[confirm接收方]接收完畢"+(end-start)); try { channel.close(); connection.close(); } catch (TimeoutException e) { e.printStackTrace(); } } } }; //手動ack channel.basicConsume(CONFIRM, false, consumer); System.out.println("[confirm接收方]客戶端等待中......"); latch.countDown(); } catch (TimeoutException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } } }
[confirm接收方]客戶端等待中...... [confirm發送方]發送方耗時:5358 批量大小=100 [confirm接收方]接收完畢22502