聊聊flink的BlobServer

本文主要研究一下flink的BlobServer

BlobServer

flink-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java

public class BlobServer extends Thread implements BlobService, BlobWriter, PermanentBlobService, TransientBlobService {

	/** The log object used for debugging. */
	private static final Logger LOG = LoggerFactory.getLogger(BlobServer.class);

	/** Counter to generate unique names for temporary files. */
	private final AtomicLong tempFileCounter = new AtomicLong(0);

	/** The server socket listening for incoming connections; it is bound in the constructor. */
	private final ServerSocket serverSocket;

	/** Blob Server configuration. */
	private final Configuration blobServiceConfiguration;

	/**
	 * Indicates whether a shutdown of the server component has been requested. It is flipped
	 * exactly once by {@link #close()} and polled by the accept loop in {@link #run()}.
	 */
	private final AtomicBoolean shutdownRequested = new AtomicBoolean();

	/** Root directory for local file storage; {@link #close()} deletes it again. */
	private final File storageDir;

	/** Blob store for distributed file storage, e.g. in HA. */
	private final BlobStore blobStore;

	/**
	 * Set of currently running connection threads. Every access visible in this class is made
	 * while holding the set's own monitor ({@code synchronized (activeConnections)}).
	 */
	private final Set<BlobServerConnection> activeConnections = new HashSet<>();

	/**
	 * The maximum number of concurrent connections; the accept loop waits while the number of
	 * active connections has reached this limit.
	 */
	private final int maxConnections;

	/**
	 * Lock guarding concurrent file accesses. Its write lock is handed to the cleanup task in
	 * the constructor.
	 */
	private final ReadWriteLock readWriteLock;

	/**
	 * Shutdown hook thread to ensure deletion of the local storage directory. It is registered
	 * in the constructor and removed again by {@link #close()}.
	 */
	private final Thread shutdownHook;

	// --------------------------------------------------------------------------------------------

	/**
	 * Map to store the TTL of each element stored in the local storage, i.e. via one of the {@link
	 * #getFile} methods. The map is shared with the periodic cleanup task created in the
	 * constructor.
	 **/
	private final ConcurrentHashMap<Tuple2<JobID, TransientBlobKey>, Long> blobExpiryTimes =
		new ConcurrentHashMap<>();

	/**
	 * Time interval (ms) to run the cleanup task; also used as the default TTL. The constructor
	 * derives it by multiplying the configured value by 1000.
	 */
	private final long cleanupInterval;

	/**
	 * Timer that executes the cleanup task at regular intervals; cancelled by {@link #close()}.
	 */
	private final Timer cleanupTimer;

	/**
	 * Instantiates a new BLOB server and binds it to a free network port.
	 *
	 * <p>The constructor creates the local storage directory, schedules the periodic cleanup of
	 * transient BLOBs, registers a shutdown hook and binds the server socket. It configures the
	 * listener thread (name, daemon flag) but does <b>not</b> call {@link #start()}; the caller
	 * has to start the thread before connections are accepted.
	 *
	 * <p>If setting up SSL or binding the socket fails, everything acquired up to that point
	 * (cleanup timer, shutdown hook, storage directory) is released again before the exception
	 * is propagated, so a failed construction leaves nothing behind.
	 *
	 * @param config Configuration to be used to instantiate the BlobServer
	 * @param blobStore BlobStore to store blobs persistently
	 *
	 * @throws IOException
	 * 		thrown if the BLOB server cannot bind to a free network port or if the
	 * 		(local or distributed) file storage cannot be created or is not usable
	 */
	public BlobServer(Configuration config, BlobStore blobStore) throws IOException {
		this.blobServiceConfiguration = checkNotNull(config);
		this.blobStore = checkNotNull(blobStore);
		this.readWriteLock = new ReentrantReadWriteLock();

		// configure and create the storage directory
		this.storageDir = BlobUtils.initLocalStorageDirectory(config);
		LOG.info("Created BLOB server storage directory {}", storageDir);

		// configure the maximum number of concurrent connections
		final int maxConnections = config.getInteger(BlobServerOptions.FETCH_CONCURRENT);
		if (maxConnections >= 1) {
			this.maxConnections = maxConnections;
		}
		else {
			LOG.warn("Invalid value for maximum connections in BLOB server: {}. Using default value of {}",
					maxConnections, BlobServerOptions.FETCH_CONCURRENT.defaultValue());
			this.maxConnections = BlobServerOptions.FETCH_CONCURRENT.defaultValue();
		}

		// configure the backlog of connections
		int backlog = config.getInteger(BlobServerOptions.FETCH_BACKLOG);
		if (backlog < 1) {
			LOG.warn("Invalid value for BLOB connection backlog: {}. Using default value of {}",
					backlog, BlobServerOptions.FETCH_BACKLOG.defaultValue());
			backlog = BlobServerOptions.FETCH_BACKLOG.defaultValue();
		}

		// initialize the periodic cleanup task (daemon timer; configured value is scaled by 1000 to ms)
		this.cleanupTimer = new Timer(true);

		this.cleanupInterval = config.getLong(BlobServerOptions.CLEANUP_INTERVAL) * 1000;
		this.cleanupTimer
			.schedule(new TransientBlobCleanupTask(blobExpiryTimes, readWriteLock.writeLock(),
				storageDir, LOG), cleanupInterval, cleanupInterval);

		this.shutdownHook = ShutdownHookUtil.addShutdownHook(this, getClass().getSimpleName(), LOG);

		//  ----------------------- bind the server socket -------------------

		final String serverPortRange = config.getString(BlobServerOptions.PORT);
		final int finalBacklog = backlog;

		ServerSocket boundSocket = null;
		boolean bound = false;
		try {
			final Iterator<Integer> ports = NetUtils.getPortRangeFromString(serverPortRange);

			final ServerSocketFactory socketFactory;
			if (SSLUtils.isInternalSSLEnabled(config) && config.getBoolean(BlobServerOptions.SSL_ENABLED)) {
				try {
					socketFactory = SSLUtils.createSSLServerSocketFactory(config);
				}
				catch (Exception e) {
					throw new IOException("Failed to initialize SSL for the blob server", e);
				}
			}
			else {
				socketFactory = ServerSocketFactory.getDefault();
			}

			boundSocket = NetUtils.createSocketFromPorts(ports,
					(port) -> socketFactory.createServerSocket(port, finalBacklog));

			if (boundSocket == null) {
				throw new IOException("Unable to open BLOB Server in specified port range: " + serverPortRange);
			}

			bound = true;
		}
		finally {
			if (!bound) {
				// Construction is about to fail: undo what was set up above. Otherwise the timer
				// thread, the shutdown hook (which references this half-constructed instance) and
				// the storage directory would be left behind.
				cleanupTimer.cancel();
				ShutdownHookUtil.removeShutdownHook(shutdownHook, getClass().getSimpleName(), LOG);
				try {
					FileUtils.deleteDirectory(storageDir);
				}
				catch (IOException e) {
					// only log: throwing from here would mask the exception that caused the failure
					LOG.warn("Could not delete BLOB server storage directory {} after failed startup.",
							storageDir, e);
				}
			}
		}
		this.serverSocket = boundSocket;

		// configure the server thread; it is not started here
		setName("BLOB Server listener at " + getPort());
		setDaemon(true);

		if (LOG.isInfoEnabled()) {
			LOG.info("Started BLOB server at {}:{} - max concurrent requests: {} - max backlog: {}",
					serverSocket.getInetAddress().getHostAddress(), getPort(), maxConnections, backlog);
		}
	}

	//......

	/**
	 * Accept loop of the BLOB server thread.
	 *
	 * <p>Until a shutdown has been requested, each iteration blocks in
	 * {@code serverSocket.accept()}, wraps the accepted socket in a {@link BlobServerConnection},
	 * waits (in steps of up to 2000 ms) while the number of active connections has reached
	 * {@code maxConnections}, registers the connection and starts its thread.
	 *
	 * <p>Any {@link Throwable} escaping the loop ends it. If no shutdown was requested at that
	 * point, the error is logged and {@link #close()} is invoked.
	 */
	@Override
	public void run() {
		try {
			while (!this.shutdownRequested.get()) {
				// the connection is accepted first, so it is held (not rejected) while waiting for a free slot
				BlobServerConnection conn = new BlobServerConnection(serverSocket.accept(), this);
				try {
					synchronized (activeConnections) {
						// timed wait, re-checking the limit after every wake-up
						while (activeConnections.size() >= maxConnections) {
							activeConnections.wait(2000);
						}
						activeConnections.add(conn);
					}

					conn.start();
					// hand-off succeeded: clear the local reference so the finally block leaves the connection alone
					conn = null;
				}
				finally {
					// only reached with a non-null conn if waiting, registering or starting failed
					if (conn != null) {
						conn.close();
						synchronized (activeConnections) {
							activeConnections.remove(conn);
						}
					}
				}
			}
		}
		catch (Throwable t) {
			// failures after a requested shutdown are not reported
			if (!this.shutdownRequested.get()) {
				LOG.error("BLOB server stopped working. Shutting down", t);

				try {
					close();
				} catch (Throwable closeThrowable) {
					LOG.error("Could not properly close the BlobServer.", closeThrowable);
				}
			}
		}
	}

	/**
	 * Shuts down the BLOB server.
	 *
	 * <p>The cleanup timer is cancelled on every call. The remaining steps run only for the
	 * first caller that flips {@code shutdownRequested}: close the server socket, interrupt and
	 * join the listener thread, close all active connections, delete the storage directory and
	 * remove the shutdown hook.
	 *
	 * @throws IOException if closing the server socket or deleting the storage directory
	 * 		failed; it is rethrown only after all shutdown steps have been attempted
	 */
	@Override
	public void close() throws IOException {
		cleanupTimer.cancel();

		if (!shutdownRequested.compareAndSet(false, true)) {
			// another call already performed (or is performing) the shutdown
			return;
		}

		Exception firstFailure = null;

		try {
			this.serverSocket.close();
		}
		catch (IOException socketFailure) {
			firstFailure = socketFailure;
		}

		// wake the thread up, in case it is waiting on some operation
		interrupt();

		try {
			join();
		}
		catch (InterruptedException interrupted) {
			Thread.currentThread().interrupt();

			LOG.debug("Error while waiting for this thread to die.", interrupted);
		}

		synchronized (activeConnections) {
			for (BlobServerConnection connection : activeConnections) {
				LOG.debug("Shutting down connection {}.", connection.getName());
				connection.close();
			}
			activeConnections.clear();
		}

		// Clean up the storage directory
		try {
			FileUtils.deleteDirectory(storageDir);
		}
		catch (IOException deleteFailure) {
			firstFailure = ExceptionUtils.firstOrSuppressed(deleteFailure, firstFailure);
		}

		// Remove shutdown hook to prevent resource leaks
		ShutdownHookUtil.removeShutdownHook(shutdownHook, getClass().getSimpleName(), LOG);

		if (LOG.isInfoEnabled()) {
			LOG.info("Stopped BLOB server at {}:{}", serverSocket.getInetAddress().getHostAddress(), getPort());
		}

		ExceptionUtils.tryRethrowIOException(firstFailure);
	}

	//......
}
  • BlobServer繼承了Thread,同時實現了BlobService、BlobWriter、PermanentBlobService、TransientBlobService接口
  • 其構造器通過ServerSocketFactory創建了ServerSocket(啟用SSL時使用SSLUtils.createSSLServerSocketFactory,否則使用ServerSocketFactory.getDefault()),同時使用ShutdownHookUtil.addShutdownHook註冊了shutdownHook,在shutdown的時候會調用close方法
  • 重寫了Thread的run方法,該方法在沒有接收到shutdown請求的時候,會不斷循環等待serverSocket.accept(),然後創建BlobServerConnection;如果當前activeConnections的數量達到或超過了maxConnections,則會循環等待(每次最多2000毫秒),之後將連接維護到activeConnections,然後調用conn.start()

BlobServerConnection

flink-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java

class BlobServerConnection extends Thread {

	/** The log object used for debugging. */
	private static final Logger LOG = LoggerFactory.getLogger(BlobServerConnection.class);

	/** The socket to communicate with the client; closed when the connection thread ends or on {@link #close()}. */
	private final Socket clientSocket;

	/** The BLOB server that created this connection and that performs the actual storage operations. */
	private final BlobServer blobServer;

	/**
	 * Read lock to synchronize file accesses. It is the read side of the server's
	 * {@code ReadWriteLock} and is held while a BLOB file is served to the client.
	 */
	private final Lock readLock;

	/**
	 * Creates a daemon thread that serves a single client connection.
	 *
	 * @param clientSocket the accepted socket; its remote address becomes part of the thread name
	 * @param blobServer the server owning this connection; must not be null
	 */
	BlobServerConnection(Socket clientSocket, BlobServer blobServer) {
		super("BLOB connection for " + clientSocket.getRemoteSocketAddress());
		setDaemon(true);

		this.clientSocket = clientSocket;
		this.blobServer = checkNotNull(blobServer);

		// file reads of this connection share the read side of the server-wide lock
		this.readLock = blobServer.getReadWriteLock().readLock();
	}

	// --------------------------------------------------------------------------------------------
	//  Connection / Thread methods
	// --------------------------------------------------------------------------------------------

	/**
	 * Thread body of the connection: serves requests until the peer closes the stream.
	 *
	 * <p>Each request starts with a single operation byte that selects PUT or GET; any other
	 * value is treated as an error and ends the connection. When the loop ends for whatever
	 * reason, the socket is closed and the connection is unregistered from the server.
	 */
	@Override
	public void run() {
		try {
			final InputStream requestStream = this.clientSocket.getInputStream();
			final OutputStream responseStream = this.clientSocket.getOutputStream();

			// a negative operation byte means end of stream: no one is asking anything from us anymore
			for (int opcode = requestStream.read(); opcode >= 0; opcode = requestStream.read()) {
				if (opcode == PUT_OPERATION) {
					put(requestStream, responseStream, new byte[BUFFER_SIZE]);
				} else if (opcode == GET_OPERATION) {
					get(requestStream, responseStream, new byte[BUFFER_SIZE]);
				} else {
					throw new IOException("Unknown operation " + opcode);
				}
			}
		}
		catch (SocketException e) {
			// this happens when the remote site closes the connection
			LOG.debug("Socket connection closed", e);
		}
		catch (Throwable t) {
			LOG.error("Error while executing BLOB connection.", t);
		}
		finally {
			closeSilently(clientSocket, LOG);
			blobServer.unregisterConnection(this);
		}
	}

	/**
	 * Closes the connection socket and lets the thread exit.
	 *
	 * <p>The socket is closed first (failures are handled by {@code closeSilently}), then the
	 * connection thread is interrupted.
	 */
	public void close() {
		closeSilently(clientSocket, LOG);
		interrupt();
	}

	// --------------------------------------------------------------------------------------------
	//  Actions
	// --------------------------------------------------------------------------------------------

	/**
	 * Handles one GET request: reads the request header, makes sure the BLOB is available as a
	 * local file and streams it to the client.
	 *
	 * <p>Header layout as read here: one addressing-mode byte, the job ID (only for job-related
	 * content), then the BLOB key. The response is {@code RETURN_OKAY}, the length and the file
	 * contents. Afterwards one more byte is read from the client; if it is {@code RETURN_OKAY}
	 * and the key is a {@link TransientBlobKey}, the BLOB is deleted from the server.
	 *
	 * <p>Failures before any payload was sent are reported to the client as a serialized error;
	 * in those cases, and on any later non-socket failure, the client socket is closed.
	 *
	 * @param inputStream stream to read the request from
	 * @param outputStream stream to write the response to
	 * @param buf scratch buffer used to copy the file contents
	 * @throws IOException if closing the client socket after a failure fails
	 */
	private void get(InputStream inputStream, OutputStream outputStream, byte[] buf) throws IOException {
		/*
		 * Retrieve the file from the (distributed?) BLOB store and store it
		 * locally, then send it to the service which requested it.
		 *
		 * Instead, we could send it from the distributed store directly but
		 * chances are high that if there is one request, there will be more
		 * so a local cache makes more sense.
		 */

		final File blobFile;
		final JobID jobId;
		final BlobKey blobKey;

		// phase 1: parse the header and resolve the storage location
		try {
			// read HEADER contents: job ID, key, HA mode/permanent or transient BLOB
			final int mode = inputStream.read();
			if (mode < 0) {
				throw new EOFException("Premature end of GET request");
			}

			// Receive the jobId and key
			if (mode == JOB_UNRELATED_CONTENT) {
				jobId = null;
			} else if (mode == JOB_RELATED_CONTENT) {
				byte[] jidBytes = new byte[JobID.SIZE];
				readFully(inputStream, jidBytes, 0, JobID.SIZE, "JobID");
				jobId = JobID.fromByteArray(jidBytes);
			} else {
				throw new IOException("Unknown type of BLOB addressing: " + mode + '.');
			}
			blobKey = BlobKey.readFromInputStream(inputStream);

			// only transient BLOBs may be requested without a job ID
			checkArgument(blobKey instanceof TransientBlobKey || jobId != null,
				"Invalid BLOB addressing for permanent BLOBs");

			if (LOG.isDebugEnabled()) {
				LOG.debug("Received GET request for BLOB {}/{} from {}.", jobId,
					blobKey, clientSocket.getInetAddress());
			}

			// the file's (destined) location at the BlobServer
			blobFile = blobServer.getStorageLocation(jobId, blobKey);

			// up to here, an error can give a good message
		}
		catch (Throwable t) {
			LOG.error("GET operation from {} failed.", clientSocket.getInetAddress(), t);
			try {
				writeErrorToStream(outputStream, t);
			}
			catch (IOException e) {
				// since we are in an exception case, it means that we could not send the error
				// ignore this
			}
			clientSocket.close();
			return;
		}

		// phase 2: serve the file while holding the read lock, then process the client's acknowledgement
		try {

			readLock.lock();
			try {
				// copy the file to local store if it does not exist yet
				try {
					blobServer.getFileInternal(jobId, blobKey, blobFile);

					// enforce a 2GB max for now (otherwise the protocol's length field needs to be increased)
					if (blobFile.length() > Integer.MAX_VALUE) {
						throw new IOException("BLOB size exceeds the maximum size (2 GB).");
					}

					outputStream.write(RETURN_OKAY);
				} catch (Throwable t) {
					LOG.error("GET operation failed for BLOB {}/{} from {}.", jobId,
						blobKey, clientSocket.getInetAddress(), t);
					try {
						writeErrorToStream(outputStream, t);
					} catch (IOException e) {
						// since we are in an exception case, it means that we could not send the error
						// ignore this
					}
					clientSocket.close();
					return;
				}

				// from here on, we started sending data, so all we can do is close the connection when something happens
				// the cast is safe: the size was checked against Integer.MAX_VALUE above
				int blobLen = (int) blobFile.length();
				writeLength(blobLen, outputStream);

				try (FileInputStream fis = new FileInputStream(blobFile)) {
					int bytesRemaining = blobLen;
					while (bytesRemaining > 0) {
						int read = fis.read(buf);
						if (read < 0) {
							// the file ended before the announced length was sent
							throw new IOException("Premature end of BLOB file stream for " +
								blobFile.getAbsolutePath());
						}
						outputStream.write(buf, 0, read);
						bytesRemaining -= read;
					}
				}
			} finally {
				readLock.unlock();
			}

			// on successful transfer, delete transient files
			// (the client reports the outcome of the transfer with a single byte)
			int result = inputStream.read();
			if (result < 0) {
				throw new EOFException("Premature end of GET request");
			} else if (blobKey instanceof TransientBlobKey && result == RETURN_OKAY) {
				// ignore the result from the operation
				if (!blobServer.deleteInternal(jobId, (TransientBlobKey) blobKey)) {
					LOG.warn("DELETE operation failed for BLOB {}/{} from {}.", jobId,
						blobKey, clientSocket.getInetAddress());
				}
			}

		} catch (SocketException e) {
			// happens when the other side disconnects
			LOG.debug("Socket connection closed", e);
		} catch (Throwable t) {
			LOG.error("GET operation failed", t);
			clientSocket.close();
		}

	}

	/**
	 * Handles one PUT request: receives a BLOB from the client, stores it on the server and
	 * answers with the key computed for it.
	 *
	 * <p>Header layout as read here: one addressing-mode byte, the job ID (only for job-related
	 * content), then one byte with the BLOB type. The payload is first written to a temporary
	 * staging file, which is then moved into the store. On success the response is
	 * {@code RETURN_OKAY} followed by the BLOB key; on a non-socket failure a serialized error
	 * is sent (best effort) and the client socket is closed. The staging file is removed in
	 * all cases if it still exists.
	 *
	 * @param inputStream stream to read the request from
	 * @param outputStream stream to write the response to
	 * @param buf scratch buffer used to receive the payload
	 * @throws IOException if closing the client socket after a failure fails
	 */
	private void put(InputStream inputStream, OutputStream outputStream, byte[] buf) throws IOException {
		File incomingFile = null;

		try {
			// read HEADER contents: job ID, HA mode/permanent or transient BLOB
			final int mode = inputStream.read();
			if (mode < 0) {
				throw new EOFException("Premature end of PUT request");
			}

			final JobID jobId;
			if (mode == JOB_UNRELATED_CONTENT) {
				jobId = null;
			} else if (mode == JOB_RELATED_CONTENT) {
				byte[] jidBytes = new byte[JobID.SIZE];
				readFully(inputStream, jidBytes, 0, JobID.SIZE, "JobID");
				jobId = JobID.fromByteArray(jidBytes);
			} else {
				// report the offending value, as the GET path does
				throw new IOException("Unknown type of BLOB addressing: " + mode + '.');
			}

			// the BLOB type is transmitted as the ordinal of the enum constant
			final BlobKey.BlobType blobType;
			{
				final int read = inputStream.read();
				if (read < 0) {
					throw new EOFException("Read an incomplete BLOB type");
				} else if (read == TRANSIENT_BLOB.ordinal()) {
					blobType = TRANSIENT_BLOB;
				} else if (read == PERMANENT_BLOB.ordinal()) {
					blobType = PERMANENT_BLOB;
					// permanent BLOBs must always belong to a job
					checkArgument(jobId != null, "Invalid BLOB addressing for permanent BLOBs");
				} else {
					throw new IOException("Invalid data received for the BLOB type: " + read);
				}
			}

			if (LOG.isDebugEnabled()) {
				LOG.debug("Received PUT request for BLOB of job {} from {}.", jobId,
					clientSocket.getInetAddress());
			}

			// stage the payload in a temporary file while computing its digest
			incomingFile = blobServer.createTemporaryFilename();
			byte[] digest = readFileFully(inputStream, incomingFile, buf);

			BlobKey blobKey = blobServer.moveTempFileToStore(incomingFile, jobId, digest, blobType);

			// Return computed key to client for validation
			outputStream.write(RETURN_OKAY);
			blobKey.writeToOutputStream(outputStream);
		}
		catch (SocketException e) {
			// happens when the other side disconnects
			LOG.debug("Socket connection closed", e);
		}
		catch (Throwable t) {
			LOG.error("PUT operation failed", t);
			try {
				writeErrorToStream(outputStream, t);
			}
			catch (IOException e) {
				// since we are in an exception case, it means not much that we could not send the error
				// ignore this
			}
			clientSocket.close();
		}
		finally {
			if (incomingFile != null) {
				// a failed delete is only worth a warning if the staging file is actually still there
				if (!incomingFile.delete() && incomingFile.exists()) {
					LOG.warn("Cannot delete BLOB server staging file {}", incomingFile.getAbsolutePath());
				}
			}
		}
	}

	/**
	 * Reads a chunked BLOB payload from the stream into the given file and computes its digest.
	 *
	 * <p>The payload is a sequence of chunks, each announced by a length field and followed by
	 * that many bytes; a length of {@code -1} terminates the payload. Every chunk has to fit
	 * into {@code BUFFER_SIZE} bytes.
	 *
	 * @param inputStream stream to read the chunks from
	 * @param incomingFile file that receives the payload
	 * @param buf scratch buffer that receives one chunk at a time
	 * @return the message digest over all payload bytes
	 * @throws IOException if reading or writing fails, or if a chunk length is negative (other
	 * 		than the terminating {@code -1}) or larger than {@code BUFFER_SIZE}
	 */
	private static byte[] readFileFully(
			final InputStream inputStream, final File incomingFile, final byte[] buf)
			throws IOException {
		MessageDigest md = BlobUtils.createMessageDigest();

		try (FileOutputStream fos = new FileOutputStream(incomingFile)) {
			while (true) {
				final int bytesExpected = readLength(inputStream);
				if (bytesExpected == -1) {
					// done
					break;
				}
				// The length comes from the client: reject negative values as well, instead of
				// passing them on to readFully()/write() as a bogus array range.
				if (bytesExpected < 0 || bytesExpected > BUFFER_SIZE) {
					throw new IOException(
						"Unexpected number of incoming bytes: " + bytesExpected);
				}

				readFully(inputStream, buf, 0, bytesExpected, "buffer");
				fos.write(buf, 0, bytesExpected);

				md.update(buf, 0, bytesExpected);
			}
			return md.digest();
		}
	}

	// --------------------------------------------------------------------------------------------
	//  Utilities
	// --------------------------------------------------------------------------------------------

	/**
	 * Sends a failure response to the client: the {@code RETURN_ERROR} marker, the length of
	 * the serialized throwable and the serialized throwable itself.
	 *
	 * @param out stream to write the response to
	 * @param t the failure to report
	 * @throws IOException if serializing the throwable or writing to the stream fails
	 */
	private static void writeErrorToStream(OutputStream out, Throwable t) throws IOException {
		// serialize before touching the stream, so nothing is written if serialization fails
		final byte[] serializedError = InstantiationUtil.serializeObject(t);

		out.write(RETURN_ERROR);
		writeLength(serializedError.length, out);
		out.write(serializedError);
	}
}
  • BlobServerConnection繼承了Thread,它的構造器接收clientSocket及blobServer;它覆蓋了Thread的run方法,該方法首先從clientSocket讀取請求的operation,如果是PUT_OPERATION則調用put方法,如果是GET_OPERATION則調用get方法
  • put方法從inputStream讀取jobId及blobType,之後創建incomingFile,將輸入的文件先存儲到臨時文件,然後調用blobServer.moveTempFileToStore方法存儲到blob server,並將計算出的BlobKey返回給客戶端
  • get方法從inputStream讀取jobId及blobKey,之後調用blobServer.getStorageLocation獲取blobFile,再通過blobServer.getFileInternal在本地不存在時將其拷貝到local store,然後寫入到outputStream

小結

  • BlobServer繼承了Thread,同時實現了BlobService、BlobWriter、PermanentBlobService、TransientBlobService接口;其構造器通過ServerSocketFactory創建了ServerSocket(啟用SSL時使用SSLUtils.createSSLServerSocketFactory,否則使用ServerSocketFactory.getDefault()),同時使用ShutdownHookUtil.addShutdownHook註冊了shutdownHook,在shutdown的時候會調用close方法
  • BlobServer重寫了Thread的run方法,該方法在沒有接收到shutdown請求的時候,會不斷循環等待serverSocket.accept(),然後創建BlobServerConnection;如果當前activeConnections的數量達到或超過了maxConnections,則會循環等待(每次最多2000毫秒),之後將連接維護到activeConnections,然後調用conn.start()
  • BlobServerConnection繼承了Thread,它的構造器接收clientSocket及blobServer;它覆蓋了Thread的run方法,該方法首先從clientSocket讀取請求的operation,如果是PUT_OPERATION則調用put方法,如果是GET_OPERATION則調用get方法;put方法從inputStream讀取jobId及blobType,之後創建incomingFile,將輸入的文件先存儲到臨時文件,然後調用blobServer.moveTempFileToStore方法存儲到blob server;get方法從inputStream讀取jobId及blobKey,之後調用blobServer.getStorageLocation獲取blobFile,再通過blobServer.getFileInternal在本地不存在時將其拷貝到local store,然後寫入到outputStream

doc

相關文章
相關標籤/搜索