disruptor 用戶封裝自己的消費者,把消費者注入到消費者容器,消費者容器實現自動創建 緩存隊列,生產者;
文中用到的 disruptor C#移植源代碼
https://github.com/bingyang001/disruptor-net-3.3.0-alpha
作者博客 http://www.cnblogs.com/liguo/p/3296166.html
消費者容器:
/// <summary>
/// 消費者管理器
/// </summary>
/// <typeparam name="TProduct">產品</typeparam>
public class Workers<TProduct> where TProduct : Producer<TProduct>, new()
{
private readonly WorkerPool<TProduct> _workerPool;
public Workers(List<IWorkHandler<TProduct>> handers, IWaitStrategy waitStrategy = null, int bufferSize = 1024*64)
{
if (handers == null || handers.Count == 0)
throw new ArgumentNullException("消費事件處理數組為空!");
if (handers.Count == 1)
_ringBuffer = RingBuffer<TProduct>.CreateSingleProducer(() => new TProduct(), bufferSize,
waitStrategy ?? new YieldingWaitStrategy());
else
{
_ringBuffer = RingBuffer<TProduct>.CreateMultiProducer(() => new TProduct(), bufferSize,
waitStrategy ?? new YieldingWaitStrategy());
}
_workerPool = new WorkerPool<TProduct>(_ringBuffer
, _ringBuffer.NewBarrier()
, new FatalExceptionHandler()
, handers.ToArray());
_ringBuffer.AddGatingSequences(_workerPool.getWorkerSequences());
}
public void Start()
{
_workerPool.start(TaskScheduler.Default);
}
public Producer<TProduct> CreateOneProducer()
{
return new Producer<TProduct>(this._ringBuffer);
}
public void DrainAndHalt()
{
_workerPool.drainAndHalt();
}
private readonly RingBuffer<TProduct> _ringBuffer;
}
生產者(產品): 所有的產品都應該繼承自生產者
/// <summary>
/// 生產者對象
/// </summary>
/// <typeparam name="TProduct">產品類型</typeparam>
public class Producer<TProduct> where TProduct:Producer<TProduct>
{
long _sequence;
private RingBuffer<TProduct> _ringBuffer;
public Producer()
{
}
public Producer(RingBuffer<TProduct> ringBuffer )
{
_ringBuffer = ringBuffer;
}
/// <summary>
/// 獲取可修改的產品
/// </summary>
/// <returns></returns>
public Producer<TProduct> Enqueue()
{
long sequence = _ringBuffer.Next();
Producer<TProduct> producer = _ringBuffer[sequence];
producer._sequence = sequence;
if (producer._ringBuffer == null)
producer._ringBuffer = _ringBuffer;
return producer;
}
/// <summary>
/// 提交產品修改
/// </summary>
public void Commit()
{
_ringBuffer.Publish(_sequence);
}
}
--------------------------------------------------------
以上就實現了,測試代碼
先創建 產品對象:
/// <summary>
/// 產品/繼承生產者
/// </summary>
public class Product : Producer<Product>
{
//產品包含的屬下隨便定義,無要求,只需要繼承自生產者就行了
public long Value { get; set; }
public string Guid { get; set; }
}
創建消費者對象
/// <summary>
/// 消費處理對象
/// </summary>
public class WorkHandler : IWorkHandler<Product>
{
public void OnEvent(Product @event)
{
//Test是測試對象數據准確(數據重復或者丟失數據)
Test.UpdateCacheByOut(@event.Guid);
//收到產品,在這裡寫處理代碼
}
}
測試代碼:
可創建1個或者多個的生產者對象,消費者處理對象;不一定太多,多不一定快; 建議生產者創建一個就行了,多線程操作一個生產者對象; 消費者對象可以根據實際情況創建多少個;
//創建2個消費者,2個生產者, 2個消費者表示,框架會有2個線程去處理消費產品
Workers<Product> workers = new Workers<Product>( new List<IWorkHandler<Product>>() {new WorkHandler(), new WorkHandler()}); Producer<Product> producerWorkers = workers.CreateOneProducer(); Producer<Product> producerWorkers1 = workers.CreateOneProducer();
//開始消費
workers.Start();
產品生產:
可以在任何引用生產者的地方,把產品放進隊列中. 這裡 放入隊列的方法和平時不太一樣. 這裡采用的是,從隊列裡面拿去一個位置,然後把產品放進去; 具體的做法 ,找生產者,獲取一個產品對象,然後修改產品屬性,最後提交修改.
var obj = producer.Enqueue();
//修改產品屬性
obj.Commit();
以上是關鍵代碼:
完整的測試類 : 包含測試數據正確性, 性能,在不校驗正確性的時候,每秒ops 1千萬左右.
class Test
{
public static long PrePkgInCount = 0;
public static long PrePkgOutCount = 0;
public static long PkgInCount = 0;
public static long PkgOutCount = 0;
static ConcurrentDictionary<string, string> InCache = new ConcurrentDictionary<string, string>();
static ConcurrentDictionary<string, string> OutCache = new ConcurrentDictionary<string, string>();
private static long Seconds;
static void Main(string[] args)
{
Workers<Product> workers = new Workers<Product>(
new List<IWorkHandler<Product>>() {new WorkHandler(), new WorkHandler()});
Producer<Product> producerWorkers = workers.CreateOneProducer();
Producer<Product> producerWorkers1 = workers.CreateOneProducer();
workers.Start();
Task.Run(delegate
{
while (true)
{
Thread.Sleep(1000);
Seconds++;
long intemp = PkgInCount;
long outemp = PkgOutCount;
Console.WriteLine(
$"In ops={intemp - PrePkgInCount},out ops={outemp - PrePkgOutCount},inCacheCount={InCache.Count},OutCacheCount={OutCache.Count},RunningTime={Seconds}");
PrePkgInCount = intemp;
PrePkgOutCount = outemp;
}
});
Task.Run(delegate { Run(producerWorkers); });
Task.Run(delegate { Run(producerWorkers); });
Task.Run(delegate { Run(producerWorkers1); });
Console.Read();
}
public static void Run(Producer<Product> producer)
{
for (int i = 0; i < int.MaxValue; i++)
{
var obj = producer.Enqueue();
CheckRelease(obj as Product);
obj.Commit();
}
}
public static void CheckRelease(Product publisher)
{
Interlocked.Increment(ref PkgInCount);
return; //不檢查正確性
publisher.Guid = Guid.NewGuid().ToString();
InCache.TryAdd(publisher.Guid, string.Empty);
}
public static void UpdateCacheByOut(string guid)
{
Interlocked.Increment(ref Test.PkgOutCount);
if (guid != null)
if (InCache.ContainsKey(guid))
{
string str;
InCache.TryRemove(guid, out str);
}
else
{
OutCache.TryAdd(guid, string.Empty);
}
}
/// <summary>
/// 產品/繼承生產者
/// </summary>
public class Product : Producer<Product>
{
//產品包含的屬下隨便定義,無要求,只需要繼承自生產者就行了
public long Value { get; set; }
public string Guid { get; set; }
}
/// <summary>
/// 消費處理對象
/// </summary>
public class WorkHandler : IWorkHandler<Product>
{
public void OnEvent(Product @event)
{
Test.UpdateCacheByOut(@event.Guid);
//收到產品,在這裡寫處理代碼
}
}
}