微軟在.NET Framework 4.0(C# 4.0)中添加了全新的並發編程框架,現在我們也能用C#開發支持並發的程序了。在並發編程中最讓人煩惱的應該就是如何進行數據同步:避免髒讀和髒寫。當然我們可以通過lock來實現,也可以使用微軟提供給我們的並發集合,這些集合大多提供了TryDo形式的方法(即TryXxx,例如TryAdd、TryTake):操作成功時返回true,無法完成時返回false而不是拋出異常,因此對數據的讀/寫結果只有在方法返回true的情況下才有效。我們來看看它們吧:
IProducerConsumerCollection
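除ConcurrentDictionary之外,下面介紹的並發集合(ConcurrentQueue、ConcurrentStack、ConcurrentBag)都實現了這個接口,BlockingCollection則是對實現了該接口的集合的包裝。TryAdd在寫入(添加元素)時、TryTake在讀取(取出元素)時判斷操作是否能正常進行,不行則返回false。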
[csharp]
/// <summary>
/// Contract for a collection that producers add items to and consumers take items from.
/// </summary>
/// <typeparam name="T">Type of the items held by the collection.</typeparam>
public interface IProducerConsumerCollection<T> : IEnumerable<T>, ICollection, IEnumerable
{
    /// <summary>Attempts to add <paramref name="item"/> to the collection.</summary>
    /// <param name="item">The item to add.</param>
    /// <returns><c>true</c> when the item was added; otherwise <c>false</c>.</returns>
    bool TryAdd(T item);

    /// <summary>Attempts to remove an item from the collection and hand it to the caller.</summary>
    /// <param name="item">Receives the removed item when the call succeeds.</param>
    /// <returns><c>true</c> when an item was taken; otherwise <c>false</c>.</returns>
    bool TryTake(out T item);

    /// <summary>Copies the items of the collection into a new array.</summary>
    /// <returns>An array holding the copied items.</returns>
    T[] ToArray();

    /// <summary>Copies the items of the collection into <paramref name="array"/>.</summary>
    /// <param name="array">Destination array.</param>
    /// <param name="index">Position in <paramref name="array"/> at which copying starts.</param>
    void CopyTo(T[] array, int index);
}
/// <summary>
/// Contract for a collection that producers add items to and consumers take items from.
/// </summary>
/// <typeparam name="T">Type of the items held by the collection.</typeparam>
public interface IProducerConsumerCollection<T> : IEnumerable<T>, ICollection, IEnumerable
{
    /// <summary>Attempts to add <paramref name="item"/> to the collection.</summary>
    /// <param name="item">The item to add.</param>
    /// <returns><c>true</c> when the item was added; otherwise <c>false</c>.</returns>
    bool TryAdd(T item);

    /// <summary>Attempts to remove an item from the collection and hand it to the caller.</summary>
    /// <param name="item">Receives the removed item when the call succeeds.</param>
    /// <returns><c>true</c> when an item was taken; otherwise <c>false</c>.</returns>
    bool TryTake(out T item);

    /// <summary>Copies the items of the collection into a new array.</summary>
    /// <returns>An array holding the copied items.</returns>
    T[] ToArray();

    /// <summary>Copies the items of the collection into <paramref name="array"/>.</summary>
    /// <param name="array">Destination array.</param>
    /// <param name="index">Position in <paramref name="array"/> at which copying starts.</param>
    void CopyTo(T[] array, int index);
}
ConcurrentQueue
並發隊列,隊列類型的數據結構。
[csharp]
/// <summary>
/// Rotates the shared <c>strNornalQueue</c> from parallel loop iterations (indices 1 to 9999),
/// then prints its element count, the result of <c>IsDuplicate</c>, and every element.
/// </summary>
/// <remarks>
/// The queue is seeded with "A".."Z" once, before the parallel loop starts. Seeding used to
/// happen inside the thread-local initializer in a spin loop: two workers could both observe
/// an empty queue, both fill it, and then spin forever waiting for a count of exactly 26.
/// <c>Queue&lt;T&gt;</c> is not safe for concurrent use, so every dequeue/enqueue pair runs
/// under a lock on the queue instance.
/// </remarks>
public static void ConcurrentQueueTest()
{
    // Seed on the calling thread, before any worker is started.
    if (strNornalQueue.Count == 0)
    {
        for (char letter = 'A'; letter <= 'Z'; letter++)
        {
            strNornalQueue.Enqueue(letter.ToString());
        }
    }

    Parallel.For<Queue<string>>(1, 10000,
        () =>
        {
            // Runs once per worker task; the "local" state is just the shared queue.
            Console.WriteLine("-------------start------------");
            return strNornalQueue;
        },
        (index, state, head) =>
        {
            string tmp;

            // Move the head element to the tail as one atomic step so that concurrent
            // iterations never see a half-updated queue.
            lock (head)
            {
                tmp = head.Dequeue();
                head.Enqueue(tmp);
            }

            Console.WriteLine("The element '{0}' was set by thread {1}", tmp, System.Threading.Thread.CurrentThread.ManagedThreadId);
            return head;
        },
        (result) =>
        {
            // Runs once per worker task after its last iteration.
            Console.WriteLine("-------------end------------------");
        });

    Console.WriteLine("current collection has {0} elements, Has duplicate data:{1}", strNornalQueue.Count, IsDuplicate<string>(strNornalQueue.GetEnumerator()));
    foreach (string item in strNornalQueue)
    {
        Console.WriteLine(item);
    }
}
/// <summary>
/// Rotates the shared <c>strNornalQueue</c> from parallel loop iterations (indices 1 to 9999),
/// then prints its element count, the result of <c>IsDuplicate</c>, and every element.
/// </summary>
/// <remarks>
/// The queue is seeded with "A".."Z" once, before the parallel loop starts. Seeding used to
/// happen inside the thread-local initializer in a spin loop: two workers could both observe
/// an empty queue, both fill it, and then spin forever waiting for a count of exactly 26.
/// <c>Queue&lt;T&gt;</c> is not safe for concurrent use, so every dequeue/enqueue pair runs
/// under a lock on the queue instance.
/// </remarks>
public static void ConcurrentQueueTest()
{
    // Seed on the calling thread, before any worker is started.
    if (strNornalQueue.Count == 0)
    {
        for (char letter = 'A'; letter <= 'Z'; letter++)
        {
            strNornalQueue.Enqueue(letter.ToString());
        }
    }

    Parallel.For<Queue<string>>(1, 10000,
        () =>
        {
            // Runs once per worker task; the "local" state is just the shared queue.
            Console.WriteLine("-------------start------------");
            return strNornalQueue;
        },
        (index, state, head) =>
        {
            string tmp;

            // Move the head element to the tail as one atomic step so that concurrent
            // iterations never see a half-updated queue.
            lock (head)
            {
                tmp = head.Dequeue();
                head.Enqueue(tmp);
            }

            Console.WriteLine("The element '{0}' was set by thread {1}", tmp, System.Threading.Thread.CurrentThread.ManagedThreadId);
            return head;
        },
        (result) =>
        {
            // Runs once per worker task after its last iteration.
            Console.WriteLine("-------------end------------------");
        });

    Console.WriteLine("current collection has {0} elements, Has duplicate data:{1}", strNornalQueue.Count, IsDuplicate<string>(strNornalQueue.GetEnumerator()));
    foreach (string item in strNornalQueue)
    {
        Console.WriteLine(item);
    }
}
ConcurrentStack
並發棧,棧類型的數據結構。
[csharp]
/// <summary>
/// Pops and re-pushes elements of the shared <c>strStack</c> from parallel loop iterations
/// (indices 1 to 9999), then prints its element count, the result of <c>IsDuplicate</c>,
/// and every element.
/// </summary>
/// <remarks>
/// The stack is seeded with "A".."Z" once, before the parallel loop starts. Seeding used to
/// happen inside the thread-local initializer in a spin loop: two workers could both observe
/// an empty stack, both fill it, and then spin forever waiting for a count of exactly 26.
/// </remarks>
public static void ConcurrentStackTest()
{
    // Seed on the calling thread, before any worker is started.
    if (strStack.IsEmpty)
    {
        for (char letter = 'A'; letter <= 'Z'; letter++)
        {
            strStack.Push(letter.ToString());
        }
    }

    Parallel.For<ConcurrentStack<string>>(1, 10000,
        () =>
        {
            // Runs once per worker task; the "local" state is just the shared stack.
            Console.WriteLine("-------------start------------");
            return strStack;
        },
        (index, state, head) =>
        {
            string tmp;
            if (head.TryPop(out tmp))
            {
                Console.WriteLine("The element '{0}' was set by thread {1}", tmp, System.Threading.Thread.CurrentThread.ManagedThreadId);
                head.Push(tmp);
            }
            else
            {
                // TryPop found nothing to pop at this moment.
                Console.WriteLine("queue is buzy now");
            }
            return head;
        },
        (result) =>
        {
            // Runs once per worker task after its last iteration.
            Console.WriteLine("-------------end------------------");
        });

    Console.WriteLine("current collection has {0} elements, Has duplicate data:{1}", strStack.Count, IsDuplicate<string>(strStack.GetEnumerator()));
    foreach (string item in strStack)
    {
        Console.WriteLine(item);
    }
}
/// <summary>
/// Pops and re-pushes elements of the shared <c>strStack</c> from parallel loop iterations
/// (indices 1 to 9999), then prints its element count, the result of <c>IsDuplicate</c>,
/// and every element.
/// </summary>
/// <remarks>
/// The stack is seeded with "A".."Z" once, before the parallel loop starts. Seeding used to
/// happen inside the thread-local initializer in a spin loop: two workers could both observe
/// an empty stack, both fill it, and then spin forever waiting for a count of exactly 26.
/// </remarks>
public static void ConcurrentStackTest()
{
    // Seed on the calling thread, before any worker is started.
    if (strStack.IsEmpty)
    {
        for (char letter = 'A'; letter <= 'Z'; letter++)
        {
            strStack.Push(letter.ToString());
        }
    }

    Parallel.For<ConcurrentStack<string>>(1, 10000,
        () =>
        {
            // Runs once per worker task; the "local" state is just the shared stack.
            Console.WriteLine("-------------start------------");
            return strStack;
        },
        (index, state, head) =>
        {
            string tmp;
            if (head.TryPop(out tmp))
            {
                Console.WriteLine("The element '{0}' was set by thread {1}", tmp, System.Threading.Thread.CurrentThread.ManagedThreadId);
                head.Push(tmp);
            }
            else
            {
                // TryPop found nothing to pop at this moment.
                Console.WriteLine("queue is buzy now");
            }
            return head;
        },
        (result) =>
        {
            // Runs once per worker task after its last iteration.
            Console.WriteLine("-------------end------------------");
        });

    Console.WriteLine("current collection has {0} elements, Has duplicate data:{1}", strStack.Count, IsDuplicate<string>(strStack.GetEnumerator()));
    foreach (string item in strStack)
    {
        Console.WriteLine(item);
    }
}
ConcurrentDictionary
並發字典,字典類型的數據結構。
[csharp]
/// <summary>
/// Seeds the shared <c>strDictionary</c> with the entries "A".."Z" (key equals value), then
/// removes and re-adds randomly chosen keys from parallel loop iterations (indices 1 to 9999).
/// </summary>
/// <remarks>
/// Each worker task owns one <see cref="Random"/> through the loop's thread-local state,
/// instead of constructing a new generator on every iteration. The random key range now
/// includes "Z": <c>Random.Next(min, max)</c> excludes <c>max</c>, so the former
/// <c>Next(65, 90)</c> could never select it.
/// </remarks>
public static void ConcurrentDictionary()
{
    for (char letter = 'A'; letter <= 'Z'; letter++)
    {
        string entry = letter.ToString();
        strDictionary.TryAdd(entry, entry);
    }

    Parallel.For<Random>(1, 10000,
        () =>
        {
            Console.WriteLine("-------------start------------");
            // Seed from a GUID so workers created in the same clock tick do not share a sequence.
            return new Random(Guid.NewGuid().GetHashCode());
        },
        (index, state, rng) =>
        {
            // Upper bound is exclusive, hence 'Z' + 1.
            string key = ((char)rng.Next('A', 'Z' + 1)).ToString();

            string tmp;
            if (strDictionary.TryRemove(key, out tmp))
            {
                Console.WriteLine("The element '{0}' was set by thread {1}", tmp, System.Threading.Thread.CurrentThread.ManagedThreadId);
                strDictionary.TryAdd(tmp, tmp);
            }
            else
            {
                // The key is absent right now, e.g. another iteration removed it and has not re-added it yet.
                Console.WriteLine("queue is buzy now");
            }
            return rng;
        },
        (result) =>
        {
            // Runs once per worker task after its last iteration.
            Console.WriteLine("-------------end------------------");
        });
}
/// <summary>
/// Seeds the shared <c>strDictionary</c> with the entries "A".."Z" (key equals value), then
/// removes and re-adds randomly chosen keys from parallel loop iterations (indices 1 to 9999).
/// </summary>
/// <remarks>
/// Each worker task owns one <see cref="Random"/> through the loop's thread-local state,
/// instead of constructing a new generator on every iteration. The random key range now
/// includes "Z": <c>Random.Next(min, max)</c> excludes <c>max</c>, so the former
/// <c>Next(65, 90)</c> could never select it.
/// </remarks>
public static void ConcurrentDictionary()
{
    for (char letter = 'A'; letter <= 'Z'; letter++)
    {
        string entry = letter.ToString();
        strDictionary.TryAdd(entry, entry);
    }

    Parallel.For<Random>(1, 10000,
        () =>
        {
            Console.WriteLine("-------------start------------");
            // Seed from a GUID so workers created in the same clock tick do not share a sequence.
            return new Random(Guid.NewGuid().GetHashCode());
        },
        (index, state, rng) =>
        {
            // Upper bound is exclusive, hence 'Z' + 1.
            string key = ((char)rng.Next('A', 'Z' + 1)).ToString();

            string tmp;
            if (strDictionary.TryRemove(key, out tmp))
            {
                Console.WriteLine("The element '{0}' was set by thread {1}", tmp, System.Threading.Thread.CurrentThread.ManagedThreadId);
                strDictionary.TryAdd(tmp, tmp);
            }
            else
            {
                // The key is absent right now, e.g. another iteration removed it and has not re-added it yet.
                Console.WriteLine("queue is buzy now");
            }
            return rng;
        },
        (result) =>
        {
            // Runs once per worker task after its last iteration.
            Console.WriteLine("-------------end------------------");
        });
}
ConcurrentBag
並發包:無序、允許重複元素的集合,不保證元素的取出順序;針對同一線程既添加又取出元素的場景做了優化。
[csharp]
/// <summary>
/// Adds "A".."Z" to the shared <c>strBag</c>, takes and re-adds elements from parallel loop
/// iterations (indices 1 to 9999), then prints the element count, the result of
/// <c>IsDuplicate</c>, and every element.
/// </summary>
public static void ConcurrentBag()
{
    char letter = 'A';
    while (letter <= 'Z')
    {
        strBag.Add(letter.ToString());
        letter++;
    }

    Parallel.For<ConcurrentBag<string>>(1, 10000,
        // The thread-local state is simply the shared bag.
        () => strBag,
        (iteration, loopState, local) =>
        {
            string taken = string.Empty;
            if (!strBag.TryTake(out taken))
            {
                // Nothing could be taken on this attempt.
                Console.WriteLine("queue is buzy now");
                return strBag;
            }

            int threadId = System.Threading.Thread.CurrentThread.ManagedThreadId;
            string report = string.Format("The element '{0}' was set by thread {1}", taken, threadId);
            Console.WriteLine(report);
            strBag.Add(taken);
            return strBag;
        },
        // Runs once per worker task after its last iteration.
        local => Console.WriteLine("-------------end------------------"));

    string summary = string.Format(
        "current collection has {0} elements, Has duplicate data:{1}",
        strBag.Count,
        IsDuplicate<string>(strBag.GetEnumerator()));
    Console.WriteLine(summary);

    foreach (string element in strBag)
    {
        Console.WriteLine(element);
    }
}
/// <summary>
/// Adds "A".."Z" to the shared <c>strBag</c>, takes and re-adds elements from parallel loop
/// iterations (indices 1 to 9999), then prints the element count, the result of
/// <c>IsDuplicate</c>, and every element.
/// </summary>
public static void ConcurrentBag()
{
    char letter = 'A';
    while (letter <= 'Z')
    {
        strBag.Add(letter.ToString());
        letter++;
    }

    Parallel.For<ConcurrentBag<string>>(1, 10000,
        // The thread-local state is simply the shared bag.
        () => strBag,
        (iteration, loopState, local) =>
        {
            string taken = string.Empty;
            if (!strBag.TryTake(out taken))
            {
                // Nothing could be taken on this attempt.
                Console.WriteLine("queue is buzy now");
                return strBag;
            }

            int threadId = System.Threading.Thread.CurrentThread.ManagedThreadId;
            string report = string.Format("The element '{0}' was set by thread {1}", taken, threadId);
            Console.WriteLine(report);
            strBag.Add(taken);
            return strBag;
        },
        // Runs once per worker task after its last iteration.
        local => Console.WriteLine("-------------end------------------"));

    string summary = string.Format(
        "current collection has {0} elements, Has duplicate data:{1}",
        strBag.Count,
        IsDuplicate<string>(strBag.GetEnumerator()));
    Console.WriteLine(summary);

    foreach (string element in strBag)
    {
        Console.WriteLine(element);
    }
}
BlockingCollection
阻塞集合:對實現了IProducerConsumerCollection接口的集合的包裝(默認使用ConcurrentQueue)。集合為空時,Take會一直阻塞直到有元素可取;設置了容量上限且集合已滿時,Add會一直阻塞直到有空位。
[csharp]
/// <summary>
/// Takes and re-adds elements of the shared <c>strBlockCollection</c> from parallel loop
/// iterations (indices 1 to 9999), then prints its element count, the result of
/// <c>IsDuplicate</c>, and every element.
/// </summary>
/// <remarks>
/// The collection is seeded with "A".."Z" once, before the parallel loop starts. Seeding used
/// to happen inside the thread-local initializer in a spin loop: two workers could both
/// observe an empty collection, both fill it, and then spin forever waiting for a count of
/// exactly 26.
/// </remarks>
public static void BlockingCollectionTest()
{
    // Seed on the calling thread, before any worker is started.
    if (strBlockCollection.Count == 0)
    {
        for (char letter = 'A'; letter <= 'Z'; letter++)
        {
            strBlockCollection.Add(letter.ToString());
        }
    }

    Parallel.For<BlockingCollection<string>>(1, 10000,
        () =>
        {
            // Runs once per worker task; the "local" state is just the shared collection.
            Console.WriteLine("-------------start------------");
            return strBlockCollection;
        },
        (index, state, head) =>
        {
            // Take blocks while the collection is empty, so no failure branch is needed here.
            string tmp = head.Take();
            Console.WriteLine("The element '{0}' was set by thread {1}", tmp, System.Threading.Thread.CurrentThread.ManagedThreadId);
            head.Add(tmp);
            return head;
        },
        (result) =>
        {
            // Runs once per worker task after its last iteration.
            Console.WriteLine("-------------end------------------");
        });

    Console.WriteLine("current collection has {0} elements, Has duplicate data:{1}", strBlockCollection.Count, IsDuplicate<string>(strBlockCollection.AsEnumerable().GetEnumerator()));
    foreach (string item in strBlockCollection)
    {
        Console.WriteLine(item);
    }
}
/// <summary>
/// Takes and re-adds elements of the shared <c>strBlockCollection</c> from parallel loop
/// iterations (indices 1 to 9999), then prints its element count, the result of
/// <c>IsDuplicate</c>, and every element.
/// </summary>
/// <remarks>
/// The collection is seeded with "A".."Z" once, before the parallel loop starts. Seeding used
/// to happen inside the thread-local initializer in a spin loop: two workers could both
/// observe an empty collection, both fill it, and then spin forever waiting for a count of
/// exactly 26.
/// </remarks>
public static void BlockingCollectionTest()
{
    // Seed on the calling thread, before any worker is started.
    if (strBlockCollection.Count == 0)
    {
        for (char letter = 'A'; letter <= 'Z'; letter++)
        {
            strBlockCollection.Add(letter.ToString());
        }
    }

    Parallel.For<BlockingCollection<string>>(1, 10000,
        () =>
        {
            // Runs once per worker task; the "local" state is just the shared collection.
            Console.WriteLine("-------------start------------");
            return strBlockCollection;
        },
        (index, state, head) =>
        {
            // Take blocks while the collection is empty, so no failure branch is needed here.
            string tmp = head.Take();
            Console.WriteLine("The element '{0}' was set by thread {1}", tmp, System.Threading.Thread.CurrentThread.ManagedThreadId);
            head.Add(tmp);
            return head;
        },
        (result) =>
        {
            // Runs once per worker task after its last iteration.
            Console.WriteLine("-------------end------------------");
        });

    Console.WriteLine("current collection has {0} elements, Has duplicate data:{1}", strBlockCollection.Count, IsDuplicate<string>(strBlockCollection.AsEnumerable().GetEnumerator()));
    foreach (string item in strBlockCollection)
    {
        Console.WriteLine(item);
    }
}