0.背景簡介數組
微軟在 .NET 框架中提供了多種實用的線程同步手段,其中包括 monitor 類及 reader-writer鎖。但跨進程的同步方法仍是很是欠缺。另外,目前也沒有方便的線程間及進程間傳遞消息的方法。例如C/S和SOA,又或者生產者/消費者模式中就經常須要傳遞消息。爲此我編寫了一個獨立完整的框架,實現了跨線程和跨進程的同步和通信。這框架內包含了信號量,信箱,內存映射文件,阻塞通道,及簡單消息流控制器等組件。這篇文章裏提到的類同屬於一個開源的庫項目(BSD許可),你能夠從這裏下載到 www.cdrnet.net/projects/threadmsg/.安全
這個框架的目的是:架構
- 封裝性:經過MSMQ消息隊列發送消息的線程無需關心消息是發送到另外一個線程仍是另外一臺機器。
- 簡單性:向其餘進程發送消息只需調用一個方法。
注意:我刪除了本文中所有代碼的XML註釋以節省空間。若是你想知道這些方法和參數的詳細信息,請參考附件中的代碼。併發
1.先看一個簡單例子app
使用了這個庫後,跨進程的消息傳遞將變得很是簡單。我將用一個小例子來做示範:一個控制檯程序,根據參數能夠做爲發送方也能夠做爲接收方運行。在發送程序裏,你能夠輸入必定的文本併發送到信箱內(返回key),接收程序將顯示全部從信箱內收到的消息。你能夠運行無數個發送程序和接收程序,可是每一個消息只會被具體的某一個接收程序所收到。
框架
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
|
[Serializable]
struct
Message
{
public
string
Text;
}
class
Test
{
IMailBox mail;
public
Test()
{
mail =
new
ProcessMailBox(
"TMProcessTest"
,1024);
}
public
void
RunWriter()
{
Console.WriteLine(
"Writer started"
);
Message msg;
while
(
true
)
{
msg.Text = Console.ReadLine();
if
(msg.Text.Equals(
"exit"
))
break
;
mail.Content = msg;
}
}
public
void
RunReader()
{
Console.WriteLine(
"Reader started"
);
while
(
true
)
{
Message msg = (Message)mail.Content;
Console.WriteLine(msg.Text);
}
}
[STAThread]
static
void
Main(
string
[] args)
{
Test test =
new
Test();
if
(args.Length > 0)
test.RunWriter();
else
test.RunReader();
}
}
|
信箱一旦建立以後(這上面代碼裏是 ProcessMailBox ),接收消息只須要讀取 Content 屬性,發送消息只須要給這個屬性賦值。當沒有數據時,獲取消息將會阻塞當前線程;發送消息時若是信箱裏已經有數據,則會阻塞當前線程。正是有了這個阻塞,整個程序是徹底基於中斷的,而且不會過分佔用CPU(不須要進行輪詢)。發送和接收的消息能夠是任意支持序列化(Serializable)的類型。socket
然而,實際上暗地裏發生的事情有點複雜:消息經過內存映射文件來傳遞,這是目前惟一的跨進程共享內存的方法,這個例子裏咱們只會在 pagefile 裏面產生虛擬文件。對這個虛擬文件的訪問是經過 win32 信號量來確保同步的。消息首先序列化成二進制,而後再寫進該文件,這就是爲何須要聲明Serializable屬性。內存映射文件和 win32 信號量都須要調用 NT內核的方法。多得了 .NET 框架中的 Marshal 類,咱們能夠避免編寫不安全的代碼。咱們將在下面討論更多的細節。ide
2. .NET裏面的跨線程/進程同步oop
線程/進程間的通信須要共享內存或者其餘內建機制來發送/接收數據。即便是採用共享內存的方式,也還須要一組同步方法來容許併發訪問。
同一個進程內的全部線程都共享公共的邏輯地址空間(堆)。對於不一樣進程,從 win2000 開始就已經沒法共享內存。然而,不一樣的進程能夠讀寫同一個文件。WinAPI提供了多種系統調用方法來映射文件到進程的邏輯空間,及訪問系統內核對象(會話)指向的 pagefile 裏面的虛擬文件。不管是共享堆,仍是共享文件,併發訪問都有可能致使數據不一致。咱們就這個問題簡單討論一下,該怎樣確保線程/進程調用的有序性及數據的一致性。
2.1 線程同步
.NET 框架和 C# 提供了方便直觀的線程同步方法,即 monitor 類和 lock 語句(本文將不會討論 .NET 框架的互斥量)。對於線程同步,雖然本文提供了其餘方法,咱們仍是推薦使用 lock 語句。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
void
Work1()
{
NonCriticalSection1();
Monitor.Enter(
this
);
try
{
CriticalSection();
}
finally
{
Monitor.Exit(
this
);
}
NonCriticalSection2();
}
void
Work2()
{
NonCriticalSection1();
lock
(
this
)
{
CriticalSection();
}
NonCriticalSection2();
}
|
Work1 和 Work2 是等價的。在C#裏面,不少人喜歡第二個方法,由於它更短,且不容易出錯。
2.2 跨線程信號量
信號量是經典的同步基本概念之一(由 Edsger Dijkstra 引入)。信號量是指一個有計數器及兩個操做的對象。它的兩個操做是:獲取(也叫P或者等待),釋放(也叫V或者收到信號)。信號量在獲取操做時若是計數器爲0則阻塞,不然將計數器減一;在釋放時將計數器加一,且不會阻塞。雖然信號量的原理很簡單,可是實現起來有點麻煩。好在,內建的 monitor 類有阻塞特性,能夠用來實現信號量。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
|
public
sealed
class
ThreadSemaphore : ISemaphore
{
private
int
counter;
private
readonly
int
max;
public
ThreadSemaphore() :
this
(0,
int
.Max) {}
public
ThreadSemaphore(
int
initial) :
this
(initial,
int
.Max) {}
public
ThreadSemaphore(
int
initial,
int
max)
{
this
.counter = Math.Min(initial,max);
this
.max = max;
}
public
void
Acquire()
{
lock
(
this
)
{
counter--;
if
(counter < 0 && !Monitor.Wait(
this
))
throw
new
SemaphoreFailedException();
}
}
public
void
Acquire(TimeSpan timeout)
{
lock
(
this
)
{
counter--;
if
(counter < 0 && !Monitor.Wait(
this
,timeout))
throw
new
SemaphoreFailedException();
}
}
public
void
Release()
{
lock
(
this
)
{
if
(counter >= max)
throw
new
SemaphoreFailedException();
if
(counter < 0)
Monitor.Pulse(
this
);
counter++;
}
}
}
|
信號量在複雜的阻塞情景下更加有用,例如咱們後面將要討論的通道(channel)。你也可使用信號量來實現臨界區的排他性(以下面的 Work3),可是我仍是推薦使用內建的 lock 語句,像上面的 Work2 那樣。
請注意:若是使用不當,信號量也是有潛在危險的。正確的作法是:當獲取信號量失敗時,千萬不要再調用釋放操做;當獲取成功時,不管發生了什麼錯誤,都要記得釋放信號量。遵循這樣的原則,你的同步纔是正確的。Work3 中的 finally 語句就是爲了保證正確釋放信號量。注意:獲取信號量( s.Acquire() )的操做必須放到 try 語句的外面,只有這樣,當獲取失敗時纔不會調用釋放操做。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
ThreadSemaphore s =
new
ThreadSemaphore(1);
void
Work3()
{
NonCriticalSection1();
s.Acquire();
try
{
CriticalSection();
}
finally
{
s.Release();
}
NonCriticalSection2();
}
|
2.3 跨進程信號量
爲了協調不一樣進程訪問同一資源,咱們須要用到上面討論過的概念。很不幸,.NET 中的 monitor 類不能夠跨進程使用。可是,win32 API提供的內核信號量對象能夠用來實現跨進程同步。 Robin Galloway-Lunn 介紹了怎樣將 win32 的信號量映射到 .NET 中(見 Using Win32 Semaphores in C# )。咱們的實現也相似:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
|
[DllImport(
"kernel32"
,EntryPoint=
"CreateSemaphore"
,
SetLastError=
true
,CharSet=CharSet.Unicode)]
internal
static
extern
uint
CreateSemaphore(
SecurityAttributes auth,
int
initialCount,
int
maximumCount,
string
name);
[DllImport(
"kernel32"
,EntryPoint=
"WaitForSingleObject"
,
SetLastError=
true
,CharSet=CharSet.Unicode)]
internal
static
extern
uint
WaitForSingleObject(
uint
hHandle,
uint
dwMilliseconds);
[DllImport(
"kernel32"
,EntryPoint=
"ReleaseSemaphore"
,
SetLastError=
true
,CharSet=CharSet.Unicode)]
[
return
: MarshalAs( UnmanagedType.VariantBool )]
internal
static
extern
bool
ReleaseSemaphore(
uint
hHandle,
int
lReleaseCount,
out
int
lpPreviousCount);
[DllImport(
"kernel32"
,EntryPoint=
"CloseHandle"
,SetLastError=
true
,
CharSet=CharSet.Unicode)]
[
return
: MarshalAs( UnmanagedType.VariantBool )]
internal
static
extern
bool
CloseHandle(
uint
hHandle);
public
class
ProcessSemaphore : ISemaphore, IDisposable
{
private
uint
handle;
private
readonly
uint
interruptReactionTime;
public
ProcessSemaphore(
string
name) :
this
(
name,0,
int
.MaxValue,500) {}
public
ProcessSemaphore(
string
name,
int
initial) :
this
(
name,initial,
int
.MaxValue,500) {}
public
ProcessSemaphore(
string
name,
int
initial,
int
max,
int
interruptReactionTime)
{
this
.interruptReactionTime = (
uint
)interruptReactionTime;
this
.handle = NTKernel.CreateSemaphore(
null
, initial, max, name);
if
(handle == 0)
throw
new
SemaphoreFailedException();
}
public
void
Acquire()
{
while
(
true
)
{
//looped 0.5s timeout to make NT-blocked threads interruptable.
uint
res = NTKernel.WaitForSingleObject(handle,
interruptReactionTime);
try
{System.Threading.Thread.Sleep(0);}
catch
(System.Threading.ThreadInterruptedException e)
{
if
(res == 0)
{
//Rollback
int
previousCount;
NTKernel.ReleaseSemaphore(handle,1,
out
previousCount);
}
throw
e;
}
if
(res == 0)
return
;
if
(res != 258)
throw
new
SemaphoreFailedException();
}
}
public
void
Acquire(TimeSpan timeout)
{
uint
milliseconds = (
uint
)timeout.TotalMilliseconds;
if
(NTKernel.WaitForSingleObject(handle, milliseconds) != 0)
throw
new
SemaphoreFailedException();
}
public
void
Release()
{
int
previousCount;
if
(!NTKernel.ReleaseSemaphore(handle, 1,
out
previousCount))
throw
new
SemaphoreFailedException();
}
#region IDisposable Member
public
void
Dispose()
{
if
(handle != 0)
{
if
(NTKernel.CloseHandle(handle))
handle = 0;
}
}
#endregion
}
|
有一點很重要:win32中的信號量是能夠命名的。這容許其餘進程經過名字來建立相應信號量的句柄。爲了讓阻塞線程能夠中斷,咱們使用了一個(很差)的替代方法:使用超時和 Sleep(0)。咱們須要中斷來安全關閉線程。更好的作法是:肯定沒有線程阻塞以後才釋放信號量,這樣程序才能夠徹底釋放資源並正確退出。
你可能也注意到了:跨線程和跨進程的信號量都使用了相同的接口。全部相關的類都使用了這種模式,以實現上面背景介紹中提到的封閉性。須要注意:出於性能考慮,你不該該將跨進程的信號量用到跨線程的場景,也不該該將跨線程的實現用到單線程的場景。
3. 跨進程共享內存:內存映射文件
咱們已經實現了跨線程和跨進程的共享資源訪問同步。可是傳遞/接收消息還須要共享資源。對於線程來講,只須要聲明一個類成員變量就能夠了。可是對於跨進程來講,咱們須要使用到 win32 API 提供的內存映射文件(Memory Mapped Files,簡稱MMF)。使用 MMF和使用 win32 信號量差很少。咱們須要先調用 CreateFileMapping 方法來建立一個內存映射文件的句柄:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
|
[DllImport(
"Kernel32.dll"
,EntryPoint=
"CreateFileMapping"
,
SetLastError=
true
,CharSet=CharSet.Unicode)]
internal
static
extern
IntPtr CreateFileMapping(
uint
hFile,
SecurityAttributes lpAttributes,
uint
flProtect,
uint
dwMaximumSizeHigh,
uint
dwMaximumSizeLow,
string
lpName);
[DllImport(
"Kernel32.dll"
,EntryPoint=
"MapViewOfFile"
,
SetLastError=
true
,CharSet=CharSet.Unicode)]
internal
static
extern
IntPtr MapViewOfFile(IntPtr hFileMappingObject,
uint
dwDesiredAccess,
uint
dwFileOffsetHigh,
uint
dwFileOffsetLow,
uint
dwNumberOfBytesToMap);
[DllImport(
"Kernel32.dll"
,EntryPoint=
"UnmapViewOfFile"
,
SetLastError=
true
,CharSet=CharSet.Unicode)]
[
return
: MarshalAs( UnmanagedType.VariantBool )]
internal
static
extern
bool
UnmapViewOfFile(IntPtr lpBaseAddress);
public
static
MemoryMappedFile CreateFile(
string
name,
FileAccess access,
int
size)
{
if
(size < 0)
throw
new
ArgumentException(
"Size must not be negative"
,
"size"
);
IntPtr fileMapping = NTKernel.CreateFileMapping(0xFFFFFFFFu,
null
,
(
uint
)access,0,(
uint
)size,name);
if
(fileMapping == IntPtr.Zero)
throw
new
MemoryMappingFailedException();
return
new
MemoryMappedFile(fileMapping,size,access);
}
|
咱們但願直接使用 pagefile 中的虛擬文件,因此咱們用 -1(0xFFFFFFFF) 來做爲文件句柄來建立咱們的內存映射文件句柄。咱們也指定了必填的文件大小,以及相應的名稱。這樣其餘進程就能夠經過這個名稱來同時訪問該映射文件。建立了內存映射文件後,咱們就能夠映射這個文件不一樣的部分(經過偏移量和字節大小來指定)到咱們的進程地址空間。咱們經過 MapViewOfFile 系統方法來指定:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
public
MemoryMappedFileView CreateView(
int
offset,
int
size,
MemoryMappedFileView.ViewAccess access)
{
if
(
this
.access == FileAccess.ReadOnly && access ==
MemoryMappedFileView.ViewAccess.ReadWrite)
throw
new
ArgumentException(
"Only read access to views allowed on files without write access"
,
"access"
);
if
(offset < 0)
throw
new
ArgumentException(
"Offset must not be negative"
,
"size"
);
if
(size < 0)
throw
new
ArgumentException(
"Size must not be negative"
,
"size"
);
IntPtr mappedView = NTKernel.MapViewOfFile(fileMapping,
(
uint
)access,0,(
uint
)offset,(
uint
)size);
return
new
MemoryMappedFileView(mappedView,size,access);
}
|
在不安全的代碼中,咱們能夠將返回的指針強制轉換成咱們指定的類型。儘管如此,咱們不但願有不安全的代碼存在,因此咱們使用 Marshal 類來從中讀寫咱們的數據。偏移量參數是用來從哪裏開始讀寫數據,相對於指定的映射視圖的地址。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
|
public
byte
ReadByte(
int
offset)
{
return
Marshal.ReadByte(mappedView,offset);
}
public
void
WriteByte(
byte
data,
int
offset)
{
Marshal.WriteByte(mappedView,offset,data);
}
public
int
ReadInt32(
int
offset)
{
return
Marshal.ReadInt32(mappedView,offset);
}
public
void
WriteInt32(
int
data,
int
offset)
{
Marshal.WriteInt32(mappedView,offset,data);
}
public
void
ReadBytes(
byte
[] data,
int
offset)
{
for
(
int
i=0;i<data.Length;i++)
data[i] = Marshal.ReadByte(mappedView,offset+i);
}
public
void
WriteBytes(
byte
[] data,
int
offset)
{
for
(
int
i=0;i<data.Length;i++)
Marshal.WriteByte(mappedView,offset+i,data[i]);
}
|
可是,咱們但願讀寫整個對象樹到文件中,因此咱們須要支持自動進行序列化和反序列化的方法。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
public
object
ReadDeserialize(
int
offset,
int
length)
{
byte
[] binaryData =
new
byte
[length];
ReadBytes(binaryData,offset);
System.Runtime.Serialization.Formatters.Binary.BinaryFormatter formatter
=
new
System.Runtime.Serialization.Formatters.Binary.BinaryFormatter();
System.IO.MemoryStream ms =
new
System.IO.MemoryStream(
binaryData,0,length,
true
,
true
);
object
data = formatter.Deserialize(ms);
ms.Close();
return
data;
}
public
void
WriteSerialize(
object
data,
int
offset,
int
length)
{
System.Runtime.Serialization.Formatters.Binary.BinaryFormatter formatter
=
new
System.Runtime.Serialization.Formatters.Binary.BinaryFormatter();
byte
[] binaryData =
new
byte
[length];
System.IO.MemoryStream ms =
new
System.IO.MemoryStream(
binaryData,0,length,
true
,
true
);
formatter.Serialize(ms,data);
ms.Flush();
ms.Close();
WriteBytes(binaryData,offset);
}
|
請注意:對象序列化以後的大小不該該超過映射視圖的大小。序列化以後的大小老是比對象自己佔用的內存要大的。我沒有試過直接將對象內存流綁定到映射視圖,那樣作應該也能夠,甚至可能帶來少許的性能提高。
4. 信箱:在線程/進程間傳遞消息
這裏的信箱與 Email 及 NT 中的郵件槽(Mailslots)無關。它是一個只能保留一個對象的安全共享內存結構。信箱的內容經過一個屬性來讀寫。若是信箱內容爲空,試圖讀取該信箱的線程將會阻塞,直到另外一個線程往其中寫內容。若是信箱已經有了內容,當一個線程試圖往其中寫內容時將被阻塞,直到另外一個線程將信箱內容讀取出去。信箱的內容只能被讀取一次,它的引用在讀取後自動被刪除。基於上面的代碼,咱們已經能夠實現信箱了。
4.1 跨線程的信箱
咱們可使用兩個信號量來實現一個信箱:一個信號量在信箱內容爲空時觸發,另外一個在信箱有內容時觸發。在讀取內容以前,線程先等待信箱已經填充了內容,讀取以後觸發空信號量。在寫入內容以前,線程先等待信箱內容清空,寫入以後觸發滿信號量。注意:空信號量在一開始時就被觸發了。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
|
public
sealed
class
ThreadMailBox : IMailBox
{
private
object
content;
private
ThreadSemaphore empty, full;
public
ThreadMailBox()
{
empty =
new
ThreadSemaphore(1,1);
full =
new
ThreadSemaphore(0,1);
}
public
object
Content
{
get
{
full.Acquire();
object
item = content;
empty.Release();
return
item;
}
set
{
empty.Acquire();
content = value;
full.Release();
}
}
}
|
4.2 跨進程信箱
跨進程信箱與跨線程信箱的實現基本上同樣簡單。不一樣的是咱們使用兩個跨進程的信號量,而且咱們使用內存映射文件來代替類成員變量。因爲序列化可能會失敗,咱們使用了一小段異常處理來回滾信箱的狀態。失敗的緣由有不少(無效句柄,拒絕訪問,文件大小問題,Serializable屬性缺失等等)。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
|
public
sealed
class
ProcessMailBox : IMailBox, IDisposable
{
private
MemoryMappedFile file;
private
MemoryMappedFileView view;
private
ProcessSemaphore empty, full;
public
ProcessMailBox(
string
name,
int
size)
{
empty =
new
ProcessSemaphore(name+
".EmptySemaphore.MailBox"
,1,1);
full =
new
ProcessSemaphore(name+
".FullSemaphore.MailBox"
,0,1);
file = MemoryMappedFile.CreateFile(name+
".MemoryMappedFile.MailBox"
,
MemoryMappedFile.FileAccess.ReadWrite,size);
view = file.CreateView(0,size,
MemoryMappedFileView.ViewAccess.ReadWrite);
}
public
object
Content
{
get
{
full.Acquire();
object
item;
try
{item = view.ReadDeserialize();}
catch
(Exception e)
{
//Rollback
full.Release();
throw
e;
}
empty.Release();
return
item;
}
set
{
empty.Acquire();
try
{view.WriteSerialize(value);}
catch
(Exception e)
{
//Rollback
empty.Release();
throw
e;
}
full.Release();
}
}
#region IDisposable Member
public
void
Dispose()
{
view.Dispose();
file.Dispose();
empty.Dispose();
full.Dispose();
}
#endregion
}
|
到這裏咱們已經實現了跨進程消息傳遞(IPC)所須要的組件。你可能須要再回頭本文開頭的那個例子,看看 ProcessMailBox 應該如何使用。
5.通道:基於隊列的消息傳遞
信箱最大的限制是它們每次只能保存一個對象。若是一系列線程(使用同一個信箱)中的一個線程須要比較長的時間來處理特定的命令,那麼整個系列都會阻塞。一般咱們會使用緩衝的消息通道來處理,這樣你能夠在方便的時候從中讀取消息,而不會阻塞消息發送者。這種緩衝經過通道來實現,這裏的通道比信箱要複雜一些。一樣,咱們將分別從線程和進程級別來討論通道的實現。
5.1 可靠性
信箱和通道的另外一個重要的不一樣是:通道擁有可靠性。例如:自動將發送失敗(可能因爲線程等待鎖的過程當中被中斷)的消息轉存到一個內置的容器中。這意味着處理通道的線程能夠安全地中止,同時不會丟失隊列中的消息。這經過兩個抽象類來實現, ThreadReliability 和 ProcessReliability。每一個通道的實現類都繼承其中的一個類。
5.2 跨線程的通道
跨線程的通道基於信箱來實現,可是使用一個同步的隊列來做爲消息緩衝而不是一個變量。得益於信號量,通道在空隊列時阻塞接收線程,在隊列滿時阻塞發送線程。這樣你就不會碰到由入隊/出隊引起的錯誤。爲了實現這個效果,咱們用隊列大小來初始化空信號量,用0來初始化滿信號量。若是某個發送線程在等待入隊的時候被中斷,咱們將消息複製到內置容器中,並將異常往外面拋。在接收操做中,咱們不須要作異常處理,由於即便線程被中斷你也不會丟失任何消息。注意:線程只有在阻塞狀態才能被中斷,就像調用信號量的獲取操做(Aquire)方法時。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
|
public
sealed
class
ThreadChannel : ThreadReliability, IChannel
{
private
Queue queue;
private
ThreadSemaphore empty, full;
public
ThreadChannel(
int
size)
{
queue = Queue.Synchronized(
new
Queue(size));
empty =
new
ThreadSemaphore(size,size);
full =
new
ThreadSemaphore(0,size);
}
public
void
Send(
object
item)
{
try
{empty.Acquire();}
catch
(System.Threading.ThreadInterruptedException e)
{
DumpItem(item);
throw
e;
}
queue.Enqueue(item);
full.Release();
}
public
void
Send(
object
item, TimeSpan timeout)
{
try
{empty.Acquire(timeout);}
...
}
public
object
Receive()
{
full.Acquire();
object
item = queue.Dequeue();
empty.Release();
return
item;
}
public
object
Receive(TimeSpan timeout)
{
full.Acquire(timeout);
...
}
protected
override
void
DumpStructure()
{
lock
(queue.SyncRoot)
{
foreach
(
object
item
in
queue)
DumpItem(item);
queue.Clear();
}
}
}
|
5.3 跨進程通道
實現跨進程通道有點麻煩,由於你須要首先提供一個跨進程的緩衝區。一個可能的解決方法是使用跨進程信箱並根據須要將接收/發送方法加入隊列。爲了不這種方案的幾個缺點,咱們將直接使用內存映射文件來實現一個隊列。MemoryMappedArray 類將內存映射文件分紅幾部分,能夠直接使用數組索引來訪問。 MemoryMappedQueue 類,爲這個數組提供了一個經典的環(更多細節請查看附件中的代碼)。爲了支持直接以 byte/integer 類型訪問數據並同時支持二進制序列化,調用方須要先調用入隊(Enqueue)/出隊(Dequeue)操做,而後根據須要使用讀寫方法(隊列會自動將數據放到正確的位置)。這兩個類都不是線程和進程安全的,因此咱們須要使用跨進程的信號量來模擬互斥量(也可使用 win32 互斥量),以此實現相互間的互斥訪問。除了這兩個類,跨進程的通道基本上和跨線程信箱同樣。一樣,咱們也須要在 Send() 中處理線程中斷及序列化可能失敗的問題。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
|
public
sealed
class
ProcessChannel : ProcessReliability, IChannel, IDisposable
{
private
MemoryMappedFile file;
private
MemoryMappedFileView view;
private
MemoryMappedQueue queue;
private
ProcessSemaphore empty, full, mutex;
public
ProcessChannel(
int
size,
string
name,
int
maxBytesPerEntry)
{
int
fileSize = 64+size*maxBytesPerEntry;
empty =
new
ProcessSemaphore(name+
".EmptySemaphore.Channel"
,size,size);
full =
new
ProcessSemaphore(name+
".FullSemaphore.Channel"
,0,size);
mutex =
new
ProcessSemaphore(name+
".MutexSemaphore.Channel"
,1,1);
file = MemoryMappedFile.CreateFile(name+
".MemoryMappedFile.Channel"
,
MemoryMappedFile.FileAccess.ReadWrite,fileSize);
view = file.CreateView(0,fileSize,
MemoryMappedFileView.ViewAccess.ReadWrite);
queue =
new
MemoryMappedQueue(view,size,maxBytesPerEntry,
true
,0);
if
(queue.Length < size || queue.BytesPerEntry < maxBytesPerEntry)
throw
new
MemoryMappedArrayFailedException();
}
public
void
Send(
object
item)
{
try
{empty.Acquire();}
catch
(System.Threading.ThreadInterruptedException e)
{
DumpItemSynchronized(item);
throw
e;
}
try
{mutex.Acquire();}
catch
(System.Threading.ThreadInterruptedException e)
{
DumpItemSynchronized(item);
empty.Release();
throw
e;
}
queue.Enqueue();
try
{queue.WriteSerialize(item,0);}
catch
(Exception e)
{
queue.RollbackEnqueue();
mutex.Release();
empty.Release();
throw
e;
}
mutex.Release();
full.Release();
}
public
void
Send(
object
item, TimeSpan timeout)
{
try
{empty.Acquire(timeout);}
...
}
public
object
Receive()
{
full.Acquire();
mutex.Acquire();
object
item;
queue.Dequeue();
try
{item = queue.ReadDeserialize(0);}
catch
(Exception e)
{
queue.RollbackDequeue();
mutex.Release();
full.Release();
throw
e;
}
mutex.Release();
empty.Release();
return
item;
}
public
object
Receive(TimeSpan timeout)
{
full.Acquire(timeout);
...
}
protected
override
void
DumpStructure()
{
mutex.Acquire();
byte
[][] dmp = queue.DumpClearAll();
for
(
int
i=0;i<dmp.Length;i++)
DumpItemSynchronized(dmp[i]);
mutex.Release();
}
#region IDisposable Member
public
void
Dispose()
{
view.Dispose();
file.Dispose();
empty.Dispose();
full.Dispose();
mutex.Dispose();
}
#endregion
}
|
6. 消息路由
咱們目前已經實現了線程和進程同步及消息傳遞機制(使用信箱和通道)。當你使用阻塞隊列的時候,有可能會遇到這樣的問題:你須要在一個線程中同時監聽多個隊列。爲了解決這樣的問題,咱們提供了一些小型的類:通道轉發器,多用複用器,多路複用解碼器和通道事件網關。你也能夠經過簡單的 IRunnable 模式來實現相似的通道處理器。IRunnable模式由兩個抽象類SingleRunnable和 MultiRunnable 來提供(具體細節請參考附件中的代碼)。
6.1 通道轉發器
通道轉發器僅僅監聽一個通道,而後將收到的消息轉發到另外一個通道。若是有必要,轉發器能夠將每一個收到的消息放到一個信封中,並加上一個數字標記,而後再轉發出去(下面的多路利用器使用了這個特性)。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
|
public
class
ChannelForwarder : SingleRunnable
{
private
IChannel source, target;
private
readonly
int
envelope;
public
ChannelForwarder(IChannel source,
IChannel target,
bool
autoStart,
bool
waitOnStop)
:
base
(
true
,autoStart,waitOnStop)
{
this
.source = source;
this
.target = target;
this
.envelope = -1;
}
public
ChannelForwarder(IChannel source, IChannel target,
int
envelope,
bool
autoStart,
bool
waitOnStop)
:
base
(
true
,autoStart,waitOnStop)
{
this
.source = source;
this
.target = target;
this
.envelope = envelope;
}
protected
override
void
Run()
{
//NOTE: IChannel.Send is interrupt save and
//automatically dumps the argument.
if
(envelope == -1)
while
(running)
target.Send(source.Receive());
else
{
MessageEnvelope env;
env.ID = envelope;
while
(running)
{
env.Message = source.Receive();
target.Send(env);
}
}
}
}
|
6.2 通道多路複用器和通道複用解碼器
通道多路複用器監聽多個來源的通道並將接收到的消息(消息使用信封來標記來源消息)轉發到一個公共的輸出通道。這樣就能夠一次性地監聽多個通道。複用解碼器則是監聽一個公共的輸出通道,而後根據信封將消息轉發到某個指定的輸出通道。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
|
public
class
ChannelMultiplexer : MultiRunnable
{
private
ChannelForwarder[] forwarders;
public
ChannelMultiplexer(IChannel[] channels,
int
[] ids,
IChannel output,
bool
autoStart,
bool
waitOnStop)
{
int
count = channels.Length;
if
(count != ids.Length)
throw
new
ArgumentException(
"Channel and ID count mismatch."
,
"ids"
);
forwarders =
new
ChannelForwarder[count];
for
(
int
i=0;i<count;i++)
forwarders[i] =
new
ChannelForwarder(channels[i],
output,ids[i],autoStart,waitOnStop);
SetRunnables((SingleRunnable[])forwarders);
}
}
public
class
ChannelDemultiplexer : SingleRunnable
{
private
HybridDictionary dictionary;
private
IChannel input;
public
ChannelDemultiplexer(IChannel[] channels,
int
[] ids,
IChannel input,
bool
autoStart,
bool
waitOnStop)
:
base
(
true
,autoStart,waitOnStop)
{
this
.input = input;
int
count = channels.Length;
if
(count != ids.Length)
throw
new
ArgumentException(
"Channel and ID count mismatch."
,
"ids"
);
dictionary =
new
HybridDictionary(count,
true
);
for
(
int
i=0;i<count;i++)
dictionary.add(ids[i],channels[i]);
}
protected
override
void
Run()
{
//NOTE: IChannel.Send is interrupt save and
//automatically dumps the argument.
while
(running)
{
MessageEnvelope env = (MessageEnvelope)input.Receive();
IChannel channel = (IChannel)dictionary[env.ID];
channel.send(env.Message);
}
}
}
|
6.3 通道事件網關
通道事件網關監聽指定的通道,在接收到消息時觸發一個事件。這個類對於基於事件的程序(例如GUI程序)頗有用,或者在使用系統線程池(ThreadPool)來初始化輕量的線程。須要注意的是:使用 WinForms 的程序中你不能在事件處理方法中直接訪問UI控件,只能調用Invoke 方法。由於事件處理方法是由事件網關線程調用的,而不是UI線程。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
public
class
ChannelEventGateway : SingleRunnable
{
private
IChannel source;
public
event
MessageReceivedEventHandler MessageReceived;
public
ChannelEventGateway(IChannel source,
bool
autoStart,
bool
waitOnStop) :
base
(
true
,autoStart,waitOnStop)
{
this
.source = source;
}
protected
override
void
Run()
{
while
(running)
{
object
c = source.Receive();
MessageReceivedEventHandler handler = MessageReceived;
if
(handler !=
null
)
handler(
this
,
new
MessageReceivedEventArgs(c));
}
}
}
|
7. 比薩外賣店的例子
萬事俱備,只欠東風。咱們已經討論了這個同步及消息傳遞框架中的大部分重要的結構和技術(本文沒有討論框架中的其餘類如Rendezvous及Barrier)。就像開頭同樣,咱們用一個例子來結束這篇文章。此次咱們用一個小型比薩外賣店來作演示。下圖展現了這個例子:四個並行進程相互之間進行通信。圖中展現了消息(數據)是如何使用跨進程通道在四個進程中流動的,且在每一個進程中使用了性能更佳的跨線程通道和信箱。
一開始,一個顧客點了一個比薩和一些飲料。他調用了顧客(customer)接口的方法,向顧客訂單(CustomerOrders)通道發送了一個下單(Order)消息。接單員,在顧客下單後,發送了兩條配餐指令(分別對應比薩和飲料)到廚師指令(CookInstruction)通道。同時他經過收銀(CashierOrder)通道將訂單轉發給收銀臺。收銀臺從價格中心獲取總價並將票據發給顧客,但願能提升收銀的速度 。與此同時,廚師將根據配餐指令將餐配好以後交給打包員工。打包員工處理好以後,等待顧客付款,而後將外賣遞給顧客。
爲了運行這個例子,打開4個終端(cmd.exe),用 "PizzaDemo.exe cook" 啓動多個廚師進程(多少個均可以),用 "PizzaDemo.exe backend" 啓動後端進程,用 "PizzaDemo.exe facade" 啓動顧客接口門面(用你的程序名稱來代替 PizzaDemo )。注意:爲了模擬真實情景,某些線程(例如廚師線程)會隨機休眠幾秒。按下回車鍵就會中止和退出進程。若是你在進程正在處理數據的時候退出,你將能夠在內存轉存報告的結尾看到幾個未處理的消息。在真實世界的程序裏面,消息通常都會被轉存到磁盤中,以便下次可使用。
這個例子使用了上文中討論過的幾個機制。好比說,收銀臺使用一個通道複用器(ChannelMultiplexer)來監聽顧客的訂單和支付通道,用了兩個信箱來實現價格服務。分發時使用了一個通道事件網關(ChannelEventGateway),顧客在食物打包完成以後立刻會收到通知。你也能夠將這些程序註冊成 Windows NT 服務運行,也能夠遠程登陸後運行。
8. 總結
本文已經討論了C#中如何基於服務的架構及實現跨進程同步和通信。而後,這個不是惟一的解決方案。例如:在大項目中使用那麼多的線程會引來嚴重的問題。這個框架中缺失的是事務支持及其餘的通道/信箱實現(例如命名管道和TCP sockets)。這個框架中可能也有許多不足之處。