開發過程當中常常會碰到這樣的場景:須要從一個地方獲取一些數據,而後處理數據並將其保存在數據庫中。安全
1
2
3
4
5
6
7
8
9
10
|
private
void
FetchData() {}
private
void
SaveData() {}
static
void
Main(
string
[] args)
{
for
(
int
i = 0; i < 10; i++)
{
FetchData();
// 獲取數據
SaveData();
// 處理並保存
}
}
|
例如上述代碼例子這樣順序執行,執行會很慢,緣由是獲取數據和處理並保存的過程均可能致使阻塞,然而FetchData()每次取數據並不須要等待上一條數據保存完成再進行。post
這樣的場景很是適合用生產者消費者隊列:生產者就是FetchData(),用來生產數據;消費者SaveData(),用來消費數據。測試
舉個實際例子,咱們須要經過一個Web Api獲取一些城市的天氣狀況,並將其保存到數據庫中。this
實現方式:url
- 須要一個任務隊列,生產者能夠向隊列中插入任務,消費者能夠從任務隊列中取出任務來執行。
- 爲保證線程安全,使用一個鎖來保護這個隊列的訪問。
- 制定一個退出策略,在全部任務完成時釋放資源。
下邊是實現的完整代碼:spa
class Program { // 任務隊列 static Queue<string> _tasks = new Queue<string>(); // 爲保證線程安全,使用一個鎖來保護_task的訪問 readonly static object _locker = new object(); // 經過 _wh 給工做線程發信號 static EventWaitHandle _wh = new AutoResetEvent(false); static Thread _worker; static void Main(string[] args) { // 須要獲取天氣狀況的城市對應代碼 var cityIds = new List<int> {101280601, 101010100, 101020100, 101110101, 101040100}; // 任務開始,啓動工做線程 _worker = new Thread(Work); _worker.Start(); // 生產者將數據插入隊裏中,並給工做線程發信號 foreach (var cityId in cityIds) EnqueueTask(FetchData(cityId)); // 任務結束 Dispose(); } /// <summary>執行工做</summary> static void Work() { while (true) { string work = null; lock (_locker) { if (_tasks.Count > 0) { work = _tasks.Dequeue(); // 有任務時,出列任務 if (work == null) // 退出機制:當碰見一個null任務時,表明任務結束 return; } } if (work != null) SaveData(work); // 任務不爲null時,處理並保存數據 else _wh.WaitOne(); // 沒有任務了,等待信號 } } /// <summary>插入任務</summary> static void EnqueueTask(string task) { lock (_locker) _tasks.Enqueue(task); // 向隊列中插入任務 _wh.Set(); // 給工做線程發信號 } /// <summary>結束釋放</summary> static void Dispose() { EnqueueTask(null); // 插入一個Null任務,通知工做線程退出 _worker.Join(); // 等待工做線程完成 _wh.Close(); // 釋放資源 } /// <summary>獲取數據</summary> static string FetchData(int cityId) { var wc = new WebClient { Encoding = Encoding.UTF8 }; var url = string.Format("http://www.weather.com.cn/adat/sk/{0}.html", cityId); return wc.DownloadString(url); } /// <summary>處理保存</summary> static void SaveData(string data) { var weatherInfo = (JsonConvert.DeserializeObject(data, typeof(Dictionary<string, Weatherinfo>)) as Dictionary<string, Weatherinfo>)["weatherinfo"]; Console.WriteLine("[{0}]:{1} 氣溫({2}) 風向({3}) 風力({4})", weatherInfo.Time, weatherInfo.City, weatherInfo.Temp, weatherInfo.Wd, weatherInfo.Ws); Thread.Sleep(200); // 模擬數據保存 } } public class Weatherinfo { public string City { get; set; } public string Temp { get; set; } public string Time { get; set; } public string Wd { get; set; } public string Ws { get; set; } } }
解釋:線程
- Main方法中,咱們首先啓動了一個工做線程,因爲此時隊列中沒有任務,所以工做線程在等待信號。
- 經過EnqueueTask向隊列中插入任務,並經過等待句柄_wh發信號給工做線程,工做線程收到信號後就開始執行處理保存。
- 當生產者獲取完全部數據時,插入null任務,並等待工做線程完成。工做線程最後執行到null任務時退出。