基础 | 并发编程 – [Lock & synchronized]
INDEX
§1 synchronized 与 Lock 对比
synchronized | Lock | |
---|---|---|
依赖 | JVM 的 monitor | JUC 包下的 API |
释放锁 | 不需要,通过正常异常两种场景的 monitorexit 保证自动释放 | 需要 |
等待可中断 | 不可以,除非完成或抛出异常 | 可以,通过超时方法或 lockInterruptibly() |
公平 | 不公平 | 都可以,默认非公平,但可以设置公平 |
能否精确唤醒 | 不能,要不随机一个要不全部 | 可以通过 Condition 分组精确唤醒 |
未能获取锁时 | 休眠,直到 CPU 再次轮到此线程 | 可以通过 trylock() 立即返回,线程不休眠 |
§3 synchronized
§3.1 加锁位置
public static synchronized void method(){ }
public synchronized void method1(){ }
public void method2(){ synchronized (new Object()){ } }
- 静态方法:锁 class
- 方法:锁 this
- 区间:锁传入的对象
§3.2 synchronized 原理
synchronized 的本质
- 依赖对象的 monitor
- 区间锁通过
monitorenter
和monitorexit
进入、释放
通常一个monitorenter
配两个monitorexit
但若同步代码块中抛出运行时异常,只有一个 和monitorexit
- 同步方法、静态同步方法通过
ACC_SYNCHRONIZED
访问标记与非同步方法做出区分 - 锁的本体本质上就是一个或一组可以区分出来的数据
本地锁实际上是对象中的ObjectMonitor
分布式锁实际上是一个唯一的 key - 锁的占用本质上就是堆这些数据的排他性占有
ObjectMonitor 属性
源码
ObjectMonitor() {
_header = NULL;
_count = 0;
_waiters = 0,
_recursions = 0;
_object = NULL;
_owner = NULL;
_WaitSet = NULL;
_WaitSetLock = 0 ;
_Responsible = NULL ;
_succ = NULL ;
//多线程竞争锁进入时的单向链表
_cxq = NULL ;
FreeNext = NULL ;
//_owner从该双向循环链表中唤醒线程结点,_EntryList是第一个节点
_EntryList = NULL ;
_SpinFreq = 0 ;
_SpinClock = 0 ;
OwnerIsThread = 0 ;
_previous_owner_tid = 0;
}
- _owner
持有锁的线程 - _header
保存锁对象的markword,原来锁对象中存放 markword 的位置现在用来存储ObjectMonitor
对象的地址了
退出重量级锁时,会将此字段的值重新复制给锁对象的 markword - _WaitSet
存放等待锁的、wait 状态的线程队列
这里类似 AQS 了 - _EntryList
存放等待锁的、block 状态的线程队列 - _recursions
锁的重入次数 - _count
线程获取锁的次数
synchronized 的原理
monitorenter
时,判断_count
的值- 如果
_count = 0
,表示锁没被占用,可以加锁 - 如果
_count != 0
,表示加锁 - 判断
_owner
是否是当前线程,如果是则可以重入,同时_recursions + 1
,否则不行 monitorexit
时,_recursions - 1
,若此时_recursions = 0
则可以退出锁
§3 Lock
基础场景
有一个资源 a,a 有一个值属性
两个线程,两个线程执行相同的轮次
一个线程当资源值 ==0 时 + 1
一个线程当资源值 !=0 时 - 1
实现示例
public class ResourceData {
private volatile int value;
private Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();//等待、通知的条件
public int increment() {
lock.lock();
try {
// 防虚假唤醒
while(value != 0){
condition.await(); //1
}
value++;
System.out.println("++++++++++++++++");
condition.signalAll();
}catch (InterruptedException e){
e.printStackTrace();
}finally {
lock.unlock();
}
return this.value;
}
public int decrement() {
lock.lock();
try {
while(value == 0){
condition.await();
}
value--;
System.out.println("----------------");
condition.signalAll();
}catch (InterruptedException e){
e.printStackTrace();
}finally {
lock.unlock();
}
return this.value;
}
public static void main(String[] args) {
ResourceData data = new ResourceData();
new Thread(()->{
for(int i = 0; i < 5; i++) {
data.increment();
}
}).start();
new Thread(()->{
for(int i = 0; i < 5; i++) {
data.decrement();
}
}).start();
}
}
虚假唤醒
当线程发起对其他线程发起唤醒后
除了执行任务锁必须的线程外,还可能唤醒了其他冗余或无关的线程,这些线程可能超量或错误的执行任务
还因为,在操作系统设计之初,就存在不是由通知触发唤醒的可能性(比如由中断唤醒)
为避免虚假唤醒,线程在执行任务之前,应该判断当前场景是否具备自身执行任务的前提条件
同时,线程的唤醒位置是线程的等待位置,即上例中 //1 的位置,为了使唤醒的线程再次判断执行条件,因此需要使用 while
精准唤醒场景
多线程的按需调用,
A 线程执行 1 次,随后 B 线程执行 2 次,最后 C 线程执行 3 次
重复 5 轮
在 lock 中,可以通过对应的 Condition 精准的唤醒指定的线程
在 synchronized 中,只能通过全部唤醒,然后根据标志位(下面示例中的 flag)使不需要唤醒的线程再 wait()
public class OrderResourceData {
private int flag = 0;
private ReentrantLock lock = new ReentrantLock();
// 几个线程几个条件
private Condition[] conditions = new Condition[]{lock.newCondition(), lock.newCondition(), lock.newCondition()};
private void work(int n){
for (int i = 0; i < n; i++) {
System.out.println(Thread.currentThread().getName() + " : " + (i+1));
}
}
public void preciseWork(int flag, int n){
for (int i = 0; i < 5; i++) {
lock.lock();
Condition condition = conditions[flag];
try {
while(this.flag!=flag) {
condition.await();
}
work(n);
System.out.println();
// 切换下一个线程
// 一个 n 个条件,flag 在 [0 , n-1] 中循环
this.flag = (this.flag + 1) % conditions.length;
// 精准唤醒
conditions[this.flag].signal();
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
lock.unlock();
}
}
}
public static void main(String[] args) {
OrderResourceData data = new OrderResourceData();
new Thread(()->{
data.preciseWork(0, 1);
},"A").start();
new Thread(()->{
data.preciseWork(1, 2);
},"B").start();
new Thread(()->{
data.preciseWork(2, 3);
},"C").start();
}
}
§4 ReentrantReadWriteLock
什么是读写锁
- 读写锁分为两个部分,一个共享读锁和一个独占写锁
- 独占锁是指锁只能被一个线程独享
- 共享锁是指锁可以由多个线程共享
为什么会有读写锁
对于数据,通常读的场景远比写的场景多,但写的场景通常无法避免
因此,需要存在锁,但需要尽量降低锁对数据在性能方面的影响,这些影响包括
- 读写操作互斥,防止读操作读到写了一半的脏数据
这里的互斥是指线程间互斥,即一个线程正在写的同时另一个线程正在读
同一个线程的写时读查看后文的 锁降级 - 在写的场景,要求线程独占,即只有一条线程可以写成功,以免并发写的安全问题
- 在读的场景,要求性能共享,即可以多线程同步读,以免带来性能损耗
读写锁同时满足上述要求
适用场景
读多写少的场景
public class ReadWriteLockDemo {
private static Map<String,String> map = new HashMap<>();
private static ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
public static void put(String key,String value){
lock.writeLock().lock();
try {
System.out.println(Thread.currentThread().getName()+" put "+key);
TimeUnit.MILLISECONDS.sleep(100);
map.put(key,value);
System.out.println(Thread.currentThread().getName()+" put done "+key);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
lock.writeLock().unlock();
}
}
public static void get(String key){
lock.readLock().lock();
try {
System.out.println(Thread.currentThread().getName()+" get "+key);
map.get(key);
TimeUnit.MILLISECONDS.sleep(1000);
System.out.println(Thread.currentThread().getName()+" get done "+key);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
lock.readLock().unlock();
}
}
public static void main(String[] args) {
for(int i=0;i<3;i++){
int finalI = i;
new Thread(()->{
put(String.valueOf(finalI),UUID.randomUUID().toString().substring(0,8));
},String.valueOf(i)).start();
}
try {
TimeUnit.MILLISECONDS.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
for(int i=0;i<3;i++){
int finalI = i;
new Thread(()->{
get(String.valueOf(finalI));
},String.valueOf(i)).start();
}
}
}
读时无所谓,写时必须每个线程独占整个锁
缺点
- 写锁饥饿
因为通常使用读写锁时,读的场景远高于写
因此,可能出现 写操作的线程长时间抢占不到写锁的情况,导致数据 一直无法更新
写锁饥饿可以通过公平性解决,但公平性牺牲吞吐量,因此常用邮戳锁解决 - 锁降级
流程麻烦,限制多,见下文
锁降级
- 锁降级是指从写锁降级为读锁
- 锁降级的目的是避免 先释放写锁随后获取读锁中间的时间间隙
- 有些操作,需要写后马上读取
- 但若写后先释放写锁,在获取读锁,可能出现并发问题
- 即:可能有其他线程在当前线程再次获取读锁前,抽空对刚刚释放写锁的数据完成一轮修改
- 因此通过锁降级完成这一需求
- 完整流程如下
- 保证数据状态,未被中间修改
- 先 获取读锁
- 确认数据的写状态为未被修改
获取读锁后,其他线程不能获取写锁,(其他线程正在写时,当前线程也不能获取读锁)
若此时写状态为未被修改,表示本次写操作并未紧随其他写操作之后,可以安全的写数据
否则,数据可能正在被修改,此时抢锁意义不大 - 释放读锁
否则本线程也不能获取写锁
- 获取写锁
- 校验写状态,防止刚刚释放读锁到获取写锁的间隙,其他线程抽空做了修改
- 写操作
- 不释放写锁,获取读锁
- 释放写锁,完成降级
- 读操作
- 释放读锁
- 保证数据状态,未被中间修改
双重检索示例
//是否被修改的标记
//防止获取锁的间隙,被其他线程抽空完成修改,所有必须是 volatile
volatile boolean writen = false;
void xx(String key,String value){
lock.readLock().lock();
if(!writen){
// writen 一定是 false
// 即近期未被修改,故下面操作基本安全,释放读锁以便获取写锁
lock.readLock().unlock();
// 获取写锁
lock.writeLock().lock();
try {
// 再查一次,因为释放读后获取写之间,有一定的概率被修改
if(!writen ){
write(data);
cacheValid = true;
}
lock.readLock().lock();
} finally {
lock.writeLock().unlock();
}
try{
read(data);
}finally {
lock.readLock().unlock();
}
}
}
§5 StampeLock
特点
- 不可重入
大坑,重复获取锁会死锁,比如递归 - 加解锁基于邮戳
- 所有获取锁的方法,都返回一个邮戳
- 所有释放锁的方法,都需要一个邮戳
- 获取锁时,返回的邮戳是 0 ,表示获取失败
- 释放锁时,需要提供获取时返回的那个邮戳,提供的邮戳得和获取锁时的一致
访问模式
- 悲观读
同ReentrantReadWriteLock
读锁 - 悲观写
同ReentrantReadWriteLock
读锁 - 乐观读
无锁机制,相当于数据库读锁
可以并发读写,但遇到并发问题后升级为悲观读写
缺点
- 不支持重入
- 不支持条件变量,即
Condition
- 不能调用中断,即
interrupt()
当两个线程,一个获取读锁一个获取写锁,二者一个处理中,另一个因等待获取锁而阻塞
若中断阻塞的线程,可能会导致阻塞线程的 CPU 飙升
使用场景
- 并发较大
- 读场景远高于写
- 读写逻辑相对简单
否则可能涉及重入、条件变量 或 中断
使用方式
StampeLock
可以按 ReentrantReadWriteLock
行为使用
可以按 ReentrantReadWriteLock
行为使用,但细节处有不同
详细见上文 缺点
public class StampedLockDemo {
StampedLock lock = new StampedLock();
int resource = 0;
public void write(){
long stamp = lock.writeLock();
System.out.println("====== write ====== : " + stamp);
try {
try {
TimeUnit.MILLISECONDS.sleep(100);
} catch (InterruptedException e) { e.printStackTrace(); }
resource ++;
} finally {
lock.unlockWrite(stamp);
System.out.println("====== writed ======");
}
}
public int read(){
long stamp = lock.readLock();
System.out.println("====== read ====== : " + stamp);
try {
try {
TimeUnit.MILLISECONDS.sleep(2000);
} catch (InterruptedException e) { e.printStackTrace(); }
return resource;
} finally {
lock.unlockRead(stamp);
System.out.println("====== read ======");
}
}
public static void main(String[] args) {
StampedLockDemo demo = new StampedLockDemo();
new Thread(()->{demo.read();},"A").start();
try { TimeUnit.MILLISECONDS.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); }
new Thread(()->{demo.write();},"B").start();
}
}
StampeLock
可以按乐观读方式使用
public class StampedLockDemo {
StampedLock lock = new StampedLock();
int resource = 0;
public void write(){
long stamp = lock.writeLock();
System.out.println("====== write ====== : " + stamp);
try {
try {
TimeUnit.MILLISECONDS.sleep(100);
} catch (InterruptedException e) { e.printStackTrace(); }
resource ++;
} finally {
lock.unlockWrite(stamp);
System.out.println("====== writed ======");
}
}
public int read(){
long stamp = lock.readLock();
System.out.println("====== read ====== : " + stamp);
try {
try {
TimeUnit.MILLISECONDS.sleep(2000);
} catch (InterruptedException e) { e.printStackTrace(); }
return resource;
} finally {
lock.unlockRead(stamp);
System.out.println("====== read ======");
}
}
public int optimisticRead(){
long stamp = lock.tryOptimisticRead();
System.out.println("====== optimistic read ====== : " + stamp);
try {
try {
TimeUnit.MILLISECONDS.sleep(2000);
} catch (InterruptedException e) { e.printStackTrace(); }
//falldown to read
if(!lock.validate(stamp))
return read();
return resource;
} finally {
System.out.println("====== optimistic read ======");
}
}
public static void main(String[] args) {
StampedLockDemo demo = new StampedLockDemo();
new Thread(()->{demo.optimisticRead();},"A").start();
try { TimeUnit.MILLISECONDS.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); }
new Thread(()->{demo.write();},"B").start();
}
}