Akka TCP使用指南

Akka TCP是AkkaIO模塊下的內容,全部的AkkaIO API只能經過mangaer對象來訪問。當咱們使用Akka IO時,第一步要先獲取合適的manager對象,以下獲取TcpManager:html

ActorRef tcpManager = Tcp.get(getContext().getSystem()).manager();
複製代碼

manager是一個處理底層IO資源(selectors,channels)的actor,同時能夠給任務實例化workers。bash

Tcp客戶端

  1. 第一步是向TcpManager發送TcpMessage.conenct方法。
  2. 而後TcpManager會返回一個內部的actor表明鏈接對象,也可能返回CommandFailed 消息
  3. 新的actor對象,而後發送一個Connected 消息給最初發送TcpMessage.connect的actor。
  4. 想要激活鏈接,必需要發送一個TcpMessage.register給鏈接對象。若是不作這一步,這個鏈接就是不能用的
  5. 鏈接對象(connection actor),監聽註冊的handler,當兩邊的actor某個關閉的時候,關閉鏈接對象。而後清除這個鏈接的資源

1564493608133

@Slf4j
public class AkkaTcpClientDemo extends AbstractActor {

    final InetSocketAddress remote;
    final ActorRef listener;

    public static Props props(InetSocketAddress remote, ActorRef listener) {
        return Props.create(AkkaTcpClientDemo.class, remote, listener);
    }

    public AkkaTcpClientDemo(InetSocketAddress remote, ActorRef listener) {
        this.remote = remote;
        this.listener = listener;
		// 鏈接
        final ActorRef tcp = Tcp.get(getContext().getSystem()).manager();
        tcp.tell(TcpMessage.connect(remote), getSelf());
    }

    @Override
    public Receive createReceive() {
        return receiveBuilder()
                .match(Tcp.Bound.class,msg -> {
                    log.info("Bounded {} ,sender = {}",msg,getSender());
                })
                .match(
                        Tcp.CommandFailed.class,
                        msg -> {
                            log.info("failed msg{},sender {}",msg,getSender());
                            listener.tell("failed ", getSelf());
                            getContext().stop(getSelf());
                        })
                .match(
                        Tcp.Connected.class,
                        msg -> {
                            log.info("Conencted : msg {} sender{}",msg,getSender());
                            listener.tell(msg, getSelf());
                            getSender().tell(TcpMessage.register(getSelf()), getSelf());
                            getContext().become(connected(getSender()));
                            ActorRef conn = getSender();
                            for (int i = 0; i < 10; i++) {
                                conn.tell(TcpMessage.write(ByteString.fromString("Hello" + i)),getSelf());
                            }

                        })
                .build();
    }


    private Receive connected(final ActorRef connection) {
        return receiveBuilder()
                .match(
                        ByteString.class,
                        msg -> {
                            connection.tell(TcpMessage.write((ByteString) msg), getSelf());
                        })
                .match(
                        Tcp.CommandFailed.class,
                        msg -> {
                            // OS kernel socket buffer was full
                        })
                .match(
                        Tcp.Received.class,
                        msg -> {
                            listener.tell(msg.data().utf8String(), getSelf());
                        })
                .matchEquals(
                        "close",
                        msg -> {
                            connection.tell(TcpMessage.close(), getSelf());
                        })
                .match(
                        Tcp.ConnectionClosed.class,
                        msg -> {
                            getContext().stop(getSelf());
                        })
                .match(Tcp.Register.class,msg -> {
                    log.info("register msg {}",msg);
                })
                .build();
    }

    public static void main(String[] args) {
        ActorSystem system = ActorSystem.create("demo", ConfigFactory.load("my"));
//        ActorRef manager = Tcp.get(system).manager();
        ActorRef listen = system.actorOf(Props.create(ServerHandler.class));
//        system.actorOf( Props.create(AkkaTcpClientDemo.class,new InetSocketAddress("127.0.0.1",1234),listen));
        system.actorOf( Props.create(AkkaTcpClientDemo.class,new InetSocketAddress("30.11.33.16",3307),listen));

    }
}
複製代碼

Tcp服務端

  1. 發送TcpMessage.bind方法給TcpManager,而後TcpManager會監聽在指定的地址端口上
  2. 經過TcpMessage.bind方法指定的actor會收到Bound消息,表明服務端已經準備好接收請求了。Bound消息也會包含socket實際綁定的端口
  3. 當收到Connected消息以後,發送TcpMessage.Register消息激活當前的鏈接,在Register方法中也能夠指定代理actor來處理後續收到的數據
@Slf4j
public class AkkaTcpServerDemo extends AbstractActor {
    final ActorRef manager;

    public AkkaTcpServerDemo(ActorRef manager) {
        this.manager = manager;
    }

    public static Props props(ActorRef manager) {
        return Props.create(AkkaTcpServerDemo.class, manager);
    }

    @Override
    public void preStart() throws Exception {
        final ActorRef tcp = Tcp.get(getContext().getSystem()).manager();
        tcp.tell(TcpMessage.bind(getSelf(), new InetSocketAddress("localhost", 1234), 100), getSelf());
    }


    @Override
    public Receive createReceive() {
        return receiveBuilder()
                .match(
                        Tcp.Bound.class,
                        msg -> {
                            log.info("Aihe Bounded {}, sender {}",msg,getSender());
                            manager.tell(msg, getSelf());
                        })
                .match(
                        Tcp.CommandFailed.class,
                        msg -> {
                            log.info("Aihe CommandFailed {}",msg);
                            getContext().stop(getSelf());
                        })
                .match(
                        Tcp.Connected.class,
                        conn -> {
                            log.info("Server Conencted: {},sneder {}",conn,getSender());
                            manager.tell(conn, getSelf());
                            final ActorRef handler =
                                    getContext().actorOf(Props.create(SimplisticHandler.class));
                            getSender().tell(TcpMessage.register(handler), getSelf());

                        })
                .match(Tcp.Register.class,msg -> {
                    log.info("註冊消息:{}",msg);
                })
                .build();
    }


    static class SimplisticHandler extends AbstractActor {
        @Override
        public Receive createReceive() {
            return receiveBuilder()
                    .match(
                            Tcp.Received.class,
                            msg -> {
                                final ByteString data = msg.data();
                                log.info("收到數據:{}",data.utf8String());
                                getSender().tell(TcpMessage.write(data), getSelf());
                            })
                    .match(
                            Tcp.ConnectionClosed.class,
                            msg -> {
                                getContext().stop(getSelf());
                            })
                    .build();
        }
    }

    public static void main(String[] args) {
        ActorSystem system = ActorSystem.create("demo", ConfigFactory.load("my"));
        ActorRef manager = Tcp.get(system).manager();
        ActorRef server = system.actorOf(Props.create(AkkaTcpServerDemo.class, manager));

//        system.actorOf(Props.create(AkkaTcpServerDemo.class));

    }
}

複製代碼

TCP客戶端與服務端細節

TcpClient在激活鏈接以後,經過以下代碼從新處理接收的消息socket

getContext().become(connected(getSender()));
複製代碼

TcpServer在激活鏈接的時候指定代理Actor來處理後續的消息tcp

final ActorRef handler =                            getContext().actorOf(Props.create(SimplisticHandler.class));
getSender().tell(TcpMessage.register(handler), getSelf());
複製代碼

在交互的時候,使用TcpMessage.write方法發送給TcpManager建立的connection actoride

ActorRef conn = getSender();
for (int i = 0; i < 10; i++) {
	conn.tell(TcpMessage.write(ByteString.fromString("Hello" + i)),getSelf());
}
複製代碼

1564494534545

1564494549937

關閉鏈接

想要關閉鏈接發送:TcpMessage.close, TcpMessage.confirmedClose 或者 TcpMessage.abort給conenctor actor便可。ui

  • TcpMessage.close,發送FIN消息,可是不等待對方的確認。尚未處理的寫數據會被清空,若是成功關閉鏈接,監聽者會收到Closed事件。
  • TcpMessage.confirmedClose,發送方會發送FIN信號,可是還能夠收到數據,一直到對方也關閉鏈接。發送方後續的寫數據會被清空,監聽者會收到ConfirmedClosed事件
  • TcpMessage.abort,馬上關閉鏈接,發送RST 消息給對方鏈接,若是成功關閉鏈接,會收到Aborted消息

當對方關閉的時候,監聽者會收到PeerClosed 事件,同時鏈接會自動關閉。若是想要支持辦關閉的鏈接,在使用TcpMessage.register的時候設置keepOpenOnPeerClosed 參數爲true,這個時候鏈接仍然保持打開,除非收到了上面三個的任意一個關閉消息。this

當某一方出錯的時候ErrorClosed 事件會發送給監聽者,同時強制關閉鏈接。spa

全部的關閉鏈接事件都是ConnectionClosed 的子事件,因此監聽這一個事件就能夠處理全部關閉的事件。代理

讀寫擁塞控制

Akka的鏈接actor沒有內部的緩衝區,這表明着只有當數據在內核中滿的時候纔會發送出去,須要咱們本身對Akka TCP的讀寫作一些擁塞控制。code

對於對方鏈接回覆寫(back-pressuring writes)數據,有三種模式:

  • Ack-based:每次Write 命令都攜帶object數據,這個object不是Tcp.NoAck對象,而後在成功寫入全部的數據到socket以後,這個object會返回給發送方。
  • NACK-based:在發送寫命令後,若是上一次的寫數據尚未完成就會返回CommandFailed 消息
  • NACK-based with write suspending: 和NACK-based很像,但在寫入失敗以後,若是沒有收到TcpMessage.resumeWriting方法的話,就不會繼續寫入。而後回覆一個WritingResumed 消息給鏈接actor。

對於讀數據(back-pressuring reads)的控制,有兩種模式:

  • Push-reading:鏈接actor發送給讀數據的actor Received 事件,若是讀actor發送TcpMessage.suspendReading方法給鏈接actor表明,它當前想要掛起接收新數據。只有再發送ResumeReading 事件發送後,表明讀actor準備好接收新數據了。
  • Pull-reading:在接收到Received 事件以後,鏈接actor自動的掛起接收的數據。直到讀actor發送ResumeReading 消息。

讀數據是相對於 鏈接actor(connction actor)而言的。

最後

以上是Akka TCP的使用說明,在處理的時候須要考慮Tcp的擁塞控制。

參考:

Akka Tcp

相關文章
相關標籤/搜索