如今咱們從源碼來分析(一)中所涉及的東西spring
問一下本身想從源碼中知道什麼, 帶着目的去看源碼才容易搞懂.springboot
從下述的代碼中發現定義了一個Function的Bean和在yaml中定義了definition, 那麼這兩個定義的做用是什麼呢? Function是怎麼樣去綁定、註冊的呢?app
帶着問題咱們就能夠去找對應的實現.ide
@Bean public Function<Flux<Message>, Mono> consumerEvent() { return flux -> flux.map(message -> { System.out.println(message.getPayload()); return message; }).then(); }
spring: cloud: stream: function: definition: consumerEvent
首先咱們看META-INF/spring.factories函數
org.springframework.boot.autoconfigure.EnableAutoConfigura:\ (...省略一部分) org.springframework.cloud.stream.function.FunctionConfiguration
發現自動裝載了一個FunctionConfiguration的類ui
進到這個類裏面看, 發現他註冊了一個functionBindingRegistrar的Bean.this
看英文---(functionBindingRegistrar) 方法綁定註冊, 這好像是咱們想知道的東西.flux
那麼接着往下看ci
看傳入的參數發現這個Bean是根據StreamFunctionProperties註冊的一個的Bean.get
// FunctionConfiguration#functionBindingRegistrar @Bean public InitializingBean functionBindingRegistrar(Environment environment, FunctionCatalog functionCatalog, StreamFunctionProperties streamFunctionProperties) { return new FunctionConfiguration.FunctionBindingRegistrar(functionCatalog, streamFunctionProperties); }
由於這個Bean是InitializingBean因此直接看afterPropertiesSet這個方法
看源碼最重要就是抓住主線, 從源碼中發現有這樣的一段代碼.
// 註冊了一個Bean定義, functionBindableProxyDefinition. registry.registerBeanDefinition(name, functionBindableProxyDefinition);
咦? 那functionBindableProxyDefinition是一個什麼東西呢??
往上找這個的賦值.
// 看到這行, 他是一個 BindableFunctionProxyFactory functionBindableProxyDefinition = new RootBeanDefinition(BindableFunctionProxyFactory.class); // 爲構造參數進行了賦值 functionBindableProxyDefinition.getConstructorArgumentValues().addGenericArgumentValue(functionDefinition); functionBindableProxyDefinition.getConstructorArgumentValues().addGenericArgumentValue(this.inputCount); functionBindableProxyDefinition.getConstructorArgumentValues().addGenericArgumentValue(this.outputCount); functionBindableProxyDefinition.getConstructorArgumentValues().addGenericArgumentValue(this.streamFunctionProperties);
初始化了一個RootBeanDefinition, 並對構造函數進行呢相對應的賦值, 那參數從哪來的呢, 繼續看源碼.
// 這些參數怎麼來的呢 // 看到下面這些代碼, 對於Function/Supplier/Consumer怎麼區分的, 是否是有的清晰的認知 FunctionInvocationWrapper function = (FunctionInvocationWrapper)this.functionCatalog.lookup(functionDefinition); Type functionType = function.getFunctionType(); if (function.isSupplier()) { this.inputCount = 0; this.outputCount = this.getOutputCount(functionType, true); } else if (function.isConsumer()) { this.inputCount = FunctionTypeUtils.getInputCount(functionType); this.outputCount = 0; } else { this.inputCount = FunctionTypeUtils.getInputCount(functionType); this.outputCount = this.getOutputCount(functionType, false); }
看到上面這一段代碼inputCount/outCount是計算出來的, 對於怎麼區分Function/Supplier/Consumer是Input仍是Output也有了必定的瞭解
那functionDefinition是什麼呢
functionDefinition = var3[var5];
咱們發現他是這樣賦值的, 再往上看
sourceNames = this.filterEligibleFunctionDefinitions(); var3 = sourceNames; var4 = sourceNames.length;
經過閱讀發現filterEligibleFunctionDefinitions這個方法裏對咱們配置文件中Definition進行了解析處理, 並返回了合格的sourcesNames.
哦! 原來Definition的意義在這.
到此這個bean的註冊就已經完成了,那就來看看BindableFunctionProxyFactory,
發現他又是一個InitializingBean, 因此上訴代碼設置完初始化參數後, 在spring實例化Bean以後會調用afterPropertiesSet方法
public void afterPropertiesSet() { Assert.notEmpty(this.bindingTargetFactories, "'bindingTargetFactories' cannot be empty"); int i; if (this.inputCount > 0) { for(i = 0; i < this.inputCount; ++i) { this.createInput(this.buildInputNameForIndex(i)); } } if (this.outputCount > 0) { for(i = 0; i < this.outputCount; ++i) { this.createOutput(this.buildOutputNameForIndex(i)); } } }
而後看到這兩個函數. 發現綁定名稱原來是這樣的.
private String buildInputNameForIndex(int index) { return this.functionDefinition.replace(",", "|").replace("|", "") + "-" + "in" + "-" + index; } private String buildOutputNameForIndex(int index) { return this.functionDefinition.replace(",", "|").replace("|", "") + "-" + "out" + "-" + index; }
以Input爲例子, 咱們看看createInput的方法.
先從簡單的來看, 先不看pollable
this.inputHolders.put(name, new BoundTargetHolder(this.getBindingTargetFactory(SubscribableChannel.class).createInput(name), true));
SubscribableChannel的createInput, 咱們找到SubscribableChannelBindingTargetFactory#createInput的
public SubscribableChannel createInput(String name) { DirectWithAttributesChannel subscribableChannel = new DirectWithAttributesChannel(); subscribableChannel.setComponentName(name); subscribableChannel.setAttribute("type", "input"); this.messageChannelConfigurer.configureInputChannel(subscribableChannel, name); if (this.context != null && !this.context.containsBean(name)) { this.context.registerBean(name, DirectWithAttributesChannel.class, () -> { return subscribableChannel; }, new BeanDefinitionCustomizer[0]); } return subscribableChannel; }
發現返回了DirectWithAttributesChannel的一個類, 而且把他註冊成爲了Bean.
後面把這個類封裝在BoundTargetHolder中並放入inputHolders中就結束了Function註冊的過程
至此, funciton的註冊就完成了嗎. (不!). 其實尚未完成, 細心的朋友會發現 還有functionInitializer的Bean. 下一節咱們來看看這個.
Do.