BlockingCollection.txt
20191106

BlockingCollection<T>
Provides blocking and bounding capabilities for thread-safe collections that implement IProducerConsumerCollection<T>.

參考 Concurrent.txt

using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

/// <summary>
/// Entry point: runs each BlockingCollection demo in turn, then waits for a key press.
/// </summary>
class BlockingCollectionDemo
{
    static async Task Main()
    {
        await AddTakeDemo.BC_AddTakeCompleteAdding();
        TryTakeDemo.BC_TryTake();
        FromToAnyDemo.BC_FromToAny();
        await ConsumingEnumerableDemo.BC_GetConsumingEnumerable();

        Console.WriteLine("Press any key to exit.");
        Console.ReadKey();
    }
}

class AddTakeDemo
{
    // Demonstrates:
    //      BlockingCollection<T>.Add()
    //      BlockingCollection<T>.Take()
    //      BlockingCollection<T>.CompleteAdding()

    /// <summary>
    /// Starts one task that adds 1, 2, 3 and marks the collection complete, and a second
    /// task that prints every item it takes until Take() throws.
    /// </summary>
    /// <returns>A task that completes when both the producer and the consumer have finished.</returns>
    public static async Task BC_AddTakeCompleteAdding()
    {
        using (BlockingCollection<int> bc = new BlockingCollection<int>())
        {
            // Spin up a Task to populate the BlockingCollection
            using (Task t1 = Task.Run(() =>
            {
                bc.Add(1);
                bc.Add(2);
                bc.Add(3);
                bc.CompleteAdding();
            }))
            {
                // Spin up a Task to consume the BlockingCollection
                using (Task t2 = Task.Run(() =>
                {
                    try
                    {
                        // Consume the BlockingCollection
                        while (true)
                        {
                            Console.WriteLine(bc.Take());
                        }
                    }
                    catch (InvalidOperationException)
                    {
                        // An InvalidOperationException means that Take() was called on a completed collection
                        Console.WriteLine("That's All!");
                    }
                }))
                {
                    await Task.WhenAll(t1, t2);
                }
            }
        }
    }
}

class TryTakeDemo
{
    // Demonstrates:
    //      BlockingCollection<T>.Add()
    //      BlockingCollection<T>.CompleteAdding()
    //      BlockingCollection<T>.TryTake()
    //      BlockingCollection<T>.IsCompleted

    /// <summary>
    /// Fills a collection with 0..NumItems-1, drains it from three parallel actions using
    /// TryTake(), and prints the combined sum next to the expected value.
    /// </summary>
    public static void BC_TryTake()
    {
        // Construct and fill our BlockingCollection
        using (BlockingCollection<int> bc = new BlockingCollection<int>())
        {
            const int NumItems = 10000;
            for (int i = 0; i < NumItems; i++)
            {
                bc.Add(i);
            }
            bc.CompleteAdding();

            int outerSum = 0;

            // Delegate for consuming the BlockingCollection and adding up all items
            Action action = () =>
            {
                int localItem;
                int localSum = 0;

                while (bc.TryTake(out localItem))
                {
                    localSum += localItem;
                }

                // Each action publishes its partial sum atomically because all three share outerSum.
                Interlocked.Add(ref outerSum, localSum);
            };

            // Launch three parallel actions to consume the BlockingCollection
            Parallel.Invoke(action, action, action);

            Console.WriteLine("Sum[0..{0}) = {1}, should be {2}", NumItems, outerSum, ((NumItems * (NumItems - 1)) / 2));
            Console.WriteLine("bc.IsCompleted = {0} (should be true)", bc.IsCompleted);
        }
    }
}

class FromToAnyDemo
{
    // Demonstrates:
    //      Bounded BlockingCollection<T>
    //      BlockingCollection<T>.TryAddToAny()
    //      BlockingCollection<T>.TryTakeFromAny()

    /// <summary>
    /// Adds ten items across two collections bounded to five items each, then takes them
    /// all back, printing the failure count and the retrieved count.
    /// </summary>
    public static void BC_FromToAny()
    {
        BlockingCollection<int>[] bcs = new BlockingCollection<int>[2];
        try
        {
            bcs[0] = new BlockingCollection<int>(5); // collection bounded to 5 items
            bcs[1] = new BlockingCollection<int>(5); // collection bounded to 5 items

            // Should be able to add 10 items w/o blocking
            int numFailures = 0;
            for (int i = 0; i < 10; i++)
            {
                if (BlockingCollection<int>.TryAddToAny(bcs, i) == -1)
                {
                    numFailures++;
                }
            }
            Console.WriteLine("TryAddToAny: {0} failures (should be 0)", numFailures);

            // Should be able to retrieve 10 items
            int numItems = 0;
            int item;
            while (BlockingCollection<int>.TryTakeFromAny(bcs, out item) != -1)
            {
                numItems++;
            }
            Console.WriteLine("TryTakeFromAny: retrieved {0} items (should be 10)", numItems);
        }
        finally
        {
            // BlockingCollection<T> is IDisposable; release whichever collections were created.
            foreach (BlockingCollection<int> bc in bcs)
            {
                bc?.Dispose();
            }
        }
    }
}

class ConsumingEnumerableDemo
{
    // Demonstrates:
    //      BlockingCollection<T>.Add()
    //      BlockingCollection<T>.CompleteAdding()
    //      BlockingCollection<T>.GetConsumingEnumerable()

    /// <summary>
    /// Runs a producer that adds 0..9 with a 100 ms pause between adds while the caller
    /// consumes and prints the items through GetConsumingEnumerable().
    /// </summary>
    /// <returns>A task that completes once every item is consumed and the producer has finished.</returns>
    public static async Task BC_GetConsumingEnumerable()
    {
        using (BlockingCollection<int> bc = new BlockingCollection<int>())
        {
            // Kick off a producer task. It is NOT awaited here, so that the foreach
            // below runs while items are still being produced.
            Task producer = Task.Run(async () =>
            {
                try
                {
                    for (int i = 0; i < 10; i++)
                    {
                        bc.Add(i);
                        await Task.Delay(100); // sleep 100 ms between adds
                    }
                }
                finally
                {
                    // Need to do this to keep foreach below from hanging,
                    // even if the producer loop fails part-way through.
                    bc.CompleteAdding();
                }
            });

            // Now consume the blocking collection with foreach.
            // Use bc.GetConsumingEnumerable() instead of just bc because the
            // former will block waiting for completion and the latter will
            // simply take a snapshot of the current state of the underlying collection.
            foreach (var item in bc.GetConsumingEnumerable())
            {
                Console.WriteLine(item);
            }

            // Observe the producer's completion (and any exception) before disposing bc.
            await producer;
        }
    }
}

Remarks

BlockingCollection<T> is a thread-safe collection class that provides the following:
- An implementation of the producer/consumer pattern; BlockingCollection<T> is a wrapper for the IProducerConsumerCollection<T> interface.
- Concurrent addition and removal of items from multiple threads with the Add and Take methods.
- A bounded collection that blocks Add and Take operations when the collection is full or empty.
- Cancellation of Add or Take operations by using a CancellationToken object in the TryAdd or TryTake method.

Important
This type implements the IDisposable interface. When you have finished using the type, you should dispose of it either directly or indirectly. To dispose of the type directly, call its Dispose method in a try/catch block. To dispose of it indirectly, use a language construct such as using (in C#) or Using (in Visual Basic). For more information, see the "Using an Object that Implements IDisposable" section in the IDisposable interface topic. Also, note that the Dispose() method is not thread-safe. All other public and protected members of BlockingCollection<T> are thread-safe and may be used concurrently from multiple threads.

IProducerConsumerCollection<T> represents a collection that allows for thread-safe adding and removal of data. BlockingCollection<T> is used as a wrapper for an IProducerConsumerCollection<T> instance, and allows removal attempts from the collection to block until data is available to be removed. Similarly, you can create a BlockingCollection<T> to enforce an upper bound on the number of data elements allowed in the IProducerConsumerCollection<T>; addition attempts to the collection may then block until space is available to store the added items. In this manner, BlockingCollection<T> is similar to a traditional blocking queue data structure, except that the underlying data storage mechanism is abstracted away as an IProducerConsumerCollection<T>.
BlockingCollection<T> supports bounding and blocking. Bounding means that you can set the maximum capacity of the collection. Bounding is important in certain scenarios because it enables you to control the maximum size of the collection in memory, and it prevents the producing threads from moving too far ahead of the consuming threads.

Multiple threads or tasks can add items to the collection concurrently, and if the collection reaches its specified maximum capacity, the producing threads will block until an item is removed. Multiple consumers can remove items concurrently, and if the collection becomes empty, the consuming threads will block until a producer adds an item. A producing thread can call the CompleteAdding method to indicate that no more items will be added. Consumers monitor the IsCompleted property to know when the collection is empty and no more items will be added.

Add and Take operations are typically performed in a loop. You can cancel a loop by passing in a CancellationToken object to the TryAdd or TryTake method, and then checking the value of the token's IsCancellationRequested property on each iteration. If the value is true, it is up to you to respond to the cancellation request by cleaning up any resources and exiting the loop.

When you create a BlockingCollection<T> object, you can specify not only the bounded capacity but also the type of collection to use. For example, you could specify a ConcurrentQueue<T> object for first in, first out (FIFO) behavior, or a ConcurrentStack<T> object for last in, first out (LIFO) behavior. You can use any collection class that implements the IProducerConsumerCollection<T> interface. The default collection type for BlockingCollection<T> is ConcurrentQueue<T>.

Do not modify the underlying collection directly. Use BlockingCollection<T> methods to add or remove elements. The BlockingCollection<T> object can become corrupted if you change the underlying collection directly.