ConcurrentLinkedQueue详解(图文并茂)

news/2024/9/22 19:50:53

前言

ConcurrentLinkedQueue是基于链接节点的无界线程安全队列。此队列按照FIFO(先进先出)原则对元素进行排序。队列的头部是队列中存在时间最长的元素,而队列的尾部则是最近添加的元素。新的元素总是被插入到队列的尾部,而队列的获取操作(例如pollpeek)则是从队列头部开始。

与传统的LinkedList不同,ConcurrentLinkedQueue使用了一种高效的非阻塞算法,被称为无锁编程(Lock-Free programming),它通过原子变量和CAS(Compare-And-Swap)操作来保证线程安全,而不是通过传统的锁机制。这使得它在高并发场景下具有出色的性能表现。

可以看做一个线程安全的LinkedList,是一个线程安全的无界队列,但LinkedList是一个双向链表,而ConcurrentLinkedQueue是单向链表。

ConcurrentLinkedQueue线程安全在于设置head、tail以及next指针时都用的cas操作,而且node里的item和next变量都是用volatile修饰,保证了多线程下变量的可见性。而ConcurrentLinkedQueue的所有读操作都是无锁的,所以可能读会存在不一致性。

应用场景

如果对队列加锁的成本较高则适合使用无锁的 ConcurrentLinkedQueue 来替代。适合在对性能要求相对较高,同时有多个线程对队列进行读写的场景。

ConcurrentLinkedQueue通过无锁来做到了更高的并发量,是个高性能的队列,但是使用场景相对不如阻塞队列常见,毕竟取数据也要不停的去循环,不如阻塞的设计,但是在并发量特别大的情况下,是个不错的选择,性能上好很多,而且这个队列的设计也是特别费力,尤其的使用的改良算法和对哨兵的处理。

主要方法

ConcurrentLinkedQueue提供了丰富的方法来操作队列,包括:

  • offer(E e):将指定的元素插入此队列的尾部。
  • add(E e):将指定的元素插入此队列的尾部(与offer方法功能相同,但在失败时抛出异常)。
  • poll():获取并移除此队列的头部,如果此队列为空,则返回null
  • peek():获取但不移除此队列的头部,如果此队列为空,则返回null
  • size():返回此队列中的元素数量。需要注意的是,由于并发的原因,这个方法返回的结果可能并不准确。如果需要在并发环境下获取准确的元素数量,建议使用java.util.concurrent.atomic包中的原子变量进行计数。
  • isEmpty():检查此队列是否为空。与size()方法类似,由于并发的原因,这个方法返回的结果也可能不准确。

需要注意的是,在并发环境下使用size()isEmpty()方法时需要特别小心,因为它们的结果可能并不准确。如果需要精确的元素数量或空队列检测,建议使用额外的同步机制或原子变量来实现。

底层源码

类的内部类

private static class Node<E> {// 元素volatile E item;// next域volatile Node<E> next;/*** Constructs a new node.  Uses relaxed write because item can* only be seen after publication via casNext.*/// 构造函数Node(E item) {// 设置item的值UNSAFE.putObject(this, itemOffset, item);}// 比较并替换item值boolean casItem(E cmp, E val) {return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);}void lazySetNext(Node<E> val) {// 设置next域的值,并不会保证修改对其他线程立即可见UNSAFE.putOrderedObject(this, nextOffset, val);}// 比较并替换next域的值boolean casNext(Node<E> cmp, Node<E> val) {return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);}// Unsafe mechanics// 反射机制private static final sun.misc.Unsafe UNSAFE;// item域的偏移量private static final long itemOffset;// next域的偏移量private static final long nextOffset;static {try {UNSAFE = sun.misc.Unsafe.getUnsafe();Class<?> k = Node.class;itemOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("item"));nextOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("next"));} catch (Exception e) {throw new Error(e);}}
}

说明: Node类表示链表结点,用于存放元素,包含item域和next域,item域表示元素,next域表示下一个结点,其利用反射机制和CAS机制来更新item域和next域,保证原子性。

类的属性

public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>implements Queue<E>, java.io.Serializable {// 版本序列号        private static final long serialVersionUID = 196745693267521676L;// 反射机制private static final sun.misc.Unsafe UNSAFE;// head域的偏移量private static final long headOffset;// tail域的偏移量private static final long tailOffset;static {try {UNSAFE = sun.misc.Unsafe.getUnsafe();Class<?> k = ConcurrentLinkedQueue.class;headOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("head"));tailOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("tail"));} catch (Exception e) {throw new Error(e);}}// 头节点private transient volatile Node<E> head;// 尾结点private transient volatile Node<E> tail;
}

说明: 属性中包含了head域和tail域,表示链表的头节点和尾结点,同时,ConcurrentLinkedQueue也使用了反射机制和CAS机制来更新头节点和尾结点,保证原子性。

类的构造函数

  • ConcurrentLinkedQueue()
public ConcurrentLinkedQueue() {// 初始化头节点与尾结点head = tail = new Node<E>(null);
}

说明: 该构造函数用于创建一个最初为空的 ConcurrentLinkedQueue,头节点与尾结点指向同一个结点,该结点的item域为null,next域也为null。

  • ConcurrentLinkedQueue(Collection<? extends E>)
public ConcurrentLinkedQueue(Collection<? extends E> c) {Node<E> h = null, t = null;for (E e : c) { // 遍历c集合// 保证元素不为空checkNotNull(e);// 新生一个结点Node<E> newNode = new Node<E>(e);if (h == null) // 头节点为null// 赋值头节点与尾结点h = t = newNode;else {// 直接头节点的next域t.lazySetNext(newNode);// 重新赋值头节点t = newNode;}}if (h == null) // 头节点为null// 新生头节点与尾结点h = t = new Node<E>(null);// 赋值头节点head = h;// 赋值尾结点tail = t;
}

说明: 该构造函数用于创建一个最初包含给定 collection 元素的 ConcurrentLinkedQueue,按照此 collection 迭代器的遍历顺序来添加元素。

核心函数分析

offer函数

public boolean offer(E e) {// 检查e是不是null,是的话抛出NullPointerException异常。checkNotNull(e);// 创建新的节点final Node<E> newNode = new Node<E>(e);// 将“新的节点”添加到链表的末尾。for (Node<E> t = tail, p = t;;) {//这个for循环是个死循环,增加了两个指针p,t。Node<E> q = p.next;// 情况1:q为空,p就是尾节点,新节点插入if (q == null) {// CAS操作:如果“p的下一个节点为null”(即p为尾节点),则设置p的下一个节点为newNode。// 如果该CAS操作成功的话,则比较“p和t”(若p不等于t,则设置newNode为新的尾节点),然后返回true。// 如果该CAS操作失败,这意味着“其它线程对尾节点进行了修改”,则重新循环。if (p.casNext(null, newNode)) {if (p != t) // hop two nodes at a timecasTail(t, newNode);  // Failure is OK.return true;}}// 情况2:p和q相等else if (p == q)p = (t != (t = tail)) ? t : head;// 情况3:其它\\//这里是移动p指针,意思就是此时如果p不是最后一个元素则把p指针指向tail,否则指向q,也就是指向p.next元素。elsep = (p != t && t != (t = tail)) ? t : q;}
}

说明: offer函数用于将指定元素插入此队列的尾部。下面模拟offer函数的操作,队列状态的变化(假设单线程添加元素,连续添加10、20两个元素)。

若ConcurrentLinkedQueue的初始状态如上图所示,即队列为空。单线程添加元素,此时,添加元素10,则状态如下所示

如上图所示,添加元素10后,tail没有变化,还是指向之前的结点,继续添加元素20,则状态如下所示

如上图所示,添加元素20后,tail指向了最新添加的结点。

poll函数

public E poll() {restartFromHead:for (;;) { // 无限循环for (Node<E> h = head, p = h, q;;) { // 保存头节点// item项E item = p.item;if (item != null && p.casItem(item, null)) { // item不为null并且比较并替换item成功// Successful CAS is the linearization point// for item to be removed from this queue.if (p != h) // p不等于h    // hop two nodes at a time// 更新头节点updateHead(h, ((q = p.next) != null) ? q : p); // 返回itemreturn item;}else if ((q = p.next) == null) { // q结点为null// 更新头节点updateHead(h, p);return null;}else if (p == q) // p等于q// 继续循环continue restartFromHead;else// p赋值为qp = q;}}
}

说明: 此函数用于获取并移除此队列的头,如果此队列为空,则返回null。下面模拟poll函数的操作,队列状态的变化(假设单线程操作,状态为之前offer10、20后的状态,poll两次)。

队列初始状态如上图所示,在poll操作后,队列的状态如下图所示

如上图可知,poll操作后,head改变了,并且head所指向的结点的item变为了null。再进行一次poll操作,队列的状态如下图所示。

如上图可知,poll操作后,head结点没有变化,只是指示的结点的item域变成了null。

remove函数

public boolean remove(Object o) {// 元素为null,返回if (o == null) return false;Node<E> pred = null;for (Node<E> p = first(); p != null; p = succ(p)) { // 获取第一个存活的结点// 第一个存活结点的item值E item = p.item;if (item != null &&o.equals(item) &&p.casItem(item, null)) { // 找到item相等的结点,并且将该结点的item设置为null// p的后继结点Node<E> next = succ(p);if (pred != null && next != null) // pred不为null并且next不为null// 比较并替换next域pred.casNext(p, next);return true;}// pred赋值为ppred = p;}return false;
}

说明: 此函数用于从队列中移除指定元素的单个实例(如果存在)。其中,会调用到first函数和succ函数,first函数的源码如下

Node<E> first() {restartFromHead:for (;;) { // 无限循环,确保成功for (Node<E> h = head, p = h, q;;) {// p结点的item域是否为nullboolean hasItem = (p.item != null);if (hasItem || (q = p.next) == null) { // item不为null或者next域为null// 更新头节点updateHead(h, p);// 返回结点return hasItem ? p : null;}else if (p == q) // p等于q// 继续从头节点开始continue restartFromHead;else// p赋值为qp = q;}}
}

说明: first函数用于找到链表中第一个存活的结点。succ函数源码如下

final Node<E> succ(Node<E> p) {// p结点的next域Node<E> next = p.next;// 如果next域为自身,则返回头节点,否则,返回nextreturn (p == next) ? head : next;
}

说明: succ用于获取结点的下一个结点。如果结点的next域指向自身,则返回head头节点,否则,返回next结点。下面模拟remove函数的操作,队列状态的变化(假设单线程操作,状态为之前offer10、20后的状态,执行remove(10)、remove(20)操作)。

如上图所示,为ConcurrentLinkedQueue的初始状态,remove(10)后的状态如下图所示

如上图所示,当执行remove(10)后,head指向了head结点之前指向的结点的下一个结点,并且head结点的item域置为null。继续执行remove(20),状态如下图所示

如上图所示,执行remove(20)后,head与tail指向同一个结点,item域为null。

size函数

public int size() {// 计数int count = 0;for (Node<E> p = first(); p != null; p = succ(p)) // 从第一个存活的结点开始往后遍历if (p.item != null) // 结点的item域不为null// Collection.size() spec says to max outif (++count == Integer.MAX_VALUE) // 增加计数,若达到最大值,则跳出循环break;// 返回大小return count;
}

说明: 此函数用于返回ConcurrenLinkedQueue的大小,从第一个存活的结点(first)开始,往后遍历链表,当结点的item域不为null时,增加计数,之后返回大小。

HOPS(延迟更新的策略)的设计

通过上面对offer和poll方法的分析,我们发现tail和head是延迟更新的,两者更新触发时机为:

  • tail更新触发时机:当tail指向的节点的下一个节点不为null的时候,会执行定位队列真正的队尾节点的操作,找到队尾节点后完成插入之后才会通过casTail进行tail更新;当tail指向的节点的下一个节点为null的时候,只插入节点不更新tail。

  • head更新触发时机:当head指向的节点的item域为null的时候,会执行定位队列真正的队头节点的操作,找到队头节点后完成删除之后才会通过updateHead进行head更新;当head指向的节点的item域不为null的时候,只删除节点不更新head。

从上面更新时的状态图可以看出,head和tail的更新是“跳着的”即中间总是间隔了一个。那么这样设计的意图是什么呢?

如果让tail永远作为队列的队尾节点,实现的代码量会更少,而且逻辑更易懂。但是,这样做有一个缺点,如果大量的入队操作,每次都要执行CAS进行tail的更新,汇总起来对性能也会是大大的损耗。如果能减少CAS更新的操作,无疑可以大大提升入队的操作效率,所以doug lea大师每间隔1次(tail和队尾节点的距离为1)进行才利用CAS更新tail。对head的更新也是同样的道理,虽然,这样设计会多出在循环中定位队尾节点,但总体来说读的操作效率要远远高于写的性能,因此,多出来的在循环中定位尾节点的操作的性能损耗相对而言是很小的。

关于作者

来自一线程序员Seven的探索与实践,持续学习迭代中~

本文已收录于我的个人博客:https://www.seven97.top

公众号:seven97,欢迎关注~

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.ryyt.cn/news/63448.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈,一经查实,立即删除!

相关文章

Linux 中实现文本中所有的单词的第一个字符大写,其余字符小写

001、[root@PC1 test]# ls a.txt [root@PC1 test]# cat a.txt ## 测试数据 afdf eDET FDSS FFde fexk mxnd [root@PC1 test]# cat a.txt | awk {for(i = 1; i <= NF; i++) {$i = toupper(subst…

Docker方式搭建Maven私服

私服搭建 如下讲解如何基于Docker方式快速搭建Nexus3私服。 编写docker-compose.yaml文件,内容如下: version: 2services:nexus3:image: sonatype/nexus3:3.72.0container_name: nexus3restart: alwaysports:- 8081:8081 volumes:- /data/opt/nexus3/data:/nexus-data为了避免…

安全的路很长,致迷茫的你

最近有一些朋友找到我,跟我聊,说自己感觉很迷茫,不知所措,不知道未来该怎么办?安全该怎么做?OKR该怎么写?其实我想反问他以下几个问题: 1.漏洞研究了几个? 2.样本分析了几个? 3.这段时间看了多少安全技术类文档? 4.目前流行的恶意样本家族都有哪些? 5.目当流行的影…

我对什么都感兴趣,可我迷茫了

我收到一个同学给我的邮件问了个在我看来属于“太阳系”级的难题,比宇宙终极难题还差那么些^^他问: ----------------- 这几天一直挺困惑。说下我的问题,你有空的时候帮我解答下吧。 今天问自己个问题,找个自己的特长现在开始发展它。 基本上以后主要就靠这个特长工作。 但…

进程控制2

使用waitpid(pid, status,0);填入子进程的pid,阻塞父进程,直到子进程结束了,然后把子进程的pcb结构体的状态码读取出来。使用WIFEXITED(status)判断子进程是否正常退出,WEXITSTATUS(status)读取退出码来判断运行是否正常结束。 因为进程结束有3种情况:1.进程正常退出,并正…

redis八股

redis 八股文 基础内容 Redis面试题,56道Redis八股文(1.9万字97张手绘图),面渣逆袭必看👍 | 二哥的Java进阶之路 (javabetter.cn) Redis 常见面试题 | 小林coding (xiaolincoding.com) redis数据类型 Redis 常见面试题 | 小林coding (xiaolincoding.com) Redis 常见数据类…

[网鼎杯 2020 朱雀组]Nmap

这题考察的是nmap写一句话木马的知识 可以输入形如 <?php eval($_POST[1]);?> -oG 1.php 来写入一句话 经过测试这题过滤了php,我们就是用短标签绕过 -oG b.phtml ` 这段命令的作用是:将扫描结果保存到b.phtml中,同时这个phtml文件还包含了前面的一句话木马 目的就是…

电力煤矿液体泄漏识别系统

电力煤矿液体泄漏识别系统对电力煤矿危化品生产区域管道机械实时检测,当电力煤矿液体泄漏识别系统检测到机械管道出现液体泄漏时,系统立即抓拍存档并告警同步回传给报警信息给后台监控人员,让工作人员及时处理,电力煤矿液体泄漏识别系统实现危险区域跑冒滴漏异常自动监控抓…