使用 javassist 修改字節碼實現 eureka-client 監聽服務啓動

項目中一些服務須要監聽其餘微服務的啓動信息,須要監聽到啓動後主動向其發請求拉取一些配置等。 但是eureka-client並未提供監聽其餘服務啓動的事件,eureka-server卻是提供了事件, 能夠在本身的eureka-server中監聽服務啓動,監聽後發送服務啓動信息到kafka這些消息隊列,服務監聽kafka消息, 這種方式要依賴消息隊列: 源碼; 或者改造eureka-client, 因爲須要改的代碼很少,修改源碼從新打成依賴本身維護不方便,這裏經過javassist直接修改jar裏的字節碼實現。java

使用javassist修改字節碼

  • 查看eureka裏源碼,有個com.netflix.discovery.shared.Application類,addInstance方法在服務上線或更新,removeInstance方法在服務下線時調用,所以修改這倆方法,實現監聽服務上、下線。
  • 因爲spring-boot本身實現的類加載機制,以spring-boot的jar形式運行javassist會掃描不到包,要經過insertClassPath添加掃描路徑。
  • 經過setBody修改方法體,分別添加me.flyleft.eureka.client.event.EurekaEventHandler.getInstance().eurekaAddInstance($1);和me.flyleft.eureka.client.event.EurekaEventHandler.getInstance().eurekaRemoveInstance($1);
  • 經過toClass覆蓋原有類後,經過類加載器從新加載。
public void init() {
        try {
            ClassPool classPool = new ClassPool(true);
            //添加com.netflix.discovery包的掃描路徑
            ClassClassPath classPath = new ClassClassPath(Applications.class);
            classPool.insertClassPath(classPath);
            //獲取要修改Application類
            CtClass ctClass = classPool.get(APPLICATION_PATH);
            //獲取addInstance方法
            CtMethod addInstanceMethod = ctClass.getDeclaredMethod("addInstance");
            //修改addInstance方法
            addInstanceMethod.setBody("{instancesMap.put($1.getId(), $1);"
                    + "synchronized (instances) {me.flyleft.eureka.client.event.EurekaEventHandler.getInstance().eurekaAddInstance($1);" +
                    "instances.remove($1);instances.add($1);isDirty = true;}}");
            //獲取removeInstance方法
            CtMethod removeInstanceMethod = ctClass.getDeclaredMethod("removeInstance");
            //修改removeInstance方法
            removeInstanceMethod.setBody("{me.flyleft.eureka.client.event.EurekaEventHandler.getInstance().eurekaRemoveInstance($1);this.removeInstance($1, true);}");
            //覆蓋原有的Application類
            ctClass.toClass();
            //使用類加載器從新加載Application類
            classPool.getClassLoader().loadClass(APPLICATION_PATH);
            Class.forName(APPLICATION_PATH);
        } catch (Exception e) {
            throw new EurekaEventException(e);
        }
    }
複製代碼
  • 放入main函數,在spring boot啓動前執行或者使用spring boot的事件,在spring bean初始化以前執行。(確保在eureka第一次執行以前執行便可)
@SpringBootApplication
@EnableEurekaClient
public class EurekaClientApplication {

    public static void main(String[] args) {
        //先執行修改字節碼代碼
        EurekaEventHandler.getInstance().init();
        new SpringApplicationBuilder(EurekaClientApplication.class).web(true).run(args);
    }
}
複製代碼

使用JDK中Observable和Observer實現觀察者,訂閱者模式

  • 發送事件使用java.util.Observable的setChanged和notifyObservers
public class EurekaEventObservable extends Observable {
    public void sendEvent(EurekaEventPayload payload) {
        setChanged();
        notifyObservers(payload);
    }
}
複製代碼
  • 接收事件使用使用java.util.Observer的update
public abstract class AbstractEurekaEventObserver implements Observer, EurekaEventService {
      @Override
        public void update(Observable o, Object arg) {
            if (arg instanceof EurekaEventPayload) {
                EurekaEventPayload payload = (EurekaEventPayload) arg;
                if (InstanceInfo.InstanceStatus.UP.name().equals(payload.getStatus())) {
                    LOGGER.info("Receive UP event, payload: {}", payload);
                } else {
                    LOGGER.info("Receive DOWN event, payload: {}", payload);
                }
                putPayloadInCache(payload);
                consumerEventWithAutoRetry(payload);
            }
        }
}
複製代碼

使用RxJava實現自動重試。

接收到服務啓動去執行一些操做,若是執行失敗有異常則自動重試指定次數,每一個一段事件重試一次,執行成功則再也不執行git

private void consumerEventWithAutoRetry(final EurekaEventPayload payload) {
    rx.Observable.just(payload)
            .map(t -> {
                // 此處爲接收到服務啓動去執行的一些操做
                consumerEvent(payload);
                return payload;
            }).retryWhen(x -> x.zipWith(rx.Observable.range(1, retryTime),
            (t, retryCount) -> {
               //異常處理
                if (retryCount >= retryTime) {
                    if (t instanceof RemoteAccessException || t instanceof RestClientException) {
                        LOGGER.warn("error.eurekaEventObserver.fetchError, payload {}", payload, t);
                    } else {
                        LOGGER.warn("error.eurekaEventObserver.consumerError, payload {}", payload, t);
                    }
                }
                return retryCount;
            }).flatMap(y -> rx.Observable.timer(retryInterval, TimeUnit.SECONDS)))
            .subscribeOn(Schedulers.io())
            .subscribe((EurekaEventPayload payload1) -> {
            });
}
複製代碼

添加手動重試失敗接口

自動重試失敗,能夠手動重試,添加手動重試接口github

@RestController
@RequestMapping(value = "/v1/eureka/events")
public class EurekaEventEndpoint {

    private EurekaEventService eurekaEventService;

    public EurekaEventEndpoint(EurekaEventService eurekaEventService) {
        this.eurekaEventService = eurekaEventService;
    }

    @Permission(permissionLogin = true)
    @ApiOperation(value = "獲取未消費的事件列表")
    @GetMapping
    public List<EurekaEventPayload> list(@RequestParam(value = "service", required = false) String service) {
        return eurekaEventService.unfinishedEvents(service);
    }

    @Permission(permissionLogin = true)
    @ApiOperation(value = "手動重試未消費成功的事件")
    @PostMapping("retry")
    public List<EurekaEventPayload> retry(@RequestParam(value = "id", required = false) String id, @RequestParam(value = "service", required = false) String service) {
        return eurekaEventService.retryEvents(id, service);
    }

}
複製代碼

源碼

相關文章
相關標籤/搜索