一、Monitor 原理
二、synchronized 原理
(一)、基础
synchronized 即使内部抛出异常也会释放锁
(二)、轻量级锁
static final Object obj = new Object();
public static void method1() {synchronized( obj ) {// 同步块 Amethod2();}
}
public static void method2() {synchronized( obj ) {// 同步块 B}
}
创建轻量级锁的过程
(三)、锁膨胀
当轻量级锁加锁失败时,会将轻量级锁升级为Monitor锁
(三)、自旋优化
注意:
- 自旋会占用 CPU 时间,单核 CPU 自旋就是浪费,多核 CPU 自旋才能发挥优势。
- 在 Java 6 之后自旋锁是自适应的,比如对象刚刚的一次自旋操作成功过,那么认为这次自旋成功的可能性会高,就多自旋几次;反之,就少自旋甚至不自旋,总之,比较智能。
- Java 7 之后不能控制是否开启自旋功能
(四)、偏向锁
1、基本概念2
例如:
static final Object obj = new Object();public static void m1() {synchronized (obj) {// 同步块 Am2();}
}public static void m2() {synchronized (obj) {// 同步块 Bm3();}
}public static void m3() {synchronized (obj) {}
}
|--------------------------------------------------------------------|--------------------|
| Mark Word (64 bits) | State |
|--------------------------------------------------------------------|--------------------|
| unused:25 | hashcode:31 | unused:1 | age:4 | biased_lock:0 | 01 | Normal |
|--------------------------------------------------------------------|--------------------|
| thread:54 | epoch:2 | unused:1 | age:4 | biased_lock:1 | 01 | Biased(偏向锁) |
|--------------------------------------------------------------------|--------------------|
| ptr_to_lock_record:62 | 00 | Lightweight Locked |
|--------------------------------------------------------------------|--------------------|
| ptr_to_heavyweight_monitor:62 | 10 | Heavyweight Locked |
|--------------------------------------------------------------------|--------------------|
| | 11 | Marked for GC |
|--------------------------------------------------------------------|--------------------|
轻量级锁(00),存储锁记录的指针
重量级锁(10),存储重量级锁的指针
注意:
- 适用于单线程下用带锁的方法
- 如果开启了偏向锁(默认开启),那么对象创建后,markword 值为 0x05 即最后 3 位为 101,这时它的thread、epoch、age 都为 0
- 偏向锁是默认是延迟的,不会在程序启动时立即生效,如果想避免延迟,可以加 VM 参数 - XX:BiasedLockingStartupDelay=0 来禁用延迟
- 如果没有开启偏向锁,那么对象创建后,markword 值为 0x01 即最后 3 位为 001,这时它的 hashcode、 age 都为 0,第一次用到 hashcode 时才会赋
- 在代码运行时在添加 VM 参数 - XX: - UseBiasedLocking 禁用偏向锁
2、撤销偏向锁
(1)、调用了对象的 hashCode,但偏向锁的对象 MarkWord 中存储的是线程 id,如果调用 hashCode 会导致偏向锁被(为了存储hashcode没有地方存储线程id了)
- 轻量级锁会在锁记录中记录 hashCode
- 重量级锁会在 Monitor 中记录 hashCode
(2)、当有其它线程使用偏向锁对象时,会将偏向锁升级为轻量级锁(A线程加了偏向锁之后运行完成,然后B线程加锁)
(3)、使用wait/notify时
3、批量重偏向
private static void test3() throws InterruptedException {Vector<Dog> list = new Vector<>();Thread t1 = new Thread(() -> {for (int i = 0; i < 30; i++) {Dog d = new Dog();list.add(d);synchronized (d) {log.debug(i + "\t" + ClassLayout.parseInstance(d).toPrintableSimple(true));}}synchronized (list) {list.notify();}}, "t1");t1.start();Thread t2 = new Thread(() -> {synchronized (list) {try {list.wait();} catch (InterruptedException e) {e.printStackTrace();}}log.debug("===============> ");for (int i = 0; i < 30; i++) {Dog d = list.get(i);log.debug(i + "\t" + ClassLayout.parseInstance(d).toPrintableSimple(true));synchronized (d) {log.debug(i + "\t" + ClassLayout.parseInstance(d).toPrintableSimple(true));}log.debug(i + "\t" + ClassLayout.parseInstance(d).toPrintableSimple(true));}}, "t2");t2.start();
}
在t2线程中,前20次对象都是偏向线程t1的,但是撤销20次之后,剩下的对象全部偏向于t2线程。
批量重偏向不是批量执行的。而是说本来偏向锁只会偏向第一个锁定它的线程,之后再有线程就会是轻量锁。但因为撤销达到20次所以该类的对象当被锁定时再次被赋予偏向的能力。
4、批量撤销
5、锁消除
JIT对于热点代码进行优化,把不需要加锁但加了锁的地方进行优化
三、join原理
运用了模式中的保护性暂停
public final synchronized void join(long millis)throws InterruptedException {long base = System.currentTimeMillis();long now = 0;if (millis < 0) {throw new IllegalArgumentException("timeout value is negative");}if (millis == 0) {while (isAlive()) {wait(0);}} else {while (isAlive()) {long delay = millis - now;if (delay <= 0) {break;}wait(delay);now = System.currentTimeMillis() - base;}}}
四、park/unpark
五、重新理解线程状态转换
1 NEW --> RUNNABLE
2 RUNNABLE <--> WAITING
3 RUNNABLE <--> WAITING
4 RUNNABLE <--> WAITING
5 RUNNABLE <--> TIMED_WAITING
6 RUNNABLE <--> TIMED_WAITING
7 RUNNABLE <--> TIMED_WAITING
8 RUNNABLE <--> TIMED_WAITING
9 RUNNABLE <--> BLOCKED
10 RUNNABLE <--> TERMINATED
六、死锁
七、活锁
package com.itcast.test;import lombok.extern.slf4j.Slf4j;@Slf4j(topic = "c.Test7")
public class Test7 {static volatile int count = 10;static final Object lock = new Object();public static void main(String[] args){new Thread(() -> {// 期望减到 0 退出循环while (count > 0) {try {Thread.sleep(200);} catch (InterruptedException e) {e.printStackTrace();}count--;log.debug("count: {}", count);}}, "t1").start();new Thread(() -> {// 期望超过 20 退出循环while (count < 20) {try {Thread.sleep(200);} catch (InterruptedException e) {e.printStackTrace();}count++;log.debug("count: {}", count);}}, "t2").start();}
}
}
八、饥饿
九、共享模型之内存
(一)、JMM
- 原子性 - 保证指令不会受到线程上下文切换的影响
- 可见性 - 保证指令不会受 cpu 缓存的影响
- 有序性 - 保证指令不会受 cp
(二)、可见性(volatile)
static boolean run = true;public static void main(String[] args) throws InterruptedException {Thread t = new Thread(()->{while(run){// ....}});t.start();sleep(1);run = false; // 线程t不会如预想的停下来}
解决方法
volatile static boolean run = true;
注意:如果对改为
while(true){synchronized(this){if(!run) break;}
}
加了锁之后,不用加volatile也可以
(三)、有序性
1、指令重排
2、出错的结果
int num = 0;
boolean ready = false;
// 线程1 执行此方法
public void actor1(I_Result r) {if(ready) {r.r1 = num + num;} else {r.r1 = 1;}
}
// 线程2 执行此方法
public void actor2(I_Result r) {num = 2;ready = true;
}
如果actor2发生了指令重排,有可能结果会出现等于0的情况
3、解决方法
volatile boolean ready = false;
将ready添加volatile关键字,可以防止ready前面的指令发生重排,所以不需要将num也添加volatile关键字
十、volatile原理
(一)、如何保证可见性
public void actor2(I_Result r) {num = 2;ready = true; // ready 是 volatile 赋值带写屏障// 写屏障
}
public void actor1(I_Result r) {// 读屏障// ready 是 volatile 读取值带读屏障if(ready) {r.r1 = num + num;} else {r.r1 = 1;}
}
(二)、如何保证有序性
public void actor2(I_Result r) {num = 2;ready = true; // ready 是 volatile 赋值带写屏障// 写屏障
}
public void actor1(I_Result r) {// 读屏障// ready 是 volatile 读取值带读屏障if(ready) {r.r1 = num + num;} else {r.r1 = 1;}
}
(三)、double-checked locking 问题
public final class Singleton {private Singleton() { }private static Singleton INSTANCE = null;public static Singleton getInstance() {if(INSTANCE == null) { // t2// 首次访问会同步,而之后的使用没有 synchronizedsynchronized(Singleton.class) {if (INSTANCE == null) { // t1INSTANCE = new Singleton();}}}return INSTANCE;}
}
(四)、double-checked locking 解决
private static volatile Singleton INSTANCE = null;
(五)、happens-before
十一、LongAdder原理
// 累加单元数组, 懒惰初始化
transient volatile Cell[] cells;
// 基础值, 如果没有竞争, 则用 cas 累加这个域
transient volatile long base;
// 在 cells 创建或扩容时, 置为 1, 表示加锁
transient volatile int cellsBusy;
(一)、cas锁
// 不要用于实践!!!
public class LockCas {private AtomicInteger state = new AtomicInteger(0);public void lock() {while (true) {if (state.compareAndSet(0, 1)) {break;}}}public void unlock() {log.debug("unlock...");state.set(0);}
}
测试
LockCas lock = new LockCas();
new Thread(() -> {log.debug("begin...");lock.lock();try {log.debug("lock...");sleep(1);} finally {lock.unlock();}}).start();new Thread(() -> {log.debug("begin...");lock.lock();try {log.debug("lock...");} finally {lock.unlock();}}).start();
(二)、原理之伪共享
// 防止缓存行伪共享
@sun.misc.Contended
static final class Cell {volatile long value;Cell(long x) { value = x; }// 最重要的方法, 用来 cas 方式进行累加, prev 表示旧值, next 表示新值final boolean cas(long prev, long next) {return UNSAFE.compareAndSwapLong(this, valueOffset, prev, next);}// 省略不重要代码
}

public void add(long x) {// as 为累加单元数组// b 为基础值// x 为累加值Cell[] as;long b, v;int m;Cell a;// 进入 if 的两个条件// 1. as 有值, 表示已经发生过竞争, 进入 if// 2. cas 给 base 累加时失败了, 表示 base 发生了竞争, 进入 ifif ((as = cells) != null || !casBase(b = base, b + x)) {// uncontended 表示 cell 没有竞争boolean uncontended = true;if (// as 还没有创建as == null || (m = as.length - 1) < 0 ||// 当前线程对应的 cell 还没有(a = as[getProbe() & m]) == null ||// cas 给当前线程的 cell 累加失败 uncontended=false ( a 为当前线程的 cell )!(uncontended = a.cas(v = a.value, v + x))) {// 进入 cell 数组创建、cell 创建的流程longAccumulate(x, null, uncontended);}}
}
final void longAccumulate(long x, LongBinaryOperator fn,boolean wasUncontended) {int h;// 当前线程还没有对应的 cell, 需要随机生成一个 h 值用来将当前线程绑定到 cellif ((h = getProbe()) == 0) {// 初始化 probeThreadLocalRandom.current();// h 对应新的 probe 值, 用来对应 cellh = getProbe();wasUncontended = true;}// collide 为 true 表示需要扩容boolean collide = false;for (; ; ) {Cell[] as;Cell a;int n;long v;// 已经有了 cellsif ((as = cells) != null && (n = as.length) > 0) {// 还没有 cellif ((a = as[(n - 1) & h]) == null) {// 为 cellsBusy 加锁, 创建 cell, cell 的初始累加值为 x// 成功则 break, 否则继续 continue 循环}// 有竞争, 改变线程对应的 cell 来重试 caselse if (!wasUncontended)wasUncontended = true;// cas 尝试累加, fn 配合 LongAccumulator 不为 null, 配合 LongAdder 为 nullelse if (a.cas(v = a.value, ((fn == null) ? v + x : fn.applyAsLong(v, x))))break;// 如果 cells 长度已经超过了最大长度, 或者已经扩容, 改变线程对应的 cell 来重试 caselse if (n >= NCPU || cells != as)collide = false;// 确保 collide 为 false 进入此分支, 就不会进入下面的 else if 进行扩容了else if (!collide)collide = true;// 加锁else if (cellsBusy == 0 && casCellsBusy()) {// 加锁成功, 扩容continue;}// 改变线程对应的 cellh = advanceProbe(h);}// 还没有 cells, 尝试给 cellsBusy 加锁else if (cellsBusy == 0 && cells == as && casCellsBusy()) {// 加锁成功, 初始化 cells, 最开始长度为 2, 并填充一个 cell// 成功则 break;}// 上两种情况失败, 尝试给 base 累加else if (casBase(v = base, ((fn == null) ? v + x : fn.applyAsLong(v, x))))break;}
}

public long sum() {Cell[] as = cells; Cell a;long sum = base;if (as != null) {for (int i = 0; i < as.length; ++i) {if ((a = as[i]) != null)sum += a.value;}}return sum;
}
十二、不可变设计
string
public final class Stringimplements java.io.Serializable, Comparable<String>, CharSequence {/** The value is used for character storage. */private final char value[];/** Cache the hash code for the string */private int hash; // Default to 0// ...}
十三、final原理

十四、AQS原理
(一)、概述
(二)、实现不可重入锁
final class MySync extends AbstractQueuedSynchronizer {@Overrideprotected boolean tryAcquire(int acquires) {if (acquires == 1) {if (compareAndSetState(0, 1)) {setExclusiveOwnerThread(Thread.currentThread());return true;}}return false;}@Overrideprotected boolean tryRelease(int acquires) {if (acquires == 1) {if (getState() == 0) {throw new IllegalMonitorStateException();}setExclusiveOwnerThread(null);setState(0);return true;}return false;}protected Condition newCondition() {return new ConditionObject();}@Overrideprotected boolean isHeldExclusively() {return getState() == 1;}
}
class MyLock implements Lock {static MySync sync = new MySync();@Override// 尝试,不成功,进入等待队列public void lock() {sync.acquire(1);}@Override// 尝试,不成功,进入等待队列,可打断public void lockInterruptibly() throws InterruptedException {sync.acquireInterruptibly(1);}@Override// 尝试一次,不成功返回,不进入队列public boolean tryLock() {return sync.tryAcquire(1);}@Override// 尝试,不成功,进入等待队列,有时限public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {return sync.tryAcquireNanos(1, unit.toNanos(time));}@Override// 释放锁public void unlock() {sync.release(1);}@Override// 生成条件变量public Condition newCondition() {return sync.newCondition();}
}
十五、ReentrantLock 原理
(一)、非公平锁实现原理
1、加锁解锁流程
public ReentrantLock() {sync = new NonfairSync();
}

加锁流程
- 构造器构造,默认构造非公平锁
- (无竞争,第一个线程尝试加锁时)加锁,luck(),
final void lock() {// 首先用 cas 尝试(仅尝试一次)将 state 从 0 改为 1, 如果成功表示获得了独占锁if (compareAndSetState(0, 1))setExclusiveOwnerThread(Thread.currentThread());else// 如果尝试失败,进入 ㈠acquire(1); }
首先尝试将锁的state改为1,如果修改成功,则将拥有锁的线程修改位为当前线程
- 当第一个竞争线程出现时,竞争线程尝试加锁,无法将state由0改为1,竞争线程进入方法acquire(1);
// ㈠ AQS 继承过来的方法, 方便阅读, 放在此处 public final void acquire(int arg) {// ㈡ tryAcquireif (!tryAcquire(arg) &&// 当 tryAcquire 返回为 false 时, 先调用 addWaiter ㈣, 接着 acquireQueued ㈤acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) {selfInterrupt();} }
- 线程进入tryAcquire(arg)方法,再次尝试加锁,如果成功 !(tryAcquire(arg)) = false,退出流程,加锁成功
- 再次加锁失败!(tryAcquire(arg)) = true,进入 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)方法
- 先执行addWaiter(Node.EXCLUSIVE)方法,该方法是构造 Node 队列,在第一个竞争线程执行该方法时,除了创造关联本线程的节点,还会创造一个哑元节点(该节点就是列表的head节点,NonfairSync中的head也指向该节点),默认初始状态都为0,形成双向列表,返回值时关联竞争线程的那个Node节点
- 执行acquireQueued(addWaiter(Node.EXCLUSIVE), arg)方法,
// AQS 继承过来的方法, 方便阅读, 放在此处 final boolean acquireQueued(final Node node, int arg) {boolean failed = true;try {boolean interrupted = false;for (; ; ) {final Node p = node.predecessor();// 上一个节点是 head, 表示轮到自己(当前线程对应的 node)了, 尝试获取if (p == head && tryAcquire(arg)) {// 获取成功, 设置自己(当前线程对应的 node)为 headsetHead(node);// 上一个节点 help GCp.next = null;failed = false;// 返回中断标记 falsereturn interrupted;}if (// 判断是否应当 park, 进入 ㈦shouldParkAfterFailedAcquire(p, node) &&// park 等待, 此时 Node 的状态被置为 Node.SIGNAL ㈧parkAndCheckInterrupt()) {interrupted = true;}}} finally {if (failed)cancelAcquire(node);} }
-
进入到for(;;)循环,找出当前节点的前驱节点定义为p,此时p就是哑元节点,此时 p == head,再次尝试获取锁(如果当前节点是排在第二位的节点,就可以尝试再次加锁),如果尝试加锁成功
-
尝试加锁失败,执行
if(// 判断是否应当 park, 进入 ㈦shouldParkAfterFailedAcquire(p,node)&&// park 等待, 此时 Node 的状态被置为 Node.SIGNAL ㈧parkAndCheckInterrupt()){interrupted=true; }
-
执行shouldParkAfterFailedAcquire(p,node)方法
// AQS 继承过来的方法, 方便阅读, 放在此处 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {// 获取上一个节点的状态int ws = pred.waitStatus;if (ws == Node.SIGNAL) { //Node.SIGNAL = -1// 上一个节点都在阻塞, 那么自己也阻塞好了return true;}// > 0 表示取消状态if (ws > 0) {// 上一个节点取消, 那么重构删除前面所有取消的节点, 返回到外层循环重试do {node.prev = pred = pred.prev;} while (pred.waitStatus > 0);pred.next = node;} else {// 这次还没有阻塞// 但下次如果重试不成功, 则需要阻塞,这时需要设置上一个节点状态为 Node.SIGNALcompareAndSetWaitStatus(pred, ws, Node.SIGNAL);}return false; }
-
由于pred(p)的状态=0,所以进入compareAndSetWaitStatus(pred, ws, Node.SIGNAL),该方法时将pred(p)的状态改为-1,结束方法,返回false
-
回到之前的代码,进行下一次循环,再次执行if (p == head && tryAcquire(arg)),再次尝试加锁,如果成功,...... ,失败,进入
if(// 判断是否应当 park, 进入 ㈦shouldParkAfterFailedAcquire(p,node)&&// park 等待, 此时 Node 的状态被置为 Node.SIGNAL ㈧parkAndCheckInterrupt()){interrupted=true; }
// AQS 继承过来的方法, 方便阅读, 放在此处 final boolean acquireQueued(final Node node, int arg) {boolean failed = true;try {boolean interrupted = false;for (; ; ) {final Node p = node.predecessor();// 上一个节点是 head, 表示轮到自己(当前线程对应的 node)了, 尝试获取if (p == head && tryAcquire(arg)) {// 获取成功, 设置自己(当前线程对应的 node)为 headsetHead(node);// 上一个节点 help GCp.next = null;failed = false;// 返回中断标记 falsereturn interrupted;}if (// 判断是否应当 park, 进入 ㈦shouldParkAfterFailedAcquire(p, node) &&// park 等待, 此时 Node 的状态被置为 Node.SIGNAL ㈧parkAndCheckInterrupt()) {interrupted = true;}}} finally {if (failed)cancelAcquire(node);} }
-
再次进入shouldParkAfterFailedAcquire(p,node),此时prep(p) = -1,返回true
// AQS 继承过来的方法, 方便阅读, 放在此处 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {// 获取上一个节点的状态int ws = pred.waitStatus;if (ws == Node.SIGNAL) { //Node.SIGNAL = -1// 上一个节点都在阻塞, 那么自己也阻塞好了return true;}// > 0 表示取消状态if (ws > 0) {// 上一个节点取消, 那么重构删除前面所有取消的节点, 返回到外层循环重试do {node.prev = pred = pred.prev;} while (pred.waitStatus > 0);pred.next = node;} else {// 这次还没有阻塞// 但下次如果重试不成功, 则需要阻塞,这时需要设置上一个节点状态为 Node.SIGNALcompareAndSetWaitStatus(pred, ws, Node.SIGNAL);}return false; }
-
进入parkAndCheckInterrupt()方法,当前线程进入阻塞状态
// 阻塞当前线程 private final boolean parkAndCheckInterrupt() {LockSupport.park(this);return Thread.interrupted(); }
-
多个线程竞争失败后,
-
此时,Thread-0执行完成,释放锁,调用ReentrantLock中的
public void unlock() {sync.release(1); }
-
进入sync.release(1)方法,
public final boolean release(int arg) {if (tryRelease(arg)) {AbstractQueuedSynchronizer.Node h = head;if (h != null && h.waitStatus != 0)unparkSuccessor(h);return true;}return false; }
在tryRelease(arg)方法中,设置 exclusiveOwnerThread 为 null,state = 0,返回true(返回false 的情况下面再说)
-
执行到
AbstractQueuedSynchronizer.Node h = head; if (h != null && h.waitStatus != 0)unparkSuccessor(h);
-
此时h = head 不等于null,且h的状态!=0 (等于-1),进入unparkSuccessor(h)方法,唤醒后继节点,此时node (h) 的状态=-1,h的后继节(s)点 != null,执行 if (s != null) LockSupport.unpark(s.thread); 唤醒s线程,s线程开始竞争锁
private void unparkSuccessor(AbstractQueuedSynchronizer.Node node) {/** If status is negative (i.e., possibly needing signal) try* to clear in anticipation of signalling. It is OK if this* fails or if status is changed by waiting thread.*/int ws = node.waitStatus;if (ws < 0)compareAndSetWaitStatus(node, ws, 0);/** Thread to unpark is held in successor, which is normally* just the next node. But if cancelled or apparently null,* traverse backwards from tail to find the actual* non-cancelled successor.*/AbstractQueuedSynchronizer.Node s = node.next;if (s == null || s.waitStatus > 0) {s = null;for (AbstractQueuedSynchronizer.Node t = tail; t != null && t != node; t = t.prev)if (t.waitStatus <= 0)s = t;}if (s != null)LockSupport.unpark(s.thread); }
-
s(Thread-1)线程回到
// 阻塞当前线程 private final boolean parkAndCheckInterrupt() {LockSupport.park(this);return Thread.interrupted(); }
继续执行
-
返回到
// AQS 继承过来的方法, 方便阅读, 放在此处 final boolean acquireQueued(final Node node, int arg) {boolean failed = true;try {boolean interrupted = false;for (; ; ) {final Node p = node.predecessor();// 上一个节点是 head, 表示轮到自己(当前线程对应的 node)了, 尝试获取if (p == head && tryAcquire(arg)) {// 获取成功, 设置自己(当前线程对应的 node)为 headsetHead(node);// 上一个节点 help GCp.next = null;failed = false;// 返回中断标记 falsereturn interrupted;}if (// 判断是否应当 park, 进入 ㈦shouldParkAfterFailedAcquire(p, node) &&// park 等待, 此时 Node 的状态被置为 Node.SIGNAL ㈧parkAndCheckInterrupt()) {interrupted = true;}}} finally {if (failed)cancelAcquire(node);} }
继续进行for循环,此时if (p == head && tryAcquire(arg)) ,在次尝试加锁,此时如果加锁成功,执行以下代码
setHead(node); // 上一个节点 help GC p.next = null; failed = false; // 返回中断标记 false return interrupted;
将关联s线程(刚才关联Thread-1线程的节点)的节点设置为头节点(删除之前的头节点,将此节点关联的线程改为null)
-
如果刚才thread-1线程唤醒后,新出现了一个线程与之竞争,且thread-1线程竞争失败,在次进入parkAndCheckInterrupt(),进入阻塞状态
2、可重入原理
ReentrantLock的非公平获取锁的源码
protected final boolean tryAcquire(int acquires) {return nonfairTryAcquire(acquires);
}
static final class NonfairSync extends Sync {// ...// Sync 继承过来的方法, 方便阅读, 放在此处final boolean nonfairTryAcquire(int acquires) {final Thread current = Thread.currentThread();int c = getState();if (c == 0) {if (compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);return true;}}// 如果已经获得了锁, 线程还是当前线程, 表示发生了锁重入else if (current == getExclusiveOwnerThread()) {// state++int nextc = c + acquires;if (nextc < 0) // overflowthrow new Error("Maximum lock count exceeded");setState(nextc);return true;}return false;}// Sync 继承过来的方法, 方便阅读, 放在此处protected final boolean tryRelease(int releases) {// state--int c = getState() - releases;if (Thread.currentThread() != getExclusiveOwnerThread())throw new IllegalMonitorStateException();boolean free = false;// 支持锁重入, 只有 state 减为 0, 才释放成功if (c == 0) {free = true;setExclusiveOwnerThread(null);}setState(c);return free;}
}
- 当一个线程第一次获得锁时,进入代码
if (c == 0) {if (compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);return true;} }
把锁的state设置为1,把拥有锁的线程设置为当前线程,返回true
- 当一个线程多次获得锁时(锁重入),进入代码
else if (current == getExclusiveOwnerThread()) {// state++int nextc = c + acquires;if (nextc < 0) // overflowthrow new Error("Maximum lock count exceeded");setState(nextc);return true; }
让state++,返回true
- 当锁重入后释放锁时,进入
protected final boolean tryRelease(int releases) {// state--int c = getState() - releases;if (Thread.currentThread() != getExclusiveOwnerThread())throw new IllegalMonitorStateException();boolean free = false;// 支持锁重入, 只有 state 减为 0, 才释放成功if (c == 0) {free = true;setExclusiveOwnerThread(null);}setState(c);return free;}
让state--,如果state != 0 返回false,如果=0,设置当前拥有锁的线程为null,返回true
3、可打断原理
(1)、不可打断(默认)
// Sync 继承自 AQS
static final class NonfairSync extends Sync {// ...private final boolean parkAndCheckInterrupt() {// 如果打断标记已经是 true, 则 park 会失效LockSupport.park(this);// interrupted 会清除打断标记return Thread.interrupted();}final boolean acquireQueued(final Node node, int arg) {boolean failed = true;try {boolean interrupted = false;for (; ; ) {final Node p = node.predecessor();if (p == head && tryAcquire(arg)) {setHead(node);p.next = null;failed = false;// 还是需要获得锁后, 才能返回打断状态return interrupted;}if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt()) {// 如果是因为 interrupt 被唤醒, 返回打断状态为 trueinterrupted = true;}}} finally {if (failed)cancelAcquire(node);}}public final void acquire(int arg) {if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) {// 如果打断状态为 trueselfInterrupt();}}static void selfInterrupt() {// 重新产生一次中断Thread.currentThread().interrupt();}
}
- 被打断后,进入方法,return true,但是Thread.interrupted()会重置打断标记为false
// 阻塞当前线程 private final boolean parkAndCheckInterrupt() {LockSupport.park(this);return Thread.interrupted(); }
-
回退到
if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt()) {// 如果是因为 interrupt 被唤醒, 返回打断状态为 trueinterrupted = true; }
置interrupted = true
-
接着循环,接着进入到
// 阻塞当前线程 private final boolean parkAndCheckInterrupt() {LockSupport.park(this);return Thread.interrupted(); }
进入阻塞状态,但再次被唤醒之后器其返回值仍然时true
-
直到该线程获得所之后,执行
if (p == head && tryAcquire(arg)) {setHead(node);p.next = null;failed = false;// 还是需要获得锁后, 才能返回打断状态return interrupted; }
返回true
-
回退到
public final void acquire(int arg) {if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) {// 如果打断状态为 trueselfInterrupt();}}static void selfInterrupt() {// 重新产生一次中断Thread.currentThread().interrupt();}
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)返回值时true,执行selfInterrupted(),打断当前进程
在不可打断模式下,只要任务在AQS队列中,就不能打断
(2)、可打断
// ㈠ 可打断的获取锁流程
private void doAcquireInterruptibly(int arg) throws InterruptedException {final Node node = addWaiter(Node.EXCLUSIVE);boolean failed = true;try {for (;;) {final Node p = node.predecessor();if (p == head && tryAcquire(arg)) {setHead(node);p.next = null; // help GCfailed = false;return;}if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt()) {// 在 park 过程中如果被 interrupt 会进入此// 这时候抛出异常, 而不会再次进入 for (;;)throw new InterruptedException();}}} finally {if (failed)cancelAcquire(node);}
}
打断后直接抛出异常
(二)、公平锁实现原理
static final class FairSync extends Sync {private static final long serialVersionUID = -3000897897090466540L;final void lock() {acquire(1);}// AQS 继承过来的方法, 方便阅读, 放在此处public final void acquire(int arg) {if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) {selfInterrupt();}}// 与非公平锁主要区别在于 tryAcquire 方法的实现protected final boolean tryAcquire(int acquires) {final Thread current = Thread.currentThread();int c = getState();if (c == 0) {// 先检查 AQS 队列中是否有前驱节点, 没有才去竞争if (!hasQueuedPredecessors() &&compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);return true;}} else if (current == getExclusiveOwnerThread()) {int nextc = c + acquires;if (nextc < 0)throw new Error("Maximum lock count exceeded");setState(nextc);return true;}return false;}// ㈠ AQS 继承过来的方法, 方便阅读, 放在此处public final boolean hasQueuedPredecessors() {Node t = tail;Node h = head;Node s;// h != t 时表示队列中有 Nodereturn h != t &&(// (s = h.next) == null 表示队列中没有老二(s = h.next) == null || // 或者队列中老二线程不是此线程s.thread != Thread.currentThread());}
}
在获取锁时,要限先执行方法hasQueuedPredecessors(),该方法当队列中
没有第二位(没有老二是因为这时候另一个线程在初始化这个队列,刚好head被创建出来了但是没有设置next)
或者
第二位节点不是当前节点时,返回true,取反为false,无法获取锁,返回false
(三)、条件变量实现原理
1、await流程
// 等待 - 直到被唤醒或打断
public final void await() throws InterruptedException {if (Thread.interrupted()) {throw new InterruptedException();}// 添加一个 Node 至等待队列, 见 ㈠Node node = addConditionWaiter();// 释放节点持有的锁int savedState = fullyRelease(node);int interruptMode = 0;// 如果该节点还没有转移至 AQS 队列, 阻塞while (!isOnSyncQueue(node)) {// park 阻塞LockSupport.park(this); // 如果被打断, 退出等待队列if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)break;}// 退出等待队列后, 还需要获得 AQS 队列的锁if (acquireQueued(node, savedState) && interruptMode != THROW_IE)interruptMode = REINTERRUPT;// 所有已取消的 Node 从队列链表删除, 见 ㈡if (node.nextWaiter != null)unlinkCancelledWaiters();// 应用打断模式, 见 ㈤if (interruptMode != 0)reportInterruptAfterWait(interruptMode);
}
- 先进入addConditionWaiter()方法,创建一个顶的node节点,将其挂到ConditionObject中,将其状态置为-2,返回这个节点
// 添加一个 Node 至等待队列 private Node addConditionWaiter() {Node t = lastWaiter;// 所有已取消的 Node 从队列链表删除, 见 ㈡if (t != null && t.waitStatus != Node.CONDITION) {unlinkCancelledWaiters();t = lastWaiter;}// 创建一个关联当前线程的新 Node, 添加至队列尾部Node node = new Node(Thread.currentThread(), Node.CONDITION);if (t == null)firstWaiter = node;elset.nextWaiter = node;lastWaiter = node;return node; }
- 执行int savedState = fullyRelease(node),
final int fullyRelease(AbstractQueuedSynchronizer.Node node) {boolean failed = true;try {int savedState = getState();if (release(savedState)) {failed = false;return savedState;} else {throw new IllegalMonitorStateException();}} finally {if (failed)node.waitStatus = AbstractQueuedSynchronizer.Node.CANCELLED;} }
进入release(savedState)
public final boolean release(int arg) {if (tryRelease(arg)) {AbstractQueuedSynchronizer.Node h = head;if (h != null && h.waitStatus != 0)unparkSuccessor(h);return true;}return false; }
进入tryRelease(arg)中,将state置为0,将拥有锁的线程设置为null
protected final boolean tryRelease(int releases) {int c = getState() - releases;if (Thread.currentThread() != getExclusiveOwnerThread())throw new IllegalMonitorStateException();boolean free = false;if (c == 0) {free = true;setExclusiveOwnerThread(null);}setState(c);return free; }
返回
public final boolean release(int arg) {if (tryRelease(arg)) {AbstractQueuedSynchronizer.Node h = head;if (h != null && h.waitStatus != 0)unparkSuccessor(h);return true;}return false; }
唤醒head的后继节点
- 返回到await,进入while循环,阻塞当前线程
// 等待 - 直到被唤醒或打断 public final void await() throws InterruptedException {if (Thread.interrupted()) {throw new InterruptedException();}// 添加一个 Node 至等待队列, 见 ㈠Node node = addConditionWaiter();// 释放节点持有的锁int savedState = fullyRelease(node);int interruptMode = 0;// 如果该节点还没有转移至 AQS 队列, 阻塞while (!isOnSyncQueue(node)) {// park 阻塞LockSupport.park(this); // 如果被打断, 退出等待队列if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)break;}// 退出等待队列后, 还需要获得 AQS 队列的锁if (acquireQueued(node, savedState) && interruptMode != THROW_IE)interruptMode = REINTERRUPT;// 所有已取消的 Node 从队列链表删除, 见 ㈡if (node.nextWaiter != null)unlinkCancelledWaiters();// 应用打断模式, 见 ㈤if (interruptMode != 0)reportInterruptAfterWait(interruptMode); }
2、signal流程
让Thread-1线程唤醒Thread-0线程
public final void signal() {if (!isHeldExclusively()) //判断当前线程是否是拥有锁的线程throw new IllegalMonitorStateException();AbstractQueuedSynchronizer.Node first = firstWaiter; //获取队首的节点if (first != null)doSignal(first);
}
- 执行doSignal(first)方法
private void doSignal(AbstractQueuedSynchronizer.Node first) {do {if ( (firstWaiter = first.nextWaiter) == null)lastWaiter = null;first.nextWaiter = null;} while (!transferForSignal(first) &&(first = firstWaiter) != null); }
将当前的节点从ConditionObject的队列中断开
执行transferForSignal(first)方法
final boolean transferForSignal(AbstractQueuedSynchronizer.Node node) {if (!compareAndSetWaitStatus(node, AbstractQueuedSynchronizer.Node.CONDITION, 0))return false;AbstractQueuedSynchronizer.Node p = enq(node);int ws = p.waitStatus;if (ws > 0 || !compareAndSetWaitStatus(p, ws, AbstractQueuedSynchronizer.Node.SIGNAL))LockSupport.unpark(node.thread);return true; }
先将当前节点的状态设置为0,进入enq(node)方法,将node挂在到阻塞队列末尾,返回node的前驱节点(Thread-3)记为p,将p的状态设置为-1,然后返回true
十六、读写锁原理
ReentrantReadWriteLock
StampedLock
十七、Semaphore 原理
(一)、acquire
- Semaphore 的构造方法(假设设置信号量为3)
public Semaphore(int permits) {sync = new NonfairSync(permits); }
NonfairSync(int permits) {super(permits); }
Sync(int permits) {setState(permits); }
将state设置为3
- 线程获得锁要调用acquire()方法
public void acquire() throws InterruptedException {sync.acquireSharedInterruptibly(1); }
public final void acquireSharedInterruptibly(int arg)throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();if (tryAcquireShared(arg) < 0)doAcquireSharedInterruptibly(arg); }
进入tryAcquireShared(arg)
protected int tryAcquireShared(int acquires) {return nonfairTryAcquireShared(acquires); }
final int nonfairTryAcquireShared(int acquires) {for (;;) {int available = getState(); //获得state = 3int remaining = available - acquires; //state-1if (remaining < 0 ||compareAndSetState(available, remaining))return remaining; //return 2} }
返回到
public final void acquireSharedInterruptibly(int arg)throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();if (tryAcquireShared(arg) < 0) //返回值为2,加锁成功,结束doAcquireSharedInterruptibly(arg); }
- 假设Thread-1,Thread-2,Thread-4 cas 竞争成功,此时Thread-0竞争锁时
if (remaining < 0 ||compareAndSetState(available, remaining))return remaining; //return -1}
返回
public final void acquireSharedInterruptibly(int arg)throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();if (tryAcquireShared(arg) < 0) //返回值为-1,加锁失败doAcquireSharedInterruptibly(arg); }
执行doAcquireSharedInterruptibly(arg); 与ReentrantLock 相似
private void doAcquireSharedInterruptibly(int arg)throws InterruptedException {final AbstractQueuedSynchronizer.Node node = addWaiter(AbstractQueuedSynchronizer.Node.SHARED);boolean failed = true;try {for (;;) {final AbstractQueuedSynchronizer.Node p = node.predecessor();if (p == head) {int r = tryAcquireShared(arg);if (r >= 0) {setHeadAndPropagate(node, r);p.next = null; // help GCfailed = false;return;}}if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())throw new InterruptedException();}} finally {if (failed)cancelAcquire(node);} }
thread-3也重复上述过程,最终阻塞
(二)、release
- Thread-4释放一个许可
public void release() {sync.releaseShared(1); }
public final boolean releaseShared(int arg) {if (tryReleaseShared(arg)) {doReleaseShared();return true;}return false; }
进入
protected final boolean tryReleaseShared(int releases) {for (;;) {int current = getState(); //拿到当前锁状态int next = current + releases; //suo状态加1if (next < current) // overflowthrow new Error("Maximum permit count exceeded");if (compareAndSetState(current, next)) //尝试改变锁状态return true; //返回true} }
返回
public final boolean releaseShared(int arg) {if (tryReleaseShared(arg)) {doReleaseShared();return true;}return false; }
执行doReleaseShared();
private void doReleaseShared() {for (;;) {AbstractQueuedSynchronizer.Node h = head;if (h != null && h != tail) {int ws = h.waitStatus;if (ws == AbstractQueuedSynchronizer.Node.SIGNAL) {if (!compareAndSetWaitStatus(h, AbstractQueuedSynchronizer.Node.SIGNAL, 0))continue; // loop to recheck casesunparkSuccessor(h); //唤醒线程}else if (ws == 0 &&!compareAndSetWaitStatus(h, 0, AbstractQueuedSynchronizer.Node.PROPAGATE))continue; // loop on failed CAS}if (h == head) // loop if head changedbreak;} }
- 接下来竞争锁的过程同上