主页 > 创业  > 

C#并发集合-ConcurrentQueue

C#并发集合-ConcurrentQueue

在多线程环境下,使用这个集合接受网络请求

using System.Collections; using System.Collections.Concurrent; using System.Runtime.Serialization; namespace ConsoleApp1.Concurrent; /* * 并发队列 ConcurrentQueue * 数据结构:使用 数组 + 单链表 * 结构图: * tail head * segment1 -> segment2 -> ... -> segment... * arr[0] arr[0] arr[0] * arr[1] arr[1] arr[1] * arr[...] arr[...] arr[...] * arr[32] arr[32] arr[32] * 操作: * 进队 * tail => * 累加 tail segment 中的 high(入队方向从 0 到 31) * 入队完成,if segment 内数组已满(high == 31),则提前将下一个 segment 创建,并且当前 tail.next 指向新的segment, tail 再执行 新的segment * 出队 * head => * 累加 head segment 中的 low(出队方向从 0 到 31) * if segment 内数组已空(low == 31),则提前将下一个 segment 创建,并且当前 tail.next 指向新的segment, tail 再执行 新的segment */ public class ConcurrentQueue<T> : IProducerConsumerCollection<T> { private volatile ConcurrentQueue<T>.Segment m_head; private volatile ConcurrentQueue<T>.Segment m_tail; internal volatile int m_numSnapshotTakers; private const int SEGMENT_SIZE_32 = 32; public ConcurrentQueue() { this.m_head = this.m_tail = new Segment(0L, this); } private void InitializeFromCollection(IEnumerable<T> collection) { throw new NotImplementedException(); } public ConcurrentQueue(IEnumerable<T> collection) { throw new NotImplementedException(); } private void OnSerializing(StreamingContext context) { throw new NotImplementedException(); } private void OnSerialized(StreamingContext context) { throw new NotImplementedException(); } void ICollection.CopyTo(Array array, int index) { throw new NotImplementedException(); } private IEnumerator<T> GetEnumerator(Segment head, Segment tail, int headLow, int tailHigh) { throw new NotImplementedException(); } public IEnumerator<T> GetEnumerator() { throw new NotImplementedException(); } IEnumerator IEnumerable.GetEnumerator() => GetEnumerator(); public void CopyTo(Array array, int index) { throw new NotImplementedException(); } public int Count { get { this.GetHeadTailPositions(out Segment head, out Segment tail, out int headLow, out int tailHigh); return head == tail ? tailHigh - headLow + 1 : SEGMENT_SIZE_32 - headLow + SEGMENT_SIZE_32 * (int)(tail.m_index - head.m_index - 1L) + (tailHigh + 1); } } private void GetHeadTailPositions(out Segment head, out Segment tail, out int headLow, out int tailHigh) { head = this.m_head; tail = this.m_tail; headLow = head.Low; tailHigh = tail.High; SpinWait spinWait = new SpinWait(); while (head != this.m_head || tail != this.m_tail || headLow != head.Low || tailHigh != tail.High || head.m_index > tail.m_index) { spinWait.SpinOnce(); head = this.m_head; tail = this.m_tail; headLow = head.Low; tailHigh = tail.High; } } public bool IsSynchronized => false; public object SyncRoot => throw new NotSupportedException(); public void CopyTo(T[] array, int index) { throw new NotImplementedException(); } public bool TryAdd(T item) { this.Enqueue(item); return true; } public bool TryTake(out T item) => this.TryDequeue(out item); public void Enqueue(T item) { SpinWait spinWait = new SpinWait(); while (!this.m_tail.TryAppend(item)) spinWait.SpinOnce(); } public bool TryDequeue(out T result) { while (!this.IsEmpty) { if (this.m_head.TryRemove(out result)) return true; } result = default(T); return false; } public bool TryPeek(out T result) { Interlocked.Increment(ref this.m_numSnapshotTakers); while (!this.IsEmpty) { if (this.m_head.TryPeek(out result)) { Interlocked.Decrement(ref this.m_numSnapshotTakers); return true; } } result = default(T); Interlocked.Decrement(ref this.m_numSnapshotTakers); return false; } public bool IsEmpty { get { ConcurrentQueue<T>.Segment head = this.m_head; if (!head.IsEmpty) return false; if (head.Next == null) return true; SpinWait spinWait = new SpinWait(); for (; head.IsEmpty; head = this.m_head) { if (head.Next == null) return true; spinWait.SpinOnce(); } return false; } } public T[] ToArray() { throw new NotImplementedException(); } internal struct VolatileBool { public volatile bool m_value; public VolatileBool(bool value) => this.m_value = value; } private class Segment { // 存在实际的元素 internal volatile T[] m_array; internal volatile VolatileBool[] m_state; // 单链表,指向下一个 segment internal ConcurrentQueue<T>.Segment Next => this.m_next; private volatile ConcurrentQueue<T>.Segment m_next; // 记录当前 segment 为 第几个(从0开始,计算总Count 会使用) internal readonly long m_index; // 当前 segment 中 的 可出队下标 internal int Low => Math.Min(this.m_low, SEGMENT_SIZE_32); private volatile int m_low; // 当前 segment 中 的 可进队下标 internal int High => Math.Max(this.m_high, 31); private volatile int m_high; // 属于哪个队列 private volatile ConcurrentQueue<T> m_source; internal Segment(long index, ConcurrentQueue<T> source) { this.m_array = new T[SEGMENT_SIZE_32]; this.m_state = new VolatileBool[SEGMENT_SIZE_32]; this.m_high = -1; this.m_index = index; this.m_source = source; } internal bool IsEmpty => this.Low > this.High; // 创建下一个 segment internal void Grow() { this.m_next = new Segment(this.m_index + 1L, this.m_source); // 将新创建的 segment 设置为 当前 队尾 this.m_source.m_tail = this.m_next; } internal bool TryAppend(T value) { // 当前 segment 队尾 已经到头,无法添加元素 if (this.m_high >= 31) return false; int high = SEGMENT_SIZE_32; try { } finally { // 原值性的增加队尾下标 high = Interlocked.Increment(ref this.m_high); if (high <= 31) { // 设置队尾元素 this.m_array[high] = value; // 设置元素添加标志(由于 value 可能是任意值,这里使用额外的bool 表示是否添加) this.m_state[high].m_value = true; } // 队尾 已经 到头,开始创建一下 segment if (high == 31) this.Grow(); } // 如果抢到的下标超过,则直接返回失败 return high <= 31; } internal bool TryRemove(out T result) { SpinWait spinWait1 = new SpinWait(); int low = this.Low; for (int high = this.High; low <= high; high = this.High) { // 原子性的尝试对 这个 segment 的 队头下标 + 1,这里可能多个线程同时抢占 if (Interlocked.CompareExchange(ref this.m_low, low + 1, low) == low) { SpinWait spinWait2 = new SpinWait(); // 当前存在进队元素,但是尚未完成进队 的全部操作,进队操作完成 m_value == true while (!this.m_state[low].m_value) spinWait2.SpinOnce(); // 获取这个队列元素 result = this.m_array[low]; /* * 是否有操作持有快照,没有则直接赋值元素为默认值 */ if (this.m_source.m_numSnapshotTakers <= 0) this.m_array[low] = default(T); /* * 如果已经取到 当前 segment 队列的 队头 => low 到达 数组的最后一个下标 * Note:这里为什么不能写成 if(low == 31)?, 其实是可以的 */ if (low + 1 >= SEGMENT_SIZE_32) { // 等待进入下一个 segment,根据 入队函数可知,当一个segment 队头达到最后一个位置后,会立马创建 下一个 segment SpinWait spinWait3 = new SpinWait(); while (this.m_next == null) spinWait3.SpinOnce(); // 将下一个 segment 设置为 队头 segment this.m_source.m_head = this.m_next; } // 抢到 队列中的数据 return true; } spinWait1.SpinOnce(); // 前面 没有 抢到 队列中的数据,重新获取当前的最新 Low => 队头下标 low = this.Low; } // 未抢到 队列中的数据,直接返回失败 result = default(T); return false; } internal bool TryPeek(out T result) { result = default(T); int low = this.Low; // 队列无元素 if (low > this.High) return false; // 当前队头元素,入队尚未完成,自旋一会 SpinWait spinWait = new SpinWait(); while (!this.m_state[low].m_value) spinWait.SpinOnce(); // 获取当前队头元素 result = this.m_array[low]; return true; } } }

 

标签:

C#并发集合-ConcurrentQueue由讯客互联创业栏目发布,感谢您对讯客互联的认可,以及对我们原创作品以及文章的青睐,非常欢迎各位朋友分享到个人网站或者朋友圈,但转载请说明文章出处“C#并发集合-ConcurrentQueue