第8章Enterprise integration patterns是core Camel的最後一章了,第二章已經介紹了一部分camel在eip中的應用。這一章所有都是講eip.數據庫
看來eip確實是camel的核心,camel確實是基於eip的。緩存
這一章包含了5方面的EIP問題:安全
■ The Aggregator EIP 消息合併
■ The Splitter EIP 消息分拆
■ The Routing Slip EIP 根據消息標籤進行路由(也是一種路由)
■ The Dynamic Router EIP 動態路由
■ The Load Balancer EIP 負載均衡負載均衡
消息合併模式,以下圖所示,只有當接收到三個標識相同的消息合併完成後再發送出去。1爲消息標識,ABC爲消息內容。dom
關於消息合併咱們須要關注三個方面:ide
1.Correlation identifierui
消息合併的標識,即如何肯定這幾個消息是一組的。如上圖標識的1,2.net
2.Completion condition對象
消息合併完成的條件,即多少個消息,多長時間內的消息合併。如上圖3個消息合併完成。token
3.Aggregation strategy
消息合併的策略,即消息是如何合併的。這個須要本身實現接口AggregationStrategy。
public void configure() throws Exception {
from("direct:start")
.log("Sending ${body}")
.aggregate(xpath(/order/@customer), new MyAggregationStrategy()) //定義合併標示
.completionSize(2).completionTimeout(5000) //定義合併完成條件消息數量爲2,等待時間爲5s
.log("Sending out ${body}")
.to("mock:result");
}
消息合併的是須要等待的,默認狀況下Camel是將收到的消息緩存在內存中,在生產中爲了消息的安全,可能須要將收到的消息持久化。
Camel提供了接口AggregationRepository幫助咱們實現消息持久化,以下。
AggregationRepository myRepo = new
HawtDBAggregationRepository("myrepo", "data/myrepo.dat");
from("file://target/inbox")
.log("Consuming ${file:name}")
.aggregate(constant(true), new MyAggregationStrategy())
.aggregationRepository(myRepo)
.completionSize(3)
.log("Sending out ${body}")
.to("mock:result");
HawtDB是Camel自帶的一個嵌入式數據庫。
消息分拆模式與前一種模式恰好相反,是將接收到的消息按必定的規則分拆成多個消息發出去,以下圖所示。
Camel對消息的分拆也提供了多種方法:
若是消息自己就是一個集合,能夠用DSL中的split對消息分拆,示例以下:
public class SplitterABCTest extends CamelTestSupport {
public void testSplitABC() throws Exception {
MockEndpoint mock = getMockEndpoint("mock:split");
mock.expectedBodiesReceived("A", "B", "C");
List<String> body = new ArrayList<String>();
body.add("A");
body.add("B");
body.add("C");
template.sendBody("direct:start", body);
assertMockEndpointsSatisfied();
}
protected RouteBuilder createRouteBuilder() throws Exception {
return new RouteBuilder() {
public void configure() throws Exception {
from("direct:start")
.split(body())
.log("Split line ${body}")
.to("mock:split");
}
};
}
}
若是消息自己是一個對象咱們也可能經過bean的形式對消息分拆,示例以下:
public void configure() throws Exception {
from("direct:start")
.split().method(CustomerService.class, "splitDepartments")
.to("log:split")
.to("mock:split");
}
最後對大消息的分拆可能採用流的形式如.split(body().tokenize("\n")).streaming()
有些特殊的消息咱們須要根據消息來動態的路由到一個多個結點。這是咱們能夠採用routing Slip的模式,示例以下:
public void testRoutingSlip() throws Exception {
getMockEndpoint("mock:a").expectedMessageCount(1);
getMockEndpoint("mock:b").expectedMessageCount(0);
getMockEndpoint("mock:c").expectedMessageCount(1);
template.sendBodyAndHeader("direct:start", "Hello World",
"mySlip", "mock:a,mock:c");
assertMockEndpointsSatisfied();
}
路由定義爲:from("direct:start").routingSlip("mySlip", ";");
關於Routing Slip的形式也有多種方式:
各split同樣能夠採用dsl的形式,bean的形式同時camel仍是提供了註解@RoutingSlip。
動態路由和Routing Slip同樣也是不一樣的消息路由到不一樣的節點,以下。
public class DynamicRouterBean {
public String route(String body,
@Header(Exchange.SLIP_ENDPOINT) String previous) {
return whereToGo(body, previous);
}
private String whereToGo(String body, String previous) {
if (previous == null) {
return "mock://a";
} else if ("mock://a".equals(previous)) {
return "language://simple:Bye ${body}";
} else {
return null;
}
}
基本能夠同上,也提供了註解的形式@DynamicRouter。
負載均衡就不用細說了,Camel提供了對負載均衡的支持,示例以下:
from("direct:start")
.loadBalance().roundRobin()
.to("seda:a").to("seda:b")
.end();
關於負載均衡的策略Camel也提供了6種策略:Random,Round robin,Sticky,Topic,Failover,Custom。
更詳細的信息請查詢手冊。
小結:至此Camel 的核心部分已經介紹完了,後面會再寫一篇關於Camel的事務控制(第9章),額外介紹下Camel關於路由質量的監控BAM組件的使用,
其餘的章節就再也不介紹了。