<dependency> <groupId>io.projectreactor</groupId> <artifactId>reactor-bus</artifactId> </dependency>
@Configuration public class EventConfig { @Bean Environment environment() { return Environment.initializeIfEmpty().assignErrorJournal(); } @Bean @Autowired public EventBus eventBus(Environment environment, MyEventListener myEventListener) { EventBus eventBus = EventBus.create(environment, Environment.THREAD_POOL); eventBus.on($("myevent"), myEventListener); return eventBus; } }
env的默認配置從reactor-core-2.0.8.RELEASE.jar!/META-INF/reactor/reactor-environment.properties中讀取java
# # Copyright (c) 2011-2015 Pivotal Software Inc., Inc. All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # ## # Dispatcher configuration # # Each dispatcher must be configured with a type: # # reactor.dispatchers.<name>.type = <type> # # Legal values for <type> are dispatcherGroup, ringBuffer, synchronous, and workQueue. # Depending on the type, further configuration is be possible: # # reactor.dispatchers.<name>.size: dispatcherGroup and workQueue Dispatchers # reactor.dispatchers.<name>.backlog: dispatcherGroup, ringBuffer, and workQueue Dispatchers # # A size less than 1 may be specified to indicate that the size should be the same as the number # of CPUs. # A thread pool executor dispatcher, named threadPoolExecutor reactor.dispatchers.threadPoolExecutor.type = threadPoolExecutor reactor.dispatchers.threadPoolExecutor.size = 0 # Backlog is how many Task objects to warm up internally reactor.dispatchers.threadPoolExecutor.backlog = 2048 # A group of dispatchers replicated from the default dispatcher, named dispatcherGroup reactor.dispatchers.dispatcherGroup.type = dispatcherGroup reactor.dispatchers.dispatcherGroup.size = 0 reactor.dispatchers.dispatcherGroup.backlog = 2048 # A ring buffer dispatcher, named ringBuffer reactor.dispatchers.shared.type = ringBuffer reactor.dispatchers.shared.backlog = 8192 # A work queue dispatcher, named workQueue reactor.dispatchers.workQueue.type = workQueue reactor.dispatchers.workQueue.size = 0 reactor.dispatchers.workQueue.backlog = 2048 # The dispatcher named shared should be the default dispatcher reactor.dispatchers.default = shared
@Component public class MyEventListener implements Consumer<Event<MyEvent>> { private static final Logger LOGGER = LoggerFactory.getLogger(MyEventListener.class); @Override public void accept(Event<MyEvent> eventContextEvent) { MyEvent event = eventContextEvent.getData(); LOGGER.info("thread {} ,receive event:{}",Thread.currentThread().getName(),event.getData()); } }
@Autowired EventBus eventBus; public void publishEvent(String data){ eventBus.notify("myevent", Event.wrap(new MyEvent(data))); }
reactor-core-2.0.8.RELEASE-sources.jar!/reactor/core/dispatch/ThreadPoolExecutorDispatcher.java
在reactor-core-2.0.8.RELEASE-sources.jar!/reactor/Environment.java建立默認的ThreadPoolExecutorDispatcherreact
private static ThreadPoolExecutorDispatcher createThreadPoolExecutorDispatcher(DispatcherConfiguration dispatcherConfiguration) { int size = getSize(dispatcherConfiguration, 0); int backlog = getBacklog(dispatcherConfiguration, 128); return new ThreadPoolExecutorDispatcher(size, backlog, dispatcherConfiguration.getName()); }
構造器spring
/** * Create a new {@literal ThreadPoolExecutorDispatcher} with the given size, backlog, name, and {@link * java.util.concurrent.RejectedExecutionHandler}. * * @param poolSize * the pool size * @param backlog * the backlog size * @param threadName * the name prefix to use when creating threads */ public ThreadPoolExecutorDispatcher(int poolSize, int backlog, String threadName) { this(poolSize, backlog, threadName, new LinkedBlockingQueue<Runnable>(backlog), new RejectedExecutionHandler() { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { r.run(); } }); }
默認採用的是LinkedBlockingQueue,大小爲配置文件指定的backlog,RejectedExecutionHandler採用的是調用者執行。express
/** * A handler for rejected tasks that runs the rejected task * directly in the calling thread of the {@code execute} method, * unless the executor has been shut down, in which case the task * is discarded. */ public static class CallerRunsPolicy implements RejectedExecutionHandler { /** * Creates a {@code CallerRunsPolicy}. */ public CallerRunsPolicy() { } /** * Executes task r in the caller's thread, unless the executor * has been shut down, in which case the task is discarded. * * @param r the runnable task requested to be executed * @param e the executor attempting to execute this task */ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { r.run(); } } }
所以,隊列沒有滿的時候是異步的,滿的時候就阻塞了。apache