rabbitmq 使用

序言java

上篇寫了rabbitmq的簡單單機版安裝,這篇寫下他與springboot使用,簡單寫下demo傳到碼雲了,須要能夠去下。git

  1. 項目結構web

  2. pom.xml 註釋掉amqp-client打開spring-boot-starter-amqp能夠支持springboot的註解模式下邊會寫到spring

    <?xml version="1.0" encoding="UTF-8"?>
    	<project xmlns="http://maven.apache.org/POM/4.0.0"
    	  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    	  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    	  <modelVersion>4.0.0</modelVersion>
    	  <parent>
    		<groupId>org.springframework.boot</groupId>
    		<artifactId>spring-boot-starter-parent</artifactId>
    		<version>2.1.5.RELEASE</version>
    		<relativePath/> <!-- lookup parent from repository -->
    	  </parent>
    	  <groupId>top.heliming.rabbitmq</groupId>
    	  <artifactId>rabbitmq-demo</artifactId>
    	  <version>0.0.1-SNAPSHOT</version>
    	  <name>rabbitmq-demo</name>
    	  <description>Demo project for Spring Boot</description>
    
    	  <properties>
    		<java.version>1.8</java.version>
    	  </properties>
    
    	  <dependencies>
    		<dependency>
    		  <groupId>org.apache.commons</groupId>
    		  <artifactId>commons-lang3</artifactId>
    		  <version>3.3.2</version>
    		</dependency>
    		<!--amqp依賴-->
    		<!--<dependency>-->
    		  <!--<groupId>org.springframework.boot</groupId>-->
    		  <!--<artifactId>spring-boot-starter-amqp</artifactId>-->
    		<!--</dependency>-->
    		<dependency>
    		  <groupId>com.rabbitmq</groupId>
    		  <artifactId>amqp-client</artifactId>
    		  <version>3.2.4</version>
    		</dependency>
    		<dependency>
    		  <groupId>org.springframework.boot</groupId>
    		  <artifactId>spring-boot-starter-web</artifactId>
    		</dependency>
    		<!--lombok-->
    		<dependency>
    		  <groupId>org.projectlombok</groupId>
    		  <artifactId>lombok</artifactId>
    		  <version>1.16.20</version>
    		  <scope>provided</scope>
    		</dependency>
    		<dependency>
    		  <groupId>org.springframework.boot</groupId>
    		  <artifactId>spring-boot-starter-test</artifactId>
    		  <scope>test</scope>
    		</dependency>
    	  </dependencies>
    
    	  <build>
    		<finalName>rabbitmqsend</finalName>
    		<plugins>
    		  <plugin>
    			<groupId>org.springframework.boot</groupId>
    			<artifactId>spring-boot-maven-plugin</artifactId>
    		  </plugin>
    		</plugins>
    	  </build>
    
    	</project>
  3. 不用springboot監聽註解的例子topic包下apache

    發送者:springboot

    package top.heliming.rabbitmq.rabbitmqdemo.topic;
    	import com.rabbitmq.client.Channel;
    	import com.rabbitmq.client.Connection;
    	import com.rabbitmq.client.MessageProperties;
    	import top.heliming.rabbitmq.rabbitmqdemo.util.ConnectionUtil;
    
    	/**
    	 * description: //訂閱模式:生產者指定路由key SEND RECV2
    	 *
    	 * @author: heliming
    	 * @date:2019/06/07 上午 10:24
    	 */
    	public class Send {
    
    	  private final static String EXCHANGE_NAME = "topic_exchange_test";
    
    	  public static void main(String[] argv) throws Exception {
    		send();
    	  }
    
    	  public static void send() throws Exception{
    		// 獲取到鏈接
    		Connection connection = ConnectionUtil.getConnection();
    		// 獲取通道
    		Channel channel = connection.createChannel();
    		// 聲明exchange,指定類型爲topic
    		channel.exchangeDeclare(EXCHANGE_NAME, "topic");
    
    		// 消息內容
    		String message = "新增商品 : id = 1001";
    		// 發送消息,而且指定routing key 爲:insert ,表明新增商品
    		channel.basicPublish(EXCHANGE_NAME, "item.insert", null, message.getBytes());
    		System.out.println(" [商品服務:] Sent '" + message + "'");
    		String messageupdate = "修改商品 : id = 1001";
    		// 發送消息,而且指定routing key 爲:insert ,表明新增商品
    		channel.basicPublish(EXCHANGE_NAME, "item.update", null, messageupdate.getBytes());
    		System.out.println(" [商品服務:] Sent '" + messageupdate + "'");
    		String messagedelete = "刪除商品 : id = 1001";
    		// 發送消息,而且指定routing key 爲:insert ,表明新增商品
    		channel.basicPublish(EXCHANGE_NAME, "item.delete", null, messagedelete.getBytes());
    		System.out.println(" [商品服務:] Sent '" + messagedelete + "'");
    		channel.close();
    		connection.close();
    	  }
    	}

    接受者:maven

    package top.heliming.rabbitmq.rabbitmqdemo.topic;
    	import com.rabbitmq.client.AMQP.BasicProperties;
    	import com.rabbitmq.client.Channel;
    	import com.rabbitmq.client.Connection;
    	import com.rabbitmq.client.DefaultConsumer;
    	import com.rabbitmq.client.Envelope;
    	import java.io.IOException;
    	import top.heliming.rabbitmq.rabbitmqdemo.util.ConnectionUtil;
    
    	/**
    	 * description: //指定路由key
    	 *
    	 * @author: heliming
    	 * @date:2019/06/07 上午 10:29
    	 */
    	public class Recv2 {
    	  private final static String QUEUE_NAME = "topic_exchange_queue_2";
    	  private final static String EXCHANGE_NAME = "topic_exchange_test";
    
    	  public static void main(String[] argv) throws Exception {
    		// 獲取到鏈接
    		Connection connection = ConnectionUtil.getConnection();
    		// 獲取通道
    		Channel channel = connection.createChannel();
    		// 聲明隊列
    		channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    		// 綁定隊列到交換機,同時指定須要訂閱的routing key。訂閱 insert、update、delete
    		channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.*");
    
    		// 定義隊列的消費者
    		DefaultConsumer consumer = new DefaultConsumer(channel) {
    		  // 獲取消息,而且處理,這個方法相似事件監聽,若是有消息的時候,會被自動調用
    		  @Override
    		  public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
    			  byte[] body) throws IOException {
    			// body 即消息體
    			String msg = new String(body);
    			System.out.println(" [消費者2] received : " + msg + "!");
    		  }
    		};
    		// 監聽隊列,自動ACK
    		channel.basicConsume(QUEUE_NAME, true, consumer);
    	  }
    	}

    通用類util包 ConnectionUtil類ide

    package top.heliming.rabbitmq.rabbitmqdemo.util;
    
    	import com.rabbitmq.client.Connection;
    	import com.rabbitmq.client.ConnectionFactory;
    
    	/**
    	 * description: //TODO
    	 *
    	 * @author: heliming
    	 * @date:2019/06/06 下午 10:18
    	 */
    	public class ConnectionUtil {
    	  /**
    	   * 創建與RabbitMQ的鏈接
    	   * @return
    	   * @throws Exception
    	   */
    	  public static Connection getConnection() throws Exception {
    		//定義鏈接工廠
    		ConnectionFactory factory = new ConnectionFactory();
    		//設置服務地址
    		factory.setHost("10.0.1.224");
    		//端口
    		factory.setPort(5672);
    		//設置帳號信息,用戶名、密碼、vhost
    		factory.setVirtualHost("/leyou");
    		factory.setUsername("leyou");
    		factory.setPassword("leyou");
    		// 經過工程獲取鏈接
    		Connection connection = factory.newConnection();
    		return connection;
    	  }
    	}
  4. 啓動發送者main函數 再啓動接受者main函數這時建立了一個隊列通道,而後再執行發送者函數 就能獲取數據函數

    我建立的用戶spring-boot

  5. spring註解方式 spring包下

    發送者

    package top.heliming.rabbitmq.rabbitmqdemo.topic;
    	import com.rabbitmq.client.Channel;
    	import com.rabbitmq.client.Connection;
    	import com.rabbitmq.client.MessageProperties;
    	import top.heliming.rabbitmq.rabbitmqdemo.util.ConnectionUtil;
    
    	/**
    	 * description: //訂閱模式:生產者指定路由key
    	 *
    	 * @author: heliming
    	 * @date:2019/06/07 上午 10:24
    	 */
    	public class Send3 {
    
    	  private final static String EXCHANGE_NAME = "spring.test.exchange";
    
    	  public static void main(String[] argv) throws Exception {
    		send3();
    	  }
    
    	  public static void send3() throws Exception{
    		// 獲取到鏈接
    		Connection connection = ConnectionUtil.getConnection();
    		// 獲取通道
    		Channel channel = connection.createChannel();
    		// 聲明exchange,指定類型爲topic
    		// channel.exchangeDeclare(EXCHANGE_NAME, "topic");
    		//交換機持久化
    		channel.exchangeDeclare(EXCHANGE_NAME, "topic",true);
    		// 消息內容
    		String message = "新增商品 : id = 1001";
    		// 發送消息,而且指定routing key 爲:insert ,表明新增商品
    		//消息持久化
    		channel.basicPublish(EXCHANGE_NAME, "item.insert", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
    		System.out.println(" [商品服務:] Sent '" + message + "'");
    		String messageupdate = "修改商品 : id = 1001";
    		// 發送消息,而且指定routing key 爲:insert ,表明新增商品
    		channel.basicPublish(EXCHANGE_NAME, "item.update", MessageProperties.PERSISTENT_TEXT_PLAIN, messageupdate.getBytes());
    		//消息持久化
    		System.out.println(" [商品服務:] Sent '" + messageupdate + "'");
    		String messagedelete = "刪除商品 : id = 1001";
    		// 發送消息,而且指定routing key 爲:insert ,表明新增商品
    		//消息持久化
    		channel.basicPublish(EXCHANGE_NAME, "item.delete", MessageProperties.PERSISTENT_TEXT_PLAIN, messagedelete.getBytes());
    		System.out.println(" [商品服務:] Sent '" + messagedelete + "'");
    		channel.close();
    		connection.close();
    	  }
    	}

    接受者

    package top.heliming.rabbitmq.rabbitmqdemo.spring;
    
    	import org.springframework.amqp.core.ExchangeTypes;
    	import org.springframework.amqp.rabbit.annotation.Exchange;
    	import org.springframework.amqp.rabbit.annotation.Queue;
    	import org.springframework.amqp.rabbit.annotation.QueueBinding;
    	import org.springframework.amqp.rabbit.annotation.RabbitListener;
    	import org.springframework.stereotype.Component;
    
    	/**
    	 * description: //TODO
    	 *
    	 * @author: heliming
    	 * @date:2019/06/07 下午 12:26
    	 */
    	@Component
    	public class Listener {
    
    	  @RabbitListener(bindings = @QueueBinding(
    		  value = @Queue(value = "spring.test.queue", durable = "true"),
    		  exchange = @Exchange(
    			  value = "spring.test.exchange",
    			  ignoreDeclarationExceptions = "true",
    			  type = ExchangeTypes.TOPIC
    		  ),
    		  key = {"#.#"}))
    	  public void listen(String msg){
    		System.out.println("接收到消息:" + msg);
    	  }
    	}

    訪問:啓動項目,執行發送者函數能夠看到發送數據成功,若是不行,那就先執行發送者函數建立完隊列,而後啓動項目,而後執行發送者函數發送數據,最後就接受到數據了。

總結:

若是不建立rabbitmq用戶直接用guest會不讓其餘機器訪問的,具體能夠修改角色限制權限或者新添加用戶
用spring監聽註解方式pom文件須要修改上邊說過了。若是rabbitmq安裝erlang出現問題大多數是root權限問題。
erlang和rabbitmq版本不一致,上篇中有查看版本兼容的連接。

碼雲地址連接:https://gitee.com/hexiaoming123/rabbitmqshiyong/

相關文章
相關標籤/搜索