RabbitMQ - 發佈訂閱模式

Publish/Subscribe 模式

以前講的 工做隊列模式,一個工做隊列中的任務只能分發給一個 消費者。而咱們今天要聊的這個 發佈/訂閱模式 有着更復雜的工做模式, 他能夠將一個消息發給多個消費者。以下圖所示:.net


Exchanges

從上面的圖中,咱們能夠看到明顯比以前的工做隊列模式,多了一個組成部分(即圖中的 x ),他就是 exchanges,那麼這個東西究竟是什麼呢? 這裏引用官網的一句話 Exchange is like JFK airport,這是個形象的比喻。下面會具體說明。3d

full messaging model

核心思想就是生產者再也不發消息給 queue, 而是發給 exchanges,而且在這個過程當中,生產者並不知道把消息發到了哪一個 queue。取而代之的是,生產者將消息發給了 exchanges(exchanges 負責接收生產者發來的消息,並將消息傳送到隊列當中),而exchanges 要根據某種規則來判斷怎麼處理接收過來的消息(是把消息發給一個隊列,仍是發給多個隊列, 或者其餘)。而這些規則被聲明成了 exchange type;code

exchanges type

  1. direct
  2. topic
  3. headers
  4. fanout

示例程序

下面代碼 使用了 fanout(負責把消息以相似廣播的形式,發送到多個隊列) 類型的 exchangs,代碼較以前的代碼作了微小改動。blog

/**
 * 生產者
 * @author Administrator
 *
 */
   public class Producer {

	private static final String EXCHANGE_NAME = "logs";
	
	public static void main(String[] args) {
		ConnectionFactory factory = new ConnectionFactory();
	    factory.setHost("localhost");
	    try (Connection connection = factory.newConnection();
	        Channel channel = connection.createChannel()) {
	        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

	        String message = "info: Hello World!";
	        channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
	        System.out.println(" [x] Sent '" + message + "'");
	    }catch(Exception e){
	    	e.printStackTrace();
	    }
	}
	
}
**
 * 消費者
 * @author Administrator
 *
 */
public class Consumer {

	private static final String EXCHANGE_NAME = "logs";

	  public static void main(String[] argv) throws Exception {
	    ConnectionFactory factory = new ConnectionFactory();
	    factory.setHost("localhost");
	    Connection connection = factory.newConnection();
	    Channel channel = connection.createChannel();

	    channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
	    String queueName = channel.queueDeclare().getQueue();
	    channel.queueBind(queueName, EXCHANGE_NAME, "");

	    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

	    DeliverCallback deliverCallback = (consumerTag, delivery) -> {
	        String message = new String(delivery.getBody(), "UTF-8");
	        System.out.println(" [x] Received '" + message + "'");
	    };
	    channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
	  }
	
}
相關文章
相關標籤/搜索