Apache MINA --- [ProtocolCodecFilter]

爲何使用它:java

  • TCP協議保證全部的包有正確的順序,可是不保證發送端的一個寫操做只致使接收端的一個讀事件發生,用 MINA術語來描述    就是:沒有ProtocolCodecFilter,發送端一個IoSession.write(...)可以致使接收端多個 messageReceived(...),多個write(...)也能被引導到一個messageReceived(...),也許在單機測試時咱們 不會碰到這樣的狀況,可是咱們的應用應該有能力處理這種問題.安全

  • 大多數網絡應用須要一種方式來找到當前信息的結束點和下一條信息的開始點.網絡

  • 咱們可以在IoHandler中實現全部業務邏輯,可是添加ProtocolCodecFilter將使你的代碼更加容易,清晰的維護.session

  • 它能幫助咱們分離業務邏輯和協議邏輯.app

怎麼使用:性能

  • 應用基本上僅僅接收字節流並且咱們須要將它們轉化成高層對象(message).測試

  • 這裏有三種通用技術來分割字節流到message:this

        1.使用固定長度的信息.編碼

        2.使用固定長度的消息頭來指定的消息體的長度.線程

        3.使用定界符(如:許多基於文本的協議會在每條消息末尾加上換行符).

例子:

    本例中,咱們將開發一個無用的圖形字符服務來闡明如何實現本身的協議編解碼器

    Request:

//一個簡單的POJO表明一個請求
public class ImageRequest {
 
    private int width;
    private int height;
    private int numberOfCharacters;
 
    public ImageRequest(int width, int height, int numberOfCharacters) {
        this.width = width;
        this.height = height;
        this.numberOfCharacters = numberOfCharacters;
    }
 
    public int getWidth() {
        return width;
    }
 
    public int getHeight() {
        return height;
    }
 
    public int getNumberOfCharacters() {
        return numberOfCharacters;
    }
}
 
//將ImageRequest對象編碼成特定協議數據(客戶端使用)
public class ImageRequestEncoder implements ProtocolEncoder {
 
    public void encode(IoSession session, Object message, ProtocolEncoderOutput out) throws Exception {
        ImageRequest request = (ImageRequest) message;
        IoBuffer buffer = IoBuffer.allocate(12, false);
        buffer.putInt(request.getWidth());
        buffer.putInt(request.getHeight());
        buffer.putInt(request.getNumberOfCharacters());
        buffer.flip();
        out.write(buffer);
    }
 
    public void dispose(IoSession session) throws Exception {
        // 釋放相關資源,若是沒有,直接繼承自ProtocolEncoderAdapter
    }
    
    /**
     * MINA會爲IoSession寫入隊列中的全部消息調用encode方法.由於咱們的客戶端只寫入ImageRequest,咱們能夠安全強轉.
     * 咱們從堆中分配一個新的IoBuffer,咱們最好避免使用直接緩衝區,由於一般來說,堆緩衝性能更好.
     * 你不須要手動釋放緩衝區,MINA會幫你作.
     * 在dispose()方法中釋放全部編碼期間使用到的資源,若是沒有須要處理的,能夠直接讓他繼承ProtocolEncoderAdapter.
     */
}
 
//將特定協議數據解碼成ImageRequest對象(服務端使用)
//CumulativeProtocolDecoder是很是有用的,他會緩衝全部的傳入數據直到你的解碼器能斷定數據已經準備就緒
public class ImageRequestDecoder extends CumulativeProtocolDecoder {
 
    protected boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception {
        if (in.remaining() >= 12) {
            int width = in.getInt();
            int height = in.getInt();
            int numberOfCharachters = in.getInt();
            ImageRequest request = new ImageRequest(width, height, numberOfCharachters);
            out.write(request);
            return true;
        } else {
            return false;
        }
    }
    
    /**
     * 每當一條完整的消息被解碼,你應該把它寫入ProtocolDecoderOutput;這些消息會沿着過濾器鏈傳遞並最終抵達IoHandler.messageReceived()方法
     * 你不須要手動釋放緩衝區,MINA會幫你作.
     * 當數據還沒接收徹底時,只須要return false. 
     */
}

    Response:

//一個見到的POJO表明一個響應
public class ImageResponse {
 
    private BufferedImage image1;
 
    private BufferedImage image2;
 
    public ImageResponse(BufferedImage image1, BufferedImage image2) {
        this.image1 = image1;
        this.image2 = image2;
    }
 
    public BufferedImage getImage1() {
        return image1;
    }
 
    public BufferedImage getImage2() {
        return image2;
    }
}
 
//將ImageResponse對象編碼成特定協議數據(服務端使用)
public class ImageResponseEncoder extends ProtocolEncoderAdapter {
 
    public void encode(IoSession session, Object message, ProtocolEncoderOutput out) throws Exception {
        ImageResponse imageResponse = (ImageResponse) message;
        byte[] bytes1 = getBytes(imageResponse.getImage1());
        byte[] bytes2 = getBytes(imageResponse.getImage2());
        int capacity = bytes1.length + bytes2.length + 8;
        IoBuffer buffer = IoBuffer.allocate(capacity, false);
        buffer.setAutoExpand(true);
        buffer.putInt(bytes1.length);
        buffer.put(bytes1);
        buffer.putInt(bytes2.length);
        buffer.put(bytes2);
        buffer.flip();
        out.write(buffer);
    }
 
    private byte[] getBytes(BufferedImage image) throws IOException {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        ImageIO.write(image, "PNG", baos);
        return baos.toByteArray();
    }
    
    /**
     * 若是IoBuffer大小不能被事先計算時,可使用自動擴容setAutoExpand(true).
     */
}

//將特定協議數據解碼成ImageResponse對象(客戶端使用)
public class ImageResponseDecoder extends CumulativeProtocolDecoder {
 
    private static final String DECODER_STATE_KEY = ImageResponseDecoder.class.getName() + ".STATE";
 
    public static final int MAX_IMAGE_SIZE = 5 * 1024 * 1024;
 
    private static class DecoderState {
        BufferedImage image1;
    }
 
    protected boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception {
        DecoderState decoderState = (DecoderState) session.getAttribute(DECODER_STATE_KEY);
        if (decoderState == null) {
            decoderState = new DecoderState();
            session.setAttribute(DECODER_STATE_KEY, decoderState);
        }
        if (decoderState.image1 == null) {
            // try to read first image
            if (in.prefixedDataAvailable(4, MAX_IMAGE_SIZE)) {
                decoderState.image1 = readImage(in);
            } else {
                // not enough data available to read first image
                return false;
            }
        }
        if (decoderState.image1 != null) {
            // try to read second image
            if (in.prefixedDataAvailable(4, MAX_IMAGE_SIZE)) {
                BufferedImage image2 = readImage(in);
                ImageResponse imageResponse = new ImageResponse(decoderState.image1, image2);
                out.write(imageResponse);
                decoderState.image1 = null;
                return true;
            } else {
                // not enough data available to read second image
                return false;
            }
        }
        return false;
    }
 
    private BufferedImage readImage(IoBuffer in) throws IOException {
        int length = in.getInt();
        byte[] bytes = new byte[length];
        in.get(bytes);
        ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
        return ImageIO.read(bais);
    }
    
    /**
     * 咱們在session屬性中保存了狀態裏記錄解碼進度,這個狀態也能夠被保存在Decoder自己屬性中,但那樣作有幾個缺點:        
     *     每一個IoSession都須要本身的Decoder實例.
     *     MINA確保在同一個IoSession中不會有超過一個線程同時在執行decode(),可是它不保證永遠是再一個線程中執行.假設第一個數據片被線程1處理,
     *     線程1斷定這些數據還不足以被解碼,當下一個數據片到達時,多是被另外一個線程處理.那麼爲了不可見性問題,咱們必須明確的對Decoder中的狀態屬
     *     性進行同步聲明(IoSession自己就具備這一特性).
     * IoBuffer.prefixedDataAvailable() 是很是方便的當你使用長度前綴,它支持的1/2/4字節的前綴.
     * 不要忘了重置解碼狀態當一個響應被解碼完成時(另外一種方式是:移除session中的狀態屬性).
     */
     
     //若是隻須要處理一張圖片,則不須要保存狀態
}

    Factory:

public class ImageCodecFactory implements ProtocolCodecFactory {
    private ProtocolEncoder encoder;
    private ProtocolDecoder decoder;
 
    public ImageCodecFactory(boolean client) {
        if (client) {
            encoder = new ImageRequestEncoder();
            decoder = new ImageResponseDecoder();
        } else {
            encoder = new ImageResponseEncoder();
            decoder = new ImageRequestDecoder();
        }
    }
 
    public ProtocolEncoder getEncoder(IoSession ioSession) throws Exception {
        return encoder;
    }
 
    public ProtocolDecoder getDecoder(IoSession ioSession) throws Exception {
        return decoder;
    }
    
    /**
     * 針對每個新的IoSession,MINA會要求ProtocolCodecFactory建立編解碼器.
     * 由於咱們的編解碼器沒有存儲交互式的狀態,因此讓全部session共享一個實例是安全的.
     */
}

    服務端:

public class ImageServer {
    public static final int PORT = 33789;
 
    public static void main(String[] args) throws IOException {
        ImageServerIoHandler handler = new ImageServerIoHandler();
        NioSocketAcceptor acceptor = new NioSocketAcceptor();
        acceptor.getFilterChain().addLast("protocol", new ProtocolCodecFilter(new ImageCodecFactory(false)));
        acceptor.setLocalAddress(new InetSocketAddress(PORT));
        acceptor.setHandler(handler);
        acceptor.bind();
        System.out.println("server is listenig at port " + PORT);
    }
}
 
public class ImageServerIoHandler extends IoHandlerAdapter {
 
    private final static String characters = "mina rocks abcdefghijklmnopqrstuvwxyz0123456789";
 
    public static final String INDEX_KEY = ImageServerIoHandler.class.getName() + ".INDEX";
 
    private Logger logger = LoggerFactory.getLogger(this.getClass());
 
    public void sessionOpened(IoSession session) throws Exception {
        session.setAttribute(INDEX_KEY, 0);
    }
 
    public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
        IoSessionLogger sessionLogger = IoSessionLogger.getLogger(session, logger);
        sessionLogger.warn(cause.getMessage(), cause);
    }
 
    public void messageReceived(IoSession session, Object message) throws Exception {
        ImageRequest request = (ImageRequest) message;
        String text1 = generateString(session, request.getNumberOfCharacters());
        String text2 = generateString(session, request.getNumberOfCharacters());
        BufferedImage image1 = createImage(request, text1);
        BufferedImage image2 = createImage(request, text2);
        ImageResponse response = new ImageResponse(image1, image2);
        session.write(response);
    }
 
    private BufferedImage createImage(ImageRequest request, String text) {
        BufferedImage image = new BufferedImage(request.getWidth(), request.getHeight(), BufferedImage.TYPE_BYTE_INDEXED);
        Graphics graphics = image.createGraphics();
        graphics.setColor(Color.YELLOW);
        graphics.fillRect(0, 0, image.getWidth(), image.getHeight());
        Font serif = new Font("serif", Font.PLAIN, 30);
        graphics.setFont(serif);
        graphics.setColor(Color.BLUE);
        graphics.drawString(text, 10, 50);
        return image;
    }
 
    private String generateString(IoSession session, int length) {
        Integer index = (Integer) session.getAttribute(INDEX_KEY);
        StringBuffer buffer = new StringBuffer(length);
 
        while (buffer.length() < length) {
            buffer.append(characters.charAt(index));
            index++;
            if (index >= characters.length()) {
                index = 0;
            }
        }
        session.setAttribute(INDEX_KEY, index);
        return buffer.toString();
    }
}

    客戶端:

public class ImageClient extends IoHandlerAdapter {
    public static final int CONNECT_TIMEOUT = 3000;
 
    private String host;
    private int port;
    private SocketConnector connector;
    private IoSession session;
    private ImageListener imageListener;
 
    public ImageClient(String host, int port, ImageListener imageListener) {
        this.host = host;
        this.port = port;
        this.imageListener = imageListener;
        connector = new NioSocketConnector();
        connector.getFilterChain().addLast("codec", new ProtocolCodecFilter(new ImageCodecFactory(true)));
        connector.setHandler(this);
    }
 
    public void messageReceived(IoSession session, Object message) throws Exception {
        ImageResponse response = (ImageResponse) message;
        imageListener.onImages(response.getImage1(), response.getImage2());
    }
    ...
}
相關文章
相關標籤/搜索