本文提供Delphi一個基於原子操作的無鎖隊列,簡易高效。適用於多線程大吞吐量操作的隊列。
可用於Android系統和32,64位Windows系統。
感謝殲10和qsl提供了修改建議!
有如下問題:
1.必須實現開辟內存
2.隊列大小必須是2的冪
3.不能壓入空指針
unit utAtomFIFO; interface Uses SysUtils, SyncObjs; Type TAtomFIFO = Class Protected FWritePtr: Integer; FReadPtr: Integer; FCount:Integer; FHighBound:Integer; FisEmpty:Integer; FData: array of Pointer; function GetSize:Integer; Public procedure Push(Item: Pointer); function Pop: Pointer; Constructor Create(Size: Integer); Virtual; Destructor Destroy; Override; Procedure Empty; property Size: Integer read GetSize; property UsedCount:Integer read FCount; End; Implementation {$I InterlockedAPIs.inc} //創建隊列,大小必須是2的冪,需要開辟足夠大的隊列,防止隊列溢出 Constructor TAtomFIFO.Create(Size: Integer); var i:NativeInt; OK:Boolean; Begin Inherited Create; OK:=(Size and (Size-1)=0); if not OK then raise Exception.Create('FIFO長度必須大於等於256並為2的冪'); try SetLength(FData, Size); FHighBound:=Size-1; except Raise Exception.Create('FIFO申請內存失敗'); end; End; Destructor TAtomFIFO.Destroy; Begin SetLength(FData, 0); Inherited; End; procedure TAtomFIFO.Empty; begin while (InterlockedExchange(FReadPtr, 0)<>0) and (InterlockedExchange(FWritePtr, 0)<>0) and (InterlockedExchange(FCount, 0)<>0) do; end; function TAtomFIFO.GetSize: Integer; begin Result:=FHighBound+1; end; procedure TAtomFIFO.Push(Item:Pointer); var N:Integer; begin if Item=nil then Exit; N:=InterlockedIncrement(FWritePtr) and FHighBound; FData[N]:=Item; InterlockedIncrement(FCount); end; Function TAtomFIFO.Pop:Pointer; var N:Integer; begin if InterlockedDecrement(FCount)<0 then begin InterlockedIncrement(FCount); Result:=nil; end else begin N:=InterlockedIncrement(FReadPtr) and FHighBound; while FData[N]=nil do Sleep(1); Result:=FData[N]; FData[N]:=nil; end; end; End.
InterlockedAPIs.inc
{*******************************************************} { } { CodeGear Delphi Runtime Library } { } { Copyright(c) 1995-2014 Embarcadero Technologies, Inc. } { } {*******************************************************} {$IFDEF CPUX86} function InterlockedAdd(var Addend: Integer; Increment: Integer): Integer; asm MOV ECX,EAX MOV EAX,EDX LOCK XADD [ECX],EAX ADD EAX,EDX end; function InterlockedCompareExchange(var Target: Integer; Exchange: Integer; Comparand: Integer): Integer; asm XCHG EAX,ECX LOCK CMPXCHG [ECX],EDX end; function InterlockedCompareExchangePointer(var Target: Pointer; Exchange: Pointer; Comparand: Pointer): Pointer; asm JMP InterlockedCompareExchange end; function InterlockedDecrement(var Addend: Integer): Integer; asm MOV EDX,-1 JMP InterlockedAdd end; function InterlockedExchange(var Target: Integer; Value: Integer): Integer; asm MOV ECX,EAX MOV EAX,[ECX] @@loop: LOCK CMPXCHG [ECX],EDX JNZ @@loop end; function InterlockedExchangePointer(var Target: Pointer; Value: Pointer): Pointer; asm JMP InterlockedExchange end; function InterlockedIncrement(var Addend: Integer): Integer; asm MOV EDX,1 JMP InterlockedAdd end; {$ENDIF CPUX86} {$IFDEF CPUX64} function InterlockedExchangeAdd(var Addend: Integer; Value: Integer): Integer; asm .NOFRAME MOV EAX,EDX LOCK XADD [RCX].Integer,EAX end; function InterlockedDecrement(var Addend: LongInt): LongInt; asm .NOFRAME MOV EAX,-1 LOCK XADD [RCX].Integer,EAX DEC EAX end; function InterlockedIncrement(var Addend: LongInt): LongInt; asm MOV EAX,1 LOCK XADD [RCX].Integer,EAX INC EAX end; function InterlockedCompareExchange(var Destination: Integer; Exchange: Integer; Comparand: Integer): Integer; asm .NOFRAME MOV EAX,R8d LOCK CMPXCHG [RCX].Integer,EDX end; function InterlockedCompareExchange64(var Destination: Int64; Exchange: Int64; Comparand: Int64): Int64; overload; asm .NOFRAME MOV RAX,R8 LOCK CMPXCHG [RCX],RDX end; function InterlockedCompareExchangePointer(var Destination: Pointer; Exchange: Pointer; Comparand: Pointer): Pointer; asm .NOFRAME MOV RAX,R8 LOCK CMPXCHG [RCX],RDX end; function InterlockedExchangePointer(var Target: Pointer; Value: Pointer): Pointer; asm .NOFRAME LOCK XCHG [RCX],RDX MOV RAX,RDX end; function InterlockedExchange(var Target: Integer; Value: Integer): Integer;// inline; asm .NOFRAME LOCK XCHG [RCX],EDX MOV EAX,EDX end; {$ENDIF CPUX64} {$IFDEF CPUARM} function InterlockedAdd(var Addend: Integer; Increment: Integer): Integer; begin Result := AtomicIncrement(Addend, Increment); end; function InterlockedCompareExchange(var Target: Integer; Exchange: Integer; Comparand: Integer): Integer; begin Result := AtomicCmpExchange(Target, Exchange, Comparand); end; function InterlockedCompareExchangePointer(var Target: Pointer; Exchange: Pointer; Comparand: Pointer): Pointer; begin Result := AtomicCmpExchange(Target, Exchange, Comparand); end; function InterlockedDecrement(var Addend: Integer): Integer; begin Result := AtomicDecrement(Addend); end; function InterlockedExchange(var Target: Integer; Value: Integer): Integer; begin Result := AtomicExchange(Target, Value); end; function InterlockedExchangePointer(var Target: Pointer; Value: Pointer): Pointer; begin Result := AtomicExchange(Target, Value); end; function InterlockedIncrement(var Addend: Integer): Integer; begin Result := AtomicIncrement(Addend); end; {$ENDIF CPUARM}
性能測試:
采用天地弦提供的評估程序,進行了一些修改,分別對使用不同的臨界區的隊列進行對比結果如下:
其中Swith是因隊列讀空,進行線程上下文切換的次數