Spring Cloud Config 與 Spring Cloud Bus 結合

總體流程

clipboard.png

默認情況下,GitLab 的 webhook 在配置更新時調用 config server 的 /monitor 端點(由 spring-cloud-config-monitor 提供),觸發 RefreshRemoteApplicationEvent 事件;而後 Spring Cloud Bus 的 EventListener 在本地監聽到該 RemoteApplicationEvent,經由 MQ 發佈到每一個 config client;client 端的 StreamListener 從 MQ 接收 RemoteApplicationEvent 並在本地發佈,最後由 RefreshListener 執行 refresh。

PropertyPathEndpoint/monitor

/**
 * REST endpoint mapped under {@code /monitor} that turns an incoming change
 * notification into one {@link RefreshRemoteApplicationEvent} per affected
 * service.
 */
@RequiredArgsConstructor
@RestController
@RequestMapping(path = "${spring.cloud.config.monitor.endpoint.path:}/monitor")
@CommonsLog
public class PropertyPathEndpoint
        implements ApplicationEventPublisherAware, ApplicationContextAware {

    /**
     * Handles a POST carrying a change notification.
     *
     * <p>The notification is extracted from the headers and body; every path it
     * reports is mapped to candidate service names, and a refresh event is
     * published for each of them.
     *
     * @param headers HTTP headers of the incoming request
     * @param request request body deserialized into a map
     * @return the service names a refresh event was published for, or an empty
     *         set when no notification could be extracted or no event publisher
     *         is available
     */
    @RequestMapping(method = RequestMethod.POST)
    public Set<String> notifyByPath(@RequestHeader HttpHeaders headers,
            @RequestBody Map<String, Object> request) {
        PropertyPathNotification change = this.extractor.extract(headers, request);
        if (change == null) {
            return Collections.emptySet();
        }

        // Insertion-ordered so events are published in the order paths were reported.
        Set<String> targets = new LinkedHashSet<>();
        for (String changedPath : change.getPaths()) {
            targets.addAll(guessServiceName(changedPath));
        }

        if (this.applicationEventPublisher == null) {
            return Collections.emptySet();
        }

        for (String target : targets) {
            log.info("Refresh for: " + target);
            RefreshRemoteApplicationEvent refresh =
                    new RefreshRemoteApplicationEvent(this, this.contextId, target);
            this.applicationEventPublisher.publishEvent(refresh);
        }
        return targets;
    }
}

Bus 監聽本地的 RemoteApplicationEvent 並將其作為 message 發佈到 MQ(BusAutoConfiguration)

/**
 * Forwards a locally published remote application event to the bus outbound
 * channel.
 *
 * <p>Only events that the service matcher reports as coming from this instance
 * are sent, and acknowledgement events are never forwarded.
 *
 * @param event the remote application event observed in the local context
 */
@EventListener(classes = RemoteApplicationEvent.class)
public void acceptLocal(RemoteApplicationEvent event) {
    boolean originatedHere = this.serviceMatcher.isFromSelf(event);
    boolean isAck = event instanceof AckRemoteApplicationEvent;
    if (!originatedHere || isAck) {
        return;
    }
    this.cloudBusOutboundChannel.send(MessageBuilder.withPayload(event).build());
}

每一個client的MQ接收message

/**
 * Handles a remote application event received on the bus input channel.
 *
 * <p>Acknowledgement events are only re-published locally (when tracing is
 * enabled and the ack did not come from this instance) and then processing
 * stops. Any other event that the service matcher reports as addressed to this
 * instance is published locally unless this instance sent it, and, when acks
 * are enabled, an acknowledgement is sent back over the outbound channel and
 * also published locally. Finally, when tracing is enabled, a
 * {@code SentApplicationEvent} describing the event is published locally.
 *
 * @param event the remote application event delivered by the input channel
 */
@StreamListener(SpringCloudBusClient.INPUT)
    public void acceptRemote(RemoteApplicationEvent event) {
        if (event instanceof AckRemoteApplicationEvent) {
            // Acks from other instances are surfaced locally only when tracing is on.
            if (this.bus.getTrace().isEnabled() && !this.serviceMatcher.isFromSelf(event)
                    && this.applicationEventPublisher != null) {
                this.applicationEventPublisher.publishEvent(event);
            }
            // If it's an ACK we are finished processing at this point
            return;
        }
        if (this.serviceMatcher.isForSelf(event)
                && this.applicationEventPublisher != null) {
            // Skip the local publish for events this instance sent itself.
            if (!this.serviceMatcher.isFromSelf(event)) {
                this.applicationEventPublisher.publishEvent(event);
            }
            if (this.bus.getAck().isEnabled()) {
                // Build an ack carrying the original event's id, type and destination.
                AckRemoteApplicationEvent ack = new AckRemoteApplicationEvent(this,
                        this.serviceMatcher.getServiceId(),
                        this.bus.getAck().getDestinationService(),
                        event.getDestinationService(), event.getId(), event.getClass());
                // The ack goes out on the bus first, then is published locally.
                this.cloudBusOutboundChannel
                        .send(MessageBuilder.withPayload(ack).build());
                this.applicationEventPublisher.publishEvent(ack);
            }
        }
        if (this.bus.getTrace().isEnabled() && this.applicationEventPublisher != null) {
            // We are set to register sent events so publish it for local consumption,
            // irrespective of the origin
            this.applicationEventPublisher.publishEvent(new SentApplicationEvent(this,
                    event.getOriginService(), event.getDestinationService(),
                    event.getId(), event.getClass()));
        }
    }

本地監聽RefreshListener

/**
 * Application listener that triggers a context refresh whenever a
 * {@link RefreshRemoteApplicationEvent} is delivered to the local context.
 */
public class RefreshListener
        implements ApplicationListener<RefreshRemoteApplicationEvent> {

    // Assigned once; declared final so it cannot be swapped out at runtime.
    private static final Log log = LogFactory.getLog(RefreshListener.class);

    // Injected through the constructor and never reassigned.
    private final ContextRefresher contextRefresher;

    /**
     * Creates a listener that delegates refreshes to the given refresher.
     *
     * @param contextRefresher the refresher invoked for each refresh event
     */
    public RefreshListener(ContextRefresher contextRefresher) {
        this.contextRefresher = contextRefresher;
    }

    /**
     * Refreshes the context and logs the keys returned by the refresher.
     *
     * @param event the received refresh event (its contents are not inspected)
     */
    @Override
    public void onApplicationEvent(RefreshRemoteApplicationEvent event) {
        Set<String> keys = this.contextRefresher.refresh();
        log.info("Received remote refresh request. Keys refreshed " + keys);
    }
}

docs

相關文章
相關標籤/搜索