Concurrent.txt

20191106
System.Collections.Concurrent Namespace

The System.Collections.Concurrent namespace provides several thread-safe collection classes that should be used in place of the corresponding types in the System.Collections and System.Collections.Generic namespaces whenever multiple threads access the collection concurrently. However, access to the elements of a collection object through extension methods or through explicit interface implementations is not guaranteed to be thread-safe and may need to be synchronized by the caller.

In other words, System.Collections.Concurrent offers thread-safe replacements for the ordinary collection types. Calls made through extension methods still have to be used with care, or synchronized by the calling code, to remain thread-safe.

The following types are provided under System.Collections.Concurrent:

BlockingCollection<T>
    Provides blocking and bounding capabilities for thread-safe collections that implement IProducerConsumerCollection<T>.

ConcurrentBag<T>
    Represents a thread-safe, unordered collection of objects.

ConcurrentDictionary<TKey, TValue>
    Represents a thread-safe collection of key/value pairs that can be accessed by multiple threads concurrently.

ConcurrentQueue<T>
    Represents a thread-safe first in-first out (FIFO) collection.

ConcurrentStack<T>
    Represents a thread-safe last in-first out (LIFO) collection.

OrderablePartitioner<TSource>
    Represents a particular manner of splitting an orderable data source into multiple partitions.

Partitioner
    Provides common partitioning strategies for arrays, lists, and enumerables.

Partitioner<TSource>
    Represents a particular manner of splitting a data source into multiple partitions.
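To make the FIFO, LIFO and bounding descriptions above concrete, here is a minimal sketch. The class name ConcurrentOverview, its method names and the capacity of 10 are hypothetical and chosen only for illustration; the collection calls themselves (Enqueue, TryDequeue, Push, TryPop, Add, CompleteAdding, GetConsumingEnumerable) are the standard members of these types.

```csharp
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical helper class, not part of the original notes.
public static class ConcurrentOverview
{
    // ConcurrentQueue<T> is FIFO: items come out in the order they went in.
    public static List<int> DrainQueue()
    {
        var queue = new ConcurrentQueue<int>();
        for (int i = 1; i <= 3; i++) queue.Enqueue(i);

        var result = new List<int>();
        while (queue.TryDequeue(out int item)) result.Add(item);
        return result; // 1, 2, 3
    }

    // ConcurrentStack<T> is LIFO: the last item pushed is the first one popped.
    public static List<int> DrainStack()
    {
        var stack = new ConcurrentStack<int>();
        for (int i = 1; i <= 3; i++) stack.Push(i);

        var result = new List<int>();
        while (stack.TryPop(out int item)) result.Add(item);
        return result; // 3, 2, 1
    }

    // BlockingCollection<T> with a bound: Add blocks while the buffer is full,
    // and the consuming enumerable blocks while it is empty, ending once
    // CompleteAdding has been called and the buffer has been drained.
    public static int ProduceAndConsume(int itemCount)
    {
        using (var buffer = new BlockingCollection<int>(boundedCapacity: 10))
        {
            Task producer = Task.Run(() =>
            {
                for (int i = 0; i < itemCount; i++) buffer.Add(i);
                buffer.CompleteAdding();
            });

            int consumed = 0;
            foreach (int item in buffer.GetConsumingEnumerable()) consumed++;

            producer.Wait();
            return consumed;
        }
    }
}
```

The Try... methods return false instead of throwing when the collection is empty, which is why the loops above can drain the collections without a separate emptiness check. ProduceAndConsume(1000) should return 1000 regardless of how the producer and consumer interleave.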
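----------
20191106

    // Namespaces needed by the snippets in this file.
    using System;
    using System.Collections.Concurrent;
    using System.Collections.Generic;
    using System.Linq;
    using System.Threading.Tasks;

    // GetStrings() is not shown in these notes. It is assumed to return
    // key/value pairs (IEnumerable<KeyValuePair<string, string>>); the type
    // arguments below were lost in the original and are an assumption.
    private static void RunYieldTest()
    {
        var products = GetStrings();
        Dictionary<string, string> newProducts = new Dictionary<string, string>();
        Console.WriteLine("Source record count: {0}", products.Count());
        Console.WriteLine("Result count before run: {0}", newProducts.Count());
        // Not thread-safe: concurrent Add calls can lose entries or throw.
        Parallel.ForEach(products, x =>
        {
            newProducts.Add(x.Key, x.Value);
        });
        Console.WriteLine("Result count after run: {0}", newProducts.Count());
    }

    private static void RunYieldTestConcurrent()
    {
        var products = GetStrings();
        ConcurrentDictionary<string, string> newProducts = new ConcurrentDictionary<string, string>();
        Console.WriteLine("Source record count: {0}", products.Count());
        Console.WriteLine("Result count before run: {0}", newProducts.Count());
        // Thread-safe: TryAdd returns false (instead of throwing) for a duplicate key.
        Parallel.ForEach(products, x =>
        {
            newProducts.TryAdd(x.Key, x.Value);
        });
        Console.WriteLine("Result count after run: {0}", newProducts.Count());
    }

Source: https://dotblogs.com.tw/mileslin/2016/03/13/150234

Concurrent collections

Operations on collections are usually not thread-safe; List<T> is one example. This means that when several threads operate on the same collection at once, the outcome is unpredictable: items can be lost, indexes duplicated, and so on.

Consider the following example:

    public void Run()
    {
        var products = GetProducts();
        List<NewProduct> newProducts = new List<NewProduct>();
        Console.WriteLine("Source record count: {0}", products.Count());
        Console.WriteLine("Result count before run: {0}", newProducts.Count());
        Parallel.ForEach(products, x =>
        {
            int id = x.Id;
            string name = x.Name;
            int stock = 100 + x.Id;
            var newProduct = new NewProduct() { Id = id, Name = name, Stock = stock };
            // List<T>.Add is not thread-safe; concurrent calls can drop items.
            newProducts.Add(newProduct);
        });
        Console.WriteLine("Result count after run: {0}", newProducts.Count());
    }

    // Lazily yields 100,000 products.
    private IEnumerable<Product> GetProducts()
    {
        for (int i = 0; i < 100000; i++)
        {
            yield return new Product() { Id = i, Name = "Miles" + i };
        }
    }

    // Not shown in the original notes; inferred from how the types are used above.
    public class Product { public int Id { get; set; } public string Name { get; set; } }
    public class NewProduct { public int Id { get; set; } public string Name { get; set; } public int Stock { get; set; } }

Result:

    Source record count: 100000
    Result count before run: 0
    Result count after run: 99979

Clearly 21 records were lost during the computation, which is not the result we want.

Concurrent collections

The .NET Framework provides several thread-safe collections that can be used in a multithreaded environment.

BlockingCollection<T>
    An upper bound on the collection's capacity can be set.

ConcurrentBag<T>
    An unordered collection.

ConcurrentDictionary<TKey, TValue>
    A collection of key/value pairs.

ConcurrentQueue<T>
    The queue concept: first in, first out.

ConcurrentStack<T>
    The stack concept: last in, first out.

.NET provides the five thread-safe collections above. They are used in much the same way; only the API calls differ. Usage is also much like an ordinary List<T>. ConcurrentBag<T> is used as the example here:

    public void Run()
    {
        var products = GetProducts();
        ConcurrentBag<NewProduct> newProducts = new ConcurrentBag<NewProduct>();
        Console.WriteLine("Source record count: {0}", products.Count());
        Console.WriteLine("Result count before run: {0}", newProducts.Count());
        Parallel.ForEach(products, x =>
        {
            int id = x.Id;
            string name = x.Name;
            int stock = 100 + x.Id;
            var newProduct = new NewProduct() { Id = id, Name = name, Stock = stock };
            // ConcurrentBag<T>.Add is safe to call from many threads at once.
            newProducts.Add(newProduct);
        });
        Console.WriteLine("Result count after run: {0}", newProducts.Count());
    }

GetProducts() is the same as in the previous example.

The only change is replacing List<T> with ConcurrentBag<T>, which is enough to make the operation thread-safe.

Result:

    Source record count: 100000
    Result count before run: 0
    Result count after run: 100000

The result is as expected; nothing was lost.

Because the concurrent collections implement IEnumerable<T>, they work with Entity Framework: the resulting collection can be inserted directly with AddRange. This is quite convenient when a large amount of computation has to be combined with database access.

Example:

    public void Run()
    {
        LinqDemoEntities db = new LinqDemoEntities();
        var products = GetProducts();
        ConcurrentBag<Order> cb = new ConcurrentBag<Order>();
        Console.WriteLine("Source record count: {0}", products.Count());
        Console.WriteLine("Result count before run: {0}", db.Order.Count());
        Parallel.ForEach(products, x =>
        {
            int id = x.Id;
            string name = x.Name;
            int stock = 100 + x.Id; // computed as in the earlier examples but not used here
            var order = new Order() { Id = id, OrderName = name };
            cb.Add(order);
        });
        // The database calls run on a single thread, after the parallel loop has finished.
        db.Order.AddRange(cb);
        db.SaveChanges();
        Console.WriteLine("Result count after run: {0}", db.Order.Count());
    }

    // Lazily yields 1,000 products.
    private IEnumerable<Product> GetProducts()
    {
        for (int i = 0; i < 1000; i++)
        {
            yield return new Product() { Id = i, Name = "Miles" + i };
        }
    }

Result: (output not recorded)

Summary:

When several threads write to a shared collection, use a concurrent collection (or synchronize access yourself) so that the data is not affected in unexpected ways.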
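As a compact recap of the pattern in these notes, the sketch below collects results from a parallel loop in three ways: into a ConcurrentBag<T>, into a ConcurrentDictionary<TKey, TValue> via AddOrUpdate, and into a plain List<T> guarded by a lock. The class ParallelCollect and its method names are hypothetical, and the lock variant is an alternative not covered by the original blog post; it is included only to show what "synchronized by the caller" can look like.

```csharp
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical helper class, not part of the original notes.
public static class ParallelCollect
{
    // Collect computed values into a ConcurrentBag<T>. The bag is unordered,
    // so the values are sorted before being returned.
    public static List<int> CollectStocks(int count)
    {
        var bag = new ConcurrentBag<int>();
        Parallel.For(0, count, id => bag.Add(100 + id));
        return bag.OrderBy(x => x).ToList();
    }

    // Tally items per bucket. AddOrUpdate inserts 1 for a new key or applies
    // the update delegate to the existing value; the delegate may be retried
    // under contention, but the stored counts end up correct.
    public static ConcurrentDictionary<int, int> CountByBucket(int count, int buckets)
    {
        var counts = new ConcurrentDictionary<int, int>();
        Parallel.For(0, count, i =>
            counts.AddOrUpdate(i % buckets, 1, (key, current) => current + 1));
        return counts;
    }

    // Alternative: keep List<T> and let the caller synchronize with a lock.
    public static int CollectWithLock(int count)
    {
        var list = new List<int>();
        var gate = new object();
        Parallel.For(0, count, i =>
        {
            lock (gate) { list.Add(i); }
        });
        return list.Count;
    }
}
```

With 100,000 items, CollectStocks and CollectWithLock should both end with exactly 100,000 elements, and CountByBucket(100000, 10) should hold 10,000 in each of its 10 buckets. The lock version serializes every Add, so it tends to pay off only when the work done outside the lock dominates.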