当前位置:   article > 正文

26.jdk源码阅读之ConcurrentLinkedQueue

26.jdk源码阅读之ConcurrentLinkedQueue

1. 写在前面

在这里插入图片描述
ConcurrentLinkedQueue 是 Java 中提供的一个基于无界非阻塞算法的线程安全队列。它是一个适用于高并发环境的队列实现,基于链表结构。不知道大家在日常工作中这个类用的多不多,它的一些实现细节原理是否有思考过,比如下面这个几个问题:

  1. ConcurrentLinkedQueue 和 LinkedBlockingQueue 有什么区别?
  2. ConcurrentLinkedQueue 使用了哪些关键技术来实现线程安全?
  3. 如何确保 ConcurrentLinkedQueue 的高并发性能?
  4. ConcurrentLinkedQueue 的典型使用场景有哪些?
  5. ConcurrentLinkedQueue 如何实现无锁的插入和删除操作?
  6. ConcurrentLinkedQueue 的弱一致性是什么?
  7. ConcurrentLinkedQueue 的 size 方法是线程安全的吗?
  8. ConcurrentLinkedQueue 的迭代器是线程安全的吗?

2. 从使用说起

2.1 任务调度

在多线程环境中,使用 ConcurrentLinkedQueue 存储待处理的任务,并由多个工作线程并发地处理这些任务。

import java.util.concurrent.ConcurrentLinkedQueue;

public class TaskScheduler {
    private ConcurrentLinkedQueue<Runnable> taskQueue = new ConcurrentLinkedQueue<>();
    private volatile boolean running = true;

    // 添加任务到队列
    public void addTask(Runnable task) {
        taskQueue.offer(task);
    }

    // 启动工作线程
    public void startWorkers(int numWorkers) {
        for (int i = 0; i < numWorkers; i++) {
            new Thread(() -> {
                while (running) {
                    Runnable task = taskQueue.poll();
                    if (task != null) {
                        task.run();
                    }
                }
            }).start();
        }
    }

    // 停止任务调度
    public void stop() {
        running = false;
    }

    public static void main(String[] args) {
        TaskScheduler scheduler = new TaskScheduler();

        // 添加任务
        scheduler.addTask(() -> System.out.println("Task 1 executed"));
        scheduler.addTask(() -> System.out.println("Task 2 executed"));

        // 启动工作线程
        scheduler.startWorkers(2);

        // 为了演示方便,延迟 5 秒后停止任务调度
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        scheduler.stop();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49

2.2 事件驱动系统

在事件驱动系统中,使用 ConcurrentLinkedQueue 存储和处理事件。

import java.util.concurrent.ConcurrentLinkedQueue;

public class EventDrivenSystem {
    private ConcurrentLinkedQueue<String> eventQueue = new ConcurrentLinkedQueue<>();
    private volatile boolean running = true;

    // 添加事件到队列
    public void addEvent(String event) {
        eventQueue.offer(event);
    }

    // 处理事件
    public void startEventProcessor() {
        new Thread(() -> {
            while (running) {
                String event = eventQueue.poll();
                if (event != null) {
                    processEvent(event);
                }
            }
        }).start();
    }

    // 处理事件的逻辑
    private void processEvent(String event) {
        System.out.println("Processing event: " + event);
    }

    // 停止事件处理
    public void stop() {
        running = false;
    }

    public static void main(String[] args) {
        EventDrivenSystem system = new EventDrivenSystem();

        // 添加事件
        system.addEvent("Event 1");
        system.addEvent("Event 2");

        // 启动事件处理
        system.startEventProcessor();

        // 为了演示方便,延迟 5 秒后停止事件处理
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        system.stop();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52

2.3 消息队列

分布式系统中,使用 ConcurrentLinkedQueue 实现简单的消息队列。

import java.util.concurrent.ConcurrentLinkedQueue;

public class MessageQueue {
    private ConcurrentLinkedQueue<String> messageQueue = new ConcurrentLinkedQueue<>();
    private volatile boolean running = true;

    // 生产者线程:添加消息到队列
    public void startProducer() {
        new Thread(() -> {
            int messageCount = 0;
            while (running) {
                String message = "Message " + messageCount++;
                messageQueue.offer(message);
                System.out.println("Produced: " + message);
                try {
                    Thread.sleep(1000); // 模拟生产间隔
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }

    // 消费者线程:从队列中取出消息并处理
    public void startConsumer() {
        new Thread(() -> {
            while (running) {
                String message = messageQueue.poll();
                if (message != null) {
                    System.out.println("Consumed: " + message);
                }
            }
        }).start();
    }

    // 停止消息队列
    public void stop() {
        running = false;
    }

    public static void main(String[] args) {
        MessageQueue queue = new MessageQueue();

        // 启动生产者和消费者线程
        queue.startProducer();
        queue.startConsumer();

        // 为了演示方便,延迟 10 秒后停止消息队列
        try {
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        queue.stop();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56

2.4 生产者-消费者模式

使用 ConcurrentLinkedQueue 实现生产者-消费者模式。

import java.util.concurrent.ConcurrentLinkedQueue;

public class ProducerConsumer {
    private ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<>();
    private volatile boolean running = true;

    // 生产者线程:生成数据并添加到队列
    public void startProducer() {
        new Thread(() -> {
            int count = 0;
            while (running) {
                queue.offer(count++);
                System.out.println("Produced: " + count);
                try {
                    Thread.sleep(500); // 模拟生产间隔
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }

    // 消费者线程:从队列中取出数据并处理
    public void startConsumer() {
        new Thread(() -> {
            while (running) {
                Integer data = queue.poll();
                if (data != null) {
                    System.out.println("Consumed: " + data);
                }
            }
        }).start();
    }

    // 停止生产和消费
    public void stop() {
        running = false;
    }

    public static void main(String[] args) {
        ProducerConsumer pc = new ProducerConsumer();

        // 启动生产者和消费者线程
        pc.startProducer();
        pc.startConsumer();

        // 为了演示方便,延迟 10 秒后停止生产和消费
        try {
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        pc.stop();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55

3. add(E e)底层实现

  public boolean add(E e) {
        return offer(e);
    }
 public boolean offer(E e) {
        checkNotNull(e);
        final Node<E> newNode = new Node<E>(e);

        for (Node<E> t = tail, p = t;;) {
            Node<E> q = p.next;
            if (q == null) {
                // p is last node
                if (p.casNext(null, newNode)) {
                    // Successful CAS is the linearization point
                    // for e to become an element of this queue,
                    // and for newNode to become "live".
                    if (p != t) // hop two nodes at a time
                        casTail(t, newNode);  // Failure is OK.
                    return true;
                }
                // Lost CAS race to another thread; re-read next
            }
            else if (p == q)
                // We have fallen off list.  If tail is unchanged, it
                // will also be off-list, in which case we need to
                // jump to head, from which all live nodes are always
                // reachable.  Else the new tail is a better bet.
                p = (t != (t = tail)) ? t : head;
            else
                // Check for tail updates after two hops.
                p = (p != t && t != (t = tail)) ? t : q;
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32

3.1 add 方法

add 方法只是简单地调用 offer 方法。add 方法在 Collection 接口中定义,而 offer 方法在 Queue 接口中定义。两者在 ConcurrentLinkedQueue 中的实现是相同的。

3.2 offer 方法

  1. checkNotNull(e)
    这个方法检查传入的元素是否为 null,如果是 null,则抛出 NullPointerException。ConcurrentLinkedQueue 不允许存储 null 元素。
  2. 创建新节点
final Node<E> newNode = new Node<E>(e);
  • 1

创建一个新的节点来存储传入的元素。

  1. 循环遍历队列
for (Node<E> t = tail, p = t;;) {
    Node<E> q = p.next;
  • 1
  • 2

初始化两个节点引用 t 和 p,都指向队列的尾节点 tail。然后进入无限循环,遍历队列。
4. 检查 p.next

if (q == null) {
    // p is last node
    if (p.casNext(null, newNode)) {
        // Successful CAS is the linearization point
        // for e to become an element of this queue,
        // and for newNode to become "live".
        if (p != t) // hop two nodes at a time
            casTail(t, newNode);  // Failure is OK.
        return true;
    }
    // Lost CAS race to another thread; re-read next
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

如果 q 为 null,说明 p 是当前的尾节点。尝试通过 CAS 操作将 newNode 插入到 p.next。

  • 如果 CAS 操作成功,newNode 成为了队列的新尾节点。
  • 如果 p 不等于 t,说明 tail 已经更新,需要更新 tail 指向 newNode。
  • 返回 true,表示插入成功。
    如果 CAS 操作失败,说明有其他线程已经插入了新节点,需要重新读取 p.next。
  1. 处理尾节点不一致的情况
else if (p == q)
    // We have fallen off list.  If tail is unchanged, it
    // will also be off-list, in which case we need to
    // jump to head, from which all live nodes are always
    // reachable.  Else the new tail is a better bet.
    p = (t != (t = tail)) ? t : head;
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

如果 p 等于 q,说明我们已经遍历到了队列的末尾。此时需要重新检查 tail,如果 tail 没有变化,则跳转到 head 节点重新开始遍历。
6. 正常遍历队列

else
    // Check for tail updates after two hops.
    p = (p != t && t != (t = tail)) ? t : q;
  • 1
  • 2
  • 3

如果 p.next 不为 null,说明当前节点不是尾节点。继续遍历队列,并在必要时更新 tail。

4. poll()的底层实现

下面这段代码是 ConcurrentLinkedQueue 中的 poll() 方法的实现。poll 方法用于从队列的头部移除并返回一个元素,如果队列为空则返回 null。

    public E poll() {
        restartFromHead:
        for (;;) {
            for (Node<E> h = head, p = h, q;;) {
                E item = p.item;

                if (item != null && p.casItem(item, null)) {
                    // Successful CAS is the linearization point
                    // for item to be removed from this queue.
                    if (p != h) // hop two nodes at a time
                        updateHead(h, ((q = p.next) != null) ? q : p);
                    return item;
                }
                else if ((q = p.next) == null) {
                    updateHead(h, p);
                    return null;
                }
                else if (p == q)
                    continue restartFromHead;
                else
                    p = q;
            }
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24

4.1 无限循环和标签

restartFromHead:
for (;;) {
    for (Node<E> h = head, p = h, q;;) {

  • 1
  • 2
  • 3
  • 4

使用无限循环 for (;

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/煮酒与君饮/article/detail/911311
推荐阅读
相关标签