心跳檢測的概念
java
在分佈式架構中,比如Hadoop集羣、Storm集羣等,或多或少都涉及到Master/Slave的概念,往往是一個或者多個Master和N個Slave之間進行通訊。那麼一般Master需要知道Slave的狀態,Slave會定時地向Master發送消息,相當於告知Master:「我還活着,我現在在做什麼,什麼進度,我的CPU/內存情況如何」等,這就是所謂的心跳。Master根據Slave的心跳進行協調,比如Slave的CPU/內存消耗很大,那麼Master可以將任務分配給其他負載小的Slave進行處理;比如Slave一段時間沒有發送心跳過來,那麼Master可能會從可服務列表中暫時刪除該Slave,並可能發出報警,告知運維/開發人員進行處理,如下圖所示。
Netty實現心跳檢測代碼實例
心跳信息對象
主要儲存Slave的IP、通訊PORT、時間、內存、CPU信息等。
package day4; import java.io.Serializable; import java.util.Date; import java.util.HashMap; import java.util.Map; /** * Created by zhangfengzhe on 2017/2/4. */ public class HeartInfo implements Serializable{ private String ip; private int port; private Date lasttime; private Map<String , String> cpuInfo = new HashMap<String,String>(); private Map<String , String> memInfo = new HashMap<String, String>(); public String getIp() { return ip; } public void setIp(String ip) { this.ip = ip; } public int getPort() { return port; } public void setPort(int port) { this.port = port; } public Date getLasttime() { return lasttime; } public void setLasttime(Date lasttime) { this.lasttime = lasttime; } public Map<String, String> getCpuInfo() { return cpuInfo; } public void setCpuInfo(Map<String, String> cpuInfo) { this.cpuInfo = cpuInfo; } public Map<String, String> getMemInfo() { return memInfo; } public void setMemInfo(Map<String, String> memInfo) { this.memInfo = memInfo; } @Override public String toString() { return "HeartInfo{" + "ip='" + ip + '\'' + ", port=" + port + ", lasttime=" + lasttime + ", cpuInfo=" + cpuInfo + ", memInfo=" + memInfo + '}'; } }
JBoss Marshalling編解碼處理器
運維
package day3; import io.netty.handler.codec.marshalling.DefaultMarshallerProvider; import io.netty.handler.codec.marshalling.DefaultUnmarshallerProvider; import io.netty.handler.codec.marshalling.MarshallerProvider; import io.netty.handler.codec.marshalling.MarshallingDecoder; import io.netty.handler.codec.marshalling.MarshallingEncoder; import io.netty.handler.codec.marshalling.UnmarshallerProvider; import org.jboss.marshalling.MarshallerFactory; import org.jboss.marshalling.Marshalling; import org.jboss.marshalling.MarshallingConfiguration; /** * Marshalling工廠 */ public final class MarshallingCodeCFactory { /** * 建立Jboss Marshalling×××MarshallingDecoder * @return MarshallingDecoder */ public static MarshallingDecoder buildMarshallingDecoder() { //首先經過Marshalling工具類的精通方法獲取Marshalling實例對象 參數serial標識建立的是java序列化工廠對象。 final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial"); //建立了MarshallingConfiguration對象,配置了版本號爲5 final MarshallingConfiguration configuration = new MarshallingConfiguration(); configuration.setVersion(5); //根據marshallerFactory和configuration建立provider UnmarshallerProvider provider = new DefaultUnmarshallerProvider(marshallerFactory, configuration); //構建Netty的MarshallingDecoder對象,倆個參數分別爲provider和單個消息序列化後的最大長度 MarshallingDecoder decoder = new MarshallingDecoder(provider, 1024); return decoder; } /** * 建立Jboss Marshalling編碼器MarshallingEncoder * @return MarshallingEncoder */ public static MarshallingEncoder buildMarshallingEncoder() { final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial"); final MarshallingConfiguration configuration = new MarshallingConfiguration(); configuration.setVersion(5); MarshallerProvider provider = new DefaultMarshallerProvider(marshallerFactory, configuration); //構建Netty的MarshallingEncoder對象,MarshallingEncoder用於實現序列化接口的POJO對象序列化爲二進制數組 MarshallingEncoder encoder = new MarshallingEncoder(provider); return encoder; } }
Client
dom
package day4; import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; public class Client { public static void main(String[] args) throws Exception{ EventLoopGroup group = new NioEventLoopGroup(); Bootstrap b = new Bootstrap(); final int port = 8765; final String serverIP = "127.0.0.1"; b.group(group) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel sc) throws Exception { sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder()); sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder()); sc.pipeline().addLast(new ClientHandler(port)); } }); ChannelFuture cf = b.connect(serverIP, port).sync(); cf.channel().closeFuture().sync(); group.shutdownGracefully(); } }
Client Handler
package day4; import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; import io.netty.util.ReferenceCountUtil; import java.net.InetAddress; import java.net.UnknownHostException; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; /** * Created by zhangfengzhe on 2017/2/4. */ public class ClientHandler extends ChannelHandlerAdapter { private String ip; private int port; private ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1); private ScheduledFuture<?> scheduledFuture; private static final String SUCCESS = "OK"; public ClientHandler(){} public ClientHandler(int port) { this.port = port; //獲取本機IP try { this.ip = InetAddress.getLocalHost().getHostAddress(); } catch (UnknownHostException e) { e.printStackTrace(); } } //通道創建初始化時 發送信息 準備握手驗證 @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { String authInfo = this.ip + ":" + this.port; ctx.writeAndFlush(authInfo); } //當服務器發送認證信息後,開始啓動心跳發送 @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { if(msg instanceof String){ //認證成功 if(SUCCESS.equals((String)msg)){ this.scheduledFuture = scheduledExecutorService.scheduleWithFixedDelay(new HeartTask(ctx,ip,port),2,3, TimeUnit.SECONDS); }else{ System.out.println("服務器發來消息:" + msg); } } ReferenceCountUtil.release(msg); } //若是出現異常 取消定時 @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); if(this.scheduledFuture != null){ this.scheduledFuture.cancel(true); this.scheduledFuture = null; } } }
Client和Server建立通道初始化的時候,Client會向服務器發送信息用於認證。在實際開發中,Client在發送心跳前,需要和Server端進行握手驗證,會涉及到加解密,這裏爲了簡單起見,省去了這些過程。從上面的代碼也可以看到,如果服務端認證成功,那麼Client會開始啓動定時線程去執行任務,那麼接下來,我們看看這個心跳任務。
分佈式
心跳任務HeartTask
package day4; import io.netty.channel.ChannelHandlerContext; import org.hyperic.sigar.CpuPerc; import org.hyperic.sigar.Mem; import org.hyperic.sigar.Sigar; import java.util.Date; import java.util.Random; /** * Created by zhangfengzhe on 2017/2/4. */ public class HeartTask implements Runnable{ //持有引用,方便讀寫操做 private ChannelHandlerContext ctx; private HeartInfo heartInfo = new HeartInfo(); public HeartTask(ChannelHandlerContext ctx, String ip, int port) { this.ctx = ctx; heartInfo.setIp(ip); heartInfo.setPort(port); } @Override public void run() { try{ //利用sigar獲取 內存/CPU方面的信息 ; 利用CTX給服務器端發送消息 Sigar sigar = new Sigar(); //內存使用信息memory Mem mem = sigar.getMem(); heartInfo.getMemInfo().put("total",String.valueOf(mem.getTotal())); heartInfo.getMemInfo().put("used",String.valueOf(mem.getUsed())); heartInfo.getMemInfo().put("free",String.valueOf(mem.getFree())); //CPU使用信息 CpuPerc cpuPerc = sigar.getCpuPerc(); heartInfo.getCpuInfo().put("user",String.valueOf(cpuPerc.getUser())); heartInfo.getCpuInfo().put("sys",String.valueOf(cpuPerc.getSys())); heartInfo.getCpuInfo().put("wait",String.valueOf(cpuPerc.getWait())); heartInfo.getCpuInfo().put("idle",String.valueOf(cpuPerc.getIdle())); heartInfo.setLasttime(new Date()); ctx.writeAndFlush(heartInfo); }catch (Exception e){ e.printStackTrace(); } } }
首先,爲了方便在心跳任務中進行讀寫操作,HeartTask持有ChannelHandlerContext的引用。其次,爲了方便收集系統的內存、CPU信息,這裏使用了Sigar,也是在實際中應用非常廣泛的一個工具。
Server
package day4; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; public class Server { public static void main(String[] args) throws Exception{ EventLoopGroup pGroup = new NioEventLoopGroup(); EventLoopGroup cGroup = new NioEventLoopGroup(); ServerBootstrap b = new ServerBootstrap(); b.group(pGroup, cGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 1024) //設置日誌 .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChannelInitializer<SocketChannel>() { protected void initChannel(SocketChannel sc) throws Exception { sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder()); sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder()); sc.pipeline().addLast(new ServerHandler()); } }); ChannelFuture cf = b.bind(8765).sync(); cf.channel().closeFuture().sync(); pGroup.shutdownGracefully(); cGroup.shutdownGracefully(); } }
Server Handler
package day4; import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; /** * Created by zhangfengzhe on 2017/2/4. */ public class ServerHandler extends ChannelHandlerAdapter { //KEY: ip:port VALUE: HeartInfo private Map<String,HeartInfo> heartInfoMap = new HashMap<String, HeartInfo>(); private static final List<String> authList = new ArrayList<String>(); static { //從其餘地方加載出來的IP列表 authList.add("192.168.99.219:8765"); } //服務器會接收到2種消息 一個是客戶端初始化時發送過來的認證信息 第二個是心跳信息 @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { if(msg instanceof String){ if(authList.contains(msg)){ //驗證經過 ctx.writeAndFlush("OK"); }else{ ctx.writeAndFlush("不在認證列表中..."); } }else if(msg instanceof HeartInfo){ System.out.println((HeartInfo)msg); ctx.writeAndFlush("心跳接收成功!"); HeartInfo heartInfo = (HeartInfo)msg; heartInfoMap.put(heartInfo.getIp() + ":" + heartInfo.getPort(),heartInfo); } } }
運行結果
Client端
Server端
到這裏,心跳檢測就實現了,就這麼簡單,你會了麼,See U~