Netty 編解碼技術 數據通訊和心跳監控案例

Netty 編解碼技術 數據通訊和心跳監控案例

多臺服務器之間在進行跨進程服務調用時,須要使用特定的編解碼技術,對須要進行網絡傳輸的對象作編碼和解碼操做,以便完成遠程調用。Netty提供了完善,易擴展,易使用的編解碼技術。本章除了介紹Marshalling的使用,還會基於編解碼技術實現數據通訊和心跳檢測案例。經過本章,你將學到Java序列化的優缺點,主流編解碼框架的特色,模擬特殊長鏈接通訊,心跳監控案例。還在等什麼,豐滿的知識等你來拿!html

技術:編解碼,數據通訊,心跳監控
說明:github上有完整代碼,部分文字描述摘錄《Netty權威指南》
源碼:https://github.com/ITDragonBl...java

編解碼

Netty 的一大亮點就是使用簡單,將經常使用的功能和API進行了很好的封裝,編解碼也不例外。針對編解碼功能,Netty提供了通用的編解碼框架經常使用的編解碼類庫,方便用戶擴展和使用。從而下降用戶的工做量和開發門檻。在io.netty.handler.codec目錄下找到不少預置的編解碼功能.
其實在上一章的知識點中,就已經使用了Netty的編解碼技術,如:DelimiterBasedFrameDecoder,FixedLengthFrameDecoder,StringDecodergit

什麼是編解碼技術

編碼(Encode)也稱序列化(serialization),將對象序列化爲字節數組,用於網絡傳輸、數據持久化等用途。
解碼(Decode)也稱反序列化(deserialization),把從網絡、磁盤等讀取的字節數組還原成原始對象,以方便後續的業務邏輯操做。github

主流編解碼框架

Java序列化

Java序列化使用簡單開發難度低。只須要實現java.io.Serializable接口並生成序列化ID,這個類就可以經過java.io.ObjectInput序列化和java.io.ObjectOutput反序列化。
但它也有存在不少缺點 :
1 沒法跨語言(java的序列化是java語言內部的私有協議,其餘語言並不支持),
2 序列化後碼流太大(採用二進制編解碼技術要比java原生的序列化技術強),
3 序列化性能過低數據庫

JBoss的Marshalling

JBoss的Marshalling是一個Java對象的序列化API包,修正了JDK自帶序列化包的不少問題,又兼容java.io.Serializable接口;同時可經過工廠類進行參數和特性地配置。
1) 可插拔的類解析器,提供更加便捷的類加載定製策略,經過一個接口便可實現定製;
2) 可插拔的對象替換技術,不須要經過繼承的方式;
3) 可插拔的預約義類緩存表,能夠減少序列化的字節數組長度,提高經常使用類型的對象序列化性能;
4) 無須實現java.io.Serializable接口,便可實現Java序列化;
5) 經過緩存技術提高對象的序列化性能。
6) 使用範圍小,通用性較差。bootstrap

Google的Protocol Buffers

Protocol Buffers由谷歌開源而來。將數據結構以 .proto 文件進行描述,經過代碼生成工具能夠生成對應數據結構的POJO對象和Protobuf相關的方法和屬性。
1) 結構化數據存儲格式(XML,JSON等);
2) 高效的編解碼性能;
3) 平臺無關、擴展性好;
4) 官方支持Java、C++和Python三種語言。windows

MessagePack框架

MessagePack是一個高效的二進制序列化格式。和JSON同樣跨語言交換數據。可是它比JSON更快、更小(It's like JSON.but fast and small)。
1) 高效的編解碼性能;
2) 跨語言;
3) 序列化後碼流小;api

Marshalling 配置工廠數組

package com.itdragon.marshalling;
import io.netty.handler.codec.marshalling.DefaultMarshallerProvider;
import io.netty.handler.codec.marshalling.DefaultUnmarshallerProvider;
import io.netty.handler.codec.marshalling.MarshallerProvider;
import io.netty.handler.codec.marshalling.MarshallingDecoder;
import io.netty.handler.codec.marshalling.MarshallingEncoder;
import io.netty.handler.codec.marshalling.UnmarshallerProvider;
import org.jboss.marshalling.MarshallerFactory;  
import org.jboss.marshalling.Marshalling;  
import org.jboss.marshalling.MarshallingConfiguration; 

public final class ITDragonMarshallerFactory {
    private static final String NAME = "serial"; // serial表示建立的是 Java序列化工廠對象.由jboss-marshalling-serial提供 
    private static final Integer VERSION = 5;  
    private static final Integer MAX_OBJECT_SIZE = 1024 * 1024 * 1; // 單個對象最大長度 
    /** 
     * 建立Jboss Marshalling 解碼器MarshallingDecoder 
     */  
    public static MarshallingDecoder buildMarshallingDecoder() {  
        // step1 經過工具類 Marshalling,獲取Marshalling實例對象,參數serial 標識建立的是java序列化工廠對象  
        final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory(NAME);  
        // step2 初始化Marshalling配置  
        final MarshallingConfiguration configuration = new MarshallingConfiguration();  
        // step3 設置Marshalling版本號  
        configuration.setVersion(VERSION);  
        // step4 初始化生產者  
        UnmarshallerProvider provider = new DefaultUnmarshallerProvider(marshallerFactory, configuration);  
        // step5 經過生產者和單個消息序列化後最大長度構建 Netty的MarshallingDecoder  
        MarshallingDecoder decoder = new MarshallingDecoder(provider, MAX_OBJECT_SIZE);  
        return decoder;  
    }  
    /** 
     * 建立Jboss Marshalling 編碼器MarshallingEncoder 
     */  
    public static MarshallingEncoder builMarshallingEncoder() {  
        final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory(NAME);  
        final MarshallingConfiguration configuration = new MarshallingConfiguration();  
        configuration.setVersion(VERSION);  
        MarshallerProvider provider = new DefaultMarshallerProvider(marshallerFactory, configuration);  
        MarshallingEncoder encoder = new MarshallingEncoder(provider);  
        return encoder;  
    }  
}

數據通訊

一個網絡應用最重要的工做莫過於數據的傳輸。兩臺機器之間如何創建鏈接才能提升服務器利用率,減輕服務器的壓力。這都是咱們值得去考慮的問題。緩存

常見的三種通訊模式

1) 長鏈接:服務器和客戶端的通道一直處於開啓狀態。合適服務器性能好,客戶端數量少的場景。
2) 短鏈接:只有在發送數據時創建鏈接,數據發送完後斷開鏈接。通常將數據保存在本地,根據某種邏輯一次性批量提交。適合對實時性不高的應用場景。
3) 特殊長鏈接:它擁有長鏈接的特性。當在服務器指定時間內,若沒有任何通訊,鏈接就會斷開。若客戶端再次向服務端發送請求,則需從新創建鏈接。主要爲減少服務端資源佔用。
本章重點介特殊長鏈接。它的設計思想在不少場景中都有,好比QQ的離開狀態,電腦的休眠狀態。既保證了用戶的正常使用,又減輕了服務器的壓力。是實際開發中比較經常使用的通訊模式。
它有三個狀況:
1、服務器和客戶端的通道一直處於開啓狀態。
2、指定時間內沒有通訊則斷開鏈接。
3、客戶端從新發起請求,則從新創建鏈接。

結合上面的Marshalling 配置工廠,模擬特殊長鏈接的通訊場景。
客戶端代碼,輔助啓動類Bootstrap,配置編解碼事件,超時事件,自定義事件。客戶端發送請求分兩中狀況,通訊鏈接時請求和鏈接斷開後請求。上一章有詳細的配置說明

package com.itdragon.marshalling;
import java.io.File;
import java.io.FileInputStream;
import java.util.concurrent.TimeUnit;
import com.itdragon.utils.ITDragonUtil;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.timeout.ReadTimeoutHandler;

public class ITDragonClient {
    
    private static final Integer PORT = 8888;
    private static final String HOST = "127.0.0.1";
    private EventLoopGroup group = null;
    private Bootstrap bootstrap = null;
    private ChannelFuture future = null;
    
    private static class SingletonHolder {
        static final ITDragonClient instance = new ITDragonClient();
    }
    public static ITDragonClient getInstance(){
        return SingletonHolder.instance;
    }
    public ITDragonClient() {
        group = new NioEventLoopGroup();
        bootstrap = new Bootstrap();
        try {
            bootstrap.group(group)
            .channel(NioSocketChannel.class)
            .handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel socketChannel) throws Exception {
                    socketChannel.pipeline().addLast(ITDragonMarshallerFactory.buildMarshallingDecoder());     // 配置編碼器
                    socketChannel.pipeline().addLast(ITDragonMarshallerFactory.builMarshallingEncoder());     // 配置解碼器
                    socketChannel.pipeline().addLast(new ReadTimeoutHandler(5));    // 表示5秒內沒有鏈接後斷開
                    socketChannel.pipeline().addLast(new ITDragonClientHandler());
                }
            })
            .option(ChannelOption.SO_BACKLOG, 1024);
        } catch (Exception e) {
            e.printStackTrace();
        } 
    }
    public void connect(){
        try {
            future = bootstrap.connect(HOST, PORT).sync();
            System.out.println("鏈接遠程服務器......");                
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    public ChannelFuture getChannelFuture(){
        if(this.future == null || !this.future.channel().isActive()){
            this.connect();
        }
        return this.future;
    }
    /**
     * 特殊長鏈接:
     * 1. 服務器和客戶端的通道一直處於開啓狀態,
     * 2. 在服務器指定時間內,沒有任何通訊,則斷開,
     * 3. 客戶端再次向服務端發送請求則從新創建鏈接,
     * 4. 從而減少服務端資源佔用壓力。
     */
    public static void main(String[] args) {
        final ITDragonClient client = ITDragonClient.getInstance();
        try {
            ChannelFuture future = client.getChannelFuture();
            // 1. 服務器和客戶端的通道一直處於開啓狀態,
            for(Long i = 1L; i <= 3L; i++ ){
                ITDragonReqData reqData = new ITDragonReqData();
                reqData.setId(i);
                reqData.setName("ITDragon-" + i);
                reqData.setRequestMsg("NO." + i + " Request");
                future.channel().writeAndFlush(reqData);
                TimeUnit.SECONDS.sleep(2); // 2秒請求一次,服務器是5秒內沒有請求則會斷開鏈接
            }
            // 2. 在服務器指定時間內,沒有任何通訊,則斷開,
            Thread.sleep(6000);
            // 3. 客戶端再次向服務端發送請求則從新創建鏈接,
            new Thread(new Runnable() {
                public void run() {
                    try {
                        System.out.println("喚醒......");
                        ChannelFuture cf = client.getChannelFuture();
                        System.out.println("鏈接是否活躍  : " + cf.channel().isActive());
                        System.out.println("鏈接是否打開  : " + cf.channel().isOpen());
                        ITDragonReqData reqData = new ITDragonReqData();
                        reqData.setId(4L);
                        reqData.setName("ITDragon-picture");
                        reqData.setRequestMsg("斷開的通道被喚醒了!!!!");
                        // 路徑path自定義
                        String path = System.getProperty("user.dir") + File.separatorChar + 
                            "sources" + File.separatorChar + "itdragon.jpg";  
                        File file = new File(path);  
                        FileInputStream inputStream = new FileInputStream(file);  
                        byte[] data = new byte[inputStream.available()];  
                        inputStream.read(data);  
                        inputStream.close();  
                        reqData.setAttachment(ITDragonUtil.gzip(data));
                        cf.channel().writeAndFlush(reqData);                    
                        cf.channel().closeFuture().sync();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }).start();
            future.channel().closeFuture().sync();
            System.out.println("斷開鏈接,主線程結束.....");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

客戶端自定義事務代碼,負責將服務器返回的數據打印出來

package com.itdragon.marshalling;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;

public class ITDragonClientHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("Netty Client active ^^^^^^");
    }
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        try {
            ITDragonRespData responseData = (ITDragonRespData) msg;
            System.out.println("Netty Client : " + responseData.toString());
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            ReferenceCountUtil.release(msg);
        }
    }
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

服務器代碼,輔助啓動類ServerBootstrap,配置日誌打印事件,編解碼事件,超時控制事件,自定義事件。

package com.itdragon.marshalling;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.timeout.ReadTimeoutHandler; 

public class ITDragonServer {

    private static final Integer PORT = 8888;
    public static void main(String[] args) {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
                .handler(new LoggingHandler(LogLevel.INFO))
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        socketChannel.pipeline().addLast(ITDragonMarshallerFactory.buildMarshallingDecoder());     // 配置解碼器
                        socketChannel.pipeline().addLast(ITDragonMarshallerFactory.builMarshallingEncoder());     // 配置編碼器
                        socketChannel.pipeline().addLast(new ReadTimeoutHandler(5)); // 傳入的參數單位是秒,表示5秒內沒有鏈接後斷開
                        socketChannel.pipeline().addLast(new ITDragonServerHandler());
                    }
                })
                .option(ChannelOption.SO_BACKLOG, 1024)
                .childOption(ChannelOption.SO_KEEPALIVE, true);

            ChannelFuture future = bootstrap.bind(PORT).sync();
            future.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }
}

服務器自定義事件代碼,負責接收客戶端傳輸的數據,如有附件則下載到receive目錄下(這裏只是簡單的下載邏輯)。

package com.itdragon.marshalling;
import java.io.File;
import java.io.FileOutputStream;
import com.itdragon.utils.ITDragonUtil;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;  

public class ITDragonServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("Netty Server active ......");
    }
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        try {
            // 獲取客戶端傳來的數據
            ITDragonReqData requestData = (ITDragonReqData) msg;
            System.out.println("Netty Server : " + requestData.toString());
            // 處理數據並返回給客戶端
            ITDragonRespData responseData = new ITDragonRespData();
            responseData.setId(requestData.getId());
            responseData.setName(requestData.getName() + "-SUCCESS!");
            responseData.setResponseMsg(requestData.getRequestMsg() + "-SUCCESS!");
            // 若是有附件則保存附件
            if (null != requestData.getAttachment()) {
                byte[] attachment = ITDragonUtil.ungzip(requestData.getAttachment());
                String path = System.getProperty("user.dir") + File.separatorChar + "receive" + 
                        File.separatorChar + System.currentTimeMillis() + ".jpg";
                FileOutputStream outputStream = new FileOutputStream(path);
                outputStream.write(attachment);
                outputStream.close();
                responseData.setResponseMsg("file upload success , file path is : " + path);
            }
            ctx.writeAndFlush(responseData);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            ReferenceCountUtil.release(msg);
        }
    }
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

ITDragonReqData 和 ITDragonRespData 實體類的代碼就不貼出來了,github上有源碼。

心跳監控案例

在分佈式,集羣系統架構中,咱們須要定時獲取各機器的資源使用狀況和服務器之間是否保持正常鏈接狀態。以便能在最短的時間內避免和處理問題。類比集羣中的哨兵模式。

獲取本機數據

能夠經過第三方sigar.jar的幫助,獲取主機的運行時信息,包括操做系統、CPU使用狀況、內存使用狀況、硬盤使用狀況以及網卡、網絡信息。使用很簡單,根據本身電腦的系統選擇對應的dll文件,而後拷貝到C:WindowsSystem32 目錄下便可。好比windows7 64位操做系統,則須要sigar-amd64-winnt.dll文件。
下載路徑:https://pan.baidu.com/s/1jJSaucI 密碼: 48d2

ITDragonClient.java,ITDragonCoreParam.java,ITDragonRequestInfo.java,ITDragonServer.java,ITDragonSigarUtil.java,pom.xml 的代碼這裏就不貼出來了,github上面有完整的源碼。

客戶端自定義事件代碼,負責發送認證信息,定時向服務器發送cpu信息和內存信息。

package com.itdragon.monitoring;
import java.net.InetAddress;
import java.util.HashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.hyperic.sigar.CpuPerc;
import org.hyperic.sigar.Mem;
import org.hyperic.sigar.Sigar;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;  
  
public class ITDragonClientHandler extends ChannelInboundHandlerAdapter{  
      
    private ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);  
    private ScheduledFuture<?> heartBeat;  
    private InetAddress addr ;  //主動向服務器發送認證信息  
    @Override  
    public void channelActive(ChannelHandlerContext ctx) throws Exception {  
        System.out.println("Client 鏈接一開通就開始驗證.....");  
        addr = InetAddress.getLocalHost();  
        String ip = addr.getHostAddress();  
        System.out.println("ip : " + ip);
        String key = ITDragonCoreParam.SALT_KEY.getValue(); // 僞裝進行了很複雜的加鹽加密  
        // 按照Server端的格式,傳遞令牌   
        String auth = ip + "," + key;   
        ctx.writeAndFlush(auth);  
    }  
    @Override  
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {  
        try {  
            if(msg instanceof String){  
                String result = (String) msg;  
                if(ITDragonCoreParam.AUTH_SUCCESS.getValue().equals(result)){  
                    // 驗證成功,每隔10秒,主動發送心跳消息  
                    this.heartBeat = this.scheduler.scheduleWithFixedDelay(new HeartBeatTask(ctx), 0, 10, TimeUnit.SECONDS);  
                    System.out.println(msg);                  
                }  
                else {  
                    System.out.println(msg);  
                }  
            }  
        } finally {  
            ReferenceCountUtil.release(msg);  
        }  
    }  
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {  
        cause.printStackTrace();  
        if (heartBeat != null) {  
            heartBeat.cancel(true);  
            heartBeat = null;  
        }  
        ctx.fireExceptionCaught(cause);  
    }  
}  
class HeartBeatTask implements Runnable {  
    private final ChannelHandlerContext ctx;  
    public HeartBeatTask(final ChannelHandlerContext ctx) {  
        this.ctx = ctx;  
    }  
    public void run() {  
        try {  
            // 採用sigar 獲取本機數據,放入實體類中  
            ITDragonRequestInfo info = new ITDragonRequestInfo();  
            info.setIp(InetAddress.getLocalHost().getHostAddress()); // ip  
            Sigar sigar = new Sigar();  
            
            CpuPerc cpuPerc = sigar.getCpuPerc();  
            HashMap<String, Object> cpuPercMap = new HashMap<String, Object>();  
            cpuPercMap.put(ITDragonCoreParam.COMBINED.getValue(), cpuPerc.getCombined());  
            cpuPercMap.put(ITDragonCoreParam.USER.getValue(), cpuPerc.getUser());  
            cpuPercMap.put(ITDragonCoreParam.SYS.getValue(), cpuPerc.getSys());  
            cpuPercMap.put(ITDragonCoreParam.WAIT.getValue(), cpuPerc.getWait());  
            cpuPercMap.put(ITDragonCoreParam.IDLE.getValue(), cpuPerc.getIdle());  
            
            Mem mem = sigar.getMem();  
            HashMap<String, Object> memoryMap = new HashMap<String, Object>();  
            memoryMap.put(ITDragonCoreParam.TOTAL.getValue(), mem.getTotal() / 1024L);  
            memoryMap.put(ITDragonCoreParam.USED.getValue(), mem.getUsed() / 1024L);  
            memoryMap.put(ITDragonCoreParam.FREE.getValue(), mem.getFree() / 1024L);  
            info.setCpuPercMap(cpuPercMap);  
            info.setMemoryMap(memoryMap);  
            ctx.writeAndFlush(info);  
        } catch (Exception e) {  
            e.printStackTrace();  
        }  
    }  
}

服務器自定義事件代碼,負責接收客戶端傳輸的數據,驗證令牌是否失效,打印客戶端傳來的數據。

package com.itdragon.monitoring;
import java.util.HashMap;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class ITDragonServerHandler extends ChannelInboundHandlerAdapter {

    // 令牌驗證的map,key爲ip地址,value爲密鑰
    private static HashMap<String, String> authMap = new HashMap<String, String>();
    // 模擬數據庫查詢
    static {
        authMap.put("xxx.xxx.x.x", "xxx");
        authMap.put(ITDragonCoreParam.CLIENT_HOST.getValue(), ITDragonCoreParam.SALT_KEY.getValue());
    }
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("Netty Server Monitoring.......");
    }
    // 模擬api請求前的驗證
    private boolean auth(ChannelHandlerContext ctx, Object msg) {
        System.out.println("令牌驗證...............");
        String[] ret = ((String) msg).split(",");
        String clientIp = ret[0];     // 客戶端ip地址
        String saltKey = ret[1];     // 數據庫保存的客戶端密鑰
        String auth = authMap.get(clientIp); // 客戶端傳來的密鑰
        if (null != auth && auth.equals(saltKey)) {
            ctx.writeAndFlush(ITDragonCoreParam.AUTH_SUCCESS.getValue());
            return true;
        } else {
            ctx.writeAndFlush(ITDragonCoreParam.AUTH_ERROR.getValue()).addListener(ChannelFutureListener.CLOSE);
            return false;
        }
    }
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // 若是傳來的消息是字符串,則先驗證
        if (msg instanceof String) {
            auth(ctx, msg);
        } else if (msg instanceof ITDragonRequestInfo) {
            ITDragonRequestInfo info = (ITDragonRequestInfo) msg;
            System.out.println("--------------------------------------------");
            System.out.println("當前主機ip爲: " + info.getIp());
            HashMap<String, Object> cpu = info.getCpuPercMap();
            System.out.println("cpu 總使用率: " + cpu.get(ITDragonCoreParam.COMBINED.getValue()));
            System.out.println("cpu 用戶使用率: " + cpu.get(ITDragonCoreParam.USER.getValue()));
            System.out.println("cpu 系統使用率: " + cpu.get(ITDragonCoreParam.SYS.getValue()));
            System.out.println("cpu 等待率: " + cpu.get(ITDragonCoreParam.WAIT.getValue()));
            System.out.println("cpu 空閒率: " + cpu.get(ITDragonCoreParam.IDLE.getValue()));

            HashMap<String, Object> memory = info.getMemoryMap();
            System.out.println("內存總量: " + memory.get(ITDragonCoreParam.TOTAL.getValue()));
            System.out.println("當前內存使用量: " + memory.get(ITDragonCoreParam.USED.getValue()));
            System.out.println("當前內存剩餘量: " + memory.get(ITDragonCoreParam.FREE.getValue()));
            System.out.println("--------------------------------------------");

            ctx.writeAndFlush("info received!");
        } else {
            ctx.writeAndFlush("connect failure!").addListener(ChannelFutureListener.CLOSE);
        }
    }
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

總結

1 Netty的編解碼功能很好的解決了Java序列化 沒法跨語言,序列化後碼流太大,序列化性能過低等問題
2 JBoss的Marshalling是一個Java對象的序列化API包,修正了JDK自帶的序列化包的不少問題,又兼容java.io.Serializable接口,缺點使用範圍小。
3 特殊長鏈接能夠減少服務端資源佔用壓力,是一種比較經常使用的數據通訊方式。
4 Netty能夠用作心跳監測,定時獲取被監聽機器的數據信息。

推薦文檔

Netty 能作什麼?學Netty有什麼用?
https://www.zhihu.com/questio...
http://blog.csdn.net/broadvie...
Marshalling :
http://jbossmarshalling.jboss...

Netty 編解碼數據通訊和心跳監控案例到這裏就結束了,感謝你們的閱讀,歡迎點評。若是你以爲不錯,能夠"推薦"一下。也能夠"關注"我,得到更多豐富的知識。

相關文章
相關標籤/搜索