spring cloud stream 3.1.2 源碼搭配rocketmq學習 (二)

如今咱們從源碼來分析(一)中所涉及的東西spring


提問

問一下本身想從源碼中知道什麼, 帶着目的去看源碼才容易搞懂.springboot

從下述的代碼中發現定義了一個Function的Bean和在yaml中定義了definition, 那麼這兩個定義的做用是什麼呢? Function是怎麼樣去綁定、註冊的呢?app

帶着問題咱們就能夠去找對應的實現.ide

@Bean
public Function<Flux<Message>, Mono> consumerEvent() {
    return flux -> flux.map(message -> {
        System.out.println(message.getPayload());
        return message;
    }).then();
}
spring:
 cloud:
   stream:
     function:
       definition: consumerEvent

怎麼找springboot項目的啓動

首先咱們看META-INF/spring.factories函數

org.springframework.boot.autoconfigure.EnableAutoConfigura:\
(...省略一部分)
org.springframework.cloud.stream.function.FunctionConfiguration

發現自動裝載了一個FunctionConfiguration的類ui

進到這個類裏面看, 發現他註冊了一個functionBindingRegistrar的Bean.this

看英文---(functionBindingRegistrar) 方法綁定註冊, 這好像是咱們想知道的東西.flux

那麼接着往下看ci


functionBindingRegistrar

看傳入的參數發現這個Bean是根據StreamFunctionProperties註冊的一個的Bean.get

// FunctionConfiguration#functionBindingRegistrar
@Bean
public InitializingBean functionBindingRegistrar(Environment environment, FunctionCatalog functionCatalog, StreamFunctionProperties streamFunctionProperties) {
    return new FunctionConfiguration.FunctionBindingRegistrar(functionCatalog, streamFunctionProperties);
}

由於這個Bean是InitializingBean因此直接看afterPropertiesSet這個方法

看源碼最重要就是抓住主線, 從源碼中發現有這樣的一段代碼.

// 註冊了一個Bean定義, functionBindableProxyDefinition.
registry.registerBeanDefinition(name, functionBindableProxyDefinition);

咦? 那functionBindableProxyDefinition是一個什麼東西呢??

往上找這個的賦值.

// 看到這行, 他是一個 BindableFunctionProxyFactory
functionBindableProxyDefinition = new RootBeanDefinition(BindableFunctionProxyFactory.class);

// 爲構造參數進行了賦值
functionBindableProxyDefinition.getConstructorArgumentValues().addGenericArgumentValue(functionDefinition);
functionBindableProxyDefinition.getConstructorArgumentValues().addGenericArgumentValue(this.inputCount);
functionBindableProxyDefinition.getConstructorArgumentValues().addGenericArgumentValue(this.outputCount);
functionBindableProxyDefinition.getConstructorArgumentValues().addGenericArgumentValue(this.streamFunctionProperties);

初始化了一個RootBeanDefinition, 並對構造函數進行呢相對應的賦值, 那參數從哪來的呢, 繼續看源碼.

//  這些參數怎麼來的呢
//  看到下面這些代碼, 對於Function/Supplier/Consumer怎麼區分的, 是否是有的清晰的認知
FunctionInvocationWrapper function = (FunctionInvocationWrapper)this.functionCatalog.lookup(functionDefinition);

Type functionType = function.getFunctionType();
if (function.isSupplier()) {
    this.inputCount = 0;
    this.outputCount = this.getOutputCount(functionType, true);
} else if (function.isConsumer()) {
    this.inputCount = FunctionTypeUtils.getInputCount(functionType);
    this.outputCount = 0;
} else {
    this.inputCount = FunctionTypeUtils.getInputCount(functionType);
    this.outputCount = this.getOutputCount(functionType, false);
}

看到上面這一段代碼inputCount/outCount是計算出來的, 對於怎麼區分Function/Supplier/Consumer是Input仍是Output也有了必定的瞭解

那functionDefinition是什麼呢

functionDefinition = var3[var5];

咱們發現他是這樣賦值的, 再往上看

sourceNames = this.filterEligibleFunctionDefinitions();
var3 = sourceNames;
var4 = sourceNames.length;

經過閱讀發現filterEligibleFunctionDefinitions這個方法裏對咱們配置文件中Definition進行了解析處理, 並返回了合格的sourcesNames.

哦! 原來Definition的意義在這.

到此這個bean的註冊就已經完成了,那就來看看BindableFunctionProxyFactory,
發現他又是一個InitializingBean, 因此上訴代碼設置完初始化參數後, 在spring實例化Bean以後會調用afterPropertiesSet方法

public void afterPropertiesSet() {
    Assert.notEmpty(this.bindingTargetFactories, "'bindingTargetFactories' cannot be empty");
    int i;
    if (this.inputCount > 0) {
        for(i = 0; i < this.inputCount; ++i) {
            this.createInput(this.buildInputNameForIndex(i));
        }
    }

    if (this.outputCount > 0) {
        for(i = 0; i < this.outputCount; ++i) {
            this.createOutput(this.buildOutputNameForIndex(i));
        }
    }

}

而後看到這兩個函數. 發現綁定名稱原來是這樣的.

private String buildInputNameForIndex(int index) {
    return this.functionDefinition.replace(",", "|").replace("|", "") + "-" + "in" + "-" + index;
}

private String buildOutputNameForIndex(int index) {
    return this.functionDefinition.replace(",", "|").replace("|", "") + "-" + "out" + "-" + index;
}

以Input爲例子, 咱們看看createInput的方法.

先從簡單的來看, 先不看pollable

this.inputHolders.put(name, new BoundTargetHolder(this.getBindingTargetFactory(SubscribableChannel.class).createInput(name), true));

SubscribableChannel的createInput, 咱們找到SubscribableChannelBindingTargetFactory#createInput的

public SubscribableChannel createInput(String name) {
    DirectWithAttributesChannel subscribableChannel = new DirectWithAttributesChannel();
    subscribableChannel.setComponentName(name);
    subscribableChannel.setAttribute("type", "input");
    this.messageChannelConfigurer.configureInputChannel(subscribableChannel, name);
    if (this.context != null && !this.context.containsBean(name)) {
        this.context.registerBean(name, DirectWithAttributesChannel.class, () -> {
            return subscribableChannel;
        }, new BeanDefinitionCustomizer[0]);
    }

    return subscribableChannel;
}

發現返回了DirectWithAttributesChannel的一個類, 而且把他註冊成爲了Bean.

後面把這個類封裝在BoundTargetHolder中並放入inputHolders中就結束了Function註冊的過程

總結

  1. 啓動以後會註冊一個FunctionBindingRegistrar的Bean, 在這個Bean中會讀取配置文件找到對應的FunctionBean, 處理這個FunctionBean生成註冊須要的參數並把這些內容構成一個functionBindableProxyDefinition的Bean.
  2. functionBindableProxyDefinition的Bean處理上述構造函數傳入的參數並生成對應的Input/Output的Bean.

至此, funciton的註冊就完成了嗎. (不!). 其實尚未完成, 細心的朋友會發現 還有functionInitializer的Bean. 下一節咱們來看看這個.

Wish.    

Do.

相關文章
相關標籤/搜索