項目中一些服務須要監聽其餘微服務的啓動信息,須要監聽到啓動後主動向其發請求拉取一些配置等。 但是eureka-client
並未提供監聽其餘服務啓動的事件,eureka-server
卻是提供了事件, 能夠在本身的eureka-server
中監聽服務啓動,監聽後發送服務啓動信息到kafka這些消息隊列,服務監聽kafka消息, 這種方式要依賴消息隊列: 源碼; 或者改造eureka-client
, 因爲須要改的代碼很少,修改源碼從新打成依賴本身維護不方便,這裏經過javassist直接修改jar裏的字節碼實現。java
public void init() {
try {
ClassPool classPool = new ClassPool(true);
//添加com.netflix.discovery包的掃描路徑
ClassClassPath classPath = new ClassClassPath(Applications.class);
classPool.insertClassPath(classPath);
//獲取要修改Application類
CtClass ctClass = classPool.get(APPLICATION_PATH);
//獲取addInstance方法
CtMethod addInstanceMethod = ctClass.getDeclaredMethod("addInstance");
//修改addInstance方法
addInstanceMethod.setBody("{instancesMap.put($1.getId(), $1);"
+ "synchronized (instances) {me.flyleft.eureka.client.event.EurekaEventHandler.getInstance().eurekaAddInstance($1);" +
"instances.remove($1);instances.add($1);isDirty = true;}}");
//獲取removeInstance方法
CtMethod removeInstanceMethod = ctClass.getDeclaredMethod("removeInstance");
//修改removeInstance方法
removeInstanceMethod.setBody("{me.flyleft.eureka.client.event.EurekaEventHandler.getInstance().eurekaRemoveInstance($1);this.removeInstance($1, true);}");
//覆蓋原有的Application類
ctClass.toClass();
//使用類加載器從新加載Application類
classPool.getClassLoader().loadClass(APPLICATION_PATH);
Class.forName(APPLICATION_PATH);
} catch (Exception e) {
throw new EurekaEventException(e);
}
}
複製代碼
@SpringBootApplication
@EnableEurekaClient
public class EurekaClientApplication {
public static void main(String[] args) {
//先執行修改字節碼代碼
EurekaEventHandler.getInstance().init();
new SpringApplicationBuilder(EurekaClientApplication.class).web(true).run(args);
}
}
複製代碼
public class EurekaEventObservable extends Observable {
public void sendEvent(EurekaEventPayload payload) {
setChanged();
notifyObservers(payload);
}
}
複製代碼
public abstract class AbstractEurekaEventObserver implements Observer, EurekaEventService {
@Override
public void update(Observable o, Object arg) {
if (arg instanceof EurekaEventPayload) {
EurekaEventPayload payload = (EurekaEventPayload) arg;
if (InstanceInfo.InstanceStatus.UP.name().equals(payload.getStatus())) {
LOGGER.info("Receive UP event, payload: {}", payload);
} else {
LOGGER.info("Receive DOWN event, payload: {}", payload);
}
putPayloadInCache(payload);
consumerEventWithAutoRetry(payload);
}
}
}
複製代碼
接收到服務啓動去執行一些操做,若是執行失敗有異常則自動重試指定次數,每一個一段事件重試一次,執行成功則再也不執行git
private void consumerEventWithAutoRetry(final EurekaEventPayload payload) {
rx.Observable.just(payload)
.map(t -> {
// 此處爲接收到服務啓動去執行的一些操做
consumerEvent(payload);
return payload;
}).retryWhen(x -> x.zipWith(rx.Observable.range(1, retryTime),
(t, retryCount) -> {
//異常處理
if (retryCount >= retryTime) {
if (t instanceof RemoteAccessException || t instanceof RestClientException) {
LOGGER.warn("error.eurekaEventObserver.fetchError, payload {}", payload, t);
} else {
LOGGER.warn("error.eurekaEventObserver.consumerError, payload {}", payload, t);
}
}
return retryCount;
}).flatMap(y -> rx.Observable.timer(retryInterval, TimeUnit.SECONDS)))
.subscribeOn(Schedulers.io())
.subscribe((EurekaEventPayload payload1) -> {
});
}
複製代碼
自動重試失敗,能夠手動重試,添加手動重試接口github
@RestController
@RequestMapping(value = "/v1/eureka/events")
public class EurekaEventEndpoint {
private EurekaEventService eurekaEventService;
public EurekaEventEndpoint(EurekaEventService eurekaEventService) {
this.eurekaEventService = eurekaEventService;
}
@Permission(permissionLogin = true)
@ApiOperation(value = "獲取未消費的事件列表")
@GetMapping
public List<EurekaEventPayload> list(@RequestParam(value = "service", required = false) String service) {
return eurekaEventService.unfinishedEvents(service);
}
@Permission(permissionLogin = true)
@ApiOperation(value = "手動重試未消費成功的事件")
@PostMapping("retry")
public List<EurekaEventPayload> retry(@RequestParam(value = "id", required = false) String id, @RequestParam(value = "service", required = false) String service) {
return eurekaEventService.retryEvents(id, service);
}
}
複製代碼