使用Netty如何作到單機秒級接收35萬個對象

單純netty結合protostuff進行rpc對象傳輸的demo網上有不少,大部分都是一個模子刻出來的,一開始我也是抄了一個,本地測試暢通無阻,未發生任何異常。java

部署預發環境,進行壓測後,問題巨多,各類報錯層出不窮。固然,壓測時我用的數據量大、發送請求很是密集,單機是每秒前100ms發送2萬個對象,其餘900ms歇息,死循環發送,共計40臺機器做爲客戶端,同時往2臺netty Server服務器發送對象,那麼平均每一個server每秒大概要接收40萬個對象,因爲後面還有業務邏輯,邏輯每秒只能處理35萬實測。程序員

對於網上的代碼,進行了屢次修改,反覆測試,最終是達到了不報錯無異常,單機秒級接收35萬個對象以上,故寫篇文章記錄一下,文中代碼會和線上邏輯保持一致。數組

Protostuff序列化和反序列化

這個沒什麼特殊的,網上找個工具類就行了。緩存

引入pom安全

<protostuff.version>1.7.2</protostuff.version>
<dependency>
   <groupId>io.protostuff</groupId>
   <artifactId>protostuff-core</artifactId>
   <version>${protostuff.version}</version>
</dependency>

<dependency>
   <groupId>io.protostuff</groupId>
   <artifactId>protostuff-runtime</artifactId>
   <version>${protostuff.version}</version>
</dependency>


public class ProtostuffUtils {
   /**
    * 避免每次序列化都從新申請Buffer空間
    * 這句話在實際生產上沒有意義,耗時減小的極小,但高併發下,若是還用這個buffer,會報異常說buffer還沒清空,就又被使用了
    */
//    private static LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);
   /**
    * 緩存Schema
    */
   private static Map<Class<?>, Schema<?>> schemaCache = new ConcurrentHashMap<>();

   /**
    * 序列化方法,把指定對象序列化成字節數組
    *
    * @param obj
    * @param <T>
    * @return
    */
   @SuppressWarnings("unchecked")
   public static <T> byte[] serialize(T obj) {
       Class<T> clazz = (Class<T>) obj.getClass();
       Schema<T> schema = getSchema(clazz);
       LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);
       byte[] data;
       try {
           data = ProtobufIOUtil.toByteArray(obj, schema, buffer);
//            data = ProtostuffIOUtil.toByteArray(obj, schema, buffer);
       } finally {
           buffer.clear();
       }

       return data;
   }

   /**
    * 反序列化方法,將字節數組反序列化成指定Class類型
    *
    * @param data
    * @param clazz
    * @param <T>
    * @return
    */
   public static <T> T deserialize(byte[] data, Class<T> clazz) {
       Schema<T> schema = getSchema(clazz);
       T obj = schema.newMessage();
       ProtobufIOUtil.mergeFrom(data, obj, schema);
//        ProtostuffIOUtil.mergeFrom(data, obj, schema);
       return obj;
   }

   @SuppressWarnings("unchecked")
   private static <T> Schema<T> getSchema(Class<T> clazz) {
       Schema<T> schema = (Schema<T>) schemaCache.get(clazz);
       if (Objects.isNull(schema)) {
           //這個schema經過RuntimeSchema進行懶建立並緩存
           //因此能夠一直調用RuntimeSchema.getSchema(),這個方法是線程安全的
           schema = RuntimeSchema.getSchema(clazz);
           if (Objects.nonNull(schema)) {
               schemaCache.put(clazz, schema);
           }
       }

       return schema;
   }
}

此處有坑,就是最上面大部分網上代碼都是用了static的buffer。在單線程狀況下沒有問題。在多線程狀況下,很是容易出現buffer一次使用後還沒有被clear,就再次被另外一個線程使用,會拋異常。而所謂的避免每次都申請buffer空間,實測性能影響極其微小。服務器

另裏面兩次ProtostuffIOUtil都改爲了ProtobufIOUtil,由於也是出過異常,修改後未見有異常。多線程

自定義序列化方式

解碼器decoder:併發

import com.jd.platform.hotkey.common.model.HotKeyMsg;
import com.jd.platform.hotkey.common.tool.ProtostuffUtils;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;

import java.util.List;

/**
* @author wuweifeng
* @version 1.0
* @date 2020-07-29
*/
public class MsgDecoder extends ByteToMessageDecoder {
   @Override
   protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf in, List<Object> list) {
       try {

           byte[] body = new byte[in.readableBytes()];  //傳輸正常
           in.readBytes(body);

           list.add(ProtostuffUtils.deserialize(body, HotKeyMsg.class));

//            if (in.readableBytes() < 4) {
//                return;
//            }
//            in.markReaderIndex();
//            int dataLength = in.readInt();
//            if (dataLength < 0) {
//                channelHandlerContext.close();
//            }
//            if (in.readableBytes() < dataLength) {
//                in.resetReaderIndex();
//                return;
//            }
//
//            byte[] data = new byte[dataLength];
//            in.readBytes(data);
//
//            Object obj = ProtostuffUtils.deserialize(data, HotKeyMsg.class);
//            list.add(obj);
       } catch (Exception e) {
           e.printStackTrace();
       }
   }
}

編碼器 encoderapp

import com.jd.platform.hotkey.common.model.HotKeyMsg;
import com.jd.platform.hotkey.common.tool.Constant;
import com.jd.platform.hotkey.common.tool.ProtostuffUtils;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;

/**
* @author wuweifeng
* @version 1.0
* @date 2020-07-30
*/
public class MsgEncoder extends MessageToByteEncoder {

   @Override
   public void encode(ChannelHandlerContext ctx, Object in, ByteBuf out) {
       if (in instanceof HotKeyMsg) {
           byte[] bytes = ProtostuffUtils.serialize(in);
           byte[] delimiter = Constant.DELIMITER.getBytes();

           byte[] total = new byte[bytes.length + delimiter.length];
           System.arraycopy(bytes, 0, total, 0, bytes.length);
           System.arraycopy(delimiter, 0, total, bytes.length, delimiter.length);

           out.writeBytes(total);
       }
   }
}

先看Decoder解碼器,這個是用來netty收到消息後,進行解碼,將字節轉爲對象(自定義的HotKeyMsg)用的。裏面有一堆被我註釋掉了,註釋掉的,應該在網上找到的帖子都是那麼寫的。這種方式自己在普通場景下是沒問題的,解碼還算正常,可是當上幾十萬時很是容易出現粘包問題。因此我是在這個解碼器前增長了一個DelimiterBasedFrameDecoder分隔符解碼器。ide

當收到消息時,先過這個分隔符解碼器,以後到MsgDecoder那裏時,就是已經分隔好的一個對象字節流了,就能夠直接用proto工具類進行反序列化的。Constant.DELIMITER是我自定義的一個特殊字符串,用來作分隔符。

再看encoder,編碼器,首先將要傳輸的對象用ProtostuffUtils序列化爲byte[],而後在尾巴上掛上我自定義的那個分隔符。這樣在對外發送對象時,就會走這個編碼器,並被加上分隔符。

對應的server端代碼大概是這樣:

1.png

2.png

3,.png

以後在Handler裏就能夠直接使用這個傳輸的對象了。

再看client端

4,.png

和Server端是同樣的,也是這幾個編解碼器,沒有區別。由於netty和server之間通信,我都是用的同一個對象定義。

5.png

同理handler也是同樣的。

單機和集羣

以上都寫完後,其實就能夠測試了,咱們能夠啓動一個client,一個server,而後搞個死循環往Server發這個對象了,而後你在server端在收到這個對象後,再直接把這個對象也寫回來,原樣發送到客戶端。會發現運行的很順暢,每秒發N萬個沒問題,編解碼都正常,client和server端都比較正常,當前前提是ProtoBuf的工具類和個人同樣,不要共享那個buffer。網上找的文章基本上到這樣也就結束了,隨便發幾個消息沒問題也就算OK。然而實際上,這種代碼上線後,會坑的不要不要的。

其實本地測試也很容易,再啓動幾個客戶端,都連同一個Server,而後給他死循環發對象,再看看兩端會不會有異常。這種狀況下,和第一種的區別其實客戶端沒什麼變化,Server端就有變化了,以前同時只給一個client發消息,如今同時給兩個client發消息,這一步若是不謹慎就會出問題了,建議自行嘗試。

以後,咱們再加點料,我啓動兩個Server,分別用兩個端口,線上實際上是兩臺不一樣的server服務器,client會同時往兩臺server死循環發對象,以下圖代碼。

發消息,咱們經常使用的就是channel.writeAndFlush(),你們能夠把那個sync去掉,而後跑一下代碼看看。會發現異常拋的一坨一坨的。咱們明明是往兩個不一樣的channel發消息,只不過期間是同時,結果就是發生了嚴重的粘包。server端收到的消息不少都是不規範的,會大量報錯。若是在兩個channel發送間隔100ms,狀況就解決了。固然,最終咱們可使用sync同步發送,這樣就不會拋異常了。

6,.png

以上代碼經測試,40臺client,2臺Server,平均每一個server每秒大概接收40萬個對象,能夠持續穩定運行。

最後

感謝你們看到這裏,文章有不足,歡迎你們指出;若是你以爲寫得不錯,那就給我一個贊吧。

也歡迎你們關注個人公衆號:程序員麥冬,麥冬天天都會分享java相關技術文章或行業資訊,歡迎你們關注和轉發文章!

相關文章
相關標籤/搜索