



public interface ActorGateway extends Serializable {

     * Sends a message asynchronously and returns its response. The response to the message is
     * returned as a future.
     * @param message Message to be sent
     * @param timeout Timeout until the Future is completed with an AskTimeoutException
     * @return Future which contains the response to the sent message
    Future<Object> ask(Object message, FiniteDuration timeout);

     * Sends a message asynchronously without a result.
     * @param message Message to be sent
    void tell(Object message);

     * Sends a message asynchronously without a result with sender being the sender.
     * @param message Message to be sent
     * @param sender Sender of the message
    void tell(Object message, ActorGateway sender);

     * Forwards a message. For the receiver of this message it looks as if sender has sent the
     * message.
     * @param message Message to be sent
     * @param sender Sender of the forwarded message
    void forward(Object message, ActorGateway sender);

     * Retries to send asynchronously a message up to numberRetries times. The response to this
     * message is returned as a future. The message is re-sent if the number of retries is not yet
     * exceeded and if an exception occurred while sending it.
     * @param message Message to be sent
     * @param numberRetries Number of times to retry sending the message
     * @param timeout Timeout for each sending attempt
     * @param executionContext ExecutionContext which is used to send the message multiple times
     * @return Future of the response to the sent message
    Future<Object> retry(
            Object message,
            int numberRetries,
            FiniteDuration timeout,
            ExecutionContext executionContext);

     * Returns the path of the remote instance.
     * @return Path of the remote instance.
    String path();

     * Returns the underlying actor with which is communicated
     * @return ActorRef of the target actor
    ActorRef actor();

     * Returns the leaderSessionID associated with the remote actor or null.
     * @return Leader session ID if its associated with this gateway, otherwise null
    UUID leaderSessionID();
  • ActorGateway接口定義了ask、tell、forward、retry、path、actor、leaderSessionID方法;它有一個實現類爲AkkaActorGateway



public class AkkaActorGateway implements ActorGateway, Serializable {

    private static final long serialVersionUID = 42L;

    // ActorRef of the remote instance
    private final ActorRef actor;

    // Associated leader session ID, which is used for RequiresLeaderSessionID messages
    private final UUID leaderSessionID;

    // Decorator for messages
    private final MessageDecorator decorator;

    public AkkaActorGateway(ActorRef actor, UUID leaderSessionID) {
        this.actor = Preconditions.checkNotNull(actor);
        this.leaderSessionID = Preconditions.checkNotNull(leaderSessionID);
        // we want to wrap RequiresLeaderSessionID messages in a LeaderSessionMessage
        this.decorator = new LeaderSessionMessageDecorator(leaderSessionID);

     * Sends a message asynchronously and returns its response. The response to the message is
     * returned as a future.
     * @param message Message to be sent
     * @param timeout Timeout until the Future is completed with an AskTimeoutException
     * @return Future which contains the response to the sent message
    public Future<Object> ask(Object message, FiniteDuration timeout) {
        Object newMessage = decorator.decorate(message);
        return Patterns.ask(actor, newMessage, new Timeout(timeout));

     * Sends a message asynchronously without a result.
     * @param message Message to be sent
    public void tell(Object message) {
        Object newMessage = decorator.decorate(message);
        actor.tell(newMessage, ActorRef.noSender());

     * Sends a message asynchronously without a result with sender being the sender.
     * @param message Message to be sent
     * @param sender Sender of the message
    public void tell(Object message, ActorGateway sender) {
        Object newMessage = decorator.decorate(message);
        actor.tell(newMessage, sender.actor());

     * Forwards a message. For the receiver of this message it looks as if sender has sent the
     * message.
     * @param message Message to be sent
     * @param sender Sender of the forwarded message
    public void forward(Object message, ActorGateway sender) {
        Object newMessage = decorator.decorate(message);
        actor.tell(newMessage, sender.actor());

     * Retries to send asynchronously a message up to numberRetries times. The response to this
     * message is returned as a future. The message is re-sent if the number of retries is not yet
     * exceeded and if an exception occurred while sending it.
     * @param message Message to be sent
     * @param numberRetries Number of times to retry sending the message
     * @param timeout Timeout for each sending attempt
     * @param executionContext ExecutionContext which is used to send the message multiple times
     * @return Future of the response to the sent message
    public Future<Object> retry(
            Object message,
            int numberRetries,
            FiniteDuration timeout,
            ExecutionContext executionContext) {

        Object newMessage = decorator.decorate(message);

        return AkkaUtils.retry(

     * Returns the ActorPath of the remote instance.
     * @return ActorPath of the remote instance.
    public String path() {
        return actor.path().toString();

     * Returns {@link ActorRef} of the target actor
     * @return ActorRef of the target actor
    public ActorRef actor() {
        return actor;

    public UUID leaderSessionID() {
        return leaderSessionID;

    public String toString() {
        return String.format("AkkaActorGateway(%s, %s)", actor.path(), leaderSessionID);
  • AkkaActorGateway實現了ActorGateway接口,它的構造器要求輸入ActorRef及leaderSessionID,同時基於leaderSessionID建立了LeaderSessionMessageDecorator;ask、tell、forward、retry方法均首先調用LeaderSessionMessageDecorator.decorate方法包裝message參數,而後再去調用ActorRef的相應方法



public interface MessageDecorator extends java.io.Serializable {

     * Decorates a message
     * @param message Message to decorate
     * @return Decorated message
    Object decorate(Object message);
  • MessageDecorator接口定義了decorate方法用於包裝message,它有一個實現類爲LeaderSessionMessageDecorator



public class LeaderSessionMessageDecorator implements MessageDecorator {

    private static final long serialVersionUID = 5359618147408392706L;
    /** Leader session ID with which the RequiresLeaderSessionID messages will be decorated */
    private final UUID leaderSessionID;

     * Sets the leader session ID with which the messages will be decorated.
     * @param leaderSessionID Leader session ID to be used for decoration
    public LeaderSessionMessageDecorator(UUID leaderSessionID) {
        this.leaderSessionID = leaderSessionID;

    public Object decorate(Object message) {
        if (message instanceof RequiresLeaderSessionID) {
            return new JobManagerMessages.LeaderSessionMessage(leaderSessionID, message);
        } else {
            return message;
  • LeaderSessionMessageDecorator實現了MessageDecorator接口,其decorate方法判斷message是RequiresLeaderSessionID類型的話,則返回JobManagerMessages.LeaderSessionMessage,不然返回原始的message



object JobManagerMessages {

  /** Wrapper class for leader session messages. Leader session messages implement the
    * [[RequiresLeaderSessionID]] interface and have to be wrapped in a [[LeaderSessionMessage]],
    * which also contains the current leader session ID.
    * @param leaderSessionID Current leader session ID
    * @param message [[RequiresLeaderSessionID]] message to be wrapped in a [[LeaderSessionMessage]]
  case class LeaderSessionMessage(leaderSessionID: UUID, message: Any)

  • JobManagerMessages.LeaderSessionMessage是一個case class,它有兩個屬性,分別是leaderSessionID及message


  • ActorGateway接口定義了ask、tell、forward、retry、path、actor、leaderSessionID方法;它有一個實現類爲AkkaActorGateway
  • AkkaActorGateway實現了ActorGateway接口,它的構造器要求輸入ActorRef及leaderSessionID,同時基於leaderSessionID建立了LeaderSessionMessageDecorator;ask、tell、forward、retry方法均首先調用LeaderSessionMessageDecorator.decorate方法包裝message參數,而後再去調用ActorRef的相應方法
  • MessageDecorator接口定義了decorate方法用於包裝message,它有一個實現類爲LeaderSessionMessageDecorator;LeaderSessionMessageDecorator實現了MessageDecorator接口,其decorate方法判斷message是RequiresLeaderSessionID類型的話,則返回JobManagerMessages.LeaderSessionMessage,不然返回原始的message;JobManagerMessages.LeaderSessionMessage是一個case class,它有兩個屬性,分別是leaderSessionID及message

