springboot+cloud 學習(三)消息中間件 RibbitMQ+Stream

安裝RabbitMQ

window下安裝: 

(1):下載erlang,緣由在於RabbitMQ服務端代碼是使用併發式語言erlang編寫的,下載地址:http://www.erlang.org/downloads,雙擊.exe文件進行安裝就好,安裝完成以後建立一個名爲ERLANG_HOME的環境變量,其值指向erlang的安裝目錄,同時將%ERLANG_HOME%\bin加入到Path中,最後打開命令行,輸入erl,若是出現erlang的版本信息就表示erlang語言環境安裝成功;spring

 (2):下載RabbitMQ,下載地址:http://www.rabbitmq.com/,一樣雙擊.exe進行安裝就好。而後下載RabbitMQ 管理插件,能夠更好的可視化方式查看Rabbit MQ 服務器實例的狀態。
1.使用管理員打開命令窗口,進入安裝目錄sbin:
輸入命令:rabbitmq-plugins.bat enable rabbitmq_managementjson

2.安裝成功後,重啓服務器瀏覽器

輸入命令:net stop RabbitMQ && net start RabbitMQ服務器

3.用戶及權限管理併發

使用rabbitmqctl控制檯命令來建立用戶,密碼,綁定權限等。app

查看已有用戶及用戶的角色:rabbitmqctl.bat list_users  默認會存在一個來賓帳號 guest框架

新增一個用戶:rabbitmqctl.bat add_user username passwordide

 

 

 

新增成功後,能夠看見新增的角色爲[],guest的角色是administor。微服務

rabbitmq用戶角色可分爲五類:超級管理員, 監控者, 策略制定者, 普通管理者以及其餘ui

(1) 超級管理員(administrator)
可登錄管理控制檯(啓用management plugin的狀況下),可查看全部的信息,而且能夠對用戶,策略(policy)進行操做。
(2) 監控者(monitoring)
可登錄管理控制檯(啓用management plugin的狀況下),同時能夠查看rabbitmq節點的相關信息(進程數,內存使用狀況,磁盤使用狀況等) 
(3) 策略制定者(policymaker)
可登錄管理控制檯(啓用management plugin的狀況下), 同時能夠對policy進行管理。
(4) 普通管理者(management)
僅可登錄管理控制檯(啓用management plugin的狀況下),沒法看到節點信息,也沒法對策略進行管理。
(5) 其餘的
沒法登錄管理控制檯,一般就是普通的生產者和消費者

下面給新增的用戶來增長administrator角色

rabbitmqctl.bat set_user_tags username administrator

4.消息隊列的管理

使用瀏覽器打開 http://localhost:15672 訪問Rabbit Mq的管理控制檯,使用剛纔建立的帳號登錄系統:

 

RibbitMQ的具體運用

結構原理:

spring cloud stream

Spring Cloud Stream 是一個構建消息驅動微服務的框架.

 

 

應用程序經過 inputs 或者 outputs 來與 Spring Cloud Stream 中binder 交互,經過咱們配置來 binding ,而 Spring Cloud Stream 的 binder 負責與中間件交互。

Binder 是 Spring Cloud Stream 的一個抽象概念,是應用與消息中間件之間的粘合劑。

經過 binder ,能夠很方便的鏈接中間件,能夠動態的改變消息的destinations(對應於 Kafka 的topic,Rabbit MQ 的 exchanges)

,這些均可以經過外部配置項來作到。

 新建一個stream項目,主要有3部分,消息產生者類(provider),消息消費者類(receive),stream input/output通道定義類(source)

 因爲是微服務框架,這裏我把stream的有關定義都放到了這個項目集中定義,其餘用到stream的項目直接引入這個項目的jar包就可使用其中的類:

 

消息提供者配置:

public interface MessageProviderSource {

    // exchange名稱
    public static final String EXCHANGE_OUT = "exporttv_exchange_out";
    
    // 綁定exchange
    @Output(MessageProviderSource.EXCHANGE_OUT)
    public MessageChannel messageOutput();
 
    
}
@EnableBinding(MessageProviderSource.class)
public class MessageProvider {

    @Autowired
    private MessageProviderSource messageSource;

    public void sendApplicationLoadMessage(HashMap<String, Integer> map) {
    // 建立併發送消息
    messageSource.messageOutput().send(message(map));
    }

    private static final <T> Message<T> message(T val) {
    return MessageBuilder.withPayload(val).build();
    }
}

消息消費者配置:

public interface MessageReceiveSource {

    // exchange名稱
    public static final String EXCHANGE_IN = "exporttv_exchange_in";
    // 綁定通道
    @Input(MessageReceiveSource.EXCHANGE_IN)
    public SubscribableChannel  messageIutput();
}
@EnableBinding(MessageReceiveSource.class) 
public class MessageReceive {
    
    @StreamListener(MessageReceiveSource.EXCHANGE_IN)
    public void ApplicationLoadMessage(Message<HashMap<String,Integer>> message) {
    
    }
}

而後其餘項目引入這個項目後,還要在yml中配置一下綁定:

消息提供者yml

spring:
  cloud:
    stream:
      bindings: # 服務的整合處理 
        exporttv_exchange_out: 
          destination: exporttv_exchange # 綁定exchange
          content-type: application/json # 設置消息類型
          binder: exporttv-rabbitmq      # 消息中間件
      binders:
        exporttv-rabbitmq:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: guest
                password: guest
                virtual-host: /

消息消費者yml:

spring:
  cloud:
    stream:
      bindings: # 服務的整合處理 
        exporttv_exchange_in: 
          destination: exporttv_exchange # 綁定exchange
          content-type: application/json # 設置消息類型
          group: exporttv-group          # 進行操做的分組
          binder: exporttv-rabbitmq      # 消息中間件
      binders:
        exporttv-rabbitmq:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: guest
                password: guest
                virtual-host: /

下面說說提供者和消費者怎麼引用以前定義的類

消息提供者項目:

 

@Service
public class SendApplicationMessage {

    @Autowired
    private MessageProvider messageProvider;

    public void SendApplicationLoadMessage() {

    try {

        // 業務功能省略
 messageProvider.sendApplicationLoadMessage();

    } catch (Exception e) {
        // 打印錯誤日誌
        LogUtil.printLog(e, Exception.class);
        // 拋出錯誤
        throw new MyRuntimeException(ResultEnum.DBException);
    }
    }

}

消息消費者子項目:

 

@Component
public class ReceiveApplicationMessage extends MessageReceive{
    
    @Autowired
    private ApplicationService applicationService;
    
    @Override
    public void ApplicationLoadMessage(Message<HashMap<String,Integer>> message) {
    
    Integer toalYear = message.getPayload().get("year");
    Integer toalMonth = message.getPayload().get("month");
    Integer toalWeek = message.getPayload().get("week");
    Integer toanId = message.getPayload().get("toanId");
    
    applicationService.updateApplicationLoad(toalYear, toalMonth, toalWeek, toanId);
    }
    
}

我的以爲這樣作不夠好,後期再改進

相關文章
相關標籤/搜索