本文主要研究一下flink taskmanager的data.port與rpc.port
flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
public class TaskManagerServices { //...... public static TaskManagerServices fromConfiguration( TaskManagerServicesConfiguration taskManagerServicesConfiguration, ResourceID resourceID, Executor taskIOExecutor, long freeHeapMemoryWithDefrag, long maxJvmHeapMemory) throws Exception { // pre-start checks checkTempDirs(taskManagerServicesConfiguration.getTmpDirPaths()); final NetworkEnvironment network = createNetworkEnvironment(taskManagerServicesConfiguration, maxJvmHeapMemory); network.start(); final TaskManagerLocation taskManagerLocation = new TaskManagerLocation( resourceID, taskManagerServicesConfiguration.getTaskManagerAddress(), network.getConnectionManager().getDataPort()); // this call has to happen strictly after the network stack has been initialized final MemoryManager memoryManager = createMemoryManager(taskManagerServicesConfiguration, freeHeapMemoryWithDefrag, maxJvmHeapMemory); // start the I/O manager, it will create some temp directories. final IOManager ioManager = new IOManagerAsync(taskManagerServicesConfiguration.getTmpDirPaths()); final BroadcastVariableManager broadcastVariableManager = new BroadcastVariableManager(); final List<ResourceProfile> resourceProfiles = new ArrayList<>(taskManagerServicesConfiguration.getNumberOfSlots()); for (int i = 0; i < taskManagerServicesConfiguration.getNumberOfSlots(); i++) { resourceProfiles.add(ResourceProfile.ANY); } final TimerService<AllocationID> timerService = new TimerService<>( new ScheduledThreadPoolExecutor(1), taskManagerServicesConfiguration.getTimerServiceShutdownTimeout()); final TaskSlotTable taskSlotTable = new TaskSlotTable(resourceProfiles, timerService); final JobManagerTable jobManagerTable = new JobManagerTable(); final JobLeaderService jobLeaderService = new JobLeaderService(taskManagerLocation); final String[] stateRootDirectoryStrings = taskManagerServicesConfiguration.getLocalRecoveryStateRootDirectories(); final File[] stateRootDirectoryFiles = new 
File[stateRootDirectoryStrings.length]; for (int i = 0; i < stateRootDirectoryStrings.length; ++i) { stateRootDirectoryFiles[i] = new File(stateRootDirectoryStrings[i], LOCAL_STATE_SUB_DIRECTORY_ROOT); } final TaskExecutorLocalStateStoresManager taskStateManager = new TaskExecutorLocalStateStoresManager( taskManagerServicesConfiguration.isLocalRecoveryEnabled(), stateRootDirectoryFiles, taskIOExecutor); return new TaskManagerServices( taskManagerLocation, memoryManager, ioManager, network, broadcastVariableManager, taskSlotTable, jobManagerTable, jobLeaderService, taskStateManager); } private static NetworkEnvironment createNetworkEnvironment( TaskManagerServicesConfiguration taskManagerServicesConfiguration, long maxJvmHeapMemory) { NetworkEnvironmentConfiguration networkEnvironmentConfiguration = taskManagerServicesConfiguration.getNetworkConfig(); final long networkBuf = calculateNetworkBufferMemory(taskManagerServicesConfiguration, maxJvmHeapMemory); int segmentSize = networkEnvironmentConfiguration.networkBufferSize(); // tolerate offcuts between intended and allocated memory due to segmentation (will be available to the user-space memory) final long numNetBuffersLong = networkBuf / segmentSize; if (numNetBuffersLong > Integer.MAX_VALUE) { throw new IllegalArgumentException("The given number of memory bytes (" + networkBuf + ") corresponds to more than MAX_INT pages."); } NetworkBufferPool networkBufferPool = new NetworkBufferPool( (int) numNetBuffersLong, segmentSize); ConnectionManager connectionManager; boolean enableCreditBased = false; NettyConfig nettyConfig = networkEnvironmentConfiguration.nettyConfig(); if (nettyConfig != null) { connectionManager = new NettyConnectionManager(nettyConfig); enableCreditBased = nettyConfig.isCreditBasedEnabled(); } else { connectionManager = new LocalConnectionManager(); } ResultPartitionManager resultPartitionManager = new ResultPartitionManager(); TaskEventDispatcher taskEventDispatcher = new TaskEventDispatcher(); 
KvStateRegistry kvStateRegistry = new KvStateRegistry(); QueryableStateConfiguration qsConfig = taskManagerServicesConfiguration.getQueryableStateConfig(); int numProxyServerNetworkThreads = qsConfig.numProxyServerThreads() == 0 ? taskManagerServicesConfiguration.getNumberOfSlots() : qsConfig.numProxyServerThreads(); int numProxyServerQueryThreads = qsConfig.numProxyQueryThreads() == 0 ? taskManagerServicesConfiguration.getNumberOfSlots() : qsConfig.numProxyQueryThreads(); final KvStateClientProxy kvClientProxy = QueryableStateUtils.createKvStateClientProxy( taskManagerServicesConfiguration.getTaskManagerAddress(), qsConfig.getProxyPortRange(), numProxyServerNetworkThreads, numProxyServerQueryThreads, new DisabledKvStateRequestStats()); int numStateServerNetworkThreads = qsConfig.numStateServerThreads() == 0 ? taskManagerServicesConfiguration.getNumberOfSlots() : qsConfig.numStateServerThreads(); int numStateServerQueryThreads = qsConfig.numStateQueryThreads() == 0 ? taskManagerServicesConfiguration.getNumberOfSlots() : qsConfig.numStateQueryThreads(); final KvStateServer kvStateServer = QueryableStateUtils.createKvStateServer( taskManagerServicesConfiguration.getTaskManagerAddress(), qsConfig.getStateServerPortRange(), numStateServerNetworkThreads, numStateServerQueryThreads, kvStateRegistry, new DisabledKvStateRequestStats()); // we start the network first, to make sure it can allocate its buffers first return new NetworkEnvironment( networkBufferPool, connectionManager, resultPartitionManager, taskEventDispatcher, kvStateRegistry, kvStateServer, kvClientProxy, networkEnvironmentConfiguration.ioMode(), networkEnvironmentConfiguration.partitionRequestInitialBackoff(), networkEnvironmentConfiguration.partitionRequestMaxBackoff(), networkEnvironmentConfiguration.networkBuffersPerChannel(), networkEnvironmentConfiguration.floatingNetworkBuffersPerGate(), enableCreditBased); } //...... }
createNetworkEnvironment方法先從taskManagerServicesConfiguration取得NetworkEnvironmentConfiguration(它從配置文件讀取taskmanager.data.port),如果它的nettyConfig不爲null,則根據它建立了NettyConnectionManager
flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManager.java
public class NettyConnectionManager implements ConnectionManager { private final NettyServer server; private final NettyClient client; private final NettyBufferPool bufferPool; private final PartitionRequestClientFactory partitionRequestClientFactory; public NettyConnectionManager(NettyConfig nettyConfig) { this.server = new NettyServer(nettyConfig); this.client = new NettyClient(nettyConfig); this.bufferPool = new NettyBufferPool(nettyConfig.getNumberOfArenas()); this.partitionRequestClientFactory = new PartitionRequestClientFactory(client); } @Override public void start(ResultPartitionProvider partitionProvider, TaskEventDispatcher taskEventDispatcher) throws IOException { NettyProtocol partitionRequestProtocol = new NettyProtocol( partitionProvider, taskEventDispatcher, client.getConfig().isCreditBasedEnabled()); client.init(partitionRequestProtocol, bufferPool); server.init(partitionRequestProtocol, bufferPool); } @Override public PartitionRequestClient createPartitionRequestClient(ConnectionID connectionId) throws IOException, InterruptedException { return partitionRequestClientFactory.createPartitionRequestClient(connectionId); } @Override public void closeOpenChannelConnections(ConnectionID connectionId) { partitionRequestClientFactory.closeOpenChannelConnections(connectionId); } @Override public int getNumberOfActiveConnections() { return partitionRequestClientFactory.getNumberOfActiveClients(); } @Override public int getDataPort() { if (server != null && server.getLocalAddress() != null) { return server.getLocalAddress().getPort(); } else { return -1; } } @Override public void shutdown() { client.shutdown(); server.shutdown(); } NettyClient getClient() { return client; } NettyServer getServer() { return server; } NettyBufferPool getBufferPool() { return bufferPool; } }
flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
public class TaskManagerRunner implements FatalErrorHandler, AutoCloseableAsync { //...... public static RpcService createRpcService( final Configuration configuration, final HighAvailabilityServices haServices) throws Exception { checkNotNull(configuration); checkNotNull(haServices); String taskManagerHostname = configuration.getString(TaskManagerOptions.HOST); if (taskManagerHostname != null) { LOG.info("Using configured hostname/address for TaskManager: {}.", taskManagerHostname); } else { Time lookupTimeout = Time.milliseconds(AkkaUtils.getLookupTimeout(configuration).toMillis()); InetAddress taskManagerAddress = LeaderRetrievalUtils.findConnectingAddress( haServices.getResourceManagerLeaderRetriever(), lookupTimeout); taskManagerHostname = taskManagerAddress.getHostName(); LOG.info("TaskManager will use hostname/address '{}' ({}) for communication.", taskManagerHostname, taskManagerAddress.getHostAddress()); } final String portRangeDefinition = configuration.getString(TaskManagerOptions.RPC_PORT); return AkkaRpcServiceUtils.createRpcService(taskManagerHostname, portRangeDefinition, configuration); } //...... }
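TaskManagerServices的createNetworkEnvironment方法從taskManagerServicesConfiguration取得NetworkEnvironmentConfiguration(它從配置文件讀取taskmanager.data.port),如果它的nettyConfig不爲null,則根據它建立了NettyConnectionManager;NettyConnectionManager的構造器根據NettyConfig構造了NettyServer,而getDataPort則取的是server.getLocalAddress().getPort()。TaskManagerRunner的createRpcService方法則從配置讀取TaskManagerOptions.RPC_PORT作爲端口範圍,並交給AkkaRpcServiceUtils.createRpcService建立RpcService。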