程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
 程式師世界 >> 編程語言 >> .NET網頁編程 >> C# >> 關於C# >> 大量數據轉錄的多線程和同步處理實現

大量數據轉錄的多線程和同步處理實現

編輯:關於C#

項目中需要對兩個不同格式的存儲設備進行數據轉錄,因為數據量非常大,所以時間非常緩慢;解決 方案是使用ReaderWriterSlim類建立一個共享的同步數據,可以支持一個線程讀取外部設備,向同步數據 寫入;多個線程從同步數據中讀取,轉換格式,然後寫入到本地設備。

本例中采用Queue<T>作為存放數據的集合,寫入線程向它的尾部寫入對象,讀取線程從它的頭 部獲取對象。

需要注意的是,由於Queue會拋棄已處理的對象,所以在同步數據隊列中無法驗證數據對象的唯一性, 被寫入的數據庫需要去掉唯一約束,或在寫入時向數據庫請求驗證。

首先定義一個讀寫接口:

namespace Common
{
    public interface IReaderWriter<T>
    {
        T Read(int argument);
        void Write(int arugment, T instance);
        void Delete(int argument);
        void Clear();
    }
}

然後實現一個隊列的讀寫器:

namespace Common
{
    public class QueueReaderWriter<T> : IReaderWriter<T>
    {
        private Queue<T> queues;
        public QueueReaderWriter()
        {
            queues = new Queue<T>();
        }
        public QueueReaderWriter(int capacity)
        {
            queues = new Queue<T>(capacity);
        }
        #region IReadWrite<T> 成員
        public T Read(int argument)
        {
            return queues.FirstOrDefault();
        }
        public void Write(int arugment, T instance)
        {
            queues.Enqueue(instance);
        }
        public void Delete(int argument)
        {
            queues.Dequeue();
        }
        public void Clear()
        {
            queues.Clear();
            queues.TrimExcess();
        }
        #endregion
    }
}

使用ReaderWriterLockSlim實現同步數據類:

namespace Common
{
    public class SynchronizedWriteData<T> : IDisposable
    {
        private ReaderWriterLockSlim _dataLock = new ReaderWriterLockSlim();
        private IReaderWriter<T> _innerData;
        private SynchronizedWriteData()
        { }
        public SynchronizedWriteData(IReaderWriter<T> innerData)
        {
            _innerData = innerData;
        }
        public T Read()
        {
            _dataLock.EnterReadLock();
            try
            {
                return _innerData.Read(0);
            }
            finally
            {
                _dataLock.ExitReadLock();
            }
        }
        public T Read(int argument)
        {
            _dataLock.EnterReadLock();
            try
            {
                return _innerData.Read(argument);
            }
            finally
            {
                _dataLock.ExitReadLock();
            }
        }
        public void Add(T instance)
        {
            _dataLock.EnterWriteLock();
            try
            {
                _innerData.Write(0, instance);
            }
            finally
            {
                _dataLock.ExitWriteLock();
            }
        }
        public void Add(int argument, T instance)
        {
            _dataLock.EnterWriteLock();
            try
            {
                _innerData.Write(argument, instance);
            }
            finally
            {
                _dataLock.ExitWriteLock();
            }
        }
        public bool AddWithTimeout(T instance, int timeout)
        {
            if (_dataLock.TryEnterWriteLock(timeout))
            {
                try
                {
                    _innerData.Write(0, instance);
                }
                finally
                {
                    _dataLock.ExitWriteLock();
                }
                return true;
            }
            else
            {
                return false;
            }
        }
        public bool AddWithTimeout(int argument, T instance, int timeout)
        {
            if (_dataLock.TryEnterWriteLock(timeout))
            {
                try
                {
                    _innerData.Write(argument, instance);
                }
                finally
                {
                    _dataLock.ExitWriteLock();
                }
                return true;
            }
            else
            {
                return false;
            }
        }
        public void Delete()
        {
            _dataLock.EnterWriteLock();
            try
            {
                _innerData.Delete(0);
            }
            finally
            {
                _dataLock.ExitWriteLock();
            }
        }
        public void Delete(int argument)
        {
            _dataLock.EnterWriteLock();
            try
            {
                _innerData.Delete(argument);
            }
            finally
            {
                _dataLock.ExitWriteLock();
            }
        }
        #region IDisposable 成員
        public void Dispose()
        {
            try
            {
                _dataLock.EnterWriteLock();
                {
                    try
                    {
                        _innerData.Clear();
                    }
                    finally
                    {
                        _dataLock.ExitWriteLock();
                    }
                }
            }
            finally
            {
                _dataLock.Dispose();
            }
        }
        #endregion
    }
} 
namespace ExternalDataHandle
{
    /// <summary>
    /// 從外部數據源獲取到內部數據源的適配器抽象類
    /// </summary>
    /// <typeparam name="T">T 數據對象類型</typeparam>
    public abstract class ExternalDataAdapter<T> : IDisposable
    {
        /// <summary>
        /// 外部數據源連接字符串
        /// </summary>
        protected abstract string ConnectString { get; }
        /// <summary>
        /// 提供初始化數據適配器的方法
        /// </summary>
        protected abstract void Initialize();
        /// <summary>
        /// 提供數據傳遞的方法
        /// </summary>
        public abstract void Transmit();
        /// <summary>
        /// 提供從外部數據設備讀取數據的方法
        /// </summary>
        protected abstract void ReadFromExternalDevice();
        /// <summary>
        /// 提供保存數據到內部設備的方法
        /// </summary>
        protected abstract void SaveToInternalDevice();
        #region IDisposable 成員
        public abstract void Dispose();
        #endregion
    }
}

多線程數據轉錄類,本例只使用了一個讀取線程:

namespace ExternalDataHandle
{
    /// <summary>
    /// 提供多線程方式從外部數據源獲取到內部數據源的適配器類
    /// </summary>
    /// <typeparam name="T"></typeparam>
    public abstract class MultiThreadAdapter<T> : ExternalDataAdapter<T>
    {
        protected SynchronizedWriteData<T> _data;
        protected Thread _readThread;
        protected abstract override string ConnectString { get; }
        protected abstract override void Initialize();
        public sealed override void Transmit()
        {
            _readThread = new Thread(new ThreadStart(ReadFromExternalDevice));
            _readThread.Start();
            Thread.Sleep(10000);
            while (_readThread.IsAlive)
            {
                SaveToInternalDevice();
            }
            _readThread.Join();
        }
        protected abstract override void ReadFromExternalDevice();
        protected abstract override void SaveToInternalDevice();
        public override void Dispose()
        {
            if (_data != null)
            {
                _data.Dispose();
            }
        }
    }
}
  1. 上一頁:
  2. 下一頁:
Copyright © 程式師世界 All Rights Reserved