能耗監測平臺GPRS通信服務器的架構設計

在這個文章裏面我將用一個實際的案例來分享如何來構建一個可以接受3000+個鏈接的GPRS通信服務器軟件。在這裏,我將分享GPRS通信服務器設計過程當中面臨的問題,分享通信協議的設計,分享基於異步事件的設計,分享避免內存泄露的解決方案,分享軟件的發佈與升級方法,分享通信協議的單元測試構建等。數據庫

 

1 GPRS通信服務器軟件介紹緩存

首先咱們來看一下這個通信服務器軟件,以下圖所示。通信服務器軟件的做用是遵循國家能耗平臺技術導則的數據傳輸導則,與GPRS硬件進行通信,實現數據的遠程傳輸和遠程實時控制。安全

image

這個軟件的主要功能有:服務器

(1)接收GPRS採集器的鏈接,實現對採集器的控制;網絡

(2)實現能耗A~D類數據庫的管理。session

下面我來介紹通信服務器的設計方法和思路,接着再介紹如何實現。架構

 

2 通信服務器的設計模型併發

2.1 通信服務器架構框架

通信服務器架構採用的是異步事件 + 分層的體系結構。經過異步事件實現不一樣職責代碼的分離,利用分層將相同職責的代碼組織到同一個層次。總體設計以下所示。異步

image

通信服務器使用EventDispatcher實現不一樣事件類型的異步路由。通信服務器是整個系統的中心,它接收來自硬件層GPRS的鏈接(實際是HTTP的鏈接),爲每個鏈接建立一個會話(CommProtocol),每個會話使用一個線程(也支持線程池)進行通信,HttpCommServer實現會話的管理,此外,與領域層實現事件傳遞。領域層實現與上層應用的通信,包括:(1)將數據結果存儲到數據庫;(2)經過消息隊列接受來自硬件的通信指令;(3)與通信服務器打交道。通信協議層實現與不一樣鏈接的採集器進行通信,它是整個系統的難點。

 

2.2 通信協議層的設計

通信協議層核心類爲CommProtocol,它使用線程來與硬件通信,與硬件的通信過程被拆分一個個的對話,每個對話用一個RoundTrip類來表示,CommProtocol使用一個RoundTrip來存儲全部的對話,利用線程不停的輪詢存儲的對話,而後一個個的按順序/按優先級來執行對話。下圖是通信協議層的設計模型。

image

在與硬件的通信過程當中,通信以對話做爲單位的,以消息做爲對話的基石,一次對話實現一組消息的傳遞。通信協議中,消息有兩種類型:(1)服務器發送給硬件的消息稱爲主動消息;(2)硬件發送給服務器的消息稱爲被動消息。對話則有三種類型:(1)服務器發送消息給硬件,而後等待硬件的回覆消息或者不等待回覆,咱們稱之爲主動對話;(2)服務器等待硬件發送的數據,收到數據後給硬件回覆或者不回覆,咱們稱之爲被動對話;(3)以上兩者的組合,來實現一組控制指令的傳遞,咱們稱之爲組合對話。在這個模型中,服務器須要來控制硬件時,會調用CommProtocol的一個方法,好比QueryConfig方法,用於查詢硬件的配置信息,此時,將建立一個主動會話,而後發送到對話隊列中,對話處理線程將從對話隊列中按序取出對話,並執行;當對話隊列爲空時,對話處理線程將會使用被動對話類型註冊表,嘗試從通信鏈路獲取一條完整消息,而後建立一個被動對話並執行。在對話處理線程處理一個主動對話時,它一般是:(1)使用消息適配器發送一個消息,若是失敗後,會重試幾回;接着使用消息適配器來獲取一條響應或者直接返回,當消息發送時會拋出OnMessageSend事件,當對話成功時會發出OnCompleted事件,當失敗時拋出OnError事件。相似的,被動對話的設計也類似,不一樣的是,其消息已經提早收到了。下面咱們就來看看通信協議層詳細的設計。

 

2.3 通信協議層詳細設計

2.3.1 消息的設計

首先咱們先來看看公共建築數據傳輸規範裏面的消息定義方式。

下面咱們來看看消息類型的設計

image

在上述的消息定義中,MessageBase表示全部消息的基類由消息頭、消息體組成,它們都從MessagePart派生。每個消息頭由MessageHeader,它定義了能耗建築的建築物ID、採集器ID和消息類型。MessageSerializer消息序列化靜態類用於實現消息的解析與反解析。

 

如下XML格式是服務器配置數據採集器時的消息格式。通信時,服務器發送一個period類型的消息,用於配置採集器定時上報數據的間隔,而後數據採集器響應一條period_ack消息。

<?xml version="1.0" encoding="utf-8" ?>
<root>
  <!-- 通用部分 -->
  <!-- 
      building_id:樓棟編號
      gateway_id:採集器編號
      type:配置信息數據包的類型
      -->
  <common>
    <building_id>XXXXXX</building_id >
    <gateway_id>XXX</gateway_id >
    <type>以2種操做類型之一</type>
  </common>
  <!-- 配置信息 -->
  <!--操做有2種類型
          period:表示服務器對採集器採集週期的配置,period子元素有效
          period_ack:表示採集器對服務器採集週期配置信息的應答
       -->
  <config operation="period/period_ack">
    <period>15</period>
  </config>
</root>

 

根據規範的消息格式,咱們定義的配置消息由主動消息體、主動消息、被動消息體和被動消息四個類構成。

主動消息體定義以下。

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Xml.Serialization;

namespace UIShell.EcmCommServerService.Protocol.Message
{
    [XmlRoot("config", Namespace = "", IsNullable = false)]
    public class ConfigActiveMessageBody : MessagePart
    {
        [XmlAttribute("operation")]
        public string Operation { get; set; }

        [XmlElement("period")]
        public int Period { get; set; }

        public ConfigActiveMessageBody()
        {
            Operation = "period";
        }
    }
}

 

主動消息定義以下。

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Xml.Serialization;
using UIShell.EcmCommServerService.Utility;

namespace UIShell.EcmCommServerService.Protocol.Message
{
    [XmlRoot("root", Namespace = "", IsNullable = false)]
    public class ConfigActiveMessage : MessageBase
    {
        public static ConfigActiveMessage New(string buildingId, string gatewayId, int period)
        {
            var message = new ConfigActiveMessage();
            message.Header.BuildingId = buildingId;
            message.Header.GatewayId = gatewayId;
            message.Body.Period = period;

            return message;
        }
        
        [XmlElement("config")]
        public ConfigActiveMessageBody Body { get; set; }

        public ConfigActiveMessage()
            : base(StringEnum.GetStringValue(MessageType.Config_Period))
        {
            Body = new ConfigActiveMessageBody();
        }
    }
}

 

被動消息體定義以下。

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Xml.Serialization;

namespace UIShell.EcmCommServerService.Protocol.Message
{
    [XmlRoot("config", Namespace = "", IsNullable = false)]
    public class ConfigAckPassiveMessageBody : MessagePart
    {
        [XmlAttribute("operation")]
        public string Operation { get; set; }

        public ConfigAckPassiveMessageBody()
        {
            Operation = "period_ack";
        }
    }
}

 

被動消息定義以下。

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Xml.Serialization;
using UIShell.EcmCommServerService.Utility;

namespace UIShell.EcmCommServerService.Protocol.Message
{
    [XmlRoot("root", Namespace = "", IsNullable = false)]
    public class ConfigAckPassiveMessage : MessageBase
    {
        public static ConfigAckPassiveMessage New(string buildingId, string gatewayId)
        {
            var message = new ConfigAckPassiveMessage();
            message.Header.BuildingId = buildingId;
            message.Header.GatewayId = gatewayId;

            return message;
        }
        
        [XmlElement("config")]
        public ConfigAckPassiveMessageBody Body { get; set; }

        public ConfigAckPassiveMessage()
            : base(StringEnum.GetStringValue(MessageType.Config_Period_Ack))
        {
            Body = new ConfigAckPassiveMessageBody();
        }
    }
}

 

根據以上的模式,咱們爲能耗平臺定義的全部消息以下。

image

 

2.3.2 RoundTrip的設計

RoundTrip表示一次對話,由一組消息的交換來實現。RoundTrip有三種類型,其設計以下所示。RoundTripBase是對話的基類,它定義了OnCompleted、OnError異步事件、Start方法和其它基本屬性;ActiveRoundTripBase表示主動對話基類,表示服務器發送給採集器消息,而後等待或者不等待消息,這個類在RoundTripBase基礎上定義了OnMessageSend異步事件;PassiveRoundTripBase表示被動對話基類,定義了OnMessageReceived事件,表示已經從採集器接收到消息。這些基類都與領域知識無關,只是定義了對話基類所需的方法、屬性、事件。

image

ActiveRoundTrip則是針對能耗平臺定義的全部主動消息的基類,它定義了領域相關的屬性,實現了Start方法,並定義了相關抽象類。咱們來看一下Start方法,它首先調用Send方法來發送消息,而後拋出OnMessageSend異步事件,接着調用ReceiveResponseMessage嘗試從採集器收取消息而後拋出OnCompleted異步事件,這個過程若是失敗了,可以重試,不過若是重試也失敗,則拋出OnError事件。

public override void Start()
{
    Trace(string.Format("開始與集中器{0}會話。", ToBeSentMessage.Header.GatewayId));

    // 若是發送失敗,則重試。
    // 嘗試次數爲: 1 + 失敗時重複次數
    for (int i = 0; i <= MessageConstants.RetryTimesOnTimeout; i++)
    {
        try
        {
            Send();
            OnRoundTripMessageSend(new RoundTripEventArgs() { RoundTrip = this });
                    
            try
            {
                Trace(string.Format("開始第{0}次消息接收。", i + 1));
                ReceiveResponseMessages();
                OnRoundTripCompleted(new RoundTripEventArgs() { RoundTrip = this });
                break;
            }
            catch (Exception ex)
            {
                Trace(string.Format("第{0}次接收消息失敗,由於:{1},繼續嘗試。", i + 1, CommProtocol.GetErrorMessage(ex)));
                throw;
            }
        }
        catch (Exception ex)
        {
            Trace(string.Format("第{0}次發送命令失敗,由於:{1},繼續嘗試。", i + 1, CommProtocol.GetErrorMessage(ex)));
            if (i == MessageConstants.RetryTimesOnTimeout)
            {
                Trace(string.Format("第{0}次發送命令失敗,由於:{1},中止嘗試。", i + 1, CommProtocol.GetErrorMessage(ex)));
                OnRoundTripError(new RoundTripEventArgs() { RoundTrip = this, Exception = ex });
                throw;
            }
        }
    }

    Completed = true;

    if (ReceivedResponseMessages != null)
    {
        Trace(string.Format("當前會話'{0}'接收了{1}個響應消息,詳細以下:", RoundTripDescription, ReceivedResponseMessages.Length));
        foreach (var message in ReceivedResponseMessages)
        {
            //Trace("響應命令內容:" + ProtocolUtility.BytesToHexString(message.ToContent()));
            Trace("響應消息:" + message.ToString());
        }
    }
    else
    {
        Trace("當前會話接收的響應消息爲空。");
    }

    Trace(string.Format("與集中器{0}會話成功。", ToBeSentMessage.Header.GatewayId));
}

 

發送消息Send方法實現以下,它使用StreamAdapter來發送一條原始消息。

public override void Send()
{
    StreamAdapter.SendRawMessage(ToBeSentMessage.ToContent(), ToBeSentMessage.ToXmlContent());
}

 

而ReceiveResponseMessages方法則是一個抽象方法。

public abstract void ReceiveResponseMessages();

 

同理,PassiveRoundTrip則是針對能耗平臺定義的全部被動消息的基類,它定義了領域相關的屬性,實現了Start方法和相應的抽象方法。

public abstract TResponseMessage CreateResponseMessage();

public override void Receive()
{
    if (ReceivedMessage == null)
    {
        MessageHeader header;
        var receivedMessageContent = StreamAdapter.ReceiveRawMessage(BuildingId, GatewayId, ReceivedMessageType, out header);

        try
        {
            ReceivedMessage = MessageSerialiser.DeserializeRaw<TReceivedMessage>(receivedMessageContent);
        }
        catch (Exception ex)
        {
            throw new ReceiveMessageException("Parse the received message failed.", ex) { ErrorStatus = ReceiveMessageStatus.Failed };
        }
    }
}

public void SendResponseMessage()
{
    ResponsedMessage = CreateResponseMessage();
    if (ResponsedMessage != null)
    {
        //Trace("開始發送的響應消息內容:" + ProtocolUtility.BytesToHexString(ResponsedMessage.ToContent()));
        Trace("開始發送的響應消息:" + ResponsedMessage.ToXmlContent());

        StreamAdapter.SendRawMessage(ResponsedMessage.ToContent(), ResponsedMessage.ToString());
    }
    else
    {
        Trace("不發送響應消息。");
    }
}

public override void Start()
{
    Trace(string.Format("開始嘗試與集中器{0}進行被動式會話。", GatewayId));

    try
    {
        Receive();
        OnRoundTripMessageReceived(new RoundTripEventArgs() { RoundTrip = this });

        try
        {
            //Trace("接收到消息內容:" + ProtocolUtility.BytesToHexString(ReceivedMessage.ToContent()));
            Trace("接收到消息:" + ReceivedMessage.ToXmlContent());
            SendResponseMessage();
            Completed = true;
            OnRoundTripCompleted(new RoundTripEventArgs() { RoundTrip = this });
        }
        catch (Exception ex)
        {
            Trace(string.Format("嘗試發送響應消息到集中器{0}失敗,由於:{1}。", GatewayId, CommProtocol.GetErrorMessage(ex)));
            throw;
        }
    }
    catch (Exception ex)
    {
        Trace(string.Format("嘗試從集中器{0}接收消息失敗,由於:{1}。", GatewayId, CommProtocol.GetErrorMessage(ex)));
        OnRoundTripError(new RoundTripEventArgs() { RoundTrip = this, Exception = ex });
        throw;
    }

    Trace(string.Format("與集中器{0}進行被動式會話成功。", GatewayId));
}

 

組合對話CompositeRoundTrip是根據能耗平臺設計的,它比較簡單,主要是控制每條對話的執行時序,默認的實現就是按順序來執行每個對話。

public override void Start()
{
    int i = 1;
    int roundTripsCount = RoundTrips.Count;
    Trace(string.Format("開始組合會話,由{0}個子會話組成。", roundTripsCount));
    RoundTripBase roundTrip;
    while (RoundTrips.Count > 0)
    {
        roundTrip = RoundTrips.Dequeue();
        try
        {
            Trace(string.Format("開始執行第{0}個子會話。", i));
            roundTrip.Start();
            Trace(string.Format("第{0}個子會話執行完成。", i));
        }
        catch (Exception ex)
        {
            Trace(string.Format("組合會話失敗,第{0}個子會話執行失敗。", i));
            OnRoundTripError(new RoundTripEventArgs() { RoundTrip = roundTrip, Exception = ex });
            throw;
        }
        finally
        {
            roundTrip.Dispose();
        }
        i++;
    }
    Trace(string.Format("組合會話完成,由{0}個子會話組成。", roundTripsCount));
    OnRoundTripCompleted(new RoundTripEventArgs() { RoundTrip = this });
}

 

接下來咱們看看一個主動對話的實現,以ConfigActiveRoundTrip爲例。

using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Sockets;
using System.Text;
using UIShell.EcmCommServerService.Protocol.Message;
using UIShell.EcmCommServerService.Utility;

namespace UIShell.EcmCommServerService.Protocol.RoundTrip.Active
{
    public class ConfigActiveRoundTrip : ActiveRoundTrip<ConfigActiveMessage, ConfigAckPassiveMessage>
    {
        public ConfigActiveRoundTrip(
            string buildingId,
            string gatewayId,
            int period,
            MessageConstants messageConstants,
            TcpClient client)
            : base(buildingId, gatewayId, StringEnum.GetStringValue(MessageType.Config_Period_Ack),
            ConfigActiveMessage.New(buildingId, gatewayId, period),
            messageConstants, client)
        {
        }

        public override void ReceiveResponseMessages()
        {
            MessageHeader header;
            var messageContent = ReceiveRawMessage(out header);
            var message = MessageSerialiser.DeserializeRaw<ConfigAckPassiveMessage>(messageContent);
            ReceivedResponseMessages = new ConfigAckPassiveMessage[] { message };
        }
    }
}

 

下面再看看被動對話的實現,這是一條心跳檢測消息,由採集器定時發送給服務器來保持通信鏈路。

using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Sockets;
using System.Text;
using UIShell.EcmCommServerService.Protocol.Message;
using UIShell.EcmCommServerService.Utility;

namespace UIShell.EcmCommServerService.Protocol.RoundTrip.Passive
{
    public class HeartBeatPassiveRoundTrip : PassiveRoundTrip<HeartBeatNotifyPassiveMessage, HeartBeatTimeActiveMessage>
    {
        public HeartBeatPassiveRoundTrip(
            string buildingId,
            string gatewayId,
            MessageConstants messageConstants,
            TcpClient client)
            : base(buildingId, gatewayId, StringEnum.GetStringValue(MessageType.HeartBeat_Notify), messageConstants, client)
        {
            IsKeepAliveRoundTrip = true;
        }

        public HeartBeatPassiveRoundTrip(
            HeartBeatNotifyPassiveMessage receiveMessage,
            MessageConstants messageConstants,
            TcpClient client)
            : this(receiveMessage.Header.BuildingId, receiveMessage.Header.GatewayId, messageConstants, client)
        {
            ReceivedMessage = receiveMessage;
        }

        public override HeartBeatTimeActiveMessage CreateResponseMessage()
        {
            return HeartBeatTimeActiveMessage.New(BuildingId, GatewayId, DateTime.Now);
        }
    }
}

 

下面咱們看看一個比較複雜的對話,文件傳輸。文件傳輸的過程爲:(1)將文件分包,而後一包一包傳輸;(2)查詢缺包狀況;(3)若是有缺包,則繼續發送缺失的包,直至成功;若是沒有缺包,則傳輸完成。

public override void Start()
{
    // 檢查離線存儲區是否存在未傳輸完成的文件
    var item = ContinuousFileStorage.GetNotCompletedFile(BuildingId, GatewayId);

    if (item == null && Content == null) // 說明被調用的是protected的構造函數,用於檢測是否須要進行斷點續傳。
    {
        Trace(string.Format("集中器{0}不須要進行文件斷點續傳。", GatewayId));
        return;
    }

    try
    {
        if (item == null) // 從頭開始傳輸文件
        {
            Trace(string.Format("集中器{0}開始進行文件傳輸。", GatewayId));
            Trace(string.Format("文件名稱:{0},文件長度:{1},包大小:{2},包數:{3}。", FileName, Content.Length, PackageSize, PackageCount));
            // 建立離線存儲區
            ContinuousFileStorage.StartFileTransfer(BuildingId, GatewayId, FileType, FileName, Content, PackageSize);

            List<int> indexes = new List<int>();

            for (int index = 1; index <= PackageCount; index++)
            {
                indexes.Add(index);
            }

            SendFilePackage(indexes);
        }
        else // 開始斷點續傳
        {
            Trace(string.Format("集中器{0}上次文件傳輸未完成,繼續進行斷點續傳。", GatewayId));
            Trace(string.Format("斷點續傳的文件名稱:{0},文件長度:{1},包大小:{2},包數:{3}。", FileName, Content.Length, PackageSize, PackageCount));
        }

        // 查詢丟失的包並重傳
        if (QueryLostPackageAndResend())
        {
            // 刪除離線存儲區
            ContinuousFileStorage.EndFileTransfer(BuildingId, GatewayId, FileType, FileName, PackageSize);
            Trace(string.Format("集中器{0}文件傳輸成功。", GatewayId));
            OnRoundTripCompleted(new RoundTripEventArgs() { RoundTrip = this });
        }
        else
        {
            Trace(string.Format("集中器{0}文件傳輸未完成。", GatewayId));
            OnRoundTripError(new RoundTripEventArgs() { RoundTrip = this, Exception = new Exception(string.Format("集中器{0}文件傳輸未完成。", GatewayId)) });
            ContinuousFileStorage.IncrementFileTransferFailedCount(BuildingId, GatewayId);
        }
    }
    catch(Exception ex)
    {
        OnRoundTripError(new RoundTripEventArgs() { RoundTrip = this, Exception = ex });
        ContinuousFileStorage.IncrementFileTransferFailedCount(BuildingId, GatewayId);
    }
}

 

最後咱們看一下能耗平臺的對話類型,它由主動、被動和組合對話構成。

image

 

2.3.3 通信協議類的設計

通信協議類是系統的一個核心類,它爲每個通信鏈接建立了一個獨立的通信線程和對話隊列,並在隊列空閒的時候一直嘗試從鏈路中獲取被動消息,一旦有被動消息獲取,則建立被動對話,而後發送到隊列中。

如下方法是實現的核心,通信線程首先從對話隊列中獲取對話,而後運行該對話,若是對話拋出了CommStreamException,說明鏈路關閉,則須要中止當前通信協議;若是拋出了ThreadAboutException,說明被終止,則須要直接拋出異常;另外,若是對話隊列爲空時,則嘗試檢查被動對話。

public bool Start()
{
    if (_started)
    {
        return true;
    }

    FireOnStarting();

    _commThread = new Thread(() =>
    {

        RoundTripBase roundTrip;
        while (!_exited)
        {
            Monitor.Enter(_queue.SyncRoot);

            roundTrip = Dequeue();
            if (roundTrip != null)
            {
                try
                {
                    try
                    {
                        Monitor.Exit(_queue.SyncRoot);
                        OnRoundTripStartingHandler(this, new RoundTripEventArgs() { RoundTrip = roundTrip });
                        roundTrip.Start();
                    }
                    catch (ThreadAbortException)
                    {
                        Trace("通信線程被終止。");
                        throw;
                    }
                    catch (CommStreamException ex) // 沒法獲取Stream的時候,直接退出??須要加一個標誌位
                    // 須要拋出事件,通知後續處理,如將RoundTrip另存
                    {
                        _exited = true;
                        roundTrip.Trace("會話失敗,由於:鏈路已經關閉。");
                        _log.Error(string.Format("Start the round trip '{0}' error.", roundTrip), ex);
                    }
                    catch (Exception ex)
                    {
                        string error = GetErrorMessage(ex);
                        roundTrip.Trace(string.Format("會話失敗,由於:{0}。", error));
                        _log.Error(string.Format("Start the round trip '{0}' error.", roundTrip), ex);
                    }

                    if (!_exited)
                    {
                        roundTrip.Trace(Environment.NewLine);
                        OnRoundTripStartedHandler(this, new RoundTripEventArgs() { RoundTrip = roundTrip });
                    }
                    else
                    {
                        // 1 將當前失敗的RoundTrip保存入隊
                        FailedRoundTrips.Enqueue(roundTrip);
                        // 2 保存其它沒有處理的RoundTrip
                        do
                        {
                            roundTrip = _queue.Dequeue();
                            if (roundTrip != null)
                            {
                                FailedRoundTrips.Enqueue(roundTrip);
                            }
                        } while (roundTrip != null);
                        // 3 中止當前協議
                        Stop();
                    }
                    // 執行完RoundTrip後,開始清理資源
                    roundTrip.Dispose();
                }
                catch (ThreadAbortException)
                {
                    Trace("通信線程被終止。");
                    throw;
                }
                catch (Exception ex)
                {
                    _log.Error("Unhandled exception in CommProtocol.", ex);
                }
            }
            else
            {
                Monitor.Exit(_queue.SyncRoot);
                OnIdleHandler(this, new RoundTripEventArgs());
                ContinuousFileTransfer();
                try
                {
                    CheckPassiveRoundTripAvailableAndEnqueue();
                    if (_queue.Count == 0)
                    {
                        Thread.Sleep((int)MessageConstants.PassiveRoundTripCheckInterval.TotalMilliseconds);
                    }
                }
                catch (ThreadAbortException)
                {
                    Trace("通信線程被終止。");
                    throw;
                }
                catch (CommStreamException ex) // 沒法獲取Stream的時候,直接退出??須要加一個標誌位
                // 須要拋出事件,通知後續處理,如將RoundTrip另存
                {
                    _exited = true;
                    Trace("檢查被動消息失敗,由於:鏈路已經關閉。");
                    _log.Error("Check the passive message error.", ex);
                }
                catch (Exception ex)
                {
                    string error = GetErrorMessage(ex);
                    Trace(string.Format("檢查被動消息失敗,由於:{0}。", error));
                    _log.Error("Check the passive message error.", ex);
                }
                //_autoResetEvent.WaitOne();
            }
        }
    });

    _commThread.Start();
            
    _started = true;

    FireOnStarted();
    return true;
}

 

檢查被動對話的方法實現以下,首先檢查鏈路是否關閉,若是關閉,則直接中止協議;接着從共享緩存獲取被動消息,若是查找到被動消息,則建立被動對話,而後加入對話隊列;最後,嘗試從鏈路中讀取一條被動消息。

public void CheckPassiveRoundTripAvailableAndEnqueue()
{
    Trace("開始檢查被動通信。");

    if (!NetworkUtility.IsConnected(Client))
    {
        Trace("被動通信檢測時,鏈路已經關閉,關閉會話。");
        Stop();
        return;
    }

    // 1 Check ShardInputBuffer
    UIShell.OSGi.Utility.Tuple<MessageHeader, byte[]> tuple;
    foreach (var pair in _passiveRoundTripFactoryRegistry)
    {
        while ((tuple = SharedInputBuffer.FindAndThenRemove(BuildingId, GatewayId, pair.Key)) != null)
        {
            Trace(string.Format("從共享緩衝區獲取到一條被動消息,消息頭爲:'{0}'。", pair.Key));
            Enqueue(pair.Value(tuple.Item1, tuple.Item2));
        }
    }
    // 2 Check StreamAdapter
    if (Client.Available == 0)
    {
        Trace("通信鏈路沒有可用數據。");
        if (!NetworkUtility.IsConnected(Client))
        {
            Trace("被動通信檢測時,鏈路已經關閉,關閉會話。");
            Stop();
            return;
        }
    }
    else
    {
        while (Client.Available > 0)
        {
            MessageHeader header;
            var content = CommStreamAdapter.ReceiveOneRawMessage(out header);
            if (content != null && content.Length > 0 && header != null)
            {
                // 1 若是當前消息在被動檢測時收到,但不屬於註冊的被動消息,則放棄該消息。
                if (header.BuildingId.Equals(BuildingId) && header.GatewayId.Equals(GatewayId) && !_passiveRoundTripFactoryRegistry.ContainsKey(header.MessageType))
                {
                    Trace(string.Format("從通信鏈路獲取到一條被動消息,該消息不是註冊的被動消息,忽略它,被忽略的消息頭爲:'{0}'。", header));
                    continue;
                }
                // 2 若是當前消息在被動檢測時收到,並不屬於當前通信線程處理的範圍,則添加到共享緩衝區。
                // TODO: 這可能會產生一個Bug,若是接收到其它線程的消息時怎麼辦?
                else if (!header.BuildingId.Equals(BuildingId) || !header.GatewayId.Equals(GatewayId) || !_passiveRoundTripFactoryRegistry.ContainsKey(header.MessageType))
                {
                    Trace(string.Format("從通信鏈路獲取到一條被動消息,添加到共享緩衝區,消息頭爲:'{0}'。", header));
                    SharedInputBuffer.AddSharedBufferItem(new OSGi.Utility.Tuple<MessageHeader, byte[]> { Item1 = header, Item2 = content });
                }
                else // 3 若是是當前能夠處理的被動消息,則建立一個被動RoundTrip
                {
                    CreateRoundTripDelegate createRoundTrip;
                    if (_passiveRoundTripFactoryRegistry.TryGetValue(header.MessageType, out createRoundTrip))
                    {
                        Trace(string.Format("從通信鏈路獲取到一條被動消息,添加到通信隊列,消息頭爲:'{0}'。", header));
                        Enqueue(createRoundTrip(header, content));
                    }
                    else
                    {
                        Trace(string.Format("從通信鏈路獲取到一條被動消息,添加到共享緩衝區,消息頭爲:'{0}'。", header));
                        SharedInputBuffer.AddSharedBufferItem(new OSGi.Utility.Tuple<MessageHeader, byte[]> { Item1 = header, Item2 = content });
                    }
                }
            }
        }

        if (!NetworkUtility.IsConnected(Client))
        {
            Trace("被動通信檢測時,鏈路已經關閉,關閉會話。");
            Stop();
            return;
        }
    }

    Trace("檢查被動通信完成。");
}

 

通信協議使用RoundTripQueue來保存全部的對話,它是一個線程安全類,如下是Enqueue方法的實現。

public RoundTripQueue FailedRoundTrips = new RoundTripQueue();

public void Enqueue(RoundTripBase roundTrip)
{
    if (!_started)
    {
        throw new Exception("The protocol is not started yet or exited.");
    }
    _queue.Enqueue(roundTrip);
    OnRoundTripEnquedHandler(this, new RoundTripEventArgs() { RoundTrip = roundTrip });

    if (!(roundTrip.IsKeepAliveRoundTrip) || MessageConstants.ShowKeepAliveMessage)
    {
        roundTrip.OnTraceMessageAdded += DispatchAsyncTraceMessageAddedEvent;
    }
    try
    {
        _autoResetEvent.Set();
    }
    catch
    {
    }
}

 

上述3個方法實現了整個通信模型。對於主動對話,咱們還會爲通信協議建立一個相應的方法,並將對話加入到隊列中。下面是CommProtocol通信協議類中Config的方法實現,Public方法爲向領域層暴露的功能,而Internal方法則爲了內部的單元測試,其實現很是簡單。

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using UIShell.EcmCommServerService.Protocol.RoundTrip.Active;

namespace UIShell.EcmCommServerService.Protocol
{
    public partial class CommProtocol
    {
        public void Config(
            int period,
            EventHandler<RoundTripEventArgs> onMessageSend,
            EventHandler<RoundTripEventArgs> onCompleted,
            EventHandler<RoundTripEventArgs> onError)
        {
            ConfigActiveRoundTrip roundTrip;
            Config(period, onMessageSend, onCompleted, onError, out roundTrip);
        }

        internal void Config(
            int period,
            EventHandler<RoundTripEventArgs> onMessageSend,
            EventHandler<RoundTripEventArgs> onCompleted,
            EventHandler<RoundTripEventArgs> onError, out ConfigActiveRoundTrip roundTrip)
        {
            var configRoundTrip = new ConfigActiveRoundTrip(BuildingId, GatewayId, period, MessageConstants, Client);
            if (onMessageSend != null)
            {
                configRoundTrip.OnMessageSend += onMessageSend;
            }
            if (onCompleted != null)
            {
                configRoundTrip.OnCompleted += onCompleted;
            }
            if (onError != null)
            {
                configRoundTrip.OnError += onError;
            }
            Enqueue(configRoundTrip);
            roundTrip = configRoundTrip;
        }
    }
}

 

2.3.4 通信服務的實現

通信服務器HttpCommServer用於打開一個TCP端口,接受TCP鏈接,當鏈接登陸成功後,爲每個鏈接建立一個會話,並與領域層業務邏輯關連。下面咱們看一下它的實現,其核心方法爲ListenGprsRequest,在該方法中,首先爲每個鏈接進行一次身份驗證,驗證經過後,建立一個會話,而後添加到會話列表中。

public partial class HttpCommServer : TrackableBase
{
    public MessageConstants MessageConstants {        get;        private set;    }
    public string IPAddressString    {        get;        private set;    }
    public int Port    {        get;        private set;    }
    public ThreadSafeList<CommProtocol> Sessions    {        get;        private set;    }

    private Thread _listenerThread;
    private TcpListener _listener;
    private volatile bool _exited;
    private ILog _log;
    private object _syncRoot = new object();

    public HttpCommServer(string ipaddress, int port, MessageConstants messageConstants)
    {
        IPAddressString = ipaddress;
        Port = port;
        Sessions = new ThreadSafeList<CommProtocol>();

        MessageConstants = messageConstants;

        _log = BundleActivator.LogService.CreateLog(BundleActivator.Bundle, GetType());

        RegisterDomainHandlerCreationDelegates();
    }

    public CommProtocol GetSession(string buildingId, string gatewayId)
    {
        return Sessions.Find(s => s.BuildingId.Equals(buildingId) && s.GatewayId.Equals(gatewayId));
    }

    public void Start()
    {
        lock (_syncRoot)
        {
            IPEndPoint local = new IPEndPoint(IPAddress.Parse(IPAddressString), Port);
            _listener = new TcpListener(local);
            _listener.Start();
            _listenerThread = new Thread(new ThreadStart(ListenGprsRequest));
            _listenerThread.Start();
        }
        OnSessionChanged += OnSessionChangedForDomain;
    }

    private void ListenGprsRequest()
    {
        while (!_exited)
        {
            // 接受一次鏈接
            if (!_exited)
            {
                try
                {
                    TcpClient tcpClient = null;
                    try
                    {
                        tcpClient = _listener.AcceptTcpClient();

                        Trace(string.Format("接收到來自IP地址'{0}'的鏈接。", (tcpClient.Client.RemoteEndPoint as IPEndPoint).Address));
                        _log.Info(string.Format("Accept new connection from ip '{0}'.", (tcpClient.Client.RemoteEndPoint as IPEndPoint).Address));
                        lock (_syncRoot)
                        {
                            if (!_exited)
                            {
                                var loginRoundTrip = new LoginCompositeRoundTrip(MessageConstants, tcpClient);
                                loginRoundTrip.ParentTracker = this;
                                loginRoundTrip.Start();
                                loginRoundTrip.Dispose();
                                var session = new CommProtocol(loginRoundTrip.BuildingId, loginRoundTrip.GatewayId, MessageConstants, tcpClient);
                                session.ParentTracker = this;
                                session.Start();
                                AddSession(session);
                                // 清空離線存儲區
                                ContinuousDataStorage.Reset(loginRoundTrip.GatewayId);
                                _log.Info(string.Format("Start the session for gateway '{0}' of the building '{1}'.", session.GatewayId, session.BuildingId));
                            }
                        }
                    }
                    catch (ThreadAbortException)
                    {
                        throw;
                    }
                    catch (Exception ex)
                    {
                        try
                        {
                            if (tcpClient != null) // 登陸失敗,斷開鏈接
                            {
                                tcpClient.Close();
                            }
                        }
                        catch { }
                        // Trace(string.Format("登陸失敗,失敗IP地址爲'{0}'。", tcpClient != null ? (tcpClient.Client.RemoteEndPoint as IPEndPoint).Address.ToString() : "N/A"));
                        _log.Error("The connection login failed.", ex);
                    }
                }
                catch (ThreadAbortException)
                {
                    throw;
                }
                catch (Exception ex)
                {
                    _log.Error("Can not listen any more.", ex);
                    break;
                }
            }
        }
    }

    public void Stop()
    {
        if (_exited)
        {
            return;
        }
            
        _log.Info("The server is stopping.");
        lock (_syncRoot)
        {
            _log.Info("The sessions are stopping.");

            // 不能使用 Sessions.ForEach(s => s.Stop()),這是由於s.Stop將會刪除Sessions
            // 從而改變ForEach的行爲,形成Session泄露。
            var sesions = Sessions.ToArray();
            foreach (var session in sesions)
            {
                session.Stop();
            }
            OnSessionChanged -= OnSessionChangedForDomain;
            _log.Info("The sessions are stopped and cleared.");
            _listener.Stop();

            _log.Info("The listener is stopped.");

            Thread.Sleep(1000);
            try
            {
                _listenerThread.Abort();
            }
            catch
            {
            }
            _log.Info("The listener thread is stopped.");
            _exited = true;                
        }
        _log.Info("The server is stopped.");
        SharedInputBuffer.ClearSharedBuffer();
    }

    public event EventHandler<SessionChangedEventArgs> OnSessionChanged;

    public void AddSession(CommProtocol session)
    {
        var oldSession = Sessions.Find(p => p.BuildingId.Equals(session.BuildingId) && p.GatewayId.Equals(session.GatewayId));
        if (oldSession != null)
        {
            RemoveSession(oldSession);
            _log.Info(string.Format("The session for gateway '{0}' of building '{1}' already existed, it will be deleted first.", session.GatewayId, session.BuildingId));
        }
            
        Sessions.Add(session);
        session.OnStopped += OnSessionStopped;
        _log.Info(string.Format("Add the session for gateway '{0}' of building '{1}'.", session.GatewayId, session.BuildingId));
        Trace(string.Format("爲採集器'{0}'建立通信會話,目前會話數目爲'{1}'。", session.GatewayId, SessionNumber));

        if (OnSessionChanged != null)
        {
            OnSessionChanged(this, new SessionChangedEventArgs() { ChangedAction = CollectionChangedAction.Add, BuildingId = session.BuildingId, GatewayId = session.GatewayId, Session = session });
        }
    }

    private void OnSessionStopped(object sender, EventArgs e)
    {
        RemoveSession(sender as CommProtocol);
    }

    public void RemoveSession(CommProtocol session)
    {
        session.OnStopped -= OnSessionStopped;
        Sessions.Remove(session);
        _log.Info(string.Format("Remove the session for gateway '{0}' of building '{1}'.", session.GatewayId, session.BuildingId));
        Trace(string.Format("集中器'{0}'通信會話已經斷開,目前會話數目爲'{1}'。", session.GatewayId, SessionNumber));
        if (OnSessionChanged != null)
        {
            OnSessionChanged(this, new SessionChangedEventArgs() { ChangedAction = CollectionChangedAction.Remove, BuildingId = session.BuildingId, GatewayId = session.GatewayId, Session = session });
        }
    }
}

 

關於通信服務器核心的實現已經介紹完成了,下面咱們來看看領域層的實現。

 

3 領域層的實現

本系統的核心設計是基於事件 + 分層的體系結構。通信服務器與數據採集器硬件的通信都與領域相關邏輯有關。爲了使程序設計更加簡單化,引入事件對各個層次的代碼解耦,經過事件來關聯領域知識與硬件的通信過程,這樣也方便通信協議層的測試。這裏HttpCommServer管理了全部的通信會話實例和對話-領域處理器管理。

image

如下是領域邏輯關聯的代碼。它的做用爲:1 監聽SessionChanged事件,爲每個Session的OnRoundTripEnqueued建立領域處理事件; 2 在領域處理事件中,爲RoundTrip關聯相應的領域處理類,領域處理類訂閱了RoundTrip的OnCompleted和OnError事件,在裏面進行相應處理。

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using UIShell.EcmCommServerService.Domain;
using UIShell.EcmCommServerService.Protocol;
using UIShell.EcmCommServerService.Protocol.RoundTrip.Passive;
using UIShell.OSGi.Utility;

namespace UIShell.EcmCommServerService.Server
{
    public partial class HttpCommServer
    {
        /// <summary>
        /// 跟蹤每個Session的RoundTripEnqueued事件,當有RoundTrip註冊時,便註冊事件,處理領域知識。
        /// </summary>
        /// <param name="sender"></param>
        /// <param name="e"></param>
        private void OnSessionChangedForDomain(object sender, SessionChangedEventArgs e)
        {
            if (e.ChangedAction == OSGi.CollectionChangedAction.Add)
            {
                e.Session.OnRoundTripEnqued += OnSessionRoundTripEnqued;
            }
            else
            {
                e.Session.OnRoundTripEnqued -= OnSessionRoundTripEnqued;
            }
        }

        private Dictionary<Type, CreateDomainHandlerDelegate> _handlers = new Dictionary<Type, CreateDomainHandlerDelegate>();

        private void RegisterDomainHandlerCreationDelegates()
        {
            _handlers.Add(typeof(DataReportPassiveRoundTrip), roundTrip => new DataReportDomainHandler() { RoundTrip = roundTrip });
        }

        private void OnSessionRoundTripEnqued(object sender, RoundTripEventArgs e)
        {
            CreateDomainHandlerDelegate del;
            if (_handlers.TryGetValue(e.RoundTrip.GetType(), out del))
            {
                del(e.RoundTrip);
                _log.Info(string.Format("Create handler for RoundTrip '{0}' completed.", e.RoundTrip.GetType().FullName));
            }
            else
            {
                _log.Info(string.Format("The handler for RoundTrip '{0}' not found.", e.RoundTrip.GetType().FullName));
            }
        }
    }
}

 

下面是領域處理類RoundTripDomainHandler的基類。

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using UIShell.EcmCommServerService.Protocol;
using UIShell.OSGi.Utility;

namespace UIShell.EcmCommServerService.Domain
{
    /// <summary>
    /// RoundTrip領域處理器,當RoundTrip操做成功時,將相應結果保存到數據庫;
    /// 相反,若是操做失敗,則須要作異常處理。
    /// </summary>
    /// <typeparam name="TRoundTrip">RoundTrip類型</typeparam>
    public abstract class RoundTripDomainHandler
    {
        private RoundTripBase _roundTrip;

        public RoundTripBase RoundTrip
        {
            get
            {
                return _roundTrip;
            }
            set
            {
                if (_roundTrip == null)
                {
                    AssertUtility.NotNull(value);
                    _roundTrip = value;
                    _roundTrip.OnCompleted += OnCompleted;
                    _roundTrip.OnError += OnError;
                }
            }
        }
        public abstract void OnCompleted(object sender, RoundTripEventArgs e);
        public abstract void OnError(object sender, RoundTripEventArgs e);        
    }

    public delegate RoundTripDomainHandler CreateDomainHandlerDelegate(RoundTripBase roundTrip);
}

 

如下則是數據上報對話的相關領域處理。

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using UIShell.EcmCommServerService.Protocol;
using UIShell.EcmCommServerService.Protocol.RoundTrip.Passive;
using UIShell.EcmCommServerService.Server;

namespace UIShell.EcmCommServerService.Domain
{
    public class DataReportDomainHandler : RoundTripDomainHandler
    {
        public override void OnCompleted(object sender, RoundTripEventArgs e)
        {
            var dataReportRoundTrip = RoundTrip as DataReportPassiveRoundTrip;
            e.RoundTrip.Trace("開始將會話結果持久化到數據存儲。");
            // ...
            e.RoundTrip.Trace("將會話結果持久化到數據存儲成功。");
        }

        public override void OnError(object sender, RoundTripEventArgs e)
        {
            // 爲採集器關聯的Building建立一條失敗記錄
            // ...
        }
    }
}

領域處理類將會調用數據訪問模型來操做數據庫。整個通信服務器的大體實現已經介紹完成,接下來我將介紹一些很是有意思的技術細節。

 

4 通信服務器有意思的技術細節

4.1 共享緩存

按照個人理解,GPRS通信服務器指定端口的網絡存儲保存了與全部硬件設備的通信數據,所以,咱們須要來區分數據是由哪一個採集器發送過來的;此外,在通信過程當中,咱們須要處理好通信的時序,就是說服務器向採集器發送配置主動消息時,指望採集器響應一條結果,此時返回的消息多是其它消息,由於整個通信是雙工的,採集器也能夠主動向服務器發送消息。所以,咱們使用一個SharedInputBuffer來處理上述兩個問題。

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using UIShell.OSGi.Collection;

namespace UIShell.EcmCommServerService.Protocol
{
    /// <summary>
    /// 添加共享輸入緩衝區的緣由以下:
    /// 1 對於GPRS服務器,全部的會話都將從同一個網絡數據緩衝區中讀取數據;
    /// 2 每個集中器對應一個通信會話;
    /// 3 這樣集中器A從網絡數據緩衝區讀取數據時,可能讀取到來自集中器B的數據,
    /// 所以,咱們須要使用緩衝區將集中器B的數據緩存起來,並繼續讀取直到讀取到
    /// A的數據或者讀取失敗;
    /// 4 此外,每個集中器讀取數據時,都先嚐試從共享緩衝區讀取數據,而後再
    /// 嘗試從網絡數據緩衝區讀取。
    /// </summary>
    public static class SharedInputBuffer
    {
        private static ThreadSafeList<UIShell.OSGi.Utility.Tuple<MessageHeader, byte[]>> _sharedBuffer = new ThreadSafeList<OSGi.Utility.Tuple<MessageHeader, byte[]>>();

        public static ThreadSafeList<UIShell.OSGi.Utility.Tuple<MessageHeader, byte[]>> SharedBuffer
        {
            get
            {
                return _sharedBuffer;
            }
        }

        public static int Count
        {
            get
            {
                using (var locker = SharedBuffer.Lock())
                {
                    return locker.Count;
                }
            }
        }

        public static UIShell.OSGi.Utility.Tuple<MessageHeader, byte[]> FindAndThenRemoveByMessageType(string type)
        {
            return FindAndThenRemove(p => p.Item1.MessageType.Equals(type));
        }

        public static UIShell.OSGi.Utility.Tuple<MessageHeader, byte[]> FindAndThenRemove(string buildingId, string gatewayId, string type)
        {
            return FindAndThenRemove(p => p.Item1.BuildingId.Equals(buildingId) && p.Item1.GatewayId.Equals(gatewayId) && p.Item1.MessageType.Equals(type));
        }

        public static UIShell.OSGi.Utility.Tuple<MessageHeader, byte[]> FindAndThenRemove(Predicate<UIShell.OSGi.Utility.Tuple<MessageHeader, byte[]>> predicate)
        {
            using (var locker = SharedBuffer.Lock())
            {
                if (locker.Count > 0)
                {
                    // 查找緩衝項
                    var item = SharedBuffer.Find(predicate);
                    if (item != null)
                    {
                        // 刪除並返回
                        RemoveSharedBufferItem(item);
                        return item;
                    }
                }
                return null;
            }
        }

        public static void AddSharedBufferItem(UIShell.OSGi.Utility.Tuple<MessageHeader, byte[]> item)
        {
            SharedBuffer.Add(item);
        }

        public static void RemoveSharedBufferItem(UIShell.OSGi.Utility.Tuple<MessageHeader, byte[]> item)
        {
            SharedBuffer.Remove(item);
        }

        public static void ClearSharedBuffer()
        {
            SharedBuffer.Clear();
        }
    }
}

 

4.2 內存泄露

在整個通信服務器中,整個通信過程當中,建立了大量的RoundTrip實例,通信服務器的運行是7 × 24小時 × 365天不間斷的運行,如何保證這個通信服務器在持久的運行中,內存不會持續增長/CPU不會持續增加,從而保證系統不會崩潰,是必須解決的一個問題。在系統運行初期,咱們很快就面臨這個問題的威脅,就是在系統運行中,內存一直在增加。所以,在初期,咱們使用CLR Profiler來調試系統的內存和CPU使用狀況,初步的調優記錄以下所示。

profiler

通過分析,發現新建的RoundTrip實例在對話執行完成後,並無被CLR回收,從而致使與RoundTrip關聯的類型都一直存留在內存中。瞭解.NET GC垃圾回收原理的同志應該知道,GC的條件是引用計數爲0。通過分析,發現RoundTrip沒有被釋放的緣由在於,咱們的UI訂閱了每個新建的RoundTrip的OnMessageSend/OnCompleted/OnError事件,用於打印通信過程當中的交互的全部消息,這些事件在RoundTrip執行完成後,沒有釋放,從而致使RoundTrip的引用計數始終不是0。

image

所以,咱們爲RoundTrip實現了IDisposable接口,在其實現中,來釋放全部的事件句柄。

private List<EventHandler<RoundTripEventArgs>> _onCompletedEventHandlers = new List<EventHandler<RoundTripEventArgs>>();
/// <summary>
/// 這是一個異步事件,避免在處理事件時,阻塞其它RoundTrip的運行。
/// </summary>
public event EventHandler<RoundTripEventArgs> OnCompleted
{
    add
    {
        _onCompletedEventHandlers.Add(value);
    }
    remove
    {
        _onCompletedEventHandlers.Remove(value);
    }
}

public override void Dispose()
{
    _onCompletedEventHandlers.Clear();
    _onErrorEventHandlers.Clear();
    base.Dispose();
}

在CommProtocol通信協議類中,每個RoundTrip執行完成後,都將調用Dispose方法。

try
{
    try
    {
        Monitor.Exit(_queue.SyncRoot);
        OnRoundTripStartingHandler(this, new RoundTripEventArgs() { RoundTrip = roundTrip });
        roundTrip.Start();
    }
    catch (ThreadAbortException)
    {
        Trace("通信線程被終止。");
        throw;
    }
    catch (CommStreamException ex) // 沒法獲取Stream的時候,直接退出??須要加一個標誌位
    // 須要拋出事件,通知後續處理,如將RoundTrip另存
    {
        _exited = true;
        roundTrip.Trace("會話失敗,由於:鏈路已經關閉。");
        _log.Error(string.Format("Start the round trip '{0}' error.", roundTrip), ex);
    }
    catch (Exception ex)
    {
        string error = GetErrorMessage(ex);
        roundTrip.Trace(string.Format("會話失敗,由於:{0}。", error));
        _log.Error(string.Format("Start the round trip '{0}' error.", roundTrip), ex);
    }
    // ......
    // 執行完RoundTrip後,開始清理資源
    roundTrip.Dispose();
}
catch (ThreadAbortException)
{
    Trace("通信線程被終止。");
    throw;
}
catch (Exception ex)
{
    _log.Error("Unhandled exception in CommProtocol.", ex);
}

 

4.3 單元測試

該通信協議的單元測試,有三個步驟:(1)在靜態類中啓動OSGi.NET插件框架;(2)在Setup方法中啓動服務器,服務器IP爲本機IP——127.0.0.1,而後建立一個TCP鏈接,模擬鏈接操做,首先先登陸;(3)執行一個RoundTrip測試,模擬服務器和GPRS鏈接客戶端的行爲。

如下方法用於啓動OSGi.NET插件框架。

[TestFixture]
    public partial class ProtocolTest
    {
        private HttpCommServer _commServer;
        private TcpClient _tcpClient;
        private CommProtocol _currentSession;
        private CommStreamAdapter _clientStremAdapter;
        public AutoResetEvent AutoResetEvent { get; set; }

        public const string BuildingId = "b001";
        public const string GatewayId = "g001";
        static ProtocolTest()
        {
            // 加載插件運行時,準備運行環境
            if (BundleRuntime.Instance == null)
            {
                BundleRuntime bundleRuntime = new BundleRuntime("../../../");
                bundleRuntime.Start();
            }
        }
    }

啓動通信服務器並模擬用戶登陸,_commServer爲服務器,_tcpClient爲模擬客戶端鏈接,_clientStreamAdapter爲客戶端鏈接適配器。

[SetUp]
public void Setup()
{
    AutoResetEvent = new AutoResetEvent(false);
    MessageConstants.GprsMessageConstants.Timeout = new TimeSpan(0, 0, 5);
    MessageConstants.GprsMessageConstants.RetryTimesOnTimeout = 1;
    _commServer = new HttpCommServer("127.0.0.1", 39999, MessageConstants.GprsMessageConstants);
    _commServer.OnTraceMessageAdded += (sender, e) => { Debug.WriteLine(e.Message); };
    _commServer.Start();

    _tcpClient = new TcpClient();
    _tcpClient.Connect("127.0.0.1", 39999);
    _clientStremAdapter = new CommStreamAdapter(_commServer.MessageConstants, _tcpClient);
    _clientStremAdapter.ParentTracker = _commServer;

    var request = ValidateRequestPassiveMessage.New(BuildingId, GatewayId);
    _clientStremAdapter.SendRawMessage(request.ToContent(), request.ToXmlContent());

    var prefix = ProtocolUtility.BytesToHexString(MessageConstants.XmlMessagePrefixBytes);
    var root = ProtocolUtility.BytesToHexString(MessageConstants.XmlMessageRootStartBytes);
    var common = ProtocolUtility.BytesToHexString(MessageConstants.XmlMessageCommonStartBytes);
    var commonend = ProtocolUtility.BytesToHexString(MessageConstants.XmlMessageCommonEndBytes);
    var rootend = ProtocolUtility.BytesToHexString(MessageConstants.XmlMessageRootEndBytes);

    MessageHeader header;
    var messageContent = _clientStremAdapter.ReceiveOneRawMessage(out header);

    Assert.AreEqual(header.BuildingId, BuildingId);
    Assert.AreEqual(header.GatewayId, GatewayId);
    Assert.AreEqual(header.MessageType, StringEnum.GetStringValue(MessageType.Validate_Sequence));

    var sequenceMessage = MessageSerialiser.DeserializeRaw<ValidateSequenceActiveMessage>(messageContent);
    string md5 = CreateSequenceAndHash(sequenceMessage.Body.Sequence);
    var md5Message = ValidateMd5PassiveMessage.New(BuildingId, GatewayId, md5);

    _clientStremAdapter.SendRawMessage(md5Message.ToContent(), md5Message.ToXmlContent());

    messageContent = _clientStremAdapter.ReceiveOneRawMessage(out header);

    Assert.AreEqual(header.BuildingId, BuildingId);
    Assert.AreEqual(header.GatewayId, GatewayId);
    Assert.AreEqual(header.MessageType, StringEnum.GetStringValue(MessageType.Validate_Result));

    var resultMessage = MessageSerialiser.DeserializeRaw<ValidateResultActiveMessage>(messageContent);
    Assert.AreEqual(resultMessage.Body.Result, "pass");

    while (_currentSession == null)
    {
        _currentSession = _commServer.Sessions.Find(s => s.BuildingId.Equals(BuildingId) && s.GatewayId.Equals(GatewayId));
        Thread.Sleep(1000);
    }
}

接着就能夠來定義一個測試。這個測試在OnMessageSend事件中,客戶端將模擬通信協議,發送一個響應消息。因爲通信過程是基於異步方式,咱們須要使用AutoResetEvent來等待對話完成信號。等對話執行完成時,再來檢查結果。

[Test]
public void ConfigRoundTrip()
{
    bool completed = false;
    Exception ex = null;
    MessageHeader receivedMessageHeader = null;
    ConfigActiveMessage receivedMessage = null;

    _currentSession.Config(10,
        (sender, e) => {
            var configMessage = _clientStremAdapter.ReceiveOneRawMessage(out receivedMessageHeader);
            receivedMessage = MessageSerialiser.DeserializeRaw<ConfigActiveMessage>(configMessage);

            var configAckMessage = ConfigAckPassiveMessage.New(BuildingId, GatewayId);
            _clientStremAdapter.SendRawMessage(configAckMessage.ToContent(), configAckMessage.ToXmlContent());
        },
        (sender, e) => {
            ex = e.Exception;
            completed = true;
            AutoResetEvent.Set();
        },
        (sender, e) => {
            ex = e.Exception;
            completed = false;
            AutoResetEvent.Set();
        });

    AutoResetEvent.WaitOne();

    Assert.IsTrue(completed);
    Assert.AreEqual(receivedMessageHeader.MessageType, StringEnum.GetStringValue(MessageType.Config_Period));
    Assert.AreEqual(receivedMessage.Body.Period, 10);
}

如下是單元測試的輸出消息。

------ Test started: Assembly: UIShell.EcmCommServerService.dll ------

[Id:1, 2013-07-07 19:17:16]接收到來自IP地址'127.0.0.1'的鏈接。
[Id:1, 2013-07-07 19:17:16]開始清空緩衝區。
[Id:1, 2013-07-07 19:17:16]清空緩衝區成功。
[Id:1, 2013-07-07 19:17:16]開始發送命令:<?xml version="1.0" encoding="utf-8"?><root><common><building_id>b001</building_id><gateway_id>g001</gateway_id><type>request</type></common><id_validate operation="request"></id_validate></root>
[Id:1, 2013-07-07 19:17:16]正在讀取消息,目前沒有可用數據,等待數據。
[Id:1, 2013-07-07 19:17:16]登陸組合會話開始。
[Id:1, 2013-07-07 19:17:16]開始嘗試與集中器N/A進行被動式會話。
[Id:4, 2013-07-07 19:17:16]接收到消息,消息頭爲:<?xml version="1.0" encoding="utf-8"?><common><building_id>b001</building_id><gateway_id>g001</gateway_id><type>request</type></common>。
[Id:1, 2013-07-07 19:17:16]接收到消息:<?xml version="1.0" encoding="utf-8"?><root><common><building_id>b001</building_id><gateway_id>g001</gateway_id><type>request</type></common><id_validate operation="request"></id_validate></root>
[Id:1, 2013-07-07 19:17:16]開始發送的響應消息:<?xml version="1.0" encoding="utf-8"?><root><common><building_id>b001</building_id><gateway_id>g001</gateway_id><type>sequence</type></common><id_validate operation="sequence"><sequence>14b12261-5182-47a6-bdfa-cf21e4e5cfd7-bcc63da5-5694-427f-b358-9a113125f74d</sequence></id_validate></root>
[Id:4, 2013-07-07 19:17:16]開始清空緩衝區。
[Id:4, 2013-07-07 19:17:16]清空緩衝區成功。
[Id:4, 2013-07-07 19:17:16]開始發送命令:<?xml version="1.0" encoding="utf-8"?><root><common><building_id>b001</building_id><gateway_id>g001</gateway_id><type>sequence</type></common><id_validate operation="sequence"><sequence>14b12261-5182-47a6-bdfa-cf21e4e5cfd7-bcc63da5-5694-427f-b358-9a113125f74d</sequence></id_validate></root>
[Id:1, 2013-07-07 19:17:16]與集中器g001進行被動式會話成功。
[Id:1, 2013-07-07 19:17:16]收到請求消息併發送序列'14b12261-5182-47a6-bdfa-cf21e4e5cfd7-bcc63da5-5694-427f-b358-9a113125f74d',該序列計算的MD5值爲'8224D3FC5FCC21E45E82FF5F9AB364CD'。
[Id:1, 2013-07-07 19:17:16]開始嘗試與集中器g001進行被動式會話。
[Id:6, 2013-07-07 19:17:16]共享緩衝區的消息數量:0。
[Id:6, 2013-07-07 19:17:16]正在讀取消息,目前沒有可用數據,等待數據。
[Id:1, 2013-07-07 19:17:19]接收到消息,消息頭爲:<?xml version="1.0" encoding="utf-8"?><common><building_id>b001</building_id><gateway_id>g001</gateway_id><type>sequence</type></common>。
[Id:1, 2013-07-07 19:17:19]開始清空緩衝區。
[Id:1, 2013-07-07 19:17:19]清空緩衝區成功。
[Id:1, 2013-07-07 19:17:19]開始發送命令:<?xml version="1.0" encoding="utf-8"?><root><common><building_id>b001</building_id><gateway_id>g001</gateway_id><type>md5</type></common><id_validate operation="md5"><md5>8224D3FC5FCC21E45E82FF5F9AB364CD</md5></id_validate></root>
[Id:1, 2013-07-07 19:17:19]正在讀取消息,目前沒有可用數據,等待數據。
[Id:6, 2013-07-07 19:17:19]接收到消息,消息頭爲:<?xml version="1.0" encoding="utf-8"?><common><building_id>b001</building_id><gateway_id>g001</gateway_id><type>md5</type></common>。
[Id:1, 2013-07-07 19:17:19]接收到消息:<?xml version="1.0" encoding="utf-8"?><root><common><building_id>b001</building_id><gateway_id>g001</gateway_id><type>md5</type></common><id_validate operation="md5"><md5>8224D3FC5FCC21E45E82FF5F9AB364CD</md5></id_validate></root>
[Id:1, 2013-07-07 19:17:19]開始發送的響應消息:<?xml version="1.0" encoding="utf-8"?><root><common><building_id>b001</building_id><gateway_id>g001</gateway_id><type>result</type></common><id_validate operation="result"><result>pass</result></id_validate></root>
[Id:6, 2013-07-07 19:17:19]開始清空緩衝區。
[Id:6, 2013-07-07 19:17:19]清空緩衝區成功。
[Id:6, 2013-07-07 19:17:19]開始發送命令:<?xml version="1.0" encoding="utf-8"?><root><common><building_id>b001</building_id><gateway_id>g001</gateway_id><type>result</type></common><id_validate operation="result"><result>pass</result></id_validate></root>
[Id:1, 2013-07-07 19:17:19]與集中器g001進行被動式會話成功。
[Id:1, 2013-07-07 19:17:19]登陸組合會話完成,登陸結果爲:成功。
[Id:1, 2013-07-07 19:17:19]爲採集器'g001'建立通信會話,目前會話數目爲'1'。

[Id:13, 2013-07-07 19:17:33]開始與集中器g001會話。
[Id:13, 2013-07-07 19:17:33]開始清空緩衝區。
[Id:13, 2013-07-07 19:17:33]清空緩衝區成功。
[Id:13, 2013-07-07 19:17:33]開始發送命令:<?xml version="1.0" encoding="utf-8"?><root><common><building_id>b001</building_id><gateway_id>g001</gateway_id><type>period</type></common><config operation="period"><period>10</period></config></root>
[Id:1, 2013-07-07 19:17:36]接收到消息,消息頭爲:<?xml version="1.0" encoding="utf-8"?><common><building_id>b001</building_id><gateway_id>g001</gateway_id><type>pack_lost</type></common>。
[Id:1, 2013-07-07 19:17:36]接收到消息,消息頭爲:<?xml version="1.0" encoding="utf-8"?><common><building_id>b001</building_id><gateway_id>g001</gateway_id><type>pack_lost</type></common>。
[Id:1, 2013-07-07 19:17:36]接收到消息,消息頭爲:<?xml version="1.0" encoding="utf-8"?><common><building_id>b001</building_id><gateway_id>g001</gateway_id><type>period</type></common>。
[Id:1, 2013-07-07 19:17:44]開始清空緩衝區。
[Id:1, 2013-07-07 19:17:44]清空緩衝區成功。
[Id:1, 2013-07-07 19:17:44]開始發送命令:<?xml version="1.0" encoding="utf-8"?><root><common><building_id>b001</building_id><gateway_id>g001</gateway_id><type>period_ack</type></common><config operation="period_ack"></config></root>
[Id:13, 2013-07-07 19:17:46]開始第1次消息接收。
[Id:13, 2013-07-07 19:17:46]共享緩衝區的消息數量:0。
[Id:1, 2013-07-07 19:17:46]開始檢查被動通信。
[Id:1, 2013-07-07 19:17:49]通信鏈路沒有可用數據。
[Id:13, 2013-07-07 19:17:46]當前會話'未知'接收了1個響應消息,詳細以下:
[Id:13, 2013-07-07 19:17:46]響應消息:<?xml version="1.0" encoding="utf-8"?><root><common><building_id>b001</building_id><gateway_id>g001</gateway_id><type>period_ack</type></common><config operation="period_ack"></config></root>
[Id:13, 2013-07-07 19:17:46]與集中器g001會話成功。


[Id:1, 2013-07-07 19:17:49]檢查被動通信完成。
[Id:1, 2013-07-07 19:17:50]通信線程被終止。

 

對於被動對話,其測試方法須要稍做改變,由於被動消息的發起在在通信會話檢測到有被動消息時纔會建立一個被動對話的,所以,咱們首先須要先模擬客戶端發送一條被動消息,並監聽當前會話的OnRoundTripStarted事件,以下所示。

[Test]
public void HeartBeatPassiveRoundTrip()
{
    bool completed = false;
    Exception ex = null;
    HeartBeatTimeActiveMessage responseMessage = null;

    _currentSession.OnRoundTripStarted += (sender, e) => {
        if (e.RoundTrip is HeartBeatPassiveRoundTrip)
        {
            var roundTrip = e.RoundTrip as HeartBeatPassiveRoundTrip;
            responseMessage = roundTrip.ResponsedMessage;
            ex = e.Exception;
            completed = true;
            AutoResetEvent.Set();
        }
    };

    var heartbeatMessage = HeartBeatNotifyPassiveMessage.New(BuildingId, GatewayId);
    _clientStremAdapter.SendRawMessage(heartbeatMessage.ToContent(), heartbeatMessage.ToXmlContent());

    AutoResetEvent.WaitOne();

    Assert.IsTrue(completed);
    Assert.NotNull(responseMessage);
    Assert.AreEqual(responseMessage.Header.MessageType, StringEnum.GetStringValue(MessageType.HeartBeat_Time));
}

 

4.4 程序的部署與升級

通信服務器軟件由軟件團隊開發,硬件團隊測試,而且須要部署到多個點。爲了不手工部署和升級麻煩,整個通信服務器基於開放工廠(http://www.iopenworks.com/)平臺開發,程序使用開放工廠提供的OSGi.NET插件框架(http://www.iopenworks.com/Products/SDKDownload)構建,使用開放工廠私有插件倉庫實現應用程序的自動升級。

 

如下是整個通信服務器的代碼。核心是一個CommServerService插件,實現了通信服務。

image

整個應用程序由12個插件構成,其它插件均爲開放工廠提供的插件和Web界面應用程序插件。

image

在發佈通信服務插件的時候,右鍵,點擊「發佈插件」菜單,便可將插件及其升級版本發佈到插件倉庫。

image

接着,在後面的頁面中輸入私有倉庫用戶名/密碼便可發佈。

image

發佈完成後,你能夠在私有倉庫中,查看到該插件的發佈版本。

image

發佈完成後,進入該系統插件管理頁面的私有倉庫,便可下載到最新升級包。以下所示。

image

下面就能夠下載安裝升級包了。

image

好了,整個GPRS通信服務器的構建方法就分享到這。

5 總結

(1)爲通信協議設計了一個好的模型,這個模型以消息、對話爲基礎;

(2)採用了不錯的架構,基於事件 + 分層,事件很是適用於異步處理和解耦,分層易於代理的理解和組織;

(3)很是OO,整個設計採用比較優雅的面向對象設計,遵照OO的設計原則SRP、OCP等;

(4)使用插件化的方法,進行模塊化開發;

(5)引用單元測試保證通信協議可測試性,避免與硬件聯調。

相關文章
相關標籤/搜索