Netty實踐(四):心跳檢測實現

心跳檢測的概念

在分佈式架構中,比如Hadoop集羣、Storm集羣等,或多或少都涉及到Master/Slave的概念,往往是一個或者多個Master和N個Slave之間進行通訊。通常Master需要知道Slave的狀態,Slave會定時向Master發送消息,相當於告知Master:「我還活着,我現在在做什麼,什麼進度,我的CPU/內存狀況如何」等,這就是所謂的心跳。Master根據Slave的心跳進行協調:比如Slave的CPU/內存消耗很大,那麼Master可以將任務分配給其他負載小的Slave進行處理;比如Slave一段時間沒有發送心跳過來,那麼Master可能會將該Slave從可服務列表中暫時刪除,並可能發出報警,告知運維/開發人員進行處理,如下圖所示。

wKiom1iWwz-y9e5cAABXb5p25Bs315.png



Netty實現心跳檢測代碼實例


心跳信息對象

主要儲存Slave的IP,通訊PORT,時間,內存,CPU信息等。

package day4;

import java.io.Serializable;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;

/**
 * Heartbeat payload exchanged between a slave and the master.
 *
 * <p>Holds the sender's IP, its communication port, the time the heartbeat
 * was produced, and CPU / memory metrics as string key-value pairs.
 *
 * <p>The metric maps are never {@code null}: passing {@code null} to a map
 * setter resets that map to an empty one, so callers can always do
 * {@code getMemInfo().put(...)} safely. The timestamp is defensively copied
 * on the way in and out because {@link Date} is mutable.
 *
 * <p>NOTE(review): no explicit {@code serialVersionUID} is declared. Adding
 * one changes the stream identity for peers still running the old class, so
 * coordinate that change across client and server.
 *
 * Created by zhangfengzhe on 2017/2/4.
 */
public class HeartInfo implements Serializable{

    /** IP address of the node that produced this heartbeat. */
    private String ip;

    /** Communication port reported by the node. */
    private int port;

    /** Time at which this heartbeat was produced; may be {@code null} if never set. */
    private Date lasttime;

    /** CPU metrics keyed by metric name; never {@code null}. */
    private Map<String , String> cpuInfo = new HashMap<String,String>();

    /** Memory metrics keyed by metric name; never {@code null}. */
    private Map<String , String> memInfo = new HashMap<String, String>();

    /** @return the IP address of the reporting node, or {@code null} if unset */
    public String getIp() {
        return ip;
    }

    /** @param ip the IP address of the reporting node */
    public void setIp(String ip) {
        this.ip = ip;
    }

    /** @return the communication port of the reporting node */
    public int getPort() {
        return port;
    }

    /** @param port the communication port of the reporting node */
    public void setPort(int port) {
        this.port = port;
    }

    /**
     * @return a copy of the heartbeat timestamp, or {@code null} if unset;
     *         mutating the returned object does not affect this instance
     */
    public Date getLasttime() {
        return lasttime == null ? null : new Date(lasttime.getTime());
    }

    /**
     * @param lasttime the heartbeat timestamp; copied so later changes to the
     *                 argument do not leak into this instance
     */
    public void setLasttime(Date lasttime) {
        this.lasttime = lasttime == null ? null : new Date(lasttime.getTime());
    }

    /** @return the live CPU metric map; never {@code null} */
    public Map<String, String> getCpuInfo() {
        return cpuInfo;
    }

    /**
     * @param cpuInfo the CPU metric map to use; {@code null} resets it to an
     *                empty map so the getter never returns {@code null}
     */
    public void setCpuInfo(Map<String, String> cpuInfo) {
        this.cpuInfo = cpuInfo != null ? cpuInfo : new HashMap<String, String>();
    }

    /** @return the live memory metric map; never {@code null} */
    public Map<String, String> getMemInfo() {
        return memInfo;
    }

    /**
     * @param memInfo the memory metric map to use; {@code null} resets it to
     *                an empty map so the getter never returns {@code null}
     */
    public void setMemInfo(Map<String, String> memInfo) {
        this.memInfo = memInfo != null ? memInfo : new HashMap<String, String>();
    }

    @Override
    public String toString() {
        return "HeartInfo{" +
                "ip='" + ip + '\'' +
                ", port=" + port +
                ", lasttime=" + lasttime +
                ", cpuInfo=" + cpuInfo +
                ", memInfo=" + memInfo +
                '}';
    }
}


JBoss Marshalling編解碼處理器

package day3;

import io.netty.handler.codec.marshalling.DefaultMarshallerProvider;
import io.netty.handler.codec.marshalling.DefaultUnmarshallerProvider;
import io.netty.handler.codec.marshalling.MarshallerProvider;
import io.netty.handler.codec.marshalling.MarshallingDecoder;
import io.netty.handler.codec.marshalling.MarshallingEncoder;
import io.netty.handler.codec.marshalling.UnmarshallerProvider;

import org.jboss.marshalling.MarshallerFactory;
import org.jboss.marshalling.Marshalling;
import org.jboss.marshalling.MarshallingConfiguration;

/**
 * Marshalling工廠
 */
public final class MarshallingCodeCFactory {

    /**
     * 建立Jboss Marshalling×××MarshallingDecoder
     * @return MarshallingDecoder
     */
    public static MarshallingDecoder buildMarshallingDecoder() {
       //首先經過Marshalling工具類的精通方法獲取Marshalling實例對象 參數serial標識建立的是java序列化工廠對象。
      final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
      //建立了MarshallingConfiguration對象,配置了版本號爲5 
      final MarshallingConfiguration configuration = new MarshallingConfiguration();
      configuration.setVersion(5);
      //根據marshallerFactory和configuration建立provider
      UnmarshallerProvider provider = new DefaultUnmarshallerProvider(marshallerFactory, configuration);
      //構建Netty的MarshallingDecoder對象,倆個參數分別爲provider和單個消息序列化後的最大長度
      MarshallingDecoder decoder = new MarshallingDecoder(provider, 1024);
      return decoder;
    }

    /**
     * 建立Jboss Marshalling編碼器MarshallingEncoder
     * @return MarshallingEncoder
     */
    public static MarshallingEncoder buildMarshallingEncoder() {
      final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
      final MarshallingConfiguration configuration = new MarshallingConfiguration();
      configuration.setVersion(5);
      MarshallerProvider provider = new DefaultMarshallerProvider(marshallerFactory, configuration);
      //構建Netty的MarshallingEncoder對象,MarshallingEncoder用於實現序列化接口的POJO對象序列化爲二進制數組
      MarshallingEncoder encoder = new MarshallingEncoder(provider);
      return encoder;
    }
}


Client

package day4;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

public class Client {

   
   public static void main(String[] args) throws Exception{
      
      EventLoopGroup group = new NioEventLoopGroup();
      Bootstrap b = new Bootstrap();

      final int port = 8765;
      final String serverIP = "127.0.0.1";

      b.group(group)
       .channel(NioSocketChannel.class)
       .handler(new ChannelInitializer<SocketChannel>() {
         @Override
         protected void initChannel(SocketChannel sc) throws Exception {
            sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
            sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
            sc.pipeline().addLast(new ClientHandler(port));
         }
      });
      
      ChannelFuture cf = b.connect(serverIP, port).sync();

      cf.channel().closeFuture().sync();
      group.shutdownGracefully();
   }
}


Client Handler

package day4;

import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.ReferenceCountUtil;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/**
 * Created by zhangfengzhe on 2017/2/4.
 */
public class ClientHandler extends ChannelHandlerAdapter {

    private String ip;
    private int port;
    private ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
    private ScheduledFuture<?> scheduledFuture;

    private static final String SUCCESS = "OK";

    public ClientHandler(){}

    public ClientHandler(int port) {

        this.port = port;

        //獲取本機IP
        try {
            this.ip = InetAddress.getLocalHost().getHostAddress();
        } catch (UnknownHostException e) {
            e.printStackTrace();
        }

    }

    //通道創建初始化時  發送信息 準備握手驗證
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {

        String authInfo = this.ip + ":" + this.port;

        ctx.writeAndFlush(authInfo);
    }

    //當服務器發送認證信息後,開始啓動心跳發送
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

        if(msg instanceof String){

            //認證成功
            if(SUCCESS.equals((String)msg)){

                this.scheduledFuture = scheduledExecutorService.scheduleWithFixedDelay(new HeartTask(ctx,ip,port),2,3, TimeUnit.SECONDS);

            }else{

                System.out.println("服務器發來消息:" + msg);
            }

        }

        ReferenceCountUtil.release(msg);

    }

    //若是出現異常 取消定時
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {

        cause.printStackTrace();
        if(this.scheduledFuture != null){
            this.scheduledFuture.cancel(true);
            this.scheduledFuture = null;
        }

    }
}

Client和Server建立通道並初始化的時候,Client會向服務器發送信息用於認證。在實際開發中,Client在發送心跳前,需要和Server端進行握手驗證,會涉及到加解密,這裏爲了簡單起見,省去了這些過程。從上面的代碼也可以看到,如果服務端認證成功,那麼Client會開始啓動定時線程去執行任務,那麼接下來,我們看看這個心跳任務。


心跳任務HeartTask

package day4;

import io.netty.channel.ChannelHandlerContext;
import org.hyperic.sigar.CpuPerc;
import org.hyperic.sigar.Mem;
import org.hyperic.sigar.Sigar;

import java.util.Date;
import java.util.Random;

/**
 * Created by zhangfengzhe on 2017/2/4.
 */
public class HeartTask implements  Runnable{

    //持有引用,方便讀寫操做
    private ChannelHandlerContext ctx;

    private HeartInfo heartInfo = new HeartInfo();

    public HeartTask(ChannelHandlerContext ctx, String ip, int port) {

        this.ctx = ctx;

        heartInfo.setIp(ip);
        heartInfo.setPort(port);
    }

    @Override
    public void run() {

        try{
            //利用sigar獲取 內存/CPU方面的信息 ; 利用CTX給服務器端發送消息
            Sigar sigar = new Sigar();

            //內存使用信息memory
            Mem mem = sigar.getMem();
            heartInfo.getMemInfo().put("total",String.valueOf(mem.getTotal()));
            heartInfo.getMemInfo().put("used",String.valueOf(mem.getUsed()));
            heartInfo.getMemInfo().put("free",String.valueOf(mem.getFree()));


            //CPU使用信息
            CpuPerc cpuPerc = sigar.getCpuPerc();
            heartInfo.getCpuInfo().put("user",String.valueOf(cpuPerc.getUser()));
            heartInfo.getCpuInfo().put("sys",String.valueOf(cpuPerc.getSys()));
            heartInfo.getCpuInfo().put("wait",String.valueOf(cpuPerc.getWait()));
            heartInfo.getCpuInfo().put("idle",String.valueOf(cpuPerc.getIdle()));

            heartInfo.setLasttime(new Date());

            ctx.writeAndFlush(heartInfo);

        }catch (Exception e){
            e.printStackTrace();
        }
    }
}

首先,爲了方便在心跳任務中進行讀寫操作,HeartTask持有ChannelHandlerContext的引用。其次,爲了方便收集系統的內存、CPU信息,這裏使用了Sigar,也是在實際中應用非常廣泛的一個工具。


Server

package day4;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;

public class Server {

   public static void main(String[] args) throws Exception{
      
      EventLoopGroup pGroup = new NioEventLoopGroup();
      EventLoopGroup cGroup = new NioEventLoopGroup();
      
      ServerBootstrap b = new ServerBootstrap();
      b.group(pGroup, cGroup)
       .channel(NioServerSocketChannel.class)
       .option(ChannelOption.SO_BACKLOG, 1024)
       //設置日誌
       .handler(new LoggingHandler(LogLevel.INFO))
       .childHandler(new ChannelInitializer<SocketChannel>() {
         protected void initChannel(SocketChannel sc) throws Exception {
            sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
            sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
            sc.pipeline().addLast(new ServerHandler());
         }
      });
      
      ChannelFuture cf = b.bind(8765).sync();
      
      cf.channel().closeFuture().sync();
      pGroup.shutdownGracefully();
      cGroup.shutdownGracefully();
      
   }
}


Server Handler

package day4;

import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Created by zhangfengzhe on 2017/2/4.
 */
public class ServerHandler extends ChannelHandlerAdapter {

    //KEY: ip:port VALUE: HeartInfo
    private Map<String,HeartInfo> heartInfoMap = new HashMap<String, HeartInfo>();

    private static final List<String> authList = new ArrayList<String>();

    static {
        //從其餘地方加載出來的IP列表
        authList.add("192.168.99.219:8765");
    }


    //服務器會接收到2種消息 一個是客戶端初始化時發送過來的認證信息 第二個是心跳信息
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

        if(msg instanceof String){

            if(authList.contains(msg)){ //驗證經過
                ctx.writeAndFlush("OK");
            }else{
                ctx.writeAndFlush("不在認證列表中...");
            }

        }else if(msg instanceof HeartInfo){

            System.out.println((HeartInfo)msg);

            ctx.writeAndFlush("心跳接收成功!");

            HeartInfo heartInfo = (HeartInfo)msg;
            heartInfoMap.put(heartInfo.getIp() + ":" + heartInfo.getPort(),heartInfo);
        }

    }
}


運行結果


Client端

wKioL1iWyKajZ8I3AABV5IWr6XU837.png


Server端

wKiom1iWyLPTcWUcAABo90sCbsE792.png


到這裏,心跳檢測就實現了,就這麼簡單,你會了麼,See U~

相關文章
相關標籤/搜索