因劇情須要,因此準備在基礎開發平臺中進行AB-TEST組件開發。目前主要使用了Spring Cloud E-SR2 版本,其中使用了kafka做爲內置bus總線,另外一組kafka用於監控trace推送(如zipkin、自定義監控)。AB-TEST你們都應該瞭解過,如不瞭解請參考www.itcodemonkey.com/article/103… ,這裏就很少講了。 其實簡單來講就是根據配置好的分桶模式如A 、B、C ,在進行邏輯處理時根據某一類條件(如uid)計算出當前用戶分桶進行動態的邏輯處理(簡單來講就是多個if else)。html
如要作成組件化那必然要對上層開發透明,最好時無感知的進行邏輯切換,固然咱們第一步不須要那麼完美,先實現功能組件。在進行技術選型的時候參考了兩種模式:
一、zookeeper
優勢:技術簡單,有定義好的工具
缺點:增長應用依賴的組件,當zk出現問題時形成ab-test失效
二、bus總線
優勢:實現簡單,耦合度低
缺點:須要詳解cloud bus 機制
固然咱們選擇了後者,由於在基礎平臺中,組件的侵入性越低,對上層開發越友好,並且就越穩定。spring
由於目前使用的是Spring Cloud E SR2全家桶,因此在技術處理上也遵循其原則,儘可能使用內置技術棧實現。內部主要以cloud bus機制進行了簡單的擴展實現,下面咱們先來簡單瞭解下BUS EVENT機制。bash
Bus 機制主要創建在 stream的基礎之上,在cloud的下 咱們能夠選擇rabbitmq 或者kafka,其特色主要是針對spring event消息的訂閱與發佈。app
Spring Event 事件驅動模型 dom
Bus 事件ide
衆所周知,在bus使用中,最多的場景就是配置中心的動態配置刷新。也就是說咱們經過/bus/refresh 接口調用就能夠進行鍼對性的配置刷新了,根據這條線,咱們來看下內部的源碼結構。工具
一、經過rest 接口進行外部請求 此處cloud 藉助其端點監控機制實現,主要看 RefreshBusEndpoint,固然Cloud E 和 F版本有些許不同,但其實不難理解Cloud E SR2組件化
@ManagedResource
public class RefreshBusEndpoint extends AbstractBusEndpoint {
public RefreshBusEndpoint(ApplicationEventPublisher context, String id,
BusEndpoint delegate) {
super(context, id, delegate);
}
//定義對外訪問接口
@RequestMapping(value = "refresh", method = RequestMethod.POST)
@ResponseBody
@ManagedOperation
public void refresh(
@RequestParam(value = "destination", required = false) String destination) {
publish(new RefreshRemoteApplicationEvent(this, getInstanceId(), destination));
}
}
複製代碼
Cloud F SR2測試
@Endpoint(id = "bus-refresh") //TODO: document new id
public class RefreshBusEndpoint extends AbstractBusEndpoint {
public RefreshBusEndpoint(ApplicationEventPublisher context, String id) {
super(context, id);
}
@WriteOperation
public void busRefreshWithDestination(@Selector String destination) { //TODO: document destination
publish(new RefreshRemoteApplicationEvent(this, getInstanceId(), destination));
}
@WriteOperation
public void busRefresh() {
publish(new RefreshRemoteApplicationEvent(this, getInstanceId(), null));
}
}
複製代碼
經過上面的代碼能夠看到,請求進來的話都調用了 AbstractBusEndpoint 的publish進行了事件發佈優化
public class AbstractBusEndpoint {
private ApplicationEventPublisher context;
private String appId;
public AbstractBusEndpoint(ApplicationEventPublisher context, String appId) {
this.context = context;
this.appId = appId;
}
protected String getInstanceId() {
return this.appId;
}
//發佈事件
protected void publish(ApplicationEvent event) {
context.publishEvent(event);
}
}
複製代碼
其中ApplicationEventPublisher 在哪定義的呢,這就不得不說BusAutoConfiguration 這裏了,這是bus的核心加載器,在經過外部接口調用發佈事件後內部對事件進行了監聽和處理就在BusAutoConfiguration中,以下:
//消費事件,進行邏輯判斷是不是由本身發出的事件,若是是本身內部發出的事件則經過stream(kafka)進行發佈
@EventListener(classes = RemoteApplicationEvent.class)
public void acceptLocal(RemoteApplicationEvent event) {
if (this.serviceMatcher.isFromSelf(event)
&& !(event instanceof AckRemoteApplicationEvent)) {
this.cloudBusOutboundChannel.send(MessageBuilder.withPayload(event).build());
}
}
//經過stream(kafka)進行外部訂閱類型爲RemoteApplicationEvent 的事件
@StreamListener(SpringCloudBusClient.INPUT)
public void acceptRemote(RemoteApplicationEvent event) {
//判斷是不是ack類型的回執事件,是的話進行內部發布,用於bustrace 等處理
if (event instanceof AckRemoteApplicationEvent) {
if (this.bus.getTrace().isEnabled() && !this.serviceMatcher.isFromSelf(event)
&& this.applicationEventPublisher != null) {
this.applicationEventPublisher.publishEvent(event);
}
// If it's an ACK we are finished processing at this point return; } //判斷是不是給本身的事件,在外部接口請求時可增長destination 進行選擇*:*表明所有應用 if (this.serviceMatcher.isForSelf(event) && this.applicationEventPublisher != null) { //若是不是本身發的就進行內部轉發 if (!this.serviceMatcher.isFromSelf(event)) { this.applicationEventPublisher.publishEvent(event); } //判斷是否要進行ack處理,默認開啓 if (this.bus.getAck().isEnabled()) { AckRemoteApplicationEvent ack = new AckRemoteApplicationEvent(this, this.serviceMatcher.getServiceId(), this.bus.getAck().getDestinationService(), event.getDestinationService(), event.getId(), event.getClass()); this.cloudBusOutboundChannel .send(MessageBuilder.withPayload(ack).build()); this.applicationEventPublisher.publishEvent(ack); } } //判斷是否要進行trace跟蹤,默認關閉 if (this.bus.getTrace().isEnabled() && this.applicationEventPublisher != null) { // We are set to register sent events so publish it for local consumption, // irrespective of the origin this.applicationEventPublisher.publishEvent(new SentApplicationEvent(this, event.getOriginService(), event.getDestinationService(), event.getId(), event.getClass())); } } 複製代碼
觀看完內部事件消費,和stream消息訂閱,那bus 的stream又是怎麼進行初始化和工做的呢,答案依然在BusAutoConfiguration 中,以下:
@Autowired
private BusProperties bus;
private ApplicationEventPublisher applicationEventPublisher;
@PostConstruct
public void init() {
BindingProperties inputBinding = this.bindings.getBindings()
.get(SpringCloudBusClient.INPUT);
if (inputBinding == null) {
this.bindings.getBindings().put(SpringCloudBusClient.INPUT,
new BindingProperties());
}
BindingProperties input = this.bindings.getBindings()
.get(SpringCloudBusClient.INPUT);
if (input.getDestination() == null) {
input.setDestination(this.bus.getDestination());
}
BindingProperties outputBinding = this.bindings.getBindings()
.get(SpringCloudBusClient.OUTPUT);
if (outputBinding == null) {
this.bindings.getBindings().put(SpringCloudBusClient.OUTPUT,
new BindingProperties());
}
BindingProperties output = this.bindings.getBindings()
.get(SpringCloudBusClient.OUTPUT);
if (output.getDestination() == null) {
output.setDestination(this.bus.getDestination());
}
}
複製代碼
對於cloud stream就很少說了,這裏很簡單的進行了初始化,因此對於發佈和訂閱消息就很清晰了。其實在BusAutoConfiguration中因此的事件消費、發佈、訂閱都是爲了集羣內部的通訊,而真正的事件處理卻不此處。那麼對於配置的刷新行爲到底在哪呢,通過查看對於刷新的操做要看RefreshListener 了。 以下圖 RefreshListener 針對事件進行了監聽,其事件使用的是RefreshRemoteApplicationEvent,其繼承RemoteApplicationEvent。
public class RefreshListener
implements ApplicationListener<RefreshRemoteApplicationEvent> {
private static Log log = LogFactory.getLog(RefreshListener.class);
private ContextRefresher contextRefresher;
public RefreshListener(ContextRefresher contextRefresher) {
this.contextRefresher = contextRefresher;
}
@Override
public void onApplicationEvent(RefreshRemoteApplicationEvent event) {
Set<String> keys = contextRefresher.refresh();
log.info("Received remote refresh request. Keys refreshed " + keys);
}
}
複製代碼
經過源碼能夠看出,事件的刷新行爲 Set keys = contextRefresher.refresh(); (固然contextRefresher 定義也是在BusAutoConfiguration 有興趣能夠查看下),對於刷新到底怎麼實現的,也就接近了bus config刷新的核心:ContextRefresher
public synchronized Set<String> refresh() {
//加載當前內存中的配置
Map<String, Object> before = extract(
this.context.getEnvironment().getPropertySources());
//單獨啓動一個內部容器進行遠程配置加載,
addConfigFilesToEnvironment();
//交叉對比配置信息,其實就倆Map進行,若是新配置不包含這個key了就result.put(key, null);,若是倆val不同,就 覆蓋新值
Set<String> keys = changes(before,
extract(this.context.getEnvironment().getPropertySources())).keySet();
//發佈內部變動事件
this.context.publishEvent(new EnvironmentChangeEvent(context, keys));
//刷新配置,主要是加了註解@Refresh的方法 參數等,這裏的坑在於會形成服務狀態變動 UP-》DOWN-》UP
//大面積波動的話很可怕
this.scope.refreshAll();
return keys;
}
複製代碼
通過以上的源碼,脈絡就很清晰了: 一、外部POST /bus/refresh 進行了刷新行爲,發佈內部事件RefreshRemoteApplicationEvent 二、經過@EventListener 進行內部事件消費,若是是本身內部發布的事件就經過stream進行廣播 三、經過@StreamListener 對stream進行監聽,若是是給本身的事件,就進行內部轉發,具體需不須要ack trace則根據配置進行。 四、內部經過RefreshListener 消費事件,經過ContextRefresher.refresh 進行配置刷新 這下一個完整的bus機制就展示在咱們眼前,咱們只須要簡單的進行改造,就能實現本身的動態事件推送了。
通過上面bus event的鋪墊,如今咱們來講下實現AB-TEST中分爲了幾個目標階段:
複製代碼
一、應用啓動load分桶數據 看過了bus的模式,這裏就簡單咯。在這裏咱們經過繼承 PropertyResourceConfigurer 來實現配置的初始化,而配置的來源就是cloud 配置中心的 application.properties,由於使用了配置中心後此配置會自動加載不要額外的處理。而後在對加載的配置進行歸類便可(因我爲test配置定義了前綴,因此只需過濾其便可),模仿bus配置篩選便可。
public static Map<String, Object> extract(MutablePropertySources propertySources) {
Map<String, Object> result = new HashMap<>(16);
List<PropertySource<?>> sources = new ArrayList<PropertySource<?>>();
for (PropertySource<?> source : propertySources) {
sources.add(0, source);
}
for (PropertySource<?> source : sources) {
if (!standardSources.contains(source.getName())) {
extract(source, result);
}
}
return result;
}
public static void extract(PropertySource<?> parent, Map<String, Object> result) {
if (parent instanceof CompositePropertySource) {
try {
List<PropertySource<?>> sources = new ArrayList<PropertySource<?>>();
for (PropertySource<?> source : ((CompositePropertySource) parent)
.getPropertySources()) {
sources.add(0, source);
}
for (PropertySource<?> source : sources) {
extract(source, result);
}
} catch (Exception e) {
return;
}
} else if (parent instanceof EnumerablePropertySource) {
for (String key : ((EnumerablePropertySource<?>) parent).getPropertyNames()) {
result.put(key, parent.getProperty(key));
log.debug("PropertyConfigure load K[{}] V[{}]", key, parent.getProperty(key));
}
}
}
複製代碼
二、在請求來臨時進行動態計算分桶 定義自定義註解@NoveTest用於標註須要進行測試的方法,定義NoveParamInterceptor 對入參進行解析,定義NoveTestInterceptor內部攔截器進行註解切入 ,爲增長了@NoveTest進行動態分桶計算。
@Pointcut("@annotation(NoveTest)")
public void anyMethod() {
}
@Before(value = "anyMethod()")
public void doBefore(JoinPoint jp) throws Exception {
try {
MethodSignature methodSig = (MethodSignature) jp.getSignature();
Annotation[] annotations = methodSig.getMethod().getDeclaredAnnotations();
for (Annotation annotation : annotations) {
String name = annotation.annotationType().getName();
String novetest = NoveTest.class.getName();
if (novetest.equals(name)) {
NoveTest test = (NoveTest) annotation;
Map<String, String> buckets = RandomBucketUtils.getBuckets(test.name());
RandomBucketUtils.loadBuckets(buckets);
}
}
} catch (Exception e) { //防護性容錯
}
}
複製代碼
其中分桶計算的策略很簡單,經過uid + 因子進行hash 計算index 獲取數據
int hash = (((sole + factor).hashCode() % 100) + 100) % 100;
//獲取參數百分比值
config.entrySet().stream().sorted(Map.Entry.comparingByKey(Comparator.reverseOrder())).forEach(entry -> {
if (entry.getKey().contains(percent)) {
IntStream.range(0, Integer.valueOf((String) entry.getValue())).forEach(i -> bucket.add(entry.getKey()));
}
});
複製代碼
好了,定義完成,上層應用開發使用方式:
@RequestMapping("/test")
@NoveTest(name = {"312", "feed"})
public Map<String, String> test(@RequestParam(defaultValue = "", required = false) String ma) {
String type = RandomBucketUtils.getProperties("type");
//TODO 經過type 分枝處理 if else
return null;
}
複製代碼
三、修改配置進行分桶策略的動態刷新 分桶策略不可能一直不變,並且變化的時候也不該該從新發版,那真是太噁心人了。因此動態配置推送就相當重要了,固然你們也可使用自帶的bus總線進行數據推送,可是其destroy的問題真是噁心到家了,並且有可能形成服務大面積癱瘓,慎用。 基於種種問題,就須要自定義bus event,結合其現有的bus進行數據推送。
其實很簡單,分爲幾步:
@WebEndpoint(id = "nove-refresh")
public class NoveRefreshBusEndpoint extends AbstractBusEndpoint {
public NoveRefreshBusEndpoint(ApplicationEventPublisher context, String id) {
super(context, id);
}
@WriteOperation
public void busNoveRefresh() {
publish(new NoveRefreshRemoteApplicationEvent(this, getInstanceId(),null));
}
//此處原來的destination 參數必須寫成arg0 否則就不生效,噁心人,內部機制,這樣處理最簡單
@WriteOperation
public void busRefreshWithDestination(@Selector String arg0) {
publish(new NoveRefreshRemoteApplicationEvent(this, getInstanceId(), arg0));
}
}
複製代碼
SO 到目前爲止一個可用於生產的AB-TEST 機制就實現了,有的同窗就會說了,你這根據數據在進行if else 的邏輯判斷調用真是噁心到人了。確實,由於初版目前是最簡單的實現,在第二版將進行動態調用的優化。其實動態調用說來也很簡單,無非經過定義的接口實現各個不一樣的邏輯,而後針對其進行簡單的動態代理便可。後期源碼會同步上傳。
總結: 自定義AB-TEST組件的過程