C# ConcurrentQueue 使用详解

总目录


前言

在C#多线程编程中,数据共享如同走钢丝——稍有不慎就会引发竞态条件(Race Condition)或死锁。传统Queue<T>在并发场景下需要手动加锁,而ConcurrentQueue<T>作为.NET Framework 4.0 引入的线程安全集合,采用无锁算法(Lock-Free),能显著提升高并发场景下的性能。今天,我们就来深入探讨一下 ConcurrentQueue<T> 的使用方法和特性。


一、基本信息

1. 基本概念

  • ConcurrentQueue<T> 是一个线程安全的先进先出(FIFO)队列,属于 System.Collections.Concurrent 命名空间。它遵循先进先出(FIFO)的原则,允许多个线程同时对队列进行操作,而无需额外的锁机制。
  • 用于在生产者和消费者场景中高效地处理数据。但需要注意的是,它并不保证元素在同一个线程内入队顺序和出队顺序完全一致。

2. 核心特性速览

1) 线程安全保证

  • 无锁设计:通过CAS(Compare-And-Swap)原子操作实现高效并发
    • 无锁编程ConcurrentQueue<T> 使用了无锁编程技术,减少了锁的开销,提高了性能。
    • 原子操作:队列的入队和出队操作是原子性的,这意味着即使在多线程环境下,操作也不会被打断
  • FIFO原则:先进先出(但线程间顺序不绝对保证,在多线程环境下,队列的顺序可能会受到线程调度的影响。)
  • 高吞吐量:实测在16线程并发下吞吐量可达普通锁队列的3倍+
  • 内存高效:采用链表结构动态扩展,避免数组复制的开销

2) 性能对比(基准测试)

操作类型ConcurrentQueueQueue+Lock
100万次入队45 ms210 ms
100万次出队38 ms195 ms

3. 适用场景

  • 生产者 - 消费者模式(日志记录、任务分发)
    • 在生产者 - 消费者模式中,多个生产者线程同时向队列中放入任务(元素),多个消费者线程从队列中取出任务执行。ConcurrentQueue可以完美适配这种场景,确保数据的安全传递和并发操作的效率。例如,多个网络请求到达服务器(生产者),服务器将这些请求放入ConcurrentQueue,然后多个工作线程从队列中取出请求进行处理(消费者)。
  • 任务调度系统
    • 当需要调度多个任务按照顺序执行时,ConcurrentQueue可以用来存储任务的顺序。多个调度器线程可以从队列中取出任务并分配到合适的资源上执行,保证任务的有序性和并发性。

二、基本操作

1. 初始化队列

var queue = new ConcurrentQueue<string>();

2. 入队操作(Enqueue)

  • Enqueue方法用于向队列中添加元素。例如:
ConcurrentQueue<int> queue = new ConcurrentQueue<int>();
queue.Enqueue(1);
queue.Enqueue(2);
  • 在多线程环境下,多个线程可以同时调用Enqueue方法,而不需要担心数据冲突问题。
// 多线程安全添加
Parallel.For(0, 1000, i => {queue .Enqueue($"Item_{i}");
});

2. 出队操作(TryDequeue)

  • TryDequeue方法尝试从队列中取出一个元素。示例代码如下:
int value;
if (queue.TryDequeue(out value))
{Console.WriteLine(value);
}
// 或
if (queue.TryDequeue(out int value2))
{Console.WriteLine(value2);
}
  • 如果队列中有元素,TryDequeue会成功取出元素并将队列修改为相应的状态,返回true
  • 如果队列为空,则返回falsevalue保持其初始值。这一特性使得它在多线程并发访问队列时非常方便,不需要像普通队列那样额外进行线程同步处理。

3. 查看队首元素(TryPeek)

  • TryPeek方法可以查看队列的第一个元素而不将其移除队列。例如:
ConcurrentQueue<int> queue= new ConcurrentQueue<int>();
for (int i = 0; i < 10000; i++)
{queue.Enqueue(i);
}
int result = 0;
if (!queue.TryPeek(out result))
{Console.WriteLine("TryPeek failed when it should have succeeded");
}
else if (result!= 0)
{Console.WriteLine($"Expected TryPeek result of 0, got {result}");
}

4. TryGetNonEnumeratedCount 与 Count

1)TryGetNonEnumeratedCount 的作用

TryGetNonEnumeratedCount 是 .NET 6+ 引入的通用集合操作方法,其作用如下:

  • 尝试在不枚举集合的情况下获取元素数量
  • 对于实现了ICollection接口的类型(如ConcurrentQueue<T>ConcurrentBag<T>),直接返回Count属性值
  • 避免某些集合类型(如普通IEnumerable)需要枚举才能计数的性能损耗

2)与Count的区别

特性TryGetNonEnumeratedCountCount 属性
适用范围所有IEnumerable类型具体集合类型
返回值类型bool(是否成功获取)int(直接返回数量)
实现机制通过接口检查优化路径直接访问内部计数器
对未实现ICollection的集合可能返回false并需要枚举不可用

3) 示例

var queue = new ConcurrentQueue<int>();
queue.Enqueue(1);
queue.Enqueue(2);// 传统方式(直接访问 Count 属性)
Console.WriteLine($"Count: {queue.Count}"); // 新方式(实现 ICollection 接口的通用方法)
if (queue.TryGetNonEnumeratedCount(out int count)) {Console.WriteLine($"Non-enumerated count: {count}");
}

对于ConcurrentQueue<T>,两种方式本质相同。但在编写通用集合处理代码时,TryGetNonEnumeratedCount能更好地兼容各种集合类型,避免对未实现ICollection接口的集合进行低效枚举

5. 其他操作

1)清空队列

// 清空队列(.NET 5+)
queue.Clear();  // 注意:非原子操作!

2)IsEmpty

判断集合是否为空(同样存在瞬时性,可能不准确)。

TryDequeue 可能失败,需结合循环或超时机制

while (!queue.IsEmpty)
{if (queue.TryDequeue(out int item)) Process(item);
}

3)批量操作

// 转换为数组
var snapshot = concurrentQueue.ToArray();// 复制到目标数组
string[] buffer = new string[100];
concurrentQueue.CopyTo(buffer, 0);

三、为什么需要 ConcurrentQueue?

在多线程环境中,普通的队列(如 Queue<T>)可能会引发线程安全问题。例如,当多个线程同时对队列进行读写操作时,可能会导致数据丢失、异常或程序崩溃。而 ConcurrentQueue<T> 内部实现了高效的线程同步机制,确保了在并发场景下的数据安全。

1. 非线程安全案例

using System.Collections;class Program
{static void Main(){// 非线程安全版本(错误示例)var unsafeQueue = new Queue<int>();Parallel.For(0, 1000, i => {unsafeQueue.Enqueue(i); // 会导致数据丢失或抛出异常});Console.WriteLine($"非安全集合数量: {unsafeQueue.Count}"); // 结果通常小于1000}
}

运行结果

  • 运行代码时,unsafeQueue .Count 通常会小于 1000,甚至可能抛出异常。
  • 结果不确定:由于线程竞争是随机的,每次运行的结果可能不同。

2. 为什么不安全?

1) 问题根源

  • 线程不安全的 Queue
    • Queue 是普通的先进先出(FIFO)集合,但不保证多线程并发操作的安全性。
    • 当多个线程同时调用 Enqueue() 时,可能发生以下问题:
      • 数据覆盖:多个线程可能同时修改队列的底层数组和内部索引(如 _size 和 _tail),导致写入位置冲突,部分数据被覆盖。
      • 容量扩展竞争:当队列需要扩容时,多个线程可能同时触发内部数组的重新分配,导致数据丢失或数组损坏。
      • 计数不一致:Count 属性的值可能因线程间竞争而无法正确累加。
  • Parallel.For 的并发写入
    • Parallel.For(0, 1000, i => { … }) 会创建多个线程并行执行 Enqueue(i)。

2)错误场景

假设两个线程同时执行 Enqueue()

  • 线程 A 和线程 B 同时读取队列的当前尾部索引 _tail,假设此时 _tail = 5。
  • 线程 A 将值写入索引 5,然后更新 _tail 为 6。
  • 线程 B 也将值写入索引 5(因为它在步骤 1 中读到的 _tail 是 5),覆盖线程 A 写入的数据。
  • 最终队列实际写入的数据少于预期,且 Count 的值可能小于 1000。

3. 解决方案

1)使用线程安全的 ConcurrentQueue<T>

var safeQueue = new ConcurrentQueue<int>();
Parallel.For(0, 1000, i => 
{safeQueue.Enqueue(i);  // 线程安全
});
Console.WriteLine($"安全集合数量: {safeQueue.Count}");  // 结果为 1000
  • ConcurrentQueue 内部通过无锁算法或细粒度锁保证线程安全。

2)手动同步(lock 语句)

var unsafeQueue = new Queue<int>();
object lockObj = new object();Parallel.For(0, 1000, i => 
{lock (lockObj) {       // 强制串行化写入unsafeQueue.Enqueue(i);}
});
  • 通过锁强制每次 Enqueue 操作串行执行,但会牺牲并发性能。

4. QueueConcurrentQueue

  1. Queue的区别
    • 在普通的Queue<T>中,如果不是线程安全的环境,在多线程同时进行入队和出队操作时可能会产生数据混乱等问题,需要手动进行加锁等操作来保证线程安全。而ConcurrentQueue<T>是线程安全的,不需要额外的锁操作就能正确处理并发情况。
  2. 性能优势
    • 在高并发场景下,ConcurrentQueue的非阻塞算法(无锁)相比使用锁的传统队列有更好的性能。例如,普通使用锁的入队和出队操作(如下代码),在高并发时会导致线程频繁阻塞和唤醒:
    • ConcurrentQueue通过原子操作避免了线程阻塞,提高了并发处理效率。
    public class LockedQueue<T>
    {private Queue<T> _queue = new Queue<T>();private object _lock = new object();public void Enqueue(T item){lock (_lock){_queue.Enqueue(item);}}public bool TryDequeue(out T result){lock (_lock){if (_queue.Count > 0){result = _queue.Dequeue();return true;}result = default;return false;}}
    }
    

5. 使用示例

using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;public class Program
{static void Main(){ConcurrentQueue<int> queue = new ConcurrentQueue<int>();// 生产者线程Task producer = Task.Run(() =>{for (int i = 0; i < 10; i++){queue.Enqueue(i);Console.WriteLine($"Enqueued: {i}");}});// 消费者线程Task consumer = Task.Run(() =>{while (true){if (queue.TryDequeue(out int result)){Console.WriteLine($"Dequeued: {result}");}}});Task.WaitAll(producer, consumer);}
}

在这个示例中,生产者线程负责向队列中添加数据,消费者线程负责从队列中移除数据。由于 ConcurrentQueue<T> 的线程安全性,我们无需担心线程冲突问题。

四、典型应用场景

1. 生产者-消费者模式(带优雅关闭)

public class PipelineExample
{private readonly ConcurrentQueue<DataPacket> _queue = new();private readonly CancellationTokenSource _cts = new();public void StartProcessing(int consumerCount){// 生产者线程Task.Run(() =>{while (!_cts.IsCancellationRequested){var data = ReceiveNetworkPacket();_queue.Enqueue(data);}});// 消费者线程池Parallel.For(0, consumerCount, i =>{while (true){if (_queue.TryDequeue(out var data)){ProcessData(data);}else if (_cts.IsCancellationRequested){break;}else{SpinWait.SpinUntil(() => !_queue.IsEmpty || _cts.IsCancellationRequested);}}});}public void Stop() => _cts.Cancel();
}
ConcurrentQueue<SensorData> dataQueue = new();// 生产者线程
Task.Run(() => 
{while (true) {var data = ReadSensor();dataQueue.Enqueue(data);Thread.Sleep(100);}
});// 消费者线程
Task.Run(() => 
{while (true) {if (dataQueue.TryDequeue(out SensorData data)) {SaveToDatabase(data);}else {Thread.Sleep(50); // 降低CPU占用}}
});

2. 高并发日志系统设计

public static class AsyncLogger
{private static readonly ConcurrentQueue<string> _logQueue = new();private static readonly AutoResetEvent _signal = new(false);static AsyncLogger(){Task.Run(() =>{using var writer = new StreamWriter("app.log");while (true){_signal.WaitOne();while (_logQueue.TryDequeue(out var message)){writer.WriteLine($"[{DateTime.UtcNow:O}] {message}");}writer.Flush();}});}public static void Log(string message){_logQueue.Enqueue(message);_signal.Set();}
}

五、注意事项

  1. 元素顺序的相对性
    • 虽然ConcurrentQueue遵循FIFO原则,但是由于并发操作的存在,同一个线程内先入队的元素可能会后出队。在编写代码时需要考虑到这种情况,避免对元素顺序有过于严格的预期。
    • 虽然号称FIFO,但在以下场景可能出现顺序异常:
    // 线程A
    cq.Enqueue(1); // 时间戳T1
    cq.Enqueue(2); // T2// 线程B
    cq.Enqueue(3); // T1.5// 可能出队顺序:1 → 3 → 2
    
  2. 内存管理
    • 在高频率入队和出队操作中,要注意内存的使用情况,因为队列中的元素可能会随着时间不断积累(如果没有及时消费),可能会导致内存占用过高。
    • 对象池模式:复用出队对象,减少GC压力
    • 容量监控:定期检查cq.Count,设置阈值报警
// 对象池示例
var objectPool = new ObjectPool<DataModel>(() => new DataModel());
var item = objectPool.Get();
try {// 使用item...
} finally {objectPool.Return(item);
}
  1. 避免频繁计数Count 属性需要遍历链表,复杂度O(n)

六、 替代方案

当需要线程安全的先进先出集合时,ConcurrentQueue<T>通常是首选。但在以下场景需考虑替代方案:

  • 优先级队列PriorityQueue(.NET 6+)
  • 延迟处理System.Threading.Channels
  • 跨进程通信MemoryMappedFile + 环形缓冲区
  • 在需要阻塞操作时考虑结合 BlockingCollection

与其他并发容器的对比

特性ConcurrentQueueBlockingCollectionChannels
阻塞操作✔️✔️ (.NET Core+)
边界控制✔️✔️
内存效率
适用场景非阻塞队列有界集合异步管道

结语

回到目录页:C#/.NET 知识汇总
希望以上内容可以帮助到大家,如文中有不对之处,还请批评指正。


参考资料:
ConcurrentQueue<T> 类

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/70306.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

在Vue项目中使用three.js在前端页面展示PLY文件或STL文件

前言&#xff1a;这是一个3d打印局域网管理系统的需求 一、安装three.js three.js官网&#xff1a;https://threejs.org/docs/#manual/en/introduction/Installation 我用的是yarn,官网用的是npm 二、使用three.js 1.在script部分导入three.js import * as THREE from thr…

DeepSeek 助力 Vue 开发:打造丝滑的右键菜单(RightClickMenu)

前言&#xff1a;哈喽&#xff0c;大家好&#xff0c;今天给大家分享一篇文章&#xff01;并提供具体代码帮助大家深入理解&#xff0c;彻底掌握&#xff01;创作不易&#xff0c;如果能帮助到大家或者给大家一些灵感和启发&#xff0c;欢迎收藏关注哦 &#x1f495; 目录 Deep…

大数据学习(46) - Flink按键分区处理函数

&&大数据学习&& &#x1f525;系列专栏&#xff1a; &#x1f451;哲学语录: 承认自己的无知&#xff0c;乃是开启智慧的大门 &#x1f496;如果觉得博主的文章还不错的话&#xff0c;请点赞&#x1f44d;收藏⭐️留言&#x1f4dd;支持一下博主哦&#x1f91…

10分钟上手DeepSeek开发:SpringBoot + Vue2快速构建AI对话系统

作者&#xff1a;后端小肥肠 目录 1. 前言 为什么选择DeepSeek&#xff1f; 本文技术栈 2. 环境准备 2.1. 后端项目初始化 2.2. 前端项目初始化 3. 后端服务开发 3.1. 配置文件 3.2. 核心服务实现 4. 前端服务开发 4.1. 聊天组件ChatWindow.vue开发 5. 效果展示及源…

Transformer多头注意力并行计算原理与工业级实现:从数学推导到PyTorch工程优化

一、核心数学原理剖析 1.1 多头注意力矩阵分解 Q XW^Q ∈ R^{nd_k} K XW^K ∈ R^{nd_k} V XW^V ∈ R^{nd_v} 多头分解公式&#xff1a; head_i Attention(QW_i^Q, KW_i^K, VW_i^V) 其中 W_i^Q ∈ R^{d_kd_k/h}, W_i^K ∈ R^{d_kd_k/h}, W_i^V ∈ R^{d_vd_v/h} (h为头数…

通过监督微调提升多语言大语言模型性能

引言 澳鹏助力一家全球科技公司提升其大语言模型&#xff08;LLM&#xff09;的性能。通过提供结构化的人工反馈形式的大语言模型训练数据&#xff0c;让该模型在30多种语言、70多种方言中的表现得到优化。众包人员们进行多轮对话&#xff0c;并依据回复的相关性、连贯性、准确…

大数据开发治理平台~DataWorks(核心功能汇总)

目录 数据集成 功能概述 使用限制 功能相关补充说明 数据开发 功能概述 数据建模 功能概述 核心技术与架构 数据分析 功能概述 数据治理 数据地图 功能概述 数据质量 功能概述 数据治理资产 功能概述 使用限制 数据服务 功能概述 数据集成 DataWorks的数据…

用Nginx打造防盗链护盾

用Nginx打造防盗链护盾 一、你的网站正在"为他人做嫁衣"&#xff1f; 想象一下这个场景&#xff1a; 你精心拍摄的摄影作品、录制的课程视频、设计的原创素材&#xff0c;被其他网站直接盗用链接。 更气人的是——当用户在他们网站查看这些资源时&#xff0c;消耗的…

STM32 看门狗

目录 背景 独立看门狗&#xff08;IWDG&#xff09; 寄存器访问保护 窗口看门狗&#xff08;WWDG&#xff09; 程序 独立看门狗 设置独立看门狗程序 第一步、使能对独立看门狗寄存器的写操作 第二步、设置预分频和重装载值 第三步、喂狗 第四步、使能独立看门狗 喂狗…

Kubernetes的Ingress 资源是什么?

在Kubernetes中&#xff0c;Ingress资源是一种用于管理集群外部对内部服务访问的API对象&#xff0c;主要用于将不同的外部请求路由到集群内的不同服务&#xff0c;以下是关于它的详细介绍&#xff1a; 定义与作用 Ingress资源定义了从集群外部到内部服务的HTTP和HTTPS路由规…

vue3-03初学vue3中的配置项setup(Composition API (组合API组件中所用到的:数据、方法等,均要配置在setup中)

1.关于setup Vue3.0中一个新的配置项&#xff0c;值为一个函数.setup是所有Composition API (组合API)“表演的舞台”m组件中所用到的:数据、方法等等&#xff0c;均要配置在setup中。 2..setup函数使用 setup函数的两种返回值 1.若返回一个对象&#xff0c;则对象中的属性、…

【go语言规范】 使用函数式选项 Functional Options 模式处理可选配置

如何处理可选配置&#xff1f; Config Struct 方式 (config-struct/main.go) 这是最简单的方式&#xff0c;使用一个配置结构体&#xff1a; 定义了一个简单的 Config 结构体&#xff0c;包含 Port 字段创建服务器时直接传入配置对象优点&#xff1a;简单直接缺点&#xff1a…

leetcode 2585. 获得分数的方法数

题目如下 数据范围 莫要被困难的外衣骗了&#xff0c;本题就是有数量限制的完全背包问题。显然我们可以令 f(x,y)为当有x种题目时分数为y时的方法数 令某种题目的数量为k 那么方法数应该是 f(x,y) f(x - 1,y - k * (分值))其中(0 < k < 题目数量)通过代码 class So…

深入理解JavaScript中的异步编程与Promise

一、引言 在JavaScript的世界中&#xff0c;异步编程是一个核心概念&#xff0c;尤其是在处理网络请求、文件操作或任何可能阻塞主线程的任务时。本文将深入探讨JavaScript中的异步编程模型&#xff0c;特别是Promise对象的使用。 二、异步编程基础 2.1 什么是异步编程&…

VS Code 如何搭建C/C++开发环境

目录 1.VS Code是什么 2. VS Code的下载和安装 2.1 下载和安装 2.2.1 下载 2.2.2 安装 2.2 环境的介绍 2.3 安装中文插件 3. VS Code配置C/C开发环境 3.1 下载和配置MinGW-w64编译器套件 3.1.1 下载 3.1.2 配置 3.2 安装C/C插件 3.3 重启VSCode 4. 在VSCode上编写…

如何查询网站是否被百度蜘蛛收录?

一、使用site命令查询 这是最直接的方法。在百度搜索框中输入“site:你的网站域名”&#xff0c;例如“site:example.com”&#xff08;请将“example.com”替换为你实际的网站域名&#xff09;。如果搜索结果显示了你的网站页面&#xff0c;并且显示了收录的页面数量&#xf…

数仓搭建:DWS层(服务数据层)

DWS层示例: 搭建日主题宽表 需求 维度 步骤 在hive中建数据库dws >>建表 CREATE DATABASE if NOT EXISTS DWS; 建表sql CREATE TABLE yp_dws.dws_sale_daycount( --维度 city_id string COMMENT 城市id, city_name string COMMENT 城市name, trade_area_id string COMME…

伪类选择器

作用&#xff1a;选中特殊状态的元素 一、动态伪类 1. :link 超链接 未被访问 的状态。 2. :visited 超链接 访问过 的状态。 3. :hover 鼠标 悬停 在元素上的状态。 4. :active 元素 激活 的状态。 什么是激活&#xff1f; —— 按下鼠标不松开。 注意点&#xf…

Kubernetes:EKS 中 Istio Ingress Gateway 负载均衡器配置及常见问题解析

引言 在云原生时代&#xff0c;Kubernetes 已经成为容器编排的事实标准。AWS EKS (Elastic Kubernetes Service) 作为一项完全托管的 Kubernetes 服务&#xff0c;简化了在 AWS 上运行 Kubernetes 的复杂性。Istio 作为服务网格领域的佼佼者&#xff0c;为微服务提供了流量管理…

Docker安装Kafka(不依赖ZooKeeper)

创建docker-compose.yaml version: "3.9" #版本号 services:kafka:image: apache/kafka:3.9.0container_name: kafkahostname: kafkaports:- 9092:9092 # 容器内部之间使用的监听端口- 9094:9094 # 容器外部访问监听端口environment:KAFKA_NODE_ID: 1KAFKA_PROCES…