開發過程中經常會碰到這樣的場景:需要從一個地方獲取一些數據,然後處理數據並將其保存在數據庫中。
private void FetchData() {} private void SaveData() {} static void Main(string[] args) { for (int i = 0; i < 10; i++) { FetchData(); // 獲取數據 SaveData(); // 處理並保存 } }
例如上述代碼例子這樣順序執行,執行會很慢,原因是獲取數據和處理並保存的過程都可能導致阻塞,然而FetchData()每次取數據並不需要等待上一條數據保存完成再進行。
這樣的場景非常適合用生產者消費者隊列:生產者就是FetchData(),用來生產數據;消費者SaveData(),用來消費數據。
舉個實際例子,我們需要通過一個Web Api獲取一些城市的天氣情況,並將其保存到數據庫中。
實現方式:
下邊是實現的完整代碼:
class Program { // 任務隊列 static Queue<string> _tasks = new Queue<string>(); // 為保證線程安全,使用一個鎖來保護_task的訪問 readonly static object _locker = new object(); // 通過 _wh 給工作線程發信號 static EventWaitHandle _wh = new AutoResetEvent(false); static Thread _worker; static void Main(string[] args) { // 需要獲取天氣情況的城市對應代碼 var cityIds = new List<int> {101280601, 101010100, 101020100, 101110101, 101040100}; // 任務開始,啟動工作線程 _worker = new Thread(Work); _worker.Start(); // 生產者將數據插入隊裡中,並給工作線程發信號 foreach (var cityId in cityIds) EnqueueTask(FetchData(cityId)); // 任務結束 Dispose(); } /// <summary>執行工作</summary> static void Work() { while (true) { string work = null; lock (_locker) { if (_tasks.Count > 0) { work = _tasks.Dequeue(); // 有任務時,出列任務 if (work == null) // 退出機制:當遇見一個null任務時,代表任務結束 return; } } if (work != null) SaveData(work); // 任務不為null時,處理並保存數據 else _wh.WaitOne(); // 沒有任務了,等待信號 } } /// <summary>插入任務</summary> static void EnqueueTask(string task) { lock (_locker) _tasks.Enqueue(task); // 向隊列中插入任務 _wh.Set(); // 給工作線程發信號 } /// <summary>結束釋放</summary> static void Dispose() { EnqueueTask(null); // 插入一個Null任務,通知工作線程退出 _worker.Join(); // 等待工作線程完成 _wh.Close(); // 釋放資源 } /// <summary>獲取數據</summary> static string FetchData(int cityId) { var wc = new WebClient { Encoding = Encoding.UTF8 }; var url = string.Format("http://www.weather.com.cn/adat/sk/{0}.html", cityId); return wc.DownloadString(url); } /// <summary>處理保存</summary> static void SaveData(string data) { var weatherInfo = (JsonConvert.DeserializeObject(data, typeof(Dictionary<string, Weatherinfo>)) as Dictionary<string, Weatherinfo>)["weatherinfo"]; Console.WriteLine("[{0}]:{1} 氣溫({2}) 風向({3}) 風力({4})", weatherInfo.Time, weatherInfo.City, weatherInfo.Temp, weatherInfo.Wd, weatherInfo.Ws); Thread.Sleep(200); // 模擬數據保存 } } public class Weatherinfo { public string City { get; set; } public string Temp { get; set; } public string Time { get; set; } public string Wd { get; set; } public string Ws { get; set; } } }
解釋:
參考:Threading in C# --> 中文翻譯