1. Hàng đợi ConcurrentQueue<T> là gì ?
ConcurrentQueue<T> được gọi là hàng đợi đồng thời, nó là một collection được hỗ trợ Thread-Safe trong C#. Hàng đợi đồng thời được ra mắt ở phiên bản C# 4.0 và nó nằm trong namespace System.Collections.Concurrent. ConcurrentQueue có cấu trúc dữ liệu First-In-First-Out (FIFO). Như vậy chúng ta sử dụng ConcurrentQueue<T> khi chúng ta muốn tạo một cơ chế FIFO để truy cập các item trong một collection từ nhiều luồng khác nhau và đảm bảo Thread-Safe.
Cách hoạt động của ConcurrentQueue<T> cũng tương tự như Queue<T>, chỉ khác nhau ở một điểm duy nhất đó là Queue<T> không cung cấp Thread-Safe cho nên nó không thể hoạt động trực tiếp trong môi trường bất đồng bộ trong khi ConcurrentQueue<T> được tạo ra để hoạt động trực tiếp trong môi trường bất đồng bộ. Như vậy, muốn dùng Queue<T> trong môi trường đa luồng thì lập trình viên phải thiết kế một cơ chế thủ công để khoá các hành động, tuy nhiên điều này rất tốn thời gian và dễ gặp lỗi. Vì thế, ConcurrentQueue<T> ra đời là sự lựa chọn lý tưởng để thay thế cho Queue<T> hoạt động trong môi trường đa luồng mà không yêu cầu một cơ chế khoá thủ công nào.
2. Tại sao cần sử dụng ConcurrentQueue<T> trong C# ?
Chúng ta cùng xem một ví dụ bên dưới, tạo ra một hàng đợi có chức năng là lấy trái cây ở đây có Apple và Lemon, có 3 lượt lấy và mỗi lượt lấy sẽ lấy ra số lượng tăng dần. Code bên dưới thực hiện trong một luồng duy nhất.
static void Main(string[] args)
{
Queue<string> queueFruit = new Queue<string>();
GetFruit("Apple", queueFruit);
GetFruit("Lemon", queueFruit);
foreach (string item in queueFruit)
{
Console.WriteLine(item);
}
Console.WriteLine();
}
private static void GetFruit(string nameFruit, Queue<string> queue)
{
for (int i = 0; i < 3; i++)
{
string item = string.Format($"Get {i + 1} {nameFruit} ");
queue.Enqueue(item);
Thread.Sleep(200);
}
}
Kết quả thu được là một danh sách có thứ tự trong hàng đợi như sau

Tiếp theo chúng ta sẽ sử dụng code bên trên nhưng dùng trong môi trường đa luồng, chúng ta sẽ tạo thêm 2 Thread mới ở mỗi tác vụ lấy trái cây
static void Main(string[] args)
{
Queue<string> queueFruit = new Queue<string>();
Task task1 = Task.Run(() =>
{
GetFruit("Apple", queueFruit); // Thực hiện lấy Táo trong một Thread mới
});
Task task2 = Task.Run(() =>
{
GetFruit("Lemon", queueFruit); // Thực hiện lấy Chanh trong một Thread mới
});
Task.WaitAll(task1, task2); //Chờ tất cả lấy xong
// Show kết quả lấy được
foreach (string item in queueFruit)
{
Console.WriteLine(item);
}
}
private static void GetFruit(string nameFruit, Queue<string> queue)
{
for (int i = 0; i < 3; i++)
{
string item = string.Format($"Get {i + 1} {nameFruit} ");
queue.Enqueue(item);
Thread.Sleep(100);
}
}
Lúc này chương trình sẽ chạy và thường xuyên xảy ra lỗi như thế này (không phải mỗi lần chạy đều ra lỗi)

Tại sao chúng ta nhận được một Exception như bên trên ?
Đó là bời vì phương thức Enqueue không được thiết kế để làm việc trong một môi trường song song giữa các luồng, nó không được cung cấp Thread-Safe. Do đó khi làm việc trong môi trường Multi-Thread thì ở mỗi thời điểm khởi chạy chương trình nếu các luồng chạy vào cùng lúc đến đoạn Enqueue thì Exception sẽ xảy ra, hàng đợi sẽ không thể phân biệt luồng nào Enqueue trước.
Vậy làm sao để dùng Queue trong môi trường Multi-Thread được ?
Để dùng được Queue trong môi trường Multi-Thread chúng ta sẽ tạo ra một cơ chế khoá thủ công. Lúc này chúng ta có thể dùng cơ chế có sẵn trong C# đó là lock object để thực hiện khoá đối tượng, mỗi lần một thread đạt được khoá sẽ ngăn chặn không cho thread khác truy cập vào khoá. Chúng ta sẽ thêm lock vào đối tượng là hàng đợi như bên dưới hoặc có thể tạo riêng một object khác.
private static void GetFruit(string nameFruit, Queue<string> queue)
{
for (int i = 0; i < 3; i++)
{
string item = string.Format($"Get {i + 1} {nameFruit} ");
lock (queue)
{
queue.Enqueue(item);
}
Thread.Sleep(100);
}
}
Lúc này chúng ta sẽ thu được kết quả mà không có Exception xảy ra

Nhưng bạn không nên lạm dụng cách trên, nếu bạn có quá nhiều lần gọi vào Enqueue từ nhiều thread khác nhau và từ nhiều phương thức khác nhau thì bạn sẽ phải khoá chúng ở khắp nơi và việc này sẽ gây chậm chương trình hoặc khó debug vì bỏ sót khoá. Đó đó để giải quyết vấn đề trên thì bạn phải dùng đến ConcurrentQueue.
3. Xây dựng ConcurrentQueue<T> với nhiều Thread truy cập
Bây giờ chúng ta sẽ đổi code từ Queue sang ConcurrentQueue và bỏ luôn cơ chế lock đi, bạn sẽ nhận được kết quả như mong muốn và không gặp bất cứ Exception nào cả vì bản thân ConcurrentQueue đã thiết kế giúp bạn cơ chế hoạt động trong nhiều luồng rồi.
static void Main(string[] args)
{
ConcurrentQueue<string> queueFruit = new ConcurrentQueue<string>();
Task task1 = Task.Run(() =>
{
GetFruit("Apple", queueFruit); // Thực hiện lấy Táo trong một Thread mới
});
Task task2 = Task.Run(() =>
{
GetFruit("Lemon", queueFruit); // Thực hiện lấy Chanh trong một Thread mới
});
Task.WaitAll(task1, task2); //Chờ tất cả lấy xong
// Show kết quả lấy được
foreach (string item in queueFruit)
{
Console.WriteLine(item);
}
}
private static void GetFruit(string nameFruit, ConcurrentQueue<string> queue)
{
for (int i = 0; i < 3; i++)
{
string item = string.Format($"Get {i + 1} {nameFruit} ");
queue.Enqueue(item);
Thread.Sleep(100);
}
}
4. Các Method, Properties và Constructor của ConcurrentQueue
Ở đây chúng ta sẽ tìm hiểu một số Method, Properties và Constructor cơ bản thường dùng của ConcurrentQueue
Đầu tiên chúng ta thấy ConcurrentQueue nằm trong namespace System.Collections.Concurrent

Có hai Constructor chính
ConcurrentQueue(): khởi tạo mới một instance của ConcurrentQueue
ConcurrentQueue(IEnumerable collection): khởi tạo mới một instance của ConcurrentQueue qua một collection cho trước.
Phương thức thường dùng
Enqueue(T item): đưa các item vào hàng đợi và cùng kiểu dữ liệu T, nếu khác kiểu thì sẽ gây lỗi
Ví dụ: ConcurrentQueue<int> queue= new ConcurrentQueue<int>();
ở đây bạn tạo ra hàng đợi đồng thời có item kiểu int thì chỉ có thể enqueue các giá trị kiểu int.
queue.Enqueue(0);
queue.Enqueue(1);
queue.Enqueue("hello"); // Lỗi biên dịch
Cách để truy cập vào ConcurrentQueue, bạn có thể dùng câu lệnh foreach
foreach(var item in queue)
{
// sử dụng các item
}
Quá trình Enqueue chỉ hợp lệ khi các phần tử Enqueue là cùng kiểu dữ liệu
static void Main(string[] args)
{
ConcurrentQueue<string> queueFruit = new ConcurrentQueue<string>();
queueFruit.Enqueue("Apple");
queueFruit.Enqueue("Banana");
queueFruit.Enqueue("Coconut");
queueFruit.Enqueue(10); // lỗi biên dịch xảy ra do không thể thêm kiểu int vào kiểu string
// Show kết quả lấy được
foreach (string item in queueFruit)
{
Console.WriteLine(item);
}
}
TryDequeue(out T result): phương thức này sẽ lấy ra phần tử ở đầu hàng đợi là kết quả trả về sẽ nằm ở tham số result nếu lấy ra thành công, sau khi lấy xong phần tử sẽ không còn ở trong hàng đợi nữa. Ngược lại nếu không có phần tử nào được lấy ra thì giá trị của result sẽ không xác định. Phương thức này sẽ trả về true nếu lấy ra thành công và false nếu lấy ra thất bại.
Xem ví dụ dưới đây
static void Main(string[] args)
{
ConcurrentQueue<string> queueFruit = new ConcurrentQueue<string>();
queueFruit.Enqueue("Apple");
queueFruit.Enqueue("Banana");
queueFruit.Enqueue("Coconut");
int totalFruit = queueFruit.Count; // lấy tổng số item lúc này là 3
for (int i = 0; i < totalFruit; i++)
{
bool isDoneGetFruit = queueFruit.TryDequeue(out string? fruit);
if (isDoneGetFruit)
{
Console.WriteLine("Done get fruit: " + fruit);
}
}
totalFruit = queueFruit.Count; // lúc này là 0, không còn phần tử nào
Console.WriteLine("Total item: " + totalFruit );
}

TryPeek(out T result): lấy ra phần từ ở đầu hàng đợi nhưng không xoá khỏi hàng đợi sau khi lấy xong
static void Main(string[] args)
{
ConcurrentQueue<string> queueFruit = new ConcurrentQueue<string>();
queueFruit.Enqueue("Apple");
queueFruit.Enqueue("Banana");
queueFruit.Enqueue("Coconut");
int totalFruit = queueFruit.Count;
bool isPeek = queueFruit.TryPeek(out string? peek); // Sẽ luôn lấy ra phần tử ở đầu hàng đợi
if (isPeek)
{
Console.WriteLine("First item in queue: "+peek); // Apple
}
else
{
Console.WriteLine("Not Found item");
}
totalFruit = queueFruit.Count; // lúc này là 3, peek không xoá phần tử nào
Console.WriteLine("Total item: " + totalFruit);
}

Clear() : trong .NET core sẽ có thêm phương thức Clear() có nhiệm vụ xoá hết hàng đợi
GetEnumerator(): trả về một danh sách kiểu liệt kê từ hàng đợi
CopyTo(T[] array, int index): sao chép các phần từ của hàng đợi đến một mảng cho trước kiểu T
Các thuộc tính
IsEmpty : kiểm tra xem hàng đợi có rỗng hay không, nếu có là true và ngược lại
Count: trả về tổng số phần tử của hàng đợi kiểu int.
Nguồn: DotNetTutorial