Akka是一個用於開發併發、容錯和可伸縮應用的框架。它是Actor Model的一個實現,和Erlang的併發模型很像。在Actor模型中,所有的實體都被視爲獨立的actor。actor與其他actor通過發送異步消息進行通訊。Actor模型的強大之處來自於異步。它也可以顯式等待響應,這使得可以執行同步操作。但是,強烈不建議使用同步消息,因爲它們限制了系統的伸縮性。每個actor有一個郵箱(mailbox),它收到的消息存儲在裏面。另外,每一個actor維護自身單獨的狀態。一個Actor網絡如下所示:
Akka系統的核心是ActorSystem和Actor。若需構建一個Akka系統,首先需要創建ActorSystem;創建完ActorSystem後,可通過它創建Actor(注意:Akka不允許直接new一個Actor,只能通過Akka提供的某些API才能創建或查找Actor,一般會通過ActorSystem#actorOf和ActorContext#actorOf來創建Actor)。另外,我們只能通過ActorRef(Actor的引用,其對原生的Actor實例做了良好的封裝,外界不能隨意修改其內部狀態)來與Actor進行通訊。如下代碼展示了如何配置一個Akka系統。
// 1. 構建ActorSystem // 使用缺省配置 ActorSystem system = ActorSystem.create("sys"); // 也可顯示指定appsys配置 // ActorSystem system1 = ActorSystem.create("helloakka", ConfigFactory.load("appsys")); // 2. 構建Actor,獲取該Actor的引用,即ActorRef ActorRef helloActor = system.actorOf(Props.create(HelloActor.class), "helloActor"); // 3. 給helloActor發送消息 helloActor.tell("hello helloActor", ActorRef.noSender()); // 4. 關閉ActorSystem system.terminate();
在Akka中,創建的每個Actor都有自己的路徑,該路徑遵循ActorSystem的層級結構,大致如下:
本地:akka://sys/user/helloActor 遠程:akka.tcp://sys@127.0.0.1:2020/user/remoteActor
ActorSystem system = ActorSystem.create("sys");

// Select the actor by path and resolve the selection to an ActorRef, waiting at most two seconds.
ActorSelection selection = system.actorSelection("/path/to/actor");
Timeout timeout = new Timeout(Duration.create(2, "seconds"));
Future<ActorRef> resolved = selection.resolveOne(timeout);

// Callback for a successful lookup: print the reference and greet the actor.
OnSuccess<ActorRef> greetResolvedActor = new OnSuccess<ActorRef>() {
    @Override
    public void onSuccess(ActorRef actor) {
        System.out.println("actor:" + actor);
        actor.tell("hello actor", ActorRef.noSender());
    }
};

// Callback for a failed lookup: print the cause.
OnFailure reportLookupFailure = new OnFailure() {
    @Override
    public void onFailure(Throwable failure) {
        System.out.println("failure:" + failure);
    }
};

// Both callbacks run on the system dispatcher.
resolved.onSuccess(greetResolvedActor, system.dispatcher());
resolved.onFailure(reportLookupFailure, system.dispatcher());
helloActor.tell("hello helloActor", ActorRef.noSender());
其中:第一個參數爲消息,它可以是任何可序列化的數據或對象;第二個參數表示發送者,一般來說是另外一個Actor的引用,ActorRef.noSender()表示無發送者(其實是一個叫做deadLetters的Actor)。
當我們需要從Actor獲取響應結果時,可以使用ask方法。ask方法會將返回結果包裝在scala.concurrent.Future中,然後通過異步回調獲取返回結果。如調用方:
// 異步發送消息給Actor,並獲取響應結果 Future<Object> fu = Patterns.ask(printerActor, "hello helloActor", timeout); fu.onComplete(new OnComplete<Object>() { @Override public void onComplete(Throwable failure, String success) throws Throwable { if (failure != null) { System.out.println("failure is " + failure); } else { System.out.println("success is " + success); } } }, system.dispatcher());
private void handleMessage(Object object) { if (object instanceof String) { String str = (String) object; log.info("[HelloActor] message is {}, sender is {}", str, getSender().path().toString()); // 給發送者發送消息 getSender().tell(str, getSelf()); } }
public interface RpcGateway { /** * Returns the fully qualified address under which the associated rpc endpoint is reachable. * * @return Fully qualified (RPC) address under which the associated rpc endpoint is reachable */ String getAddress(); /** * Returns the fully qualified hostname under which the associated rpc endpoint is reachable. * * @return Fully qualified hostname under which the associated rpc endpoint is reachable */ String getHostname(); }
/**
 * Initializes the endpoint: stores the rpc service and endpoint id, starts the rpc server
 * for this endpoint and creates the main thread executor.
 *
 * @param rpcService the rpc service used to start the rpc server for this endpoint
 * @param endpointId the id of this endpoint
 */
protected RpcEndpoint(final RpcService rpcService, final String endpointId) {
    // Store rpcService and endpointId (checkNotNull presumably rejects null arguments).
    this.rpcService = checkNotNull(rpcService, "rpcService");
    this.endpointId = checkNotNull(endpointId, "endpointId");

    // Start the RpcServer through the RpcService. Note that `this` is handed to
    // startServer here, before the constructor has finished.
    this.rpcServer = rpcService.startServer(this);

    // Main thread executor; per the original author's note, all calls are executed
    // serially in the main thread.
    this.mainThreadExecutor = new MainThreadExecutor(rpcServer, this::validateRunsInMainThread);
}
、callAsync(Callable, Time)
public <C extends RpcEndpoint & RpcGateway> RpcServer startServer(C rpcEndpoint) { checkNotNull(rpcEndpoint, "rpc endpoint"); CompletableFuture<Void> terminationFuture = new CompletableFuture<>(); final Props akkaRpcActorProps; // 根據RpcEndpoint類型建立不一樣類型的Props if (rpcEndpoint instanceof FencedRpcEndpoint) { akkaRpcActorProps = Props.create( FencedAkkaRpcActor.class, rpcEndpoint, terminationFuture, getVersion(), configuration.getMaximumFramesize()); } else { akkaRpcActorProps = Props.create( AkkaRpcActor.class, rpcEndpoint, terminationFuture, getVersion(), configuration.getMaximumFramesize()); } ActorRef actorRef; // 同步塊,建立Actor,並獲取對應的ActorRef synchronized (lock) { checkState(!stopped, "RpcService is stopped"); actorRef = actorSystem.actorOf(akkaRpcActorProps, rpcEndpoint.getEndpointId()); actors.put(actorRef, rpcEndpoint); } LOG.info("Starting RPC endpoint for {} at {} .", rpcEndpoint.getClass().getName(), actorRef.path()); // 獲取Actor的路徑 final String akkaAddress = AkkaUtils.getAkkaURL(actorSystem, actorRef); final String hostname; Option<String> host = actorRef.path().address().host(); if (host.isEmpty()) { hostname = "localhost"; } else { hostname = host.get(); } // 解析該RpcEndpoint實現的全部RpcGateway接口 Set<Class<?>> implementedRpcGateways = new HashSet<>(RpcUtils.extractImplementedRpcGateways(rpcEndpoint.getClass())); // 額外添加RpcServer和AkkaBasedEnpoint類 implementedRpcGateways.add(RpcServer.class); implementedRpcGateways.add(AkkaBasedEndpoint.class); final InvocationHandler akkaInvocationHandler; // 根據不一樣類型動態建立代理對象 if (rpcEndpoint instanceof FencedRpcEndpoint) { // a FencedRpcEndpoint needs a FencedAkkaInvocationHandler akkaInvocationHandler = new FencedAkkaInvocationHandler<>( akkaAddress, hostname, actorRef, configuration.getTimeout(), configuration.getMaximumFramesize(), terminationFuture, ((FencedRpcEndpoint<?>) rpcEndpoint)::getFencingToken); implementedRpcGateways.add(FencedMainThreadExecutable.class); } else { akkaInvocationHandler = new AkkaInvocationHandler( 
akkaAddress, hostname, actorRef, configuration.getTimeout(), configuration.getMaximumFramesize(), terminationFuture); } // Rather than using the System ClassLoader directly, we derive the ClassLoader // from this class . That works better in cases where Flink runs embedded and all Flink // code is loaded dynamically (for example from an OSGI bundle) through a custom ClassLoader ClassLoader classLoader = getClass().getClassLoader(); // 生成RpcServer對象,然後對該server的調用都會進入Handler的invoke方法處理,handler實現了多個接口的方法 @SuppressWarnings("unchecked") RpcServer server = (RpcServer) Proxy.newProxyInstance( classLoader, implementedRpcGateways.toArray(new Class<?>[implementedRpcGateways.size()]), akkaInvocationHandler); return server; }
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { Class<?> declaringClass = method.getDeclaringClass(); Object result; // 先匹配指定類型(handler已實現接口的方法),若匹配成功則直接進行本地方法調用;若匹配爲FencedRpcGateway類型,則拋出異常(應該在FencedAkkaInvocationHandler中處理);其餘則進行Rpc調用 if (declaringClass.equals(AkkaBasedEndpoint.class) || declaringClass.equals(Object.class) || declaringClass.equals(RpcGateway.class) || declaringClass.equals(StartStoppable.class) || declaringClass.equals(MainThreadExecutable.class) || declaringClass.equals(RpcServer.class)) { result = method.invoke(this, args); } else if (declaringClass.equals(FencedRpcGateway.class)) { throw new UnsupportedOperationException("AkkaInvocationHandler does not support the call FencedRpcGateway#" + method.getName() + ". This indicates that you retrieved a FencedRpcGateway without specifying a " + "fencing token. Please use RpcService#connect(RpcService, F, Time) with F being the fencing token to " + "retrieve a properly FencedRpcGateway."); } else { result = invokeRpc(method, args); } return result; }
通過ActorRef#tell給對應的Actor發送消息:rpcEndpoint.tell(ControlMessages.START, ActorRef.noSender());
public void scheduleRunAsync(Runnable runnable, long delayMillis) { checkNotNull(runnable, "runnable"); checkArgument(delayMillis >= 0, "delay must be zero or greater"); // 判斷是否爲本地Actor if (isLocal) { long atTimeNanos = delayMillis == 0 ? 0 : System.nanoTime() + (delayMillis * 1_000_000); // 向Actor發送消息runnable tell(new RunAsync(runnable, atTimeNanos)); } else { // 拋出異常,不支持遠程發送Runnable消息 throw new RuntimeException("Trying to send a Runnable to a remote actor at " + rpcEndpoint.path() + ". This is not supported."); } }
AkkaInvocationHandler#invoke -> AkkaInvocationHandler#scheduleRunAsync;
AkkaRpcActor#handleMessage -> AkkaRpcActor#handleRpcMessage,其中handleRpcMessage方法如下:
protected void handleRpcMessage(Object message) { // 根據消息類型不一樣進行不一樣的處理 if (message instanceof RunAsync) { handleRunAsync((RunAsync) message); } else if (message instanceof CallAsync) { handleCallAsync((CallAsync) message); } else if (message instanceof RpcInvocation) { handleRpcInvocation((RpcInvocation) message); } else { log.warn( "Received message of unknown type {} with value {}. Dropping this message!", message.getClass().getName(), message); sendErrorIfSender(new AkkaUnknownMessageException("Received unknown message " + message + " of type " + message.getClass().getSimpleName() + '.')); } }
private void handleRunAsync(RunAsync runAsync) { // 獲取延遲調度時間 final long timeToRun = runAsync.getTimeNanos(); final long delayNanos; // 若爲0或已經到了調度時間,則馬上進行調度 if (timeToRun == 0 || (delayNanos = timeToRun - System.nanoTime()) <= 0) { // run immediately try { runAsync.getRunnable().run(); } catch (Throwable t) { log.error("Caught exception while executing runnable in main thread.", t); ExceptionUtils.rethrowIfFatalErrorOrOOM(t); } } else { // schedule for later. send a new message after the delay, which will then be immediately executed // 計算出延遲時間 FiniteDuration delay = new FiniteDuration(delayNanos, TimeUnit.NANOSECONDS); // 從新封裝消息 RunAsync message = new RunAsync(runAsync.getRunnable(), timeToRun); final Object envelopedSelfMessage = envelopeSelfMessage(message); // 等待指定延遲時間後給本身再發送一個消息 getContext().system().scheduler().scheduleOnce(delay, getSelf(), envelopedSelfMessage, getContext().dispatcher(), ActorRef.noSender()); } }
private Object invokeRpc(Method method, Object[] args) throws Exception { // 獲取方法相應的信息 String methodName = method.getName(); Class<?>[] parameterTypes = method.getParameterTypes(); Annotation[][] parameterAnnotations = method.getParameterAnnotations(); Time futureTimeout = extractRpcTimeout(parameterAnnotations, args, timeout); // 建立RpcInvocationMessage(可分爲LocalRpcInvocation/RemoteRpcInvocation) final RpcInvocation rpcInvocation = createRpcInvocationMessage(methodName, parameterTypes, args); Class<?> returnType = method.getReturnType(); final Object result; // 無返回,則使用tell方法 if (Objects.equals(returnType, Void.TYPE)) { tell(rpcInvocation); result = null; } else { // execute an asynchronous call // 有返回,則使用ask方法 CompletableFuture<?> resultFuture = ask(rpcInvocation, futureTimeout); CompletableFuture<?> completableFuture = resultFuture.thenApply((Object o) -> { // 調用返回後進行反序列化 if (o instanceof SerializedValue) { try { return ((SerializedValue<?>) o).deserializeValue(getClass().getClassLoader()); } catch (IOException | ClassNotFoundException e) { throw new CompletionException( new RpcException("Could not deserialize the serialized payload of RPC method : " + methodName, e)); } } else { // 直接返回 return o; } }); // 若返回類型爲CompletableFuture則直接賦值 if (Objects.equals(returnType, CompletableFuture.class)) { result = completableFuture; } else { try { // 從CompletableFuture獲取 result = completableFuture.get(futureTimeout.getSize(), futureTimeout.getUnit()); } catch (ExecutionException ee) { throw new RpcException("Failure while obtaining synchronous RPC result.", ExceptionUtils.stripExecutionException(ee)); } } } return result; }
private void handleRpcInvocation(RpcInvocation rpcInvocation) { Method rpcMethod = null; try { // 獲取方法的信息 String methodName = rpcInvocation.getMethodName(); Class<?>[] parameterTypes = rpcInvocation.getParameterTypes(); // 在RpcEndpoint中找指定方法 rpcMethod = lookupRpcMethod(methodName, parameterTypes); } catch (ClassNotFoundException e) { log.error("Could not load method arguments.", e); // 異常處理 RpcConnectionException rpcException = new RpcConnectionException("Could not load method arguments.", e); getSender().tell(new Status.Failure(rpcException), getSelf()); } catch (IOException e) { log.error("Could not deserialize rpc invocation message.", e); // 異常處理 RpcConnectionException rpcException = new RpcConnectionException("Could not deserialize rpc invocation message.", e); getSender().tell(new Status.Failure(rpcException), getSelf()); } catch (final NoSuchMethodException e) { log.error("Could not find rpc method for rpc invocation.", e); // 異常處理 RpcConnectionException rpcException = new RpcConnectionException("Could not find rpc method for rpc invocation.", e); getSender().tell(new Status.Failure(rpcException), getSelf()); } if (rpcMethod != null) { try { // this supports declaration of anonymous classes rpcMethod.setAccessible(true); // 返回類型爲空則直接進行invoke if (rpcMethod.getReturnType().equals(Void.TYPE)) { // No return value to send back rpcMethod.invoke(rpcEndpoint, rpcInvocation.getArgs()); } else { final Object result; try { result = rpcMethod.invoke(rpcEndpoint, rpcInvocation.getArgs()); } catch (InvocationTargetException e) { log.debug("Reporting back error thrown in remote procedure {}", rpcMethod, e); // tell the sender about the failure getSender().tell(new Status.Failure(e.getTargetException()), getSelf()); return; } final String methodName = rpcMethod.getName(); // 方法返回類型爲CompletableFuture if (result instanceof CompletableFuture) { final CompletableFuture<?> responseFuture = (CompletableFuture<?>) result; // 發送結果(使用Patterns發送結果給調用者,並會進行序列化並驗證結果大小) 
sendAsyncResponse(responseFuture, methodName); } else { // 類型非CompletableFuture,發送結果(使用Patterns發送結果給調用者,並會進行序列化並驗證結果大小) sendSyncResponse(result, methodName); } } } catch (Throwable e) { log.error("Error while executing remote procedure call {}.", rpcMethod, e); // tell the sender about the failure getSender().tell(new Status.Failure(e), getSelf()); } } }