package com.test.fanout;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.lang.String;
import java.lang.System;
import java.util.HashMap;
import java.util.Map;
import java.util.Scanner;
public class Producer {
public static void main(String[] args) throws Exception {
//使用默認端口鏈接MQ
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername("admin");
factory.setPassword("admin");
factory.setHost("192.168.169.142"); //使用默認端口5672
Connection conn = factory.newConnection(); //聲明一個鏈接
Channel channel = conn.createChannel(); //聲明消息通道
String message = "hello world!";
String queueName1 = "queue_fanout1";
String queueName2 = "queue_fanout2";
String queueName3 = "queue_fanout3";
String exchangeName = "test.fanout";
//Routing Key
channel.queueDeclare(queueName1, false, false, false, null);
channel.queueDeclare(queueName2, false, false, false, null);
channel.queueDeclare(queueName3, false, false, false, null);
channel.exchangeDeclare(exchangeName, "fanout", false, false, null);
channel.queueBind(queueName1, exchangeName, "");
channel.queueBind(queueName2, exchangeName, "");
channel.queueBind(queueName3, exchangeName, "");
channel.basicPublish(exchangeName, "",
MessageProperties.TEXT_PLAIN, message.getBytes());
System.out.println("Message \"" + message + "\" sent successfully.");
channel.close();
conn.close();
}
}
package com.test.fanout;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.ShutdownSignalException;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
//經過channel.basicAck向服務器發送回執,刪除服務上的消息
public class Consumer implements com.rabbitmq.client.Consumer{
private Channel channel;
public static void main(String[] args) throws Exception {
//使用默認端口鏈接MQ
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername("admin");
factory.setPassword("admin");
factory.setHost("192.168.169.142"); //使用默認端口5672
Connection conn = factory.newConnection(); //聲明一個鏈接
Channel channel = conn.createChannel(); //聲明消息通道
String queueName = args[0];//"queue_fanout1";
channel.queueDeclare(queueName, false, false, false, null);
Consumer consumer = new Consumer();
consumer.channel = channel;
channel.basicConsume(queueName, false, consumer);
}
@Override
public void handleConsumeOk(String consumerTag) {
// TODO Auto-generated method stub
System.out.println("Consumer \"" + consumerTag + "\" has subscribed.");
}
@Override
public void handleCancelOk(String consumerTag) {
// TODO Auto-generated method stub
}
@Override
public void handleCancel(String consumerTag) throws IOException {
// TODO Auto-generated method stub
}
@Override
public void handleDelivery(String consumerTag, Envelope env,
BasicProperties props, byte[] body) throws IOException {
// TODO Auto-generated method stub
System.out.println("Message \"" + new String(body) + "\" received.");
channel.basicAck(env.getDeliveryTag(), false);
}
@Override
public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) {
// TODO Auto-generated method stub
}
@Override
public void handleRecoverOk(String consumerTag) {
// TODO Auto-generated method stub
}
}