本文主要研究一下flink的FencedAkkaInvocationHandler
flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/FencedRpcGateway.java
/**
 * An {@link RpcGateway} that additionally exposes a fencing token.
 *
 * @param <F> type of the fencing token; must be serializable
 */
public interface FencedRpcGateway<F extends Serializable> extends RpcGateway {

    /**
     * Returns the fencing token that is currently in effect for this gateway.
     *
     * @return the current fencing token
     */
    F getFencingToken();
}
flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/FencedMainThreadExecutable.java
/**
 * Extension of {@link MainThreadExecutable} that offers variants of the
 * main-thread execution methods which do not attach a fencing token.
 */
public interface FencedMainThreadExecutable extends MainThreadExecutable {

    /**
     * Executes the given runnable in the main thread; no fencing token is
     * attached, so the token is not validated for this call.
     *
     * @param runnable the action to run in the main thread without fencing token validation
     */
    void runAsyncWithoutFencing(Runnable runnable);

    /**
     * Executes the given callable in the main thread; no fencing token is
     * attached, so the token is not validated for this call.
     *
     * @param callable the computation to run in the main thread without fencing token validation
     * @param timeout timeout for the operation
     * @param <V> result type of the callable
     * @return a future that holds the callable's result
     */
    <V> CompletableFuture<V> callAsyncWithoutFencing(Callable<V> callable, Time timeout);
}
flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/FencedAkkaInvocationHandler.java
public class FencedAkkaInvocationHandler<F extends Serializable> extends AkkaInvocationHandler implements FencedMainThreadExecutable, FencedRpcGateway<F> { private final Supplier<F> fencingTokenSupplier; public FencedAkkaInvocationHandler( String address, String hostname, ActorRef rpcEndpoint, Time timeout, long maximumFramesize, @Nullable CompletableFuture<Void> terminationFuture, Supplier<F> fencingTokenSupplier) { super(address, hostname, rpcEndpoint, timeout, maximumFramesize, terminationFuture); this.fencingTokenSupplier = Preconditions.checkNotNull(fencingTokenSupplier); } @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { Class<?> declaringClass = method.getDeclaringClass(); if (declaringClass.equals(FencedMainThreadExecutable.class) || declaringClass.equals(FencedRpcGateway.class)) { return method.invoke(this, args); } else { return super.invoke(proxy, method, args); } } @Override public void runAsyncWithoutFencing(Runnable runnable) { checkNotNull(runnable, "runnable"); if (isLocal) { getActorRef().tell( new UnfencedMessage<>(new RunAsync(runnable, 0L)), ActorRef.noSender()); } else { throw new RuntimeException("Trying to send a Runnable to a remote actor at " + getActorRef().path() + ". This is not supported."); } } @Override public <V> CompletableFuture<V> callAsyncWithoutFencing(Callable<V> callable, Time timeout) { checkNotNull(callable, "callable"); checkNotNull(timeout, "timeout"); if (isLocal) { @SuppressWarnings("unchecked") CompletableFuture<V> resultFuture = (CompletableFuture<V>) FutureUtils.toJava( Patterns.ask( getActorRef(), new UnfencedMessage<>(new CallAsync(callable)), timeout.toMilliseconds())); return resultFuture; } else { throw new RuntimeException("Trying to send a Runnable to a remote actor at " + getActorRef().path() + ". 
This is not supported."); } } @Override public void tell(Object message) { super.tell(fenceMessage(message)); } @Override public CompletableFuture<?> ask(Object message, Time timeout) { return super.ask(fenceMessage(message), timeout); } @Override public F getFencingToken() { return fencingTokenSupplier.get(); } private <P> FencedMessage<F, P> fenceMessage(P message) { if (isLocal) { return new LocalFencedMessage<>(fencingTokenSupplier.get(), message); } else { if (message instanceof Serializable) { @SuppressWarnings("unchecked") FencedMessage<F, P> result = (FencedMessage<F, P>) new RemoteFencedMessage<>(fencingTokenSupplier.get(), (Serializable) message); return result; } else { throw new RuntimeException("Trying to send a non-serializable message " + message + " to a remote " + "RpcEndpoint. Please make sure that the message implements java.io.Serializable."); } } } }
flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/UnfencedMessage.java
/**
 * Simple immutable wrapper around a payload; it carries no fencing token.
 *
 * @param <P> type of the wrapped payload
 */
public class UnfencedMessage<P> {

    /** The wrapped payload; checked with {@code Preconditions.checkNotNull} on construction. */
    private final P payload;

    /**
     * Wraps the given payload.
     *
     * @param payload the payload to wrap; passed through {@code Preconditions.checkNotNull}
     */
    public UnfencedMessage(P payload) {
        final P checkedPayload = Preconditions.checkNotNull(payload);
        this.payload = checkedPayload;
    }

    /**
     * Returns the wrapped payload.
     *
     * @return the payload given at construction time
     */
    public P getPayload() {
        return this.payload;
    }

    @Override
    public String toString() {
        final StringBuilder text = new StringBuilder("UnfencedMessage(");
        text.append(payload);
        text.append(')');
        return text.toString();
    }
}
flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/FencedMessage.java
/**
 * A message that pairs a payload with a fencing token.
 *
 * @param <F> type of the fencing token; must be serializable
 * @param <P> type of the payload
 */
public interface FencedMessage<F extends Serializable, P> {

    /**
     * Returns the fencing token attached to this message.
     *
     * @return the fencing token
     */
    F getFencingToken();

    /**
     * Returns the payload carried by this message.
     *
     * @return the payload
     */
    P getPayload();
}
flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/LocalFencedMessage.java
/**
 * {@link FencedMessage} implementation whose payload is not required to be
 * serializable.
 *
 * @param <F> type of the fencing token
 * @param <P> type of the payload
 */
public class LocalFencedMessage<F extends Serializable, P> implements FencedMessage<F, P> {

    /** Fencing token attached to the payload; may be {@code null}. */
    private final F fencingToken;

    /** The carried payload; checked with {@code Preconditions.checkNotNull} on construction. */
    private final P payload;

    /**
     * Creates a fenced message from a token and a payload.
     *
     * @param fencingToken the fencing token, possibly {@code null}
     * @param payload the payload; passed through {@code Preconditions.checkNotNull}
     */
    public LocalFencedMessage(@Nullable F fencingToken, P payload) {
        final P checkedPayload = Preconditions.checkNotNull(payload);

        this.fencingToken = fencingToken;
        this.payload = checkedPayload;
    }

    @Override
    public F getFencingToken() {
        return this.fencingToken;
    }

    @Override
    public P getPayload() {
        return this.payload;
    }

    @Override
    public String toString() {
        final StringBuilder text = new StringBuilder("LocalFencedMessage(");
        text.append(fencingToken);
        text.append(", ");
        text.append(payload);
        text.append(')');
        return text.toString();
    }
}
flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/RemoteFencedMessage.java
/**
 * Serializable {@link FencedMessage} implementation; both the fencing token and
 * the payload are bounded by {@link Serializable}.
 *
 * @param <F> type of the fencing token
 * @param <P> type of the payload
 */
public class RemoteFencedMessage<F extends Serializable, P extends Serializable> implements FencedMessage<F, P>, Serializable {

    private static final long serialVersionUID = 4043136067468477742L;

    /** Fencing token attached to the payload; may be {@code null}. */
    private final F fencingToken;

    /** The carried payload; checked with {@code Preconditions.checkNotNull} on construction. */
    private final P payload;

    /**
     * Creates a fenced message from a token and a serializable payload.
     *
     * @param fencingToken the fencing token, possibly {@code null}
     * @param payload the payload; passed through {@code Preconditions.checkNotNull}
     */
    public RemoteFencedMessage(@Nullable F fencingToken, P payload) {
        final P checkedPayload = Preconditions.checkNotNull(payload);

        this.fencingToken = fencingToken;
        this.payload = checkedPayload;
    }

    @Override
    public F getFencingToken() {
        return this.fencingToken;
    }

    @Override
    public P getPayload() {
        return this.payload;
    }

    @Override
    public String toString() {
        final StringBuilder text = new StringBuilder("RemoteFencedMessage(");
        text.append(fencingToken);
        text.append(", ");
        text.append(payload);
        text.append(')');
        return text.toString();
    }
}