概述html
雙工(Duplex)模式的消息交換方式體如今消息交換過程當中,參與的雙方都可以向對方發送消息。基於雙工MEP消息交換能夠當作是多個基本模式下(好比請求-回覆模式和單項模式)消息交換的組合。雙工MEP又具備一些變體,好比典型的訂閱-發佈模式就能夠當作是雙工模式的一種表現形式。雙工消息交換模式使服務端回調(Callback)客戶端操做成爲可能。服務器
在Wcf中不是全部的綁定協議都支持回調操做,BasicHttpBinding,WSHttpBinding綁定協議不支持回調操做;NetTcpBinding和NetNamedPipeBinding綁定支持回調操做;WSDualHttpBinding綁定是經過設置兩個HTTP信道來支持雙向通訊,因此它也支持回調操做。併發
兩種典型的雙工MEPtcp
1.請求過程當中的回調this
這是一種比較典型的雙工消息交換模式的表現形式,客戶端在進行服務調用的時候,附加上一個回調對象;服務在對處理該處理中,經過客戶端附加的回調對象(其實是調用回調服務的代理對象)回調客戶端的操做(該操做在客戶端執行)。整個消息交換的過程實際上由兩個基本的消息交換構成,其一是客戶端正常的服務請求,其二則是服務端對客戶端的回調。二者能夠採用請求-回覆模式,也能夠採用單向(One-way)的MEP進行消息交換。下圖描述了這樣的過程,服務調用和回調都採用請求-回覆MEP。3d
2.訂閱-發佈代理
訂閱-發佈模式是雙工模式的一個典型的變體。在這個模式下,消息交換的雙方變成了訂閱者和發佈者,若干訂閱者就某個主題向發佈者申請訂閱,發佈者將全部的訂閱者保存在一個訂閱者列表中,在某個時刻將主題發送給該主題的全部訂閱者。實際上基於訂閱-發佈模式的消息交換也能夠當作是兩個基本模式下消息交換的組合,申請訂閱是一個單向模式的消息交換(若是訂閱者行爲獲得訂閱的回饋,該消息交換也能夠採用請求-回覆模式);而主題發佈也是一個基於單向模式的消息交換過程。訂閱-發佈消息交換模式以下圖所示。xml
示例htm
接下來咱們將會建立一個簡單的Wcf通訊服務,包括使使用NetTcpBinding實現雙工通訊,和監控雙工通訊過程當中的客戶端和服務端一方斷開後的捕捉事件。對象
項目如圖所示
第一步:
先建立IGateWayService和INotifyCallBack接口
[ServiceContract(CallbackContract = typeof(INotifyCallBack))] public interface IGateWayService { [OperationContract] void RegisterClient(string clientName); [OperationContract] string GetData(int value); [OperationContract] CompositeType GetDataUsingDataContract(CompositeType composite); } // 使用下面示例中說明的數據約定將複合類型添加到服務操做。 [DataContract] public class CompositeType { bool boolValue = true; string stringValue = "Hello "; [DataMember] public bool BoolValue { get { return boolValue; } set { boolValue = value; } } [DataMember] public string StringValue { get { return stringValue; } set { stringValue = value; } } }
INotifyCallBack.cs以下:
public interface INotifyCallBack { [OperationContract(IsOneWay = true)] void NotifyFunction(string sender); }
記住在IGateWayService接口上方設置Attribute [ServiceContract(CallbackContract = typeof(INotifyCallBack))] 這樣設置表示這個接口是支持回調的。
接下來定義一個ClientRegisterInfo.cs來定義客戶端的名字和客戶端的INotifyCallBack屬性,再定義一個Timer 來調用INotifyCallBack給客戶端發送消息。再經過
wcf 的ICommunicationObject來定義通訊出錯和關閉的事件。
public class ClientRegisterInfo { public ClientRegisterInfo() { _senderTimer.Elapsed += OnSenderMessage; _senderTimer.Start(); } private void OnSenderMessage(object sender, ElapsedEventArgs e) { if (_notifyCallBack != null) { var communication = _notifyCallBack as ICommunicationObject; if(communication.State==CommunicationState.Opened) _notifyCallBack.NotifyFunction(DateTime.Now.ToString()); } } public Timer _senderTimer=new Timer(10*1000); private INotifyCallBack _notifyCallBack; public INotifyCallBack NotifyCallBack { get { return _notifyCallBack; } set { lock (_syncNotifyObj) { _notifyCallBack = value; if (_notifyCallBack != null) { var communication = _notifyCallBack as ICommunicationObject; if (communication != null) { communication.Closed += OnChannelClose; communication.Faulted += OnChannelFault; } } } } } private readonly object _syncNotifyObj = new object(); private void OnChannelFault(object sender, EventArgs e) { ClientInfoCache.Instance.Remove(this); } private void OnChannelClose(object sender, EventArgs e) { ClientInfoCache.Instance.Remove(this); } public string ClientName { get; set; } }
再定義一個單例來保存客戶端的信息。
public class ClientInfoCache { private static readonly object SyncObj = new object(); private static ClientInfoCache _instance; public static ClientInfoCache Instance { get { lock (SyncObj) { if (_instance == null) _instance = new ClientInfoCache(); } return _instance; } } private ClientInfoCache() { _clientList = new List<ClientRegisterInfo>(); } private List<ClientRegisterInfo> _clientList; private static object SyncOperator = new object(); /// <summary> /// Add client entity /// </summary> /// <param name="entity">client entity</param> public void Add(ClientRegisterInfo entity) { if (entity == null) return; lock (SyncOperator) { var findClient = _clientList.FirstOrDefault( t => t.ClientName.Equals(entity.ClientName, StringComparison.OrdinalIgnoreCase)); if (findClient == null) _clientList.Add(entity); else { findClient.NotifyCallBack = entity.NotifyCallBack; } } } /// <summary> /// Remove client /// </summary> /// <param name="entity">Client entity</param> public void Remove(ClientRegisterInfo entity) { lock (SyncOperator) { _clientList.Remove(entity); } } }
再新建個控制檯運應程序來啓動Wcf,代碼以下:
public class Program { static void Main(string[] args) { StartListener(); } private static void StartListener() { try { using (var host = new ServiceHost(typeof(GateWayService))) { host.Opened += delegate { Console.WriteLine("[Server] Begins to listen request on " + host.BaseAddresses[0]); }; host.Open(); Console.Read(); } } catch (Exception ex) { } } }
在App.config設置配置以下:
<?xml version="1.0" encoding="utf-8" ?> <configuration> <system.serviceModel> <bindings> <netTcpBinding> <binding name="longTimeoutBinding" closeTimeout="01:10:00" openTimeout="01:10:00" receiveTimeout="10:10:00" sendTimeout="10:10:00" maxBufferPoolSize="655350000" maxBufferSize="655350000" maxReceivedMessageSize="655350000"> <readerQuotas maxDepth="32" maxStringContentLength="655350000" maxArrayLength="655350000" maxBytesPerRead="655350000" maxNameTableCharCount="655350000" /> <reliableSession inactivityTimeout="23:59:59" /> <security mode="None" /> </binding> </netTcpBinding> </bindings> <behaviors> <serviceBehaviors> <behavior name="NewBehavior"> <serviceMetadata httpGetEnabled="True" httpGetUrl="Http://localhost:7789/" httpsGetEnabled="True"/> <serviceDebug includeExceptionDetailInFaults="False" /> <serviceThrottling maxConcurrentCalls="1000" maxConcurrentSessions="1000" maxConcurrentInstances="1000" /> </behavior> </serviceBehaviors> </behaviors> <services> <service name="WcfService.GateWayService" behaviorConfiguration="NewBehavior" > <endpoint address="net.tcp://localhost:7788/GatewayService.svc" binding="netTcpBinding" contract="WcfService.IGateWayService" name="WcfService_GateWayService" bindingConfiguration="longTimeoutBinding" > </endpoint> <endpoint address="mex" binding="mexTcpBinding" contract="IMetadataExchange" ></endpoint> <host > <baseAddresses > <add baseAddress="net.tcp://localhost:7788/GatewayService.svc" /> <add baseAddress="Http://localhost:7789/" /> </baseAddresses> </host > </service> </services> </system.serviceModel> </configuration>
longTimeoutBinding是設置傳輸的屬性,如最大傳輸大小,TimeOut的時間等。
在客戶端新建個WcfCallBack.cs 繼承IGateWayServiceCallback接口,代碼以下。
[CallbackBehavior(ConcurrencyMode = ConcurrencyMode.Multiple)] public class WcfCallBack : IGateWayServiceCallback { public void NotifyFunction(string sender) { Console.WriteLine("Get a message,message info is {0}", sender); } }
設置屬性[CallbackBehavior(ConcurrencyMode = ConcurrencyMode.Multiple)]表示服務器是經過併發的給客戶端來發送消息的。
控制檯代碼以下
class Program { private static GateWayServiceClient _client; static void Main(string[] args) { var cb = new WcfCallBack(); var context = new InstanceContext(cb); _client = new GateWayServiceClient(context); _client.RegisterClient("Test1"); ((ICommunicationObject)_client).Closed += OnChannelClose; ((ICommunicationObject)_client).Faulted += OnChannelFaulted; Console.WriteLine("Input Q to exit."); while (string.Compare(Console.ReadLine(), ConsoleKey.Q.ToString(), StringComparison.OrdinalIgnoreCase) != 0) { } } private static void OnChannelFaulted(object sender, EventArgs e) { if (FaultedEvent != null) FaultedEvent(sender, e); } private static void OnChannelClose(object sender, EventArgs e) { if (CloseEvent != null) CloseEvent(sender, e); } public static EventHandler CloseEvent; public static EventHandler FaultedEvent; }
運行的結果以下圖:
當我關閉客戶端時,能捕捉到Closed和Faulted事件
當我關閉服務端時,在客戶端能捕捉到Faulted事件
總結:
Wcf 通訊使用簡單,功能豐富。