最近想在DIOCP中加入任務調度線程,DIOCP的工作線程作為生產者(producer)將接受到的數據對象,投遞到任務調度線程中,然後統一進行分配。然而這一切都需要一個隊列, 這幾天都在關注無鎖隊列。
首先是一個隊列,簡單的隊列就是,生產者把數據壓入隊列(push), 消費者通過隊列Pop出數據進行處理。
簡單的隊列就是提供Push,和Pop函數。
我們用一個鏈表來存儲數據。Head ->data01->data02...data_n->Tail, 每個數據塊的結構如下
type PVarQueueBlock = ^TVarQueueBlock; TVarQueueBlock = packed record value:Variant; next:PVarQueueBlock; end;
1.在進行Push壓入數據時壓入將Tail.next指向新壓入的數據塊, 然後用新的數據塊做Tail
procedure TSimpleQueue.pushQueue(pvData: PVarQueueBlock); begin if FHead = nil then begin FHead = pvData; FTail := FHead; end else begin FTail.next := pvData; FTail := pvData; end; Inc(FCount); end;
2.在進行Pop數據時把Head數據塊取出,然後用Head數據塊指向的下一塊當作Head.
function TSimpleQueue.popQueue: PVarQueueBlock; var lvTemp, lvRet:PVarQueueBlock; begin lvTemp := FHead; if (lvTemp = nil) then begin //沒有任何可以Pop出的值 Result := nil; exit; end; // FHead := FHead.next; Dec(FCount); Result :=lvTemp; end;
上面就是簡單的隊列
上面的實現的隊列在多線程情況下是不安全的。如果要在多並發下隊列要進行加鎖,在push和pop時加鎖也是一種辦法。可以直接用臨界就可以了,但是我們要做的是無鎖隊列
首先記住多並發設計規則:決不要假設任何代碼會連續執行
上面的push操作
FTail.next := pvData; FTail := pvData;
也許執行了FTail.next:=pvData後,會被另外的線程搶走,然後FTail進行了新的賦值,這樣在進行FTail := pvData;這樣整個數據鏈條就會被破壞。
如果這兩行我們能一次行完成,這樣就可以實現無鎖操作了,這樣我們需要引入原子操作.Interlocked中的函數。
說無鎖其實不太確切,只是鎖的粒度小了。我們是使用api的InterlockedCompareExchange函數來實現的。
查一下MSDN
http://msdn.microsoft.com/en-us/library/windows/desktop/ms683560(v=vs.85).aspx
LONG __cdecl InterlockedCompareExchange( _Inout_ LONG volatile *Destination, _In_ LONG Exchange, _In_ LONG Comparand );
A pointer to the destination value.
The exchange value.
The value to compare to Destination.
The function returns the initial value of the Destination parameter.
大概解釋一下。這個函數是比較後進行交換。第一個參數是要存放目的的數據,第二個是交換數據,第三個是比較數據(與第一個比較), 如果交換返回的數據和第三個參數一樣。
這樣就可以在push和pop一步完成。
這裡貼出push的pop操作
procedure TVarQueue.pushQueue(pvData: PVarQueueBlock); var lvTemp:PVarQueueBlock; lvPointer:Pointer; begin while True do begin lvTemp := FTail; while lvTemp.next <> nil do lvTemp := lvTemp.next; if InterlockedCompareExchangePointer(Pointer(lvTemp.next), Pointer(pvData), nil) = nil then begin break; end; end; FTail := pvData; Inc(FCount); end;
function TVarQueue.popQueue: PVarQueueBlock; var lvTemp, lvRet:PVarQueueBlock; lvPointer:Pointer; begin ///為了方便 隊列中始終保留一個FHead數據塊 /// 也就是說FHead指向的下一個數據塊才是第一個數據塊 /// while True do begin lvTemp := FHead; if (lvTemp = nil) or (lvTemp.next = nil) then begin //沒有任何可以Pop出的值 Result := nil; exit; end; if InterlockedCompareExchangePointer(Pointer(FHead), lvTemp.next, lvTemp) = lvTemp then begin break; end; end; Dec(FCount); lvRet := lvTemp.next; Result := lvRet; lvTemp.next := nil; Dispose(lvTemp); //返回的是head.next end;
後續我會上傳完整的代碼到DIOCP項目中。
如有漏洞,敬請指出。歡迎假如DIOCP群討論