Akka TCP是AkkaIO模塊下的內容,全部的AkkaIO API只能經過mangaer對象來訪問。當咱們使用Akka IO時,第一步要先獲取合適的manager對象,以下獲取TcpManager:html
ActorRef tcpManager = Tcp.get(getContext().getSystem()).manager();
複製代碼
manager是一個處理底層IO資源(selectors,channels)的actor,同時能夠給任務實例化workers。bash
TcpManager
發送TcpMessage.conenct
方法。CommandFailed
消息Connected
消息給最初發送TcpMessage.connect
的actor。TcpMessage.register
給鏈接對象。若是不作這一步,這個鏈接就是不能用的@Slf4j
public class AkkaTcpClientDemo extends AbstractActor {
final InetSocketAddress remote;
final ActorRef listener;
public static Props props(InetSocketAddress remote, ActorRef listener) {
return Props.create(AkkaTcpClientDemo.class, remote, listener);
}
public AkkaTcpClientDemo(InetSocketAddress remote, ActorRef listener) {
this.remote = remote;
this.listener = listener;
// 鏈接
final ActorRef tcp = Tcp.get(getContext().getSystem()).manager();
tcp.tell(TcpMessage.connect(remote), getSelf());
}
@Override
public Receive createReceive() {
return receiveBuilder()
.match(Tcp.Bound.class,msg -> {
log.info("Bounded {} ,sender = {}",msg,getSender());
})
.match(
Tcp.CommandFailed.class,
msg -> {
log.info("failed msg{},sender {}",msg,getSender());
listener.tell("failed ", getSelf());
getContext().stop(getSelf());
})
.match(
Tcp.Connected.class,
msg -> {
log.info("Conencted : msg {} sender{}",msg,getSender());
listener.tell(msg, getSelf());
getSender().tell(TcpMessage.register(getSelf()), getSelf());
getContext().become(connected(getSender()));
ActorRef conn = getSender();
for (int i = 0; i < 10; i++) {
conn.tell(TcpMessage.write(ByteString.fromString("Hello" + i)),getSelf());
}
})
.build();
}
private Receive connected(final ActorRef connection) {
return receiveBuilder()
.match(
ByteString.class,
msg -> {
connection.tell(TcpMessage.write((ByteString) msg), getSelf());
})
.match(
Tcp.CommandFailed.class,
msg -> {
// OS kernel socket buffer was full
})
.match(
Tcp.Received.class,
msg -> {
listener.tell(msg.data().utf8String(), getSelf());
})
.matchEquals(
"close",
msg -> {
connection.tell(TcpMessage.close(), getSelf());
})
.match(
Tcp.ConnectionClosed.class,
msg -> {
getContext().stop(getSelf());
})
.match(Tcp.Register.class,msg -> {
log.info("register msg {}",msg);
})
.build();
}
public static void main(String[] args) {
ActorSystem system = ActorSystem.create("demo", ConfigFactory.load("my"));
// ActorRef manager = Tcp.get(system).manager();
ActorRef listen = system.actorOf(Props.create(ServerHandler.class));
// system.actorOf( Props.create(AkkaTcpClientDemo.class,new InetSocketAddress("127.0.0.1",1234),listen));
system.actorOf( Props.create(AkkaTcpClientDemo.class,new InetSocketAddress("30.11.33.16",3307),listen));
}
}
複製代碼
@Slf4j
public class AkkaTcpServerDemo extends AbstractActor {
final ActorRef manager;
public AkkaTcpServerDemo(ActorRef manager) {
this.manager = manager;
}
public static Props props(ActorRef manager) {
return Props.create(AkkaTcpServerDemo.class, manager);
}
@Override
public void preStart() throws Exception {
final ActorRef tcp = Tcp.get(getContext().getSystem()).manager();
tcp.tell(TcpMessage.bind(getSelf(), new InetSocketAddress("localhost", 1234), 100), getSelf());
}
@Override
public Receive createReceive() {
return receiveBuilder()
.match(
Tcp.Bound.class,
msg -> {
log.info("Aihe Bounded {}, sender {}",msg,getSender());
manager.tell(msg, getSelf());
})
.match(
Tcp.CommandFailed.class,
msg -> {
log.info("Aihe CommandFailed {}",msg);
getContext().stop(getSelf());
})
.match(
Tcp.Connected.class,
conn -> {
log.info("Server Conencted: {},sneder {}",conn,getSender());
manager.tell(conn, getSelf());
final ActorRef handler =
getContext().actorOf(Props.create(SimplisticHandler.class));
getSender().tell(TcpMessage.register(handler), getSelf());
})
.match(Tcp.Register.class,msg -> {
log.info("註冊消息:{}",msg);
})
.build();
}
static class SimplisticHandler extends AbstractActor {
@Override
public Receive createReceive() {
return receiveBuilder()
.match(
Tcp.Received.class,
msg -> {
final ByteString data = msg.data();
log.info("收到數據:{}",data.utf8String());
getSender().tell(TcpMessage.write(data), getSelf());
})
.match(
Tcp.ConnectionClosed.class,
msg -> {
getContext().stop(getSelf());
})
.build();
}
}
public static void main(String[] args) {
ActorSystem system = ActorSystem.create("demo", ConfigFactory.load("my"));
ActorRef manager = Tcp.get(system).manager();
ActorRef server = system.actorOf(Props.create(AkkaTcpServerDemo.class, manager));
// system.actorOf(Props.create(AkkaTcpServerDemo.class));
}
}
複製代碼
TcpClient在激活鏈接以後,經過以下代碼從新處理接收的消息socket
getContext().become(connected(getSender()));
複製代碼
TcpServer在激活鏈接的時候指定代理Actor來處理後續的消息tcp
final ActorRef handler = getContext().actorOf(Props.create(SimplisticHandler.class));
getSender().tell(TcpMessage.register(handler), getSelf());
複製代碼
在交互的時候,使用TcpMessage.write方法發送給TcpManager建立的connection actoride
ActorRef conn = getSender();
for (int i = 0; i < 10; i++) {
conn.tell(TcpMessage.write(ByteString.fromString("Hello" + i)),getSelf());
}
複製代碼
想要關閉鏈接發送:TcpMessage.close
, TcpMessage.confirmedClose
或者 TcpMessage.abort
給conenctor actor便可。ui
RST
消息給對方鏈接,若是成功關閉鏈接,會收到Aborted消息當對方關閉的時候,監聽者會收到PeerClosed
事件,同時鏈接會自動關閉。若是想要支持辦關閉的鏈接,在使用TcpMessage.register的時候設置keepOpenOnPeerClosed
參數爲true,這個時候鏈接仍然保持打開,除非收到了上面三個的任意一個關閉消息。this
當某一方出錯的時候ErrorClosed
事件會發送給監聽者,同時強制關閉鏈接。spa
全部的關閉鏈接事件都是ConnectionClosed
的子事件,因此監聽這一個事件就能夠處理全部關閉的事件。代理
Akka的鏈接actor沒有內部的緩衝區,這表明着只有當數據在內核中滿的時候纔會發送出去,須要咱們本身對Akka TCP的讀寫作一些擁塞控制。code
對於對方鏈接回覆寫(back-pressuring writes)數據,有三種模式:
Write
命令都攜帶object數據,這個object不是Tcp.NoAck對象,而後在成功寫入全部的數據到socket以後,這個object會返回給發送方。CommandFailed
消息WritingResumed
消息給鏈接actor。對於讀數據(back-pressuring reads)的控制,有兩種模式:
Received
事件,若是讀actor發送TcpMessage.suspendReading方法給鏈接actor表明,它當前想要掛起接收新數據。只有再發送ResumeReading
事件發送後,表明讀actor準備好接收新數據了。Received
事件以後,鏈接actor自動的掛起接收的數據。直到讀actor發送ResumeReading
消息。讀數據是相對於 鏈接actor(connction actor)而言的。
以上是Akka TCP的使用說明,在處理的時候須要考慮Tcp的擁塞控制。
參考: