RabbitMQ安裝及使用


3、RabbitMQ
一、解壓縮:tar -xvf otp_src_19.3.tar.gz

二、重命名文件夾爲erlang:mv otp_src_19.3 ./erlang

三、配置安裝 :cd erlang
./configure --prefix=/usr/erlang --without-javac

四、對源代碼進行編譯,運行以下命令:make

五、開始安裝: make install

六、運行如下命令編輯/etc/profile文件:vim /etc/profile
export PATH=$PATH:/usr/erlang/bin

七、保存,而後運行如下命令使環境變量當即生效:source /etc/profile

八、驗證erlang是否安裝成功: erl

開始安裝rabbitMQ
九、 解壓xxx.xz 後綴文件:xz -d xxxx.xz

十、解壓xxx.tar後綴文件:tar -xf xxxx.tar

十一、安裝mq須要插件
安裝python
yum install python -y

安裝simpleJson
yum install xmlto -y
yum install python-simplejson -y

十二、移動並命名:mv rabbit-server /usr/rabbitmq
vim /etc/profile
export PATH=$PATH:/usr/erlang/bin:/usr/rabbitmq/sbin
source /etc/profile
十三、啓動服務(默認端口5672):cd /usr/rabbitmq/sbin,而後運行 rabbitmq-server

十四、中止服務:rabbitmqctl stop

十五、「Possibly caused by authentication failure」解決方案
在遠程機器上安裝RabbitMQ後,從其餘機器鏈接時會一直出現認證失敗的錯誤,即 Possibly caused by authentication failure。此錯誤表面上是帳號密碼錯誤,實際緣由是默認的guest用戶不容許遠程登陸。官網上給的例子都是在本地使用系統默認的guest用戶鏈接的,而沒有給出遠程鏈接的例子。剛剛安裝好rabbitmq-server時,系統會自動建立一個名爲「/」的virtual host,同時也會建立一個用戶名和密碼都是guest的用戶,該用戶擁有「/」這個virtual host的全部訪問權限,可是此guest用戶只容許從localhost訪問。因此在本機上運行官方示例是沒問題的,一旦切換到遠程機器訪問,單純修改localhost字段(主機地址)是不行的。
這裏的解決辦法是新建一個對rabbitmq具備全部權限的用戶,而後用此用戶的信息進行遠程訪問。
解決辦法以下:
(1)添加新用戶:rabbitmqctl add_user root 123456

(2)賦administrator權限:rabbitmqctl set_user_tags root administrator

(3)設置權限(對「/」這個virtual host的配置、寫、讀權限): rabbitmqctl set_permissions -p / root ".*" ".*" ".*"

 

 1.pom.xml

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2.application.properties

# RabbitMQ broker connection settings
# NOTE(review): host and credentials are plain-text demo values (they match the
# user created with rabbitmqctl earlier in this article) - replace before real use.
spring.rabbitmq.host=192.168.146.130
spring.rabbitmq.port=5672
spring.rabbitmq.username=root
spring.rabbitmq.password=123456

spring.rabbitmq.virtual-host=/
# Number of consumers (initial and maximum are both set to 10)
spring.rabbitmq.listener.simple.concurrency= 10
spring.rabbitmq.listener.simple.max-concurrency= 10
# Number of messages each consumer fetches from the queue at a time
spring.rabbitmq.listener.simple.prefetch= 1
# Start the consumers automatically
spring.rabbitmq.listener.simple.auto-startup=true
# When consumption fails, put the message back on the queue automatically
spring.rabbitmq.listener.simple.default-requeue-rejected= true
# Enable retry on send
# NOTE(review): with multiplier=1.0 the interval presumably stays at
# initial-interval, so max-interval would never be reached - confirm intent.
spring.rabbitmq.template.retry.enabled=true
spring.rabbitmq.template.retry.initial-interval=1000
spring.rabbitmq.template.retry.max-attempts=3
spring.rabbitmq.template.retry.max-interval=10000
spring.rabbitmq.template.retry.multiplier=1.0

4.MQConfig配置類

package com.imooc.miaosha.rabbitmq;

import java.util.HashMap;
import java.util.Map;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.HeadersExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Spring configuration that declares the RabbitMQ queues, exchanges and
 * bindings used by the application.
 *
 * <p>One section per exchange style: a plain queue (labelled "Direct mode" by
 * the original author), topic, fanout and headers.
 */
@Configuration
public class MQConfig {

    // Queue names.
    public static final String MIAOSHA_QUEUE = "miaosha.queue";
    public static final String QUEUE = "queue";
    public static final String TOPIC_QUEUE1 = "topic.queue1";
    public static final String TOPIC_QUEUE2 = "topic.queue2";
    public static final String HEADER_QUEUE = "header.queue";

    // Exchange names.
    public static final String TOPIC_EXCHANGE = "topicExchage";
    public static final String FANOUT_EXCHANGE = "fanoutxchage";
    public static final String HEADERS_EXCHANGE = "headersExchage";

    // ------------------------------------------------------------------
    // Direct mode
    // ------------------------------------------------------------------

    /**
     * Declares the queue named {@link #MIAOSHA_QUEUE}.
     *
     * @return the queue, created with its second constructor argument
     *         (durable) set to {@code true}
     */
    @Bean
    public Queue queue() {
        final boolean durable = true;
        return new Queue(MIAOSHA_QUEUE, durable);
    }

    // ------------------------------------------------------------------
    // Topic mode
    // ------------------------------------------------------------------

    /** Declares the durable queue named {@link #TOPIC_QUEUE1}. */
    @Bean
    public Queue topicQueue1() {
        final boolean durable = true;
        return new Queue(TOPIC_QUEUE1, durable);
    }

    /** Declares the durable queue named {@link #TOPIC_QUEUE2}. */
    @Bean
    public Queue topicQueue2() {
        final boolean durable = true;
        return new Queue(TOPIC_QUEUE2, durable);
    }

    /** Declares the topic exchange named {@link #TOPIC_EXCHANGE}. */
    @Bean
    public TopicExchange topicExchage() {
        TopicExchange exchange = new TopicExchange(TOPIC_EXCHANGE);
        return exchange;
    }

    /** Binds topic queue 1 to the topic exchange with routing key {@code topic.key1}. */
    @Bean
    public Binding topicBinding1() {
        Queue source = topicQueue1();
        TopicExchange target = topicExchage();
        return BindingBuilder.bind(source).to(target).with("topic.key1");
    }

    /** Binds topic queue 2 to the topic exchange with the pattern {@code topic.#}. */
    @Bean
    public Binding topicBinding2() {
        Queue source = topicQueue2();
        TopicExchange target = topicExchage();
        return BindingBuilder.bind(source).to(target).with("topic.#");
    }

    // ------------------------------------------------------------------
    // Fanout mode
    // ------------------------------------------------------------------

    /** Declares the fanout exchange named {@link #FANOUT_EXCHANGE}. */
    @Bean
    public FanoutExchange fanoutExchage() {
        FanoutExchange exchange = new FanoutExchange(FANOUT_EXCHANGE);
        return exchange;
    }

    /** Binds topic queue 1 to the fanout exchange (the topic queues are reused here). */
    @Bean
    public Binding FanoutBinding1() {
        Queue source = topicQueue1();
        FanoutExchange target = fanoutExchage();
        return BindingBuilder.bind(source).to(target);
    }

    /** Binds topic queue 2 to the fanout exchange (the topic queues are reused here). */
    @Bean
    public Binding FanoutBinding2() {
        Queue source = topicQueue2();
        FanoutExchange target = fanoutExchage();
        return BindingBuilder.bind(source).to(target);
    }

    // ------------------------------------------------------------------
    // Headers mode
    // ------------------------------------------------------------------

    /** Declares the headers exchange named {@link #HEADERS_EXCHANGE}. */
    @Bean
    public HeadersExchange headersExchage() {
        HeadersExchange exchange = new HeadersExchange(HEADERS_EXCHANGE);
        return exchange;
    }

    /** Declares the durable queue named {@link #HEADER_QUEUE}. */
    @Bean
    public Queue headerQueue1() {
        final boolean durable = true;
        return new Queue(HEADER_QUEUE, durable);
    }

    /**
     * Binds the header queue to the headers exchange using a
     * {@code whereAll(...).match()} binding on {@code header1=value1} and
     * {@code header2=value2}.
     */
    @Bean
    public Binding headerBinding() {
        Map<String, Object> requiredHeaders = new HashMap<>();
        requiredHeaders.put("header1", "value1");
        requiredHeaders.put("header2", "value2");

        Queue source = headerQueue1();
        HeadersExchange target = headersExchage();
        return BindingBuilder.bind(source).to(target).whereAll(requiredHeaders).match();
    }

}

5.入隊(發送消息)

package com.imooc.miaosha.rabbitmq;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import com.imooc.miaosha.redis.RedisService;

/**
 * Publishes messages to RabbitMQ through the injected {@link AmqpTemplate}.
 *
 * <p>Only the seckill ("miaosha") sender is active; the senders for the other
 * exchange styles are kept below as commented-out examples.
 */
@Service
public class MQSender {

    // Logger is a constant: declared final so it cannot be reassigned.
    private static final Logger log = LoggerFactory.getLogger(MQSender.class);

    @Autowired
    AmqpTemplate amqpTemplate;

    /**
     * Converts the given message to a string with
     * {@code RedisService.beanToString} and sends it using
     * {@link MQConfig#MIAOSHA_QUEUE} as the routing key.
     *
     * @param mm the seckill request to enqueue
     */
    public void sendMiaoshaMessage(MiaoshaMessage mm) {
        String msg = RedisService.beanToString(mm);
        // Parameterized logging: the text is only assembled when INFO is enabled.
        log.info("send message:{}", msg);
        amqpTemplate.convertAndSend(MQConfig.MIAOSHA_QUEUE, msg);
    }

    // Example senders for the other exchange styles (disabled):
    //
    // public void send(Object message) {
    //     String msg = RedisService.beanToString(message);
    //     log.info("send message:"+msg);
    //     amqpTemplate.convertAndSend(MQConfig.QUEUE, msg);
    // }
    //
    // public void sendTopic(Object message) {
    //     String msg = RedisService.beanToString(message);
    //     log.info("send topic message:"+msg);
    //     amqpTemplate.convertAndSend(MQConfig.TOPIC_EXCHANGE, "topic.key1", msg+"1");
    //     amqpTemplate.convertAndSend(MQConfig.TOPIC_EXCHANGE, "topic.key2", msg+"2");
    // }
    //
    // public void sendFanout(Object message) {
    //     String msg = RedisService.beanToString(message);
    //     log.info("send fanout message:"+msg);
    //     amqpTemplate.convertAndSend(MQConfig.FANOUT_EXCHANGE, "", msg);
    // }
    //
    // public void sendHeader(Object message) {
    //     String msg = RedisService.beanToString(message);
    //     log.info("send fanout message:"+msg);
    //     MessageProperties properties = new MessageProperties();
    //     properties.setHeader("header1", "value1");
    //     properties.setHeader("header2", "value2");
    //     Message obj = new Message(msg.getBytes(), properties);
    //     amqpTemplate.convertAndSend(MQConfig.HEADERS_EXCHANGE, "", obj);
    // }

}

6.接收(消費消息)

package com.imooc.miaosha.rabbitmq;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import com.imooc.miaosha.domain.MiaoshaOrder;
import com.imooc.miaosha.domain.MiaoshaUser;
import com.imooc.miaosha.redis.RedisService;
import com.imooc.miaosha.service.GoodsService;
import com.imooc.miaosha.service.MiaoshaService;
import com.imooc.miaosha.service.OrderService;
import com.imooc.miaosha.vo.GoodsVo;

@Service
public class MQReceiver {

private static Logger log = LoggerFactory.getLogger(MQReceiver.class); @Autowired RedisService redisService; @Autowired GoodsService goodsService; @Autowired OrderService orderService; @Autowired MiaoshaService miaoshaService; @RabbitListener(queues=MQConfig.MIAOSHA_QUEUE) public void receive(String message) { log.info("receive message:"+message); MiaoshaMessage mm = RedisService.stringToBean(message, MiaoshaMessage.class); MiaoshaUser user = mm.getUser(); long goodsId = mm.getGoodsId(); GoodsVo goods = goodsService.getGoodsVoByGoodsId(goodsId); int stock = goods.getStockCount(); if(stock <= 0) { return; } //判斷是否已經秒殺到了 MiaoshaOrder order = orderService.getMiaoshaOrderByUserIdGoodsId(user.getId(), goodsId); if(order != null) { return; } //減庫存 下訂單 寫入秒殺訂單 miaoshaService.miaosha(user, goods); } // @RabbitListener(queues=MQConfig.QUEUE)// public void receive(String message) {// log.info("receive message:"+message);// }// // @RabbitListener(queues=MQConfig.TOPIC_QUEUE1)// public void receiveTopic1(String message) {// log.info(" topic queue1 message:"+message);// }// // @RabbitListener(queues=MQConfig.TOPIC_QUEUE2)// public void receiveTopic2(String message) {// log.info(" topic queue2 message:"+message);// }// // @RabbitListener(queues=MQConfig.HEADER_QUEUE)// public void receiveHeaderQueue(byte[] message) {// log.info(" header queue message:"+new String(message));// }// }

相關文章
相關標籤/搜索