項目中需要對兩個不同格式的存儲設備進行數據轉錄,因為數據量非常大,所以時間非常緩慢;解決 方案是使用ReaderWriterSlim類建立一個共享的同步數據,可以支持一個線程讀取外部設備,向同步數據 寫入;多個線程從同步數據中讀取,轉換格式,然後寫入到本地設備。
本例中采用Queue<T>作為存放數據的集合,寫入線程向它的尾部寫入對象,讀取線程從它的頭 部獲取對象。
需要注意的是,由於Queue會拋棄已處理的對象,所以在同步數據隊列中無法驗證數據對象的唯一性, 被寫入的數據庫需要去掉唯一約束,或在寫入時向數據庫請求驗證。
首先定義一個讀寫接口:
namespace Common { public interface IReaderWriter<T> { T Read(int argument); void Write(int arugment, T instance); void Delete(int argument); void Clear(); } }
然後實現一個隊列的讀寫器:
namespace Common { public class QueueReaderWriter<T> : IReaderWriter<T> { private Queue<T> queues; public QueueReaderWriter() { queues = new Queue<T>(); } public QueueReaderWriter(int capacity) { queues = new Queue<T>(capacity); } #region IReadWrite<T> 成員 public T Read(int argument) { return queues.FirstOrDefault(); } public void Write(int arugment, T instance) { queues.Enqueue(instance); } public void Delete(int argument) { queues.Dequeue(); } public void Clear() { queues.Clear(); queues.TrimExcess(); } #endregion } }
使用ReaderWriterLockSlim實現同步數據類:
namespace Common { public class SynchronizedWriteData<T> : IDisposable { private ReaderWriterLockSlim _dataLock = new ReaderWriterLockSlim(); private IReaderWriter<T> _innerData; private SynchronizedWriteData() { } public SynchronizedWriteData(IReaderWriter<T> innerData) { _innerData = innerData; } public T Read() { _dataLock.EnterReadLock(); try { return _innerData.Read(0); } finally { _dataLock.ExitReadLock(); } } public T Read(int argument) { _dataLock.EnterReadLock(); try { return _innerData.Read(argument); } finally { _dataLock.ExitReadLock(); } } public void Add(T instance) { _dataLock.EnterWriteLock(); try { _innerData.Write(0, instance); } finally { _dataLock.ExitWriteLock(); } } public void Add(int argument, T instance) { _dataLock.EnterWriteLock(); try { _innerData.Write(argument, instance); } finally { _dataLock.ExitWriteLock(); } } public bool AddWithTimeout(T instance, int timeout) { if (_dataLock.TryEnterWriteLock(timeout)) { try { _innerData.Write(0, instance); } finally { _dataLock.ExitWriteLock(); } return true; } else { return false; } } public bool AddWithTimeout(int argument, T instance, int timeout) { if (_dataLock.TryEnterWriteLock(timeout)) { try { _innerData.Write(argument, instance); } finally { _dataLock.ExitWriteLock(); } return true; } else { return false; } } public void Delete() { _dataLock.EnterWriteLock(); try { _innerData.Delete(0); } finally { _dataLock.ExitWriteLock(); } } public void Delete(int argument) { _dataLock.EnterWriteLock(); try { _innerData.Delete(argument); } finally { _dataLock.ExitWriteLock(); } } #region IDisposable 成員 public void Dispose() { try { _dataLock.EnterWriteLock(); { try { _innerData.Clear(); } finally { _dataLock.ExitWriteLock(); } } } finally { _dataLock.Dispose(); } } #endregion } } namespace ExternalDataHandle { /// <summary> /// 從外部數據源獲取到內部數據源的適配器抽象類 /// </summary> /// <typeparam name="T">T 數據對象類型</typeparam> public abstract class ExternalDataAdapter<T> : IDisposable { /// <summary> /// 外部數據源連接字符串 /// </summary> protected abstract string ConnectString { get; } /// <summary> /// 提供初始化數據適配器的方法 /// </summary> protected abstract void Initialize(); /// <summary> /// 提供數據傳遞的方法 /// </summary> public abstract void Transmit(); /// <summary> /// 提供從外部數據設備讀取數據的方法 /// </summary> protected abstract void ReadFromExternalDevice(); /// <summary> /// 提供保存數據到內部設備的方法 /// </summary> protected abstract void SaveToInternalDevice(); #region IDisposable 成員 public abstract void Dispose(); #endregion } }
多線程數據轉錄類,本例只使用了一個讀取線程:
namespace ExternalDataHandle { /// <summary> /// 提供多線程方式從外部數據源獲取到內部數據源的適配器類 /// </summary> /// <typeparam name="T"></typeparam> public abstract class MultiThreadAdapter<T> : ExternalDataAdapter<T> { protected SynchronizedWriteData<T> _data; protected Thread _readThread; protected abstract override string ConnectString { get; } protected abstract override void Initialize(); public sealed override void Transmit() { _readThread = new Thread(new ThreadStart(ReadFromExternalDevice)); _readThread.Start(); Thread.Sleep(10000); while (_readThread.IsAlive) { SaveToInternalDevice(); } _readThread.Join(); } protected abstract override void ReadFromExternalDevice(); protected abstract override void SaveToInternalDevice(); public override void Dispose() { if (_data != null) { _data.Dispose(); } } } }