官方圖例中,P表明消息生產者,中間表明消息隊列,C表明消息的消費者。java
/** * 獲取MQ鏈接 */ public static Connection getConnection() throws IOException, TimeoutException { //定義一個鏈接工廠 ConnectionFactory factory = new ConnectionFactory(); //設置服務地址 factory.setHost("*.*.*.*"); // 協議:端口號 AMQP:5672 factory.setPort(5672); //設置數據庫 factory.setVirtualHost("/vhost_haoyuehong"); //用戶名 factory.setUsername("haoyuehong"); //密碼 factory.setPassword("123456"); factory.setConnectionTimeout(30*1000);//默認60秒 return factory.newConnection(); }
public class Send { private static final String QUEUE_NAME = "test_simple_queue"; public static void main(String[] args) throws IOException, TimeoutException { //獲取一個鏈接 Connection connection = ConnectionUtils.getConnection(); //從鏈接中獲取吧通道 Channel channel = connection.createChannel(); //建立隊列聲明(註冊) channel.queueDeclare(QUEUE_NAME,false,false,false,null); //要發送的消息 String msg = "hello rabiitmq"; channel.basicPublish("",QUEUE_NAME,null,msg.getBytes()); System.out.println("-->send:"+msg); channel.close(); connection.close(); } }
運行以上程序,在RabbitMQ web控制面板QUEUE選項卡中能夠看到以下:python
說明消息發送成功。web
/** * 消息的消費者 */ public class Recv { private static final String QUEUE_NAME = "test_simple_queue"; public static void main(String[] args) throws IOException, TimeoutException, InterruptedException { //獲取一個鏈接 Connection connection = ConnectionUtils.getConnection(); //從鏈接中獲取通道 Channel channel = connection.createChannel(); //定義隊列的消費者 QueueingConsumer consumer = new QueueingConsumer(channel); //監聽隊列 channel.basicConsume(QUEUE_NAME,true,consumer); while (true){ QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String s = new String(delivery.getBody()); System.out.println("<----recv:"+s); } } }
消息數量變爲0,說明已被消費。數據庫
以上方法已過世,修改以下:分佈式
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException { //獲取一個鏈接 Connection connection = ConnectionUtils.getConnection(); //從鏈接中獲取通道 Channel channel = connection.createChannel(); //聲明隊列 channel.queueDeclare(QUEUE_NAME,false,false,false,null); //事件機制 有消息時觸發 DefaultConsumer consumer = new DefaultConsumer(channel) { public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body, "UTF-8"); System.out.println("<------new API recv:" + msg); } }; //監聽隊列 channel.basicConsume(QUEUE_NAME,true,consumer); }
咱們假設有一個生產者和兩個消費者(生產者和消費者的代碼和Simple queue同樣,只是多了一個消費者),生產者每0.5秒發送一條消息(共發送50條),消費者1每2秒處理一條消息,消費者2每1秒處理一條消息,測試結果發現:消費者1處理的數據數量和消費者2消費的消息數量相同,且消費者1處理奇數條(下角標爲奇數)的數據,消費者2處理偶數條(下角標爲偶數)的消息,這種方式叫作輪訓分發(Round-Robin)測試
2.公平分發(fair dispatch)fetch
消費者每次處理完消息後手動反饋給消息隊列,消息隊列分發下一條消息給消費者。ui
代碼以下:spa
public class Send { private static final String QUEUE_NAME = "test_work_queue"; public static void main(String[] args) throws IOException, TimeoutException, InterruptedException { Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel(); //聲明隊列 channel.queueDeclare(QUEUE_NAME,false,false,false,null); //每一個消費者在消息確認以前,消息隊列不發送消息到消費者,一次只處理一個消息 int prefetchCount = 1; channel.basicQos(prefetchCount); //發送50條消息 for(int i=0;i<50;i++){ String msg = "hello work queue :"+i; channel.basicPublish("",QUEUE_NAME,null,msg.getBytes()); Thread.sleep(i*20); } channel.close(); connection.close(); } }
public class Recv1 { private static final String QUEUE_NAME = "test_work_queue"; public static void main(String[] args) throws IOException, TimeoutException { //獲取鏈接 Connection connection = ConnectionUtils.getConnection(); //獲取通道 final Channel channel = connection.createChannel(); //聲明隊列 channel.queueDeclare(QUEUE_NAME,false,false,false,null); channel.basicQos(1); //定義消費者 DefaultConsumer consumer = new DefaultConsumer(channel) { public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String s = new String(body, "UTF-8"); System.out.println("Recv1 <------收到消息:"+s); try { Thread.sleep(1000*2); } catch (InterruptedException e) { e.printStackTrace(); }finally { System.out.println("[1] done"); channel.basicAck(envelope.getDeliveryTag(),false); } } }; //監聽隊列 boolean autoAck = false; //自動應答 channel.basicConsume(QUEUE_NAME,autoAck,consumer); } }
public class Recv2 { private static final String QUEUE_NAME = "test_work_queue"; public static void main(String[] args) throws IOException, TimeoutException { //獲取鏈接 Connection connection = ConnectionUtils.getConnection(); //獲取通道 final Channel channel = connection.createChannel(); //聲明隊列 channel.queueDeclare(QUEUE_NAME,false,false,false,null); channel.basicQos(1); //定義消費者 DefaultConsumer consumer = new DefaultConsumer(channel) { public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String s = new String(body, "UTF-8"); System.out.println("Recv1 <------收到消息:"+s); try { Thread.sleep(1000*2); } catch (InterruptedException e) { e.printStackTrace(); }finally { System.out.println("[1] done"); channel.basicAck(envelope.getDeliveryTag(),false); } } }; //監聽隊列 boolean autoAck = false; //自動應答 channel.basicConsume(QUEUE_NAME,autoAck,consumer); } }
int prefetchCount = 1; channel.basicQos(prefetchCount);
channel.basicQos(1);
channel.basicAck(envelope.getDeliveryTag(),false); ……………… boolean autoAck = false; //自動應答 channel.basicConsume(QUEUE_NAME,autoAck,consumer);
boolean autoAck = false; //自動應答 channel.basicConsume(QUEUE_NAME,autoAck,consumer);
autoAck爲自動應答,是一個布爾值,3d
一旦RabbitMQ將消息分發給消費者,就會將消息從內存中刪除,這種狀況下,若是消息處理過程當中發生異常,消息將會丟失,
2. boolean autoAck = false; 手動確認模式
若是有一個消費者掛掉,消息隊列將會將該消息交付給其餘消費者,消費者發送消息應答後RabbitMQ將內存中的消息刪除
消息應答默認爲false,假設一種極端狀況,RabbitMQ發生故障宕機了,由於消息存在內存中,RabbitMQ掛掉後,消息也會丟失,這就須要將數據持久化
boolean durable = false; //持久化 channel.queueDeclare(QUEUE_NAME, durable,false,false,null);
注意:聲明好的隊列不能修改durable,由於已經將隊列聲明好,RabbitMQ不許許從新定義一個已存在的對列
X---交換機
生產者:
public class Send { private static final String EXCHANGE_NAME = "test_exchange_fanout"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel(); //聲明交換機 channel.exchangeDeclare(EXCHANGE_NAME,"fanout"); //1 交換機名字 2 交換機類型 //發送消息 String msg = "hello ps"; channel.basicPublish(EXCHANGE_NAME,"",null,msg.getBytes()); System.out.println("send msg ----->"+msg); channel.close(); connection.close(); } }
運行以上程序,查看控制檯:
此時,消息已經丟失,由於交換機沒有存儲能力,RabbitMQ裏面只有對列有存儲能力,因爲此時沒有對列綁定,因此消息丟失了
消費者1:
public class Recv1 { private static final String QUEUE_NAME = "test_queue_fanout_email"; private static final String EXCHANGE_NAME = "test_exchange_fanout"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtils.getConnection(); final Channel channel = connection.createChannel(); //對列聲明 channel.queueDeclare(QUEUE_NAME,false,false,false,null); //綁定對列到交換機 channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,""); channel.basicQos(1); //定義消費者 DefaultConsumer consumer = new DefaultConsumer(channel) { public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String s = new String(body, "UTF-8"); System.out.println("Recv1 <------收到消息:"+s); try { Thread.sleep(1000*2); } catch (InterruptedException e) { e.printStackTrace(); }finally { System.out.println("[1] done"); channel.basicAck(envelope.getDeliveryTag(),false); } } }; //監聽隊列 boolean autoAck = false; //自動應答 channel.basicConsume(QUEUE_NAME,autoAck,consumer); } }
消費者2:
public class Recv1 { private static final String QUEUE_NAME = "test_queue_fanout_email"; private static final String EXCHANGE_NAME = "test_exchange_fanout"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtils.getConnection(); final Channel channel = connection.createChannel(); //對列聲明 channel.queueDeclare(QUEUE_NAME,false,false,false,null); //綁定對列到交換機 channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,""); channel.basicQos(1); //定義消費者 DefaultConsumer consumer = new DefaultConsumer(channel) { public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String s = new String(body, "UTF-8"); System.out.println("Recv2 <------收到消息:"+s); try { Thread.sleep(1000*2); } catch (InterruptedException e) { e.printStackTrace(); }finally { System.out.println("[2] done"); channel.basicAck(envelope.getDeliveryTag(),false); } } }; //監聽隊列 boolean autoAck = false; //自動應答 channel.basicConsume(QUEUE_NAME,autoAck,consumer); } }
交換器的類型,常見的有fanout、direct、topic、headers這四種
1.1 模式
2. Direct Exchange : 直連的方式 把消息路由到那些 BindingKey RoutingKey 徹底匹配的隊列中
2.1 模式
生產者:
public class Send { private static final String EXCHANGE_NAME = "test_exchange_rounting"; private static final String ROUNTINGKEY = "key"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel(); //聲明交換機 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT.getType()); String msg = "hello rounting"; channel.basicPublish(EXCHANGE_NAME,ROUNTINGKEY,null,msg.getBytes());//ROUNTINGKEY聲明瞭路由鍵 相同的路由鍵能收到消息 System.out.println("send msg ----->"+msg); channel.close(); connection.close(); } }
消費者:
public class Recv1 { private static final String QUEUE_NAME = "test_queue_rounting_email"; private static final String EXCHANGE_NAME = "test_exchange_rounting"; private static final String ROUNTINGKEY = "key"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtils.getConnection(); final Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME,false,false,false,null); channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,ROUNTINGKEY); //與生產者聲明的路由鍵相同 channel.basicQos(1); //定義消費者 DefaultConsumer consumer = new DefaultConsumer(channel) { public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String s = new String(body, "UTF-8"); System.out.println("Recv1 <------收到消息:"+s); try { Thread.sleep(1000*2); } catch (InterruptedException e) { e.printStackTrace(); }finally { System.out.println("[1] done"); channel.basicAck(envelope.getDeliveryTag(),false); } } }; //監聽隊列 boolean autoAck = false; //自動應答 channel.basicConsume(QUEUE_NAME,autoAck,consumer); } }