Spring Cloud 基於Bus 的AB-TEST組件

1、前情提要:

因劇情須要,因此準備在基礎開發平臺中進行AB-TEST組件開發。目前主要使用了Spring Cloud E-SR2 版本,其中使用了kafka做爲內置bus總線,另外一組kafka用於監控trace推送(如zipkin、自定義監控)。AB-TEST你們都應該瞭解過,如不瞭解請參考www.itcodemonkey.com/article/103… ,這裏就很少講了。 其實簡單來講就是根據配置好的分桶模式如A 、B、C ,在進行邏輯處理時根據某一類條件(如uid)計算出當前用戶分桶進行動態的邏輯處理(簡單來講就是多個if else)。html

2、方案選型:

如要作成組件化那必然要對上層開發透明,最好時無感知的進行邏輯切換,固然咱們第一步不須要那麼完美,先實現功能組件。在進行技術選型的時候參考了兩種模式:
一、zookeeper
優勢:技術簡單,有定義好的工具
缺點:增長應用依賴的組件,當zk出現問題時形成ab-test失效
二、bus總線
優勢:實現簡單,耦合度低
缺點:須要詳解cloud bus 機制
固然咱們選擇了後者,由於在基礎平臺中,組件的侵入性越低,對上層開發越友好,並且就越穩定。spring

3、Spring CLoud Bus 事件機制

由於目前使用的是Spring Cloud E SR2全家桶,因此在技術處理上也遵循其原則,儘可能使用內置技術棧實現。內部主要以cloud bus機制進行了簡單的擴展實現,下面咱們先來簡單瞭解下BUS EVENT機制。bash

Bus 機制主要創建在 stream的基礎之上,在cloud的下 咱們能夠選擇rabbitmq 或者kafka,其特色主要是針對spring event消息的訂閱與發佈。app

Spring Event 事件驅動模型 dom

image.png
能夠看出模型由三部分構成:

  • 事件:ApplicationEvent,繼承自JDK的EventObject,全部事件將繼承它,並經過source獲得事件源
  • 發佈者:ApplicationEventPublisher及ApplicationEventMulticaster接口,使用這個接口,咱們的Service就擁有了發佈事件的能力。
  • 訂閱者:在spring bus 中能夠實現 ApplicationListener(繼承自JDK的EventListener),全部監聽器將繼承它或添加@EventListener

Bus 事件ide

衆所周知,在bus使用中,最多的場景就是配置中心的動態配置刷新。也就是說咱們經過/bus/refresh 接口調用就能夠進行鍼對性的配置刷新了,根據這條線,咱們來看下內部的源碼結構。工具

一、經過rest 接口進行外部請求 此處cloud 藉助其端點監控機制實現,主要看 RefreshBusEndpoint,固然Cloud E 和 F版本有些許不同,但其實不難理解Cloud E SR2組件化

@ManagedResource
public class RefreshBusEndpoint extends AbstractBusEndpoint {

	public RefreshBusEndpoint(ApplicationEventPublisher context, String id,
			BusEndpoint delegate) {
		super(context, id, delegate);
	}
    //定義對外訪問接口
	@RequestMapping(value = "refresh", method = RequestMethod.POST)
	@ResponseBody
	@ManagedOperation
	public void refresh(
			@RequestParam(value = "destination", required = false) String destination) {
		publish(new RefreshRemoteApplicationEvent(this, getInstanceId(), destination));
	}

}
複製代碼

Cloud F SR2測試

@Endpoint(id = "bus-refresh") //TODO: document new id
public class RefreshBusEndpoint extends AbstractBusEndpoint {

	public RefreshBusEndpoint(ApplicationEventPublisher context, String id) {
		super(context, id);
	}

	@WriteOperation
	public void busRefreshWithDestination(@Selector String destination) { //TODO: document destination
		publish(new RefreshRemoteApplicationEvent(this, getInstanceId(), destination));
	}

	@WriteOperation
	public void busRefresh() {
		publish(new RefreshRemoteApplicationEvent(this, getInstanceId(), null));
	}

}
複製代碼

經過上面的代碼能夠看到,請求進來的話都調用了 AbstractBusEndpoint 的publish進行了事件發佈優化

public class AbstractBusEndpoint {

	private ApplicationEventPublisher context;

	private String appId;

	public AbstractBusEndpoint(ApplicationEventPublisher context, String appId) {
		this.context = context;
		this.appId = appId;
	}

	protected String getInstanceId() {
		return this.appId;
	}
    //發佈事件
	protected void publish(ApplicationEvent event) {
		context.publishEvent(event);
	}

}
複製代碼

其中ApplicationEventPublisher 在哪定義的呢,這就不得不說BusAutoConfiguration 這裏了,這是bus的核心加載器,在經過外部接口調用發佈事件後內部對事件進行了監聽和處理就在BusAutoConfiguration中,以下:

//消費事件,進行邏輯判斷是不是由本身發出的事件,若是是本身內部發出的事件則經過stream(kafka)進行發佈
@EventListener(classes = RemoteApplicationEvent.class)
	public void acceptLocal(RemoteApplicationEvent event) {
		if (this.serviceMatcher.isFromSelf(event)
				&& !(event instanceof AckRemoteApplicationEvent)) {
			this.cloudBusOutboundChannel.send(MessageBuilder.withPayload(event).build());
		}
	}
//經過stream(kafka)進行外部訂閱類型爲RemoteApplicationEvent 的事件
@StreamListener(SpringCloudBusClient.INPUT)
	public void acceptRemote(RemoteApplicationEvent event) {
        //判斷是不是ack類型的回執事件,是的話進行內部發布,用於bustrace 等處理
		if (event instanceof AckRemoteApplicationEvent) {
			if (this.bus.getTrace().isEnabled() && !this.serviceMatcher.isFromSelf(event)
					&& this.applicationEventPublisher != null) {
				this.applicationEventPublisher.publishEvent(event);
			}
			// If it's an ACK we are finished processing at this point return; } //判斷是不是給本身的事件,在外部接口請求時可增長destination 進行選擇*:*表明所有應用 if (this.serviceMatcher.isForSelf(event) && this.applicationEventPublisher != null) { //若是不是本身發的就進行內部轉發 if (!this.serviceMatcher.isFromSelf(event)) { this.applicationEventPublisher.publishEvent(event); } //判斷是否要進行ack處理,默認開啓 if (this.bus.getAck().isEnabled()) { AckRemoteApplicationEvent ack = new AckRemoteApplicationEvent(this, this.serviceMatcher.getServiceId(), this.bus.getAck().getDestinationService(), event.getDestinationService(), event.getId(), event.getClass()); this.cloudBusOutboundChannel .send(MessageBuilder.withPayload(ack).build()); this.applicationEventPublisher.publishEvent(ack); } } //判斷是否要進行trace跟蹤,默認關閉 if (this.bus.getTrace().isEnabled() && this.applicationEventPublisher != null) { // We are set to register sent events so publish it for local consumption, // irrespective of the origin this.applicationEventPublisher.publishEvent(new SentApplicationEvent(this, event.getOriginService(), event.getDestinationService(), event.getId(), event.getClass())); } } 複製代碼

觀看完內部事件消費,和stream消息訂閱,那bus 的stream又是怎麼進行初始化和工做的呢,答案依然在BusAutoConfiguration 中,以下:

@Autowired
	private BusProperties bus;

	private ApplicationEventPublisher applicationEventPublisher;

	@PostConstruct
	public void init() {
		BindingProperties inputBinding = this.bindings.getBindings()
				.get(SpringCloudBusClient.INPUT);
		if (inputBinding == null) {
			this.bindings.getBindings().put(SpringCloudBusClient.INPUT,
					new BindingProperties());
		}
		BindingProperties input = this.bindings.getBindings()
				.get(SpringCloudBusClient.INPUT);
		if (input.getDestination() == null) {
			input.setDestination(this.bus.getDestination());
		}
		BindingProperties outputBinding = this.bindings.getBindings()
				.get(SpringCloudBusClient.OUTPUT);
		if (outputBinding == null) {
			this.bindings.getBindings().put(SpringCloudBusClient.OUTPUT,
					new BindingProperties());
		}
		BindingProperties output = this.bindings.getBindings()
				.get(SpringCloudBusClient.OUTPUT);
		if (output.getDestination() == null) {
			output.setDestination(this.bus.getDestination());
		}
	}
複製代碼

對於cloud stream就很少說了,這裏很簡單的進行了初始化,因此對於發佈和訂閱消息就很清晰了。其實在BusAutoConfiguration中因此的事件消費、發佈、訂閱都是爲了集羣內部的通訊,而真正的事件處理卻不此處。那麼對於配置的刷新行爲到底在哪呢,通過查看對於刷新的操做要看RefreshListener 了。 以下圖 RefreshListener 針對事件進行了監聽,其事件使用的是RefreshRemoteApplicationEvent,其繼承RemoteApplicationEvent。

image.png

public class RefreshListener
		implements ApplicationListener<RefreshRemoteApplicationEvent> {

	private static Log log = LogFactory.getLog(RefreshListener.class);

	private ContextRefresher contextRefresher;

	public RefreshListener(ContextRefresher contextRefresher) {
		this.contextRefresher = contextRefresher;
	}

	@Override
	public void onApplicationEvent(RefreshRemoteApplicationEvent event) {
		Set<String> keys = contextRefresher.refresh();
		log.info("Received remote refresh request. Keys refreshed " + keys);
	}
}
複製代碼

經過源碼能夠看出,事件的刷新行爲 Set keys = contextRefresher.refresh(); (固然contextRefresher 定義也是在BusAutoConfiguration 有興趣能夠查看下),對於刷新到底怎麼實現的,也就接近了bus config刷新的核心:ContextRefresher

public synchronized Set<String> refresh() {
        //加載當前內存中的配置
		Map<String, Object> before = extract(
				this.context.getEnvironment().getPropertySources());
        //單獨啓動一個內部容器進行遠程配置加載,
		addConfigFilesToEnvironment();
        //交叉對比配置信息,其實就倆Map進行,若是新配置不包含這個key了就result.put(key, null);,若是倆val不同,就            覆蓋新值
		Set<String> keys = changes(before,
				extract(this.context.getEnvironment().getPropertySources())).keySet();
        //發佈內部變動事件
		this.context.publishEvent(new EnvironmentChangeEvent(context, keys));
        //刷新配置,主要是加了註解@Refresh的方法 參數等,這裏的坑在於會形成服務狀態變動 UP-》DOWN-》UP 
        //大面積波動的話很可怕
		this.scope.refreshAll();
		return keys;
	}
複製代碼

通過以上的源碼,脈絡就很清晰了: 一、外部POST /bus/refresh 進行了刷新行爲,發佈內部事件RefreshRemoteApplicationEvent 二、經過@EventListener 進行內部事件消費,若是是本身內部發布的事件就經過stream進行廣播 三、經過@StreamListener 對stream進行監聽,若是是給本身的事件,就進行內部轉發,具體需不須要ack trace則根據配置進行。 四、內部經過RefreshListener 消費事件,經過ContextRefresher.refresh 進行配置刷新 這下一個完整的bus機制就展示在咱們眼前,咱們只須要簡單的進行改造,就能實現本身的動態事件推送了。

4、AB-TEST機制實現:

通過上面bus event的鋪墊,如今咱們來講下實現AB-TEST中分爲了幾個目標階段:
複製代碼

一、應用啓動load分桶數據 看過了bus的模式,這裏就簡單咯。在這裏咱們經過繼承 PropertyResourceConfigurer 來實現配置的初始化,而配置的來源就是cloud 配置中心的 application.properties,由於使用了配置中心後此配置會自動加載不要額外的處理。而後在對加載的配置進行歸類便可(因我爲test配置定義了前綴,因此只需過濾其便可),模仿bus配置篩選便可。

public static Map<String, Object> extract(MutablePropertySources propertySources) {
        Map<String, Object> result = new HashMap<>(16);
        List<PropertySource<?>> sources = new ArrayList<PropertySource<?>>();
        for (PropertySource<?> source : propertySources) {
            sources.add(0, source);
        }
        for (PropertySource<?> source : sources) {
            if (!standardSources.contains(source.getName())) {
                extract(source, result);
            }
        }
        return result;
    }

    public static void extract(PropertySource<?> parent, Map<String, Object> result) {
        if (parent instanceof CompositePropertySource) {
            try {
                List<PropertySource<?>> sources = new ArrayList<PropertySource<?>>();
                for (PropertySource<?> source : ((CompositePropertySource) parent)
                        .getPropertySources()) {
                    sources.add(0, source);
                }
                for (PropertySource<?> source : sources) {
                    extract(source, result);
                }
            } catch (Exception e) {
                return;
            }
        } else if (parent instanceof EnumerablePropertySource) {
            for (String key : ((EnumerablePropertySource<?>) parent).getPropertyNames()) {
                result.put(key, parent.getProperty(key));
                log.debug("PropertyConfigure load K[{}] V[{}]", key, parent.getProperty(key));
            }
        }
    }
複製代碼

二、在請求來臨時進行動態計算分桶 定義自定義註解@NoveTest用於標註須要進行測試的方法,定義NoveParamInterceptor 對入參進行解析,定義NoveTestInterceptor內部攔截器進行註解切入 ,爲增長了@NoveTest進行動態分桶計算。

@Pointcut("@annotation(NoveTest)")
    public void anyMethod() {

    } 
@Before(value = "anyMethod()")
    public void doBefore(JoinPoint jp) throws Exception {

        try {
            MethodSignature methodSig = (MethodSignature) jp.getSignature();
            Annotation[] annotations = methodSig.getMethod().getDeclaredAnnotations();
            for (Annotation annotation : annotations) {
                String name = annotation.annotationType().getName();
                String novetest = NoveTest.class.getName();
                if (novetest.equals(name)) {
                    NoveTest test = (NoveTest) annotation;
                    Map<String, String> buckets = RandomBucketUtils.getBuckets(test.name());
                    RandomBucketUtils.loadBuckets(buckets);
                }
            }
        } catch (Exception e) { //防護性容錯

        }

    }
複製代碼

其中分桶計算的策略很簡單,經過uid + 因子進行hash 計算index 獲取數據

int hash = (((sole + factor).hashCode() % 100) + 100) % 100;
 //獲取參數百分比值
      config.entrySet().stream().sorted(Map.Entry.comparingByKey(Comparator.reverseOrder())).forEach(entry -> {
                if (entry.getKey().contains(percent)) {
                    IntStream.range(0, Integer.valueOf((String) entry.getValue())).forEach(i -> bucket.add(entry.getKey()));
                }

            });
複製代碼

好了,定義完成,上層應用開發使用方式:

@RequestMapping("/test")
    @NoveTest(name = {"312", "feed"})
    public Map<String, String> test(@RequestParam(defaultValue = "", required = false) String ma) {

        String type = RandomBucketUtils.getProperties("type");
        //TODO 經過type 分枝處理 if else
        
         return null;

    }
複製代碼

三、修改配置進行分桶策略的動態刷新 分桶策略不可能一直不變,並且變化的時候也不該該從新發版,那真是太噁心人了。因此動態配置推送就相當重要了,固然你們也可使用自帶的bus總線進行數據推送,可是其destroy的問題真是噁心到家了,並且有可能形成服務大面積癱瘓,慎用。 基於種種問題,就須要自定義bus event,結合其現有的bus進行數據推送。

其實很簡單,分爲幾步:

  • 自定義端點,進行自定義事件發送。(事件廣播不須要特殊處理)
  • 自定義Listener進行本地事件消費,進行數據刷新
  • 注意幾點,在Cloud E 和F 版本有一些不一樣之處。主要在於端點的暴露的策略上,在F版本上使用以下
@WebEndpoint(id = "nove-refresh")
public class NoveRefreshBusEndpoint extends AbstractBusEndpoint {

    public NoveRefreshBusEndpoint(ApplicationEventPublisher context,  String id) {
        super(context, id);
    }



    @WriteOperation
    public void busNoveRefresh() {
        publish(new NoveRefreshRemoteApplicationEvent(this, getInstanceId(),null));
    }

    //此處原來的destination 參數必須寫成arg0 否則就不生效,噁心人,內部機制,這樣處理最簡單
    @WriteOperation
    public void busRefreshWithDestination(@Selector String arg0) {
        publish(new NoveRefreshRemoteApplicationEvent(this, getInstanceId(), arg0));
    }
}

複製代碼

SO 到目前爲止一個可用於生產的AB-TEST 機制就實現了,有的同窗就會說了,你這根據數據在進行if else 的邏輯判斷調用真是噁心到人了。確實,由於初版目前是最簡單的實現,在第二版將進行動態調用的優化。其實動態調用說來也很簡單,無非經過定義的接口實現各個不一樣的邏輯,而後針對其進行簡單的動態代理便可。後期源碼會同步上傳。

總結: 自定義AB-TEST組件的過程

  • 一、自定義內部端點暴露動態配置刷新,發送刷新事件
  • 二、接收事件,定義刷新策略,只刷新須要的配置便可
  • 三、定義啓動初始化方式
  • 四、經過動態代理實現動態邏輯調用(待完成)

qrcode_for_gh_13314ac27929_258.jpg
相關文章
相關標籤/搜索