使用Akka構建集羣(二)

前言

在《使用Akka構建集羣(一)》一文中經過簡單集羣監聽器的例子演示瞭如何使用Akka搭建一個簡單的集羣,可是這個例子「也許」離咱們的實際業務場景太遠,你基本不太可能去作這樣的工做,除非你負責運維、監控相關的工做(但實際上一個合格的程序員在實現功能的同時,也應當考慮監控的問題,至少應當接入一些監控系統或框架)。css

本文將介紹一個相對看來更符合咱們對於集羣使用的業務需求的例子——將客戶端請求的字符串轉換爲大寫(假如客戶端真的沒有這個能力的話)。html

服務端

本文的Akka配置繼續沿用《使用Akka構建集羣(一)》一文中所展現的配置,但在正式編碼以前咱們須要在配置中增長一個新的配置項akka.cluster.roles指定集羣中服務端的角色,從新編輯事後的application.conf以下:前端

akka {
  actor {
    provider = "akka.cluster.ClusterActorRefProvider"
  }
  remote {
    log-remote-lifecycle-events = off
    netty.tcp {
      hostname = "127.0.0.1"
      port = 2551
    }
  }

  cluster {
    seed-nodes = [
      "akka.tcp://metadataAkkaSystem@127.0.0.1:2551",
      "akka.tcp://metadataAkkaSystem@127.0.0.1:2552"]

    #//#snippet
    # excluded from snippet
    auto-down-unreachable-after = 10s
    #//#snippet
    # auto downing is NOT safe for production deployments.
    # you may want to use it during development, read more about it in the docs.
    #
    # auto-down-unreachable-after = 10s
    roles = [backend]
    # Disable legacy metrics in akka-cluster.
	metrics.enabled=off
  }

}

你仍然不須要過多產生於集羣直接相關的細節。若是你已經閱讀了《使用Akka構建集羣(一)》一文,本文介紹的內容應該不會花費你太多的時間。java

客戶端與服務端通訊須要一些pojo,它們的實現以下:node

public interface TransformationMessages {

	public static class TransformationJob implements Serializable {
		private final String text;

		public TransformationJob(String text) {
			this.text = text;
		}

		public String getText() {
			return text;
		}
	}

	public static class TransformationResult implements Serializable {
		private final String text;

		public TransformationResult(String text) {
			this.text = text;
		}

		public String getText() {
			return text;
		}

		@Override
		public String toString() {
			return "TransformationResult(" + text + ")";
		}
	}

	public static class JobFailed implements Serializable {
		private final String reason;
		private final TransformationJob job;

		public JobFailed(String reason, TransformationJob job) {
			this.reason = reason;
			this.job = job;
		}

		public String getReason() {
			return reason;
		}

		public TransformationJob getJob() {
			return job;
		}

		@Override
		public String toString() {
			return "JobFailed(" + reason + ")";
		}
	}

	public static final String BACKEND_REGISTRATION = "BackendRegistration";

}

TransformationJob表明待轉換的任務,其text屬性是須要處理的字符串文本;TransformationResult是任務處理的結果,其text屬性是轉換完成的字符串文本;JobFailed是任務失敗,其reason屬性表明失敗緣由;字符串常量BACKEND_REGISTRATION用於服務端向客戶端註冊,以便於客戶端知道有哪些服務端能夠提供服務。程序員

服務端用於將字符串轉換爲大寫的Actor(正如我以前的文章所言,真正的處理應當從Actor中分離出去,只少經過接口解耦)的實現見代碼清單1所示。spring

代碼清單1後端

@Named("TransformationBackend")
@Scope("prototype")
public class TransformationBackend extends UntypedActor {
	
	private static Logger logger = LoggerFactory.getLogger(TransformationBackend.class);

	Cluster cluster = Cluster.get(getContext().system());

	// subscribe to cluster changes, MemberUp
	@Override
	public void preStart() {
		cluster.subscribe(getSelf(), MemberUp.class);
	}

	// re-subscribe when restart
	@Override
	public void postStop() {
		cluster.unsubscribe(getSelf());
	}

	@Override
	public void onReceive(Object message) {
		if (message instanceof TransformationJob) {
			TransformationJob job = (TransformationJob) message;
			logger.info(job.getText());
			getSender().tell(new TransformationResult(job.getText().toUpperCase()), getSelf());

		} else if (message instanceof CurrentClusterState) {
			CurrentClusterState state = (CurrentClusterState) message;
			for (Member member : state.getMembers()) {
				if (member.status().equals(MemberStatus.up())) {
					register(member);
				}
			}

		} else if (message instanceof MemberUp) {
			MemberUp mUp = (MemberUp) message;
			register(mUp.member());

		} else {
			unhandled(message);
		}
	}

	void register(Member member) {
		if (member.hasRole("frontend"))
			getContext().actorSelection(member.address() + "/user/transformationFrontend").tell(BACKEND_REGISTRATION, getSelf());
	}
}

TransformationBackend在preStart方法中訂閱了集羣的MemberUp事件,這樣當它發現新註冊的集羣成員節點的角色是frontend(前端)時,將向此節點發送BACKEND_REGISTRATION消息,後者將會知道前者提供了服務。TransformationBackend所在的節點在剛剛加入集羣時,TransformationBackend還會收到CurrentClusterState消息,從中能夠解析出集羣中的全部前端節點(即roles爲frontend的),並向其發送BACKEND_REGISTRATION消息。通過以上兩步能夠確保集羣中的前端節點和後端節點不管啓動或加入集羣的順序怎樣變化,都不會影響後端節點通知全部的前端節點及前端節點知道哪些後端節點提供了服務。緩存

客戶端

客戶端除了監聽端口不一樣外,也須要增長akka.cluster.roles配置項,咱們指定爲frontend。客戶端的配置以下:app

akka {
  actor {
    provider = "akka.cluster.ClusterActorRefProvider"
  }
  remote {
    log-remote-lifecycle-events = off
    netty.tcp {
      hostname = "127.0.0.1"
      port = 0
    }
  }

  cluster {
    seed-nodes = [
      "akka.tcp://metadataAkkaSystem@127.0.0.1:2551",
      "akka.tcp://metadataAkkaSystem@127.0.0.1:2552"]

    #//#snippet
    # excluded from snippet
    auto-down-unreachable-after = 10s
    #//#snippet
    # auto downing is NOT safe for production deployments.
    # you may want to use it during development, read more about it in the docs.
    #
    # auto-down-unreachable-after = 10s
    roles = [frontend]
  }

}

客戶端用於處理轉換任務的Actor見代碼清單2所說。

代碼清單2

@Named("TransformationFrontend")
@Scope("prototype")
public class TransformationFrontend extends UntypedActor {

  List<ActorRef> backends = new ArrayList<ActorRef>();
  int jobCounter = 0;

  @Override
  public void onReceive(Object message) {
    if ((message instanceof TransformationJob) && backends.isEmpty()) {
      TransformationJob job = (TransformationJob) message;
      getSender().tell(
          new JobFailed("Service unavailable, try again later", job),
          getSender());

    } else if (message instanceof TransformationJob) {
      TransformationJob job = (TransformationJob) message;
      jobCounter++;
      backends.get(jobCounter % backends.size())
          .forward(job, getContext());

    } else if (message.equals(BACKEND_REGISTRATION)) {
      getContext().watch(getSender());
      backends.add(getSender());

    } else if (message instanceof Terminated) {
      Terminated terminated = (Terminated) message;
      backends.remove(terminated.getActor());

    } else {
      unhandled(message);
    }
  }

}

能夠看到TransformationFrontend處理的消息分爲如下三種:

  • BACKEND_REGISTRATION:收到此消息說明有服務端通知客戶端,TransformationFrontend首先將服務端的ActorRef加入backends列表,而後對服務端的ActorRef添加監管;
  • Terminated:因爲TransformationFrontend對服務端的ActorRef添加了監管,因此當服務端進程奔潰或者重啓時,將收到Terminated消息,此時TransformationFrontend將此服務端的ActorRef從backends列表中移除;
  • TransformationJob:此消息說明有新的轉換任務須要TransformationFrontend處理,處理分兩種狀況:
  1. backends列表爲空,則向發送此任務的發送者返回JobFailed消息,並告知「目前沒有服務端可用,請稍後再試」;
  2. backends列表不爲空,則經過取模運算選出一個服務端,將TransformationJob轉發給服務端進一步處理;

運行展現

初始化服務端TransformationBackend的代碼以下:

		logger.info("Start transformationBackend");
		final ActorRef transformationBackend = actorSystem.actorOf(springExt.props("TransformationBackend"), "transformationBackend");
		actorMap.put("transformationBackend", transformationBackend);
		logger.info("Started transformationBackend");

初始化客戶端TransformationFrontend的代碼以下:

                logger.info("Start transformationFrontend");
		final ActorRef transformationFrontend = actorSystem
				.actorOf(springExt.props("TransformationFrontend"), "transformationFrontend");
		actorMap.put("transformationFrontend", transformationFrontend);
		logger.info("Started transformationFrontend");
		final FiniteDuration interval = Duration.create(2, TimeUnit.SECONDS);
		final Timeout timeout = new Timeout(Duration.create(5, TimeUnit.SECONDS));
		final ExecutionContext ec = actorSystem.dispatcher();
		final AtomicInteger counter = new AtomicInteger();
		actorSystem.scheduler().schedule(interval, interval, new Runnable() {
			public void run() {
				ask(transformationFrontend, new TransformationJob("hello-" + counter.incrementAndGet()), timeout)
						.onSuccess(new OnSuccess<Object>() {
							public void onSuccess(Object result) {
								logger.info(result.toString());
							}
						}, ec);
			}
		}, ec);

能夠看到咱們在客戶端每2秒將發送一個新的消息,這個消息以「hello-」開頭,後邊是一個不斷自增的數字。當收處處理結果後,客戶端還會將結果打印出來。

咱們以3個服務端節點(host相同,端口分別爲255一、2552及隨機)、1個客戶端節點(端口隨機)組成的集羣爲例,咱們首先啓動第一個種子節點,而後以任意順序啓動其它服務端或者客戶端節點(啓動順序問題在《使用Akka構建集羣(一)》一文中已介紹,此處再也不贅述),集羣成員變化的日誌以下圖:

從上面展現的日誌中能夠看到集羣的3個服務端節點和1個客戶端節點前後加入集羣的信息。

咱們再來看看端口爲57222的角色爲frontend的節點的日誌信息,以下圖:

從frontend的日誌看出,它已經打印了大寫得HELLO-3到HELLO-10十條任務處理結果。那麼這些任務分別是由集羣中的哪些節點負責處理的?咱們首先來看看端口爲2551的backend節點,其處理任務的日誌以下圖:

看來2551節點處理了hello-四、hello-7及hello-10三條任務。咱們再來看看端口爲2552的backend節點,其處理任務的日誌以下圖:

能夠看到2552節點處理了hello-二、hello-5及hello-8三條任務。最後看看端口爲57211的backend節點,其處理任務的日誌以下圖:

能夠看到從hello-3到hello-10這8條處理任務被均衡的分配給了3個不一樣的後端節點處理。奇怪的是hello-1這條消息竟然沒有任何顯示,那是由於前端節點剛開始處理消息時,backends列表裏尚未緩存好任何backend的ActorRef。咱們向上查找frontend節點的日誌,在相隔很遠的日誌中發現了下面的輸出:

這也印證了咱們的猜想。

總結

根據本文的例子,你們應當看到使用Akka構建集羣,開發人員只須要關注消息的發送與接收,而無需過多涉及集羣的細節。不管前端仍是後端節點均可以加入同一個集羣,並且多個後端節點處理消息也能達到負載均衡的功效。

其它Akka應用的博文以下:

  1. Spring與Akka的集成》;
  2. 《使用Akka的遠程調用》;
  3. 使用Akka構建集羣(一)》;
  4. 使用Akka構建集羣(二)》;
  5. 《使用Akka持久化——持久化與快照》;
  6. 《使用Akka持久化——消息發送與接收》;
相關文章
相關標籤/搜索