Akka Remote Actor_簡單示例一

Akka Remote Actor_簡單示例一


參照Typesafe的activator-1.2.12生成的示例。

通過akka的配置文件來配置remote actor。

首先實現一個計算加減乘除的remote actor,如下:

package com.usoft9;

import akka.actor.UntypedActor;

/**
 * 計算加減乘除的Actor
 * getSender
 * getSelf
 */
public class CalculatorActor extends UntypedActor {
    @Override
    public void onReceive(Object message) {

        if (message instanceof Op.Add) {
            Op.Add add = (Op.Add) message;
            System.out.println("Calculating " + add.getN1() + " + " + add.getN2());
            Op.AddResult result = new Op.AddResult(add.getN1(), add.getN2(),
                    add.getN1() + add.getN2());
            getSender().tell(result, getSelf());

        } else if (message instanceof Op.Subtract) {
            Op.Subtract subtract = (Op.Subtract) message;
            System.out.println("Calculating " + subtract.getN1() + " - "
                    + subtract.getN2());
            Op.SubtractResult result = new Op.SubtractResult(subtract.getN1(),
                    subtract.getN2(), subtract.getN1() - subtract.getN2());
            getSender().tell(result, getSelf());

        } else if (message instanceof Op.Multiply) {
            Op.Multiply multiply = (Op.Multiply) message;
            System.out.println("Calculating " + multiply.getN1() + " * "
                    + multiply.getN2());
            Op.MultiplicationResult result = new Op.MultiplicationResult(
                    multiply.getN1(), multiply.getN2(), multiply.getN1()
                    * multiply.getN2());
            getSender().tell(result, getSelf());

        } else if (message instanceof Op.Divide) {
            Op.Divide divide = (Op.Divide) message;
            System.out.println("Calculating " + divide.getN1() + " / "
                    + divide.getN2());
            Op.DivisionResult result = new Op.DivisionResult(divide.getN1(),
                    divide.getN2(), divide.getN1() / divide.getN2());
            getSender().tell(result, getSelf());

        } else {
            unhandled(message);
        }
    }
}

這個remote actor和本地actor沒有什麼區別。

相關的類如下:

package com.usoft9;

import java.io.Serializable;

/**
 * Container for the calculator's message types: one request class and one
 * result class per arithmetic operation.
 *
 * <p>All messages are immutable and {@link Serializable}. The private field
 * names are deliberately left as {@code n1}/{@code n2}/{@code result}, since
 * field names are part of the Java-serialized form.
 */
public class Op {

    /** Marker interface implemented by every request message. */
    public interface MathOp extends Serializable {
    }

    /** Marker interface implemented by every result message. */
    public interface MathResult extends Serializable {
    }

    /** Request to add two integers. */
    static class Add implements MathOp {
        private static final long serialVersionUID = 1L;
        private final int n1;
        private final int n2;

        /**
         * @param first  the first operand
         * @param second the second operand
         */
        public Add(int first, int second) {
            n1 = first;
            n2 = second;
        }

        /** @return the first operand */
        public int getN1() {
            return n1;
        }

        /** @return the second operand */
        public int getN2() {
            return n2;
        }
    }

    /** Result message for an {@link Add} request. */
    static class AddResult implements MathResult {
        private static final long serialVersionUID = 1L;
        private final int n1;
        private final int n2;
        private final int result;

        /**
         * @param first   the first operand
         * @param second  the second operand
         * @param outcome the value to report as the result
         */
        public AddResult(int first, int second, int outcome) {
            n1 = first;
            n2 = second;
            result = outcome;
        }

        /** @return the first operand */
        public int getN1() {
            return n1;
        }

        /** @return the second operand */
        public int getN2() {
            return n2;
        }

        /** @return the stored result */
        public int getResult() {
            return result;
        }
    }

    /** Request to subtract one integer from another. */
    static class Subtract implements MathOp {
        private static final long serialVersionUID = 1L;
        private final int n1;
        private final int n2;

        /**
         * @param first  the first operand
         * @param second the second operand
         */
        public Subtract(int first, int second) {
            n1 = first;
            n2 = second;
        }

        /** @return the first operand */
        public int getN1() {
            return n1;
        }

        /** @return the second operand */
        public int getN2() {
            return n2;
        }
    }

    /** Result message for a {@link Subtract} request. */
    static class SubtractResult implements MathResult {
        private static final long serialVersionUID = 1L;
        private final int n1;
        private final int n2;
        private final int result;

        /**
         * @param first   the first operand
         * @param second  the second operand
         * @param outcome the value to report as the result
         */
        public SubtractResult(int first, int second, int outcome) {
            n1 = first;
            n2 = second;
            result = outcome;
        }

        /** @return the first operand */
        public int getN1() {
            return n1;
        }

        /** @return the second operand */
        public int getN2() {
            return n2;
        }

        /** @return the stored result */
        public int getResult() {
            return result;
        }
    }

    /** Request to multiply two integers. */
    static class Multiply implements MathOp {
        private static final long serialVersionUID = 1L;
        private final int n1;
        private final int n2;

        /**
         * @param first  the first operand
         * @param second the second operand
         */
        public Multiply(int first, int second) {
            n1 = first;
            n2 = second;
        }

        /** @return the first operand */
        public int getN1() {
            return n1;
        }

        /** @return the second operand */
        public int getN2() {
            return n2;
        }
    }

    /** Result message for a {@link Multiply} request. */
    static class MultiplicationResult implements MathResult {
        private static final long serialVersionUID = 1L;
        private final int n1;
        private final int n2;
        private final int result;

        /**
         * @param first   the first operand
         * @param second  the second operand
         * @param outcome the value to report as the result
         */
        public MultiplicationResult(int first, int second, int outcome) {
            n1 = first;
            n2 = second;
            result = outcome;
        }

        /** @return the first operand */
        public int getN1() {
            return n1;
        }

        /** @return the second operand */
        public int getN2() {
            return n2;
        }

        /** @return the stored result */
        public int getResult() {
            return result;
        }
    }

    /** Request to divide a {@code double} dividend by an {@code int} divisor. */
    static class Divide implements MathOp {
        private static final long serialVersionUID = 1L;
        private final double n1;
        private final int n2;

        /**
         * @param dividend the first operand
         * @param divisor  the second operand
         */
        public Divide(double dividend, int divisor) {
            n1 = dividend;
            n2 = divisor;
        }

        /** @return the first operand (dividend) */
        public double getN1() {
            return n1;
        }

        /** @return the second operand (divisor) */
        public int getN2() {
            return n2;
        }
    }

    /** Result message for a {@link Divide} request. */
    static class DivisionResult implements MathResult {
        private static final long serialVersionUID = 1L;
        private final double n1;
        private final int n2;
        private final double result;

        /**
         * @param dividend the first operand
         * @param divisor  the second operand
         * @param outcome  the value to report as the result
         */
        public DivisionResult(double dividend, int divisor, double outcome) {
            n1 = dividend;
            n2 = divisor;
            result = outcome;
        }

        /** @return the first operand (dividend) */
        public double getN1() {
            return n1;
        }

        /** @return the second operand (divisor) */
        public int getN2() {
            return n2;
        }

        /** @return the stored result */
        public double getResult() {
            return result;
        }
    }
}


那麼問題來了,如何與這個remote actor交互?可以使用Patterns.ask來與remote actor交互,以下是示例代碼。

首先來看remote actor的配置,如下:

common.conf

# Settings shared by configurations that include this file.
akka {
  actor {
    # Use the remote actor-ref provider instead of the default one.
    provider = "akka.remote.RemoteActorRefProvider"
  }

  remote {
    netty.tcp {
      # Host name used by the netty TCP transport (loopback address).
      hostname = "127.0.0.1"
    }
  }
}

calculator.conf

# Pull in the settings from the "common" configuration first.
include "common"

akka {
  # LISTEN on tcp port 2552
  remote.netty.tcp.port = 2552
}


啓動remote actor並與之交互,如下。代碼可能寫得有點複雜,但思路還是比較清楚的,需要結合前面Future部分的知識點;不過借助IDE的代碼提示,寫起來很輕鬆。

package com.usoft9;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.dispatch.Futures;
import akka.dispatch.OnSuccess;
import akka.japi.Function;
import akka.pattern.Patterns;
import akka.util.Timeout;
import com.typesafe.config.ConfigFactory;
import scala.Function1;
import scala.PartialFunction;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
import scala.runtime.BoxedUnit;

import java.util.Arrays;
import java.util.concurrent.Callable;

/**
 * Created by liyanxin on 2015/1/19.
 */
public class RemoteActorDemo {

    public static void main(String args[]) {

        //不使用默認的配置,而是選擇加載選定的remote actor配置
        final ActorSystem system = ActorSystem.create("CalculatorWorkerSystem",
                ConfigFactory.load(("usoft9/calculator")));

        //remote actor的ref
        final ActorRef calculatorActor = system.actorOf(Props.create(CalculatorActor.class), "CalculatorActor");

        System.out.println("Started CalculatorWorkerSystem");

        final Timeout timeout = new Timeout(Duration.create(5, "seconds"));
        Future<Object> addFuture = Patterns.ask(calculatorActor, new Op.Add(1, 2), timeout);
        Future<Object> subtractFuture = Patterns.ask(calculatorActor, new Op.Subtract(1, 2), timeout);
        Future<Object> multiplyFuture = Patterns.ask(calculatorActor, new Op.Multiply(1, 2), timeout);
        Future<Object> divideFuture = Patterns.ask(calculatorActor, new Op.Divide(1, 2), timeout);

        Iterable<Future<Object>> futureArray = Arrays.asList(addFuture, subtractFuture, multiplyFuture, divideFuture);
        Future<Iterable<Op.MathResult>> futureResult = Futures.traverse(futureArray,
                new Function<Future<Object>, Future<Op.MathResult>>() {
                    @Override
                    public Future<Op.MathResult> apply(final Future<Object> param) throws Exception {
                        return Futures.future(new Callable<Op.MathResult>() {
                            @Override
                            public Op.MathResult call() throws Exception {
                                return (Op.MathResult) Await.result(param, timeout.duration());
                            }
                        }, system.dispatcher());
                    }
                }, system.dispatcher());

        futureResult.onSuccess(new OnSuccess<Iterable<Op.MathResult>>() {
            @Override
            public void onSuccess(Iterable<Op.MathResult> result) throws Throwable {
                for (Op.MathResult r : result) {
                    if (r instanceof Op.AddResult) {
                        System.out.println("add result=" + ((Op.AddResult) r).getResult());
                    } else if (r instanceof Op.SubtractResult) {
                        System.out.println("subtract result=" + ((Op.SubtractResult) r).getResult());
                    } else if (r instanceof Op.MultiplicationResult) {
                        System.out.println("multiply result=" + ((Op.MultiplicationResult) r).getResult());
                    } else if (r instanceof Op.DivisionResult) {
                        System.out.println("divide result=" + ((Op.DivisionResult) r).getResult());
                    }
                }
            }
        }, system.dispatcher());
    }
}


運行結果如下(注意:日誌中顯示的監聽端口為8989,與上文calculator.conf中列出的2552不一致,說明實際運行時加載的端口配置與上文所列不同):

[INFO] [01/20/2015 10:47:24.660] [main] [Remoting] Starting remoting

[INFO] [01/20/2015 10:47:25.328] [main] [Remoting] Remoting started; listening on addresses :[akka.tcp://CalculatorWorkerSystem@127.0.0.1:8989]

[INFO] [01/20/2015 10:47:25.333] [main] [Remoting] Remoting now listens on addresses: [akka.tcp://CalculatorWorkerSystem@127.0.0.1:8989]

Started CalculatorWorkerSystem

Calculating 1 + 2

Calculating 1 - 2

Calculating 1 * 2

Calculating 1.0 / 2

add result=3

subtract result=-1

multiply result=2

divide result=0.5

==================END==================

相關文章
相關標籤/搜索