一個過程間通信同步的C#框架舉薦。本站提示廣大學習愛好者:(一個過程間通信同步的C#框架舉薦)文章只能為提供參考,不一定能成為您想要的結果。以下是一個過程間通信同步的C#框架舉薦正文
0.配景簡介
微軟在 .NET 框架中供給了多種適用的線程同步手腕,個中包含 monitor 類及 reader-writer鎖。但跨過程的同步辦法照樣異常完善。別的,今朝也沒無方便的線程間及過程間傳遞新聞的辦法。例如C/S和SOA,又或許臨盆者/花費者形式中就經常須要傳遞新聞。為此我編寫了一個自力完全的框架,完成了跨線程和跨過程的同步和通信。這框架內包括了旌旗燈號量,信箱,內存映照文件,壅塞通道,及簡略新聞流掌握器等組件。這篇文章裡提到的類同屬於一個開源的庫項目(BSD允許),你可以從這裡下載到 www.cdrnet.net/projects/threadmsg/.
這個框架的目標是:
留意:我刪除本文中全體代碼的XML正文以節儉空間。假如你想曉得這些辦法和參數的具體信息,請參考附件中的代碼。
1.先看一個簡略例子
應用了這個庫後,跨過程的新聞傳遞將變得異常簡略。我將用一個小例子來作示范:一個掌握台法式,依據參數可以作為發送方也能夠作為吸收方運轉。在發送法式裡,你可以輸出必定的文本並發送到信箱內(前往key),吸收法式將顯示一切從信箱內收到的新聞。你可以運轉有數個發送法式和吸收法式,然則每一個新聞只會被詳細的某一個吸收法式所收到。
[Serializable] struct Message { public string Text; } class Test { IMailBox mail; public Test() { mail = new ProcessMailBox("TMProcessTest",1024); } public void RunWriter() { Console.WriteLine("Writer started"); Message msg; while(true) { msg.Text = Console.ReadLine(); if(msg.Text.Equals("exit")) break; mail.Content = msg; } } public void RunReader() { Console.WriteLine("Reader started"); while(true) { Message msg = (Message)mail.Content; Console.WriteLine(msg.Text); } } [STAThread] static void Main(string[] args) { Test test = new Test(); if(args.Length > 0) test.RunWriter(); else test.RunReader(); } }
信箱一旦創立以後(這下面代碼裡是 ProcessMailBox ),吸收新聞只須要讀取 Content 屬性,發送新聞只須要給這個屬性賦值。當沒稀有據時,獲得新聞將會壅塞以後線程;發送新聞時假如信箱裡曾經稀有據,則會壅塞以後線程。恰是有了這個壅塞,全部法式是完整基於中止的,而且不會過度占用CPU(不須要停止輪詢)。發送和吸收的新聞可所以隨意率性支撐序列化(Serializable)的類型。
但是,現實上背後產生的工作有點龐雜:新聞經由過程內存映照文件來傳遞,這是今朝獨一的跨過程同享內存的辦法,這個例子裡我們只會在 pagefile 外面發生虛擬文件。對這個虛擬文件的拜訪是經由過程 win32 旌旗燈號量來確保同步的。新聞起首序列化成二進制,然後再寫進該文件,這就是為何須要聲明Serializable屬性。內存映照文件和 win32 旌旗燈號量都須要挪用 NT內核的辦法。多得了 .NET 框架中的 Marshal 類,我們可以免編寫不平安的代碼。我們將鄙人面評論辯論更多的細節。
2. .NET外面的跨線程/過程同步
線程/過程間的通信須要同享內存或許其他內建機制來發送/吸收數據。即便是采取同享內存的方法,也還須要一組同步辦法來許可並發拜訪。
統一個過程內的一切線程都同享公共的邏輯地址空間(堆)。關於分歧過程,從 win2000 開端就曾經沒法同享內存。但是,分歧的過程可以讀寫統一個文件。WinAPI供給了多種體系挪用辦法來映照文件到過程的邏輯空間,及拜訪體系內查對象(會話)指向的 pagefile 外面的虛擬文件。不管是同享堆,照樣同享文件,並發拜訪都有能夠招致數據紛歧致。我們就這個成績簡略評論辯論一下,該如何確保線程/過程挪用的有序性及數據的分歧性。
2.1 線程同步
.NET 框架和 C# 供給了便利直不雅的線程同步辦法,即 monitor 類和 lock 語句(本文將不會評論辯論 .NET 框架的互斥量)。關於線程同步,固然本文供給了其他辦法,我們照樣推舉應用 lock 語句。
void Work1() { NonCriticalSection1(); Monitor.Enter(this); try { CriticalSection(); } finally { Monitor.Exit(this); } NonCriticalSection2(); } void Work2() { NonCriticalSection1(); lock(this) { CriticalSection(); } NonCriticalSection2(); }
Work1 和 Work2 是等價的。在C#外面,許多人愛好第二個辦法,由於它更短,且不輕易失足。
2.2 跨線程旌旗燈號量
旌旗燈號量是經典的同步根本概念之一(由 Edsger Dijkstra 引入)。旌旗燈號量是指一個有計數器及兩個操作的對象。它的兩個操作是:獲得(也叫P或許期待),釋放(也叫V或許收到旌旗燈號)。旌旗燈號量在獲得操作時假如計數器為0則壅塞,不然將計數器減一;在釋放時將計數器加一,且不會壅塞。固然旌旗燈號量的道理很簡略,然則完成起來有點費事。好在,內建的 monitor 類有壅塞特征,可以用來完成旌旗燈號量。
public sealed class ThreadSemaphore : ISemaphore { private int counter; private readonly int max; public ThreadSemaphore() : this(0, int.Max) {} public ThreadSemaphore(int initial) : this(initial, int.Max) {} public ThreadSemaphore(int initial, int max) { this.counter = Math.Min(initial,max); this.max = max; } public void Acquire() { lock(this) { counter--; if(counter < 0 && !Monitor.Wait(this)) throw new SemaphoreFailedException(); } } public void Acquire(TimeSpan timeout) { lock(this) { counter--; if(counter < 0 && !Monitor.Wait(this,timeout)) throw new SemaphoreFailedException(); } } public void Release() { lock(this) { if(counter >= max) throw new SemaphoreFailedException(); if(counter < 0) Monitor.Pulse(this); counter++; } } }
旌旗燈號量在龐雜的壅塞情形下加倍有效,例如我們前面將要評論辯論的通道(channel)。你也能夠應用旌旗燈號量來完成臨界區的排他性(以下面的 Work3),然則我照樣推舉應用內建的 lock 語句,像下面的 Work2 那樣。
請留意:假如應用欠妥,旌旗燈號量也是有潛伏風險的。准確的做法是:當獲得旌旗燈號量掉敗時,萬萬不要再挪用釋放操作;當獲得勝利時,不管產生了甚麼毛病,都要記得釋放旌旗燈號量。遵守如許的准繩,你的同步才是准確的。Work3 中的 finally 語句就是為了包管准確釋放旌旗燈號量。留意:獲得旌旗燈號量( s.Acquire() )的操作必需放到 try 語句的裡面,只要如許,當獲得掉敗時才不會挪用釋放操作。
ThreadSemaphore s = new ThreadSemaphore(1); void Work3() { NonCriticalSection1(); s.Acquire(); try { CriticalSection(); } finally { s.Release(); } NonCriticalSection2(); }
2.3 跨過程旌旗燈號量
為了調和分歧過程拜訪統一資本,我們須要用到下面評論辯論過的概念。很不幸,.NET 中的 monitor 類弗成以跨過程應用。然則,win32 API供給的內核旌旗燈號量對象可以用來完成跨過程同步。 Robin Galloway-Lunn 引見了如何將 win32 的旌旗燈號量映照到 .NET 中(見 Using Win32 Semaphores in C# )。我們的完成也相似:
[DllImport("kernel32",EntryPoint="CreateSemaphore", SetLastError=true,CharSet=CharSet.Unicode)] internal static extern uint CreateSemaphore( SecurityAttributes auth, int initialCount, int maximumCount, string name); [DllImport("kernel32",EntryPoint="WaitForSingleObject", SetLastError=true,CharSet=CharSet.Unicode)] internal static extern uint WaitForSingleObject( uint hHandle, uint dwMilliseconds); [DllImport("kernel32",EntryPoint="ReleaseSemaphore", SetLastError=true,CharSet=CharSet.Unicode)] [return : MarshalAs( UnmanagedType.VariantBool )] internal static extern bool ReleaseSemaphore( uint hHandle, int lReleaseCount, out int lpPreviousCount); [DllImport("kernel32",EntryPoint="CloseHandle",SetLastError=true, CharSet=CharSet.Unicode)] [return : MarshalAs( UnmanagedType.VariantBool )] internal static extern bool CloseHandle(uint hHandle); public class ProcessSemaphore : ISemaphore, IDisposable { private uint handle; private readonly uint interruptReactionTime; public ProcessSemaphore(string name) : this( name,0,int.MaxValue,500) {} public ProcessSemaphore(string name, int initial) : this( name,initial,int.MaxValue,500) {} public ProcessSemaphore(string name, int initial, int max, int interruptReactionTime) { this.interruptReactionTime = (uint)interruptReactionTime; this.handle = NTKernel.CreateSemaphore(null, initial, max, name); if(handle == 0) throw new SemaphoreFailedException(); } public void Acquire() { while(true) { //looped 0.5s timeout to make NT-blocked threads interruptable. uint res = NTKernel.WaitForSingleObject(handle, interruptReactionTime); try {System.Threading.Thread.Sleep(0);} catch(System.Threading.ThreadInterruptedException e) { if(res == 0) { //Rollback int previousCount; NTKernel.ReleaseSemaphore(handle,1,out previousCount); } throw e; } if(res == 0) return; if(res != 258) throw new SemaphoreFailedException(); } } public void Acquire(TimeSpan timeout) { uint milliseconds = (uint)timeout.TotalMilliseconds; if(NTKernel.WaitForSingleObject(handle, milliseconds) != 0) throw new SemaphoreFailedException(); } public void Release() { int previousCount; if(!NTKernel.ReleaseSemaphore(handle, 1, out previousCount)) throw new SemaphoreFailedException(); } #region IDisposable Member public void Dispose() { if(handle != 0) { if(NTKernel.CloseHandle(handle)) handle = 0; } } #endregion }
有一點很主要:win32中的旌旗燈號量是可以定名的。這許可其他過程經由過程名字來創立響應旌旗燈號量的句柄。為了讓壅塞線程可以中止,我們應用了一個(欠好)的替換辦法:應用超時和 Sleep(0)。我們須要中止來平安封閉線程。更好的做法是:肯定沒有線程壅塞以後才釋放旌旗燈號量,如許法式才可以完整釋放資本並准確加入。
你能夠也留意到了:跨線程和跨過程的旌旗燈號量都應用了雷同的接口。一切相干的類都應用了這類形式,以完成下面配景引見中提到的關閉性。須要留意:出於機能斟酌,你不該該將跨過程的旌旗燈號量用到跨線程的場景,也不該該將跨線程的完成用到單線程的場景。
3. 跨過程同享內存:內存映照文件
我們曾經完成了跨線程和跨過程的同享資本拜訪同步。然則傳遞/吸收新聞還須要同享資本。關於線程來講,只須要聲明一個類成員變量便可以了。然則關於跨過程來講,我們須要應用到 win32 API 供給的內存映照文件(Memory Mapped Files,簡稱MMF)。應用 MMF和應用 win32 旌旗燈號量差不多。我們須要先挪用 CreateFileMapping 辦法來創立一個內存映照文件的句柄:
[DllImport("Kernel32.dll",EntryPoint="CreateFileMapping", SetLastError=true,CharSet=CharSet.Unicode)] internal static extern IntPtr CreateFileMapping(uint hFile, SecurityAttributes lpAttributes, uint flProtect, uint dwMaximumSizeHigh, uint dwMaximumSizeLow, string lpName); [DllImport("Kernel32.dll",EntryPoint="MapViewOfFile", SetLastError=true,CharSet=CharSet.Unicode)] internal static extern IntPtr MapViewOfFile(IntPtr hFileMappingObject, uint dwDesiredAccess, uint dwFileOffsetHigh, uint dwFileOffsetLow, uint dwNumberOfBytesToMap); [DllImport("Kernel32.dll",EntryPoint="UnmapViewOfFile", SetLastError=true,CharSet=CharSet.Unicode)] [return : MarshalAs( UnmanagedType.VariantBool )] internal static extern bool UnmapViewOfFile(IntPtr lpBaseAddress); public static MemoryMappedFile CreateFile(string name, FileAccess access, int size) { if(size < 0) throw new ArgumentException("Size must not be negative","size"); IntPtr fileMapping = NTKernel.CreateFileMapping(0xFFFFFFFFu,null, (uint)access,0,(uint)size,name); if(fileMapping == IntPtr.Zero) throw new MemoryMappingFailedException(); return new MemoryMappedFile(fileMapping,size,access); }
我們願望直接應用 pagefile 中的虛擬文件,所以我們用 -1(0xFFFFFFFF) 來作為文件句柄來創立我們的內存映照文件句柄。我們也指定了必填的文件年夜小,和響應的稱號。如許其他過程便可以經由過程這個稱號來同時拜訪該映照文件。創立了內存映照文件後,我們便可以映照這個文件分歧的部門(經由過程偏移量和字節年夜小來指定)到我們的過程地址空間。我們經由過程 MapViewOfFile 體系辦法來指定:
public MemoryMappedFileView CreateView(int offset, int size, MemoryMappedFileView.ViewAccess access) { if(this.access == FileAccess.ReadOnly && access == MemoryMappedFileView.ViewAccess.ReadWrite) throw new ArgumentException( "Only read access to views allowed on files without write access", "access"); if(offset < 0) throw new ArgumentException("Offset must not be negative","size"); if(size < 0) throw new ArgumentException("Size must not be negative","size"); IntPtr mappedView = NTKernel.MapViewOfFile(fileMapping, (uint)access,0,(uint)offset,(uint)size); return new MemoryMappedFileView(mappedView,size,access); }
在不平安的代碼中,我們可以將前往的指針強迫轉換成我們指定的類型。雖然如斯,我們不願望有不平安的代碼存在,所以我們應用 Marshal 類來從中讀寫我們的數據。偏移量參數是用來從哪裡開端讀寫數據,絕對於指定的映照視圖的地址。
public byte ReadByte(int offset) { return Marshal.ReadByte(mappedView,offset); } public void WriteByte(byte data, int offset) { Marshal.WriteByte(mappedView,offset,data); } public int ReadInt32(int offset) { return Marshal.ReadInt32(mappedView,offset); } public void WriteInt32(int data, int offset) { Marshal.WriteInt32(mappedView,offset,data); } public void ReadBytes(byte[] data, int offset) { for(int i=0;i<data.Length;i++) data[i] = Marshal.ReadByte(mappedView,offset+i); } public void WriteBytes(byte[] data, int offset) { for(int i=0;i<data.Length;i++) Marshal.WriteByte(mappedView,offset+i,data[i]); }
然則,我們願望讀寫全部對象樹到文件中,所以我們須要支撐主動停止序列化和反序列化的辦法。
public object ReadDeserialize(int offset, int length) { byte[] binaryData = new byte[length]; ReadBytes(binaryData,offset); System.Runtime.Serialization.Formatters.Binary.BinaryFormatter formatter = new System.Runtime.Serialization.Formatters.Binary.BinaryFormatter(); System.IO.MemoryStream ms = new System.IO.MemoryStream( binaryData,0,length,true,true); object data = formatter.Deserialize(ms); ms.Close(); return data; } public void WriteSerialize(object data, int offset, int length) { System.Runtime.Serialization.Formatters.Binary.BinaryFormatter formatter = new System.Runtime.Serialization.Formatters.Binary.BinaryFormatter(); byte[] binaryData = new byte[length]; System.IO.MemoryStream ms = new System.IO.MemoryStream( binaryData,0,length,true,true); formatter.Serialize(ms,data); ms.Flush(); ms.Close(); WriteBytes(binaryData,offset); }
請留意:對象序列化以後的年夜小不該該跨越映照視圖的年夜小。序列化以後的年夜小老是比對象自己占用的內存要年夜的。我沒有試過直接將對象內存流綁定到映照視圖,那樣做應當也能夠,乃至能夠帶來大批的機能晉升。
4. 信箱:在線程/過程間傳遞新聞
這裡的信箱與 Email 及 NT 中的郵件槽(Mailslots)有關。它是一個只能保存一個對象的平安同享內存構造。信箱的內容經由過程一個屬性來讀寫。假如信箱內容為空,試圖讀取該信箱的線程將會壅塞,直到另外一個線程往個中寫內容。假如信箱曾經有了內容,當一個線程試圖往個中寫內容時將被壅塞,直到另外一個線程將信箱內容讀掏出去。信箱的內容只能被讀取一次,它的援用在讀取後主動被刪除。基於下面的代碼,我們曾經可以完成信箱了。
4.1 跨線程的信箱
我們可使用兩個旌旗燈號量來完成一個信箱:一個旌旗燈號量在信箱內容為空時觸發,另外一個在信箱有內容時觸發。在讀取內容之前,線程先期待信箱曾經填充了內容,讀取以後觸發空旌旗燈號量。在寫入內容之前,線程先期待信箱內容清空,寫入以後觸發滿旌旗燈號量。留意:空旌旗燈號量在一開端時就被觸發了。
public sealed class ThreadMailBox : IMailBox { private object content; private ThreadSemaphore empty, full; public ThreadMailBox() { empty = new ThreadSemaphore(1,1); full = new ThreadSemaphore(0,1); } public object Content { get { full.Acquire(); object item = content; empty.Release(); return item; } set { empty.Acquire(); content = value; full.Release(); } } }
4.2 跨過程信箱
跨過程信箱與跨線程信箱的完成根本上一樣簡略。分歧的是我們應用兩個跨過程的旌旗燈號量,而且我們應用內存映照文件來取代類成員變量。因為序列化能夠會掉敗,我們應用了一小段異常處置往返滾信箱的狀況。掉敗的緣由有許多(有效句柄,謝絕拜訪,文件年夜小成績,Serializable屬性缺掉等等)。
public sealed class ProcessMailBox : IMailBox, IDisposable { private MemoryMappedFile file; private MemoryMappedFileView view; private ProcessSemaphore empty, full; public ProcessMailBox(string name,int size) { empty = new ProcessSemaphore(name+".EmptySemaphore.MailBox",1,1); full = new ProcessSemaphore(name+".FullSemaphore.MailBox",0,1); file = MemoryMappedFile.CreateFile(name+".MemoryMappedFile.MailBox", MemoryMappedFile.FileAccess.ReadWrite,size); view = file.CreateView(0,size, MemoryMappedFileView.ViewAccess.ReadWrite); } public object Content { get { full.Acquire(); object item; try {item = view.ReadDeserialize();} catch(Exception e) { //Rollback full.Release(); throw e; } empty.Release(); return item; } set { empty.Acquire(); try {view.WriteSerialize(value);} catch(Exception e) { //Rollback empty.Release(); throw e; } full.Release(); } } #region IDisposable Member public void Dispose() { view.Dispose(); file.Dispose(); empty.Dispose(); full.Dispose(); } #endregion }
到這裡我們曾經完成了跨過程新聞傳遞(IPC)所須要的組件。你能夠須要再回頭本文開首的誰人例子,看看 ProcessMailBox 應當若何應用。
5.通道:基於隊列的新聞傳遞
信箱最年夜的限制是它們每次只能保留一個對象。假如一系列線程(應用統一個信箱)中的一個線程須要比擬長的時光來處置特定的敕令,那末全部系列都邑壅塞。平日我們會應用緩沖的新聞通道來處置,如許你可以在便利的時刻從中讀撤消息,而不會壅塞新聞發送者。這類緩沖經由過程通道來完成,這裡的通道比信箱要龐雜一些。異樣,我們將分離從線程和過程級別來評論辯論通道的完成。
5.1 靠得住性
信箱和通道的另外一個主要的分歧是:通道具有靠得住性。例如:主動將發送掉敗(能夠因為線程期待鎖的進程中被中止)的新聞轉存到一個內置的容器中。這意味著處置通道的線程可以平安地停滯,同時不會喪失隊列中的新聞。這經由過程兩個籠統類來完成, ThreadReliability 和 ProcessReliability。每一個通道的完成類都繼續個中的一個類。
5.2 跨線程的通道
跨線程的通道基於信箱來完成,然則應用一個同步的隊列來作為新聞緩沖而不是一個變量。得益於旌旗燈號量,通道在空隊列時壅塞吸收線程,在隊列滿時壅塞發送線程。如許你就不會碰著由入隊/出隊激發的毛病。為了完成這個後果,我們用隊列年夜小來初始化空旌旗燈號量,用0來初始化滿旌旗燈號量。假如某個發送線程在期待入隊的時刻被中止,我們將新聞復制到內置容器中,並將異常往裡面拋。在吸收操作中,我們不須要做異常處置,由於即便線程被中止你也不會喪失任何新聞。留意:線程只要在壅塞狀況能力被中止,就像挪用旌旗燈號量的獲得操作(Aquire)辦法時。
public sealed class ThreadChannel : ThreadReliability, IChannel { private Queue queue; private ThreadSemaphore empty, full; public ThreadChannel(int size) { queue = Queue.Synchronized(new Queue(size)); empty = new ThreadSemaphore(size,size); full = new ThreadSemaphore(0,size); } public void Send(object item) { try {empty.Acquire();} catch(System.Threading.ThreadInterruptedException e) { DumpItem(item); throw e; } queue.Enqueue(item); full.Release(); } public void Send(object item, TimeSpan timeout) { try {empty.Acquire(timeout);} ... } public object Receive() { full.Acquire(); object item = queue.Dequeue(); empty.Release(); return item; } public object Receive(TimeSpan timeout) { full.Acquire(timeout); ... } protected override void DumpStructure() { lock(queue.SyncRoot) { foreach(object item in queue) DumpItem(item); queue.Clear(); } } }
5.3 跨過程通道
完成跨過程通道有點費事,由於你須要起首供給一個跨過程的緩沖區。一個能夠的處理辦法是應用跨過程信箱並依據須要將吸收/發送辦法參加隊列。為了不這類計劃的幾個缺陷,我們將直接應用內存映照文件來完成一個隊列。MemoryMappedArray 類將內存映照文件分紅幾部門,可以直接應用數組索引來拜訪。 MemoryMappedQueue 類,為這個數組供給了一個經典的環(更多細節請檢查附件中的代碼)。為了支撐直接以 byte/integer 類型拜訪數據並同時支撐二進制序列化,挪用方須要先挪用入隊(Enqueue)/出隊(Dequeue)操作,然後依據須要應用讀寫辦法(隊列會主動將數據放到准確的地位)。這兩個類都不是線程和過程平安的,所以我們須要應用跨過程的旌旗燈號量來模仿互斥量(也能夠應用 win32 互斥量),以此完成互相間的互斥拜訪。除這兩個類,跨過程的通道根本上和跨線程信箱一樣。異樣,我們也須要在 Send() 中處置線程中止及序列化能夠掉敗的成績。
public sealed class ProcessChannel : ProcessReliability, IChannel, IDisposable { private MemoryMappedFile file; private MemoryMappedFileView view; private MemoryMappedQueue queue; private ProcessSemaphore empty, full, mutex; public ProcessChannel( int size, string name, int maxBytesPerEntry) { int fileSize = 64+size*maxBytesPerEntry; empty = new ProcessSemaphore(name+".EmptySemaphore.Channel",size,size); full = new ProcessSemaphore(name+".FullSemaphore.Channel",0,size); mutex = new ProcessSemaphore(name+".MutexSemaphore.Channel",1,1); file = MemoryMappedFile.CreateFile(name+".MemoryMappedFile.Channel", MemoryMappedFile.FileAccess.ReadWrite,fileSize); view = file.CreateView(0,fileSize, MemoryMappedFileView.ViewAccess.ReadWrite); queue = new MemoryMappedQueue(view,size,maxBytesPerEntry,true,0); if(queue.Length < size || queue.BytesPerEntry < maxBytesPerEntry) throw new MemoryMappedArrayFailedException(); } public void Send(object item) { try {empty.Acquire();} catch(System.Threading.ThreadInterruptedException e) { DumpItemSynchronized(item); throw e; } try {mutex.Acquire();} catch(System.Threading.ThreadInterruptedException e) { DumpItemSynchronized(item); empty.Release(); throw e; } queue.Enqueue(); try {queue.WriteSerialize(item,0);} catch(Exception e) { queue.RollbackEnqueue(); mutex.Release(); empty.Release(); throw e; } mutex.Release(); full.Release(); } public void Send(object item, TimeSpan timeout) { try {empty.Acquire(timeout);} ... } public object Receive() { full.Acquire(); mutex.Acquire(); object item; queue.Dequeue(); try {item = queue.ReadDeserialize(0);} catch(Exception e) { queue.RollbackDequeue(); mutex.Release(); full.Release(); throw e; } mutex.Release(); empty.Release(); return item; } public object Receive(TimeSpan timeout) { full.Acquire(timeout); ... } protected override void DumpStructure() { mutex.Acquire(); byte[][] dmp = queue.DumpClearAll(); for(int i=0;i<dmp.Length;i++) DumpItemSynchronized(dmp[i]); mutex.Release(); } #region IDisposable Member public void Dispose() { view.Dispose(); file.Dispose(); empty.Dispose(); full.Dispose(); mutex.Dispose(); } #endregion }
6. 新聞路由
我們今朝曾經完成了線程和過程同步及新聞傳遞機制(應用信箱和通道)。當你應用壅塞隊列的時刻,有能夠會碰到如許的成績:你須要在一個線程中同時監聽多個隊列。為懂得決如許的成績,我們供給了一些小型的類:通道轉發器,多用復用器,多路復用解碼器和通道事宜網關。你也能夠經由過程簡略的 IRunnable 形式來完成相似的通道處置器。IRunnable形式由兩個籠統類SingleRunnable和 MultiRunnable 來供給(詳細細節請參考附件中的代碼)。
6.1 通道轉發器
通道轉發器僅僅監聽一個通道,然後將收到的新聞轉發到另外一個通道。假如有需要,轉發器可以將每一個收到的新聞放到一個信封中,並加上一個數字標志,然後再轉收回去(上面的多路應用器應用了這個特征)。
public class ChannelForwarder : SingleRunnable { private IChannel source, target; private readonly int envelope; public ChannelForwarder(IChannel source, IChannel target, bool autoStart, bool waitOnStop) : base(true,autoStart,waitOnStop) { this.source = source; this.target = target; this.envelope = -1; } public ChannelForwarder(IChannel source, IChannel target, int envelope, bool autoStart, bool waitOnStop) : base(true,autoStart,waitOnStop) { this.source = source; this.target = target; this.envelope = envelope; } protected override void Run() { //NOTE: IChannel.Send is interrupt save and //automatically dumps the argument. if(envelope == -1) while(running) target.Send(source.Receive()); else { MessageEnvelope env; env.ID = envelope; while(running) { env.Message = source.Receive(); target.Send(env); } } } }
6.2 通道多路復用器和通道復用解碼器
通道多路復用器監聽多個起源的通道並將吸收到的新聞(新聞應用信封來標志起源新聞)轉發到一個公共的輸入通道。如許便可以一次性地監聽多個通道。復用解碼器則是監聽一個公共的輸入通道,然後依據信封將新聞轉發到某個指定的輸入通道。
public class ChannelMultiplexer : MultiRunnable { private ChannelForwarder[] forwarders; public ChannelMultiplexer(IChannel[] channels, int[] ids, IChannel output, bool autoStart, bool waitOnStop) { int count = channels.Length; if(count != ids.Length) throw new ArgumentException("Channel and ID count mismatch.","ids"); forwarders = new ChannelForwarder[count]; for(int i=0;i<count;i++) forwarders[i] = new ChannelForwarder(channels[i], output,ids[i],autoStart,waitOnStop); SetRunnables((SingleRunnable[])forwarders); } } public class ChannelDemultiplexer : SingleRunnable { private HybridDictionary dictionary; private IChannel input; public ChannelDemultiplexer(IChannel[] channels, int[] ids, IChannel input, bool autoStart, bool waitOnStop) : base(true,autoStart,waitOnStop) { this.input = input; int count = channels.Length; if(count != ids.Length) throw new ArgumentException("Channel and ID count mismatch.","ids"); dictionary = new HybridDictionary(count,true); for(int i=0;i<count;i++) dictionary.add(ids[i],channels[i]); } protected override void Run() { //NOTE: IChannel.Send is interrupt save and //automatically dumps the argument. while(running) { MessageEnvelope env = (MessageEnvelope)input.Receive(); IChannel channel = (IChannel)dictionary[env.ID]; channel.send(env.Message); } } }
6.3 通道事宜網關
通道事宜網關監聽指定的通道,在吸收到新聞時觸發一個事宜。這個類關於基於事宜的法式(例如GUI法式)很有效,或許在應用體系線程池(ThreadPool)來初始化輕量的線程。須要留意的是:應用 WinForms 的法式中你不克不及在事宜處置辦法中直接拜訪UI控件,只能挪用Invoke 辦法。由於事宜處置辦法是由事宜網關線程挪用的,而不是UI線程。
public class ChannelEventGateway : SingleRunnable { private IChannel source; public event MessageReceivedEventHandler MessageReceived; public ChannelEventGateway(IChannel source, bool autoStart, bool waitOnStop) : base(true,autoStart,waitOnStop) { this.source = source; } protected override void Run() { while(running) { object c = source.Receive(); MessageReceivedEventHandler handler = MessageReceived; if(handler != null) handler(this,new MessageReceivedEventArgs(c)); } } }
7. 比薩外賣店的例子
萬事俱備,只欠春風。我們曾經評論辯論了這個同步及新聞傳遞框架中的年夜部門主要的構造和技巧(本文沒有評論辯論框架中的其他類如Rendezvous及Barrier)。就像開首一樣,我們用一個例子來停止這篇文章。此次我們用一個小型比薩外賣店來做演示。下圖展現了這個例子:四個並行過程互相之間停止通信。圖中展現了新聞(數據)是若何應用跨過程通道在四個過程中活動的,且在每一個過程中應用了機能更佳的跨線程通道和信箱。
一開端,一個顧客點了一個比薩和一些飲料。他挪用了顧客(customer)接口的辦法,向顧客定單(CustomerOrders)通道發送了一個下單(Order)新聞。接單員,在顧客下單後,發送了兩條配餐指令(分離對應比薩和飲料)到廚師指令(CookInstruction)通道。同時他經由過程收銀(CashierOrder)通道將定單轉發給收銀台。收銀台從價錢中間獲得總價並將單子發給顧客,願望能進步收銀的速度 。與此同時,廚師將依據配餐指令將餐配好以後交給打包員工。打包員工處置好以後,期待顧客付款,然後將外賣遞給顧客。
為了運轉這個例子,翻開4個終端(cmd.exe),用 "PizzaDemo.exe cook" 啟動多個廚師過程(若干個都可以),用 "PizzaDemo.exe backend" 啟動後端過程,用 "PizzaDemo.exe facade" 啟動顧客接口門面(用你的法式稱號來取代 PizzaDemo )。留意:為了模仿真實情形,某些線程(例如廚師線程)會隨機休眠幾秒。按下回車鍵就會停滯和加入過程。假如你在過程正在處置數據的時刻加入,你將可以在內存轉存申報的開頭看到幾個未處置的新聞。在真實世界的法式外面,新聞普通都邑被轉存到磁盤中,以便下次可使用。
這個例子應用了上文中評論辯論過的幾個機制。好比說,收銀台應用一個通道復用器(ChannelMultiplexer)來監聽顧客的定單和付出通道,用了兩個信箱來完成價錢辦事。分發時應用了一個通道事宜網關(ChannelEventGateway),顧客在食品打包完成以後立時會收到告訴。你也能夠將這些法式注冊成 Windows NT 辦事運轉,也能夠長途登錄後運轉。
8. 總結
本文曾經評論辯論了C#中若何基於辦事的架構及完成跨過程同步和通信。然後,這個不是獨一的處理計劃。例如:在年夜項目中應用那末多的線程會引來嚴重的成績。這個框架中缺掉的是事務支撐及其他的通道/信箱完成(例如定名管道和TCP sockets)。這個框架中能夠也有很多缺乏的地方。