0%

java-锁

主要是锁,线程相关的一些知识点

线程间通信

  1. volatile和synchronized
  2. wait/notify
  3. 管道输入输出流
  4. Thread.join()

线程池本质是 对任务的一个生产者消费者模型,通过worker来对job进行消费,没有就阻塞,这种就是核心线程。job是对runnable或者callable接口的实现,加入到阻塞队列可以更简单的进行同步的操作。

原子操作的实现原理

处理器实现原子操作有两种方式:

  1. 通过总线锁来保证原子性
  2. 通过缓存锁来保证原子性

java种则通过CAS来实现原子操作,CAS的底层就是处理器的指令

synchronized关键字

  1. synchronized 锁的对象

    synchronized 可以用来修饰类的实例方法、静态方法、代码块

    1. 修饰实例方法的时候,锁是当前实例对象
    2. 修饰静态同步方法的时候,锁是当前class类对象
    3. 修饰代码块的时候,锁是synchronized括号里配置的对象
  2. synchronized 的特点

    1. synchronized 具备可重入性,对同一个线程在获得锁之后在调用其他需要同样锁的代码时可以直接调用,其可重入性是通过记录锁的持有线程和持有数量来实现的,调用 synchronized 代码时检查对象是否已经被锁,是则检查是否被当前线程锁定,是则计数加一,不是则加入等待队列,释放时计数减一直到为零释放锁。
    2. synchronized 是重量级锁,竞争失败的线程会阻塞。
  3. synchronized 的实现原理

    语义底层是通过一个 monitor 监视器对象来完成,监视器锁(monitor)的本质依赖于底层操作系统的互斥锁(Mutex Lock)实现,而操作系统实现线程之间的切换需要从用户态转换到核心态,这个成本非常高,状态之间的转换需要相对比较长的时间,所以这就是为什么 synchronized 效率低且重量级的原因(Java 1.6 进行了优化,但是相比其他锁机制还是略显偏重)。

    其实 wait、notify 等方法也依赖于 monitor 对象,所以这就是为什么只有在同步的块或者方法中才能调用 wait、notify 等方法,否则会抛出 IllegalMonitorStateException 异常的原因。

每日一题

锁的级别

java中每个对象都可作为锁,锁有四种级别,按照量级从轻到重分为:无锁、偏向锁、轻量级锁、重量级锁。并且锁只能升级不能降级,但是偏向锁状态可以被重置为无锁状态。

自旋

自旋是指当一个线程尝试获取某个锁时,如果该锁已被其他线程占用,就一直循环检测锁是否被释放,而不是进入线程挂起或睡眠状态。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import java.util.concurrent.atomic.AtomicReference;

public class SpinLock {
private AtomicReference<Thread> owner = new AtomicReference<Thread>();

public void lock() {
Thread currentThread = Thread.currentThread();

// 如果锁未被占用,则设置当前线程为锁的拥有者
while (!owner.compareAndSet(null, currentThread)) {
}
}

public void unlock() {
Thread currentThread = Thread.currentThread();

// 只有锁的拥有者才能释放锁
owner.compareAndSet(currentThread, null);
}
}

线程B尝试进入临界区的时候,在lock()函数里面会一直CAS失败而不能进入到临界区,此时就是自旋状态。可以看到这个语义是靠CAS来实现的。

对象头

锁的实现机制与java对象头息息相关,锁的所有信息,都记录在java的对象头中。用2字宽(WORD)(32位JVM中1字宽==4Byte=32bit)存储对象头,如果是数组类型使用3字存储(还需存储数组长度)。对象头中包括mark word、类元数据的指针和数组的长度。

长度 内容 说明
32/64bit Mark Word 存储对象的hashCode或锁信息等。
32/64bit Class Metadata Address 存储到对象类型数据的指针
32/64bit Array length 数组的长度(如果当前对象是数组)

mark word默认存储对象的hashcode,GC分代年龄,锁的状态,偏向锁的线程ID等。32位位:

mark word

在64位虚拟机下,Mark Word是64bit大小的,其存储结构如下:

mark word

偏向锁

引入偏向锁是为了在无多线程竞争的情况下尽量减少不必要的轻量级锁执行路径。

当只有一个线程去竞争锁的时候,我们不需要阻塞,也不需要自旋,因为只有一个线程在竞争,我们只要去判断该偏向锁中的ThreadID是否为当前线程即可。

如果线程2要竞争锁对象,(偏向锁不会主动释放因此还是存储的线程1的threadID),那么需要查看Java对象头中记录的线程是否存活,

如果没有存活,那么锁对象被重置为无锁状态,再使用cas替换偏向锁线程ID为线程2,锁不升级;如果存活,那么立刻查找该线程(线程1)的栈帧信息,

如果还是需要继续持有这个锁对象,那么暂停当前线程1,撤销偏向锁,升级为轻量级锁,如果线程1 不再使用该锁对象,那么将锁对象状态设为无锁状态,重新偏向新的线程。

轻量锁

轻量级锁考虑的是竞争锁对象的线程不多,而且线程持有锁的时间也不长的情景。因为阻塞线程需要CPU从用户态转到内核态,代价较大,如果刚刚阻塞不久这个锁就被释放了,那这个代价就有点得不偿失了,因此这个时候就干脆不阻塞这个线程,让它自旋这等待锁释放。

线程1获取轻量级锁时会先把锁对象的对象头MarkWord复制一份到线程1的栈帧中创建的用于存储锁记录的空间(称为DisplacedMarkWord),然后使用CAS把对象头中的内容替换为线程1的锁记录地址

如果在线程1复制对象头的同时(在线程1CAS之前),线程2也准备获取锁,复制了对象头到线程2的锁记录空间中,但是在线程2CAS的时候,发现线程1已经把对象头换了,线程2的CAS失败,那么线程2就尝试使用自旋锁来等待线程1释放锁。

但是如果自旋的时间太长也不行,因为自旋是要消耗CPU的,因此自旋的次数是有限制的,比如10次或者100次,如果自旋次数到了线程1还没有释放锁,线程2还在自旋等待,或者又有一个线程3过来竞争这个锁对象,那么这个时候轻量级锁就会膨胀为重量级锁。重量级锁把除了拥有锁的线程都阻塞,防止CPU空转。

优缺点对比

优点 缺点 适用场景
偏向锁 加锁和解锁不需要额外的消耗,和执行非同步方法比仅存在纳秒级的差距。 如果线程间存在锁竞争,会带来额外的锁撤销的消耗。 适用于只有一个线程访问同步块场景。
轻量级锁 竞争的线程不会阻塞,提高了程序的响应速度。 如果始终得不到锁竞争的线程使用自旋会消耗CPU。 追求响应时间。 同步块执行速度非常快。
重量级锁 线程竞争不使用自旋,不会消耗CPU。 线程阻塞,响应时间缓慢。 追求吞吐量。 同步块执行速度较长。

Java并发——Synchronized关键字和锁升级,详细分析偏向锁和轻量级锁的升级

Lock锁

lock锁和synchronized的对比

  1. 非阻塞的获取锁(基于CAS的自旋?),synchronized竞争失败的线程会阻塞
  2. 能够响应中断,当获取到锁的线程被中断的时候,中断异常将会被抛出,同时锁会被释放,synchronized不会响应中断
  3. 有超时时间,synchronized会一直阻塞

LockSupport

在Java多线程中,当需要阻塞或者唤醒一个线程时,都会使用LockSupport工具类来完成相应的工作。LockSupport定义了一组公共静态方法,这些方法提供了最基本的线程阻塞和唤醒功能,而LockSupport也因此成为了构建同步组件的基础工具。

LockSupport定义了一组以park开头的方法用来阻塞当前线程,以及unpark(Thread)方法来唤醒一个被阻塞的线程,这些方法描述如下:

方法名称 描 述
void park() 阻塞当前线程,如果掉用unpark(Thread)方法或被中断,才能从park()返回
void parkNanos(long nanos) 阻塞当前线程,超时返回,阻塞时间最长不超过nanos纳秒
void parkUntil(long deadline) 阻塞当前线程,直到deadline时间点
void unpark(Thread) 唤醒处于阻塞状态的线程

需要注意的是,和wait,notify不一样,park和unpark之间没有顺序要求,可以理解为调用unpark则获得了一个许可,没有这个许可则阻塞。(If the thread was blocked on park then it will unblock. Otherwise, its next call to park is guaranteed not to block.)

lock锁实现原理

基于AbstractQueuedSynchronizer(AQS,队列同步器)实现:

先说大致流程:队列同步器依赖内部的同步队列(一个fifo双向队列)来完成同步状态的管理,当前线程获取同步状态失败时,同步器会将当前线程以及等待状态等信息构造成为一个节点并将其加入同步队列,同时阻塞线程,当同步状态释放的时候,会把节点中的线程唤醒,使其再次尝试获取同步状态。

节点的定义可以看作是有一些额外属性的双向链表,同步器内部则保存了链表的头尾节点的引用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
/**
* 一个独占锁的示例代码,,不支持重入
*/
public class MyLock {
private static class Sync extends AbstractQueuedSynchronizer {

@Override
protected boolean tryAcquire(int acquires) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}

@Override
protected boolean tryRelease(int i) {
if (getState() == 0) {
throw new IllegalMonitorStateException();
}
setExclusiveOwnerThread(null);
setState(0);
return true;
}

/**
* 一般表示是否被当前线程所独占
*
* @return
*/
@Override
protected boolean isHeldExclusively() {
return getState() == 1;
}

ConditionObject newCondition() {
return new ConditionObject();
}
}

private final Sync sync = new Sync();

public void lock() {
sync.acquire(1);
}

public boolean tryLock() {
return sync.tryAcquire(1);
}

public void unlock() {
sync.release(1);
}

public Condition newCondition() {
return sync.newCondition();
}

public boolean isLocked() {
return sync.isHeldExclusively();
}

public boolean hasQueuedThreads() {
return sync.hasQueuedThreads();
}

public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}

public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
}

扩展:

一行一行源码分析清楚AbstractQueuedSynchronizer

一行一行源码分析清楚 AbstractQueuedSynchronizer(二)

ArrayBlockingQueue

ArrayBlockingQueue 底层是数组,有界队列,如果我们要使用生产者-消费者模式,这是非常好的选择。

1
2
3
4
5
6
7
8
9
10
11
12
13
// 用于存放元素的数组
final Object[] items;
// 下一次读取操作的位置
int takeIndex;
// 下一次写入操作的位置
int putIndex;
// 队列中的元素数量
int count;

// 以下几个就是控制并发用的同步器
final ReentrantLock lock;
private final Condition notEmpty;
private final Condition notFull;

array-blocking-queue

ArrayBlockingQueue用到了一个锁(ReentranLock基于AQS实现,因此就带有一个同步队列)和两个condition,也就是一个同步队列和两个条件队列。

  1. 对于ArrayBlockingQueue里面的数组数据写,写数据的时候从0向后写,遇到数组结尾则重置位置,继续从0开始,因为如果有数据被读了, 那0开始的一段一定是空的,这样就不会覆盖数据。
  2. 对于ArrayBlockingQueue里面的数组数据读,读数据的时候从0向后读,遇到数组结尾则重置位置,继续从0开始,算是跟着写的顺序,这样就肯定能读到数据,除非数组中没有数据了。
  3. 对于同步队列,每个线程尝试进入临界区的时候会调用lock.lock创建节点并且加入到同步队列末尾,如果锁是公平锁,那么还需要排队,如果是非公平锁,那么可以直接竞争,竞争成功则把带有自己线程ID的节点设置为同步队列的头节点。临界区执行完毕后从同步队列中删除节点。
  4. 如果遇到conditionA .await()调用, 因为这段代码在临界区,因此这个线程一定就是头节点的线程,此时会根据线程ID重新创建一个节点加入到conditionA的条件队列的末尾。
  5. 如果遇到conditionA.signal()调用,则会在conditionA的条件队列中找到头节点(等待最久的线程),把它从条件队列中移除,并且加入到锁的同步队列中,如果同步队列的锁是公平锁,那么还需要排队,如果是非公平锁,那么可以直接竞争锁,竞争成功则设置自己为头结点,进入临界区。

LinkedBlockingQueue

LinkedBlockingQueue 是链表实现的阻塞队列,这里默认和最大长度是Integer.MAX_VALUE。可以当做无界和有界队列来使用,所以大家不要以为它就是无界队列。

1
2
3
4
5
6
7
8
9
10
11
// take, poll, peek 等读操作的方法需要获取到这个锁
private final ReentrantLock takeLock = new ReentrantLock();

// 如果读操作的时候队列是空的,那么等待 notEmpty 条件
private final Condition notEmpty = takeLock.newCondition();

// put, offer 等写操作的方法需要获取到这个锁
private final ReentrantLock putLock = new ReentrantLock();

// 如果写操作的时候队列是满的,那么等待 notFull 条件
private final Condition notFull = putLock.newCondition();

array-blocking-queue

LinkedBlockingQueue用到了两个锁(ReentranLock基于AQS实现,因此就带有一个同步队列)和两个condition,也就是两个同步队列和两个条件队列。

同步队列和条件队列之间节点的变化和上面还是类似的,不过这边因为有两个同步队列,取数据的时候只锁了读锁,此时还是可以进行写,数据全部取出的时候才会阻塞读线程。写数据的时候也只是锁了写锁,此时还是可以读,数据达到最大值的时候才会阻塞写线程。

SynchronousQueue

SynchronousQueue 本身不带有空间来存储任何元素,使用上可以选择公平模式和非公平模式。它的队列不提供任何空间(一个都没有)来存储元素。数据必须从某个写线程交给某个读线程,而不是写到某个队列中等待被消费。

在它的实现代码中没有用到AQS锁,是基于CAS自旋和LockSupport阻塞而实现的。

主要的逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 写入值
public void put(E o) throws InterruptedException {
if (o == null) throw new NullPointerException();
if (transferer.transfer(o, false, 0) == null) { // 1
Thread.interrupted();
throw new InterruptedException();
}
}
// 读取值并移除
public E take() throws InterruptedException {
Object e = transferer.transfer(null, false, 0); // 2
if (e != null)
return (E)e;
Thread.interrupted();
throw new InterruptedException();
}

其中,transferer接口对应于SynchronousQueue的两个内部类,构造 SynchronousQueue 的时候,我们可以指定公平策略。公平模式对应 TransferQueue。而非公平模式则对应 TransferStack。

1
2
static final class TransferQueue<E> extends Transferer<E> { ... }
static final class TransferStack<E> extends Transferer<E> { ... }

其他的细节还是直接看解读 Java 并发队列 BlockingQueue吧,他写的比较好。

PriorityBlockingQueue

PriorityBlockingQueue 是无界队列,基于数组,数据结构为二叉堆,数组第一个也是树的根节点总是最小值。

PriorityQueue 的线程安全版本。

//todo 很多代码没有详细看,只是过了一下大致的流程