rabbitmq提供了6中消息隊列模型服務器
public class RabbitmqConnectionUtil { public static Connection getConnection() throws IOException, TimeoutException { // 鏈接工廠 ConnectionFactory connectionFactory = new ConnectionFactory(); // 服務器地址 connectionFactory.setHost("localhost"); //服務器端口 connectionFactory.setPort(5672); // rabbitmq用戶名 connectionFactory.setUsername("guest"); // rabbitmq密碼 connectionFactory.setPassword("guest"); return connectionFactory.newConnection(); } }
public class Sender { public static final String QUEUE_NAME = "test_queue"; public static void main(String[] args) throws IOException, TimeoutException { // 獲取rabbitmq 服務器鏈接 Connection connection = RabbitmqConnectionUtil.getConnection(); // 建立rabbitmq的隊列 Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 將消息發佈到隊列 String message = "hello, world"; channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); // 關閉 channel.close(); connection.close(); } }
public class Receive { private static final String QUEUE_NAME = "test_queue"; public static void main(String[] args) throws IOException, TimeoutException, InterruptedException { Connection connection = RabbitmqConnectionUtil.getConnection(); // 建立通道 Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 定義消費者 DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(message); } }; // 綁定通道和消費者 channel.basicConsume(QUEUE_NAME, false, consumer); } private static void oldApi() throws IOException, TimeoutException, InterruptedException { Connection connection = RabbitmqConnectionUtil.getConnection(); // 從鏈接中建立通道 Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 定義消費者 QueueingConsumer consumer = new QueueingConsumer(channel); // 監聽隊列 channel.basicConsume(QUEUE_NAME, false, consumer); while (true){ QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println("[Rec]:"+ message); } } }
簡單模型:生產者與消費者是一一對應的,一個生產者只能有一個消費者,若是多個的話,必須建立多個,耦合性強
2. Work Queues
Simple隊列是生產者和消費者一一對應的,實際開發中,生產者發送消息是絕不費力的,消費者通常須要處理業務,花費的時間較長,若是使用Simple會形成消息的擠壓異步
public class Send { public static final String QUEUE_NAME = "test_queue"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = RabbitmqConnectionUtil.getConnection(); // 建立channal Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); //發佈信息 for (int i = 0; i < 50; i++) { String message = "hello "+i; channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); try { Thread.sleep(i*100); } catch (InterruptedException e) { e.printStackTrace(); } } channel.close(); connection.close(); } }
public class Rece1 { public static final String QUEUE_NAME = "test_queue"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = RabbitmqConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println("[1] reced " + message); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } finally { System.out.println("done"); } } }; channel.basicConsume(QUEUE_NAME, true, consumer); } }
public class Send { public static final String QUEUE_NAME = "test_queue"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = RabbitmqConnectionUtil.getConnection(); // 建立channal Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); /* 每一個消費者發送確認消息以前,消息隊列再也不發送下一個消息到消費者,一次只處理一個消息 */ int prefectCount = 1; channel.basicQos(prefectCount); //發佈信息 for (int i = 0; i < 50; i++) { String message = "hello "+i; channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); } channel.close(); connection.close(); } }
public class Rece1 { public static final String QUEUE_NAME = "test_queue"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = RabbitmqConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); channel.basicQos(1); DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println("[1] reced " + message); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } finally { System.out.println("done"); } // 完成以後,手動應答 channel.basicAck(envelope.getDeliveryTag(), false); } }; //關閉自動應答 boolean ack = false; channel.basicConsume(QUEUE_NAME, ack, consumer); } }
public class Rece2 { public static final String QUEUE_NAME = "test_queue"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = RabbitmqConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); channel.basicQos(1); DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println("[1] reced " + message); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } finally { System.out.println("done"); } // 手動應答 channel.basicAck(envelope.getDeliveryTag(), false); } }; //關閉自動應答 boolean ack = false; channel.basicConsume(QUEUE_NAME, ack, consumer); } }
boolean ack = false 手動模式,若是一個消費者掛掉,就會交付給其餘消費者,rabbitmq支持消息應答,消費者發送一個消息應答,告訴rabbitmq這個消息我已經處理掉了,你能夠刪除了,而後rabbitmq就刪除內存中的消息
消息應答模式是打開的,false
若是這種方式 rabbitmq掛的話,整個消息都會丟失ide
boolean durable = false; channel.queueDeclare(QUEUE_NAME, false, false, false, null); durable=true,設置爲true,若是在代碼運行前QUEUE_NAME隊列已經存在,而且不是持久化的,rabbitmq不容許從新定義一個已經存在的隊列
public class Send { public static final String EXCHANGE_NAME = "test_exchange_fanout"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = RabbitmqConnectionUtil.getConnection(); Channel channel = connection.createChannel(); // 聲明交換機 channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); // 分發 // 發送消息 String message = "hello world"; channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes()); channel.close(); connection.close(); } }
public class Receive { public static final String QUEUE_NAME = "test_queue_email"; public static final String EXCHANGE_NAME = "test_exchange_fanout"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = RabbitmqConnectionUtil.getConnection(); Channel channel = connection.createChannel(); // 聲明隊列 channel.queueDeclare(QUEUE_NAME, false, false, false,null); // 綁定交換機 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ""); channel.basicQos(1); DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println("[1] reced " + message); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } finally { System.out.println("done"); } // 手動應答 channel.basicAck(envelope.getDeliveryTag(), false); } }; //關閉自動應答 boolean ack = false; channel.basicConsume(QUEUE_NAME, ack, consumer); } }
public class Receive2 { public static final String QUEUE_NAME = "test_queue_sms"; public static final String EXCHANGE_NAME = "test_exchange_fanout"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = RabbitmqConnectionUtil.getConnection(); Channel channel = connection.createChannel(); // 聲明隊列 channel.queueDeclare(QUEUE_NAME, false, false, false,null); // 綁定交換機 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ""); channel.basicQos(1); DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println("[2] reced " + message); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } finally { System.out.println("done"); } // 手動應答 channel.basicAck(envelope.getDeliveryTag(), false); } }; //關閉自動應答 boolean ack = false; channel.basicConsume(QUEUE_NAME, ack, consumer); } }
public class Send { private static final String EXCHANGE_NAME = "test_exchange_direct"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = RabbitmqConnectionUtil.getConnection(); Channel channel = connection.createChannel(); // 聲明exchange channel.exchangeDeclare(EXCHANGE_NAME, "direct"); channel.basicQos(1); String message = "hello direct"; // 發送消息 String routingKey = "warning"; channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes()); System.out.println("send " + message); channel.close(); connection.close(); } }
public class Recv1 { private static final String EXCHANGE_NAME = "test_exchange_direct"; private static final String QUEUE_NAME = "test_queue_direct_1"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = RabbitmqConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.basicQos(1); channel.queueDeclare(QUEUE_NAME, false, false, false, null); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error"); DefaultConsumer defaultConsumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println("[1] Recv "+ message); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); }finally { // 手動應答 System.out.println("done"); channel.basicAck(envelope.getDeliveryTag(), false); } } }; // 關閉自動應答 boolean ack = false; channel.basicConsume(QUEUE_NAME, ack, defaultConsumer); } }
public class Recv2 { private static final String EXCHANGE_NAME = "test_exchange_direct"; private static final String QUEUE_NAME = "test_queue_direct_2"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = RabbitmqConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.basicQos(1); channel.queueDeclare(QUEUE_NAME, false, false, false, null); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error"); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "info"); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "warning"); DefaultConsumer defaultConsumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println("[1] Recv "+ message); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); }finally { // 手動應答 System.out.println("done"); channel.basicAck(envelope.getDeliveryTag(), false); } } }; // 關閉自動應答 boolean ack = false; channel.basicConsume(QUEUE_NAME, ack, defaultConsumer); } }
public class Send { private static final String EXCHANGE_NAME = "test_exchange_topic"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = RabbitmqConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "topic"); String routingKey = "goods.update"; String message = "商品。。。。"; channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes()); channel.close(); connection.close(); } }
public class Recv1 { private static final String EXCHANGE_NAME = "test_exchange_topic"; private static final String QUEUE_NAME = "test_queue_topic_2"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = RabbitmqConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.basicQos(1); channel.queueDeclare(QUEUE_NAME, false, false, false, null); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "goods.#"); DefaultConsumer defaultConsumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println("[2] Recv "+ message); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); }finally { // 手動應答 System.out.println("done"); channel.basicAck(envelope.getDeliveryTag(), false); } } }; // 關閉自動應答 boolean ack = false; channel.basicConsume(QUEUE_NAME, ack, defaultConsumer); } }
public class Recv2 { private static final String EXCHANGE_NAME = "test_exchange_topic"; private static final String QUEUE_NAME = "test_queue_topic_1"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = RabbitmqConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.basicQos(1); channel.queueDeclare(QUEUE_NAME, false, false, false, null); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "goods.add"); DefaultConsumer defaultConsumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println("[1] Recv "+ message); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); }finally { // 手動應答 System.out.println("done"); channel.basicAck(envelope.getDeliveryTag(), false); } } }; // 關閉自動應答 boolean ack = false; channel.basicConsume(QUEUE_NAME, ack, defaultConsumer); } }
public class TxSend { private static final String QUEUE_NAME = "test_queue_tx"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = RabbitmqConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 發送數據 String message = "send tx message"; try{ channel.txSelect(); channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); int tt = 1/0; channel.txCommit(); }catch (Exception e){ channel.txRollback(); System.out.println("發送消息失敗"); } channel.close(); connection.close(); } }
public class TxRecv { private static final String QUEUE_NAME = "test_queue_tx"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = RabbitmqConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); channel.basicConsume(QUEUE_NAME, true, new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body); System.out.println("[recv] message: " + message); } }); } }
public class Send { private static final String QUEUE_NAME = "test_queue_confirm_1"; public static void main(String[] args) throws IOException, TimeoutException, InterruptedException { Connection connection = RabbitmqConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 開啓confirm模式,一個隊列只能有一種模式 channel.confirmSelect(); String message = "send confirm..."; channel.basicPublish("", QUEUE_NAME, null , message.getBytes()); if(!channel.waitForConfirms()){ System.out.println("confirm send failed"); }else{ System.out.println("confirm send ok "); } // 關閉信道和鏈接 channel.close(); connection.close(); }
批量模式code
public class Send2 { private static final String QUEUE_NAME = "test_queue_confirm_1"; public static void main(String[] args) throws IOException, TimeoutException, InterruptedException { Connection connection = RabbitmqConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 開啓confirm模式,一個隊列只能有一種模式 channel.confirmSelect(); String message = "send confirm..."; for (int i = 0; i < 10; i++) { channel.basicPublish("", QUEUE_NAME, null , message.getBytes()); } if(!channel.waitForConfirms()){ System.out.println("confirm send failed"); }else{ System.out.println("confirm send ok "); } // 關閉信道和鏈接 channel.close(); connection.close(); } }