网站设计人员白山北京网站建设
web/
2025/10/4 5:50:29/
文章来源:
网站设计人员,白山北京网站建设,涿州市网站建设,如何创建网站的第一步一、前言 接着前面的分析#xff0c;接下来分析ConcurrentLinkedQueue#xff0c;ConcurerntLinkedQueue一个基于链接节点的无界线程安全队列。此队列按照 FIFO#xff08;先进先出#xff09;原则对元素进行排序。队列的头部是队列中时间最长的元素。队列的尾部 是队列中时…一、前言 接着前面的分析接下来分析ConcurrentLinkedQueueConcurerntLinkedQueue一个基于链接节点的无界线程安全队列。此队列按照 FIFO先进先出原则对元素进行排序。队列的头部是队列中时间最长的元素。队列的尾部 是队列中时间最短的元素。新的元素插入到队列的尾部队列获取操作从队列头部获得元素。当多个线程共享访问一个公共 collection 时ConcurrentLinkedQueue是一个恰当的选择。此队列不允许使用null元素。 二、ConcurrentLinkedQueue数据结构 通过源码分析可知ConcurrentLinkedQueue的数据结构与LinkedBlockingQueue的数据结构相同都是使用的链表结构。ConcurrentLinkedQueue的数据结构如下 说明ConcurrentLinkedQueue采用的链表结构并且包含有一个头结点和一个尾结点。 三、ConcurrentLinkedQueue源码分析 3.1 类的继承关系 public class ConcurrentLinkedQueueE extends AbstractQueueEimplements QueueE, java.io.Serializable {} 说明ConcurrentLinkedQueue继承了抽象类AbstractQueueAbstractQueue定义了对队列的基本操作同时实现了Queue接口Queue定义了对队列的基本操作同时还实现了Serializable接口表示可以被序列化。 3.2 类的内部类 private static class NodeE {// 元素volatile E item;// next域volatile NodeE next;/*** Constructs a new node. Uses relaxed write because item can* only be seen after publication via casNext.*/// 构造函数Node(E item) {// 设置item的值UNSAFE.putObject(this, itemOffset, item);}// 比较并替换item值boolean casItem(E cmp, E val) {return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);}void lazySetNext(NodeE val) {// 设置next域的值并不会保证修改对其他线程立即可见UNSAFE.putOrderedObject(this, nextOffset, val);}// 比较并替换next域的值boolean casNext(NodeE cmp, NodeE val) {return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);}// Unsafe mechanics// 反射机制private static final sun.misc.Unsafe UNSAFE;// item域的偏移量private static final long itemOffset;// next域的偏移量private static final long nextOffset;static {try {UNSAFE sun.misc.Unsafe.getUnsafe();Class? k Node.class;itemOffset UNSAFE.objectFieldOffset(k.getDeclaredField(item));nextOffset UNSAFE.objectFieldOffset(k.getDeclaredField(next));} catch (Exception e) {throw new Error(e);}}} View Code 说明Node类表示链表结点用于存放元素包含item域和next域item域表示元素next域表示下一个结点其利用反射机制和CAS机制来更新item域和next域保证原子性。 3.3 类的属性 public class ConcurrentLinkedQueueE extends AbstractQueueEimplements QueueE, java.io.Serializable {// 版本序列号 private static final long serialVersionUID 196745693267521676L;// 反射机制private static final sun.misc.Unsafe UNSAFE;// head域的偏移量private static final long headOffset;// tail域的偏移量private static final long tailOffset;static {try {UNSAFE sun.misc.Unsafe.getUnsafe();Class? k ConcurrentLinkedQueue.class;headOffset UNSAFE.objectFieldOffset(k.getDeclaredField(head));tailOffset UNSAFE.objectFieldOffset(k.getDeclaredField(tail));} catch (Exception e) {throw new Error(e);}}// 头结点private transient volatile NodeE head;// 尾结点private transient volatile NodeE tail;
} View Code 说明属性中包含了head域和tail域表示链表的头结点和尾结点同时ConcurrentLinkedQueue也使用了反射机制和CAS机制来更新头结点和尾结点保证原子性。 3.4 类的构造函数 1. ConcurrentLinkedQueue()型构造函数 public ConcurrentLinkedQueue() {// 初始化头结点与尾结点head tail new NodeE(null);} View Code 说明该构造函数用于创建一个最初为空的 ConcurrentLinkedQueue头结点与尾结点指向同一个结点该结点的item域为nullnext域也为null。 2. ConcurrentLinkedQueue(Collection? extends E)型构造函数 public ConcurrentLinkedQueue(Collection? extends E c) {NodeE h null, t null;for (E e : c) { // 遍历c集合// 保证元素不为空checkNotNull(e);// 新生一个结点NodeE newNode new NodeE(e);if (h null) // 头结点为null// 赋值头结点与尾结点h t newNode;else {// 直接头结点的next域t.lazySetNext(newNode);// 重新赋值头结点t newNode;}}if (h null) // 头结点为null// 新生头结点与尾结点h t new NodeE(null);// 赋值头结点head h;// 赋值尾结点tail t;} View Code 说明该构造函数用于创建一个最初包含给定 collection 元素的 ConcurrentLinkedQueue按照此 collection 迭代器的遍历顺序来添加元素。 3.5 核心函数分析 1. offer函数 public boolean offer(E e) {// 元素不为nullcheckNotNull(e);// 新生一个结点final NodeE newNode new NodeE(e);for (NodeE t tail, p t;;) { // 无限循环// q为p结点的下一个结点NodeE q p.next;if (q null) { // q结点为null// p is last nodeif (p.casNext(null, newNode)) { // 比较并进行替换p结点的next域// Successful CAS is the linearization point// for e to become an element of this queue,// and for newNode to become live.if (p ! t) // p不等于t结点不一致 // hop two nodes at a time// 比较并替换尾结点casTail(t, newNode); // Failure is OK.// 返回return true;}// Lost CAS race to another thread; re-read next}else if (p q) // p结点等于q结点// We have fallen off list. If tail is unchanged, it// will also be off-list, in which case we need to// jump to head, from which all live nodes are always// reachable. Else the new tail is a better bet.// 原来的尾结点与现在的尾结点是否相等若相等则p赋值为head否则赋值为现在的尾结点p (t ! (t tail)) ? t : head;else// Check for tail updates after two hops.// 重新赋值p结点p (p ! t t ! (t tail)) ? t : q;}} View Code 说明offer函数用于将指定元素插入此队列的尾部。下面模拟offer函数的操作队列状态的变化假设单线程添加元素连续添加10、20两个元素。 ① 若ConcurrentLinkedQueue的初始状态如上图所示即队列为空。单线程添加元素此时添加元素10则状态如下所示 ② 如上图所示添加元素10后tail没有变化还是指向之前的结点继续添加元素20则状态如下所示 ③ 如上图所示添加元素20后tail指向了最新添加的结点。 2. poll函数 public E poll() {restartFromHead:for (;;) { // 无限循环for (NodeE h head, p h, q;;) { // 保存头结点// item项E item p.item;if (item ! null p.casItem(item, null)) { // item不为null并且比较并替换item成功// Successful CAS is the linearization point// for item to be removed from this queue.if (p ! h) // p不等于h // hop two nodes at a time// 更新头结点updateHead(h, ((q p.next) ! null) ? q : p); // 返回itemreturn item;}else if ((q p.next) null) { // q结点为null// 更新头结点updateHead(h, p);return null;}else if (p q) // p等于q// 继续循环continue restartFromHead;else// p赋值为qp q;}}} View Code 说明此函数用于获取并移除此队列的头如果此队列为空则返回null。下面模拟poll函数的操作队列状态的变化假设单线程操作状态为之前offer10、20后的状态poll两次。 ① 队列初始状态如上图所示在poll操作后队列的状态如下图所示 ② 如上图可知poll操作后head改变了并且head所指向的结点的item变为了null。再进行一次poll操作队列的状态如下图所示。 ③ 如上图可知poll操作后head结点没有变化只是指示的结点的item域变成了null。 3. remove函数 public boolean remove(Object o) {// 元素为null返回if (o null) return false;NodeE pred null;for (NodeE p first(); p ! null; p succ(p)) { // 获取第一个存活的结点// 第一个存活结点的item值E item p.item;if (item ! null o.equals(item) p.casItem(item, null)) { // 找到item相等的结点并且将该结点的item设置为null// p的后继结点NodeE next succ(p);if (pred ! null next ! null) // pred不为null并且next不为null// 比较并替换next域pred.casNext(p, next);return true;}// pred赋值为ppred p;}return false;} View Code 说明此函数用于从队列中移除指定元素的单个实例如果存在。其中会调用到first函数和succ函数first函数的源码如下 NodeE first() {restartFromHead:for (;;) { // 无限循环确保成功for (NodeE h head, p h, q;;) {// p结点的item域是否为nullboolean hasItem (p.item ! null);if (hasItem || (q p.next) null) { // item不为null或者next域为null// 更新头结点updateHead(h, p);// 返回结点return hasItem ? p : null;}else if (p q) // p等于q// 继续从头结点开始continue restartFromHead;else// p赋值为qp q;}}} View Code 说明first函数用于找到链表中第一个存活的结点。succ函数源码如下 final NodeE succ(NodeE p) {// p结点的next域NodeE next p.next;// 如果next域为自身则返回头结点否则返回nextreturn (p next) ? head : next;} View Code 说明succ用于获取结点的下一个结点。如果结点的next域指向自身则返回head头结点否则返回next结点。下面模拟remove函数的操作队列状态的变化假设单线程操作状态为之前offer10、20后的状态执行remove(10)、remove(20)操作。 ① 如上图所示为ConcurrentLinkedQueue的初始状态remove(10)后的状态如下图所示 ② 如上图所示当执行remove(10)后head指向了head结点之前指向的结点的下一个结点并且head结点的item域置为null。继续执行remove(20)状态如下图所示 ③ 如上图所示执行remove(20)后head与tail指向同一个结点item域为null。 4. size函数 public int size() {// 计数int count 0;for (NodeE p first(); p ! null; p succ(p)) // 从第一个存活的结点开始往后遍历if (p.item ! null) // 结点的item域不为null// Collection.size() spec says to max outif (count Integer.MAX_VALUE) // 增加计数若达到最大值则跳出循环break;// 返回大小return count;} View Code 说明此函数用于返回ConcurrenLinkedQueue的大小从第一个存活的结点first开始往后遍历链表当结点的item域不为null时增加计数之后返回大小。 五、示例 下面通过一个示例来了解ConcurrentLinkedQueue的使用 package com.hust.grid.leesf.collections;import java.util.concurrent.ConcurrentLinkedQueue;class PutThread extends Thread {private ConcurrentLinkedQueueInteger clq;public PutThread(ConcurrentLinkedQueueInteger clq) {this.clq clq;}public void run() {for (int i 0; i 10; i) {try {System.out.println(add i);clq.add(i);Thread.sleep(100);} catch (InterruptedException e) {e.printStackTrace();}}}
}class GetThread extends Thread {private ConcurrentLinkedQueueInteger clq;public GetThread(ConcurrentLinkedQueueInteger clq) {this.clq clq;}public void run() {for (int i 0; i 10; i) {try {System.out.println(poll clq.poll());Thread.sleep(100);} catch (InterruptedException e) {e.printStackTrace();}}}
}public class ConcurrentLinkedQueueDemo {public static void main(String[] args) {ConcurrentLinkedQueueInteger clq new ConcurrentLinkedQueueInteger();PutThread p1 new PutThread(clq);GetThread g1 new GetThread(clq);p1.start();g1.start();}
} View Code 运行结果某一次 add 0
poll null
add 1
poll 0
add 2
poll 1
add 3
poll 2
add 4
poll 3
add 5
poll 4
poll 5
add 6
add 7
poll 6
poll 7
add 8
add 9
poll 8 View Code 说明GetThread线程不会因为ConcurrentLinkedQueue队列为空而等待而是直接返回null所以当实现队列不空时等待时则需要用户自己实现等待逻辑。 六、总结 ConcurrentLinkedQueue的源码也相对简单其实对于并发集合而言分析源码时首先理解单线程情况然后再考虑在多线程并发时的情况这样会使得分析源码容易得多ConcurrentLinkedQueue和LinkedBlockingQueue的区别还是很明显的前者在取元素时若队列为空则返回null后者会进行等待。谢谢各位园友的观看~转载于:https://www.cnblogs.com/leesf456/p/5539142.html
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/86623.shtml
如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!