1、RabbitMQ的HelloWorld

    消息隊列按使用者角色能夠分爲消息的生產者和消息的消費者,下面寫一個HelloWorld的例子,例子主要實現了消息的生產者publish一條消息到RabbitMQ,而後消息的消費者獲取這條消息。java

引用jar包

    Java使用RabbitMQ須要引用amqp-client.jar:apache

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<groupId>cn.net.bysoft</groupId>
	<artifactId>rabbitmqapp</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<packaging>war</packaging>

	<dependencies>
		<dependency>
			<groupId>com.rabbitmq</groupId>
			<artifactId>amqp-client</artifactId>
			<version>3.6.5</version>
		</dependency>
	</dependencies>
</project>

生產者代碼

    消息的生產者從代碼的角度來看,主要作了以下步驟:app

  • 鏈接到RabbitMQ
  • 獲取信道
  • 聲明交換器
  • 建立消息隊列
  • 發佈消息
  • 關閉信道
  • 關閉鏈接

    代碼以下:less

package cn.net.bysoft.rabbitmqapp.lesson1;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * 消息的生產者
 */
public class Producer {
	private final static String QUEUE_NAME = "hello-world";

	public static void main(String[] args) throws IOException, TimeoutException {
		// 1.建立鏈接
		ConnectionFactory factory = new ConnectionFactory();
		// 1.1 賬號
		factory.setUsername("guest");
		// 1.2 密碼
		factory.setPassword("guest");
		// 1.3 地址
		factory.setHost("127.0.0.1");
		// 1.4 虛擬主機,"/"是默認的
		factory.setVirtualHost("/");
		// 1.5 端口
		factory.setPort(5672);
		// 1.6 鏈接超時
		factory.setConnectionTimeout(10000);
		// 1.7 實例化鏈接
		Connection connection = factory.newConnection();

		// 2.建立信道
		Channel channel = connection.createChannel();

		// 3.聲明交換器(參數:交換器名稱、交換器類型)
		channel.exchangeDeclare("hello-exchange", "direct");

		// 4.聲明隊列(參數:隊列名稱、是否持久化、是不是私有的、是否自動刪除、參數對象)
		channel.queueDeclare(QUEUE_NAME, false, false, false, null);

		// 5.發佈消息(參數:交換器名稱、隊列名稱、屬性對象、發送內容)
		String msg = "Hello World";
		channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());

		System.out.println("Send " + msg);
		
		// 6.關閉鏈接
		channel.close();
		connection.close();

		
	}

}

    執行上面的程序,消息已經發送到了RabbitMQ,下面編寫消費者代碼。maven

消費者代碼

    仍是從程序的角度看,消費者主要操做了以下步驟:spa

  • 鏈接到RabbitMQ;
  • 得到信道;
  • 聲明交換器;
  • 聲明隊列;
  • 消費消息;
  • 關閉信道;
  • 關閉鏈接;

    代碼以下:.net

package cn.net.bysoft.rabbitmqapp.lesson1;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ConsumerCancelledException;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.ShutdownSignalException;

/**
 * 消息的消費者
 */
public class Consumer {
	private final static String QUEUE_NAME = "hello-world";

	public static void main(String[] args) throws IOException, TimeoutException, ShutdownSignalException,
			ConsumerCancelledException, InterruptedException {
		// 1.建立鏈接
		ConnectionFactory factory = new ConnectionFactory();
		// 1.1 賬號
		factory.setUsername("guest");
		// 1.2 密碼
		factory.setPassword("guest");
		// 1.3 地址
		factory.setHost("127.0.0.1");
		// 1.4 虛擬主機,"/"是默認的
		factory.setVirtualHost("/");
		// 1.5 端口
		factory.setPort(5672);
		// 1.6 鏈接超時
		factory.setConnectionTimeout(10000);
		// 1.7 實例化鏈接
		Connection connection = factory.newConnection();

		// 2.建立信道
		Channel channel = connection.createChannel();

		// 3.聲明交換器(參數:交換器名稱、交換器類型)
		channel.exchangeDeclare("hello-exchange", "direct");

		// 4.聲明隊列(參數:隊列名稱、是否持久化、是不是私有的、是否自動刪除、參數對象)
		QueueingConsumer qc = new QueueingConsumer(channel);
		channel.basicConsume(QUEUE_NAME, true, qc);
		
		// 5.消費消息
		System.out.println("begin reading...");

		while (true) {
			QueueingConsumer.Delivery delivery = qc.nextDelivery();
			if (delivery != null) {
				String message = new String(delivery.getBody());
				System.out.println(message);
			}
		}

	}
}

連接方式還能夠:code

ConnectionFactory factory = new ConnectionFactory();
  factory.setUri("amqp://userName:password@hostName:portNumber/virtualHost"); // 相似於http協議請求同樣
  Connection conn = factory.newConnection();
相關文章
相關標籤/搜索