聊聊flink taskmanager的data.port與rpc.port

本文主要研究一下flink taskmanager的data.port與rpc.port

TaskManagerServices

flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java

public class TaskManagerServices {
	//......

	/**
	 * Assembles a {@link TaskManagerServices} instance from the given configuration.
	 *
	 * <p>The network environment is created and started first. The data port reported by its
	 * connection manager is then combined with the resource ID and the configured task manager
	 * address to build the {@link TaskManagerLocation}.
	 *
	 * @param taskManagerServicesConfiguration configuration the individual services are built from
	 * @param resourceID resource ID stored in the task manager location
	 * @param taskIOExecutor executor handed to the local state stores manager
	 * @param freeHeapMemoryWithDefrag forwarded to {@code createMemoryManager}
	 * @param maxJvmHeapMemory forwarded to {@code createNetworkEnvironment} and {@code createMemoryManager}
	 * @return the assembled task manager services
	 * @throws Exception declared by the signature; which of the calls below raise it is not visible here
	 */
	public static TaskManagerServices fromConfiguration(
			TaskManagerServicesConfiguration taskManagerServicesConfiguration,
			ResourceID resourceID,
			Executor taskIOExecutor,
			long freeHeapMemoryWithDefrag,
			long maxJvmHeapMemory) throws Exception {

		// pre-start checks
		checkTempDirs(taskManagerServicesConfiguration.getTmpDirPaths());

		// create and start the network environment before anything reads the data port from it
		final NetworkEnvironment network = createNetworkEnvironment(taskManagerServicesConfiguration, maxJvmHeapMemory);
		network.start();

		// the location carries the data port obtained from the started connection manager
		final TaskManagerLocation taskManagerLocation = new TaskManagerLocation(
			resourceID,
			taskManagerServicesConfiguration.getTaskManagerAddress(),
			network.getConnectionManager().getDataPort());

		// this call has to happen strictly after the network stack has been initialized
		final MemoryManager memoryManager = createMemoryManager(taskManagerServicesConfiguration, freeHeapMemoryWithDefrag, maxJvmHeapMemory);

		// start the I/O manager, it will create some temp directories.
		final IOManager ioManager = new IOManagerAsync(taskManagerServicesConfiguration.getTmpDirPaths());

		final BroadcastVariableManager broadcastVariableManager = new BroadcastVariableManager();

		// one ResourceProfile.ANY entry per configured slot
		final List<ResourceProfile> resourceProfiles = new ArrayList<>(taskManagerServicesConfiguration.getNumberOfSlots());

		for (int i = 0; i < taskManagerServicesConfiguration.getNumberOfSlots(); i++) {
			resourceProfiles.add(ResourceProfile.ANY);
		}

		// timer service backed by a scheduled executor with a core pool size of 1
		final TimerService<AllocationID> timerService = new TimerService<>(
			new ScheduledThreadPoolExecutor(1),
			taskManagerServicesConfiguration.getTimerServiceShutdownTimeout());

		final TaskSlotTable taskSlotTable = new TaskSlotTable(resourceProfiles, timerService);

		final JobManagerTable jobManagerTable = new JobManagerTable();

		final JobLeaderService jobLeaderService = new JobLeaderService(taskManagerLocation);

		final String[] stateRootDirectoryStrings = taskManagerServicesConfiguration.getLocalRecoveryStateRootDirectories();

		final File[] stateRootDirectoryFiles = new File[stateRootDirectoryStrings.length];

		// each configured local-recovery root gets a LOCAL_STATE_SUB_DIRECTORY_ROOT child
		for (int i = 0; i < stateRootDirectoryStrings.length; ++i) {
			stateRootDirectoryFiles[i] = new File(stateRootDirectoryStrings[i], LOCAL_STATE_SUB_DIRECTORY_ROOT);
		}

		final TaskExecutorLocalStateStoresManager taskStateManager = new TaskExecutorLocalStateStoresManager(
			taskManagerServicesConfiguration.isLocalRecoveryEnabled(),
			stateRootDirectoryFiles,
			taskIOExecutor);

		return new TaskManagerServices(
			taskManagerLocation,
			memoryManager,
			ioManager,
			network,
			broadcastVariableManager,
			taskSlotTable,
			jobManagerTable,
			jobLeaderService,
			taskStateManager);
	}

	/**
	 * Builds the {@link NetworkEnvironment} described by the given configuration.
	 *
	 * <p>A {@link NettyConnectionManager} is used when the network configuration carries a
	 * {@link NettyConfig}; otherwise a {@link LocalConnectionManager} is used and credit-based
	 * flow control stays disabled.
	 *
	 * @param taskManagerServicesConfiguration source of the network and queryable state settings
	 * @param maxJvmHeapMemory forwarded to {@code calculateNetworkBufferMemory}
	 * @return the network environment (not started by this method)
	 * @throws IllegalArgumentException if the network memory maps to more than Integer.MAX_VALUE segments
	 */
	private static NetworkEnvironment createNetworkEnvironment(
			TaskManagerServicesConfiguration taskManagerServicesConfiguration,
			long maxJvmHeapMemory) {

		final NetworkEnvironmentConfiguration networkConfig = taskManagerServicesConfiguration.getNetworkConfig();

		final long networkMemoryBytes = calculateNetworkBufferMemory(taskManagerServicesConfiguration, maxJvmHeapMemory);
		final int bytesPerSegment = networkConfig.networkBufferSize();

		// integer division: any remainder smaller than one segment is simply not handed to the pool
		final long segmentCount = networkMemoryBytes / bytesPerSegment;
		if (segmentCount > Integer.MAX_VALUE) {
			throw new IllegalArgumentException("The given number of memory bytes (" + networkMemoryBytes
				+ ") corresponds to more than MAX_INT pages.");
		}

		final NetworkBufferPool bufferPool = new NetworkBufferPool((int) segmentCount, bytesPerSegment);

		// choose the connection manager based on whether a netty configuration is present
		final NettyConfig nettyConfig = networkConfig.nettyConfig();
		final ConnectionManager connectionManager;
		final boolean creditBasedEnabled;
		if (nettyConfig == null) {
			connectionManager = new LocalConnectionManager();
			creditBasedEnabled = false;
		} else {
			connectionManager = new NettyConnectionManager(nettyConfig);
			creditBasedEnabled = nettyConfig.isCreditBasedEnabled();
		}

		final ResultPartitionManager partitionManager = new ResultPartitionManager();
		final TaskEventDispatcher eventDispatcher = new TaskEventDispatcher();
		final KvStateRegistry stateRegistry = new KvStateRegistry();

		final QueryableStateConfiguration queryableStateConfig = taskManagerServicesConfiguration.getQueryableStateConfig();

		// a configured thread count of 0 falls back to the number of slots
		int proxyNetworkThreads = queryableStateConfig.numProxyServerThreads();
		if (proxyNetworkThreads == 0) {
			proxyNetworkThreads = taskManagerServicesConfiguration.getNumberOfSlots();
		}

		int proxyQueryThreads = queryableStateConfig.numProxyQueryThreads();
		if (proxyQueryThreads == 0) {
			proxyQueryThreads = taskManagerServicesConfiguration.getNumberOfSlots();
		}

		final KvStateClientProxy clientProxy = QueryableStateUtils.createKvStateClientProxy(
				taskManagerServicesConfiguration.getTaskManagerAddress(),
				queryableStateConfig.getProxyPortRange(),
				proxyNetworkThreads,
				proxyQueryThreads,
				new DisabledKvStateRequestStats());

		int stateServerNetworkThreads = queryableStateConfig.numStateServerThreads();
		if (stateServerNetworkThreads == 0) {
			stateServerNetworkThreads = taskManagerServicesConfiguration.getNumberOfSlots();
		}

		int stateServerQueryThreads = queryableStateConfig.numStateQueryThreads();
		if (stateServerQueryThreads == 0) {
			stateServerQueryThreads = taskManagerServicesConfiguration.getNumberOfSlots();
		}

		final KvStateServer stateServer = QueryableStateUtils.createKvStateServer(
				taskManagerServicesConfiguration.getTaskManagerAddress(),
				queryableStateConfig.getStateServerPortRange(),
				stateServerNetworkThreads,
				stateServerQueryThreads,
				stateRegistry,
				new DisabledKvStateRequestStats());

		// the caller starts the network before other services so it can allocate its buffers first
		return new NetworkEnvironment(
			bufferPool,
			connectionManager,
			partitionManager,
			eventDispatcher,
			stateRegistry,
			stateServer,
			clientProxy,
			networkConfig.ioMode(),
			networkConfig.partitionRequestInitialBackoff(),
			networkConfig.partitionRequestMaxBackoff(),
			networkConfig.networkBuffersPerChannel(),
			networkConfig.floatingNetworkBuffersPerGate(),
			creditBasedEnabled);
	}

	//......
}
  • TaskManagerServices的fromConfiguration方法從taskManagerServicesConfiguration讀取配置,而後建立NetworkEnvironment,以後建立TaskManagerLocation用到了NetworkEnvironment.getConnectionManager().getDataPort()
  • TaskExecutorToResourceManagerConnection及ConnectionID均從TaskManagerLocation獲取了dataPort信息
  • createNetworkEnvironment方法從taskManagerServicesConfiguration獲取NetworkEnvironmentConfiguration(它從配置文件讀取taskmanager.data.port),若是它的nettyConfig不爲null,則根據它建立了NettyConnectionManager

NettyConnectionManager

flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManager.java

/**
 * {@link ConnectionManager} implementation that pairs a {@link NettyServer} with a
 * {@link NettyClient}, both built from the same {@link NettyConfig}.
 */
public class NettyConnectionManager implements ConnectionManager {

	/** Server side; its local address supplies the data port. */
	private final NettyServer server;

	/** Client side, also used by the partition request client factory. */
	private final NettyClient client;

	/** Buffer pool passed to both client and server on start. */
	private final NettyBufferPool bufferPool;

	/** Factory that creates and tracks partition request clients. */
	private final PartitionRequestClientFactory partitionRequestClientFactory;

	/**
	 * Creates the server, client, buffer pool and client factory from the given configuration.
	 *
	 * @param nettyConfig configuration shared by server and client
	 */
	public NettyConnectionManager(NettyConfig nettyConfig) {
		this.server = new NettyServer(nettyConfig);
		this.client = new NettyClient(nettyConfig);
		this.bufferPool = new NettyBufferPool(nettyConfig.getNumberOfArenas());

		this.partitionRequestClientFactory = new PartitionRequestClientFactory(this.client);
	}

	/**
	 * Initializes the client and then the server with one shared {@link NettyProtocol}.
	 */
	@Override
	public void start(ResultPartitionProvider partitionProvider, TaskEventDispatcher taskEventDispatcher) throws IOException {
		final boolean creditBased = this.client.getConfig().isCreditBasedEnabled();
		final NettyProtocol protocol = new NettyProtocol(partitionProvider, taskEventDispatcher, creditBased);

		this.client.init(protocol, this.bufferPool);
		this.server.init(protocol, this.bufferPool);
	}

	/** Delegates to the partition request client factory. */
	@Override
	public PartitionRequestClient createPartitionRequestClient(ConnectionID connectionId)
			throws IOException, InterruptedException {
		return this.partitionRequestClientFactory.createPartitionRequestClient(connectionId);
	}

	/** Delegates to the partition request client factory. */
	@Override
	public void closeOpenChannelConnections(ConnectionID connectionId) {
		this.partitionRequestClientFactory.closeOpenChannelConnections(connectionId);
	}

	/** Returns the number of active clients reported by the factory. */
	@Override
	public int getNumberOfActiveConnections() {
		return this.partitionRequestClientFactory.getNumberOfActiveClients();
	}

	/**
	 * Returns the port of the server's local address, or -1 when no local address is available.
	 */
	@Override
	public int getDataPort() {
		if (this.server == null || this.server.getLocalAddress() == null) {
			return -1;
		}
		return this.server.getLocalAddress().getPort();
	}

	/** Shuts down the client first, then the server. */
	@Override
	public void shutdown() {
		this.client.shutdown();
		this.server.shutdown();
	}

	NettyClient getClient() {
		return this.client;
	}

	NettyServer getServer() {
		return this.server;
	}

	NettyBufferPool getBufferPool() {
		return this.bufferPool;
	}
}
  • NettyConnectionManager的構造器根據NettyConfig構造了NettyServer,而getDataPort則取的是server.getLocalAddress().getPort()

TaskManagerRunner

flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java

public class TaskManagerRunner implements FatalErrorHandler, AutoCloseableAsync {
	//......

	/**
	 * Creates the {@link RpcService} for the task manager.
	 *
	 * <p>The host comes from {@code TaskManagerOptions.HOST} when that option is set. Otherwise a
	 * connecting address is looked up through the resource manager leader retriever and its host
	 * name is used. The port range definition is read from {@code TaskManagerOptions.RPC_PORT}.
	 *
	 * @param configuration configuration supplying the host, RPC port range and lookup timeout
	 * @param haServices high availability services providing the resource manager leader retriever
	 * @return the RPC service created by {@code AkkaRpcServiceUtils.createRpcService}
	 * @throws Exception declared by the signature; which of the calls below raise it is not visible here
	 */
	public static RpcService createRpcService(
		final Configuration configuration,
		final HighAvailabilityServices haServices) throws Exception {

		checkNotNull(configuration);
		checkNotNull(haServices);

		final String configuredHost = configuration.getString(TaskManagerOptions.HOST);
		final String rpcHost;

		if (configuredHost == null) {
			// no host configured: find an address that can reach the resource manager leader
			final Time lookupTimeout = Time.milliseconds(AkkaUtils.getLookupTimeout(configuration).toMillis());

			final InetAddress connectingAddress = LeaderRetrievalUtils.findConnectingAddress(
				haServices.getResourceManagerLeaderRetriever(),
				lookupTimeout);

			rpcHost = connectingAddress.getHostName();

			LOG.info("TaskManager will use hostname/address '{}' ({}) for communication.",
				rpcHost, connectingAddress.getHostAddress());
		} else {
			rpcHost = configuredHost;
			LOG.info("Using configured hostname/address for TaskManager: {}.", rpcHost);
		}

		return AkkaRpcServiceUtils.createRpcService(
			rpcHost,
			configuration.getString(TaskManagerOptions.RPC_PORT),
			configuration);
	}

	//......
}
  • TaskManagerRunner提供了createRpcService方法,其從配置文件讀取taskmanager.rpc.port,而後調用AkkaRpcServiceUtils.createRpcService來建立RpcService

小結

  • TaskManagerServices的fromConfiguration方法從taskManagerServicesConfiguration讀取配置,而後建立NetworkEnvironment,以後建立TaskManagerLocation用到了NetworkEnvironment.getConnectionManager().getDataPort();TaskExecutorToResourceManagerConnection及ConnectionID均從TaskManagerLocation獲取了dataPort信息
  • TaskManagerServices的createNetworkEnvironment方法從taskManagerServicesConfiguration獲取NetworkEnvironmentConfiguration(它從配置文件讀取taskmanager.data.port),若是它的nettyConfig不爲null,則根據它建立了NettyConnectionManager;NettyConnectionManager的構造器根據NettyConfig構造了NettyServer,而getDataPort則取的是server.getLocalAddress().getPort()
  • TaskManagerRunner提供了createRpcService方法,其從配置文件讀取taskmanager.rpc.port,而後調用AkkaRpcServiceUtils.createRpcService來建立RpcService

doc

相關文章
相關標籤/搜索