本文主要研究一下resilience4j的bulkhead
resilience4j-bulkhead-0.13.0-sources.jar!/io/github/resilience4j/bulkhead/Bulkhead.java
/** * A Bulkhead instance is thread-safe can be used to decorate multiple requests. * * A {@link Bulkhead} represent an entity limiting the amount of parallel operations. It does not assume nor does it mandate usage * of any particular concurrency and/or io model. These details are left for the client to manage. This bulkhead, depending on the * underlying concurrency/io model can be used to shed load, and, where it makes sense, limit resource use (i.e. limit amount of * threads/actors involved in a particular flow, etc). * * In order to execute an operation protected by this bulkhead, a permission must be obtained by calling {@link Bulkhead#isCallPermitted()} * If the bulkhead is full, no additional operations will be permitted to execute until space is available. * * Once the operation is complete, regardless of the result, client needs to call {@link Bulkhead#onComplete()} in order to maintain * integrity of internal bulkhead state. * */ public interface Bulkhead { /** * Dynamic bulkhead configuration change. * NOTE! New `maxWaitTime` duration won't affect threads that are currently waiting for permission. * @param newConfig new BulkheadConfig */ void changeConfig(BulkheadConfig newConfig); /** * Attempts to acquire a permit, which allows an call to be executed. * * @return boolean whether a call should be executed */ boolean isCallPermitted(); /** * Records a completed call. */ void onComplete(); /** * Returns the name of this bulkhead. * * @return the name of this bulkhead */ String getName(); /** * Returns the BulkheadConfig of this Bulkhead. * * @return bulkhead config */ BulkheadConfig getBulkheadConfig(); /** * Get the Metrics of this Bulkhead. * * @return the Metrics of this Bulkhead */ Metrics getMetrics(); /** * Returns an EventPublisher which subscribes to the reactive stream of BulkheadEvent and * can be used to register event consumers. * * @return an EventPublisher */ EventPublisher getEventPublisher(); //...... 
/** * Returns a callable which is decorated by a bulkhead. * * @param bulkhead the bulkhead * @param callable the original Callable * @param <T> the result type of callable * * @return a supplier which is decorated by a Bulkhead. */ static <T> Callable<T> decorateCallable(Bulkhead bulkhead, Callable<T> callable){ return () -> { BulkheadUtils.isCallPermitted(bulkhead); try { return callable.call(); } finally { bulkhead.onComplete(); } }; } /** * Returns a supplier which is decorated by a bulkhead. * * @param bulkhead the bulkhead * @param supplier the original supplier * @param <T> the type of results supplied by this supplier * * @return a supplier which is decorated by a Bulkhead. */ static <T> Supplier<T> decorateSupplier(Bulkhead bulkhead, Supplier<T> supplier){ return () -> { BulkheadUtils.isCallPermitted(bulkhead); try { return supplier.get(); } finally { bulkhead.onComplete(); } }; } interface Metrics { /** * Returns the number of parallel executions this bulkhead can support at this point in time. * * @return remaining bulkhead depth */ int getAvailableConcurrentCalls(); } /** * An EventPublisher which can be used to register event consumers. */ interface EventPublisher extends io.github.resilience4j.core.EventPublisher<BulkheadEvent> { EventPublisher onCallRejected(EventConsumer<BulkheadOnCallRejectedEvent> eventConsumer); EventPublisher onCallPermitted(EventConsumer<BulkheadOnCallPermittedEvent> eventConsumer); EventPublisher onCallFinished(EventConsumer<BulkheadOnCallFinishedEvent> eventConsumer); } //...... }
resilience4j-bulkhead-0.13.0-sources.jar!/io/github/resilience4j/bulkhead/utils/BulkheadUtils.java
/**
 * Static helper that turns a refused bulkhead permission into an exception.
 */
public final class BulkheadUtils {

    /**
     * Asks the given bulkhead for permission and fails when permission is refused.
     *
     * @param bulkhead the bulkhead to query
     * @throws BulkheadFullException if {@code bulkhead.isCallPermitted()} returns {@code false};
     *         the message contains the bulkhead's name
     */
    public static void isCallPermitted(Bulkhead bulkhead) {
        if (bulkhead.isCallPermitted()) {
            return;
        }
        final String message = String.format("Bulkhead '%s' is full", bulkhead.getName());
        throw new BulkheadFullException(message);
    }
}
通過bulkhead.isCallPermitted()進行判斷,不通過則拋出BulkheadFullException
resilience4j-bulkhead-0.13.0-sources.jar!/io/github/resilience4j/bulkhead/internal/SemaphoreBulkhead.java
/** * A Bulkhead implementation based on a semaphore. */ public class SemaphoreBulkhead implements Bulkhead { private final String name; private final Semaphore semaphore; private final Object configChangesLock = new Object(); private volatile BulkheadConfig config; private final BulkheadMetrics metrics; private final BulkheadEventProcessor eventProcessor; /** * Creates a bulkhead using a configuration supplied * * @param name the name of this bulkhead * @param bulkheadConfig custom bulkhead configuration */ public SemaphoreBulkhead(String name, BulkheadConfig bulkheadConfig) { this.name = name; this.config = bulkheadConfig != null ? bulkheadConfig : BulkheadConfig.ofDefaults(); // init semaphore this.semaphore = new Semaphore(this.config.getMaxConcurrentCalls(), true); this.metrics = new BulkheadMetrics(); this.eventProcessor = new BulkheadEventProcessor(); } /** * Creates a bulkhead with a default config. * * @param name the name of this bulkhead */ public SemaphoreBulkhead(String name) { this(name, BulkheadConfig.ofDefaults()); } /** * Create a bulkhead using a configuration supplier * * @param name the name of this bulkhead * @param configSupplier BulkheadConfig supplier */ public SemaphoreBulkhead(String name, Supplier<BulkheadConfig> configSupplier) { this(name, configSupplier.get()); } /** * {@inheritDoc} */ @Override public void changeConfig(final BulkheadConfig newConfig) { synchronized (configChangesLock) { int delta = newConfig.getMaxConcurrentCalls() - config.getMaxConcurrentCalls(); if (delta < 0) { semaphore.acquireUninterruptibly(-delta); } else if (delta > 0) { semaphore.release(delta); } config = newConfig; } } /** * {@inheritDoc} */ @Override public boolean isCallPermitted() { boolean callPermitted = tryEnterBulkhead(); publishBulkheadEvent( () -> callPermitted ? 
new BulkheadOnCallPermittedEvent(name) : new BulkheadOnCallRejectedEvent(name) ); return callPermitted; } /** * {@inheritDoc} */ @Override public void onComplete() { semaphore.release(); publishBulkheadEvent(() -> new BulkheadOnCallFinishedEvent(name)); } boolean tryEnterBulkhead() { boolean callPermitted = false; long timeout = config.getMaxWaitTime(); if (timeout == 0) { callPermitted = semaphore.tryAcquire(); } else { try { callPermitted = semaphore.tryAcquire(timeout, TimeUnit.MILLISECONDS); } catch (InterruptedException ex) { callPermitted = false; } } return callPermitted; } private void publishBulkheadEvent(Supplier<BulkheadEvent> eventSupplier) { if (eventProcessor.hasConsumers()) { eventProcessor.consumeEvent(eventSupplier.get()); } } private final class BulkheadMetrics implements Metrics { private BulkheadMetrics() { } @Override public int getAvailableConcurrentCalls() { return semaphore.availablePermits(); } } //...... }
resilience4j-bulkhead-0.13.0-sources.jar!/io/github/resilience4j/bulkhead/internal/SemaphoreBulkhead.java
private class BulkheadEventProcessor extends EventProcessor<BulkheadEvent> implements EventPublisher, EventConsumer<BulkheadEvent> { @Override public EventPublisher onCallPermitted(EventConsumer<BulkheadOnCallPermittedEvent> onCallPermittedEventConsumer) { registerConsumer(BulkheadOnCallPermittedEvent.class, onCallPermittedEventConsumer); return this; } @Override public EventPublisher onCallRejected(EventConsumer<BulkheadOnCallRejectedEvent> onCallRejectedEventConsumer) { registerConsumer(BulkheadOnCallRejectedEvent.class, onCallRejectedEventConsumer); return this; } @Override public EventPublisher onCallFinished(EventConsumer<BulkheadOnCallFinishedEvent> onCallFinishedEventConsumer) { registerConsumer(BulkheadOnCallFinishedEvent.class, onCallFinishedEventConsumer); return this; } @Override public void consumeEvent(BulkheadEvent event) { super.processEvent(event); } }