



 *  A Bulkhead instance is thread-safe can be used to decorate multiple requests.
 * A {@link Bulkhead} represent an entity limiting the amount of parallel operations. It does not assume nor does it mandate usage
 * of any particular concurrency and/or io model. These details are left for the client to manage. This bulkhead, depending on the
 * underlying concurrency/io model can be used to shed load, and, where it makes sense, limit resource use (i.e. limit amount of
 * threads/actors involved in a particular flow, etc).
 * In order to execute an operation protected by this bulkhead, a permission must be obtained by calling {@link Bulkhead#isCallPermitted()}
 * If the bulkhead is full, no additional operations will be permitted to execute until space is available.
 * Once the operation is complete, regardless of the result, client needs to call {@link Bulkhead#onComplete()} in order to maintain
 * integrity of internal bulkhead state.
public interface Bulkhead {

     * Dynamic bulkhead configuration change.
     * NOTE! New `maxWaitTime` duration won't affect threads that are currently waiting for permission.
     * @param newConfig new BulkheadConfig
    void changeConfig(BulkheadConfig newConfig);

     * Attempts to acquire a permit, which allows an call to be executed.
     * @return boolean whether a call should be executed
    boolean isCallPermitted();

     * Records a completed call.
    void onComplete();

     * Returns the name of this bulkhead.
     * @return the name of this bulkhead
    String getName();

     * Returns the BulkheadConfig of this Bulkhead.
     * @return bulkhead config
    BulkheadConfig getBulkheadConfig();

     * Get the Metrics of this Bulkhead.
     * @return the Metrics of this Bulkhead
    Metrics getMetrics();

     * Returns an EventPublisher which subscribes to the reactive stream of BulkheadEvent and
     * can be used to register event consumers.
     * @return an EventPublisher
    EventPublisher getEventPublisher();


     * Returns a callable which is decorated by a bulkhead.
     * @param bulkhead the bulkhead
     * @param callable the original Callable
     * @param <T> the result type of callable
     * @return a supplier which is decorated by a Bulkhead.
    static <T> Callable<T> decorateCallable(Bulkhead bulkhead, Callable<T> callable){
        return () -> {
            try {
                return callable.call();
            finally {

     * Returns a supplier which is decorated by a bulkhead.
     * @param bulkhead the bulkhead
     * @param supplier the original supplier
     * @param <T> the type of results supplied by this supplier
     * @return a supplier which is decorated by a Bulkhead.
    static <T> Supplier<T> decorateSupplier(Bulkhead bulkhead, Supplier<T> supplier){
        return () -> {
            try {
                return supplier.get();
            finally {

    interface Metrics {

         * Returns the number of parallel executions this bulkhead can support at this point in time.
         * @return remaining bulkhead depth
        int getAvailableConcurrentCalls();

     * An EventPublisher which can be used to register event consumers.
    interface EventPublisher extends io.github.resilience4j.core.EventPublisher<BulkheadEvent> {

        EventPublisher onCallRejected(EventConsumer<BulkheadOnCallRejectedEvent> eventConsumer);

        EventPublisher onCallPermitted(EventConsumer<BulkheadOnCallPermittedEvent> eventConsumer);

        EventPublisher onCallFinished(EventConsumer<BulkheadOnCallFinishedEvent> eventConsumer);

public final class BulkheadUtils {

    public static void isCallPermitted(Bulkhead bulkhead) {
        if(!bulkhead.isCallPermitted()) {
            throw new BulkheadFullException(String.format("Bulkhead '%s' is full", bulkhead.getName()));



 * A Bulkhead implementation based on a semaphore.
public class SemaphoreBulkhead implements Bulkhead {

    private final String name;
    private final Semaphore semaphore;
    private final Object configChangesLock = new Object();
    private volatile BulkheadConfig config;
    private final BulkheadMetrics metrics;
    private final BulkheadEventProcessor eventProcessor;

     * Creates a bulkhead using a configuration supplied
     * @param name           the name of this bulkhead
     * @param bulkheadConfig custom bulkhead configuration
    public SemaphoreBulkhead(String name, BulkheadConfig bulkheadConfig) {
        this.name = name;
        this.config = bulkheadConfig != null ? bulkheadConfig
                : BulkheadConfig.ofDefaults();
        // init semaphore
        this.semaphore = new Semaphore(this.config.getMaxConcurrentCalls(), true);

        this.metrics = new BulkheadMetrics();
        this.eventProcessor = new BulkheadEventProcessor();

     * Creates a bulkhead with a default config.
     * @param name the name of this bulkhead
    public SemaphoreBulkhead(String name) {
        this(name, BulkheadConfig.ofDefaults());

     * Create a bulkhead using a configuration supplier
     * @param name           the name of this bulkhead
     * @param configSupplier BulkheadConfig supplier
    public SemaphoreBulkhead(String name, Supplier<BulkheadConfig> configSupplier) {
        this(name, configSupplier.get());

     * {@inheritDoc}
    public void changeConfig(final BulkheadConfig newConfig) {
        synchronized (configChangesLock) {
            int delta =  newConfig.getMaxConcurrentCalls() - config.getMaxConcurrentCalls();
            if (delta < 0) {
            } else if (delta > 0) {
            config = newConfig;

     * {@inheritDoc}
    public boolean isCallPermitted() {

        boolean callPermitted = tryEnterBulkhead();

                () -> callPermitted ? new BulkheadOnCallPermittedEvent(name)
                        : new BulkheadOnCallRejectedEvent(name)

        return callPermitted;

     * {@inheritDoc}
    public void onComplete() {
        publishBulkheadEvent(() -> new BulkheadOnCallFinishedEvent(name));

    boolean tryEnterBulkhead() {

        boolean callPermitted = false;
        long timeout = config.getMaxWaitTime();

        if (timeout == 0) {
            callPermitted = semaphore.tryAcquire();
        } else {
            try {
                callPermitted = semaphore.tryAcquire(timeout, TimeUnit.MILLISECONDS);
            } catch (InterruptedException ex) {
                callPermitted = false;
        return callPermitted;

    private void publishBulkheadEvent(Supplier<BulkheadEvent> eventSupplier) {
        if (eventProcessor.hasConsumers()) {

    private final class BulkheadMetrics implements Metrics {
        private BulkheadMetrics() {

        public int getAvailableConcurrentCalls() {
            return semaphore.availablePermits();

private class BulkheadEventProcessor extends EventProcessor<BulkheadEvent> implements EventPublisher, EventConsumer<BulkheadEvent> {

        public EventPublisher onCallPermitted(EventConsumer<BulkheadOnCallPermittedEvent> onCallPermittedEventConsumer) {
            registerConsumer(BulkheadOnCallPermittedEvent.class, onCallPermittedEventConsumer);
            return this;

        public EventPublisher onCallRejected(EventConsumer<BulkheadOnCallRejectedEvent> onCallRejectedEventConsumer) {
            registerConsumer(BulkheadOnCallRejectedEvent.class, onCallRejectedEventConsumer);
            return this;

        public EventPublisher onCallFinished(EventConsumer<BulkheadOnCallFinishedEvent> onCallFinishedEventConsumer) {
            registerConsumer(BulkheadOnCallFinishedEvent.class, onCallFinishedEventConsumer);
            return this;

        public void consumeEvent(BulkheadEvent event) {
