SLG手遊Java服務器的設計與開發——網絡通訊

  • 文章版權歸騰訊GAD全部,禁止匿名轉載;禁止商業使用;禁止我的使用。

1、前言

上文分析了咱們這款SLG的架構,本章着重講解咱們的網絡通訊架構,由上文的功能分析咱們能夠得知,遊戲的全部功能基本上屬於非及時的通訊機制,因此依靠HTTP短鏈接就可以基本知足遊戲的通訊需求。
固然,咱們先撇開國戰部分不說,由於國戰部分咱們正在優化開發最新版本,以前咱們作的版本是想經過異步戰鬥的機制達到實時戰鬥效果,經過使用HTTP的請求機制,加上前端畫面表現,讓玩家感受到即時戰鬥的感受,既能在地圖上能看到其餘玩家的行進隊列,又能進入城池多國混戰。惋惜的是,讓異步戰鬥來實現實時戰鬥的效果,會產生不少問題,最終由於機制的問題而商議出必須優化一版再上線。因此目前全部的功能均經過HTTP實現,若是後期國戰須要使用TCP長鏈接能夠單獨對國戰部分使用TCP長鏈接實現。前端

2、通訊框架

1.使用Netty

在開始設計通訊機制時,就須要選擇合適的通訊框架,固然,咱們也能夠本身動手寫底層通訊的實現,不過在前人已有成熟框架的狀況下,咱們大而沒必要重複造輪子,因爲在通訊方面,咱們並無太多的個性化需求,所以基本上成熟的通訊框架都能知足目前所需。選擇框架,無非就是看它們的底層架構是否符合需求、資料是否齊全、API文檔是否詳細、以及成熟案例有多少。上文提到,可使用HTTP通訊協議的框架中,有Servlet、Spring、Struts、Mina和Netty等常見的通訊框架,在這其中我選擇了Netty,Servlet、Spring和Struts屬於同一系列,他們的底層都是Servlet的實現,在Servlet3.0之前均是BIO通訊模式,而Mina和Netty均屬於基於Java NIO的通訊框架,因爲通訊機制的不一樣,基於NIO的通訊程序比基於BIO的通訊程序能承受更多的併發鏈接,而在後者的框架選擇中,其實並無太多的誰好與很差,Mina和Netty底層都是Java NIO的封裝,而且二者的底層框架也是大體同樣(其做者其實就是一我的),選擇Netty更多的是由於Netty的有更多的資料可查,遇到問題可能會更容易解決,而且我我的在同時使用過Mina和Netty的狀況下,認爲Netty的API更友好,使用起來更方便(我的感受哈)。綜合種種緣由,我選擇了Netty做爲個人底層通訊框架。java

2.Netty的特色

選擇了Netty,咱們就應該明白Netty的一些特色,Netty具備如下特色:
1.異步、非阻塞、基於事件驅動的NIO框架
2.支持多種傳輸層通訊協議,包括TCP、UDP等
3.開發異步HTTP服務端和客戶端應用程序
4.提供對多種應用層協議的支持,包括TCP私有協議、HTTP協議、WebSocket協議、文件傳輸等
5.默認提供多種編解碼能力,包括Java序列化、Google的ProtoBuf、二進制編解碼、Jboss marshalling、文本字符串、base6四、簡單XML等,這些編解碼框架能夠被用戶直接使用
6.提供形式多樣的編解碼基礎類庫,能夠很是方便的實現私有協議棧編解碼框架的二次定製和開發
7.經典的ChannelFuture-listener機制,全部的異步IO操做均可以設置listener進行監聽和獲取操做結果
8.基於ChannelPipeline-ChannelHandler的責任鏈模式,能夠方便的自定義業務攔截器用於業務邏輯定製
9.安全性:支持SSL、HTTPS
10.可靠性:流量整形、讀寫超時控制機制、緩衝區最大容量限制、資源的優雅釋放等
11.簡潔的API和啓動輔助類,簡化開發難度,減小代碼量json

3.NIO

Netty是基於NIO的通訊框架,爲何要使用NIO而不是用傳統的BIO通訊機制呢,由於在BIO的線程模型上,存在着致命缺陷,因爲線程模型問題,接入用戶數與服務端創造線程數是1:1的關係,也就是說每個用戶從接入斷開鏈接,服務端都要創造一個與之對應的線程作處理,一旦併發用戶數增多,再好配置的服務器也有可能會有由於線程開銷問題形成服務器崩潰宕機的狀況。除此以外,BIO的全部IO操做都是同步的,當IO線程處理業務邏輯時,也會出現同步阻塞,其餘請求都要進入阻塞狀態。
相反,NIO的通訊機制能夠很好地解決BIO的線程開銷問題,NIO採用Reactor通訊模式,一個Reactor線程聚合一個多路複用Selector,這個Selector可同時註冊、監聽、輪迴上百個Channel請求,這種狀況下,一個IO線程就能夠處理N個客戶端的同時接入,接入用戶數與線程數爲N:1的關係,而且IO總數有限,不會出現頻繁上下文切換,提升了CPU利用率,而且全部的 IO 操做都是異步的,即便業務線程直接進行IO操做,也不會被同步阻塞,系統再也不依賴外部的網絡環境和外部應用程序的處理性能bootstrap

4.Netty架構

Netty採用經典的MVC三層架構:
1.第一層:Reactor通訊調度層,它由一系列輔助類組成,包括Reactor線程NioEventLoop 以及其父類、NioSocketChannel/NioServerSocketChannel 以及其父類、ByteBuffer 以及由其衍生出來的各類 Buffer、Unsafe 以及其衍生出的各類內部子類等。
2.第二層:職責鏈ChannelPipeLine,它負責調度事件在職責鏈中的傳播,支持動態的編排職責鏈,職責鏈能夠選擇性的攔截本身關心的事件,對於其它IO操做和事件忽略,Handler同時支持inbound和outbound事件
3.第三層:業務邏輯編排層,業務邏輯編排層一般有兩類:一類是純粹的業務邏輯編排,還有一類是應用層協議插件,用於協議相關的編解碼和鏈路管理,例如CMPP協議插件promise

3、基於Netty實現的HTTP Server

Netty其實更適合使用建立TCP長鏈接的Server,可是其也提供了HTTP的實現封裝,咱們也能夠很容易的實現基於Netty的HTTP服務器。Netty實現HTTP服務器主要經過HttpResponseEncoder和HttpRequestDecoder來進行HTTP請求的解碼以及HTTP響應的編碼,經過HttpRequest和HttpResponse接口來實現對請求的解析以及對響應的構造。本節先描述整個處理流程,而後經過源碼進行分享。緩存

1.處理流程

使用Netty實現的HTTP Server的處理流程以下:
1.HttpServer接收到客戶端的HttpRequest,打開Channel鏈接
2.pipeline中的HttpInHandler調用channelRead方法讀取Channel中的ChannelHandlerContext和Object
3.channelRead中調用實現類HttpInHandlerImp中的處理,將請求按照Get或Post方式進行解析,並將數據轉爲ProtoMessage,而後轉交給MsgHandler處理
4.MsgHandler將其封裝爲Message類添加到userid哈希的消息處理隊列中,並對隊列中的消息調用handle進行遊戲的邏輯處理
5.在邏輯處理中,調用HttpInHandler的writeJSON方法構造並返回HttpResponse響應消息
6.HttpOutHandler截取消息並打印log日誌
7.HttpResponse響應消息返回給客戶端並斷開Channel鏈接
整個流程的流程圖以下:
網絡處理流程安全

2.HttpServer

HttpServer中負責創造並啓動Netty實例,並綁定咱們的邏輯Handler到pipeline,使請求進入咱們本身的邏輯處理服務器

package com.kidbear._36.net.http;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpRequestDecoder;
import io.netty.handler.codec.http.HttpResponseEncoder;
import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.util.concurrent.DefaultThreadFactory;

import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.Reader;
import java.nio.charset.Charset;
import java.util.Properties;
import java.util.concurrent.Executors;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HttpServer {
    public static Logger log = LoggerFactory.getLogger(HttpServer.class);
    public static HttpServer inst;
    public static Properties p;
    public static String ip;
    public static int port;
    private NioEventLoopGroup bossGroup = null;
    private NioEventLoopGroup workGroup = null;

    private HttpServer() {

    }

    public static HttpServer getInstance() {
        if (inst == null) {
            inst = new HttpServer();
            inst.initData();
        }
        return inst;
    }

    public void initData() {
        try {
            p = readProperties();
            ip = p.getProperty("ip");
            port = Integer.parseInt(p.getProperty("port"));
        } catch (IOException e) {
            log.error("socket配置文件讀取錯誤");
            e.printStackTrace();
        }
    }

    public void start() {
        bossGroup = new NioEventLoopGroup(0, Executors.newCachedThreadPool());// boss線程組
        workGroup = new NioEventLoopGroup(0, Executors.newCachedThreadPool());// work線程組
        ServerBootstrap bootstrap = new ServerBootstrap();
        bootstrap.group(bossGroup, workGroup);
        bootstrap.channel(NioServerSocketChannel.class);
        bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                ChannelPipeline pipeline = ch.pipeline();
                /* http request解碼 */
                pipeline.addLast("decoder", new HttpRequestDecoder());
                pipeline.addLast("aggregator", new HttpObjectAggregator(65536));
                /* http response 編碼 */
                pipeline.addLast("encoder", new HttpResponseEncoder());
                pipeline.addLast("http-chunked", new ChunkedWriteHandler());
                /* http response handler */
                pipeline.addLast("outbound", new HttpOutHandler());
                /* http request handler */
                pipeline.addLast("inbound", new HttpInHandler());
            }
        });
        bootstrap.bind(port);
        log.info("端口{}已綁定", port);
    }

    public void shut() {
        if (bossGroup != null && workGroup != null) {
            bossGroup.shutdownGracefully();
            workGroup.shutdownGracefully();
        }
        log.info("端口{}已解綁", port);
    }

    /**
     * 讀配置socket文件
     * 
     * @return
     * @throws IOException
     */
    protected Properties readProperties() throws IOException {
        Properties p = new Properties();
        InputStream in = HttpServer.class
                .getResourceAsStream("/net.properties");
        Reader r = new InputStreamReader(in, Charset.forName("UTF-8"));
        p.load(r);
        in.close();
        return p;
    }
}

代碼中首先使用NioEventLoopGroup構造boss線程和work線程,而後構造ServerBootstrap,來設置Server的一些屬性,包括在pipeline中添加Http的編碼解碼以及邏輯處理相關類。經過調用該類的start方法便可啓動此HTTP服務器,其中端口在配置文件中配置好,啓動時從配置文件讀取。網絡

3.HttpInHandler

Http請求的處理器,綁定在pipeLine中,負責請求的解析與邏輯處理,代碼以下:架構

package com.kidbear._36.net.http;

import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpResponseStatus;

/**
 * @ClassName: HttpServerHandler
 * @Description: netty處理器
 * @author 何金成
 * @date 2015年12月18日 下午6:27:06
 * 
 */
public class HttpInHandler extends ChannelHandlerAdapter {

    public HttpInHandlerImp handler = new HttpInHandlerImp();

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg)
            throws Exception {
        handler.channelRead(ctx, msg);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception {
        handler.exceptionCaught(ctx, cause);
    }

    public static void writeJSON(ChannelHandlerContext ctx,
            HttpResponseStatus status, Object msg) {
        HttpInHandlerImp.writeJSON(ctx, status, msg);
    }

    public static void writeJSON(ChannelHandlerContext ctx, Object msg) {
        HttpInHandlerImp.writeJSON(ctx, msg);
    }
}

其中的實現方法我都將其分離出來爲單獨的類來處理,我這樣作主要爲了我之後能經過JSP熱修復Bug(之後會講到,經過JSP熱加載的原理實現線上項目的熱修復),分離出來的實現類代碼以下:

package com.kidbear._36.net.http;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufInputStream;
import io.netty.buffer.ByteBufOutputStream;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.DefaultFullHttpRequest;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.QueryStringDecoder;
import io.netty.handler.codec.http.multipart.Attribute;
import io.netty.handler.codec.http.multipart.DefaultHttpDataFactory;
import io.netty.handler.codec.http.multipart.HttpPostRequestDecoder;
import io.netty.handler.codec.http.multipart.InterfaceHttpData;
import io.netty.util.CharsetUtil;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.kidbear._36.core.GameServer;
import com.kidbear._36.net.MsgHandler;
import com.kidbear._36.net.ProtoMessage;
import com.kidbear._36.net.ResultCode;
import com.kidbear._36.net.rpc.JsonRpcServers;
import com.kidbear._36.util.Constants;
import com.kidbear._36.util.encrypt.XXTeaCoder;

public class HttpInHandlerImp {
    private static Logger log = LoggerFactory.getLogger(HttpInHandlerImp.class);
    public static String DATA = "data";
    public static volatile boolean CODE_DEBUG = false;
    public ConcurrentHashMap<String, Future> executeMap = new ConcurrentHashMap<String, Future>();

    public void channelRead(final ChannelHandlerContext ctx, final Object msg)
            throws Exception {
        /** work線程的內容轉交線程池管理類處理,縮短work線程耗時 **/
        if (!GameServer.shutdown) {// 服務器開啓的狀況下
            DefaultFullHttpRequest req = (DefaultFullHttpRequest) msg;
            if (req.getMethod() == HttpMethod.GET) { // 處理get請求
                getHandle(ctx, req);
            }
            if (req.getMethod() == HttpMethod.POST) { // 處理POST請求
                postHandle(ctx, req);
            }
        } else {// 服務器已關閉
            JSONObject jsonObject = new JSONObject();
            jsonObject.put("errMsg", "server closed");
            writeJSON(ctx, jsonObject);
        }
    }

    private void postHandle(final ChannelHandlerContext ctx,
            final DefaultFullHttpRequest req) {
        HttpPostRequestDecoder decoder = new HttpPostRequestDecoder(
                new DefaultHttpDataFactory(false), req);
        // 邏輯接口處理
        try {
            InterfaceHttpData data = decoder.getBodyHttpData(DATA);
            if (data != null) {
                String val = ((Attribute) data).getValue();
                val = codeFilter(val);
                log.info("ip:{},read :{}", ctx.channel().remoteAddress(),
                        val);
                ProtoMessage msg = null;
                try {
                    msg = JSON.parseObject(val, ProtoMessage.class);
                } catch (Exception e) {
                    log.error("gameData的json格式轉換錯誤");
                    HttpInHandler.writeJSON(ctx,
                            HttpResponseStatus.NOT_ACCEPTABLE,
                            "not acceptable");
                    return;
                }
                Long userid = msg.getUserid();
                // 添加到消息處理隊列
                // MsgHandler.getInstance().addMsg(userid, msg, ctx);
                // 直接處理消息
                // MsgHandler.getInstance().handle(new Message(msg, ctx));
                // 處理消息隊列
                MsgHandler.getInstance().handleMsg(userid, msg, ctx);
            }
        } catch (Exception e) {
            // 異常日誌
            log.error("post error msg:", e);
            e.printStackTrace();
            // Print our stack trace
            StringBuffer eBuffer = new StringBuffer(e.getMessage() + ",");
            StackTraceElement[] trace = e.getStackTrace();
            for (StackTraceElement traceElement : trace) {
                eBuffer.append("\r\n " + traceElement);
            }
            HttpInHandler.writeJSON(ctx, ProtoMessage.getErrorResp(
                    ResultCode.SERVER_ERR, eBuffer.toString()));
        }
    }

    private void getHandle(final ChannelHandlerContext ctx,
            DefaultFullHttpRequest req) {
        QueryStringDecoder decoder = new QueryStringDecoder(req.getUri());
        Map<String, List<String>> params = decoder.parameters();
        List<String> typeList = params.get("type");
        if (Constants.MSG_LOG_DEBUG) {
            log.info("ip:{},read :{}", ctx.channel().remoteAddress(),
                    typeList.get(0));
        }
        writeJSON(ctx, HttpResponseStatus.NOT_IMPLEMENTED, "not implement");
    }

    /**
     * @Title: codeFilter
     * @Description: 編解碼過濾
     * @param val
     * @return
     * @throws UnsupportedEncodingException
     *             String
     * @throws
     */
    private String codeFilter(String val) throws UnsupportedEncodingException {
        val = val.contains("%") ? URLDecoder.decode(val, "UTF-8") : val;
        String valTmp = val;
        val = CODE_DEBUG ? XXTeaCoder.decryptBase64StringToString(val,
                XXTeaCoder.key) : val;
        if (Constants.MSG_LOG_DEBUG) {
            if (val == null) {
                val = valTmp;
            }
        }
        return val;
    }

    public static void writeJSON(ChannelHandlerContext ctx,
            HttpResponseStatus status, Object msg) {
        String sentMsg = null;
        if (msg instanceof String) {
            sentMsg = (String) msg;
        } else {
            sentMsg = JSON.toJSONString(msg);
        }
        sentMsg = CODE_DEBUG ? XXTeaCoder.encryptToBase64String(sentMsg,
                XXTeaCoder.key) : sentMsg;
        writeJSON(ctx, status,
                Unpooled.copiedBuffer(sentMsg, CharsetUtil.UTF_8));
        ctx.flush();
    }

    public static void writeJSON(ChannelHandlerContext ctx, Object msg) {
        String sentMsg = null;
        if (msg instanceof String) {
            sentMsg = (String) msg;
        } else {
            sentMsg = JSON.toJSONString(msg);
        }
        sentMsg = CODE_DEBUG ? XXTeaCoder.encryptToBase64String(sentMsg,
                XXTeaCoder.key) : sentMsg;
        writeJSON(ctx, HttpResponseStatus.OK,
                Unpooled.copiedBuffer(sentMsg, CharsetUtil.UTF_8));
        ctx.flush();
    }

    private static void writeJSON(ChannelHandlerContext ctx,
            HttpResponseStatus status, ByteBuf content /*
                                                         * , boolean isKeepAlive
                                                         */) {
        if (ctx.channel().isWritable()) {
            FullHttpResponse msg = null;
            if (content != null) {
                msg = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, status,
                        content);
                msg.headers().set(HttpHeaders.Names.CONTENT_TYPE,
                        "application/json; charset=utf-8");
                msg.headers().set("userid", 101);
            } else {
                msg = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, status);
            }
            if (msg.content() != null) {
                msg.headers().set(HttpHeaders.Names.CONTENT_LENGTH,
                        msg.content().readableBytes());
            }
            // not keep-alive
            ctx.write(msg).addListener(ChannelFutureListener.CLOSE);
        }
    }

    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception {
        log.error("netty exception:", cause);
    }
}

以上代碼實現了使用Netty中封裝的Http請求的解析類對消息進行Get或Post解析,並使用了Http相應的構造類對返回消息進行Http消息格式的構造。

4.MsgHandler

以上代碼包含了Netty中的Get請求和Post請求的解析處理,請求消息以及響應消息的XXTea加密解密等。其中,服務器接受到請求後,會將請求交給一個消息處理類進行具體的消息處理,消息處理器MsgHandler的代碼以下:

package com.kidbear._36.net;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.MessageSizeEstimator.Handle;

import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingQueue;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.alibaba.fastjson.JSONObject;
import com.kidbear._36.core.GameServer;
import com.kidbear._36.core.Router;
import com.kidbear._36.manager.log.LogMgr;
import com.kidbear._36.net.http.HttpInHandler;
import com.kidbear._36.task.ExecutorPool;

/**
 * @ClassName: MsgHandler
 * @Description: 消息處理器
 * @author 何金成
 * @date 2016年8月22日 下午12:04:23
 * 
 */
public class MsgHandler {
    public static Logger logger = LoggerFactory.getLogger(MsgHandler.class);
    public static MsgHandler handler;

    public static MsgHandler getInstance() {
        return handler == null ? new MsgHandler() : handler;
    }

    protected MsgHandler() {

    }

    /**
     * @Fields msgMap : 併發消息處理map
     */
    public static ConcurrentMap<Long, BlockingQueue<Message>> msgMap = new ConcurrentHashMap<Long, BlockingQueue<Message>>();

    public void handleMsg(Long userid, ProtoMessage msg,
            ChannelHandlerContext ctx) throws InterruptedException {
        // add message
        Message message = new Message();
        message.msg = msg;
        message.ctx = ctx;
        BlockingQueue<Message> queue = null;
        if (msgMap.containsKey(userid)) {
            queue = msgMap.get(userid);
            queue.put(message);
        } else {
            queue = new LinkedBlockingQueue<Message>();
            queue.put(message);
            msgMap.put(userid, queue);
        }
        // log
        LogMgr.getInstance().concurrentLog(msgMap);
        // handle message
        while (!queue.isEmpty()) {
            message = queue.take();
            if (queue.size() == 0) {
                msgMap.remove(userid);
            }
            handle(message);
        }
    }

    /**
     * @Title: addMsg
     * @Description: 添加消息處處理隊列
     * @param userid
     * @param msg
     * @param ctx
     * @throws InterruptedException
     *             void
     * @throws
     */
    public void addMsg(Long userid, ProtoMessage msg, ChannelHandlerContext ctx)
            throws InterruptedException {
        Message message = new Message();
        message.msg = msg;
        message.ctx = ctx;
        if (msgMap.containsKey(userid)) {
            BlockingQueue<Message> queue = msgMap.get(userid);
            queue.put(message);
        } else {
            BlockingQueue<Message> queue = new LinkedBlockingQueue<Message>();
            queue.put(message);
            msgMap.put(userid, queue);
        }
        LogMgr.getInstance().concurrentLog(msgMap);
    }

    /**
     * @Title: run
     * @Description: 處理消息隊列 void
     * @throws
     */
    public void run() {
        ExecutorPool.msgHandleThread.execute(new Runnable() {
            @Override
            public void run() {
                logger.info("消息處理線程開啓");
                while (!GameServer.shutdown) {
                    for (Iterator<Long> iterator = msgMap.keySet().iterator(); iterator
                            .hasNext();) {
                        Long userid = iterator.next();
                        BlockingQueue<Message> queue = msgMap.get(userid);
                        try {
                            Message msg = queue.take();
                            if (queue.size() == 0) {
                                iterator.remove();
                            }
                            handle(msg);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                            logger.error("msg handle err:{}", e);
                        }
                    }
                }
            }
        });
    }

    public void handle(final Message message) {
        ExecutorPool.channelHandleThread.execute(new Runnable() {
            @Override
            public void run() {
                Short typeid = message.msg.getTypeid();
                if (typeid == null) {
                    logger.error("沒有typeid");
                    HttpInHandler.writeJSON(message.ctx,
                            ProtoMessage.getErrorResp("沒有typeid"));
                    return;
                }
                JSONObject msgData = message.msg.getData();
                Router.getInstance().route(typeid, msgData,
                        message.msg.getUserid(), message.ctx);
            }
        });
    }
}

以上代碼包含handleMsg、handle和addMsg方法,msgMap中包含每一個用戶的userid哈希對應的消息處理隊列,本來個人設想是在服務器啓動時,調用MsgHandler的run方法啓動消息處理,無限循環的遍歷msgMap,來處理全部玩家的消息處理隊列,請求接入時,直接添加消息到msgMap的相應玩家的消息隊列,而後由這個run方法中的線程來處理全部的消息,後來考慮到效率問題,改成直接在HttpInHandler中調用handleMsg方法,直接處理消息請求。每一個玩家分配一個消息隊列來進行處理主要是爲了考慮到單個玩家的併發請求的狀況。hash使用ConcurrentMap主要是考慮到這個Map的併發使用情景,使用ConcurrentMap的桶鎖機制可讓它在併發情境中有更高的處理效率。

5.Message

MsgHandle中使用的Message類是對消息的封裝包括ProtoMessage和ChannelHandlerContext,代碼以下:

package com.kidbear._36.net;

import io.netty.channel.ChannelHandlerContext;

import java.util.concurrent.BlockingQueue;

public class Message {
    public ProtoMessage msg;
    public ChannelHandlerContext ctx;

    public Message() {
    }

    public Message(ProtoMessage msg, ChannelHandlerContext ctx) {
        this.msg = msg;
        this.ctx = ctx;
    }
}

6.ProtoMessage

ProtoMessage是通訊中對消息格式的封裝,消息格式定義爲:"{typeid:1,userid:1,data:{}}",typeid表明遊戲中接口的協議號,userid表明玩家id,data表明具體傳輸的數據,其代碼以下:

package com.kidbear._36.net;

import java.io.Serializable;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;

public class ProtoMessage implements Serializable {

    /**
     * @Fields serialVersionUID : TODO
     */
    private static final long serialVersionUID = -3460913241121151489L;
    private Short typeid;
    private Long userid;
    public JSONObject data;

    public ProtoMessage() {

    }

    public <T> ProtoMessage(Short typeid, T data) {
        this.typeid = typeid;
        this.setData(data);
    }

    public <T> ProtoMessage(T data) {
        this.setData(data);
    }

    public static ProtoMessage getResp(String msg, int code) {
        JSONObject ret = new JSONObject();
        ret.put("code", code);
        if (msg != null) {
            ret.put("err", msg);
        }
        return new ProtoMessage(ret);
    }

    public static ProtoMessage getSuccessResp() {
        return getResp(null, ResultCode.SUCCESS);
    }

    public static ProtoMessage getEmptyResp() {
        return new ProtoMessage();
    }

    public static ProtoMessage getErrorResp(String msg) {
        return getResp(msg, ResultCode.COMMON_ERR);
    }

    public static ProtoMessage getErrorResp(short id) {
        return getResp(null, ResultCode.COMMON_ERR);
    }

    public static ProtoMessage getErrorResp(int code) {
        return getResp(null, code);
    }

    public static ProtoMessage getErrorResp(int code, String msg) {
        return getResp(msg, code);
    }

    public JSONObject getData() {
        return this.data;
    }

    public void setData(JSONObject data) {
        this.data = data;
    }

    public <T> T getData(Class<T> t) {// 轉換爲對象傳遞
        return JSON.parseObject(JSON.toJSONString(data), t);
    }

    public <T> void setData(T t) {
        this.data = JSON.parseObject(JSON.toJSONString(t), JSONObject.class);
    }

    public Short getTypeid() {
        return typeid;
    }

    public void setTypeid(Short typeid) {
        this.typeid = typeid;
    }

    public Long getUserid() {
        return userid;
    }

    public void setUserid(Long userid) {
        this.userid = userid;
    }
}

7.HttpOutHandler

綁定在pipeLine中,負責處理相應消息,其實響應消息的處理在HttpInHandler的writeJSON方法中已經完成,使用DefaultFullHttpResponse對響應消息進行Http格式構造,而後調用ChannelHandlerContext的write方法直接write到消息管道中,而且在完成消息傳輸後自動關閉管道。而HttpOutHandler則只是截取響應消息並進行log打印輸出一下,而後繼續調用super發送出去,其接口及實現類代碼以下:
HttpOutHandler:

package com.kidbear._36.net.http;

import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;

public class HttpOutHandler extends ChannelHandlerAdapter {

    public HttpOutHandlerImp handler = new HttpOutHandlerImp();

    @Override
    public void write(ChannelHandlerContext ctx, Object msg,
            ChannelPromise promise) throws Exception {
        super.write(ctx, msg, promise);
        handler.write(ctx, msg, promise);
    }
}

HttpOutHandlerImp:

package com.kidbear._36.net.http;

import java.nio.charset.Charset;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.kidbear._36.util.Constants;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
import io.netty.buffer.UnpooledUnsafeDirectByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.HttpHeaders;

public class HttpOutHandlerImp {
    public Logger logger = LoggerFactory.getLogger(HttpOutHandlerImp.class);

    public void write(ChannelHandlerContext ctx, Object msg,
            ChannelPromise promise) throws Exception {
        if (Constants.MSG_LOG_DEBUG) {
            DefaultFullHttpResponse resp = (DefaultFullHttpResponse) msg;
            logger.info("ip:{},write:{}", ctx.channel().remoteAddress(), resp
                    .content().toString(Charset.forName("UTF-8")));
        }
    }
}

4、總結

本章內容介紹咱們的這款遊戲的網絡通訊的處理方式,整體來講,對目前的策劃需求,以及目前的用戶量來講,這個通訊框架已經能知足,但客觀的說,這個網絡架構仍是存在不少問題的,好比通訊使用JSON字符串,使得通訊數據的大小沒有獲得很好地處理,若是使用ProtoBuffer這樣高效的二進制數據傳輸會有更小的數據傳輸量。另外,通訊徹底採用Http通訊,使得遊戲中一些須要實時展現的效果只能經過請求——響應式來獲取最新數據,好比遊戲中的郵件、戰報等功能,只能經過客戶端的不斷請求來獲取到最新消息,實時效果經過非實時通訊來實現,會有不少冗餘的請求,浪費帶寬資源,若是之後玩家數量太多,對網絡通訊這塊,咱們確定還會再進行優化。 下章內容,咱們會對遊戲中的數據緩存與存儲進行介紹。

相關文章
相關標籤/搜索