virtual hosts至關於mysql的dbjava
通常以/開頭mysql
須要對用戶進行受權sql
P:消息生產者編程
紅色:隊列服務器
C:消息消費者架構
包含三個對象:生產者、隊列、消費者異步
import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; public class ConnectionUtil { /** * 獲取MQ的鏈接 * @return */ public static Connection getConnection() throws IOException, TimeoutException { //定義一個鏈接工廠 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); //AMQP的端口 factory.setPort(5672); //vhost factory.setVirtualHost("/vhost_mmr"); factory.setUsername("rabbit"); factory.setPassword("123456"); Connection connection = factory.newConnection(); return connection; } }
import com.meituan.mq.simple.utils.ConnectionUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Send { private static final String QUEUE_NAME = "test_simple_queue"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtil.getConnection(); //從鏈接中獲取一個通道 Channel channel = connection.createChannel(); //建立隊列聲明 channel.queueDeclare(QUEUE_NAME, false, false, false, null); String msg = "hello world!"; channel.basicPublish("", QUEUE_NAME, null, msg.getBytes()); System.out.println("---send msg :" + msg); channel.close(); connection.close(); } }
import com.meituan.mq.simple.utils.ConnectionUtil; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Receive { private static final String QUEUE_NAME = "test_simple_queue"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtil.getConnection(); //建立channel Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body, "utf8"); System.out.println("msg receive : " + msg); } }; channel.basicConsume(QUEUE_NAME, consumer); } }
耦合性高,生產者一一對應消費者,若是須要多個消費者消費隊列中的消息,此時簡單隊列就無能爲力了。ide
隊列名變動,源碼須要同時變動搜索引擎
一個生產者將消息放入隊列中,能夠有多個消費者進行消費spa
爲何會出現工做隊列?
Simple隊列:是一一對應的,實際開發中,生產者改善消息是絕不費力的,而消費者通常須要跟業務相結合,消費者接收到消息以後就須要處理,可能須要花費時間,此時隊列就會積壓不少消息。
生產消息
import com.meituan.mq.simple.utils.ConnectionUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Send { private static final String QUEUE_NAME = "test_work_queue"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtil.getConnection(); //從鏈接中獲取一個通道 Channel channel = connection.createChannel(); //建立隊列聲明 channel.queueDeclare(QUEUE_NAME, false, false, false, null); for (int i = 0; i < 50; i++) { String msg = "hello " + i; channel.basicPublish("", QUEUE_NAME, null, msg.getBytes()); System.out.println("---send msg :" + msg); try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } } channel.close(); connection.close(); } }
消費者1
import com.meituan.mq.simple.utils.ConnectionUtil; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Recv1 { private static final String QUEUE_NAME = "test_work_queue"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtil.getConnection(); //建立channel Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body, "utf8"); System.out.println("[1] msg recv1 : " + msg); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } }; boolean ack = true; channel.basicConsume(QUEUE_NAME, ack, consumer); } }
消費者2
import com.meituan.mq.simple.utils.ConnectionUtil; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Recv2 { private static final String QUEUE_NAME = "test_work_queue"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtil.getConnection(); //建立channel Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body, "utf8"); System.out.println("[2] msg recv1 : " + msg); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } } }; boolean ack = true; channel.basicConsume(QUEUE_NAME, ack, consumer); } }
現象:
消費者1和消費者2處理的消息是同樣多的,這種分發方式稱爲輪詢分發(round-robin),無論誰忙或者誰閒,都不會多給或者少給。任務均分。
保證一次發送給消費者的消息不超過一條
/** * 每一個消費者發送確認消息以前,消息隊列不發送下一個消息給消費者,消費者一次只處理一個消息 * * 限制發送給同一個消費者不得超過一條消息 */ int preFetchCount = 1; channel.basicQos(preFetchCount);
使用公平分發,必須關閉自動應答ack,改成手動
channel.basicAck(envelope.getDeliveryTag(), false); boolean ack = false;//自動應答改成false channel.basicConsume(QUEUE_NAME, ack, consumer);
生產消息
import com.meituan.mq.simple.utils.ConnectionUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Send { private static final String QUEUE_NAME = "test_work_queue"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtil.getConnection(); //從鏈接中獲取一個通道 Channel channel = connection.createChannel(); //建立隊列聲明 channel.queueDeclare(QUEUE_NAME, false, false, false, null); /** * 每一個消費者發送確認消息以前,消息隊列不發送下一個消息給消費者,消費者一次只處理一個消息 * * 限制發送給同一個消費者不得超過一條消息 */ int preFetchCount = 1; channel.basicQos(preFetchCount); for (int i = 0; i < 50; i++) { String msg = "hello " + i; channel.basicPublish("", QUEUE_NAME, null, msg.getBytes()); System.out.println("---send msg :" + msg); try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } } channel.close(); connection.close(); } }
消費消息
import com.meituan.mq.simple.utils.ConnectionUtil; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Recv2 { private static final String QUEUE_NAME = "test_work_queue"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtil.getConnection(); //建立channel Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); channel.basicQos(1); DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body, "utf8"); System.out.println("[2] msg recv1 : " + msg); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } finally { channel.basicAck(envelope.getDeliveryTag(), false); } } }; boolean ack = false;//自動應答改成false channel.basicConsume(QUEUE_NAME, ack, consumer); } }
boolean ack = false;//自動應答改成false channel.basicConsume(QUEUE_NAME, ack, consumer);
ack = true時爲自動確認模式,一旦rabbitMQ將消息分發給消費者,該消息就會在內存中刪除;這種狀況下,若是殺死正在處理消息的消費者,會丟失正在處理的消息;
ack = false時爲手動回執(消息應答)模式,若是有一個消費者掛掉,就會將會給其餘消費者,rabbitMQ支持消息應答,消費者發送一個消息應答,告訴rabbitMQ這個消息已經被處理,而後rabbitMQ就刪除內存中的消息;
消息應答默認打開,即爲false;
因爲消息在內存中存儲,若是rabbitMQ掛掉,消息仍然會丟失。
boolean durable = false; channel.queueDeclare(QUEUE_NAME, durable, false, false, null);
durable控制的屬性就是消息的持久化。
已經聲明好的隊列,若是durable已經爲false了,就沒法修改成true,rabbitMQ不容許從新定義(不一樣參數)一個已存在的隊列
解讀:
一、一個生產者,多個消費者;
二、每一個消費者都有本身的隊列;
三、生產者沒有直接把消息發送到隊列,而是發送至交換機(eXchange)
四、每一個隊列都要綁定到交換機上
五、生產者發送的消息,通過交換機,到達隊列,就能實現一個消息被多個消費者消費
生產消息
import com.meituan.mq.simple.utils.ConnectionUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Send { private static final String EXCHANGE_NAME = "test_exchange_fanout"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); //聲明交換機 channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); String msg = "hello ps"; channel.basicPublish(EXCHANGE_NAME, "", null, msg.getBytes()); System.out.println("Send " + msg); channel.close(); connection.close(); } }
消息哪去了?丟失了!由於交換機沒有存儲能力,在rabbitMQ中,只有隊列有存儲能力。此時並無完成隊列綁定到交換機,因此數據丟失了。
消費消息
import com.meituan.mq.simple.utils.ConnectionUtil; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Recv1 { private static final String QUEUE_NAME = "test_ps_fanout_email"; private static final String EXCHANGE_NAME = "test_exchange_fanout"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); //綁定隊列到交換機 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ""); channel.basicQos(1); DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body, "utf8"); System.out.println("[1] msg recv1 : " + msg); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } finally { channel.basicAck(envelope.getDeliveryTag(), false); } } }; boolean ack = false;//自動應答改成false channel.basicConsume(QUEUE_NAME, ack, consumer); } }
不一樣的隊列作不一樣的事情。
一方面接收生產者的消息,另外一方面向隊列推送消息
rabbitMQ提供了四種Exchange:fanout,direct,topic,header header模式在實際使用中較少。
將路由鍵和某模式進行匹配
任何發送到Topic Exchange的消息都會被轉發到全部關心RouteKey中指定話題的Queue上
生產者
import com.meituan.mq.simple.utils.ConnectionUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Send { private static final String EXCHANGE_NAME = "test_exchange_direct"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); //聲明exchange channel.exchangeDeclare(EXCHANGE_NAME, "direct"); String msg = "hello direct"; //指定路由鍵 String routingKey = "warning"; channel.basicPublish(EXCHANGE_NAME, routingKey, null, msg.getBytes()); System.out.println("send msg:" + msg); channel.close(); connection.close(); } }
消費者1
import com.meituan.mq.simple.utils.ConnectionUtil; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Recv1 { private static final String EXCHANGE_NAME = "test_exchange_direct"; private static final String QUEUE_NAME = "test_queue_direct"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); channel.basicQos(1); //綁定隊列與交換機時,指定路由鍵 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error"); DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body, "utf8"); System.out.println("[1] msg recv1 : " + msg); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } finally { channel.basicAck(envelope.getDeliveryTag(), false); } } }; boolean ack = false;//自動應答改成false channel.basicConsume(QUEUE_NAME, ack, consumer); } }
消費者2
import com.meituan.mq.simple.utils.ConnectionUtil; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Recv2 { private static final String EXCHANGE_NAME = "test_exchange_direct"; private static final String QUEUE_NAME = "test_queue_direct_2"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); channel.basicQos(1); //綁定隊列與交換機時,指定路由鍵 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error"); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "info"); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "warning"); DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body, "utf8"); System.out.println("[2] msg recv2 : " + msg); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } finally { channel.basicAck(envelope.getDeliveryTag(), false); } } }; boolean ack = false;//自動應答改成false channel.basicConsume(QUEUE_NAME, ack, consumer); } }
# 匹配一個或多個
* 匹配一個
生產者
import com.meituan.mq.simple.utils.ConnectionUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Send { private static final String EXCHANGE_NAME = "test_exchange_topic"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); //聲明exchange,指定模式爲topic channel.exchangeDeclare(EXCHANGE_NAME, "topic"); String msg = "商品...."; String routingKey = "goods.delete"; channel.basicPublish(EXCHANGE_NAME, routingKey, null, msg.getBytes()); System.out.println("send msg:" + msg); channel.close(); connection.close(); } }
消費者1
import com.meituan.mq.simple.utils.ConnectionUtil; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * Created by wangbin on 2018/6/26. */ public class Recv1 { private static final String EXCHANGE_NAME = "test_exchange_topic"; private static final String QUEUE_NAME = "test_queue_topic_1"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); channel.basicQos(1); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "goods.#"); DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body, "utf8"); System.out.println("[1] msg recv1 : " + msg); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } finally { channel.basicAck(envelope.getDeliveryTag(), false); } } }; boolean ack = false;//自動應答改成false channel.basicConsume(QUEUE_NAME, ack, consumer); } }
消費者2
import com.meituan.mq.simple.utils.ConnectionUtil; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * Created by wangbin on 2018/6/26. */ public class Recv2 { private static final String EXCHANGE_NAME = "test_exchange_topic"; private static final String QUEUE_NAME = "test_queue_topic_2"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); channel.basicQos(1); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "goods.add"); DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body, "utf8"); System.out.println("[2] msg recv2 : " + msg); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } finally { channel.basicAck(envelope.getDeliveryTag(), false); } } }; boolean ack = false;//自動應答改成false channel.basicConsume(QUEUE_NAME, ack, consumer); } }
其中,消費者1綁定路由鍵爲goods.#
,消費者2綁定路由鍵爲goods.add
。當生產者發送的消息路由鍵爲goods.add
時,兩個消費者都會收到消息並處理;當生產者發送的消息路由鍵爲goods.update
時,只有消費者1能夠接收到消息。
在rabbitMQ中,能夠經過持久化數據解決rabbitMQ服務器異常的數據丟失問題。
問題:生產者將消息發送出去以後,消息到底有沒有到達rabbitMQ服務器;默認狀況是不知道消息已到達的
兩種方式:
用於將當前channel設置成transaction模式
用於提交事務
回滾事務
生產者發送消息
import com.meituan.mq.simple.utils.ConnectionUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Send { private static final String QUEUE_NAME = "test_queue_tx"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); String msg = "hello tx msg!"; try { channel.txSelect(); channel.basicPublish("", QUEUE_NAME, null, msg.getBytes()); channel.txCommit(); } catch (IOException e) { channel.txRollback(); System.out.println("發生異常,事務已回滾"); } } }
事務機制會下降rabbitMQ的吞吐量。
生產者將信道設置成confirm模式,一旦信道進入confirm模式,全部在該信道上面發佈的消息都將會被指派一個惟一的ID(從1開始),一旦消息被投遞到全部匹配的隊列以後,broker就會發送一個確認給生產者(包含消息的惟一ID),這就使得生產者知道消息已經正確到達目的隊列了,若是消息和隊列是可持久化的,那麼確認消息會在將消息寫入磁盤以後發出,broker回傳給生產者的確認消息中delivery-tag域包含了確認消息的序列號,此外broker也能夠設置basic.ack的multiple域,表示到這個序列號以前的全部消息都已經獲得了處理;
confirm模式最大的好處在於他是異步的,一旦發佈一條消息,生產者應用程序就能夠在等信道返回確認的同時繼續發送下一條消息,當消息最終獲得確認以後,生產者應用即可以經過回調方法來處理該確認消息,若是RabbitMQ由於自身內部錯誤致使消息丟失,就會發送一條nack消息,生產者應用程序一樣能夠在回調方法中處理該nack消息。
編程模式:
一、普通,發一條
二、批量,發一批
三、異步confirm模式,提供一個回調方法