本文主要研究一下flink的ConnectionManagerhtml
flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionManager.javajava
public interface ConnectionManager { void start(ResultPartitionProvider partitionProvider, TaskEventDispatcher taskEventDispatcher) throws IOException; /** * Creates a {@link PartitionRequestClient} instance for the given {@link ConnectionID}. */ PartitionRequestClient createPartitionRequestClient(ConnectionID connectionId) throws IOException, InterruptedException; /** * Closes opened ChannelConnections in case of a resource release. */ void closeOpenChannelConnections(ConnectionID connectionId); int getNumberOfActiveConnections(); int getDataPort(); void shutdown() throws IOException; }
flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/LocalConnectionManager.javaapache
public class LocalConnectionManager implements ConnectionManager { @Override public void start(ResultPartitionProvider partitionProvider, TaskEventDispatcher taskEventDispatcher) { } @Override public PartitionRequestClient createPartitionRequestClient(ConnectionID connectionId) { return null; } @Override public void closeOpenChannelConnections(ConnectionID connectionId) {} @Override public int getNumberOfActiveConnections() { return 0; } @Override public int getDataPort() { return -1; } @Override public void shutdown() {} }
flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManager.javaapi
public class NettyConnectionManager implements ConnectionManager { private final NettyServer server; private final NettyClient client; private final NettyBufferPool bufferPool; private final PartitionRequestClientFactory partitionRequestClientFactory; public NettyConnectionManager(NettyConfig nettyConfig) { this.server = new NettyServer(nettyConfig); this.client = new NettyClient(nettyConfig); this.bufferPool = new NettyBufferPool(nettyConfig.getNumberOfArenas()); this.partitionRequestClientFactory = new PartitionRequestClientFactory(client); } @Override public void start(ResultPartitionProvider partitionProvider, TaskEventDispatcher taskEventDispatcher) throws IOException { NettyProtocol partitionRequestProtocol = new NettyProtocol( partitionProvider, taskEventDispatcher, client.getConfig().isCreditBasedEnabled()); client.init(partitionRequestProtocol, bufferPool); server.init(partitionRequestProtocol, bufferPool); } @Override public PartitionRequestClient createPartitionRequestClient(ConnectionID connectionId) throws IOException, InterruptedException { return partitionRequestClientFactory.createPartitionRequestClient(connectionId); } @Override public void closeOpenChannelConnections(ConnectionID connectionId) { partitionRequestClientFactory.closeOpenChannelConnections(connectionId); } @Override public int getNumberOfActiveConnections() { return partitionRequestClientFactory.getNumberOfActiveClients(); } @Override public int getDataPort() { if (server != null && server.getLocalAddress() != null) { return server.getLocalAddress().getPort(); } else { return -1; } } @Override public void shutdown() { client.shutdown(); server.shutdown(); } NettyClient getClient() { return client; } NettyServer getServer() { return server; } NettyBufferPool getBufferPool() { return bufferPool; } }
flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.javaide
class PartitionRequestClientFactory { private final NettyClient nettyClient; private final ConcurrentMap<ConnectionID, Object> clients = new ConcurrentHashMap<ConnectionID, Object>(); PartitionRequestClientFactory(NettyClient nettyClient) { this.nettyClient = nettyClient; } /** * Atomically establishes a TCP connection to the given remote address and * creates a {@link PartitionRequestClient} instance for this connection. */ PartitionRequestClient createPartitionRequestClient(ConnectionID connectionId) throws IOException, InterruptedException { Object entry; PartitionRequestClient client = null; while (client == null) { entry = clients.get(connectionId); if (entry != null) { // Existing channel or connecting channel if (entry instanceof PartitionRequestClient) { client = (PartitionRequestClient) entry; } else { ConnectingChannel future = (ConnectingChannel) entry; client = future.waitForChannel(); clients.replace(connectionId, future, client); } } else { // No channel yet. Create one, but watch out for a race. // We create a "connecting future" and atomically add it to the map. // Only the thread that really added it establishes the channel. // The others need to wait on that original establisher's future. ConnectingChannel connectingChannel = new ConnectingChannel(connectionId, this); Object old = clients.putIfAbsent(connectionId, connectingChannel); if (old == null) { nettyClient.connect(connectionId.getAddress()).addListener(connectingChannel); client = connectingChannel.waitForChannel(); clients.replace(connectionId, connectingChannel, client); } else if (old instanceof ConnectingChannel) { client = ((ConnectingChannel) old).waitForChannel(); clients.replace(connectionId, old, client); } else { client = (PartitionRequestClient) old; } } // Make sure to increment the reference count before handing a client // out to ensure correct bookkeeping for channel closing. if (!client.incrementReferenceCounter()) { destroyPartitionRequestClient(connectionId, client); client = null; } } return client; } public void closeOpenChannelConnections(ConnectionID connectionId) { Object entry = clients.get(connectionId); if (entry instanceof ConnectingChannel) { ConnectingChannel channel = (ConnectingChannel) entry; if (channel.dispose()) { clients.remove(connectionId, channel); } } } int getNumberOfActiveClients() { return clients.size(); } /** * Removes the client for the given {@link ConnectionID}. */ void destroyPartitionRequestClient(ConnectionID connectionId, PartitionRequestClient client) { clients.remove(connectionId, client); } //...... }
flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.javathis
private static final class ConnectingChannel implements ChannelFutureListener { private final Object connectLock = new Object(); private final ConnectionID connectionId; private final PartitionRequestClientFactory clientFactory; private boolean disposeRequestClient = false; public ConnectingChannel(ConnectionID connectionId, PartitionRequestClientFactory clientFactory) { this.connectionId = connectionId; this.clientFactory = clientFactory; } private boolean dispose() { boolean result; synchronized (connectLock) { if (partitionRequestClient != null) { result = partitionRequestClient.disposeIfNotUsed(); } else { disposeRequestClient = true; result = true; } connectLock.notifyAll(); } return result; } private void handInChannel(Channel channel) { synchronized (connectLock) { try { NetworkClientHandler clientHandler = channel.pipeline().get(NetworkClientHandler.class); partitionRequestClient = new PartitionRequestClient( channel, clientHandler, connectionId, clientFactory); if (disposeRequestClient) { partitionRequestClient.disposeIfNotUsed(); } connectLock.notifyAll(); } catch (Throwable t) { notifyOfError(t); } } } private volatile PartitionRequestClient partitionRequestClient; private volatile Throwable error; private PartitionRequestClient waitForChannel() throws IOException, InterruptedException { synchronized (connectLock) { while (error == null && partitionRequestClient == null) { connectLock.wait(2000); } } if (error != null) { throw new IOException("Connecting the channel failed: " + error.getMessage(), error); } return partitionRequestClient; } private void notifyOfError(Throwable error) { synchronized (connectLock) { this.error = error; connectLock.notifyAll(); } } @Override public void operationComplete(ChannelFuture future) throws Exception { if (future.isSuccess()) { handInChannel(future.channel()); } else if (future.cause() != null) { notifyOfError(new RemoteTransportException( "Connecting to remote task manager + '" + connectionId.getAddress() + "' has failed. This might indicate that the remote task " + "manager has been lost.", connectionId.getAddress(), future.cause())); } else { notifyOfError(new LocalTransportException( String.format( "Connecting to remote task manager '%s' has been cancelled.", connectionId.getAddress()), null)); } } }