A Look at Flink's BlobWriter

This article takes a look at Flink's BlobWriter.

BlobWriter

flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobWriter.java

/**
 * BlobWriter is used to upload data to the BLOB store.
 */
public interface BlobWriter {

	Logger LOG = LoggerFactory.getLogger(BlobWriter.class);

	/**
	 * Uploads the data of the given byte array for the given job to the BLOB server and makes it
	 * a permanent BLOB.
	 *
	 * @param jobId
	 * 		the ID of the job the BLOB belongs to
	 * @param value
	 * 		the buffer to upload
	 *
	 * @return the computed BLOB key identifying the BLOB on the server
	 *
	 * @throws IOException
	 * 		thrown if an I/O error occurs while writing it to a local file, or uploading it to the HA
	 * 		store
	 */
	PermanentBlobKey putPermanent(JobID jobId, byte[] value) throws IOException;

	/**
	 * Uploads the data from the given input stream for the given job to the BLOB server and makes it
	 * a permanent BLOB.
	 *
	 * @param jobId
	 * 		ID of the job this blob belongs to
	 * @param inputStream
	 * 		the input stream to read the data from
	 *
	 * @return the computed BLOB key identifying the BLOB on the server
	 *
	 * @throws IOException
	 * 		thrown if an I/O error occurs while reading the data from the input stream, writing it to a
	 * 		local file, or uploading it to the HA store
	 */
	PermanentBlobKey putPermanent(JobID jobId, InputStream inputStream) throws IOException;

	/**
	 * Returns the min size before data will be offloaded to the BLOB store.
	 *
	 * @return minimum offloading size
	 */
	int getMinOffloadingSize();

	/**
	 * Serializes the given value and offloads it to the BlobServer if its size exceeds the minimum
	 * offloading size of the BlobServer.
	 *
	 * @param value to serialize
	 * @param jobId to which the value belongs.
	 * @param blobWriter to use to offload the serialized value
	 * @param <T> type of the value to serialize
	 * @return Either the serialized value or the stored blob key
	 * @throws IOException if the data cannot be serialized
	 */
	static <T> Either<SerializedValue<T>, PermanentBlobKey> serializeAndTryOffload(
			T value,
			JobID jobId,
			BlobWriter blobWriter) throws IOException {
		Preconditions.checkNotNull(value);
		Preconditions.checkNotNull(jobId);
		Preconditions.checkNotNull(blobWriter);

		final SerializedValue<T> serializedValue = new SerializedValue<>(value);

		if (serializedValue.getByteArray().length < blobWriter.getMinOffloadingSize()) {
			return Either.Left(new SerializedValue<>(value));
		} else {
			try {
				final PermanentBlobKey permanentBlobKey = blobWriter.putPermanent(jobId, serializedValue.getByteArray());

				return Either.Right(permanentBlobKey);
			} catch (IOException e) {
				LOG.warn("Failed to offload value {} for job {} to BLOB store.", value, jobId, e);

				return Either.Left(serializedValue);
			}
		}
	}
}
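  • BlobWriter defines the putPermanent and getMinOffloadingSize methods. It also provides the static serializeAndTryOffload method, which serializes the given value and, if the serialized size is not smaller than the minimum offloading size, calls blobWriter.putPermanent to store it in the BlobServer and returns the resulting PermanentBlobKey. If the value is smaller than the threshold, or if the upload fails with an IOException, it returns the serialized value itself.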
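
The offloading decision can be illustrated without any Flink dependency. The following is only a minimal sketch, not Flink's implementation: OffloadSketch, Writer, Result, and tryOffload are hypothetical names standing in for BlobWriter and Either<SerializedValue<T>, PermanentBlobKey>, the BLOB key is reduced to a plain String, and the value is assumed to be serialized already.

```java
import java.io.IOException;

/** Hypothetical, dependency-free sketch of the offloading decision; not Flink code. */
public class OffloadSketch {

    /** Hypothetical stand-in for the two BlobWriter methods the decision needs. */
    interface Writer {
        String putPermanent(byte[] value) throws IOException;

        int getMinOffloadingSize();
    }

    /** Hypothetical stand-in for Either: exactly one of the two fields is non-null. */
    static final class Result {
        final byte[] inlineBytes; // "Left": the serialized value is kept inline
        final String blobKey;     // "Right": the key of the offloaded BLOB

        private Result(byte[] inlineBytes, String blobKey) {
            this.inlineBytes = inlineBytes;
            this.blobKey = blobKey;
        }

        boolean isOffloaded() {
            return blobKey != null;
        }
    }

    static Result tryOffload(byte[] serialized, Writer writer) {
        // Values below the threshold are returned inline.
        if (serialized.length < writer.getMinOffloadingSize()) {
            return new Result(serialized, null);
        }
        try {
            // At or above the threshold: upload and hand back only the key.
            return new Result(null, writer.putPermanent(serialized));
        } catch (IOException e) {
            // A failed upload is not fatal: fall back to the inline value.
            return new Result(serialized, null);
        }
    }
}
```

The fallback in the catch branch mirrors the quoted source, where a failed upload is logged and the serialized value is returned instead of propagating the exception.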

BlobServer

flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java

/**
 * This class implements the BLOB server. The BLOB server is responsible for listening for incoming requests and
 * spawning threads to handle these requests. Furthermore, it takes care of creating the directory structure to store
 * the BLOBs or temporarily cache them.
 */
public class BlobServer extends Thread implements BlobService, BlobWriter, PermanentBlobService, TransientBlobService {
	//......

	@Override
	public PermanentBlobKey putPermanent(JobID jobId, byte[] value) throws IOException {
		checkNotNull(jobId);
		return (PermanentBlobKey) putBuffer(jobId, value, PERMANENT_BLOB);
	}

	@Override
	public PermanentBlobKey putPermanent(JobID jobId, InputStream inputStream) throws IOException {
		checkNotNull(jobId);
		return (PermanentBlobKey) putInputStream(jobId, inputStream, PERMANENT_BLOB);
	}

	/**
	 * Returns the configuration used by the BLOB server.
	 *
	 * @return configuration
	 */
	@Override
	public final int getMinOffloadingSize() {
		return blobServiceConfiguration.getInteger(BlobServerOptions.OFFLOAD_MINSIZE);
	}

	/**
	 * Uploads the data of the given byte array for the given job to the BLOB server.
	 *
	 * @param jobId
	 * 		the ID of the job the BLOB belongs to
	 * @param value
	 * 		the buffer to upload
	 * @param blobType
	 * 		whether to make the data permanent or transient
	 *
	 * @return the computed BLOB key identifying the BLOB on the server
	 *
	 * @throws IOException
	 * 		thrown if an I/O error occurs while writing it to a local file, or uploading it to the HA
	 * 		store
	 */
	private BlobKey putBuffer(@Nullable JobID jobId, byte[] value, BlobKey.BlobType blobType)
			throws IOException {

		if (LOG.isDebugEnabled()) {
			LOG.debug("Received PUT call for BLOB of job {}.", jobId);
		}

		File incomingFile = createTemporaryFilename();
		MessageDigest md = BlobUtils.createMessageDigest();
		BlobKey blobKey = null;
		try (FileOutputStream fos = new FileOutputStream(incomingFile)) {
			md.update(value);
			fos.write(value);
		} catch (IOException ioe) {
			// delete incomingFile from a failed download
			if (!incomingFile.delete() && incomingFile.exists()) {
				LOG.warn("Could not delete the staging file {} for job {}.",
					incomingFile, jobId);
			}
			throw ioe;
		}

		try {
			// persist file
			blobKey = moveTempFileToStore(incomingFile, jobId, md.digest(), blobType);

			return blobKey;
		} finally {
			// delete incomingFile from a failed download
			if (!incomingFile.delete() && incomingFile.exists()) {
				LOG.warn("Could not delete the staging file {} for blob key {} and job {}.",
					incomingFile, blobKey, jobId);
			}
		}
	}

	/**
	 * Uploads the data from the given input stream for the given job to the BLOB server.
	 *
	 * @param jobId
	 * 		the ID of the job the BLOB belongs to
	 * @param inputStream
	 * 		the input stream to read the data from
	 * @param blobType
	 * 		whether to make the data permanent or transient
	 *
	 * @return the computed BLOB key identifying the BLOB on the server
	 *
	 * @throws IOException
	 * 		thrown if an I/O error occurs while reading the data from the input stream, writing it to a
	 * 		local file, or uploading it to the HA store
	 */
	private BlobKey putInputStream(
			@Nullable JobID jobId, InputStream inputStream, BlobKey.BlobType blobType)
			throws IOException {

		if (LOG.isDebugEnabled()) {
			LOG.debug("Received PUT call for BLOB of job {}.", jobId);
		}

		File incomingFile = createTemporaryFilename();
		MessageDigest md = BlobUtils.createMessageDigest();
		BlobKey blobKey = null;
		try (FileOutputStream fos = new FileOutputStream(incomingFile)) {
			// read stream
			byte[] buf = new byte[BUFFER_SIZE];
			while (true) {
				final int bytesRead = inputStream.read(buf);
				if (bytesRead == -1) {
					// done
					break;
				}
				fos.write(buf, 0, bytesRead);
				md.update(buf, 0, bytesRead);
			}

			// persist file
			blobKey = moveTempFileToStore(incomingFile, jobId, md.digest(), blobType);

			return blobKey;
		} finally {
			// delete incomingFile from a failed download
			if (!incomingFile.delete() && incomingFile.exists()) {
				LOG.warn("Could not delete the staging file {} for blob key {} and job {}.",
					incomingFile, blobKey, jobId);
			}
		}
	}

	/**
	 * Moves the temporary <tt>incomingFile</tt> to its permanent location where it is available for
	 * use.
	 *
	 * @param incomingFile
	 * 		temporary file created during transfer
	 * @param jobId
	 * 		ID of the job this blob belongs to or <tt>null</tt> if job-unrelated
	 * @param digest
	 * 		BLOB content digest, i.e. hash
	 * @param blobType
	 * 		whether this file is a permanent or transient BLOB
	 *
	 * @return unique BLOB key that identifies the BLOB on the server
	 *
	 * @throws IOException
	 * 		thrown if an I/O error occurs while moving the file or uploading it to the HA store
	 */
	BlobKey moveTempFileToStore(
			File incomingFile, @Nullable JobID jobId, byte[] digest, BlobKey.BlobType blobType)
			throws IOException {

		int retries = 10;

		int attempt = 0;
		while (true) {
			// add unique component independent of the BLOB content
			BlobKey blobKey = BlobKey.createKey(blobType, digest);
			File storageFile = BlobUtils.getStorageLocation(storageDir, jobId, blobKey);

			// try again until the key is unique (put the existence check into the lock!)
			readWriteLock.writeLock().lock();
			try {
				if (!storageFile.exists()) {
					BlobUtils.moveTempFileToStore(
						incomingFile, jobId, blobKey, storageFile, LOG,
						blobKey instanceof PermanentBlobKey ? blobStore : null);
					// add TTL for transient BLOBs:
					if (blobKey instanceof TransientBlobKey) {
						// must be inside read or write lock to add a TTL
						blobExpiryTimes
							.put(Tuple2.of(jobId, (TransientBlobKey) blobKey),
								System.currentTimeMillis() + cleanupInterval);
					}
					return blobKey;
				}
			} finally {
				readWriteLock.writeLock().unlock();
			}

			++attempt;
			if (attempt >= retries) {
				String message = "Failed to find a unique key for BLOB of job " + jobId + " (last tried " + storageFile.getAbsolutePath() + ".";
				LOG.error(message + " No retries left.");
				throw new IOException(message);
			} else {
				if (LOG.isDebugEnabled()) {
					LOG.debug("Trying to find a unique key for BLOB of job {} (retry {}, last tried {})",
						jobId, attempt, storageFile.getAbsolutePath());
				}
			}
		}
	}

	/**
	 * Returns a temporary file inside the BLOB server's incoming directory.
	 *
	 * @return a temporary file inside the BLOB server's incoming directory
	 *
	 * @throws IOException
	 * 		if creating the directory fails
	 */
	File createTemporaryFilename() throws IOException {
		return new File(BlobUtils.getIncomingDirectory(storageDir),
				String.format("temp-%08d", tempFileCounter.getAndIncrement()));
	}

	//......
}
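  • BlobServer implements the BlobWriter interface. The two putPermanent overloads delegate to putBuffer and putInputStream respectively, while getMinOffloadingSize reads the BlobServerOptions.OFFLOAD_MINSIZE setting from blobServiceConfiguration; its default is 1 MB.
  • putBuffer takes a byte[]: it first writes the bytes to a temporary file and then calls moveTempFileToStore to persist it. putInputStream takes an InputStream: it likewise copies the stream into a temporary file and then calls moveTempFileToStore to persist it. Both update a MessageDigest while writing, and the resulting digest is used to create the BlobKey.
  • moveTempFileToStore creates a BlobKey from the digest and, while holding the write lock, calls BlobUtils.moveTempFileToStore to move the local temporary file to its permanent location; if the target file already exists it retries with a new key, up to 10 attempts. storageDir is initialized by BlobUtils.initLocalStorageDirectory(config), and storageFile is obtained from BlobUtils.getStorageLocation(storageDir, jobId, blobKey).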
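
The "write to a staging file, hash while writing, then move into place" pattern described above can be sketched with the JDK alone. This is a simplified illustration under stated assumptions, not the BlobServer code: StagedPutSketch and put are hypothetical names, SHA-1 is an assumed digest algorithm, the file name is derived from the content hash only (the quoted source additionally adds a unique component to the key and retries), and no locking or HA store upload is shown.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

/** Hypothetical sketch of "stage to a temp file, hash, then move"; not Flink code. */
public class StagedPutSketch {

    static Path put(Path storageDir, InputStream in) {
        Path incoming = null;
        try {
            MessageDigest md = MessageDigest.getInstance("SHA-1"); // assumed algorithm
            incoming = Files.createTempFile(storageDir, "temp-", null);
            try (OutputStream out = Files.newOutputStream(incoming)) {
                byte[] buf = new byte[4096];
                int bytesRead;
                while ((bytesRead = in.read(buf)) != -1) {
                    out.write(buf, 0, bytesRead);  // stage the data on disk
                    md.update(buf, 0, bytesRead);  // hash the same bytes
                }
            }
            Path target = storageDir.resolve("blob_" + toHex(md.digest()));
            if (!Files.exists(target)) {
                // Only move the staging file if the target does not exist yet.
                Files.move(incoming, target);
            }
            return target;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        } finally {
            // Remove the staging file if it was not moved (failure or duplicate upload).
            if (incoming != null) {
                try {
                    Files.deleteIfExists(incoming);
                } catch (IOException ignored) {
                    // best-effort cleanup only
                }
            }
        }
    }

    private static String toHex(byte[] bytes) {
        StringBuilder sb = new StringBuilder();
        for (byte b : bytes) {
            sb.append(String.format("%02x", b));
        }
        return sb.toString();
    }
}
```

Because this sketch names the file by content hash alone, uploading the same bytes twice resolves to the same file and the second staging file is simply discarded.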

BlobUtils

flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java

/**
 * Utility class to work with blob data.
 */
public class BlobUtils {
	//......

	/**
	 * Creates a local storage directory for a blob service under the configuration parameter given
	 * by {@link BlobServerOptions#STORAGE_DIRECTORY}. If this is <tt>null</tt> or empty, we will
	 * fall back to Flink's temp directories (given by
	 * {@link org.apache.flink.configuration.CoreOptions#TMP_DIRS}) and choose one among them at
	 * random.
	 *
	 * @param config
	 * 		Flink configuration
	 *
	 * @return a new local storage directory
	 *
	 * @throws IOException
	 * 		thrown if the local file storage cannot be created or is not usable
	 */
	static File initLocalStorageDirectory(Configuration config) throws IOException {

		String basePath = config.getString(BlobServerOptions.STORAGE_DIRECTORY);

		File baseDir;
		if (StringUtils.isNullOrWhitespaceOnly(basePath)) {
			final String[] tmpDirPaths = ConfigurationUtils.parseTempDirectories(config);
			baseDir = new File(tmpDirPaths[RANDOM.nextInt(tmpDirPaths.length)]);
		}
		else {
			baseDir = new File(basePath);
		}

		File storageDir;

		// NOTE: although we will be using UUIDs, there may be collisions
		int maxAttempts = 10;
		for (int attempt = 0; attempt < maxAttempts; attempt++) {
			storageDir = new File(baseDir, String.format(
					"blobStore-%s", UUID.randomUUID().toString()));

			// Create the storage dir if it doesn't exist. Only return it when the operation was
			// successful.
			if (storageDir.mkdirs()) {
				return storageDir;
			}
		}

		// max attempts exceeded to find a storage directory
		throw new IOException("Could not create storage directory for BLOB store in '" + baseDir + "'.");
	}

	/**
	 * Returns the (designated) physical storage location of the BLOB with the given key.
	 *
	 * @param storageDir
	 * 		storage directory used be the BLOB service
	 * @param key
	 * 		the key identifying the BLOB
	 * @param jobId
	 * 		ID of the job for the incoming files (or <tt>null</tt> if job-unrelated)
	 *
	 * @return the (designated) physical storage location of the BLOB
	 *
	 * @throws IOException
	 * 		if creating the directory fails
	 */
	static File getStorageLocation(
			File storageDir, @Nullable JobID jobId, BlobKey key) throws IOException {
		File file = new File(getStorageLocationPath(storageDir.getAbsolutePath(), jobId, key));

		Files.createDirectories(file.getParentFile().toPath());

		return file;
	}

	/**
	 * Returns the path for the given blob key.
	 *
	 * <p>The returned path can be used with the (local or HA) BLOB store file system back-end for
	 * recovery purposes and follows the same scheme as {@link #getStorageLocation(File, JobID,
	 * BlobKey)}.
	 *
	 * @param storageDir
	 * 		storage directory used be the BLOB service
	 * @param key
	 * 		the key identifying the BLOB
	 * @param jobId
	 * 		ID of the job for the incoming files
	 *
	 * @return the path to the given BLOB
	 */
	static String getStorageLocationPath(
			String storageDir, @Nullable JobID jobId, BlobKey key) {
		if (jobId == null) {
			// format: $base/no_job/blob_$key
			return String.format("%s/%s/%s%s",
				storageDir, NO_JOB_DIR_PREFIX, BLOB_FILE_PREFIX, key.toString());
		} else {
			// format: $base/job_$jobId/blob_$key
			return String.format("%s/%s%s/%s%s",
				storageDir, JOB_DIR_PREFIX, jobId.toString(), BLOB_FILE_PREFIX, key.toString());
		}
	}

	/**
	 * Moves the temporary <tt>incomingFile</tt> to its permanent location where it is available for
	 * use (not thread-safe!).
	 *
	 * @param incomingFile
	 * 		temporary file created during transfer
	 * @param jobId
	 * 		ID of the job this blob belongs to or <tt>null</tt> if job-unrelated
	 * @param blobKey
	 * 		BLOB key identifying the file
	 * @param storageFile
	 *      (local) file where the blob is/should be stored
	 * @param log
	 *      logger for debug information
	 * @param blobStore
	 *      HA store (or <tt>null</tt> if unavailable)
	 *
	 * @throws IOException
	 * 		thrown if an I/O error occurs while moving the file or uploading it to the HA store
	 */
	static void moveTempFileToStore(
			File incomingFile, @Nullable JobID jobId, BlobKey blobKey, File storageFile,
			Logger log, @Nullable BlobStore blobStore) throws IOException {

		try {
			// first check whether the file already exists
			if (!storageFile.exists()) {
				try {
					// only move the file if it does not yet exist
					Files.move(incomingFile.toPath(), storageFile.toPath());

					incomingFile = null;

				} catch (FileAlreadyExistsException ignored) {
					log.warn("Detected concurrent file modifications. This should only happen if multiple" +
						"BlobServer use the same storage directory.");
					// we cannot be sure at this point whether the file has already been uploaded to the blob
					// store or not. Even if the blobStore might shortly be in an inconsistent state, we have
					// to persist the blob. Otherwise we might not be able to recover the job.
				}

				if (blobStore != null) {
					// only the one moving the incoming file to its final destination is allowed to upload the
					// file to the blob store
					blobStore.put(storageFile, jobId, blobKey);
				}
			} else {
				log.warn("File upload for an existing file with key {} for job {}. This may indicate a duplicate upload or a hash collision. Ignoring newest upload.", blobKey, jobId);
			}
			storageFile = null;
		} finally {
			// we failed to either create the local storage file or to upload it --> try to delete the local file
			// while still having the write lock
			if (storageFile != null && !storageFile.delete() && storageFile.exists()) {
				log.warn("Could not delete the storage file {}.", storageFile);
			}
			if (incomingFile != null && !incomingFile.delete() && incomingFile.exists()) {
				log.warn("Could not delete the staging file {} for blob key {} and job {}.", incomingFile, blobKey, jobId);
			}
		}
	}

	//......
}
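  • initLocalStorageDirectory reads the BlobServerOptions.STORAGE_DIRECTORY setting (blob.storage.directory) from the configuration. If it is not set, it obtains tmpDirPaths via ConfigurationUtils.parseTempDirectories and picks one at random as baseDir. storageDir is then a subdirectory of baseDir whose name has the form blobStore-<UUID>; creation is attempted up to 10 times before an IOException is thrown.
  • getStorageLocation builds the concrete storage path under storageDir from the JobID and BlobKey, in the format $base/no_job/blob_$key or $base/job_$jobId/blob_$key, and creates the parent directory if necessary.
  • moveTempFileToStore uses Files.move to move incomingFile to storageFile when the target file does not yet exist; if blobStore is not null, it also puts storageFile into the BlobStore. If the target already exists, it logs a warning and ignores the new upload.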
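
The path layout can be reproduced in a few lines. This is a minimal sketch, not the BlobUtils code: StoragePathSketch and storagePath are hypothetical names, the job ID and BLOB key are plain strings here, and the literal segments no_job, job_, and blob_ are taken from the format comments in the quoted source.

```java
/** Hypothetical sketch of the storage path layout; not Flink code. */
public class StoragePathSketch {

    static String storagePath(String storageDir, String jobId, String blobKey) {
        if (jobId == null) {
            // format: $base/no_job/blob_$key
            return String.format("%s/no_job/blob_%s", storageDir, blobKey);
        }
        // format: $base/job_$jobId/blob_$key
        return String.format("%s/job_%s/blob_%s", storageDir, jobId, blobKey);
    }
}
```

For example, storagePath("/tmp/blobStore-x", null, "abc") yields /tmp/blobStore-x/no_job/blob_abc, while passing a job ID of "j1" yields /tmp/blobStore-x/job_j1/blob_abc.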

Summary

  • BlobWriter defines the putPermanent and getMinOffloadingSize methods, and provides the static serializeAndTryOffload method, which serializes the given value and, if its serialized size is not smaller than the minimum offloading size, calls blobWriter.putPermanent to store it in the BlobServer.
  • BlobServer implements the BlobWriter interface. Its putPermanent overloads delegate to putBuffer and putInputStream, and getMinOffloadingSize reads BlobServerOptions.OFFLOAD_MINSIZE from blobServiceConfiguration (default 1 MB). putBuffer and putInputStream both write their input to a temporary file first and then call moveTempFileToStore to persist it. moveTempFileToStore in turn calls BlobUtils.moveTempFileToStore to move the local temporary file to its permanent location; storageDir is initialized by BlobUtils.initLocalStorageDirectory(config), and storageFile is obtained from BlobUtils.getStorageLocation(storageDir, jobId, blobKey).
  • In BlobUtils, initLocalStorageDirectory reads BlobServerOptions.STORAGE_DIRECTORY (blob.storage.directory) and, if it is not set, picks a random entry from ConfigurationUtils.parseTempDirectories as baseDir; storageDir is a subdirectory of baseDir whose name starts with blobStore. getStorageLocation builds the storage path from the JobID and BlobKey in the format $base/no_job/blob_$key or $base/job_$jobId/blob_$key. moveTempFileToStore uses Files.move to move incomingFile to storageFile when the target does not yet exist and, if blobStore is not null, also puts storageFile into the BlobStore.

