(1):下載erlang,緣由在於RabbitMQ服務端代碼是使用併發式語言erlang編寫的,下載地址:http://www.erlang.org/downloads,雙擊.exe文件進行安裝就好,安裝完成以後建立一個名爲ERLANG_HOME的環境變量,其值指向erlang的安裝目錄,同時將%ERLANG_HOME%\bin加入到Path中,最後打開命令行,輸入erl,若是出現erlang的版本信息就表示erlang語言環境安裝成功;spring
(2):下載RabbitMQ,下載地址:http://www.rabbitmq.com/,一樣雙擊.exe進行安裝就好。而後下載RabbitMQ 管理插件,能夠更好的可視化方式查看Rabbit MQ 服務器實例的狀態。
1.使用管理員打開命令窗口,進入安裝目錄sbin:
輸入命令:rabbitmq-plugins.bat enable rabbitmq_managementjson
2.安裝成功後,重啓服務器瀏覽器
輸入命令:net stop RabbitMQ && net start RabbitMQ服務器
3.用戶及權限管理併發
使用rabbitmqctl控制檯命令來建立用戶,密碼,綁定權限等。app
查看已有用戶及用戶的角色:rabbitmqctl.bat list_users 默認會存在一個來賓帳號 guest框架
新增一個用戶:rabbitmqctl.bat add_user username passwordide
新增成功後,能夠看見新增的角色爲[],guest的角色是administor。微服務
rabbitmq用戶角色可分爲五類:超級管理員, 監控者, 策略制定者, 普通管理者以及其餘ui
(1) 超級管理員(administrator)
可登錄管理控制檯(啓用management plugin的狀況下),可查看全部的信息,而且能夠對用戶,策略(policy)進行操做。
(2) 監控者(monitoring)
可登錄管理控制檯(啓用management plugin的狀況下),同時能夠查看rabbitmq節點的相關信息(進程數,內存使用狀況,磁盤使用狀況等)
(3) 策略制定者(policymaker)
可登錄管理控制檯(啓用management plugin的狀況下), 同時能夠對policy進行管理。
(4) 普通管理者(management)
僅可登錄管理控制檯(啓用management plugin的狀況下),沒法看到節點信息,也沒法對策略進行管理。
(5) 其餘的
沒法登錄管理控制檯,一般就是普通的生產者和消費者
下面給新增的用戶來增長administrator角色
rabbitmqctl.bat set_user_tags username administrator
4.消息隊列的管理
使用瀏覽器打開 http://localhost:15672 訪問Rabbit Mq的管理控制檯,使用剛纔建立的帳號登錄系統:
結構原理:
Spring Cloud Stream 是一個構建消息驅動微服務的框架.
應用程序經過 inputs 或者 outputs 來與 Spring Cloud Stream 中binder 交互,經過咱們配置來 binding ,而 Spring Cloud Stream 的 binder 負責與中間件交互。
Binder 是 Spring Cloud Stream 的一個抽象概念,是應用與消息中間件之間的粘合劑。
經過 binder ,能夠很方便的鏈接中間件,能夠動態的改變消息的destinations(對應於 Kafka 的topic,Rabbit MQ 的 exchanges)
,這些均可以經過外部配置項來作到。
新建一個stream項目,主要有3部分,消息產生者類(provider),消息消費者類(receive),stream input/output通道定義類(source)
因爲是微服務框架,這裏我把stream的有關定義都放到了這個項目集中定義,其餘用到stream的項目直接引入這個項目的jar包就可使用其中的類:
消息提供者配置:
public interface MessageProviderSource { // exchange名稱 public static final String EXCHANGE_OUT = "exporttv_exchange_out"; // 綁定exchange @Output(MessageProviderSource.EXCHANGE_OUT) public MessageChannel messageOutput(); }
@EnableBinding(MessageProviderSource.class) public class MessageProvider { @Autowired private MessageProviderSource messageSource; public void sendApplicationLoadMessage(HashMap<String, Integer> map) { // 建立併發送消息 messageSource.messageOutput().send(message(map)); } private static final <T> Message<T> message(T val) { return MessageBuilder.withPayload(val).build(); } }
消息消費者配置:
public interface MessageReceiveSource { // exchange名稱 public static final String EXCHANGE_IN = "exporttv_exchange_in"; // 綁定通道 @Input(MessageReceiveSource.EXCHANGE_IN) public SubscribableChannel messageIutput(); }
@EnableBinding(MessageReceiveSource.class) public class MessageReceive { @StreamListener(MessageReceiveSource.EXCHANGE_IN) public void ApplicationLoadMessage(Message<HashMap<String,Integer>> message) { } }
而後其餘項目引入這個項目後,還要在yml中配置一下綁定:
消息提供者yml
spring: cloud: stream: bindings: # 服務的整合處理 exporttv_exchange_out: destination: exporttv_exchange # 綁定exchange content-type: application/json # 設置消息類型 binder: exporttv-rabbitmq # 消息中間件 binders: exporttv-rabbitmq: type: rabbit environment: spring: rabbitmq: host: localhost port: 5672 username: guest password: guest virtual-host: /
消息消費者yml:
spring: cloud: stream: bindings: # 服務的整合處理 exporttv_exchange_in: destination: exporttv_exchange # 綁定exchange content-type: application/json # 設置消息類型 group: exporttv-group # 進行操做的分組 binder: exporttv-rabbitmq # 消息中間件 binders: exporttv-rabbitmq: type: rabbit environment: spring: rabbitmq: host: localhost port: 5672 username: guest password: guest virtual-host: /
下面說說提供者和消費者怎麼引用以前定義的類
消息提供者項目:
@Service public class SendApplicationMessage { @Autowired private MessageProvider messageProvider; public void SendApplicationLoadMessage() { try { // 業務功能省略 messageProvider.sendApplicationLoadMessage(); } catch (Exception e) { // 打印錯誤日誌 LogUtil.printLog(e, Exception.class); // 拋出錯誤 throw new MyRuntimeException(ResultEnum.DBException); } } }
消息消費者子項目:
@Component public class ReceiveApplicationMessage extends MessageReceive{ @Autowired private ApplicationService applicationService; @Override public void ApplicationLoadMessage(Message<HashMap<String,Integer>> message) { Integer toalYear = message.getPayload().get("year"); Integer toalMonth = message.getPayload().get("month"); Integer toalWeek = message.getPayload().get("week"); Integer toanId = message.getPayload().get("toanId"); applicationService.updateApplicationLoad(toalYear, toalMonth, toalWeek, toanId); } }
我的以爲這樣作不夠好,後期再改進