欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 汽车 > 新车 > 浅谈C#之ConcurrentQueue

浅谈C#之ConcurrentQueue

2025/7/11 1:31:31 来源:https://blog.csdn.net/a876106354/article/details/141949796  浏览:    关键词:浅谈C#之ConcurrentQueue

一、基本介绍

ConcurrentQueue<T> 是一个线程安全的队列,它允许多个线程同时对队列进行操作而不会相互干扰。它是 System.Collections.Concurrent 命名空间下的一个类,提供了基本的队列操作,如 Enqueue(入队)、TryDequeue(尝试出队)、TryPeek(尝试查看队首元素)等,并且是线程安全的。

二、关键特性

线程安全:不需要额外的同步机制,就可以在多线程环境中安全地使用。

无锁:内部使用原子操作来保证线程安全,通常比使用锁有更好的性能。

阻塞操作:虽然 ConcurrentQueue<T> 本身不提供阻塞操作,但可以与其他同步原语(如 SemaphoreSlim 或 CancellationToken)结合使用来实现阻塞行为

三、简单示例

using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;class Program
{static void Main(){ConcurrentQueue<int> queue = new ConcurrentQueue<int>();CancellationTokenSource cts = new CancellationTokenSource();// 生产者线程Task producer = Task.Run(() =>{for (int i = 0; i < 10; i++){queue.Enqueue(i);Console.WriteLine($"Produced: {i}");Thread.Sleep(100); // 模拟工作}}, cts.Token);// 消费者线程Task consumer = Task.Run(() =>{while (!cts.Token.IsCancellationRequested){if (queue.TryDequeue(out int item)){Console.WriteLine($"Consumed: {item}");}else{Thread.Yield(); // 让出 CPU 时间片}}}, cts.Token);// 等待一段时间,然后取消任务Thread.Sleep(1500);cts.Cancel();Task.WaitAll(producer, consumer);}
}

四、完整示例

1.与 BlockingCollection<T> 结合使用

BlockingCollection<T> 是一个线程安全的集合,提供了数据结构和同步原语的组合,可以与 ConcurrentQueue<T> 结合使用来实现生产者-消费者模式。

using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;class Program
{static void Main(){BlockingCollection<int> blockingCollection = new BlockingCollection<int>(new ConcurrentQueue<int>(), 10);Task producer = Task.Run(() =>{for (int i = 0; i < 20; i++){blockingCollection.Add(i);Console.WriteLine($"Produced: {i}");Thread.Sleep(100);}blockingCollection.CompleteAdding();});Task consumer = Task.Run(() =>{foreach (var item in blockingCollection.GetConsumingEnumerable()){Console.WriteLine($"Consumed: {item}");Thread.Sleep(150);}});Task.WaitAll(producer, consumer);}
}

2. 使用 CancellationToken 实现优雅的取消

ConcurrentQueue<T> 可以与 CancellationToken 结合使用,以实现任务的优雅取消。

using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;class Program
{static void Main(){ConcurrentQueue<int> queue = new ConcurrentQueue<int>();CancellationTokenSource cts = new CancellationTokenSource();Task producer = Task.Run(() =>{for (int i = 0; i < 20; i++){if (cts.Token.IsCancellationRequested){Console.WriteLine("Cancellation requested");return;}queue.Enqueue(i);Console.WriteLine($"Produced: {i}");Thread.Sleep(100);}}, cts.Token);Task consumer = Task.Run(() =>{while (!cts.Token.IsCancellationRequested){if (queue.TryDequeue(out int item)){Console.WriteLine($"Consumed: {item}");}else{Thread.Yield();}}}, cts.Token);Thread.Sleep(1500);cts.Cancel();Task.WaitAll(producer, consumer);}
}

与 SemaphoreSlim 实现并发控制

SemaphoreSlim 可以与 ConcurrentQueue<T> 结合使用,以控制同时访问资源的线程数量。

using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;class Program
{static void Main(){ConcurrentQueue<int> queue = new ConcurrentQueue<int>();SemaphoreSlim semaphore = new SemaphoreSlim(3);Task producer = Task.Run(() =>{for (int i = 0; i < 20; i++){semaphore.Wait();queue.Enqueue(i);Console.WriteLine($"Produced: {i}");Thread.Sleep(100);semaphore.Release();}});Task consumer = Task.Run(() =>{while (!queue.IsEmpty){semaphore.Wait();if (queue.TryDequeue(out int item)){Console.WriteLine($"Consumed: {item}");Thread.Sleep(150);}semaphore.Release();}});Task.WaitAll(producer, consumer);}
}

 使用 IProducerConsumerCollection<T> 接口

ConcurrentQueue<T> 实现了 IProducerConsumerCollection<T> 接口,这使得它可以与任何需要这种接口的 API 一起使用。

using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;class Program
{static void Main(){IProducerConsumerCollection<int> collection = new ConcurrentQueue<int>();Task producer = Task.Run(() =>{for (int i = 0; i < 20; i++){collection.TryAdd(i);Console.WriteLine($"Produced: {i}");Thread.Sleep(100);}});Task consumer = Task.Run(() =>{while (collection.TryTake(out int item)){Console.WriteLine($"Consumed: {item}");Thread.Sleep(150);}});Task.WaitAll(producer, consumer);}
}

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com

热搜词