赞
踩
ConcurrentLinkedQueue 是 Java 中提供的一个基于无界非阻塞算法的线程安全队列。它是一个适用于高并发环境的队列实现,基于链表结构。不知道大家在日常工作中这个类用的多不多,它的一些实现细节原理是否有思考过,比如下面这个几个问题:
在多线程环境中,使用 ConcurrentLinkedQueue 存储待处理的任务,并由多个工作线程并发地处理这些任务。
import java.util.concurrent.ConcurrentLinkedQueue;
public class TaskScheduler {
private ConcurrentLinkedQueue<Runnable> taskQueue = new ConcurrentLinkedQueue<>();
private volatile boolean running = true;
// 添加任务到队列
public void addTask(Runnable task) {
taskQueue.offer(task);
}
// 启动工作线程
public void startWorkers(int numWorkers) {
for (int i = 0; i < numWorkers; i++) {
new Thread(() -> {
while (running) {
Runnable task = taskQueue.poll();
if (task != null) {
task.run();
}
}
}).start();
}
}
// 停止任务调度
public void stop() {
running = false;
}
public static void main(String[] args) {
TaskScheduler scheduler = new TaskScheduler();
// 添加任务
scheduler.addTask(() -> System.out.println("Task 1 executed"));
scheduler.addTask(() -> System.out.println("Task 2 executed"));
// 启动工作线程
scheduler.startWorkers(2);
// 为了演示方便,延迟 5 秒后停止任务调度
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
scheduler.stop();
}
}
在事件驱动系统中,使用 ConcurrentLinkedQueue 存储和处理事件。
import java.util.concurrent.ConcurrentLinkedQueue;
public class EventDrivenSystem {
private ConcurrentLinkedQueue<String> eventQueue = new ConcurrentLinkedQueue<>();
private volatile boolean running = true;
// 添加事件到队列
public void addEvent(String event) {
eventQueue.offer(event);
}
// 处理事件
public void startEventProcessor() {
new Thread(() -> {
while (running) {
String event = eventQueue.poll();
if (event != null) {
processEvent(event);
}
}
}).start();
}
// 处理事件的逻辑
private void processEvent(String event) {
System.out.println("Processing event: " + event);
}
// 停止事件处理
public void stop() {
running = false;
}
public static void main(String[] args) {
EventDrivenSystem system = new EventDrivenSystem();
// 添加事件
system.addEvent("Event 1");
system.addEvent("Event 2");
// 启动事件处理
system.startEventProcessor();
// 为了演示方便,延迟 5 秒后停止事件处理
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
system.stop();
}
}
分布式系统中,使用 ConcurrentLinkedQueue 实现简单的消息队列。
import java.util.concurrent.ConcurrentLinkedQueue;
public class MessageQueue {
private ConcurrentLinkedQueue<String> messageQueue = new ConcurrentLinkedQueue<>();
private volatile boolean running = true;
// 生产者线程:添加消息到队列
public void startProducer() {
new Thread(() -> {
int messageCount = 0;
while (running) {
String message = "Message " + messageCount++;
messageQueue.offer(message);
System.out.println("Produced: " + message);
try {
Thread.sleep(1000); // 模拟生产间隔
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
// 消费者线程:从队列中取出消息并处理
public void startConsumer() {
new Thread(() -> {
while (running) {
String message = messageQueue.poll();
if (message != null) {
System.out.println("Consumed: " + message);
}
}
}).start();
}
// 停止消息队列
public void stop() {
running = false;
}
public static void main(String[] args) {
MessageQueue queue = new MessageQueue();
// 启动生产者和消费者线程
queue.startProducer();
queue.startConsumer();
// 为了演示方便,延迟 10 秒后停止消息队列
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
queue.stop();
}
}
使用 ConcurrentLinkedQueue 实现生产者-消费者模式。
import java.util.concurrent.ConcurrentLinkedQueue;
public class ProducerConsumer {
private ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<>();
private volatile boolean running = true;
// 生产者线程:生成数据并添加到队列
public void startProducer() {
new Thread(() -> {
int count = 0;
while (running) {
queue.offer(count++);
System.out.println("Produced: " + count);
try {
Thread.sleep(500); // 模拟生产间隔
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
// 消费者线程:从队列中取出数据并处理
public void startConsumer() {
new Thread(() -> {
while (running) {
Integer data = queue.poll();
if (data != null) {
System.out.println("Consumed: " + data);
}
}
}).start();
}
// 停止生产和消费
public void stop() {
running = false;
}
public static void main(String[] args) {
ProducerConsumer pc = new ProducerConsumer();
// 启动生产者和消费者线程
pc.startProducer();
pc.startConsumer();
// 为了演示方便,延迟 10 秒后停止生产和消费
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
pc.stop();
}
}
public boolean add(E e) {
return offer(e);
}
public boolean offer(E e) {
checkNotNull(e);
final Node<E> newNode = new Node<E>(e);
for (Node<E> t = tail, p = t;;) {
Node<E> q = p.next;
if (q == null) {
// p is last node
if (p.casNext(null, newNode)) {
// Successful CAS is the linearization point
// for e to become an element of this queue,
// and for newNode to become "live".
if (p != t) // hop two nodes at a time
casTail(t, newNode); // Failure is OK.
return true;
}
// Lost CAS race to another thread; re-read next
}
else if (p == q)
// We have fallen off list. If tail is unchanged, it
// will also be off-list, in which case we need to
// jump to head, from which all live nodes are always
// reachable. Else the new tail is a better bet.
p = (t != (t = tail)) ? t : head;
else
// Check for tail updates after two hops.
p = (p != t && t != (t = tail)) ? t : q;
}
}
add 方法只是简单地调用 offer 方法。add 方法在 Collection 接口中定义,而 offer 方法在 Queue 接口中定义。两者在 ConcurrentLinkedQueue 中的实现是相同的。
final Node<E> newNode = new Node<E>(e);
创建一个新的节点来存储传入的元素。
for (Node<E> t = tail, p = t;;) {
Node<E> q = p.next;
初始化两个节点引用 t 和 p,都指向队列的尾节点 tail。然后进入无限循环,遍历队列。
4. 检查 p.next
if (q == null) {
// p is last node
if (p.casNext(null, newNode)) {
// Successful CAS is the linearization point
// for e to become an element of this queue,
// and for newNode to become "live".
if (p != t) // hop two nodes at a time
casTail(t, newNode); // Failure is OK.
return true;
}
// Lost CAS race to another thread; re-read next
}
如果 q 为 null,说明 p 是当前的尾节点。尝试通过 CAS 操作将 newNode 插入到 p.next。
else if (p == q)
// We have fallen off list. If tail is unchanged, it
// will also be off-list, in which case we need to
// jump to head, from which all live nodes are always
// reachable. Else the new tail is a better bet.
p = (t != (t = tail)) ? t : head;
如果 p 等于 q,说明我们已经遍历到了队列的末尾。此时需要重新检查 tail,如果 tail 没有变化,则跳转到 head 节点重新开始遍历。
6. 正常遍历队列
else
// Check for tail updates after two hops.
p = (p != t && t != (t = tail)) ? t : q;
如果 p.next 不为 null,说明当前节点不是尾节点。继续遍历队列,并在必要时更新 tail。
下面这段代码是 ConcurrentLinkedQueue 中的 poll() 方法的实现。poll 方法用于从队列的头部移除并返回一个元素,如果队列为空则返回 null。
public E poll() {
restartFromHead:
for (;;) {
for (Node<E> h = head, p = h, q;;) {
E item = p.item;
if (item != null && p.casItem(item, null)) {
// Successful CAS is the linearization point
// for item to be removed from this queue.
if (p != h) // hop two nodes at a time
updateHead(h, ((q = p.next) != null) ? q : p);
return item;
}
else if ((q = p.next) == null) {
updateHead(h, p);
return null;
}
else if (p == q)
continue restartFromHead;
else
p = q;
}
}
}
restartFromHead:
for (;;) {
for (Node<E> h = head, p = h, q;;) {
使用无限循环 for (;
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。