10.RabbitMQ Fanout類型交換機

Fanout類型交換機忽略Routing Key,它將消息傳遞到全部與它綁定的隊列上。

 
10.RabbitMQ Fanout類型交換機
10.RabbitMQ Fanout類型交換機
 
10.RabbitMQ Fanout類型交換機
10.RabbitMQ Fanout類型交換機
10.RabbitMQ Fanout類型交換機
 
Producer.java
package com.test.fanout;
 
import com.rabbitmq.client.*;
 
import java.io.IOException;
import java.lang.String;
import java.lang.System;
import java.util.HashMap;
import java.util.Map;
import java.util.Scanner;
 
public class Producer {
 
    public static void main(String[] args) throws Exception {
      //使用默認端口鏈接MQ
        ConnectionFactory factory = new ConnectionFactory();
    factory.setUsername("admin");
    factory.setPassword("admin");
        factory.setHost("192.168.169.142"); //使用默認端口5672
        Connection conn = factory.newConnection(); //聲明一個鏈接
        Channel channel = conn.createChannel(); //聲明消息通道
   
String message = "hello world!";
String queueName1 = "queue_fanout1";
String queueName2 = "queue_fanout2";
String queueName3 = "queue_fanout3";
String exchangeName = "test.fanout";
//Routing Key
channel.queueDeclare(queueName1, false, false, false, null);
channel.queueDeclare(queueName2, false, false, false, null);
channel.queueDeclare(queueName3, false, false, false, null);
channel.exchangeDeclare(exchangeName, "fanout", false, false, null);
 
channel.queueBind(queueName1, exchangeName, "");
channel.queueBind(queueName2, exchangeName, "");
channel.queueBind(queueName3, exchangeName, "");
 
channel.basicPublish(exchangeName, "",
MessageProperties.TEXT_PLAIN, message.getBytes());
 
System.out.println("Message \"" + message + "\" sent successfully.");
 
channel.close();
conn.close();
    }
 
}
 
Consumer.java
package com.test.fanout;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.ShutdownSignalException;
 
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
 
//經過channel.basicAck向服務器發送回執,刪除服務上的消息
public class Consumer implements com.rabbitmq.client.Consumer{
private Channel channel;
 
    public static void main(String[] args) throws Exception {
      //使用默認端口鏈接MQ
        ConnectionFactory factory = new ConnectionFactory();
    factory.setUsername("admin");
    factory.setPassword("admin");
        factory.setHost("192.168.169.142"); //使用默認端口5672
        Connection conn = factory.newConnection(); //聲明一個鏈接
        Channel channel = conn.createChannel(); //聲明消息通道
 
String queueName = args[0];//"queue_fanout1";
 
channel.queueDeclare(queueName, false, false, false, null);
 
Consumer consumer = new Consumer();
consumer.channel = channel;
 
channel.basicConsume(queueName, false, consumer);
    }
 
@Override
public void handleConsumeOk(String consumerTag) {
// TODO Auto-generated method stub
System.out.println("Consumer \"" + consumerTag + "\" has subscribed.");
}
 
@Override
public void handleCancelOk(String consumerTag) {
// TODO Auto-generated method stub
}
 
@Override
public void handleCancel(String consumerTag) throws IOException {
// TODO Auto-generated method stub
}
 
@Override
public void handleDelivery(String consumerTag, Envelope env,
BasicProperties props, byte[] body) throws IOException {
// TODO Auto-generated method stub
System.out.println("Message \"" + new String(body) + "\" received.");
channel.basicAck(env.getDeliveryTag(), false);
}
 
@Override
public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) {
// TODO Auto-generated method stub
}
 
@Override
public void handleRecoverOk(String consumerTag) {
// TODO Auto-generated method stub
}
}
相關文章
相關標籤/搜索