akka-rpc(基於akka的rpc實現)

akka-rpc(基於akka的rpc的實現)


代碼:http://git.oschina.net/for-1988/Simples


目前的工做在基於akka(java)實現數據服務總線,Akka 2.3中提供了 Cluster Sharing(分片集羣)和Persistence功能能夠很簡單的寫出一個大型的分佈式集羣的架構。裏面的一塊功能就是RPC(遠程過程調用)。java

RPC

遠程過程調用(Remote Procedure Call,RPC)是一個計算機通訊協議。該協議容許運行於一臺計算機的程序調用另外一臺計算機的子程序,而程序員無需額外地爲這個交互做用編程。若是涉及的軟件採用面向對象編程,那麼遠程過程調用亦可稱做遠程調用或遠程方法調用,例:Java RMIgit

實現原理

整個RPC的調用過程徹底基於akka來傳遞對象,由於須要進行網絡通訊,因此咱們的接口實現類、調用參數以及返回值都須要實現java序列化接口。客戶端跟服務端其實都是在一個Akka 集羣關係中,Client跟Server都是集羣中的一個節點。首先Client須要初始化RpcClient對象,在初始化的過程當中,咱們啓動了AkkaSystem,加入到整個集羣中,並建立了負責與Server進行通訊的Actor。而後經過RpcClient中的getBean(Class<T> clz)方法獲取Server端的接口實現類的實例對象,而後經過動態代理攔截這個對象的全部方法。最後,在執行方法的時候,在RpcBeanProxy中向Server發送CallMethod事件,執行遠程實現類的方法,獲取返回值給Client。程序員

Server端核心代碼

public class RpcServer extends UntypedActor {
         private Map<String, Object> proxyBeans;

	public RpcServer(Map<Class<?>, Object> beans) {
		proxyBeans = new HashMap<String, Object>();
		for (Iterator<Class<?>> iterator = beans.keySet().iterator(); iterator
				.hasNext();) {
			Class<?> inface = iterator.next();
			proxyBeans.put(inface.getName(), beans.get(inface));
		}
	}

	@Override
	public void onReceive(Object message) throws Exception {
		if (message instanceof RpcEvent.CallBean) {   //返回Server端的接口實現類的實例
			CallBean event = (CallBean) message;
			ReturnBean bean = new ReturnBean(
					proxyBeans.get(event.getBeanName()), getSelf());
			getSender().tell(bean, getSelf());
		} else if (message instanceof RpcEvent.CallMethod) {
			CallMethod event = (CallMethod) message;
			Object bean = proxyBeans.get(event.getBeanName());
			Object[] params = event.getParams();
			List<Class<?>> paraTypes = new ArrayList<Class<?>>();
			Class<?>[] paramerTypes = new Class<?>[] {};
			if (params != null) {
				for (Object param : params) {
					paraTypes.add(param.getClass());
				}
			}
			Method method = bean.getClass().getMethod(event.getMethodName(),
					paraTypes.toArray(paramerTypes));
			Object o = method.invoke(bean, params);
			getSender().tell(o, getSelf());
		}
	}

}

啓動Server編程

public static void main(String[] args) {
		final Config config = ConfigFactory
				.parseString("akka.remote.netty.tcp.port=" + 2551)
				.withFallback(
						ConfigFactory
								.parseString("akka.cluster.roles = [RpcServer]"))
				.withFallback(ConfigFactory.load());

		ActorSystem system = ActorSystem.create("EsbSystem", config);
		
		// Server 加入發佈的服務
		Map<Class<?>, Object> beans = new HashMap<Class<?>, Object>();
		beans.put(ExampleInterface.class, new ExampleInterfaceImpl());
		system.actorOf(Props.create(RpcServer.class, beans), "rpcServer");
	}

Client端核心代碼 

RpcClient類型集成了Thread,爲了解決一個問題:由於AkkaSystem在加入集羣中的時候是異步的,因此咱們在第一次new RpcClient對象的時候須要等待加入集羣成功之後,才能夠執行下面的方法,否則獲取的 /user/rpcServer Route中沒有Server的Actor,請求會失敗。網絡

public class RpcClient extends Thread {

	private ActorSystem system;

	private ActorRef rpc;

	private ActorRef clientServer;

	private static RpcClient instance = null;

	public RpcClient() {
		this.start();
		final Config config = ConfigFactory
				.parseString("akka.remote.netty.tcp.port=" + 2552)
				.withFallback(
						ConfigFactory
								.parseString("akka.cluster.roles = [RpcClient]"))
				.withFallback(ConfigFactory.load());
		system = ActorSystem.create("EsbSystem", config);

		int totalInstances = 100;
		Iterable<String> routeesPaths = Arrays.asList("/user/rpcServer");
		boolean allowLocalRoutees = false;
		ClusterRouterGroup clusterRouterGroup = new ClusterRouterGroup(
				new AdaptiveLoadBalancingGroup(
						HeapMetricsSelector.getInstance(),
						Collections.<String> emptyList()),
				new ClusterRouterGroupSettings(totalInstances, routeesPaths,
						allowLocalRoutees, "RpcServer"));
		rpc = system.actorOf(clusterRouterGroup.props(), "rpcCall");
		clientServer = system.actorOf(Props.create(RpcClientServer.class, rpc),
				"client");
		Cluster.get(system).registerOnMemberUp(new Runnable() {  //加入集羣成功後的回調事件,恢復當前線程的中斷
			@Override
			public void run() {
				synchronized (instance) {
					System.out.println("notify");
					instance.notify();
				}
			}
		});

	}

	public static RpcClient getInstance() {
		if (instance == null) {
			instance = new RpcClient();
			synchronized (instance) {
				try {   //中斷當前線程,等待加入集羣成功後,恢復
					System.out.println("wait");
					instance.wait();
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}
		}
		return instance;
	}

	public <T> T getBean(Class<T> clz) {
		Future<Object> future = Patterns.ask(clientServer,
				new RpcEvent.CallBean(clz.getName(), clientServer),
				new Timeout(Duration.create(5, TimeUnit.SECONDS)));
		try {
			Object o = Await.result(future,
					Duration.create(5, TimeUnit.SECONDS));
			if (o != null) {
				ReturnBean returnBean = (ReturnBean) o;
				return (T) new RpcBeanProxy().proxy(returnBean.getObj(),
						clientServer, clz);
			}
		} catch (Exception e) {
			e.printStackTrace();
		}
		return null;
	}
}

RpcClientServer架構

public class RpcClientServer extends UntypedActor {

	private ActorRef rpc;

	public RpcClientServer(ActorRef rpc) {
		this.rpc = rpc;
	}

	@Override
	public void onReceive(Object message) throws Exception {
		if (message instanceof RpcEvent.CallBean) {  //向Server發送CallBean請求
			CallBean event = (CallBean) message;
			Future<Object> future = Patterns.ask(rpc, event, new Timeout(
					Duration.create(5, TimeUnit.SECONDS)));
			Object o = Await.result(future,
					Duration.create(5, TimeUnit.SECONDS));
			getSender().tell(o, getSelf());
		} else if (message instanceof RpcEvent.CallMethod) {  //向Server發送方法調用請求
			Future<Object> future = Patterns.ask(rpc, message, new Timeout(
					Duration.create(5, TimeUnit.SECONDS)));
			Object o = Await.result(future,
					Duration.create(5, TimeUnit.SECONDS));
			getSender().tell(o, getSelf());
		}
	}
}

RpcBeanProxy,客戶端的動態代理類異步

public class RpcBeanProxy implements InvocationHandler {

	private ActorRef rpcClientServer;

	private Class<?> clz;

	public Object proxy(Object target, ActorRef rpcClientServer, Class<?> clz) {
		this.rpcClientServer = rpcClientServer;
		this.clz = clz;
		return Proxy.newProxyInstance(target.getClass().getClassLoader(),
				target.getClass().getInterfaces(), this);
	}

	@Override
	public Object invoke(Object proxy, Method method, Object[] args)
			throws Throwable {
		Object result = null;
		RpcEvent.CallMethod callMethod = new RpcEvent.CallMethod(
				method.getName(), args, clz.getName());
		Future<Object> future = Patterns.ask(rpcClientServer, callMethod,
				new Timeout(Duration.create(5, TimeUnit.SECONDS)));
		Object o = Await.result(future, Duration.create(5, TimeUnit.SECONDS));
		result = o;
		return result;
	}

}

Demo

Interface,Client和Server都須要這個類,必須實現序列化tcp

public interface ExampleInterface extends Serializable{
	public String sayHello(String name);
}

實現類,只須要Server端存在這個類。分佈式

public class ExampleInterfaceImpl implements ExampleInterface {
	@Override
	public String sayHello(String name) {
		System.out.println("Be Called !");
		return "Hello " + name;
	}
}

Client調用ide

public static void main(String[] args) {
		RpcClient client = RpcClient.getInstance();
		long start = System.currentTimeMillis();
		
		ExampleInterface example = client.getBean(ExampleInterface.class);
		System.out.println(example.sayHello("rpc"));
		
		long time = System.currentTimeMillis() - start;
		System.out.println("time :" + time);
	}


這裏第一次調用耗時比較長鬚要46毫秒,akka會對消息進行優化,調用屢次之後時間爲 1~2毫秒。

目前還沒來得及作性能測試,後面會補充。

相關文章
相關標籤/搜索