A look at Flink's BlobService

This article takes a look at Flink's BlobService.

BlobService

flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobService.java

/**
 * A simple store and retrieve binary large objects (BLOBs).
 */
public interface BlobService extends Closeable {

	/**
	 * Returns a BLOB service for accessing permanent BLOBs.
	 *
	 * @return BLOB service
	 */
	PermanentBlobService getPermanentBlobService();

	/**
	 * Returns a BLOB service for accessing transient BLOBs.
	 *
	 * @return BLOB service
	 */
	TransientBlobService getTransientBlobService();

	/**
	 * Returns the port of the BLOB server that this BLOB service is working with.
	 *
	 * @return the port the blob server.
	 */
	int getPort();
}
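  • BlobService defines getPermanentBlobService, which returns the PermanentBlobService, and getTransientBlobService, which returns the TransientBlobService; getPort returns the port of the BLOB server that the service works with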

PermanentBlobService

flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/blob/PermanentBlobService.java

/**
 * A service to retrieve permanent binary large objects (BLOBs).
 *
 * <p>These may include per-job BLOBs that are covered by high-availability (HA) mode, e.g. a job's
 * JAR files or (parts of) an off-loaded {@link org.apache.flink.runtime.deployment.TaskDeploymentDescriptor}
 * or files in the {@link org.apache.flink.api.common.cache.DistributedCache}.
 */
public interface PermanentBlobService extends Closeable {

	/**
	 * Returns the path to a local copy of the file associated with the provided job ID and blob
	 * key.
	 *
	 * @param jobId
	 * 		ID of the job this blob belongs to
	 * @param key
	 * 		BLOB key associated with the requested file
	 *
	 * @return The path to the file.
	 *
	 * @throws java.io.FileNotFoundException
	 * 		if the BLOB does not exist;
	 * @throws IOException
	 * 		if any other error occurs when retrieving the file
	 */
	File getFile(JobID jobId, PermanentBlobKey key) throws IOException;

}
  • PermanentBlobService provides a getFile method, which returns the local File for a given JobID and PermanentBlobKey

TransientBlobService

flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/blob/TransientBlobService.java

/**
 * A service to retrieve transient binary large objects (BLOBs) which are deleted on the
 * {@link BlobServer} when they are retrieved.
 *
 * <p>These may include per-job BLOBs like files in the {@link
 * org.apache.flink.api.common.cache.DistributedCache}, for example.
 *
 * <p>Note: None of these BLOBs is highly available (HA). This case is covered by BLOBs in the
 * {@link PermanentBlobService}.
 *
 * <p>TODO: change API to not rely on local files but return {@link InputStream} objects
 */
public interface TransientBlobService extends Closeable {

	// --------------------------------------------------------------------------------------------
	//  GET
	// --------------------------------------------------------------------------------------------

	/**
	 * Returns the path to a local copy of the (job-unrelated) file associated with the provided
	 * blob key.
	 *
	 * @param key
	 * 		blob key associated with the requested file
	 *
	 * @return The path to the file.
	 *
	 * @throws java.io.FileNotFoundException
	 * 		when the path does not exist;
	 * @throws IOException
	 * 		if any other error occurs when retrieving the file
	 */
	File getFile(TransientBlobKey key) throws IOException;

	/**
	 * Returns the path to a local copy of the file associated with the provided job ID and blob
	 * key.
	 *
	 * @param jobId
	 * 		ID of the job this blob belongs to
	 * @param key
	 * 		blob key associated with the requested file
	 *
	 * @return The path to the file.
	 *
	 * @throws java.io.FileNotFoundException
	 * 		when the path does not exist;
	 * @throws IOException
	 * 		if any other error occurs when retrieving the file
	 */
	File getFile(JobID jobId, TransientBlobKey key) throws IOException;

	// --------------------------------------------------------------------------------------------
	//  PUT
	// --------------------------------------------------------------------------------------------

	/**
	 * Uploads the (job-unrelated) data of the given byte array to the BLOB server.
	 *
	 * @param value
	 * 		the buffer to upload
	 *
	 * @return the computed BLOB key identifying the BLOB on the server
	 *
	 * @throws IOException
	 * 		thrown if an I/O error occurs while uploading the data to the BLOB server
	 */
	TransientBlobKey putTransient(byte[] value) throws IOException;

	/**
	 * Uploads the data of the given byte array for the given job to the BLOB server.
	 *
	 * @param jobId
	 * 		the ID of the job the BLOB belongs to
	 * @param value
	 * 		the buffer to upload
	 *
	 * @return the computed BLOB key identifying the BLOB on the server
	 *
	 * @throws IOException
	 * 		thrown if an I/O error occurs while uploading the data to the BLOB server
	 */
	TransientBlobKey putTransient(JobID jobId, byte[] value) throws IOException;

	/**
	 * Uploads the (job-unrelated) data from the given input stream to the BLOB server.
	 *
	 * @param inputStream
	 * 		the input stream to read the data from
	 *
	 * @return the computed BLOB key identifying the BLOB on the server
	 *
	 * @throws IOException
	 * 		thrown if an I/O error occurs while reading the data from the input stream or uploading the
	 * 		data to the BLOB server
	 */
	TransientBlobKey putTransient(InputStream inputStream) throws IOException;

	/**
	 * Uploads the data from the given input stream for the given job to the BLOB server.
	 *
	 * @param jobId
	 * 		ID of the job this blob belongs to
	 * @param inputStream
	 * 		the input stream to read the data from
	 *
	 * @return the computed BLOB key identifying the BLOB on the server
	 *
	 * @throws IOException
	 * 		thrown if an I/O error occurs while reading the data from the input stream or uploading the
	 * 		data to the BLOB server
	 */
	TransientBlobKey putTransient(JobID jobId, InputStream inputStream) throws IOException;

	// --------------------------------------------------------------------------------------------
	//  DELETE
	// --------------------------------------------------------------------------------------------

	/**
	 * Deletes the (job-unrelated) file associated with the provided blob key from the local cache.
	 *
	 * @param key
	 * 		associated with the file to be deleted
	 *
	 * @return  <tt>true</tt> if the given blob is successfully deleted or non-existing;
	 *          <tt>false</tt> otherwise
	 */
	boolean deleteFromCache(TransientBlobKey key);

	/**
	 * Deletes the file associated with the provided job ID and blob key from the local cache.
	 *
	 * @param jobId
	 * 		ID of the job this blob belongs to
	 * @param key
	 * 		associated with the file to be deleted
	 *
	 * @return  <tt>true</tt> if the given blob is successfully deleted or non-existing;
	 *          <tt>false</tt> otherwise
	 */
	boolean deleteFromCache(JobID jobId, TransientBlobKey key);

}
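  • TransientBlobService retrieves transient binary large objects (BLOBs), which are deleted on the BlobServer as soon as they are retrieved; it provides the getFile, putTransient, and deleteFromCache methods, each available with or without a JobID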
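The contract described in the Javadoc above can be pictured with a toy in-memory model. This is only a sketch and not Flink's implementation: ToyTransientBlobStore, its String keys, the two maps, and isOnServer are hypothetical names, and serving repeated reads from a local copy is an assumption about how a cache would behave.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

/** Toy in-memory model of the transient-BLOB contract; hypothetical, not the Flink class. */
final class ToyTransientBlobStore {
    // Stands in for the BlobServer (assumption: one server, no jobs).
    private final Map<String, byte[]> server = new HashMap<>();
    // Stands in for the local files a real service would hand out.
    private final Map<String, byte[]> localCache = new HashMap<>();

    /** Mirrors putTransient(byte[]): upload the data and get a key back. */
    String putTransient(byte[] value) {
        String key = "t-" + UUID.randomUUID(); // hypothetical key format
        server.put(key, value.clone());
        return key;
    }

    /** Mirrors getFile(key): the first retrieval removes the BLOB from the server. */
    byte[] getFile(String key) {
        byte[] cached = localCache.get(key);
        if (cached != null) {
            return cached.clone();
        }
        byte[] fetched = server.remove(key); // deleted on the server when retrieved
        if (fetched == null) {
            throw new IllegalStateException("BLOB not found: " + key);
        }
        localCache.put(key, fetched);
        return fetched.clone();
    }

    /** Mirrors deleteFromCache(key): true if deleted or non-existing. */
    boolean deleteFromCache(String key) {
        localCache.remove(key);
        return true;
    }

    /** Helper for inspection only; has no counterpart in the interface. */
    boolean isOnServer(String key) {
        return server.containsKey(key);
    }

    public static void main(String[] args) {
        ToyTransientBlobStore store = new ToyTransientBlobStore();
        String key = store.putTransient(new byte[] {1, 2, 3});
        store.getFile(key);
        System.out.println("still on server: " + store.isOnServer(key));
    }
}
```

The real interface returns a File and signals a missing BLOB with java.io.FileNotFoundException; the model uses byte arrays and an unchecked exception only to stay short.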

BlobKey

flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobKey.java

/**
 * A BLOB key uniquely identifies a BLOB.
 */
public abstract class BlobKey implements Serializable, Comparable<BlobKey> {

	private static final long serialVersionUID = 3847117712521785209L;

	/** Size of the internal BLOB key in bytes. */
	public static final int SIZE = 20;

	/** The byte buffer storing the actual key data. */
	private final byte[] key;

	/**
	 * (Internal) BLOB type - to be reflected by the inheriting sub-class.
	 */
	private final BlobType type;

	/**
	 * BLOB type, i.e. permanent or transient.
	 */
	enum BlobType {
		/**
		 * Indicates a permanent BLOB whose lifecycle is that of a job and which is made highly
		 * available.
		 */
		PERMANENT_BLOB,
		/**
		 * Indicates a transient BLOB whose lifecycle is managed by the user and which is not made
		 * highly available.
		 */
		TRANSIENT_BLOB
	}

	/**
	 * Random component of the key.
	 */
	private final AbstractID random;

	/**
	 * Constructs a new BLOB key.
	 *
	 * @param type
	 * 		whether the referenced BLOB is permanent or transient
	 */
	protected BlobKey(BlobType type) {
		this.type = checkNotNull(type);
		this.key = new byte[SIZE];
		this.random = new AbstractID();
	}

	/**
	 * Constructs a new BLOB key from the given byte array.
	 *
	 * @param type
	 * 		whether the referenced BLOB is permanent or transient
	 * @param key
	 *        the actual key data
	 */
	protected BlobKey(BlobType type, byte[] key) {
		if (key == null || key.length != SIZE) {
			throw new IllegalArgumentException("BLOB key must have a size of " + SIZE + " bytes");
		}

		this.type = checkNotNull(type);
		this.key = key;
		this.random = new AbstractID();
	}

	/**
	 * Constructs a new BLOB key from the given byte array.
	 *
	 * @param type
	 * 		whether the referenced BLOB is permanent or transient
	 * @param key
	 *        the actual key data
	 * @param random
	 *        the random component of the key
	 */
	protected BlobKey(BlobType type, byte[] key, byte[] random) {
		if (key == null || key.length != SIZE) {
			throw new IllegalArgumentException("BLOB key must have a size of " + SIZE + " bytes");
		}

		this.type = checkNotNull(type);
		this.key = key;
		this.random = new AbstractID(random);
	}

	/**
	 * Returns the right {@link BlobKey} subclass for the given parameters.
	 *
	 * @param type
	 * 		whether the referenced BLOB is permanent or transient
	 *
	 * @return BlobKey subclass
	 */
	@VisibleForTesting
	static BlobKey createKey(BlobType type) {
		if (type == PERMANENT_BLOB) {
			return new PermanentBlobKey();
		} else {
			return new TransientBlobKey();
		}
	}

	/**
	 * Returns the right {@link BlobKey} subclass for the given parameters.
	 *
	 * @param type
	 * 		whether the referenced BLOB is permanent or transient
	 * @param key
	 *        the actual key data
	 *
	 * @return BlobKey subclass
	 */
	static BlobKey createKey(BlobType type, byte[] key) {
		if (type == PERMANENT_BLOB) {
			return new PermanentBlobKey(key);
		} else {
			return new TransientBlobKey(key);
		}
	}

	/**
	 * Returns the right {@link BlobKey} subclass for the given parameters.
	 *
	 * @param type
	 * 		whether the referenced BLOB is permanent or transient
	 * @param key
	 *        the actual key data
	 * @param random
	 *        the random component of the key
	 *
	 * @return BlobKey subclass
	 */
	static BlobKey createKey(BlobType type, byte[] key, byte[] random) {
		if (type == PERMANENT_BLOB) {
			return new PermanentBlobKey(key, random);
		} else {
			return new TransientBlobKey(key, random);
		}
	}

	/**
	 * Returns the hash component of this key.
	 *
	 * @return a 20 bit hash of the contents the key refers to
	 */
	@VisibleForTesting
	public byte[] getHash() {
		return key;
	}

	/**
	 * Returns the (internal) BLOB type which is reflected by the inheriting sub-class.
	 *
	 * @return BLOB type, i.e. permanent or transient
	 */
	BlobType getType() {
		return type;
	}

	/**
	 * Adds the BLOB key to the given {@link MessageDigest}.
	 *
	 * @param md
	 *        the message digest to add the BLOB key to
	 */
	public void addToMessageDigest(MessageDigest md) {
		md.update(this.key);
	}

	@Override
	public boolean equals(final Object obj) {

		if (!(obj instanceof BlobKey)) {
			return false;
		}

		final BlobKey bk = (BlobKey) obj;

		return Arrays.equals(this.key, bk.key) &&
			this.type == bk.type &&
			this.random.equals(bk.random);
	}

	@Override
	public int hashCode() {
		int result = Arrays.hashCode(this.key);
		result = 37 * result + this.type.hashCode();
		result = 37 * result + this.random.hashCode();
		return result;
	}

	@Override
	public String toString() {
		final String typeString;
		switch (this.type) {
			case TRANSIENT_BLOB:
				typeString = "t-";
				break;
			case PERMANENT_BLOB:
				typeString = "p-";
				break;
			default:
				// this actually never happens!
				throw new IllegalStateException("Invalid BLOB type");
		}
		return typeString + StringUtils.byteToHexString(this.key) + "-" + random.toString();
	}

	@Override
	public int compareTo(BlobKey o) {
		// compare the hashes first
		final byte[] aarr = this.key;
		final byte[] barr = o.key;
		final int len = Math.min(aarr.length, barr.length);

		for (int i = 0; i < len; ++i) {
			final int a = (aarr[i] & 0xff);
			final int b = (barr[i] & 0xff);
			if (a != b) {
				return a - b;
			}
		}

		if (aarr.length == barr.length) {
			// same hash contents - compare the BLOB types
			int typeCompare = this.type.compareTo(o.type);
			if (typeCompare == 0) {
				// same type - compare random components
				return this.random.compareTo(o.random);
			} else {
				return typeCompare;
			}
		} else {
			return aarr.length - barr.length;
		}
	}

	// --------------------------------------------------------------------------------------------

	/**
	 * Auxiliary method to read a BLOB key from an input stream.
	 *
	 * @param inputStream
	 *        the input stream to read the BLOB key from
	 * @return the read BLOB key
	 * @throws IOException
	 *         throw if an I/O error occurs while reading from the input stream
	 */
	static BlobKey readFromInputStream(InputStream inputStream) throws IOException {

		final byte[] key = new byte[BlobKey.SIZE];
		final byte[] random = new byte[AbstractID.SIZE];

		int bytesRead = 0;
		// read key
		while (bytesRead < key.length) {
			final int read = inputStream.read(key, bytesRead, key.length - bytesRead);
			if (read < 0) {
				throw new EOFException("Read an incomplete BLOB key");
			}
			bytesRead += read;
		}

		// read BLOB type
		final BlobType blobType;
		{
			final int read = inputStream.read();
			if (read < 0) {
				throw new EOFException("Read an incomplete BLOB type");
			} else if (read == TRANSIENT_BLOB.ordinal()) {
				blobType = TRANSIENT_BLOB;
			} else if (read == PERMANENT_BLOB.ordinal()) {
				blobType = PERMANENT_BLOB;
			} else {
				throw new IOException("Invalid data received for the BLOB type: " + read);
			}
		}

		// read random component
		bytesRead = 0;
		while (bytesRead < AbstractID.SIZE) {
			final int read = inputStream.read(random, bytesRead, AbstractID.SIZE - bytesRead);
			if (read < 0) {
				throw new EOFException("Read an incomplete BLOB key");
			}
			bytesRead += read;
		}

		return createKey(blobType, key, random);
	}

	/**
	 * Auxiliary method to write this BLOB key to an output stream.
	 *
	 * @param outputStream
	 *        the output stream to write the BLOB key to
	 * @throws IOException
	 *         thrown if an I/O error occurs while writing the BLOB key
	 */
	void writeToOutputStream(final OutputStream outputStream) throws IOException {
		outputStream.write(this.key);
		outputStream.write(this.type.ordinal());
		outputStream.write(this.random.getBytes());
	}
}
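  • BlobKey is an abstract class with three fields: key (a byte array of SIZE = 20 bytes), type (a BlobType, either PERMANENT_BLOB or TRANSIENT_BLOB), and random (an AbstractID); its static createKey methods build the BlobKey subclass that matches the given BlobType; readFromInputStream deserializes a BlobKey from an InputStream; writeToOutputStream serializes a BlobKey to an OutputStream; it has two subclasses, PermanentBlobKey and TransientBlobKey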
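The layout produced by writeToOutputStream and consumed by readFromInputStream is 20 bytes of key, 1 byte of type ordinal, and 16 bytes of random component, 37 bytes in total. The following is a minimal sketch of that round trip using plain arrays instead of the Flink classes; BlobKeyWireSketch, Parsed, and parse are hypothetical names introduced for illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.util.Arrays;

/** Hypothetical stand-in for the BlobKey wire format; not the Flink class. */
final class BlobKeyWireSketch {
    static final int HASH_SIZE = 20;   // BlobKey.SIZE
    static final int RANDOM_SIZE = 16; // AbstractID.SIZE
    static final int PERMANENT = 0;    // BlobType.PERMANENT_BLOB.ordinal()
    static final int TRANSIENT = 1;    // BlobType.TRANSIENT_BLOB.ordinal()

    /** Decoded fields of one key. */
    static final class Parsed {
        final byte[] hash;
        final int type;
        final byte[] random;

        Parsed(byte[] hash, int type, byte[] random) {
            this.hash = hash;
            this.type = type;
            this.random = random;
        }
    }

    /** Same field order as writeToOutputStream: key, type ordinal, random. */
    static byte[] write(byte[] hash, int type, byte[] random) {
        if (hash.length != HASH_SIZE || random.length != RANDOM_SIZE) {
            throw new IllegalArgumentException("unexpected field size");
        }
        ByteArrayOutputStream out = new ByteArrayOutputStream(HASH_SIZE + 1 + RANDOM_SIZE);
        out.write(hash, 0, hash.length);
        out.write(type);
        out.write(random, 0, random.length);
        return out.toByteArray();
    }

    /** Loops until the buffer is full, because read() may return fewer bytes than requested. */
    private static void readFully(InputStream in, byte[] target) throws IOException {
        int bytesRead = 0;
        while (bytesRead < target.length) {
            int read = in.read(target, bytesRead, target.length - bytesRead);
            if (read < 0) {
                throw new EOFException("Read an incomplete BLOB key");
            }
            bytesRead += read;
        }
    }

    /** Same steps as readFromInputStream: key, then type byte, then random. */
    static Parsed read(InputStream in) throws IOException {
        byte[] hash = new byte[HASH_SIZE];
        readFully(in, hash);
        int type = in.read();
        if (type < 0) {
            throw new EOFException("Read an incomplete BLOB type");
        } else if (type != PERMANENT && type != TRANSIENT) {
            throw new IOException("Invalid data received for the BLOB type: " + type);
        }
        byte[] random = new byte[RANDOM_SIZE];
        readFully(in, random);
        return new Parsed(hash, type, random);
    }

    /** Convenience wrapper that rethrows I/O failures unchecked. */
    static Parsed parse(byte[] wire) {
        try {
            return read(new ByteArrayInputStream(wire));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] hash = new byte[HASH_SIZE];
        Arrays.fill(hash, (byte) 0xab);
        byte[] wire = write(hash, TRANSIENT, new byte[RANDOM_SIZE]);
        Parsed p = parse(wire);
        System.out.println(wire.length + " bytes, type=" + p.type); // prints "37 bytes, type=1"
    }
}
```

Because the type travels as an enum ordinal, the format depends on the declaration order of BlobType: PERMANENT_BLOB is 0 and TRANSIENT_BLOB is 1.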

PermanentBlobKey

flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/blob/PermanentBlobKey.java

/**
 * BLOB key referencing permanent BLOB files.
 */
public final class PermanentBlobKey extends BlobKey {

	/**
	 * Constructs a new BLOB key.
	 */
	@VisibleForTesting
	public PermanentBlobKey() {
		super(BlobType.PERMANENT_BLOB);
	}

	/**
	 * Constructs a new BLOB key from the given byte array.
	 *
	 * @param key
	 *        the actual key data
	 */
	PermanentBlobKey(byte[] key) {
		super(BlobType.PERMANENT_BLOB, key);
	}

	/**
	 * Constructs a new BLOB key from the given byte array.
	 *
	 * @param key
	 *        the actual key data
	 * @param random
	 *        the random component of the key
	 */
	PermanentBlobKey(byte[] key, byte[] random) {
		super(BlobType.PERMANENT_BLOB, key, random);
	}
}
  • PermanentBlobKey extends BlobKey; its BlobType is BlobType.PERMANENT_BLOB

TransientBlobKey

flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/blob/TransientBlobKey.java

/**
 * BLOB key referencing transient BLOB files.
 */
public final class TransientBlobKey extends BlobKey {

	/**
	 * Constructs a new BLOB key.
	 */
	@VisibleForTesting
	public TransientBlobKey() {
		super(BlobType.TRANSIENT_BLOB);
	}

	/**
	 * Constructs a new BLOB key from the given byte array.
	 *
	 * @param key
	 *        the actual key data
	 */
	TransientBlobKey(byte[] key) {
		super(BlobType.TRANSIENT_BLOB, key);
	}

	/**
	 * Constructs a new BLOB key from the given byte array.
	 *
	 * @param key
	 *        the actual key data
	 * @param random
	 *        the random component of the key
	 */
	TransientBlobKey(byte[] key, byte[] random) {
		super(BlobType.TRANSIENT_BLOB, key, random);
	}
}
  • TransientBlobKey extends BlobKey; its BlobType is BlobType.TRANSIENT_BLOB
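With both subclasses in view, it may help to see how BlobKey.compareTo orders keys of different types. It compares the key bytes as unsigned values first, then the BlobType, and only then the random component. The sketch below reproduces the first two steps on plain arrays and leaves the random component out; BlobKeyOrderSketch and its method names are hypothetical.

```java
/** Hypothetical sketch of the ordering in BlobKey.compareTo; not the Flink class. */
final class BlobKeyOrderSketch {

    /** Byte-by-byte comparison with each byte widened to 0..255, as in compareTo. */
    static int compareHashes(byte[] a, byte[] b) {
        int len = Math.min(a.length, b.length);
        for (int i = 0; i < len; ++i) {
            int x = a[i] & 0xff;
            int y = b[i] & 0xff;
            if (x != y) {
                return x - y;
            }
        }
        return a.length - b.length;
    }

    /** Key bytes decide first; the type ordinal only breaks ties between equal keys. */
    static int compare(byte[] hashA, int typeOrdinalA, byte[] hashB, int typeOrdinalB) {
        int byHash = compareHashes(hashA, hashB);
        return byHash != 0 ? byHash : Integer.compare(typeOrdinalA, typeOrdinalB);
    }

    public static void main(String[] args) {
        byte[] high = {(byte) 0x80};
        byte[] low = {0x7f};
        // As signed bytes 0x80 is -128, but the masked comparison treats it as 128.
        System.out.println(compareHashes(high, low) > 0); // prints "true"
        // Equal key bytes: permanent (ordinal 0) sorts before transient (ordinal 1).
        System.out.println(compare(low, 0, low, 1) < 0);  // prints "true"
    }
}
```

The mask with 0xff matters because Java bytes are signed; without it a key starting with 0x80 would sort before one starting with 0x7f.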

AbstractID

flink-release-1.7.2/flink-core/src/main/java/org/apache/flink/util/AbstractID.java

/**
 * A statistically unique identification number.
 */
@PublicEvolving
public class AbstractID implements Comparable<AbstractID>, java.io.Serializable {

	private static final long serialVersionUID = 1L;

	private static final Random RND = new Random();

	/** The size of a long in bytes. */
	private static final int SIZE_OF_LONG = 8;

	/** The size of the ID in byte. */
	public static final int SIZE = 2 * SIZE_OF_LONG;

	// ------------------------------------------------------------------------

	/** The upper part of the actual ID. */
	protected final long upperPart;

	/** The lower part of the actual ID. */
	protected final long lowerPart;

	/** The memoized value returned by toString(). */
	private transient String toString;

	// --------------------------------------------------------------------------------------------

	/**
	 * Constructs a new ID with a specific bytes value.
	 */
	public AbstractID(byte[] bytes) {
		if (bytes == null || bytes.length != SIZE) {
			throw new IllegalArgumentException("Argument bytes must by an array of " + SIZE + " bytes");
		}

		this.lowerPart = byteArrayToLong(bytes, 0);
		this.upperPart = byteArrayToLong(bytes, SIZE_OF_LONG);
	}

	/**
	 * Constructs a new abstract ID.
	 *
	 * @param lowerPart the lower bytes of the ID
	 * @param upperPart the higher bytes of the ID
	 */
	public AbstractID(long lowerPart, long upperPart) {
		this.lowerPart = lowerPart;
		this.upperPart = upperPart;
	}

	/**
	 * Copy constructor: Creates a new abstract ID from the given one.
	 *
	 * @param id the abstract ID to copy
	 */
	public AbstractID(AbstractID id) {
		if (id == null) {
			throw new IllegalArgumentException("Id must not be null.");
		}
		this.lowerPart = id.lowerPart;
		this.upperPart = id.upperPart;
	}

	/**
	 * Constructs a new random ID from a uniform distribution.
	 */
	public AbstractID() {
		this.lowerPart = RND.nextLong();
		this.upperPart = RND.nextLong();
	}

	// --------------------------------------------------------------------------------------------

	/**
	 * Gets the lower 64 bits of the ID.
	 *
	 * @return The lower 64 bits of the ID.
	 */
	public long getLowerPart() {
		return lowerPart;
	}

	/**
	 * Gets the upper 64 bits of the ID.
	 *
	 * @return The upper 64 bits of the ID.
	 */
	public long getUpperPart() {
		return upperPart;
	}

	/**
	 * Gets the bytes underlying this ID.
	 *
	 * @return The bytes underlying this ID.
	 */
	public byte[] getBytes() {
		byte[] bytes = new byte[SIZE];
		longToByteArray(lowerPart, bytes, 0);
		longToByteArray(upperPart, bytes, SIZE_OF_LONG);
		return bytes;
	}

	// --------------------------------------------------------------------------------------------
	//  Standard Utilities
	// --------------------------------------------------------------------------------------------

	@Override
	public boolean equals(Object obj) {
		if (obj == this) {
			return true;
		} else if (obj != null && obj.getClass() == getClass()) {
			AbstractID that = (AbstractID) obj;
			return that.lowerPart == this.lowerPart && that.upperPart == this.upperPart;
		} else {
			return false;
		}
	}

	@Override
	public int hashCode() {
		return ((int)  this.lowerPart) ^
				((int) (this.lowerPart >>> 32)) ^
				((int)  this.upperPart) ^
				((int) (this.upperPart >>> 32));
	}

	@Override
	public String toString() {
		if (this.toString == null) {
			final byte[] ba = new byte[SIZE];
			longToByteArray(this.lowerPart, ba, 0);
			longToByteArray(this.upperPart, ba, SIZE_OF_LONG);

			this.toString = StringUtils.byteToHexString(ba);
		}

		return this.toString;
	}

	@Override
	public int compareTo(AbstractID o) {
		int diff1 = Long.compare(this.upperPart, o.upperPart);
		int diff2 = Long.compare(this.lowerPart, o.lowerPart);
		return diff1 == 0 ? diff2 : diff1;
	}

	// --------------------------------------------------------------------------------------------
	//  Conversion Utilities
	// --------------------------------------------------------------------------------------------

	/**
	 * Converts the given byte array to a long.
	 *
	 * @param ba the byte array to be converted
	 * @param offset the offset indicating at which byte inside the array the conversion shall begin
	 * @return the long variable
	 */
	private static long byteArrayToLong(byte[] ba, int offset) {
		long l = 0;

		for (int i = 0; i < SIZE_OF_LONG; ++i) {
			l |= (ba[offset + SIZE_OF_LONG - 1 - i] & 0xffL) << (i << 3);
		}

		return l;
	}

	/**
	 * Converts a long to a byte array.
	 *
	 * @param l the long variable to be converted
	 * @param ba the byte array to store the result the of the conversion
	 * @param offset offset indicating at what position inside the byte array the result of the conversion shall be stored
	 */
	private static void longToByteArray(long l, byte[] ba, int offset) {
		for (int i = 0; i < SIZE_OF_LONG; ++i) {
			final int shift = i << 3; // i * 8
			ba[offset + SIZE_OF_LONG - 1 - i] = (byte) ((l & (0xffL << shift)) >>> shift);
		}
	}
}
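  • AbstractID consists of two long fields, upperPart and lowerPart; the no-argument constructor generates both with Random.nextLong; the byte[] constructor parses lowerPart and then upperPart from the given 16 bytes; the (lowerPart, upperPart) constructor sets both values directly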
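The byte layout used by getBytes and the byte[] constructor can be checked in isolation: each long is written most significant byte first, and lowerPart occupies the first 8 bytes. The sketch below copies the two conversion loops into a standalone class and compares the result with a big-endian ByteBuffer; AbstractIdBytesSketch, toBytes, and toHex are hypothetical names, and toHex is a plain lowercase hex dump rather than Flink's StringUtils.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

/** Hypothetical sketch of AbstractID's byte layout; not the Flink class. */
final class AbstractIdBytesSketch {
    static final int SIZE_OF_LONG = 8;
    static final int SIZE = 2 * SIZE_OF_LONG;

    /** Same loop as longToByteArray: the least significant byte lands at offset + 7. */
    static void longToBytes(long l, byte[] ba, int offset) {
        for (int i = 0; i < SIZE_OF_LONG; ++i) {
            final int shift = i << 3; // i * 8
            ba[offset + SIZE_OF_LONG - 1 - i] = (byte) ((l & (0xffL << shift)) >>> shift);
        }
    }

    /** Same loop as byteArrayToLong: the inverse of longToBytes. */
    static long bytesToLong(byte[] ba, int offset) {
        long l = 0;
        for (int i = 0; i < SIZE_OF_LONG; ++i) {
            l |= (ba[offset + SIZE_OF_LONG - 1 - i] & 0xffL) << (i << 3);
        }
        return l;
    }

    /** Same order as getBytes: lowerPart first, upperPart second. */
    static byte[] toBytes(long lowerPart, long upperPart) {
        byte[] bytes = new byte[SIZE];
        longToBytes(lowerPart, bytes, 0);
        longToBytes(upperPart, bytes, SIZE_OF_LONG);
        return bytes;
    }

    /** Plain lowercase hex dump, two characters per byte. */
    static String toHex(byte[] bytes) {
        StringBuilder sb = new StringBuilder(bytes.length * 2);
        for (byte b : bytes) {
            sb.append(String.format("%02x", b & 0xff));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        byte[] bytes = toBytes(1L, 2L);
        // ByteBuffer is big-endian by default, so it yields the same 16 bytes.
        byte[] viaBuffer = ByteBuffer.allocate(SIZE).putLong(1L).putLong(2L).array();
        System.out.println(toHex(bytes));                    // prints "00000000000000010000000000000002"
        System.out.println(Arrays.equals(bytes, viaBuffer)); // prints "true"
    }
}
```

One detail worth noting from the source above: compareTo looks at upperPart before lowerPart and compares signed longs, so the order of two IDs is not the order of their hex strings.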

Summary

  • BlobService defines getPermanentBlobService, which returns the PermanentBlobService, and getTransientBlobService, which returns the TransientBlobService; PermanentBlobService provides a getFile method, which returns the local File for a given JobID and PermanentBlobKey; TransientBlobService retrieves transient binary large objects (BLOBs), which are deleted on the BlobServer as soon as they are retrieved, and it provides the getFile, putTransient, and deleteFromCache methods
  • BlobKey is an abstract class with three fields: key, type (a BlobType, either PERMANENT_BLOB or TRANSIENT_BLOB), and random (an AbstractID); its static createKey methods build the BlobKey subclass that matches the given BlobType; readFromInputStream deserializes a BlobKey from an InputStream; writeToOutputStream serializes a BlobKey to an OutputStream; it has two subclasses, PermanentBlobKey, whose BlobType is BlobType.PERMANENT_BLOB, and TransientBlobKey, whose BlobType is BlobType.TRANSIENT_BLOB
  • AbstractID consists of two long fields, upperPart and lowerPart; the no-argument constructor generates both with Random.nextLong; the byte[] constructor parses lowerPart and then upperPart from the given bytes; the (lowerPart, upperPart) constructor sets both values directly
