生產者
package com.example.demo.produce; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class MyProcuder { public static void main(String[] args) throws Exception { //1 建立一個ConnectionFactory,並進行配置 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("39.106.128.***"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); connectionFactory.setUsername("admin"); connectionFactory.setPassword("admin"); //2 經過鏈接工廠建立鏈接 Connection connection = connectionFactory.newConnection(); //3 經過connection建立一個channel Channel channel = connection.createChannel(); //4 經過channel發送數據 String exchange = "exchange001"; String routingKey = "routingkey001"; String body = "this is a message"; channel.basicPublish(exchange, routingKey, null, body.getBytes()); //5 關閉相關鏈接 channel.close(); connection.close(); } }
消費者
package com.example.demo.constumer; import com.rabbitmq.client.*; public class Consumer { public static void main(String[] args) throws Exception{ //1 建立一個ConnectionFactory,並進行配置 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("39.106.128.***"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); connectionFactory.setUsername("admin"); connectionFactory.setPassword("admin"); //2 經過鏈接工廠建立鏈接 Connection connection = connectionFactory.newConnection(); //3 經過connection建立一個channel Channel channel = connection.createChannel(); //4 聲明(建立)一個隊列 String queue = "queue001"; String exchangeName = "exchange001"; String routingKey = "routingkey001"; //是否持久化 boolean durable = true; boolean exclusive = false; boolean autoDelete = false; channel.exchangeDeclare(exchangeName, "topic", true, false, null); channel.queueDeclare(queue, durable, exclusive, autoDelete, null); channel.queueBind(queue, exchangeName, routingKey); //5 建立消費者 // Step06: let the consumer consume the queue channel.basicConsume(queue, true, new MyConsumer(channel)); } }
自定義消費者
package com.example.demo.constumer;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.nio.charset.StandardCharsets;

/**
 * Custom consumer that decodes each delivered message body as UTF-8 text and
 * prints it to standard output.
 */
public class MyConsumer extends DefaultConsumer {

    /**
     * @param channel the channel this consumer is associated with; passed to
     *                {@link DefaultConsumer}
     */
    public MyConsumer(Channel channel) {
        super(channel);
    }

    /**
     * Prints the received message body as a UTF-8 string.
     *
     * @param consumerTag unused
     * @param envelope    unused
     * @param properties  unused
     * @param body        raw message bytes; a {@code null} body is printed as {@code "null"}
     */
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope,
                               AMQP.BasicProperties properties, byte[] body) {
        // StandardCharsets.UTF_8 cannot raise UnsupportedEncodingException, so the
        // former catch-all + printStackTrace is no longer needed. The null guard
        // keeps the printed output for a missing body the same as before.
        String message = (body == null) ? null : new String(body, StandardCharsets.UTF_8);
        System.out.println("消費者接收到的消息:" + message);
    }
}