本文主要研究一下Elasticsearch的RoundRobinSupplier
elasticsearch-7.0.1/libs/nio/src/main/java/org/elasticsearch/nio/RoundRobinSupplier.java
/**
 * A {@link Supplier} that hands out the elements of an array one after another, starting again
 * from the first element once the end is reached.
 *
 * <p>The array can be given at construction time, or exactly once afterwards through
 * {@link #setSelectors(Object[])}. Calling {@link #get()} or {@link #count()} before an array has
 * been provided fails with a {@link NullPointerException}.
 *
 * @param <S> the type of element supplied
 */
final class RoundRobinSupplier<S> implements Supplier<S> {

    /** Flips to {@code true} the first (and only) time an array is installed. */
    private final AtomicBoolean selectorsSet = new AtomicBoolean(false);

    private volatile S[] selectors;

    /** Number of {@link #get()} calls so far; allowed to wrap around past {@link Integer#MAX_VALUE}. */
    private final AtomicInteger counter = new AtomicInteger(0);

    /** Creates a supplier with no array yet; {@link #setSelectors(Object[])} must be called before use. */
    RoundRobinSupplier() {
        this.selectors = null;
    }

    /**
     * Creates a supplier that is immediately usable.
     *
     * @param selectors the elements to rotate through
     */
    RoundRobinSupplier(S[] selectors) {
        this.selectors = selectors;
        this.selectorsSet.set(true);
    }

    /**
     * Returns the next element in rotation.
     *
     * @return the element at position {@code callNumber mod length}
     */
    @Override
    public S get() {
        // Read the volatile field once so the index and the array access use the same array.
        S[] selectors = this.selectors;
        // floorMod instead of %: after 2^31 calls the counter wraps to a negative value, and the
        // plain remainder of a negative dividend is negative, which would index out of bounds.
        return selectors[Math.floorMod(counter.getAndIncrement(), selectors.length)];
    }

    /**
     * Installs the array to rotate through. May be called at most once, and not at all when the
     * array was already passed to the constructor.
     *
     * @param selectors the elements to rotate through
     * @throws AssertionError if an array has already been installed
     */
    void setSelectors(S[] selectors) {
        if (selectorsSet.compareAndSet(false, true)) {
            this.selectors = selectors;
        } else {
            throw new AssertionError("Selectors already set. Should only be set once.");
        }
    }

    /**
     * @return the number of elements in the installed array
     */
    int count() {
        return selectors.length;
    }
}
counter.getAndIncrement() % selectors.length
來選擇selectors數組的下標,而後返回該下標的值。
elasticsearch-7.0.1/libs/nio/src/main/java/org/elasticsearch/nio/NioSelectorGroup.java
public class NioSelectorGroup implements NioGroup { private final List<NioSelector> dedicatedAcceptors; private final RoundRobinSupplier<NioSelector> acceptorSupplier; private final List<NioSelector> selectors; private final RoundRobinSupplier<NioSelector> selectorSupplier; private final AtomicBoolean isOpen = new AtomicBoolean(true); //...... public NioSelectorGroup(ThreadFactory acceptorThreadFactory, int dedicatedAcceptorCount, ThreadFactory selectorThreadFactory, int selectorCount, Function<Supplier<NioSelector>, EventHandler> eventHandlerFunction) throws IOException { dedicatedAcceptors = new ArrayList<>(dedicatedAcceptorCount); selectors = new ArrayList<>(selectorCount); try { List<RoundRobinSupplier<NioSelector>> suppliersToSet = new ArrayList<>(selectorCount); for (int i = 0; i < selectorCount; ++i) { RoundRobinSupplier<NioSelector> supplier = new RoundRobinSupplier<>(); suppliersToSet.add(supplier); NioSelector selector = new NioSelector(eventHandlerFunction.apply(supplier)); selectors.add(selector); } for (RoundRobinSupplier<NioSelector> supplierToSet : suppliersToSet) { supplierToSet.setSelectors(selectors.toArray(new NioSelector[0])); assert supplierToSet.count() == selectors.size() : "Supplier should have same count as selector list."; } for (int i = 0; i < dedicatedAcceptorCount; ++i) { RoundRobinSupplier<NioSelector> supplier = new RoundRobinSupplier<>(selectors.toArray(new NioSelector[0])); NioSelector acceptor = new NioSelector(eventHandlerFunction.apply(supplier)); dedicatedAcceptors.add(acceptor); } if (dedicatedAcceptorCount != 0) { acceptorSupplier = new RoundRobinSupplier<>(dedicatedAcceptors.toArray(new NioSelector[0])); } else { acceptorSupplier = new RoundRobinSupplier<>(selectors.toArray(new NioSelector[0])); } selectorSupplier = new RoundRobinSupplier<>(selectors.toArray(new NioSelector[0])); assert selectorCount == selectors.size() : "We need to have created all the selectors at this point."; assert dedicatedAcceptorCount == 
dedicatedAcceptors.size() : "We need to have created all the acceptors at this point."; startSelectors(selectors, selectorThreadFactory); startSelectors(dedicatedAcceptors, acceptorThreadFactory); } catch (Exception e) { try { close(); } catch (Exception e1) { e.addSuppressed(e1); } throw e; } } public <S extends NioServerSocketChannel> S bindServerChannel(InetSocketAddress address, ChannelFactory<S, ?> factory) throws IOException { ensureOpen(); return factory.openNioServerSocketChannel(address, acceptorSupplier); } @Override public <S extends NioSocketChannel> S openChannel(InetSocketAddress address, ChannelFactory<?, S> factory) throws IOException { ensureOpen(); return factory.openNioChannel(address, selectorSupplier); } //...... }
elasticsearch-7.0.1/libs/nio/src/main/java/org/elasticsearch/nio/ChannelFactory.java
public abstract class ChannelFactory<ServerSocket extends NioServerSocketChannel, Socket extends NioSocketChannel> { //...... public ServerSocket openNioServerSocketChannel(InetSocketAddress address, Supplier<NioSelector> supplier) throws IOException { ServerSocketChannel rawChannel = rawChannelFactory.openNioServerSocketChannel(address); NioSelector selector = supplier.get(); ServerSocket serverChannel = internalCreateServerChannel(selector, rawChannel); scheduleServerChannel(serverChannel, selector); return serverChannel; } public Socket openNioChannel(InetSocketAddress remoteAddress, Supplier<NioSelector> supplier) throws IOException { SocketChannel rawChannel = rawChannelFactory.openNioChannel(remoteAddress); NioSelector selector = supplier.get(); Socket channel = internalCreateChannel(selector, rawChannel); scheduleChannel(channel, selector); return channel; } //...... }
counter.getAndIncrement() % selectors.length
來選擇selectors數組的下標,而後返回該下標的值