前言
bootstrap
前面咱們已經基於不一樣維度介紹關於Netty的不少知識了,包括通訊原理、框架工做機制、核心組件、應用實戰,以及不一樣場景對不一樣方案的選擇等等。那麼咱們此次就研究一下咱們項目中基於Netty端對端開發中如何搭建一個完整的應用框架,以供開發人員嵌入他們關注的各類應用部件等。緩存
實現Netty應用級框架須要考慮哪些因素服務器
不少人問,咱們在基於某種網絡通訊框架構建咱們本身的應用框架的時候,究竟須要考慮到哪些方面?咱們如何構建一個與業務解耦的應用基礎設施、定製協議格式、健壯性的機制等來支撐咱們的開發呢?你們也能夠在下方的留言討論,而就我的的理解和相關實踐經驗,我認爲至少應考慮到如下的問題:網絡
網絡通訊協議的選擇,方案的比較, 咱們應基於TCP?UDP?仍是應用層的一些成熟協議?...session
網絡I/O模型該採用何種?BIO?NIO?IO複用?AIO?仍是信號驅動IO呢?架構
底層通訊框架咱們要定製,仍是沿用已有的成熟框架?框架
咱們場景是否須要統必定製全局可複用的交互協議、報文等?異步
是否應該建設一種高效可靠的消息編解碼機制支撐快速通訊?ide
來自客戶端的鏈接、業務請求等是否須要有認證和鑑權?工具
當業務通訊發生異常了,可否方便看到和追蹤通訊報文細節(更便於咱們一步一步查找問題緣由)??
當服務端由於非預期緣由斷開或崩潰,而後發現修復重啓後是否每一個客戶端都要手動再鏈接一下??
當服務器空閒一段時間後,是否該有一種機制自動觸發心跳檢測網絡的健康情況??
... ...
還有不少本文就不一一列舉。那咱們今天就以上考慮到的問題點來手寫實現一個基於Netty通訊框架的應用級框架,而後驗證咱們的問題是否能得以圓滿解決。
應用框架實戰
在如下應用框架中咱們將給出以上問題的解決方案
網絡通訊協議的選擇,方案的比較, 咱們應基於TCP?UDP?仍是應用層的一些成熟協議? 採用TCP/IP
網絡I/O模型該採用何種?BIO?NIO?IO複用?AIO?仍是信號驅動IO呢?採用NIO非阻塞模型
底層通訊框架咱們要定製,仍是沿用已有的成熟框架? 基於Netty框架
咱們場景是否須要統必定製全局可複用的交互協議、報文等? 私有協議定義和交互約定方式
是否應該建設一種高效可靠的消息編解碼機制支撐快速通訊?採用ByteToMessage/MessageToByte/Kryo
來自客戶端的鏈接、業務請求等服務端是否須要有認證和鑑權?採用服務端白名單
當業務通訊發生異常了,可否方便看到和追蹤通訊報文細節(更便於咱們一步一步查找問題緣由)??採用Netty內置Logging機制
當服務端由於非預期緣由斷開或崩潰,而後發現修復重啓後是否每一個客戶端都要手動再鏈接一下?? 設計斷線自動嘗試重連
當服務器空閒一段時間後,是否該有一種機制自動觸發心跳檢測網絡的健康情況??採用IdleState和自動觸發機制
1、基礎設施部分
/**
* @author andychen https://blog.51cto.com/14815984
* @description:Kryo序列化器單例
*/
public class KryoBuilder {
private KryoBuilder(){}
/**
* 獲取單例
* @return
*/
public static Kryo getInstance(){
return SingleKryo.builder;
}
private static class SingleKryo{
private static Kryo builder = new Kryo();
}
/**
* 構建kryo對象和註冊
* @return
*/
public static Kryo build(){
Kryo kryo = getInstance();
kryo.setRegistrationRequired(false);
kryo.register(Arrays.asList("").getClass(), new ArraysAsListSerializer());
kryo.register(GregorianCalendar.class, new GregorianCalendarSerializer());
kryo.register(InvocationHandler.class, new JdkProxySerializer());
kryo.register(BigDecimal.class, new DefaultSerializers.BigDecimalSerializer());
kryo.register(BigInteger.class, new DefaultSerializers.BigIntegerSerializer());
kryo.register(Pattern.class, new RegexSerializer());
kryo.register(BitSet.class, new BitSetSerializer());
kryo.register(URI.class, new URISerializer());
kryo.register(UUID.class, new UUIDSerializer());
UnmodifiableCollectionsSerializer.registerSerializers(kryo);
SynchronizedCollectionsSerializer.registerSerializers(kryo);
kryo.register(HashMap.class);
kryo.register(ArrayList.class);
kryo.register(LinkedList.class);
kryo.register(HashSet.class);
kryo.register(TreeSet.class);
kryo.register(Hashtable.class);
kryo.register(Date.class);
kryo.register(Calendar.class);
kryo.register(ConcurrentHashMap.class);
kryo.register(SimpleDateFormat.class);
kryo.register(GregorianCalendar.class);
kryo.register(Vector.class);
kryo.register(BitSet.class);
kryo.register(StringBuffer.class);
kryo.register(StringBuilder.class);
kryo.register(Object.class);
kryo.register(Object[].class);
kryo.register(String[].class);
kryo.register(byte[].class);
kryo.register(char[].class);
kryo.register(int[].class);
kryo.register(float[].class);
kryo.register(double[].class);
return kryo;
}
}
/**
* @author andychen https://blog.51cto.com/14815984
* @description:Kryo序列化器
*/
public class KryoSerializer {
private static final Kryo kryo = KryoBuilder.build();
/**
* 序列化
*
* @param object
* @param buf
*/
public static void serialize(Object object, ByteBuf buf) {
ByteArrayOutputStream stream = new ByteArrayOutputStream();
try{
Output out = new Output(stream);
kryo.writeClassAndObject(out, object);
out.flush();
out.close();
byte[] bytes = stream.toByteArray();
stream.flush();
stream.close();
/**
* 寫入buffer
*/
buf.writeBytes(bytes);
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 反序列化
* @param buf 數據緩衝
* @return
*/
public static Object deserialize(ByteBuf buf) {
try(ByteBufInputStream stream = new ByteBufInputStream(buf)) {
Input input = new Input(stream);
return kryo.readClassAndObject(input);
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
}
/**
* @author andychen https://blog.51cto.com/14815984
* @description:Kryo編碼器
*/
public class KryoEncoder extends MessageToByteEncoder<Message> {
/**
* 編碼實現
* @param channelHandlerContext 處理器上下文
* @param message 報文
* @param byteBuf 對端數據緩衝
* @throws Exception
*/
@Override
protected void encode(ChannelHandlerContext channelHandlerContext, Message message, ByteBuf byteBuf) throws Exception {
KryoSerializer.serialize(message, byteBuf);
channelHandlerContext.flush();
}
}
/**
* @author andychen https://blog.51cto.com/14815984
* @description:Kryo解碼器
*/
public class KryoDecoder extends ByteToMessageDecoder {
/**
* 解碼實現
* @param channelHandlerContext 處理器上下文
* @param byteBuf 對端緩衝
* @param list 反序列化列表
* @throws Exception
*/
@Override
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
Object object = KryoSerializer.deserialize(byteBuf);
list.add(object);
}
}
/**
* @author andychen https://blog.51cto.com/14815984
* @description:系統快捷工具類
*/
public final class Utility {
/**
* 構建報文
* @param sessionId 會話Id
* @param msg 主體
* @return
*/
public static Message buildMessage(int sessionId, Object msg, byte type){
//獲取校驗碼
final int OFFSET = 9;
int seed = sessionId+(sessionId > OFFSET ? sessionId : sessionId+OFFSET);
String crc = CRC.getCRC16(seed);
MessageHeader header = new MessageHeader();
header.setCrc(crc);
header.setLength(calcBufferLen(msg));
header.setSessionId(sessionId);
header.setType(type);
Message message = new Message();
message.setHeader(header);
message.setBody(msg);
return message;
}
/**
* 是否IP認證經過
* @return
*/
public static boolean isIPPassed(String ip){
for (String p : Constant.WHITELIST){
if(ip.equals(p)){
return true;
}
}
return false;
}
/**
* 計算報文長度
* @param msg 報文對象
* @return int
*/
private static int calcBufferLen(Object msg){
try(ByteArrayOutputStream stream = new ByteArrayOutputStream();
ObjectOutputStream output = new ObjectOutputStream(stream)){
output.writeObject(msg);
return stream.toByteArray().length;
}catch (IOException e){
e.printStackTrace();
}
return 0;
}
}
/**
* @author andychen https://blog.51cto.com/14815984
* @description:私有報文
*/
public final class Message {//<T extends Object>
/**
* 報文頭
*/
private MessageHeader header;
/**
* 報文主體
*/
private Object body;
public MessageHeader getHeader() {
return header;
}
public void setHeader(MessageHeader header) {
this.header = header;
}
public Object getBody() {
return body;
}
public void setBody(Object body) {
this.body = body;
}
@Override
public String toString() {
return "Message [header=" + this.header + "][body="+this.body+"]";
}
}
/**
* @author andychen https://blog.51cto.com/14815984
* @description:報文頭
*/
public final class MessageHeader {
/**
* CRC校驗碼
*/
private String crc;
/**
* 會話id
*/
private int sessionId;
/**
* 報文長度
*/
private int length;
/**
* 報文類型碼
*/
private byte type;
/**
* 報文優先級
*/
private int priority;
/**
* 報文附件
*/
private Map<String,Object> attachment = new HashMap<>();
public String getCrc() {
return crc;
}
public void setCrc(String crc) {this.crc = crc;}
public int getSessionId() {
return sessionId;
}
public void setSessionId(int sessionId) {
this.sessionId = sessionId;
}
public int getLength() {
return length;
}
public void setLength(int length) {
this.length = length;
}
public byte getType() {
return type;
}
public void setType(byte type) {
this.type = type;
}
public int getPriority() {
return priority;
}
public void setPriority(int priority) {
this.priority = priority;
}
public Map<String, Object> getAttachment() {
return attachment;
}
public void setAttachment(Map<String, Object> attachment) {
this.attachment = attachment;
}
@Override
public String toString() {
return "MessageHeader [crc=" + this.crc + ", length=" + this.length
+ ", sessionId=" + this.sessionId + ", type=" + this.type + ", priority="
+ this.priority + ", attachment=" + this.attachment + "]";
}
}
/**
* @author andychen https://blog.51cto.com/14815984
* @description:數據報文類型
*/
public enum MessageType {
/**
* 認證請求
*/
AUTH_REQUEST((byte)0),
/**
* 認證應答
*/
AUTH_REPLY((byte)1),
/**
* 心跳請求
*/
HEARTBEAT_REQUEST((byte)2),
/**
* 心跳應答
*/
HEARTBEAT_REPLY((byte)3),
/**
* 普通請求
*/
REQUEST((byte)4),
/**
* 普通應答
*/
REPLY((byte)5);
public byte getValue() {
return value;
}
private final byte value;
MessageType(byte b) {
this.value = b;
};
}
2、客戶端部分
/**
* @author andychen https://blog.51cto.com/14815984
* @description:框架客戶端啓動器類
*/
public class ClientStarter {
/**
* 日誌處理
*/
private static final Log log = LogFactory.getLog(ClientStarter.class);
/**
* 客戶端啓動
* @param args
*/
public static void main(String[] args) throws Exception {
ClientTask client = new ClientTask(Constant.SERV_HOST, Constant.SERV_PORT);
new Thread(client).start();
while (!client.isConnected()){
synchronized (client){
client.wait();
}
}
log.info("與服務器鏈接已創建,準備通訊...");
/**
* 採用在控制檯適時輸入消息主體的方式,發送報文
*/
Scanner scanner = new Scanner(System.in);
for (;;){
String body = scanner.next();
if(null != body && !"".equals(body)){
if(!body.equalsIgnoreCase("exit")){
client.send(body);
}else{
client.close();
/**
* 等待鏈接正常關閉通知
*/
while (client.isConnected()){
synchronized(client){
client.wait();
}
}
scanner.close();
System.exit(0);//提示正常退出
}
}
}
}
}
/**
* @author andychen https://blog.51cto.com/14815984
* @description:客戶端封裝
*/
public class ClientTask implements Runnable {
private final String host;
private final int port;
public ClientTask(String host, int port) {
this.host = host;
this.port = port;
}
/**
* 日誌處理
*/
private static final Log log = LogFactory.getLog(ClientTask.class);
/**
* 報文計數器
*/
public final static AtomicInteger counter = new AtomicInteger(0);
/**
* 這裏用1個後臺線程,定時執行檢測客戶端鏈接是否斷開,若非用戶斷開則自動嘗試重連
*/
private final ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
/**
* 客戶端鏈接通道
*/
private Channel channel;
/**
* 工做線程池組
*/
private EventLoopGroup group = new NioEventLoopGroup();
/**
* 是否意外關閉:出異常或網絡斷開(區別於人爲主動關閉)
*/
private volatile boolean except_closed = true;
/**
* 是否鏈接成功
*/
private volatile boolean connected = false;
private static Object _obj = new Object();
/**
* 是否鏈接
* @return
*/
public boolean isConnected(){
return this.connected;
}
/**
* 執行客戶端
*/
@Override
public void run() {
try {
this.connect();
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 鏈接服務器端
*/
public void connect() throws Exception{
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
//設置TCP底層保溫發送不延遲
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializerExt());
//(1)異步鏈接服務器端,等待發送和接收報文
ChannelFuture future = bootstrap.connect(new InetSocketAddress(this.host, this.port)).sync();
this.channel = future.sync().channel();
//(2)通知其餘等待線程,鏈接已創建
synchronized(this){
this.connected = true;
this.notifyAll();
}
this.channel.closeFuture().sync();
} finally {
//檢測並執行重鏈接
this.reconnect();
}
}
/**
* 關閉鏈接:非正常關閉
*/
public void close(){
this.except_closed = false;
this.channel.close();
}
/**
* 發送報文
* @param body 報文主體
*/
public void send(String body){
if(null != this.channel && this.channel.isActive()){
Message message = Utility.buildMessage(counter.incrementAndGet(), body, MessageType.REQUEST.getValue());
this.channel.writeAndFlush(message);
return;
}
log.info("通訊還沒有創建,請稍後再試...");
}
/**
* 執行重鏈接
* @throws Exception
*/
private void reconnect() throws Exception{
//主動關閉被檢測到
if(this.except_closed){
log.info("鏈接非正常關閉,準備嘗試重連...");
this.executor.execute(new ReconnectTask(this.host, this.port));
}else{
//主動關閉鏈接:釋放資源
this.relese();
}
}
/**
* 關閉鏈接釋放資源,通知其它等待的線程
* @throws Exception
*/
private void relese() throws Exception{
this.channel = null;
this.group.shutdownGracefully().sync();
synchronized (this){
this.connected = false;
this.notifyAll();
}
}
/**
* 嘗試重連服務器任務
*/
private class ReconnectTask implements Runnable{
private final String h;
private final int p;
public ReconnectTask(String h, int p) {
this.h = h;
this.p = p;
}
/**
* 嘗試重連
*/
@Override
public void run() {
try {
//間隔1秒重試一次
Thread.sleep(1000);
connect();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
/**
* @author andychen https://blog.51cto.com/14815984
* @description:客戶端通道初始化器擴展
*/
public class ChannelInitializerExt extends ChannelInitializer<SocketChannel> {
/**
* 初始化通道
* @param channel 通道
* @throws Exception 異常
*/
@Override
protected void initChannel(SocketChannel channel) throws Exception {
ChannelPipeline pipeline = channel.pipeline();
//(1)報文粘包處理
pipeline.addLast(new LengthFieldBasedFrameDecoder(Constant.MAX_MSG_LEN,0,2,0,2));
//(2)給報文增長分割長度
pipeline.addLast(new LengthFieldPrepender(2));
//(3)報文解碼器
pipeline.addLast(new KryoDecoder());
//(4)報文編碼器
pipeline.addLast(new KryoEncoder());
//(5)鏈接超時檢測
pipeline.addLast(new ReadTimeoutHandler(Constant.TIMEOUT_SECONDS));
//(6)認證請求
pipeline.addLast(new AuthenticationHandler());
//(7)心跳處理:發送心跳
pipeline.addLast(new HeartbeatHandler());
}
}
/**
* @author andychen https://blog.51cto.com/14815984
* @description:客戶端認證請求
*/
public class AuthenticationHandler extends ChannelInboundHandlerAdapter {
/**
* 日誌處理
*/
private static final Log log = LogFactory.getLog(AuthenticationHandler.class);
/**
* 全局計數器
*/
private final static AtomicInteger counter = new AtomicInteger(0);
/**
* 通道開啓事件
* @param ctx 處理器上下文
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
/**
* 發起認證請求
*/
Message message = Utility.buildMessage(counter.incrementAndGet(), "Auth Request",
MessageType.AUTH_REQUEST.getValue());
ctx.writeAndFlush(message);
}
/**
* 處理網絡讀取事件
* @param ctx 處理器上下文
* @param msg 報文
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
Message message = (Message)msg;
if(null != message){
MessageHeader header = message.getHeader();
//處理認證應答
if(null != header && header.getType() == MessageType.AUTH_REPLY.getValue()){
String body = message.getBody().toString();
if(body.equals(AuthenticationResult.FAILED)){
log.info("Authentication failed, channel close..");
ctx.close();
return;
}
log.info("Authentication is ok: "+message);
}
}
ctx.fireChannelRead(msg);
}
/**
* 客戶端認證異常處理
* @param ctx
* @param cause
* @throws Exception
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.fireExceptionCaught(cause);
}
}
/**
* @author andychen https://blog.51cto.com/14815984
* @description:客戶端心跳處理器
*/
public class HeartbeatHandler extends ChannelInboundHandlerAdapter {
/**
* 日誌處理
*/
private static final Log log = LogFactory.getLog(AuthenticationHandler.class);
/**
* 心跳定時任務
*/
private volatile ScheduledFuture<?> scheduleHeartbeat;
/**
* 處理客戶端心跳請求報文
* @param ctx 處理器上下文
* @param msg 消息對象
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
Message message = (Message)msg;
if(null != message){
MessageHeader header = message.getHeader();
//處理認證應答
if(header.getType() == MessageType.AUTH_REPLY.getValue()){
//登陸完成後,開啓客戶端對服務端心跳
this.scheduleHeartbeat = ctx.executor().scheduleAtFixedRate(new HeartbeatTask(ctx),
0, Constant.HEARTBEAT_TIMEOUT,
TimeUnit.MILLISECONDS);
return;
}
//處理心跳應答
if(header.getType() == MessageType.HEARTBEAT_REPLY.getValue()){
log.info("Client recevied server heartbeat: "+message);
ReferenceCountUtil.release(msg);
return;
}
}
ctx.fireChannelRead(msg);
}
/**
* 客戶端捕獲心跳異常
* @param ctx 處理器上下文
* @param cause 異常
* @throws Exception
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
if(null != this.scheduleHeartbeat){
this.scheduleHeartbeat.cancel(true);
this.scheduleHeartbeat = null;
}
//傳遞給TailHandler處理
ctx.fireExceptionCaught(cause);
}
/**
* 定義心跳任務
*/
private class HeartbeatTask implements Runnable{
/**
* 心跳計數器
*/
private final AtomicInteger counter = new AtomicInteger(0);
private final ChannelHandlerContext ctx;
public HeartbeatTask(ChannelHandlerContext ctx) {
this.ctx = ctx;
}
/**
* 心跳任務執行
*/
@Override
public void run() {
//客戶端心跳報文
Message heartbeat = Utility.buildMessage(this.counter.incrementAndGet(), Constant.HEARTBEAT_ACK,
MessageType.HEARTBEAT_REQUEST.getValue());
this.ctx.writeAndFlush(heartbeat);
}
}
}
3、服務器部分
/**
* @author andychen https://blog.51cto.com/14815984
* @description:框架服務端啓動器類
*/
public class ServerStarter {
/**
* 日誌處理
*/
private static final Log log = LogFactory.getLog(ServerStarter.class);
/**
* 服務端啓動
* @param args
*/
public static void main(String[] args) throws Exception {
//主線程池組:負責處理鏈接
EventLoopGroup main = new NioEventLoopGroup();
//工做線程池組:負責請求對應的業務Handler處理
EventLoopGroup work = new NioEventLoopGroup();
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(main, work)
.channel(NioServerSocketChannel.class)
//設置底層協議接收緩存隊列最大長度
.option(ChannelOption.SO_BACKLOG, Constant.TCP_MAX_QUEUE_SIZE)
.childHandler(new ChannelInitializerExt());
//綁定端口,等待同步報文
bootstrap.bind(Constant.SERV_PORT).sync();
log.info("Server started and listen port: "+Constant.SERV_PORT+"...");
}
}
/**
* @author andychen https://blog.51cto.com/14815984
* @description:服務器通道初始化器
*/
public class ChannelInitializerExt extends ChannelInitializer<SocketChannel> {
/**
* 初始化處理器
* @param channel 鏈接通道
* @throws Exception 異常
*/
@Override
protected void initChannel(SocketChannel channel) throws Exception {
ChannelPipeline pipeline = channel.pipeline();
//(1)日誌打印處理:能夠打印報文字節碼
pipeline.addLast(new LoggingHandler(LogLevel.INFO));
//(2)處理粘包問題:帶長度
pipeline.addLast(new LengthFieldBasedFrameDecoder(Constant.MAX_MSG_LEN,0,2,0,2));
//(3)報文編碼器:消息發送增長分隔符
pipeline.addLast(new LengthFieldPrepender(2));
//(4)私有報文解碼
pipeline.addLast(new KryoDecoder());
//(5)私有報文編碼
pipeline.addLast(new KryoEncoder());
//(6)通道鏈接超時檢測,發送心跳
pipeline.addLast(new ReadTimeoutHandler(Constant.TIMEOUT_SECONDS));
//(7)身份認證應答
pipeline.addLast(new AuthenticationHandler());
//(8)心跳應答
pipeline.addLast(new HeartbeatHandler());
//(9)其餘業務處理...
pipeline.addLast(new OtherServiceHandler());
}
}
/**
* @author andychen https://blog.51cto.com/14815984
* @description:服務端身份認證處理器
*/
public class AuthenticationHandler extends ChannelInboundHandlerAdapter {
/**
* 日誌處理
*/
private static final Log log = LogFactory.getLog(AuthenticationHandler.class);
/**
* 定義認證業務計數器
*/
private static final AtomicInteger counter = new AtomicInteger(0);
/**
* 緩存已認證ip列表
*/
private static final List<String> authedIPList = new LinkedList<>();
/**
* 認證業務處理
* @param ctx
* @param msg
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
Message message = (Message)msg;
Message authMessage;
if(null != message){
MessageHeader header = message.getHeader();
//處理認證請求
if(null != header && header.getType() == MessageType.AUTH_REQUEST.getValue()){
String ip = ctx.channel().remoteAddress().toString();
String result;
//重複登陸
if(authedIPList.contains(ip)){
result = AuthenticationResult.REPEAT_AUTH.toString();
}else{
//是否ip認證經過
if(Utility.isIPPassed(ip)){
authedIPList.add(ip);
result = AuthenticationResult.SUCCESS.toString();
}else{
result = AuthenticationResult.FAILED.toString();
}
}
authMessage = Utility.buildMessage(counter.incrementAndGet(), result, MessageType.AUTH_REPLY.getValue());
ctx.writeAndFlush(authMessage);
//釋放對象,再也不向後傳遞
ReferenceCountUtil.release(msg);
log.info("Server reply client auth request:"+authMessage);
return;
}
}
ctx.fireChannelRead(msg);
}
/**
* 認證處理器捕獲異常處理
* @param ctx 處理器上下文
* @param cause 異常
* @throws Exception
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
authedIPList.remove(ctx.channel().remoteAddress().toString());
ctx.close();
ctx.fireExceptionCaught(cause);
}
}
/**
* @author andychen https://blog.51cto.com/14815984
* @description:服務器端心跳包處理器
*/
public class HeartbeatHandler extends ChannelInboundHandlerAdapter {
/**
* 日誌處理
*/
private static final Log log = LogFactory.getLog(HeartbeatHandler.class);
/**
* 會話計數器
*/
private final AtomicInteger counter = new AtomicInteger(0);
/**
* 處理心跳報文
* @param ctx 處理器上下文
* @param msg 消息報文
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
Message message = (Message)msg;
if(null != message){
MessageHeader header = message.getHeader();
/**
* 處理心跳請求
*/
if(null != header && header.getType() == MessageType.HEARTBEAT_REQUEST.getValue()){
log.info("Server recevied client heartbeat: "+message);
//應答報文
Message heartbeat = Utility.buildMessage(counter.incrementAndGet(), Constant.HEARTBEAT_ACK,
MessageType.HEARTBEAT_REPLY.getValue());
ctx.writeAndFlush(heartbeat);
//引用計數器釋放對象
ReferenceCountUtil.release(msg);
return;
}
}
ctx.fireChannelRead(msg);
}
}
/**
* @author andychen https://blog.51cto.com/14815984
* @description:其它業務處理
*/
public class OtherServiceHandler extends SimpleChannelInboundHandler<Message> {
/**
* 日誌處理
*/
private static final Log log = LogFactory.getLog(OtherServiceHandler.class);
/**
* 讀取對端發送的報文
* @param ctx 處理器上下文
* @param message 報文
* @throws Exception
*/
@Override
protected void channelRead0(ChannelHandlerContext ctx, Message message) throws Exception {
log.info(message);
}
/**
* 鏈接斷開事件
* @param ctx 上下文
* @throws Exception
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
log.info("["+ctx.channel().remoteAddress()+"]斷開了鏈接...");
}
}
運行檢驗
開發小結
以上就是咱們基於Netty實現的一整套通訊應用框架的和核心代碼。全部的業務開發均可定製和構建相似此的基礎應用框架,開發Handler處理器的業務人員可任意嵌入被解耦化的業務領域Handler。可採用Handler自動注入Netty管道的方式零侵入框架,支持更多更復雜的業務!但願本文能給你們提供靠譜和一站式的借鑑。你們有任何關於Netty的問題能夠在下方留言,謝謝關注!