From: 011netservice@gmail.com Date: 2022-04-24 Subject: ConcurrentQueue.txt 歡迎來信交流 ---------- 2021-09-07 using System; using System.Collections.Concurrent; using System.Threading; using System.Threading.Tasks; class CQ_EnqueueDequeuePeek { // Demonstrates: // ConcurrentQueue.Enqueue() // ConcurrentQueue.TryPeek() // ConcurrentQueue.TryDequeue() static void Main () { // Construct a ConcurrentQueue. ConcurrentQueue cq = new ConcurrentQueue(); // Populate the queue. for (int i = 0; i < 10000; i++) { cq.Enqueue(i); } // Peek at the first element. int result; if (!cq.TryPeek(out result)) { Console.WriteLine("CQ: TryPeek failed when it should have succeeded"); } else if (result != 0) { Console.WriteLine("CQ: Expected TryPeek result of 0, got {0}", result); } int outerSum = 0; // An action to consume the ConcurrentQueue. Action action = () => { int localSum = 0; int localValue; while (cq.TryDequeue(out localValue)) localSum += localValue; Interlocked.Add(ref outerSum, localSum); }; // Start 4 concurrent consuming actions. Parallel.Invoke(action, action, action, action); Console.WriteLine("outerSum = {0}, should be 49995000", outerSum); } } Remarks ConcurrentQueue implements the IReadOnlyCollection interface starting with the .NET Framework 4.6; in previous versions of the .NET Framework, the ConcurrentQueue class did not implement this interface. ---------- 2021-09-07 ref: https://iter01.com/502169.html ConcurrentQueue 明明是包含在 System.Collections.Concurrent 名稱空間下, 但是原始碼結構中的檔案卻放在 System.Private.CoreLib 目錄中! 儲存結構 從上面給出的原始碼地址可以猜測出整個結構依然是Segment+Queue的組合,通過一個Segment連結串列實現了Queue結構,但實際上內部又加了新的設計。拋去Queue先不看的話,Segment本身就是一個實現了多生產者多消費者的執行緒安全集合,甚至可以直接拿它當一個固定容量的執行緒安全佇列使用,這點與之前Framework中差別很大。如果結合Queue整體來看,Segment不再是固定容量,而是可以由Queue來控制每個Segment的容量大小(最小是32,上限是1024 * 1024)。 在Framework中,佇列會給每個Segment分配一個索引,雖然這個索引是long型別的,但理論上說佇列容量還是存在上限。在Core中就不一樣了,它取消了這個索引,真正實現了一個無邊界(unbounded)佇列。 我猜測的原因是,在Framework中由於每個Segment是固定大小的,維護一個索引可以很方便的計算佇列裡的元素數量,但是Core中的Segment大小不是固定的,使用索引並不能加快計算速度,使得這個索引不再有意義,這也意味著計算元素數量變得非常複雜。 一張圖看清它的真實面目,這裡繼續沿用上一篇的結構圖稍作修改: 從圖中可以看到,整體結構上基本一致,核心改動就是Segment中增加了Slot(槽)的概念,這是真正儲存資料的地方,同時有一個序列號與之對應。 從程式碼來看一下Segment的核心定義: internal sealed class ConcurrentQueueSegment { //存放資料的容器 internal readonly Slot[] _slots; //這個mask用來計算槽點,可以防止查詢越界 internal readonly int _slotsMask; //首尾位置指標 internal PaddedHeadAndTail _headAndTail; //觀察保留標記,表示當前段在出隊時能否刪除資料 internal bool _preservedForObservation; //標記當前段是否被鎖住 internal bool _frozenForEnqueues; //下一段的指標 internal ConcurrentQueueSegment? _nextSegment; } 其中_preservedForObservation和_frozenForEnqueues會比較難理解,後面再詳細介紹。 再看一下佇列的核心定義: public class ConcurrentQueue : IProducerConsumerCollection, IReadOnlyCollection { //每一段的初始化長度,也是最小長度 private const int InitialSegmentLength = 32; //每一段的最大長度 private const int MaxSegmentLength = 1024 * 1024; //操作多個段時的鎖物件 private readonly object _crossSegmentLock; //尾段指標 private volatile ConcurrentQueueSegment _tail; //首段指標 private volatile ConcurrentQueueSegment _head; } 常規操作 還是按上一篇的套路為主線循序漸進。 建立例項 ConcurrentQueue依然提供了2個建構函式,分別可以建立一個空佇列和指定資料集的佇列。 /// /// Initializes a new instance of the class. /// public ConcurrentQueue() { _crossSegmentLock = new object(); _tail = _head = new ConcurrentQueueSegment(InitialSegmentLength); } 還是熟悉的操作,建立了一個長度是32的Segment並把佇列的首尾指標都指向它,同時建立了鎖物件例項,僅此而已。 進一步看看Segment是怎麼建立的: internal ConcurrentQueueSegment(int boundedLength) { //這裡驗證了長度不能小於2並且必須是2的N次冪 Debug.Assert(boundedLength >= 2, $"Must be >= 2, got {boundedLength}"); Debug.Assert((boundedLength & (boundedLength - 1)) == 0, $"Must be a power of 2, got {boundedLength}"); _slots = new Slot[boundedLength]; //這個mask的作用就是用來計算陣列索引的防止越界,可以用`& _slotsMask`取代`% _slots.Length` _slotsMask = boundedLength - 1; //設定初始序列號 for (int i = 0; i < _slots.Length; i++) { _slots[i].SequenceNumber = i; } } internal struct Slot { [AllowNull, MaybeNull] public T Item; public int SequenceNumber; } 再看看怎麼用集合初始化佇列,這個過程稍微麻煩點,但是很有意思: public ConcurrentQueue(IEnumerable collection) { if (collection == null) { ThrowHelper.ThrowArgumentNullException(ExceptionArgument.collection); } _crossSegmentLock = new object(); //計算得到第一段的長度 int length = InitialSegmentLength; if (collection is ICollection c) { int count = c.Count; if (count > length) { length = Math.Min(ConcurrentQueueSegment.RoundUpToPowerOf2(count), MaxSegmentLength); } } //根據前面計算出來的長度建立一個Segment,再把資料依次入隊 _tail = _head = new ConcurrentQueueSegment(length); foreach (T item in collection) { Enqueue(item); } } 可以看到,第一段的大小是根據初始集合的大小確定的,如果集合大小count大於32就對count進行向上取2的N次冪(RoundUpToPowerOf2)得到實際大小(但是不能超過最大值),否則就按預設值32來初始化。 向上取2的N次冪到底是啥意思??例如count是5,那得到的結果就是8(2×2×2);如果count是9,那結果就是16(2×2×2×2);如果剛好count是8那結果就是8(2×2×2),具體演算法是通過位運算實現的很有意思。至於為什麼一定要是2的N次冪,中間的玄機我也沒搞明白。。 順藤摸瓜,再看看進隊操作如何實現。 元素進隊 /// 在隊尾追加一個元素 public void Enqueue(T item) { // 先嚐試在尾段插入一個元素 if (!_tail.TryEnqueue(item)) { // 如果插入失敗,就意味著尾段已經填滿,需要往後擴容 EnqueueSlow(item); } } private void EnqueueSlow(T item) { while (true) { ConcurrentQueueSegment tail = _tail; // 先嚐試再隊尾插入元素,如果擴容完成了就會成功 if (tail.TryEnqueue(item)) { return; } // 獲得一把鎖,避免多個執行緒同時進行擴容 lock (_crossSegmentLock) { //檢查是否擴容過了 if (tail == _tail) { // 尾段凍結 tail.EnsureFrozenForEnqueues(); // 計算下一段的長度 int nextSize = tail._preservedForObservation ? InitialSegmentLength : Math.Min(tail.Capacity * 2, MaxSegmentLength); var newTail = new ConcurrentQueueSegment(nextSize); // 改變隊尾指向 tail._nextSegment = newTail; // 指標交換 _tail = newTail; } } } } 從以上流程可以看到,擴容的主動權不再由Segment去控制,而是交給了佇列。正因為如此,所以在跨段操作時要先加鎖,在Framework版本中是在原子操作獲得指標後進行的擴容所以不會有這個問題,後面的出隊操作也是一樣的道理。擴容過程中有兩個細節需要重點關注,那就是SegmentFrozen和下一段的長度計算。 從前面Segment的定義中我們看到它維護了一個_frozenForEnqueues標記欄位,表示當前段是否被凍結鎖定,在被鎖住的情況下會讓其他入隊操作失敗,看一下實現過程: // must only be called while queue's segment lock is held internal void EnsureFrozenForEnqueues() { // flag used to ensure we don't increase the Tail more than once if frozen more than once if (!_frozenForEnqueues) { _frozenForEnqueues = true; Interlocked.Add(ref _headAndTail.Tail, FreezeOffset); } } 首先判斷當前凍結狀態,然後把它設定為true,再使用原子操作把尾指標增加了2倍段長的偏移量,這個尾指標才是真正限制當前段不可新增元素的關鍵點,後面講段的元素追加再關聯起來詳細介紹。而為什麼要指定2倍段長這麼一個特殊值呢,目的是為了把尾指標和mask做運算後落在同一個slot上,也就是說雖然兩個指標位置不一樣但是都指向的是同一個槽。 再說說下一段長度的計算問題,它主要是受_preservedForObservation這個欄位影響,正常情況下一段的長度是尾段的2倍,但如果尾段正好被標記為觀察保留(類似於上一篇的擷取快照),那麼下一段的長度依然是初始值32,原作者認為入隊操作不是很頻繁,這樣做主要是為了避免浪費空間。 接著是重頭戲,看一下如何給段追加元素: public bool TryEnqueue(T item) { Slot[] slots = _slots; // 如果發生競爭就自旋等待 SpinWait spinner = default; while (true) { // 獲取當前段的尾指標 int currentTail = Volatile.Read(ref _headAndTail.Tail); // 計算槽點 int slotsIndex = currentTail & _slotsMask; // 讀取對應槽的序列號 int sequenceNumber = Volatile.Read(ref slots[slotsIndex].SequenceNumber); // 判斷槽點序列號和指標是否匹配 int diff = sequenceNumber - currentTail; if (diff == 0) { // 通過原子操作比較交換,保證了只有一個入隊者獲得可用空間 if (Interlocked.CompareExchange(ref _headAndTail.Tail, currentTail + 1, currentTail) == currentTail) { // 把資料存入對應的槽點,以及更新序列號 slots[slotsIndex].Item = item; Volatile.Write(ref slots[slotsIndex].SequenceNumber, currentTail + 1); return true; } } else if (diff < 0) { // 序列號小於指標就說明該段已經裝滿了,直接返回false return false; } // 這次競爭失敗了,只好等下去 spinner.SpinOnce(sleep1Threshold: -1); } } 整個流程的核心就是藉助槽點序列號和尾指標的匹配情況判斷是否有可用空間,因為在初始化的時候序列號是從0遞增,正常情況下尾指標和序列號肯定是匹配的,只有在整個段被裝滿時尾指標才會大於序列號,因為前面的凍結操作會給尾指標追加2倍段長的偏移量。要重點提出的是,只有在資料被寫入並且序列號更新完成後才表示整個位置的元素有效,才能有出隊的機會,在Framework是通過維護一個狀態位來實現這個功能。整個設計很有意思,要慢慢品。 這裡我們可以總結一下序列號的核心作用:假設一個槽點N,對應序列號是Q,它能允許入隊的必要條件之一就是N==Q,由於入隊操作把位置N的序列號修改成N+1,那麼可以猜測出在出隊時的必要條件之一就是滿足Q==N+1。 程式碼中的CompareExchange在上一篇中有介紹,這裡不再重複。另外關於Volatile相關的稍微提一下,它的核心作用是避免記憶體與CPU之間的快取記憶體帶來的資料不一致問題,告訴編譯器直接讀寫原始資料,有興趣的可以找資料瞭解,限於篇幅不過多介紹。 元素出隊 可以猜測到,入隊的時候要根據容量大小進行擴容,那麼與之對應的,出隊的時候就需要對它進行壓縮,也就是丟棄沒有資料的段。 /// 從隊首移除一個元素 public bool TryDequeue([MaybeNullWhen(false)] out T result) => _head.TryDequeue(out result) || TryDequeueSlow(out result); private bool TryDequeueSlow([MaybeNullWhen(false)] out T item) { // 不斷迴圈嘗試出隊,直到成功或失敗為止 while (true) { ConcurrentQueueSegment head = _head; // 嘗試從隊首移除,如果成功就直接返回了 if (head.TryDequeue(out item)) { return true; } // 如果首段為空並且沒有下一段了,則說明整個佇列都沒有資料了,返回失敗 if (head._nextSegment == null) { item = default!; return false; } // 既然下一段不為空,那就再次確認本段是否還能出隊成功,否則就要把它給移除了,等待下次迴圈從下一段出隊 if (head.TryDequeue(out item)) { return true; } // 首段指標要往後移動,表示當前首段已丟棄,跨段操作要先加鎖 lock (_crossSegmentLock) { if (head == _head) { _head = head._nextSegment; } } } } 整體流程基本和入隊一樣,外層通過一個死迴圈不斷嘗試操作,直到出隊成功或者佇列為空返回失敗為止。釋放空間的操作也從Segment轉移到佇列上,所以要加鎖保證執行緒安全。這一步我在程式碼註釋中寫的很詳細就不多解釋了,再看一下核心操作Segment是如何移除元素的: public bool TryDequeue([MaybeNullWhen(false)] out T item) { Slot[] slots = _slots; // 遇到競爭時自旋等待 SpinWait spinner = default; while (true) { // 獲取頭指標地址 int currentHead = Volatile.Read(ref _headAndTail.Head); // 計算槽點 int slotsIndex = currentHead & _slotsMask; // 獲取槽點對應的序列號 int sequenceNumber = Volatile.Read(ref slots[slotsIndex].SequenceNumber); // 比較序列號是否和期望值一樣,為什麼要加1的原因前面入隊時說過 int diff = sequenceNumber - (currentHead + 1); if (diff == 0) { // 通過原子操作比較交換得到可以出隊的槽點,並把頭指標往後移動一位 if (Interlocked.CompareExchange(ref _headAndTail.Head, currentHead + 1, currentHead) == currentHead) { // 取出資料 item = slots[slotsIndex].Item!; // 此時如果該段沒有被標記觀察保護,要把這個槽點的資料清空 if (!Volatile.Read(ref _preservedForObservation)) { slots[slotsIndex].Item = default; Volatile.Write(ref slots[slotsIndex].SequenceNumber, currentHead + slots.Length); } return true; } } else if (diff < 0) { // 這種情況說明該段已經沒有有效資料了,直接返回失敗。 bool frozen = _frozenForEnqueues; int currentTail = Volatile.Read(ref _headAndTail.Tail); if (currentTail - currentHead <= 0 || (frozen && (currentTail - FreezeOffset - currentHead <= 0))) { item = default!; return false; } } // 競爭失敗進入下一輪等待 spinner.SpinOnce(sleep1Threshold: -1); } } 流程和追加元素類似,大部分都寫在備註裡面了,這裡只額外提一下為空的情況。Segment為空只有一種情況,那就是頭尾指標落在了同一個槽點,但這是會出現兩種可能性: 第一種是都落在了非最後一個槽點,意味著該段沒有被裝滿,拿首尾指標相減即可判斷。 第二種是都落在了最後一個槽點,意味著該段已經被裝滿了,如果此時正在進行擴容(frozen),那麼必須要在尾指標的基礎上減去FreezeOffset再去和頭指標判斷,原因前面有說過; 是不是感覺環環相扣、相輔相成、如膠似漆、balabala.....😜 統計元素數量 前面也預告過,因為佇列不再維護段索引,這樣會導致計算元素數量變得非常複雜,複雜到我都不想說這一部分了😭。簡單描述一下就跳過了:核心思路就是一段一段來遍歷,然後計算出每段的大小最後把結果累加,如果涉及多個段還得加鎖,具體到段內部就要根據首尾指標計算槽點得出實際數量等等等等,程式碼很長就不貼出來了。 這裡也嚴重提醒一句,非必要情況下不要呼叫Count不要呼叫Count不要呼叫Count。 接下來重點說一下佇列的IsEmpty。由於Segment不再維護IsEmpty資訊,所以實現方式就有點曲線救國了,通過嘗試能否從隊首位置獲取一個元素來判斷是否佇列為空,也就是常說的TryPeek操作,但細節上稍有不同。 /// /// 判斷佇列是否為空,千萬不要使用Count==0來判斷,也不要直接TryPeek /// public bool IsEmpty => !TryPeek(out _, resultUsed: false); private bool TryPeek([MaybeNullWhen(false)] out T result, bool resultUsed) { ConcurrentQueueSegment s = _head; while (true) { ConcurrentQueueSegment? next = Volatile.Read(ref s._nextSegment); // 從首段中獲取頭部元素,成功的話直接返回true,獲取失敗就意味著首段為空了 if (s.TryPeek(out result, resultUsed)) { return true; } // 如果下一段不為空那就再嘗試從下一段重新獲取 if (next != null) { s = next; } //如果下一段為空就說明整個佇列為空,跳出迴圈直接返回false了 else if (Volatile.Read(ref s._nextSegment) == null) { break; } } result = default!; return false; } 上面的程式碼可以看到有一個特殊的引數resultUsed,它具體會有什麼影響呢,那就得看看Segment是如何peek的: public bool TryPeek([MaybeNullWhen(false)] out T result, bool resultUsed) { // 實際上佇列的TryPeek是一個觀察保護操作,這時resultUsed會標記成true,如果是IsEmpty操作的話就為false,因為並不關心這個元素是否被釋放了 if (resultUsed) { _preservedForObservation = true; Interlocked.MemoryBarrier(); } Slot[] slots = _slots; SpinWait spinner = default; while (true) { int currentHead = Volatile.Read(ref _headAndTail.Head); int slotsIndex = currentHead & _slotsMask; int sequenceNumber = Volatile.Read(ref slots[slotsIndex].SequenceNumber); int diff = sequenceNumber - (currentHead + 1); if (diff == 0) { result = resultUsed ? slots[slotsIndex].Item! : default!; return true; } else if (diff < 0) { bool frozen = _frozenForEnqueues; int currentTail = Volatile.Read(ref _headAndTail.Tail); if (currentTail - currentHead <= 0 || (frozen && (currentTail - FreezeOffset - currentHead <= 0))) { result = default!; return false; } } spinner.SpinOnce(sleep1Threshold: -1); } } 除了最開始的resultUsed判斷,其他的基本和出隊的邏輯一致,前面說的很詳細,這裡不多介紹了。 列舉轉換資料 前面反覆的提到觀察保護,這究竟是個啥意思??為什麼要有這個操作?? 其實看過上一篇文章的話就比較好理解一點,這裡稍微回顧一下方便對比。在Framework中會有擷取快照的操作,也就是類似ToArray\ToList\GetEnumerator這種要做資料迭代,它是通過原子操作維護一個m_numSnapshotTakers欄位來實現對資料的保護,目的是為了告訴其他出隊的執行緒我正在遍歷資料,你們執行出隊的時候不要把資料給刪了我要用的。在Core中也是為了實現同樣的功能才引入了觀察保護的概念,換了一種實現方式而已。 那麼就以ToArray為例是怎麼和其他操作互動的: public T[] ToArray() { // 這一步可以理解為保護現場 SnapForObservation(out ConcurrentQueueSegment head, out int headHead, out ConcurrentQueueSegment tail, out int tailTail); // 計算佇列長度,這也是要返回的陣列大小 long count = GetCount(head, headHead, tail, tailTail); T[] arr = new T[count]; // 開始迭代資料塞到目標陣列中 using (IEnumerator e = Enumerate(head, headHead, tail, tailTail)) { int i = 0; while (e.MoveNext()) { arr[i++] = e.Current; } Debug.Assert(count == i); } return arr; } 上面的程式碼中,有一次獲取佇列長度的操作,還有一次獲取迭代資料的操作,這兩步邏輯比較相似都是對整個佇列進行遍歷,所以做一次資料轉換的開銷非常非常大,使用的時候一定要謹慎。別的不多說,重點介紹一下如何實現保護現場的過程: private void SnapForObservation(out ConcurrentQueueSegment head, out int headHead, out ConcurrentQueueSegment tail, out int tailTail) { // 要保護現場肯定要先來一把鎖 lock (_crossSegmentLock) { head = _head; tail = _tail; // 一段一段進行遍歷 for (ConcurrentQueueSegment s = head; ; s = s._nextSegment!) { // 把每一段的觀察保護標記設定成true s._preservedForObservation = true; // 遍歷到最後一段了就結束 if (s == tail) break; } // 尾段凍結,這樣就不能新增元素 tail.EnsureFrozenForEnqueues(); // 返回兩個指標地址用來對每一個元素進行遍歷 headHead = Volatile.Read(ref head._headAndTail.Head); tailTail = Volatile.Read(ref tail._headAndTail.Tail); } } 可以看到上來就是一把鎖,如果此時正在進行擴容或者收容的操作會直接阻塞掉,運氣好沒有阻塞的話你也不能有新元素入隊了,因為尾段已經凍結鎖死只能自旋等待,而出隊也不能釋放空間了。原話是: At this point, any dequeues from any segment won't overwrite the value, and none of the existing segments can have new items enqueued. 有人就要問,這裡把尾段鎖死那等ToArray()完成後豈不是也不能有新元素入隊了?不用擔心,前面入隊邏輯提到過如果該段被鎖住佇列會新建立一個段然後再嘗試入隊,這樣就能成功了。但是問題又來了,假如前面的段還有很多空位,那豈不是有浪費空間的嫌疑?我們知道沒有觀察保護的時候每段會以2倍長度遞增,這樣的話空間浪費率還是挺高的。帶著疑問提了個Issue問一下: https://github.com/dotnet/runtime/issues/35094 到這裡就基本把.NET Core ConcurrentQueue說完了。 總結 對比Framework下的併發佇列,Core裡面的改動還是不小的,儘管保留了SpinWait和Interlocked相關操作,但是也加入了lock,邏輯上也複雜了很多,我一步步分析和寫文章搞了好幾天。 至於效能對比,我找到一個官方給出的測試結果,有興趣的可以看看: https://github.com/dotnet/runtime/issues/27458#issuecomment-423964046 ---------- 2019-12-30 ConcurrentQueue 類別 https://docs.microsoft.com/zh-tw/dotnet/api/system.collections.concurrent.concurrentqueue-1?view=netframework-4.8 表示安全執行緒的先進先出 (FIFO) 集合。 [System.Runtime.InteropServices.ComVisible(false)] [System.Serializable] public class ConcurrentQueue : System.Collections.Concurrent.IProducerConsumerCollection, System.Collections.Generic.IEnumerable, System.Collections.Generic.IReadOnlyCollection 下列範例會示範如何使用 ConcurrentQueue 來排入佇列和清除佇列專案 using System; using System.Collections.Concurrent; using System.Threading; using System.Threading.Tasks; class CQ_EnqueueDequeuePeek { // Demonstrates: // ConcurrentQueue.Enqueue() // ConcurrentQueue.TryPeek() // ConcurrentQueue.TryDequeue() static void Main () { // Construct a ConcurrentQueue. ConcurrentQueue cq = new ConcurrentQueue(); // Populate the queue. for (int i = 0; i < 10000; i++) { cq.Enqueue(i); } // Peek at the first element. int result; if (!cq.TryPeek(out result)) { Console.WriteLine("CQ: TryPeek failed when it should have succeeded"); } else if (result != 0) { Console.WriteLine("CQ: Expected TryPeek result of 0, got {0}", result); } int outerSum = 0; // An action to consume the ConcurrentQueue. Action action = () => { int localSum = 0; int localValue; while (cq.TryDequeue(out localValue)) localSum += localValue; Interlocked.Add(ref outerSum, localSum); }; // Start 4 concurrent consuming actions. Parallel.Invoke(action, action, action, action); Console.WriteLine("outerSum = {0}, should be 49995000", outerSum); } }