本文主要研究一下 Flink TaskManager 的 memory 大小設置。
flink-release-1.7.2/flink-dist/src/main/resources/flink-conf.yaml
# The heap size for the TaskManager JVM
#
# Note: despite the key name, this is the total memory budget of the TaskManager.
# Memory for network buffers (always off-heap) and, when
# taskmanager.memory.off-heap is true, off-heap managed memory are subtracted
# from this value before -Xms/-Xmx are derived.

taskmanager.heap.size: 1024m

# The number of task slots that each TaskManager offers. Each slot runs one parallel pipeline.

taskmanager.numberOfTaskSlots: 1

# Specify whether TaskManager's managed memory should be allocated when starting
# up (true) or when memory is requested.
#
# We recommend to set this value to 'true' only in setups for pure batch
# processing (DataSet API). Streaming setups currently do not use the TaskManager's
# managed memory: The 'rocksdb' state backend uses RocksDB's own memory management,
# while the 'memory' and 'filesystem' backends explicitly keep data as objects
# to save on serialization cost.
#
# taskmanager.memory.preallocate: false

# The amount of memory going to the network stack. These numbers usually need
# no tuning. Adjusting them may be necessary in case of an "Insufficient number
# of network buffers" error. The default min is 64MB, the default max is 1GB.
#
# taskmanager.network.memory.fraction: 0.1
# taskmanager.network.memory.min: 64mb
# taskmanager.network.memory.max: 1gb
taskmanager.heap.size 用於指定 TaskManager 的 memory(heap 及 offHeap)大小；taskmanager.memory 相關配置(taskmanager.memory.fraction、taskmanager.memory.off-heap、taskmanager.memory.preallocate、taskmanager.memory.segment-size、taskmanager.memory.size)用於設置 memory；taskmanager.network.memory 相關配置(taskmanager.network.detailed-metrics、taskmanager.network.memory.buffers-per-channel、taskmanager.network.memory.floating-buffers-per-gate、taskmanager.network.memory.fraction、taskmanager.network.memory.max、taskmanager.network.memory.min)用於設置 TaskManager 的 network stack 的內存。

flink-release-1.7.2/flink-dist/src/main/flink-bin/bin/config.sh
#!/usr/bin/env bash

# WARNING !!! , these values are only used if nothing else is specified in
# conf/flink-conf.yaml

DEFAULT_ENV_PID_DIR="/tmp"                          # Directory to store *.pid files to
DEFAULT_ENV_LOG_MAX=5                               # Maximum number of old log files to keep
DEFAULT_ENV_JAVA_OPTS=""                            # Optional JVM args
DEFAULT_ENV_JAVA_OPTS_JM=""                         # Optional JVM args (JobManager)
DEFAULT_ENV_JAVA_OPTS_TM=""                         # Optional JVM args (TaskManager)
DEFAULT_ENV_JAVA_OPTS_HS=""                         # Optional JVM args (HistoryServer)
DEFAULT_ENV_SSH_OPTS=""                             # Optional SSH parameters running in cluster mode
DEFAULT_YARN_CONF_DIR=""                            # YARN Configuration Directory, if necessary
DEFAULT_HADOOP_CONF_DIR=""                          # Hadoop Configuration Directory, if necessary

# flink-conf.yaml keys for the TaskManager memory settings
KEY_TASKM_MEM_SIZE="taskmanager.heap.size"
KEY_TASKM_MEM_MB="taskmanager.heap.mb"
KEY_TASKM_MEM_MANAGED_SIZE="taskmanager.memory.size"
KEY_TASKM_MEM_MANAGED_FRACTION="taskmanager.memory.fraction"
KEY_TASKM_OFFHEAP="taskmanager.memory.off-heap"
KEY_TASKM_MEM_PRE_ALLOCATE="taskmanager.memory.preallocate"

KEY_TASKM_NET_BUF_FRACTION="taskmanager.network.memory.fraction"
KEY_TASKM_NET_BUF_MIN="taskmanager.network.memory.min"
KEY_TASKM_NET_BUF_MAX="taskmanager.network.memory.max"
KEY_TASKM_NET_BUF_NR="taskmanager.network.numberOfBuffers" # fallback

KEY_TASKM_COMPUTE_NUMA="taskmanager.compute.numa"

# Each FLINK_TM_* variable below keeps a value already present in the environment.
# Otherwise it is read from ${YAML_CONF} via `readFromConfig KEY DEFAULT FILE`.
# NOTE(review): readFromConfig, hasUnit, parseBytes and getMebiBytes are presumably
# defined elsewhere in this script; they are not part of this excerpt.

# Define FLINK_TM_HEAP if it is not already set
if [[ -z "${FLINK_TM_HEAP}" ]]; then
    FLINK_TM_HEAP=$(readFromConfig "${KEY_TASKM_MEM_SIZE}" 0 "${YAML_CONF}")
fi

# Fall back to the deprecated key if the new key is not set
if [[ "${FLINK_TM_HEAP}" == 0 ]]; then
    FLINK_TM_HEAP_MB=$(readFromConfig "${KEY_TASKM_MEM_MB}" 0 "${YAML_CONF}")
fi

# Define FLINK_TM_MEM_MANAGED_SIZE (converted to MiB) if it is not already set
if [[ -z "${FLINK_TM_MEM_MANAGED_SIZE}" ]]; then
    FLINK_TM_MEM_MANAGED_SIZE=$(readFromConfig "${KEY_TASKM_MEM_MANAGED_SIZE}" 0 "${YAML_CONF}")

    if hasUnit "${FLINK_TM_MEM_MANAGED_SIZE}"; then
        FLINK_TM_MEM_MANAGED_SIZE=$(getMebiBytes "$(parseBytes "${FLINK_TM_MEM_MANAGED_SIZE}")")
    else
        # A value without a unit is interpreted as mebibytes ("m" suffix appended).
        FLINK_TM_MEM_MANAGED_SIZE=$(getMebiBytes "$(parseBytes "${FLINK_TM_MEM_MANAGED_SIZE}m")")
    fi
fi

# Define FLINK_TM_MEM_MANAGED_FRACTION if it is not already set
if [[ -z "${FLINK_TM_MEM_MANAGED_FRACTION}" ]]; then
    FLINK_TM_MEM_MANAGED_FRACTION=$(readFromConfig "${KEY_TASKM_MEM_MANAGED_FRACTION}" 0.7 "${YAML_CONF}")
fi

# Define FLINK_TM_OFFHEAP if it is not already set
if [[ -z "${FLINK_TM_OFFHEAP}" ]]; then
    FLINK_TM_OFFHEAP=$(readFromConfig "${KEY_TASKM_OFFHEAP}" "false" "${YAML_CONF}")
fi

# Define FLINK_TM_MEM_PRE_ALLOCATE if it is not already set
if [[ -z "${FLINK_TM_MEM_PRE_ALLOCATE}" ]]; then
    FLINK_TM_MEM_PRE_ALLOCATE=$(readFromConfig "${KEY_TASKM_MEM_PRE_ALLOCATE}" "false" "${YAML_CONF}")
fi

# Define FLINK_TM_NET_BUF_FRACTION if it is not already set
if [[ -z "${FLINK_TM_NET_BUF_FRACTION}" ]]; then
    FLINK_TM_NET_BUF_FRACTION=$(readFromConfig "${KEY_TASKM_NET_BUF_FRACTION}" 0.1 "${YAML_CONF}")
fi

# Define FLINK_TM_NET_BUF_MIN and FLINK_TM_NET_BUF_MAX if not already set (as a fallback)
if [[ -z "${FLINK_TM_NET_BUF_MIN}" && -z "${FLINK_TM_NET_BUF_MAX}" ]]; then
    FLINK_TM_NET_BUF_MIN=$(readFromConfig "${KEY_TASKM_NET_BUF_NR}" -1 "${YAML_CONF}")
    # An empty value is treated like "not configured" (the unquoted original errored on it).
    if [[ -n "${FLINK_TM_NET_BUF_MIN}" && "${FLINK_TM_NET_BUF_MIN}" != -1 ]]; then
        # NOTE(review): the deprecated key holds a *number of buffers*, but it is parsed
        # here as a byte count; TaskManagerServices multiplies it by the memory segment
        # size instead. Confirm whether the shell-side value should be scaled too.
        FLINK_TM_NET_BUF_MIN=$(parseBytes "${FLINK_TM_NET_BUF_MIN}")
        FLINK_TM_NET_BUF_MAX=${FLINK_TM_NET_BUF_MIN}
    fi
fi

# Define FLINK_TM_NET_BUF_MIN if it is not already set
if [[ -z "${FLINK_TM_NET_BUF_MIN}" || "${FLINK_TM_NET_BUF_MIN}" == "-1" ]]; then
    # default: 64MB = 67108864 bytes (same as the previous default with 2048 buffers of 32k each)
    FLINK_TM_NET_BUF_MIN=$(readFromConfig "${KEY_TASKM_NET_BUF_MIN}" 67108864 "${YAML_CONF}")
    FLINK_TM_NET_BUF_MIN=$(parseBytes "${FLINK_TM_NET_BUF_MIN}")
fi

# Define FLINK_TM_NET_BUF_MAX if it is not already set
if [[ -z "${FLINK_TM_NET_BUF_MAX}" || "${FLINK_TM_NET_BUF_MAX}" == "-1" ]]; then
    # default: 1GB = 1073741824 bytes
    FLINK_TM_NET_BUF_MAX=$(readFromConfig "${KEY_TASKM_NET_BUF_MAX}" 1073741824 "${YAML_CONF}")
    FLINK_TM_NET_BUF_MAX=$(parseBytes "${FLINK_TM_NET_BUF_MAX}")
fi
flink-release-1.7.2/flink-dist/src/main/flink-bin/bin/taskmanager.sh
#!/usr/bin/env bash

# Start/stop a Flink TaskManager.
#
# Usage: taskmanager.sh (start|start-foreground|stop|stop-all) [args...]
# Any arguments after the first are forwarded to the TaskManager entry point.
USAGE="Usage: taskmanager.sh (start|start-foreground|stop|stop-all)"

STARTSTOP=$1
ARGS=("${@:2}")

case "${STARTSTOP}" in
    start|start-foreground|stop|stop-all)
        ;;
    *)
        echo "${USAGE}" >&2
        exit 1
        ;;
esac

bin=$(dirname "$0")
bin=$(cd "${bin}" && pwd)

. "${bin}"/config.sh

ENTRYPOINT=taskexecutor

if [[ "${STARTSTOP}" == "start" || "${STARTSTOP}" == "start-foreground" ]]; then
    # If memory allocation mode is lazy and no other JVM options are set,
    # use the G1 garbage collector.
    if [[ "${FLINK_TM_MEM_PRE_ALLOCATE}" == "false" && -z "${FLINK_ENV_JAVA_OPTS}" && -z "${FLINK_ENV_JAVA_OPTS_TM}" ]]; then
        export JVM_ARGS="${JVM_ARGS} -XX:+UseG1GC"
    fi

    # Only the deprecated MB key was found: keep its value and warn.
    # Otherwise convert the configured heap size to MiB.
    if [[ -n "${FLINK_TM_HEAP_MB}" && "${FLINK_TM_HEAP}" == 0 ]]; then
        echo "used deprecated key \`${KEY_TASKM_MEM_MB}\`, please replace with key \`${KEY_TASKM_MEM_SIZE}\`" >&2
    else
        flink_tm_heap_bytes=$(parseBytes "${FLINK_TM_HEAP}")
        FLINK_TM_HEAP_MB=$(getMebiBytes "${flink_tm_heap_bytes}")
    fi

    # IS_NUMBER must stay unquoted so it is matched as a regex.
    if [[ ! "${FLINK_TM_HEAP_MB}" =~ ${IS_NUMBER} ]] || [[ "${FLINK_TM_HEAP_MB}" -lt "0" ]]; then
        echo "[ERROR] Configured TaskManager JVM heap size is not a number. Please set '${KEY_TASKM_MEM_SIZE}' in ${FLINK_CONF_FILE}." >&2
        exit 1
    fi

    # A positive size makes the script pin -Xms/-Xmx itself, which is why memory
    # flags should not be passed through JVM_ARGS by the user.
    if [ "${FLINK_TM_HEAP_MB}" -gt "0" ]; then
        TM_HEAP_SIZE=$(calculateTaskManagerHeapSizeMB)
        # Long.MAX_VALUE in TB: This is an upper bound, much less direct memory will be used
        TM_MAX_OFFHEAP_SIZE="8388607T"

        export JVM_ARGS="${JVM_ARGS} -Xms${TM_HEAP_SIZE}M -Xmx${TM_HEAP_SIZE}M -XX:MaxDirectMemorySize=${TM_MAX_OFFHEAP_SIZE}"
    fi

    # Add TaskManager-specific JVM options
    export FLINK_ENV_JAVA_OPTS="${FLINK_ENV_JAVA_OPTS} ${FLINK_ENV_JAVA_OPTS_TM}"

    # Startup parameters
    ARGS+=("--configDir" "${FLINK_CONF_DIR}")
fi

if [[ "${STARTSTOP}" == "start-foreground" ]]; then
    exec "${FLINK_BIN_DIR}"/flink-console.sh "${ENTRYPOINT}" "${ARGS[@]}"
else
    if [[ "${FLINK_TM_COMPUTE_NUMA}" == "false" ]]; then
        # Start a single TaskManager
        "${FLINK_BIN_DIR}"/flink-daemon.sh "${STARTSTOP}" "${ENTRYPOINT}" "${ARGS[@]}"
    else
        # Example output from `numactl --show` on an AWS c4.8xlarge:
        # policy: default
        # preferred node: current
        # physcpubind: 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
        # cpubind: 0 1
        # nodebind: 0 1
        # membind: 0 1
        read -ra NODE_LIST <<< "$(numactl --show | grep "^nodebind: ")"
        # Element 0 is the "nodebind:" label; the rest are node ids.
        for NODE_ID in "${NODE_LIST[@]:1}"; do
            # Start a TaskManager for each NUMA node
            numactl --membind="${NODE_ID}" --cpunodebind="${NODE_ID}" -- \
                "${FLINK_BIN_DIR}"/flink-daemon.sh "${STARTSTOP}" "${ENTRYPOINT}" "${ARGS[@]}"
        done
    fi
fi
flink-runtime_2.11-1.7.2-sources.jar!/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
public class TaskManagerServices { //...... /** * Calculates the amount of heap memory to use (to set via <tt>-Xmx</tt> and <tt>-Xms</tt>) * based on the total memory to use and the given configuration parameters. * * @param totalJavaMemorySizeMB * overall available memory to use (heap and off-heap) * @param config * configuration object * * @return heap memory to use (in megabytes) */ public static long calculateHeapSizeMB(long totalJavaMemorySizeMB, Configuration config) { Preconditions.checkArgument(totalJavaMemorySizeMB > 0); // subtract the Java memory used for network buffers (always off-heap) final long networkBufMB = calculateNetworkBufferMemory( totalJavaMemorySizeMB << 20, // megabytes to bytes config) >> 20; // bytes to megabytes final long remainingJavaMemorySizeMB = totalJavaMemorySizeMB - networkBufMB; // split the available Java memory between heap and off-heap final boolean useOffHeap = config.getBoolean(TaskManagerOptions.MEMORY_OFF_HEAP); final long heapSizeMB; if (useOffHeap) { long offHeapSize; String managedMemorySizeDefaultVal = TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue(); if (!config.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE).equals(managedMemorySizeDefaultVal)) { try { offHeapSize = MemorySize.parse(config.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE), MEGA_BYTES).getMebiBytes(); } catch (IllegalArgumentException e) { throw new IllegalConfigurationException( "Could not read " + TaskManagerOptions.MANAGED_MEMORY_SIZE.key(), e); } } else { offHeapSize = Long.valueOf(managedMemorySizeDefaultVal); } if (offHeapSize <= 0) { // calculate off-heap section via fraction double fraction = config.getFloat(TaskManagerOptions.MANAGED_MEMORY_FRACTION); offHeapSize = (long) (fraction * remainingJavaMemorySizeMB); } TaskManagerServicesConfiguration .checkConfigParameter(offHeapSize < remainingJavaMemorySizeMB, offHeapSize, TaskManagerOptions.MANAGED_MEMORY_SIZE.key(), "Managed memory size too large for " + networkBufMB + " MB network 
buffer memory and a total of " + totalJavaMemorySizeMB + " MB JVM memory"); heapSizeMB = remainingJavaMemorySizeMB - offHeapSize; } else { heapSizeMB = remainingJavaMemorySizeMB; } return heapSizeMB; } /** * Calculates the amount of memory used for network buffers based on the total memory to use and * the according configuration parameters. * * <p>The following configuration parameters are involved: * <ul> * <li>{@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_FRACTION},</li> * <li>{@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MIN},</li> * <li>{@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MAX}, and</li> * <li>{@link TaskManagerOptions#NETWORK_NUM_BUFFERS} (fallback if the ones above do not exist)</li> * </ul>. * * @param totalJavaMemorySize * overall available memory to use (heap and off-heap, in bytes) * @param config * configuration object * * @return memory to use for network buffers (in bytes); at least one memory segment */ @SuppressWarnings("deprecation") public static long calculateNetworkBufferMemory(long totalJavaMemorySize, Configuration config) { Preconditions.checkArgument(totalJavaMemorySize > 0); int segmentSize = checkedDownCast(MemorySize.parse(config.getString(TaskManagerOptions.MEMORY_SEGMENT_SIZE)).getBytes()); final long networkBufBytes; if (TaskManagerServicesConfiguration.hasNewNetworkBufConf(config)) { // new configuration based on fractions of available memory with selectable min and max float networkBufFraction = config.getFloat(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION); long networkBufMin = MemorySize.parse(config.getString(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN)).getBytes(); long networkBufMax = MemorySize.parse(config.getString(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX)).getBytes(); TaskManagerServicesConfiguration .checkNetworkBufferConfig(segmentSize, networkBufFraction, networkBufMin, networkBufMax); networkBufBytes = Math.min(networkBufMax, Math.max(networkBufMin, (long) (networkBufFraction * 
totalJavaMemorySize))); TaskManagerServicesConfiguration .checkConfigParameter(networkBufBytes < totalJavaMemorySize, "(" + networkBufFraction + ", " + networkBufMin + ", " + networkBufMax + ")", "(" + TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION.key() + ", " + TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN.key() + ", " + TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX.key() + ")", "Network buffer memory size too large: " + networkBufBytes + " >= " + totalJavaMemorySize + " (total JVM memory size)"); TaskManagerServicesConfiguration .checkConfigParameter(networkBufBytes >= segmentSize, "(" + networkBufFraction + ", " + networkBufMin + ", " + networkBufMax + ")", "(" + TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION.key() + ", " + TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN.key() + ", " + TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX.key() + ")", "Network buffer memory size too small: " + networkBufBytes + " < " + segmentSize + " (" + TaskManagerOptions.MEMORY_SEGMENT_SIZE.key() + ")"); } else { // use old (deprecated) network buffers parameter int numNetworkBuffers = config.getInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS); networkBufBytes = (long) numNetworkBuffers * (long) segmentSize; TaskManagerServicesConfiguration.checkNetworkConfigOld(numNetworkBuffers); TaskManagerServicesConfiguration .checkConfigParameter(networkBufBytes < totalJavaMemorySize, networkBufBytes, TaskManagerOptions.NETWORK_NUM_BUFFERS.key(), "Network buffer memory size too large: " + networkBufBytes + " >= " + totalJavaMemorySize + " (total JVM memory size)"); TaskManagerServicesConfiguration .checkConfigParameter(networkBufBytes >= segmentSize, networkBufBytes, TaskManagerOptions.NETWORK_NUM_BUFFERS.key(), "Network buffer memory size too small: " + networkBufBytes + " < " + segmentSize + " (" + TaskManagerOptions.MEMORY_SEGMENT_SIZE.key() + ")"); } return networkBufBytes; } //...... }
FLINK_TM_HEAP(即 taskmanager.heap.size)指定的是 TaskManager 的 memory(heap 及 offHeap)總大小，而 network buffers 總是使用 offHeap，所以這裏首先要從 FLINK_TM_HEAP 扣減掉這部分 offHeap，然後重新計算 Xms 及 Xmx。

taskmanager.heap.size 用於指定 TaskManager 的 memory(heap 及 offHeap)大小；提供了 taskmanager.memory 相關配置(taskmanager.memory.fraction、taskmanager.memory.off-heap、taskmanager.memory.preallocate、taskmanager.memory.segment-size、taskmanager.memory.size)用於設置 memory；提供了 taskmanager.network.memory 相關配置(taskmanager.network.detailed-metrics、taskmanager.network.memory.buffers-per-channel、taskmanager.network.memory.floating-buffers-per-gate、taskmanager.network.memory.fraction、taskmanager.network.memory.max、taskmanager.network.memory.min)用於設置 TaskManager 的 network stack 的內存。

TaskManagerServices.calculateHeapSizeMB 根據 TaskManager 的 memory(heap 及 offHeap)總大小計算 heap 大小。由於 network buffers 總是使用 offHeap，所以這裏首先要從 FLINK_TM_HEAP 扣減掉這部分 offHeap，然後重新計算 Xms 及 Xmx。

具體步驟如下：
- calculateHeapSizeMB 先調用 calculateNetworkBufferMemory 計算 networkBufMB，然後從 totalJavaMemorySizeMB 扣減掉 networkBufMB，得到 remainingJavaMemorySizeMB。
- 之後讀取 taskmanager.memory.off-heap 設置(默認爲 false)。
- 若爲 false，則直接返回 remainingJavaMemorySizeMB。
- 若爲 true，則需要計算 offHeapSize 的值：優先使用 taskmanager.memory.size；未設置或不大於 0 時，按 taskmanager.memory.fraction 乘以 remainingJavaMemorySizeMB 計算。然後從 remainingJavaMemorySizeMB 扣減 offHeapSize 再返回。
由此可見，最後的 JVM 參數取決於 JVM_ARGS 及 FLINK_ENV_JAVA_OPTS。

其中要注意不要把內存相關參數設置到 JVM_ARGS。因爲 taskmanager.sh 在 FLINK_TM_HEAP_MB 大於 0 的時候，會使用該值計算 TM_HEAP_SIZE，並將 -Xms 及 -Xmx 追加到 JVM_ARGS 變量中。FLINK_TM_HEAP_MB 則取決於 FLINK_TM_HEAP 或者 taskmanager.heap.size 配置；FLINK_ENV_JAVA_OPTS 的配置則取決於 env.java.opts 以及 env.java.opts.taskmanager。

因此，要配置 TaskManager 的 memory(heap 及 offHeap)大小，可以指定 FLINK_TM_HEAP 環境變量(比如 FLINK_TM_HEAP=512m)，或者在 flink-conf.yaml 中指定 taskmanager.heap.size。最終的 Xms 及 Xmx 則是由 FLINK_TM_HEAP 扣減掉 offHeap 而來：network buffers 一定使用 offHeap，其餘部分是否使用 offHeap 則看是否開啓 taskmanager.memory.off-heap(默認爲 false)。