Mina、Netty、Twisted一塊兒學(七):發佈/訂閱(Publish/Subscribe)

消息傳遞有不少種方式,請求/響應(Request/Reply)是最經常使用的。在前面的博文的例子中,不少都是採用請求/響應的方式,當服務器接收到消息後,會當即write回寫一條消息到客戶端。HTTP協議也是基於請求/響應的方式。react

可是請求/響應並不能知足全部的消息傳遞的需求,有些需求可能須要服務端主動推送消息到客戶端,而不是被動的等待請求後再給出響應。git

發佈/訂閱(Publish/Subscribe)是一種服務器主動發送消息到客戶端的消息傳遞方式。訂閱者Subscriber鏈接到服務器客戶端後,至關於開始訂閱發佈者Publisher發佈的消息,當發佈者發佈了一條消息後,全部訂閱者都會接收到這條消息。github

網絡聊天室通常就是基於發佈/訂閱模式來實現。例如加入一個QQ羣,就至關於訂閱了這個羣的全部消息,當有新的消息,服務器會主動將消息發送給全部的客戶端。只不過聊天室裏的全部人既是發佈者又是訂閱者。安全

下面分別用MINA、Netty、Twisted分別實現簡單的發佈/訂閱模式的服務器程序,鏈接到服務器的全部客戶端都是訂閱者,當發佈者發佈一條消息後,服務器會將消息轉發給全部客戶端。服務器

MINA:
網絡

在MINA中,經過IoService的getManagedSessions()方法能夠獲取這個IoService當前管理的全部IoSession,即全部鏈接到服務器的客戶端集合。當服務器接收到發佈者發佈的消息後,能夠經過IoService的getManagedSessions()方法獲取到全部客戶端對應的IoSession並將消息發送到這些客戶端。session

public class TcpServer {

    public static void main(String[] args) throws IOException {
        IoAcceptor acceptor = new NioSocketAcceptor();

        acceptor.getFilterChain().addLast("codec",
                new ProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("UTF-8"), "\r\n", "\r\n")));

        acceptor.setHandler(new TcpServerHandle());
        acceptor.bind(new InetSocketAddress(8080));
    }

}

class TcpServerHandle extends IoHandlerAdapter {

    @Override
    public void exceptionCaught(IoSession session, Throwable cause)
            throws Exception {
        cause.printStackTrace();
    }

    @Override
    public void messageReceived(IoSession session, Object message)
            throws Exception {
        
        // 獲取全部正在鏈接的IoSession
        Collection<IoSession> sessions = session.getService().getManagedSessions().values();

        // 將消息寫到全部IoSession
        IoUtil.broadcast(message, sessions);
    }
}

Netty:併發

Netty提供了ChannelGroup來用於保存Channel組,ChannelGroup是一個線程安全的Channel集合,它提供了一些列Channel批量操做。當一個TCP鏈接關閉後,對應的Channel會自動從ChannelGroup移除,因此不用手動去移除關閉的Channel。異步

Netty文檔關於ChannelGroup的解釋:socket

A thread-safe Set that contains open Channels and provides various bulk operations on them. Using ChannelGroup, you can categorize Channels into a meaningful group (e.g. on a per-service or per-state basis.) A closed Channel is automatically removed from the collection, so that you don't need to worry about the life cycle of the added Channel. A Channel can belong to more than one ChannelGroup.

當有新的客戶端鏈接到服務器,將對應的Channel加入到一個ChannelGroup中,當發佈者發佈消息時,服務器能夠將消息經過ChannelGroup寫入到全部客戶端。

public class TcpServer {

    public static void main(String[] args) throws InterruptedException {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast(new LineBasedFrameDecoder(80));
                            pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));
                            pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));
                            pipeline.addLast(new TcpServerHandler());
                        }
                    });
            ChannelFuture f = b.bind(8080).sync();
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }
}

class TcpServerHandler extends ChannelInboundHandlerAdapter {

    // ChannelGroup用於保存全部鏈接的客戶端,注意要用static來保證只有一個ChannelGroup實例,不然每new一個TcpServerHandler都會建立一個ChannelGroup
    private static ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        channels.add(ctx.channel()); // 將新的鏈接加入到ChannelGroup,當鏈接斷開ChannelGroup會自動移除對應的Channel
    }
    
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        channels.writeAndFlush(msg + "\r\n"); // 接收到消息後,將消息發送到ChannelGroup中的全部客戶端
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        // cause.printStackTrace();  暫時把異常打印註釋掉,由於PublishClient發佈一條消息後會當即斷開鏈接,而服務器也會向PublishClient發送消息,因此會拋出異常
        ctx.close();
    }
}

Twisted:

在Twisted中,全局的數據通常會放在Factory,而每一個鏈接相關的數據會放在Protocol中。因此這裏能夠在Factory中加入一個屬性,來存放Protocol集合,表示全部鏈接服務器的客戶端。當有新的客戶端鏈接到服務器時,將對應的Protocol實例放入集合,當鏈接斷開,將對應的Protocol從集合中移除。當服務器接收到發佈者發佈的消息後,遍歷全部客戶端併發送消息。

# -*- coding:utf-8 –*-

from twisted.protocols.basic import LineOnlyReceiver
from twisted.internet.protocol import Factory
from twisted.internet import reactor

class TcpServerHandle(LineOnlyReceiver): 

    def __init__(self, factory):
        self.factory = factory

    def connectionMade(self):
        self.factory.clients.add(self) # 新鏈接添加鏈接對應的Protocol實例到clients

    def connectionLost(self, reason):
        self.factory.clients.remove(self) # 鏈接斷開移除鏈接對應的Protocol實例

    def lineReceived(self, line):
        # 遍歷全部的鏈接,發送數據
        for c in self.factory.clients:
            c.sendLine(line)

class TcpServerFactory(Factory):
    def __init__(self):
        self.clients = set() # set集合用於保存全部鏈接到服務器的客戶端

    def buildProtocol(self, addr):
        return TcpServerHandle(self)

reactor.listenTCP(8080, TcpServerFactory())
reactor.run()

下面分別是兩個客戶端程序,一個是用於發佈消息的客戶端,一個是訂閱消息的客戶端。

發佈消息的客戶端很簡單,就是向服務器write一條消息便可:

public class PublishClient {

    public static void main(String[] args) throws IOException {

        Socket socket = null;
        OutputStream out = null;

        try {

            socket = new Socket("localhost", 8080);
            out = socket.getOutputStream();
            out.write("Hello\r\n".getBytes()); // 發佈信息到服務器
            out.flush();

        } finally {
            // 關閉鏈接
            out.close();
            socket.close();
        }
    }
}

訂閱消息的客戶端鏈接到服務器後,會阻塞等待接收服務器發送的發佈消息:

public class SubscribeClient {

    public static void main(String[] args) throws IOException {

        Socket socket = null;
        BufferedReader in = null;

        try {

            socket = new Socket("localhost", 8080);
            in = new BufferedReader(new InputStreamReader(socket.getInputStream()));

            while (true) {
                String line = in.readLine(); // 阻塞等待服務器發佈的消息
                System.out.println(line);
            }

        } finally {
            // 關閉鏈接
            in.close();
            socket.close();
        }
    }
}

分別針對MINA、Netty、Twisted服務器進行測試:

一、測試時首先開啓服務器;
二、而後再運行訂閱消息的客戶端SubscribeClient,SubscribeClient能夠開啓多個;
三、最後運行發佈消息的客戶端PublishClient,能夠屢次運行查看全部SubscribeClient的輸出結果。

運行結果能夠發現,當運行發佈消息的客戶端PublishClient發佈一條消息到服務器時,服務器會主動將這條消息轉發給全部的TCP鏈接,全部的訂閱消息的客戶端SubscribeClient都會接收到這條消息並打印出來。

MINA、Netty、Twisted一塊兒學系列

MINA、Netty、Twisted一塊兒學(一):實現簡單的TCP服務器

MINA、Netty、Twisted一塊兒學(二):TCP消息邊界問題及按行分割消息

MINA、Netty、Twisted一塊兒學(三):TCP消息固定大小的前綴(Header)

MINA、Netty、Twisted一塊兒學(四):定製本身的協議

MINA、Netty、Twisted一塊兒學(五):整合protobuf

MINA、Netty、Twisted一塊兒學(六):session

MINA、Netty、Twisted一塊兒學(七):發佈/訂閱(Publish/Subscribe)

MINA、Netty、Twisted一塊兒學(八):HTTP服務器

MINA、Netty、Twisted一塊兒學(九):異步IO和回調函數

MINA、Netty、Twisted一塊兒學(十):線程模型

MINA、Netty、Twisted一塊兒學(十一):SSL/TLS

MINA、Netty、Twisted一塊兒學(十二):HTTPS

源碼

https://github.com/wucao/mina-netty-twisted

相關文章
相關標籤/搜索