JUC 并发编程
JUC
1. 基本介绍
- JUC:
java.util.concurrent
包,简称 JUC,是 Java 5 引入的并发工具包,提供了大量的并发编程工具和高级特性,用于简化多线程编程 - 进程和线程:
- 进程:计算机中正在执行的独立程序,每个进程拥有独立的内存空间和系统资源,进程是CPU资源分配的最小单位
- 线程:进程内部的执行单元,一个进程可以包含多个线程,线程共享进程的内存空间和资源,线程是CPU调度和执行的最小单位
- Java 默认有2个线程 ==> main线程、GC线程
- Java 真的可以开启线程?
- 从源码看,Java 是使用本地 native 方法调用底层的 C/C++ 代码来开启线程
start0
是Thread
类的私有本地方法,由 JVM 通过 C/C++ 代码实现
1 | // C++ 底层, Java是没有权限操作底层硬件的 |
并发和并行:并发编程的本质:充分利用CPU的资源!
- 并发:在单核处理器上模拟出多条线程,任务通过操作系统的调度,轮流占用CPU时间片,从宏观上看似乎是同时进行(多线程操作同一个资源)
- 并行:在多核处理器上,多个任务可以真正同时在不同的处理器核心上运行,可以使用线程池提高性能
1
2
3
4
5
6public class Test1 {
public static void main(String[] args) {
//获取cpu的核数
System.out.println(Runtime.getRuntime().availableProcessors());
}
}多线程 是实现并发的一种方式,通过并发(交替或同时)执行多个线程实现
Java 中线程的 6 种状态:NEW,RUNNABLE,BLOCKED,WAITING,TIMED_WAITING,TERMINATED
1 | public enum State { |
2. wait 和 sleep 的区别
- 来自不同的类:
- wait => Object
- sleep => Thread
一般情况企业中使用休眠是:
1 | // 来自 java.util.concurrent 包下的 TimeUnit 类 |
- 关于锁的释放:
- wait 会释放锁对象,使其他等待该锁的线程能够获得锁并继续执行
- sleep 不会释放锁 ==> 抱着锁睡觉
- 使用的范围不同:
- wait 必须在同步方法/同步代码块中调用,否则会抛出
IllegalMonitorStateException
异常 - sleep 可以在任何地方睡
- wait 必须在同步方法/同步代码块中调用,否则会抛出
- 是否需要捕获异常:
- 都需要捕获
InterruptedException
异常
- 都需要捕获
3. Lock 锁(重点)
3.1 传统的 Synchronized 锁
- 减少耦合:线程就是一个单独的资源类,没有任何的附属操作!
1 | /** |
3.2 Lock 锁
- 加锁解锁:
lock()
、unlock()
- 3个实现类:
ReentrantLock
: 可重入锁ReentrantReadWriteLock.ReadLock
: 可重入读锁ReentrantReadWriteLock.WriteLock
: 可重入写锁
- 公平锁与非公平锁:
- 公平锁: 必须先来后到
- 非公平锁: 可以插队**(默认为非公平锁)**
- 使用
Lock
的三步曲:- 创建
Lock
对象:Lock lock = new ReentrantLock();
- 加锁:
lock.lock();
- 解锁:在
finally
块中解锁lock.unlock();
- 创建
1 | public class SaleTicketDemo02 { |
3.3 Synchronized VS. Lock
锁的实现:
- Synchronized:内置的 Java 关键字,直接由 JVM 实现
- Lock:接口,由 Java 类实现,需要显式调用
lock
和unlock
方法
获取锁状态:
- Synchronized:无法判断获取锁的状态
- Lock: 可以通过
tryLock
方法判断是否成功获得锁
锁的释放:
- Synchronized: 出了同步块或方法范围后,自动释放锁
- Lock:需要手动加锁和手动释放锁,如果没有释放锁,可能会导致死锁
锁等待行为:
Synchronized:线程1获得锁,线程2必须等待,直到线程1释放锁
Lock:可以通过
tryLock
方法尝试获取锁,不会导致长时间等待
可重入性:又称递归锁,指一个线程在持有锁的情况下可以再次获取该锁的能力;当一个线程已经获得了某个锁,可以再次获取该锁而不会被阻塞;每次获取锁的计数增加 1,每次释放锁的计数减少 1,直到计数为 0 时,锁才真正被释放
- Synchronized:是可重入锁,不可以中断的,非公平的
- Lock:
ReentrantLock
实现了可重入锁,可以判断锁,可以自己设置公平锁和非公平锁
使用场景:
- Synchronized:适合锁定少量代码的同步问题
- Lock:适合锁定大量同步代码的复杂场景
锁到底是什么? 如何判断锁的是谁?
- 锁: 一种并发控制机制,用于协调多个线程对共享资源的访问,确保数据的正确性和一致性
- 锁的粒度:锁可以锁定方法、代码块或对象实例
- 锁的持有者:可以使用调试工具或日志记录来判断哪个线程当前持有锁
4. 生产者和消费者问题-线程通信
面试常考:单例模式、排序算法、生产者和消费者、死锁
4.1 Synchronized 版
wait
、notify
和notifyAll
的使用中必须谨防虚假唤醒- 虚假唤醒:多线程环境下,有多个线程执行了wait()方法,需要其他线程执行notify()或者notifyAll()方法去唤醒它们,假如多个线程都被唤醒了,但是只有其中一部分是有用的唤醒操作,其余的唤醒都是无用功;对于不应该被唤醒的线程而言,便是虚假唤醒
- 防止虚假唤醒问题:
wait
方法的调用应该始终放在循环(while
)中,而不是条件判断(if
)中- 当某个线程被错误地唤醒后,如果使用
if
判断条件,线程会继续执行,而不是重新检查条件是否满足,导致逻辑错误
- 当某个线程被错误地唤醒后,如果使用
目前只有 A 和 B 两个线程,一个 +1,一个 -1,用 if 不会出问题,但是如果增加 C 和 D 线程,就会变成两个 +1,两个 -1,此时用 if ,可能会出现问题,如:
A先执行,执行时调用了wait方法,那它会等待,此时会释放锁,如果线程C获得锁并且也会执行wait方法,两个加线程一起等待被唤醒,此时减线程中的某一个线程执行完毕并且唤醒了这俩加线程,那么A获取了锁并且加1,执行完毕之后B再执行,如果是if的话,那么A修改完num后,B不会再去判断num的值,直接会给num+1。如果是while的话,A执行完之后,B还会去判断num的值,因此就不会执行
1 | /** |
4.2 Lock 版
- 对应于 synchronized,JUC 版本下,Lock 锁也有对应的唤醒与停止方法,分别是 condition接口下的signal()与await()
1 | public class B { |
- Condition 的优势?
- 可以实现精准的通知和唤醒线程,如:在A线程执行完后精准唤醒B线程执行,B线程执行完后精准唤醒C线程执行,C线程执行完后精准唤醒A线程执行
- 线程 A: 其他同理
- 通过
condition1.await()
等待number
变为 1。 - 打印
AAAAA
,设置number
为 2,唤醒condition2
上的等待线程(B)
- 通过
1 | /** |
5. 8锁现象
- 如何判断锁锁的到底是谁?
- 锁: 线程同步的机制,用来确保多个线程能够安全访问共享资源
- 锁的是谁:
- 实例方法的锁: 锁的是调用方法的实例对象
- 静态方法的锁: 锁的是该类的
Class
对象
- 从 8 个锁相关问题深刻理解锁
5.1 一对象,两同步
同一个对象,两个同步方法,标准情况下是先发短信还是打电话
- 问题: 两个同步实例方法
sendSms
和call
,哪个先执行? - 结论: 先发短信,再打电话,因为两个方法用的是同一个锁
1 | /** |
5.2 一对象,两同步,延迟4s
同一个对象,两个同步方法
- 问题: 如果
sendSms
方法延迟 4 秒,哪个先执行? - 结论: 还是先发短信,再打电话,因为两个方法用的是同一个锁
1 | class Phone { |
5.3 一对象,一同步,一普通
同一个对象,一个同步方法,一个普通方法
问题: 如果增加一个普通方法
hello
,是先执行发短信还是 hello?结论: 先执行
hello
,再执行发短信,因为普通方法没有锁,不受同步方法的影响
1 | class Phone2 { |
5.4 两对象,两同步
两个对象,两个同步方法
- 问题: 两个不同的对象,分别调用同步方法
call
和sendSms
,哪个先执行? - 结论: 先打电话,再发短信,因为两个不同的对象对应不同的锁
1 | class Phone2 { |
5.5 一对象,两静态同步
一个对象,两个静态同步方法
- 问题: 如果修改为两个静态同步方法,哪个先执行?
- 结论: 先发短信,再打电话,因为两个方法用的是同一个锁,即
Class
对象
1 | class Phone3 { |
5.6 两对象,两静态同步
两个对象,两个静态同步方法
- 问题: 两个不同的对象,分别调用静态同步方法
sendSms
和call
,哪个先执行? - 结论:先发短信,再打电话,因为静态方法锁的是
Class
对象,两个对象共享一个Class
锁
1 | class Phone3 { |
5.7 一对象,一同步,一静态同步
一个对象,一个静态同步方法,一个普通同步方法
- 问题: 同一个对象上调用静态同步方法
sendSms
和普通同步方法call
,哪个先执行? - 结论: 先打电话,再发短信,因为静态同步方法和普通同步方法锁定的是不同的对象,一个是
Class
对象,一个是实例对象
1 | class Phone4 { |
5.8 两对象,一同步,一静态同步
两个对象,一个静态同步方法,一个普通的同步方法
- 问题: 两个不同的对象,分别调用静态同步方法
sendSms
和普通同步方法call
,哪个先执行? - 结论: 还是先打电话,再发短信,因为静态同步方法和普通同步方法锁定的是不同的对象,一个是
Class
对象,一个是实例对象
1 | class Phone4 { |
6. 线程不安全的集合类
6.1 List
- 并发修改异常:
ArrayList
在多线程并发修改下不安全,可能会导致ConcurrentModificationException
- 解决方案:
- 使用
Vector
:Vector
是早期集合框架的一部分,实现了线程安全- 内部方法大多通过
synchronized
关键字同步,但性能较低
- 使用
Collections.synchronizedList
:- 提供线程安全的
List
封装 - 通过内部的同步代码块实现安全性
- 提供线程安全的
- 使用
CopyOnWriteArrayList
:写入时复制! COW 计算机程序设计领域的一种优化策略- 高效的线程安全
List
实现 - 读写分离,写操作时复制整个底层数组,写入效率较低但读取效率高
- 写操作:
- 每次写操作都会复制整个数组,保证写入操作的原子性
- 使用
ReentrantLock
锁实现线程安全
- 读操作:
- 读取操作不加锁,使用数组副本提供一致性读
- 写操作:
- 高效的线程安全
- 使用
1 | import java.util.*; |
性能对比:
Vector
性能较低的原因:- 每个方法都使用
synchronized
关键字锁定整个对象,导致锁争用严重,特别是在多线程环境下。 - 无论读写操作,均需获取锁,降低了并发性能。
- 每个方法都使用
Collections.synchronizedList
:- 使用独立的锁对象,避免锁定整个
List
对象。 - 提供线程安全的
List
封装,通过内部的同步代码块实现安全性。
- 使用独立的锁对象,避免锁定整个
Vector
与Collections.synchronizedList
在性能上差异较小,因为都使用synchronized
关键字同步CopyOnWriteArrayList
性能较高的原因:- 读写分离,读操作无需加锁,写操作时使用
ReentrantLock
锁实现线程安全。 - 适用于读多写少的场景,提高了读取性能。
- 读写分离,读操作无需加锁,写操作时使用
6.2 Set
- HashSet 的底层就是 HashMap,都是线程不安全的,也就是在在多线程环境下可能会出现
ConcurrentModificationException
或数据丢失等问题 - 解决方案:
Collections.synchronizedSet
:- 使用
Collections
工具类提供线程安全封装 - 通过内部的同步代码块保证线程安全
- 使用
CopyOnWriteArraySet
:- 基于
CopyOnWriteArrayList
实现的线程安全Set
- 读写分离,读操作无需加锁,写操作复制底层数组
- 适用于读多写少的场景
- 基于
1 | import java.util.concurrent.CopyOnWriteArraySet; |
6.3 Map
HashMap
是线程不安全的- 解决方案:
Hashtable
:- 早期集合框架中的线程安全
Map
实现 - 内部通过
synchronized
关键字同步 - 性能较低,因为所有方法都需要获取锁
- 早期集合框架中的线程安全
Collections.synchronizedMap
:- 使用
Collections
工具类提供线程安全封装 - 内部通过同步代码块实现线程安全
- 使用
ConcurrentHashMap
:- JUC 包中提供的高效线程安全
Map
实现 - 使用分段锁定技术,允许更高的并发度
- JUC 包中提供的高效线程安全
1 | public class MapTest { |
6.4 总结
- 线程安全集合的选择:
List
:Collections.synchronizedList
,CopyOnWriteArrayList
Set
:Collections.synchronizedSet
,CopyOnWriteArraySet
Map
:Collections.synchronizedMap
,ConcurrentHashMap
- 线程安全集合框架对比:
集合类型 | 线程安全性 | 锁机制 | 读操作性能 | 写操作性能 |
---|---|---|---|---|
ArrayList |
线程不安全 | 无 | 高 | 高 |
Vector |
线程安全 | synchronized 同步方法 |
低 | 低 |
Collections.synchronizedList |
线程安全 | synchronized 同步代码块 |
低 | 低 |
CopyOnWriteArrayList |
线程安全 | ReentrantLock |
高 | 中(复制成本) |
HashSet |
线程不安全 | 无 | 高 | 高 |
Collections.synchronizedSet |
线程安全 | synchronized |
低 | 低 |
CopyOnWriteArraySet |
线程安全 | ReentrantLock |
高 | 中(复制成本) |
HashMap |
线程不安全 | 无 | 高 | 高 |
Hashtable |
线程安全 | synchronized |
低 | 低 |
Collections.synchronizedMap |
线程安全 | synchronized |
低 | 低 |
ConcurrentHashMap |
线程安全 | 分段锁定(ReentrantLock ) |
高 | 高 |
7. Callable 接口
类似 Runnable 接口,但 Callable 接口:
可以有返回值,可以抛出异常
重写的方法不同,Runnable 是 run(),而 Callable 是 call()
如何启动 Callable?
FutureTask
接受一个Callable
实现,作为适配器,使其可以作为Runnable
传递给Thread
构造器- 原理:
FutureTask
实现了Runnable
接口的子接口RunnableFuture
,FutureTask
有一个带Callable
类型参数的构造器,所以FutureTask
可以作为Runnable
接口的实现类传到Thread
的构造器中
结果缓存机制:
FutureTask
通过内部状态state
管理任务的执行和结果缓存,共有 5 种状态:
1
2
3
4
5
6
7
8
9
10
11
12
13
14private volatile int state;
// NEW: 初始状态,任务未启动
private static final int NEW = 0;
// COMPLETING: 任务正在完成中
private static final int COMPLETING = 1;
// NORMAL: 任务成功完成
private static final int NORMAL = 2;
// EXCEPTIONAL: 任务抛出了异常
private static final int EXCEPTIONAL = 3;
// CANCELLED: 任务被取消
private static final int CANCELLED = 4;
// INTERRUPTING/INTERRUPTED: 任务正在被中断
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;- 源码分析:
FutureTask
的run()
方法负责执行Callable
,并在执行后保存结果- 只有当
state
为NEW
时才会执行任务,否则直接返回 - 执行过程中会将
state
更新为COMPLETING
,然后再更新为NORMAL
或EXCEPTIONAL
- 当一个
Callable
被FutureTask
包装后,任务执行后会缓存其结果,避免重复执行
FutureTask
线程安全机制:- CAS: 使用
Unsafe
类的 CAS 操作保证状态变更的原子性 - 锁: 使用
ReentrantLock
锁控制任务执行和结果获取的同步
- CAS: 使用
阻塞调用:
get()
方法会阻塞等待任务完成,如果任务已经完成,则直接返回结果(缓存机制)- 可以使用异步方式获取结果,例如结合
ExecutorService
代码示例:为什么结果只打印出了一个 call() ?
- 线程 A:
- 首次启动
Thread
,执行futureTask.run()
state
为NEW
,成功进入执行逻辑- 执行
call()
方法,打印 “call()” 并返回结果 - 更新
state
为NORMAL
,缓存结果
- 首次启动
- 线程 B:
- 启动
Thread
,再次执行futureTask.run()
state
已经是NORMAL
(或其他非NEW
状态),直接返回- 任务没有被再次执行
- 启动
- 线程 A:
1 | public class CallableTest { |
- 如果希望两个线程都能执行
call()
,需要为每个线程创建独立的FutureTask
实例:
1 | FutureTask<String> futureTask1 = new FutureTask<>(thread); |
8. 常用的辅助类
8.1 CountDownLatch
组团插队
- 作用:允许一组线程等待另一组线程完成特定任务之后再继续执行,可以用于线程之间的协调和同步
- 可以看作一个减法计数器,当计数器归零时,所有等待的线程被唤醒
- 主要方法
countDown()
:- 将计数器减 1
- 当计数器变为 0 时,所有阻塞在
await()
方法的线程将被唤醒
await()
:使当前线程阻塞,直到计数器归零就唤醒,再继续向下运行
- 使用限制:
- 一次性工具: 不能复用
- 计数器不可重置: 每次需要新的
CountDownLatch
实例
- 代码示例:
1 | import java.util.concurrent.CountDownLatch; |
- 工作原理:
- 计数器初始化: 在创建
CountDownLatch
对象时,指定计数器的初始值(如new CountDownLatch(6)
) - 任务线程执行: 每个线程执行任务后调用
countDown()
,使计数器减 1 - 等待线程阻塞: 调用
await()
的线程进入阻塞状态,等待计数器归零 - 计数器归零: 当所有任务线程执行完毕(即计数器减为 0),所有等待线程被唤醒,继续执行后续代码
- 计数器初始化: 在创建
8.2 CyclicBarrier
允许一组线程彼此等待,直到所有线程都到达某个公共屏障点。与
CountDownLatch
不同的是,它是一种可重用的屏障机制主要特性:
- 加法计数器: 指定线程数的栅栏点,只有所有线程都达到该点后才会继续执行
- 可重用: 计数器归零后可以重新使用,不同于
CountDownLatch
的一次性计数器 - 公共屏障点: 可以在所有线程到达屏障点后执行特定任务
主要方法:
await()
:- 使当前线程等待,直到所有线程都调用
await()
并达到屏障点 - 当所有线程都调用该方法时,将执行可选的
barrierAction
- 使当前线程等待,直到所有线程都调用
getNumberWaiting()
:- 返回当前等待屏障点的线程数
isBroken()
:- 返回屏障是否被打破
- 若某个线程在等待时被中断或超时,屏障被打破,所有等待线程抛出
BrokenBarrierException
应用场景:
- 并行任务协调: 等待一组线程都到达某个屏障点再继续执行
- 分阶段执行: 分批次执行多线程任务
源码分析:
- 构造方法:初始化计数器和可选的屏障动作
1
2
3
4
5
6public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}await()
方法:- 调用
dowait()
方法等待 - 计数器减 1,当减到 0 时,执行屏障动作并重置计数器
- 调用
1
2
3
4
5
6
7
8
9private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException, TimeoutException {
// ...
int index = --count;
if (index == 0) {
// ...
}
// ...
}代码示例:
1 | public class CyclicBarrierDemo { |
- 工作原理:
- 计数器初始化: 创建
CyclicBarrier
时,指定需要互相等待的线程数(parties
) - 线程等待: 每个线程调用
await()
方法,计数器减 1,并阻塞等待 - 屏障点到达: 当计数器减到 0 时:
- 执行可选的
barrierAction
。 - 唤醒所有等待线程。
- 重置计数器,以便屏障重新使用。
- 执行可选的
- 屏障被打破: 当等待的线程被中断或超时:
- 抛出
BrokenBarrierException
。 - 屏障被打破,所有等待线程不再阻塞。
- 抛出
- 计数器初始化: 创建
8.3 Semaphore
- 一种信号量实现,用于控制对共享资源的访问
- 信号量维护了一个计数器,计数器的值表示当前可用的资源数
- 线程可以通过信号量获取或释放资源,从而控制同时访问共享资源的线程数量
- 主要方法:
acquire()
:- 获取一个许可,如果没有可用许可,则阻塞等待
- 可重载为
acquire(int permits)
来获取多个许可
release()
:- 释放一个许可,增加可用许可的数量
- 可重载为
release(int permits)
来释放多个许可
- 构造方法:
Semaphore(int permits)
:构造一个具有指定许可数量的信号量Semaphore(int permits, boolean fair)
:构造一个具有指定许可数量的信号量,并指定公平性fair
为true
时,实现公平性,按照线程的等待顺序分配许可
- 应用场景:
- 限流控制: 控制同时访问的线程数量,类似于限制并发访问的连接池(如:限制 Web 服务的最大并发请求数)
- 资源互斥: 访问资源时确保互斥性,类似于互斥锁的实现
- 多线程协作: 控制任务的执行顺序,实现多个线程间的协调
- 代码示例:
1 | public class SemaphoreDemo { |
- 工作原理:
- 信号量初始化:
- 创建
Semaphore
对象时,指定初始的许可数量(permits
) - 内部通过 AQS(
AbstractQueuedSynchronizer
)的状态来维护许可数量
- 创建
- 获取资源:
acquire()
- 如果可用资源大于 0,则直接减 1 并返回
- 否则,当前线程进入等待队列,阻塞等待资源释放
- 释放资源:
release()
- 增加可用资源数量,唤醒等待队列中的第一个线程
- 信号量初始化:
9. 读写锁 ReadWriteLock
9.1 概述
ReadWriteLock
是 JUC 中的一种高级锁,实现了读写锁机制ReentrantReadWriteLock
是ReadWriteLock
接口的实现类,包含内部类ReadLock
和WriteLock
ReadLock
和WriteLock
实现了Lock
接口
1
2
3
4
5
6
7
8
9
10
11public class ReentrantReadWriteLock implements ReadWriteLock, java.io.Serializable {
// ...
public static class ReadLock implements Lock, java.io.Serializable { // 读锁
// ...
}
public static class WriteLock implements Lock, java.io.Serializable { // 写锁
// ...
}
// ...
}
9.2 读写锁特点
读锁与写锁的互斥关系:
- 共享锁(读锁): 多个线程可以同时持有读锁
- 独占锁(写锁): 只能有一个线程持有写锁
读写锁特点:
- 读-读共存: 允许多个线程同时读取数据
- 读-写互斥: 读操作和写操作不能同时进行(保证读操作读取到一致的已提交数据)
- 写-写互斥: 只允许一个线程执行写操作
读写锁的作用:
提高多线程环境下读操作的并发性,适用于读多写少的场景
提供锁降级机制,实现数据一致性和性能的平衡
与其他锁的对比:
synchronized
: 独占锁,每次只能一个线程访问ReentrantLock
: 独占锁,具备可重入性和公平性ReentrantReadWriteLock
: 读写锁,读读共享、读写互斥
应用场景:适用于读多写少的场景,提高读操作并发性
- 如:缓存系统、配置中心
代码示例:使用读写锁的缓存实现
1 | class MyCacheLock { |
9.3 锁降级
- 读写锁的锁降级:指将持有的写锁降级为读锁,在持有写锁的情况下获取读锁,然后释放写锁的过程
- 作用:在一边读一边写的情况下提高性能
- 确保数据的一致性和可见性
- 锁降级和不降级的区别:
- 降级步骤:获取写锁 => 获取读锁 => 释放写锁 => 持有读锁,确保数据一致性
- 保证数据的可见性,在释放写锁后继续保持读锁,确保在写锁被释放后,其他线程不能立即获取写锁修改数据(写了之后读完了别人才能写)
- 降低锁的竞争,提高读操作的并发性
- 不降级步骤:获取写锁 => 释放写锁 => 获取读锁 => 释放读锁
- 直接释放写锁并获取读锁,可能存在数据被其他写操作修改的风险
- 在高并发场景中,可能会出现读到不一致的数据,降低数据的一致性
- 降级步骤:获取写锁 => 获取读锁 => 释放写锁 => 持有读锁,确保数据一致性
- 作用:在一边读一边写的情况下提高性能
1 | class MyCacheWithLockDowngrade { |
10. 阻塞队列
10.1 基本介绍
- 阻塞队列是 Java 并发包中的一种数据结构,提供了一种线程安全的队列操作方式,具有阻塞特性,即在队列满或空时,添加或移除元素的操作会被阻塞,直到队列发生变化
- 继承关系:
BlockingQueue
接口是Queue
接口的子接口,位于java.util.concurrent
包中- 阻塞队列的实现类有
ArrayBlockingQueue
、LinkedBlockingQueue
、SynchronousQueue
等
- 应用场景:
- 多线程并发处理场景:用于生产者-消费者模式等
- 线程池:用于任务调度和任务队列管理
10.2 四组 API
- 阻塞队列提供了四组 API,分别适用于不同的场景,具体如下:
- 抛出异常: 在操作失败时,抛出异常
- 不抛出异常,有返回值: 在操作失败时,返回特定值或标识
- 阻塞等待: 在操作失败时,线程会阻塞等待
- 超时等待: 在操作失败时,线程会阻塞等待一段时间后返回
方式 | 抛出异常 | 有返回值,不抛出异常 | 阻塞等待 | 超时等待 |
---|---|---|---|---|
添加 | add(E e) | offer(E e) | put() | offer(E e, long timeout, TimeUnit unit) |
移除 | remove() | poll() | take() | poll(long timeout, TimeUnit unit) |
获取队首元素 | element() | peek() | - | - |
代码测试:
- 抛出异常:
add()
方法底层调用的offer()
方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20public static void main(String[] args) {
BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);
// 获取队首元素 队首不存抛异常: NoSuchElementException
// System.out.println(blockingQueue.element());
System.out.println(blockingQueue.add("a")); // true
System.out.println(blockingQueue.add("b")); // true
System.out.println(blockingQueue.add("c")); // true
System.out.println(blockingQueue.element()); // a 获取队首元素
// 抛出异常: IllegalStateException: Queue full
// System.out.println(blockingQueue.add("d"));
System.out.println(blockingQueue.remove()); // a
System.out.println(blockingQueue.remove()); // b
System.out.println(blockingQueue.remove()); // c
// 抛出异常: NoSuchElementException
// System.out.println(blockingQueue.remove());
}- 不抛出异常:而是返回特殊值(通常是
false
或null
)offer()
方法的源码第一句就是判断是否为 null,也就是不能添加 null 值;而add()
方法底层也是直接调用的offer()
方法,所以也不可以添加 null 值
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18public static void main(String[] args) {
BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);
// 获取队首元素 队首不存在不抛异常
System.out.println(blockingQueue.peek()); // null
System.out.println(blockingQueue.offer("a")); // true
System.out.println(blockingQueue.offer("b")); // true
System.out.println(blockingQueue.offer("c")); // true
// 不抛出异常
System.out.println(blockingQueue.offer("d")); // false
System.out.println(blockingQueue.peek()); // a
System.out.println(blockingQueue.poll()); // a
System.out.println(blockingQueue.poll()); // b
System.out.println(blockingQueue.poll()); // c
// 不抛出异常
System.out.println(blockingQueue.poll()); // null
}- 阻塞等待:线程在操作不能立即执行时阻塞等待,直到操作可以执行
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17public static void main(String[] args) throws InterruptedException {
BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);
blockingQueue.put("a");
blockingQueue.put("b");
blockingQueue.put("c");
// 此时队列已满, 控制台窗口会一直阻塞等待
// blockingQueue.put("d");
System.out.println(blockingQueue.take()); // a
System.out.println(blockingQueue.take()); // b
System.out.println(blockingQueue.take()); // c
// 此时队列已空, 控制台窗口会一直阻塞等待
// System.out.println(blockingQueue.take());
}- 超时等待:线程在操作不能立即执行时阻塞等待一定的时间,如果在指定的时间内操作不能执行,则返回特殊值
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19public static void main(String[] args) throws InterruptedException {
BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);
blockingQueue.offer("a");
blockingQueue.offer("b");
blockingQueue.offer("c");
// 尝试在2秒内添加元素,如果不能添加则放弃
boolean status = blockingQueue.offer("d", 2, TimeUnit.SECONDS);
System.out.println(status); // false
System.out.println(blockingQueue.poll()); // a
System.out.println(blockingQueue.poll()); // b
System.out.println(blockingQueue.poll()); // c
// 尝试在2秒内取出元素,如果队列空则放弃
String element = blockingQueue.poll(2, TimeUnit.SECONDS);
System.out.println(element); // null
}- 抛出异常:
10.3 SynchronousQueue 同步队列
SynchronousQueue
是阻塞队列的特殊实现(一种无缓冲的等待队列),没有容量,也可以视为容量为1的队列- 相对于有缓冲的 BlockingQueue 来说,少了一个中间经销商的环节(缓冲区)
特点:添加一个元素必须等待另一个线程取走,否则一直阻塞
方法:
put(E e)
:将元素放入队列中。如果没有其他线程正在尝试取走元素,此方法会阻塞take()
:取走队列中的元素。如果没有元素可取,此方法会阻塞,使用Lock 锁保证线程安全
公平性选择:构造
SynchronousQueue
时可以选择公平性。如果设置为公平模式,则线程按照等待时间的长短获得访问权;非公平模式则随机分配
1 | // 如果输出不一致可能是因为 System.out.println 导致的竞态条件或线程调度问题 |
11. 线程池(重点)
11.1 概述
线程池是一种基于池化技术的资源管理工具,用于有效管理线程资源
线程池的好处:线程可以复用,可以控制最大并发量,管理线程
- 降低资源消耗:通过重复利用已创建的线程降低线程创建和销毁的开销
- 提高响应速度:减少了线程创建的时间,改善了程序的响应速度
- 方便线程管理:线程池可以统一分配、调优和监控线程
Java线程池关键组件
- Executor 框架:Java 提供的线程池实现框架,包括以下几个关键类:
Executor
:负责线程使用和调度的根接口。Executors
:工厂类,用于创建不同类型的线程池。ExecutorService
:继承自Executor接口,定义了线程池的生命周期管理方法,如启动、关闭、提交任务等。AbstractExecutorService
:ExecutorService 接口的抽象实现类,提供了 ExecutorService 的基本实现。ThreadPoolExecutor
:AbstractExecutorService 的具体实现类,提供了创建线程池的完整功能。
- Executor 框架:Java 提供的线程池实现框架,包括以下几个关键类:
- 线程池:三大方法,7大参数,4种拒绝策略
11.2 三大方法
线程池的三种常用创建方式(通过Executors)
- SingleThreadExecutor:单个后台线程的线程池
1
ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
- FixedThreadPool:固定大小的线程池
1
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(5);
- CachedThreadPool:大小不固定的线程池,根据需求自动更改数量
1
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
代码示例:
1 | // Executors 三大方法 |
- 注意:阿里开发规范文档指出 => 线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式,这样的处理方式让编写人员更加明确线程池的运行规则,同时规避资源耗尽的风险,Executors 各个方法的弊端:
- FixedThreadPool 和 SingleThreadExecutor:允许的请求队列长度为
Integer.MAX_VALUE
,可能会堆积大量的请求,从而导致 OOM - CachedThreadPool 和 ScheduledThreadPool:允许创建的线程数量为
Integer.MAX_VALUE
(约为21亿),可能会创建大量的线程,从而导致 OOM
- FixedThreadPool 和 SingleThreadExecutor:允许的请求队列长度为
11.3 七大参数-自定义线程池
- Executors 创建方式底层源码分析:
1 | public static ExecutorService newSingleThreadExecutor() { |
- 本质:三种方法都是调用的 ThreadPoolExecutor
1 | public class ThreadPoolExecutorAnalysis { |
- 推荐使用底层线程池手动创建方式:使用
ThreadPoolExecutor
直接构造,这种方式允许自定义参数,更灵活,可以明确线程池的运行规则,避免资源耗尽的风险,七大参数:- corePoolSize:核心线程数,即不被回收的线程数量,除非设置了
allowCoreThreadTimeOut
。 - maximumPoolSize:最大线程数,能容纳的最大线程数量。
- keepAliveTime:线程空闲后的存活时长。
- unit:时间单位,与
keepAliveTime
配合使用。 - workQueue:任务队列,被提交但未执行的任务。
- threadFactory:线程工厂,用于创建线程。
- handler:拒绝策略,当任务太多来不及处理时,如何拒绝任务。
- corePoolSize:核心线程数,即不被回收的线程数量,除非设置了
1 | public class CustomThreadPoolExample { |
底层工作原理:
- 假设来了9个线程,在执行execute()方法才创建线程
- 第1-2个线程进入线程池创建
- 第3-5个线程进入阻塞队列
- 第6-8个线程会为他们创建新线程执行(直接运行线程6而非线程3)
- 第9个线程会被拒绝
总结:先到常驻线程,满了之后再到阻塞队列进行等待,阻塞队列满了之后,在往外扩容线程,扩容线程不能大于最大线程数。大于最大线程数和阻塞队列之和后,会执行拒绝策略
11.4 四种策略
new ThreadPoolExecutor.AbortPolicy()
:默认策略,抛出异常阻止系统正常运行new ThreadPoolExecutor.CallerRunsPolicy()
:调用者运行,将任务回退到调用者,降低新任务流量new ThreadPoolExecutor.DiscardPolicy()
:丢弃任务,不抛出异常new ThreadPoolExecutor.DiscardOldestPolicy()
:抛弃队列中等待最久的任务,尝试再次提交当前任务
11.5 如何设置线程池的最大大小
在设置线程池的最大大小时,针对CPU密集型和IO密集型任务有不同考虑
对于不同类型的任务,应该根据任务的特点和系统资源情况来灵活设置线程池的最大大小,以提高系统的效率和性能
- CPU密集型任务:
- 设置线程池的最大大小为处理器核心数,这样可以最大程度地利用CPU资源,避免线程过多导致线程切换频繁而降低效率
- 可以通过代码获取处理器核心数,然后将最大线程数设置为相应的核心数
1 | int coreCount = Runtime.getRuntime().availableProcessors(); |
- IO密集型任务:
- 根据程序中IO操作的情况来确定最大线程池大小
- 通常情况下,可以将最大线程数设置为大约是最大I/O数的一倍到两倍之间,以保证足够的线程处理IO任务
12. 四大函数式接口(必须掌握!)
Java 程序员必须掌握:
泛型、枚举、反射
lambda表达式、链式编程、函数式接口、Stream流式计算
12.1 概述
- 函数式接口:仅定义一个抽象方法的接口
- 函数式接口通常标注了
@FunctionalInterface
注解,这不是必需的,但有助于编译器检查接口是否符合函数式接口的定义
1 |
|
Java 8引入的四大函数式接口分别是
Consumer
、Function
、Predicate
、和Supplier
,这些接口主要用于常见的操作:消费、转换、断言和提供应用场景:
- Function:适用于转换数据,如从一种类型映射到另一种类型
- Predicate:常用于筛选数据,如在集合操作中进行条件过滤
- Consumer:常用于处理从数据源消费数据,如打印、存储操作
- Supplier:适用于需要多次返回结果的场景,如工厂方法、构造器引用等
在特定的应用场景下,还可以定义自己的函数式接口来更好地满足需求
1 |
|
12.2 Function 接口
- 函数型接口:接受一个输入参数,返回一个结果
- 源码:
1 |
|
- 代码测试:
1 | /** |
12.3 Predicate 接口
- 断定型接口:接受一个输入参数,返回一个布尔值
- 源码:
1 |
|
- 代码测试:
1 | /** |
12.4 Consumer 接口
- 消费型接口:接受单一输入参数,不返回结果
- 源码:
1 |
|
- 代码测试:
1 | /** |
12.5 Supplier 接口
- 供给型接口:无参数,返回一个结果
- 源码:
1 |
|
- 代码测试:
1 | /** |
13. Stream 流式计算
Stream 流式计算是Java 8中引入的一项强大的新特性,它允许以声明性方式处理数据集合
- 通过 Stream API 对数据进行高效的查询、过滤、转换、聚合等操作,无需编写冗长的代码
- 对集合对象功能的增强,专注于对集合对象进行各种便捷和高效的聚合操作或大批量数据操作,借鉴了函数式编程语言的许多概念,利用更丰富的语法对集合数据进行查询和处理
特点:
- 不是数据结构:Stream不存储数据,它只是在源数据(如集合、数组)的基础上提供了一种对数据的高效处理方式
- 只能遍历一次:和迭代器类似,流一旦遍历过一次,便不能重复使用或“倒带”
- 延迟执行:很多Stream操作都是延迟执行的,只有在需要结果的时候才执行(只有在终端操作执行时,所有中间操作才会被实际执行)
- 可以避免对数据的不必要处理,特别是在链式调用中,可以合并多个操作,减少遍历次数
- 支持并行处理:Stream有串行和并行两种模式,通过并行模式可以利用多核处理器的优势,提高执行效率
1
2
3
4
5
6
7List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
// 并行处理一组数字,筛选出偶数并计算它们的和
int sum = numbers.parallelStream()
.filter(n -> n % 2 == 0)
.mapToInt(Integer::intValue)
.sum();
System.out.println("Sum of even numbers: " + sum);核心操作:Stream 操作可以分为中间操作和终端操作两种:
- 中间操作:中间操作都会返回一个新的 Stream。常见的中间操作有
filter
(过滤)、map
(映射)、sorted
(排序)等 - 终端操作:终端操作会从 Stream 产生结果,之后不能再使用Stream。常见的终端操作包括
forEach
( 遍历流中的每个元素,执行给定的操作)、collect
(将Stream转换成不同类型的结果)、reduce
(通过某个连接操作将所有元素汇聚成一个汇总结果)、findAny
(返回流中的任意元素)等
- 中间操作:中间操作都会返回一个新的 Stream。常见的中间操作有
代码示例:
1 | public class Test { |
应用场景:
数据筛选和转换:如从数据库查询到的记录进行预处理,例如过滤、转换等
聚合统计:如统计某个字段的平均值、最大值、最小值等
并行运算:利用Stream的并行流大幅提高数据处理速度
14. Fork/Join 分支合并
14.1 概述
Fork/Join 框架是自 Java 7 引入的一个用于并行执行任务的工具,尤其适合处理那些可以递归方式拆分成更小任务的大问题
- 基于“分治法”的设计思想,旨在充分利用多核处理器的计算能力来提高应用性能
特点:工作窃取(Work Stealing)
- Fork/Join 框架采用工作窃取算法来平衡工作负载。每个线程都维护一个双端队列,忙碌的线程可以将部分任务(从队列尾部拿取)转移给空闲的线程(从队列头部拿取)
实现原理:双端队列
- 每个工作线程都有自己的双端队列,用来存放分配给自己的任务
- 线程主要从自己的队列中取任务执行,当自己的队列空时,可以从其他线程的队列尾部“窃取”任务
核心:Fork/Join 框架的核心在于两个操作:
fork()
和join()
- Fork:将大任务拆分成若干子任务,子任务可以并行执行
- Join:等待子任务完成,并将所有子任务的结果合并成总结果
使用场景:
- 大数据处理
- 图像处理
- 大规模数值处理
使用方法:
- 创建 ForkJoinPool:所有 ForkJoin 任务都需要通过 ForkJoinPool 来执行,ForkJoinPool 是任务管理和执行的核心
- 定义任务:创建继承自
ForkJoinTask
(通常是它的子类RecursiveAction
或RecursiveTask
)的类RecursiveTask
(有返回值)RecursiveAction
(无返回值)
- 启动任务:通过
ForkJoinPool
的invoke()
或submit()
方法启动任务
14.2 代码示例
ForkJoinDemo 类:继承自
RecursiveTask<Long>
,是一个可以返回结果的任务**临界值 (
temp
)**:用于控制任务分解的粒度。如果任务的大小小于此值,则不再继续分解任务,而是直接进行计算。**fork()**:将子任务推送到ForkJoinPool的工作队列。
**join()**:等待子任务完成,并获取其结果。
1 | public class ForkJoinDemo extends RecursiveTask<Long> { |
- 测试类:使用三种不同的方法来计算从 1 到 20亿的整数和
- **test1()**:使用单线程迭代的方法计算总和
- **test2()**:使用 ForkJoin 框架执行同样的计算。通过
ForkJoinPool.invoke()
方法启动 ForkJoin 任务 - **test3()**:使用 Java 8 Stream API 的并行流进行计算。Stream API 的并行流内部使用的也是 ForkJoinPool
.parallel().reduce(0, Long::sum)
使用并行流来执行归约操作,这里用于计算一个长整型数列的总和- **.parallel()**:用于将流转换为并行流,并行流利用Java的Fork/Join框架,允许在多核处理器上并行处理任务,从而加快执行速度
- **.reduce(0, Long::sum)**:通过指定的函数来合并流中的元素,这里使用的是两参数版本的
reduce
方法- 第一个参数
0
是流为空时的默认结果 - 第二个参数
Long::sum
是一个方法引用,指向一个接受两个参数并返回它们的和的方法。它定义了如何合并流中的元素
- 第一个参数
- reduce 方法的优点:灵活、不需要存储中间状态,并行性能高、不会修改原始数据源
1 | public class Test { |
- Stream 并行流提供了最好的性能和最简洁的代码。
- Fork/Join 框架提供了较好的性能提升,适用于更复杂的任务分解和自定义并行处理逻辑。
- 普通迭代方法虽然实现最简单,但在处理大数据量时性能最低。
对于大多数大规模计算任务,推荐使用 Stream 并行流或 Fork/Join 框架来利用现代多核 CPU 的计算能力
15. 异步回调
15.1 同步和异步
- 同步操作:程序按顺序执行,必须等待当前操作完全完成后才能继续到下一步
- 例如,从数据库读取数据或从网络加载资源,线程会被阻塞,直到操作完成
- 异步操作:程序可以在等待操作完成的同时继续执行其他任务。
- 异步操作常见于不希望阻塞主线程的场景,例如GUI应用程序、大规模计算和高性能Web服务器
15.2 CompletableFuture
CompletableFuture
是在Java 8中引入的,用于增强现有的Future
接口,主要用于异步编程- 异步通常意味着非阻塞,可以使得我们的任务单独运行在与主线程分离的其他线程中,并且可以通过回调在主线程中获取异步任务的执行状态、完成情况以及异常信息等
- CompletableFuture 实现了 Future,CompletionStage 接口,使得它既兼容现有的线程池框架,又提供了异步编程的接口抽象
- 常用方法:
- runAsync:执行没有返回值的异步任务
- supplyAsync:执行有返回值的异步任务
- 特点:
- 支持手动完成,可以显式地设置其结果
- 提供了异常处理机制
- 支持回调函数,当
Future
完成时可以自动触发 - 支持链式调用,允许将多个异步操作的结果串联起来
- 支持合并多个
CompletableFuture
,可以等待多个CompletableFuture
完成后再继续执行
- 代码示例:
whenComplete
:此方法接收两个参数:t
和u
- t:代表正常返回的结果
- u:代表抛出异常的错误信息
- 异常处理:
exceptionally
:当异步操作发生异常时,可以通过该方法来定义一个回调函数,用于处理异常,该方法接收一个函数,该函数的输入是引发问题的异常对象,并返回一个替代值来“修复”异常情况,继续后续的处理流程get
方法在调用时会阻塞直到异步操作完成。如果操作成功完成,则返回正常的结果;如果操作中发生异常且没有被exceptionally
方法处理,则会抛出一个ExecutionException
。如果使用exceptionally
处理了异常,则get
将返回exceptionally
中定义的替代值
1 | public static void main(String[] args) throws ExecutionException, InterruptedException { |
15.3 Future VS. CompletableFuture
Futrue 在 Java 里面,通常用来表示一个异步任务的引用,比如我们将任务提交到线程池里面,然后我们会得到一个 Futrue,在 Future 里面有 isDone 方法来 判断任务是否处理结束,还有 get 方法可以一直阻塞直到任务结束然后获取结果
- 但整体来说这种方式,还是同步的,因为需要客户端不断阻塞等待或者不断轮询才能知道任务是否完成
对比Future和CompletableFuture,CompletableFuture优势:
支持非阻塞回调:
CompletableFuture
支持在任务完成时,自动执行某些函数(回调),而Future
则需要调用者不断检查更丰富的操作:
CompletableFuture
支持更丰富的方法和流式操作,如thenApply
,thenAccept
,thenCompose
, 和combine
等,这些都是Future
所不具备的异常处理:
CompletableFuture
提供了异常处理的直接支持,而Future
则没有
16. JMM
JMM(Java Memory Model):JAVA 内存模型,是一个抽象的概念或约定,用于定义Java程序中变量的访问规则以及在多线程环境下如何进行线程间的通信
- 它描述了程序中各种变量(包括类实例字段、静态字段和构造数组对象)的访问方式
JMM 的同步约定:
- 线程解锁前必须立即将共享变量刷新回主存:确保释放锁时,对变量的修改能够被接下来获取该锁的其他线程看到
- 线程加锁前必须读取主存中的最新值到工作内存中:确保获取锁后,工作内存中的变量是最新的
- 加锁和解锁必须是同一把锁:保证锁的获取和释放的一致性
线程内存交互操作:Java 内存模型定义了以下8种操作来控制线程对内存的交互
- **Read(读取)**:从主内存中读取变量到线程的工作内存
- **Load(载入)**:在工作内存中对变量赋值(跟随Read操作)
- **Use(使用)**:使用工作内存中的变量
- **Assign(赋值)**:向工作内存中的变量赋新值
- **Store(存储)**:将工作内存中的变量的值写回主内存(准备写入操作)
- **Write(写入)**:将Store的值真正写入主内存
- **Lock(锁定)**:标记变量在主内存中开始处于锁定状态
- **Unlock(解锁)**:标记变量在主内存中结束锁定状态
JMM 的操作规定:
- Read和Load、Store和Write操作必须成对出现:保证内存值的正确传递
- 线程对变量的修改必须同步回主内存:保证其他线程能看到最新值
- 初始化变量必须在主内存中进行:避免使用未初始化的数据
- 变量锁定和解锁必须一致:确保每次只有一个线程可以修改变量
代码示例问题分析:
num
变量的更新可能对另一个线程不可见,因为没有适当的同步措施。这可能导致程序无法如预期终止,因为线程1可能永远看不到num
的更新
1 | private static int num = 0; |
为解决这一问题,可以通过添加volatile
关键字来声明num
变量,这将保证每次读写都直接从主内存完成,从而保证了可见性
17. Volatile
17.1 概述
- Volatile 是 Java 虚拟机提供的一种轻量级的同步机制
- 特点:
- 保证可见性:确保一个线程修改了该变量的值后,其他线程能够立即得知这个修改
- 不保证原子性:虽然 volatile 变量的读写是原子的,但复合操作(如递增)不是原子的
- 禁止指令重排:确保编译器在编译过程中不会对涉及 volatile变量的指令进行重排序,这是通过插入内存屏障来实现的
17.2 保证可见性
- 代码示例:使用 volatile 防止死循环
- 不使用
volatile
修饰number
,其他线程可能无法看到number
的改变,导致无限循环 - 使用
volatile
后,一旦number
的值被修改,所有线程都能立即看到这一变化
- 不使用
1 | public class JMMDemo01 { |
17.3 不保证原子性
- 原子性:不可分割,线程在执行任务的时候要么同时成功,要么同时失败
- 代码示例:验证 volatile 不保证原子性
- 代码中即使
number
是volatile变量,number++
操作的非原子性导致最终结果可能不是预期的20000
- 代码中即使
1 | /** |
- 如何保证原子性?除了使用
lock
和synchronized
关键字?- Java 还提供了基于
java.util.concurrent.atomic
包中的一系列原子类 - 这些原子类使用了高效的机制来保证单个变量操作的原子性,通常是通过底层的CAS(Compare-And-Swap)操作实现
- Java 还提供了基于
1 | // 适用 AtomicInteger 类代替普通的 int 变量 |
- 底层原理:
- 原子类的工作原理:原子类在底层使用了CAS操作
- CAS操作涉及三个操作数:内存位置(在这里是
number
的值)、预期原值和新值 - 如果内存位置的当前值与预期原值相匹配,那么处理器会自动将该位置值更新为新值。这个过程是原子的,即不可中断,保证了更新操作的原子性
- CAS操作涉及三个操作数:内存位置(在这里是
Unsafe
类:原子类的实现依赖于Unsafe
类,这是 Java 中一个提供底层、不安全操作的类,如直接内存访问和非常规的对象实例化等。Unsafe
类使得Java能够执行类似指针的操作,并且能够进行底层的内存操作
- 原子类的工作原理:原子类在底层使用了CAS操作
使用
AtomicInteger
类和其他原子类是在不想使用显式同步(例如synchronized
或Java锁API)时确保数据完整性的一种有效方式。它们特别适用于计数器或累加器,以及任何只需要对单个变量进行原子操作的场景。这些原子类不仅效率高,而且代码简洁,易于理解和维护
17.4 防止指令重排
- 指令重排:指计算机程序执行过程中,为了优化性能和利用硬件的并行处理能力,编译器和处理器可能会改变指令的执行顺序。重排可以发生在多个阶段:
- 编译器优化重排:编译器在生成机器代码时,可能会重新安排指令顺序以提高执行效率
- 指令并行重排:现代处理器可能会并行执行多个指令,无需严格按照程序中的原始顺序
- 内存系统重排:处理器和内存系统可能会改变操作执行的顺序,这与缓存和内存访问的优化有关
1 | 源代码–>编译器优化重排–>指令并行也可能会重排–>内存系统也会重排–>执行 |
数据依赖性:处理器在进行重排时会考虑到指令之间的数据依赖性。如果一条指令的结果依赖于前一条指令的结果,处理器会保留这种依赖关系,确保程序的执行结果符合逻辑的预期
- 如:理论上,执行顺序可能变为
2-1-3-4
或1-3-2-4
,但不会是4-1-2-3
,因为4
操作依赖于3
操作的结果
1
2
3
4int x = 1; // 1
int y = 2; // 2
x = x + 5; // 3
y = x * x; // 4- 如:理论上,执行顺序可能变为
volatile 防止指令重排:使用 volatile 变量时,JVM 会插入内存屏障来防止指令重排
- 内存屏障:是一种CPU指令,用于实现以下两个主要目的:
- 保证特定操作的执行顺序:防止屏障前后的指令进行重排
- 保证变量的内存可见性:确保屏障前的写操作在屏障后的读操作可见
- 通过插入读屏障和写屏障,
volatile
确保在读取volatile
变量之前的所有操作完成,且结果对后续的读取可见;写入volatile
变量后的操作不会被重排到写操作之前,示例如下:- 理论上,最终
x
和y
的值可能是(0,1)
,(1,0)
,(1,1)
,但由于volatile
的使用,不会出现(0,0)
,因为每个线程对a
或b
的写入在另一个线程读取之前已经完成和可见 - 情况
(1, 0)
- 线程B 执行
b = 1
完成后的写内存屏障确保b
的更新对所有其他线程可见 - 线程A 在执行
x = b
时,必须通过读内存屏障,确保看到b
的最新值(即1)。但在此之前,a
可能还未被 线程B 读到,所以y
可能仍是0
- 线程B 执行
- 情况
(0, 1)
- 线程A 执行
a = 1
完成后的写内存屏障确保a
的更新对所有其他线程可见 - 线程B 在执行
y = a
时,必须通过读内存屏障,确保看到a
的最新值(即1)。但在此之前,b
可能还未被 线程A 读到,所以x
可能仍是0
- 线程A 执行
- 情况
(1, 1)
- 如果 线程A 和 线程B 的写操作(
a = 1
和b = 1
)都完成,并且各自的写内存屏障生效之后,对方线程的读操作(x = b
和y = a
)发生。这意味着每个线程都能看到对方的变量更新,结果就是(1, 1)
- 如果 线程A 和 线程B 的写操作(
- 理论上,最终
- 内存屏障:是一种CPU指令,用于实现以下两个主要目的:
1 | volatile int a = 0, b = 0; |
在单例模式等场景中,volatile
常被用于确保对象创建过程的安全性,防止对象未完全构造就被其他线程访问
18. 单例模式
- 单例模式是一种设计模式,用于确保一个类只有一个实例,并提供一个全局访问点
- 在 Java 中,单例模式的实现主要有几种方式:饿汉式、懒汉式(DCL双重检查锁定实现)、静态内部类和枚举方式
18.1 饿汉式
1 | /** |
18.2 懒汉式-DCL实现
- DCL懒汉式单例 是在需要时才创建实例,利用双重检查锁定机制确保只创建一个实例,同时使用
volatile
关键字防止指令重排,确保线程安全- 通过在私有构造器中使用同步代码块和标识量来防止反射破解。然而,由于反射机制可以绕过私有构造器的限制,所以即使在构造器中使用了同步代码块和标识量,也无法阻止反射机制创建新的实例
1 | /** |
18.3 静态内部类
- 使用静态内部类可以达到懒加载的效果,并且由于类加载机制保证了实例的唯一性和线程安全性
- 还是防止不了反射,因为还是有私有构造器
1 | public class Holder { |
18.4 枚举-防止反射破坏单例
Java的枚举提供了一种简洁的方式来实现单例
- 枚举自带防反射和防序列化破坏的功能
1
2
3
4
5// java.lang.reflect.Constructor.java 源码
// 可以看到在使用反射获取的构造器创建实例的源码底层设计了禁止通过反射创建枚举对象
// 如果是枚举类型,就会抛出异常
if ((clazz.getModifiers() & Modifier.ENUM) != 0)
throw new IllegalArgumentException("Cannot reflectively create enum objects");- 使用枚举不仅简单,而且由于Java虚拟机从根本上保证了每个枚举常量的唯一性,因此通过枚举实现的单例模式也是线程安全的
1 | // enum 本身就是一个 Class 类 |
19. CAS
19.1 概述
CAS(Compare-And-Swap)是一种用于实现同步原语的技术,广泛用于多线程编程中实现无锁的并发算法
- 锁-Free机制:不依赖传统的锁机制(如互斥锁、读写锁)来同步线程的访问,而是通过原子操作来保证代码的安全执行
操作元素:CAS涉及三个基本操作数
- **V (内存位置)**:需要更新的内存地址
- **A (预期原值)**:期望内存位置的值
- **B (新值)**:如果位置的当前值与预期相符,需要写入的新值
工作原理:CAS操作会原子性地执行以下步骤
- 读取当前值:从内存位置V读取当前值
- 比较当前值与预期值:检查内存位置的当前值是否与预期值A相等
- 条件更新:如果当前值与预期值相等,那么将内存位置的值更新为新值B
缺点:
- 循环时间长、开销大:如果多个线程同时尝试更新同一变量,可能导致高CPU占用,因为线程需要在循环中不断尝试
- 仅保证单一变量的原子性:对于涉及多个变量的复合操作,CAS无法直接保障其原子性
- ABA问题:变量在被更新前后可能会被改变多次,导致CAS认为没有变化,实际上值已经被修改过
- eg. 变量V先从A变为B,然后又从B变回A,那么使用CAS进行检查时会认为这个变量没有被修改过,但实际上它已经被修改了两次
19.2 代码示例
- 在 Java 中利用原子操作类实现,
java.util.concurrent.atomic
包下的类例如AtomicBoolean
,AtomicInteger
,AtomicLong
以Atomic开头的包装类提供了 CAS 功能。它们分别用于 Boolean,Integer,Long 类型的原子性操作
1 | public class CASDemo { |
Unsafe
类在Java中提供了一个低级别的非安全机制,包括对内存的直接操作和对CAS操作的支持Unsafe
提供的compareAndSwap
方法直接映射到硬件的原子指令,使得Java能够利用这些底层指令来实现原子操作
1 | // Unsafe.java |
19.3 ABA 问题
- 本质是 CAS 操作只能检查变量当前的值是否与预期的值相同,但它不能检测在操作间隙内该值是否被修改过
- 如果一个变量的值原先是A,被改变为B,然后又被改回A,使用CAS进行比较时将无法识别出变量已经被修改过
- 举例:
- 初始状态:小童的账户余额为1000元
- 操作序列:
- 线程1:尝试从账户中提取500元。它检查余额是否为1000元,并准备更新为500元
- 线程1 成功执行,账户余额现在是500元
- 线程2 也想从账户中提取500元。它开始时检查余额为1000元,但在它执行时,线程1 已经将余额改为500元。此时,线程2 被暂停或阻塞
- 线程3(小童的妈妈)为账户汇入500元,余额恢复为1000元
- 线程2 恢复执行,检查余额仍然为其记忆中的1000元,然后尝试将余额更新为500元。由于当前余额确实是1000元,CAS检查通过,余额被更新为500元
- 结果:账户本应仅被扣款500元,最终却被错误地扣款两次,余额变为500元
19.4 解决 ABA 问题-带版本号的原子引用
- 为了解决ABA问题,可以使用带版本号的原子引用
AtomicStampedReference
是Java提供的一个用于解决ABA问题的类。它通过维护一个“时间戳”(或称为版本号)来管理每个变量的版本,从而防止ABA问题- 每次变量更新都伴随一个版本号的增加,即使数据再次回到原始状态,版本号也会不同,从而避免ABA问题
- 代码示例:
- 线程a:成功地两次改变了值,并且每次操作后版本号都递增了
- 线程b:尽管在它执行时值又回到了1,但由于它的操作基于过时的版本号1,所以它的更新操作失败
1 | public class ABASolution { |
注意:
compareAndSet
方法内部使用==
来比较当前引用和传入的期望引用。对于Integer
类型,这种比较是基于对象的内存地址,而非其数值
1
2
3
4
5
6
7
8
9
10
11
12public boolean compareAndSet(V expectedReference,
V newReference,
int expectedStamp,
int newStamp) {
Pair<V> current = pair;
return
expectedReference == current.reference &&
expectedStamp == current.stamp &&
((newReference == current.reference &&
newStamp == current.stamp) ||
casPair(current, Pair.of(newReference, newStamp)));
}- 自动装箱问题:
- 在Java中,整数类型如
Integer
在-128到127之间使用了对象缓存机制。超出这个范围,或者在特定情况下,整数会被重新装箱成新的对象,导致即使数值相同,内存地址也可能不同 - 在
compareAndSet
操作中,如果期望的引用数值是通过字面量或计算得出的新Integer
对象,可能会因为对象地址不同而导致比较失败 - 所以使用 Integer 作为泛型进行测试时,如果使用的数字大于128了,使用原子引用时就不会进行版本上升
- 在Java中,整数类型如
- 正常业务操作中,我们一般使用的是对象类型作为泛型(如 User 类),一般情况不会遇到这种情况
20. 理解各种锁
20.1 公平锁、非公平锁
- 公平锁:
- 确保获取锁的顺序按照线程请求锁的顺序来进行,即先到先得
- 可以防止线程饥饿,但可能导致整体吞吐量较低,因为每次都需要在多个线程中严格排序
- 非公平锁:
- 允许插队,不保证请求锁的顺序,可以减少线程切换的开销,提高系统整体的吞吐量
- 可能导致线程饥饿,即某些线程可能长时间获取不到锁
- 用法: 在创建可重入锁时,想构造器中传入true
1 | private final ReentrantLock fairLock = new ReentrantLock(true); // 公平锁 |
- ReentrantLock 的构造器源码如下:
1 | public ReentrantLock() { // 公平锁 |
20.2 可重入锁(递归锁)
- 可重入锁允许线程进入任何一个它已经拥有的锁同步的代码块
- synchronized 和 lock 都是可重入锁
synchronized
关键字提供的锁是内置的Java关键字,并且是隐式的可重入锁,不用手工上锁与解锁
1 | public class Demo01 { |
Lock
是显式的可重入锁,必须手动地锁定和解锁
1 | class Phone2 { |
20.3 自旋锁
自旋锁是指尝试获取锁的线程不会立即阻塞,而是采用循环的方式尝试获取锁,这样可以减少线程上下文切换的消耗,提高效率
代码示例:设计一个自己的锁
1 | public class SpinLock { |
- 测试类:
1 | public class TestSpinLock { |
- 运行过程:
- t1线程首先启动,尝试并成功获取自旋锁。获取锁后,它将持有锁3秒钟,期间其他线程不能获取锁
- t2线程随后启动,由于t1线程已经持有锁,t2线程会进入自旋状态,不断尝试获取锁,直到t1线程释放锁
- 当t1线程释放锁后,t2线程立即获取锁,然后也保持3秒后释放
20.4 死锁
死锁是多个线程在执行过程中因争夺资源而造成的一种僵局
代码示例:背
1 | public class DeadLockDemo { |
检测死锁:
- jps(Java Virtual Machine Process Status Tool):
- 用于查看在系统中运行的Java进程
- 命令:
jps -l
(列出系统中所有Java进程的PID和主类的全名)
- jstack:
- 用于生成Java虚拟机当前时刻的线程快照(堆栈跟踪)
- 命令:
jstack [pid]
(pid
是通过jps
查到的Java进程ID) - 死锁信息通常出现在输出的最后部分,标识为Deadlock
- jps(Java Virtual Machine Process Status Tool):
排查问题的方法:日志、堆栈信息
20.5 乐观/悲观锁
在Java中,乐观锁和悲观锁是用于管理并发操作中数据一致性和完整性的两种策略
悲观锁:假设最坏的情况,即认为数据在被读取的同时,一定会有其他线程来尝试修改这些数据
- 因此,悲观锁在数据被读取时立即对其进行锁定,直到事务完成才释放锁。这种锁的主要目的是避免数据被其他事务修改
- 在Java中,悲观锁可以通过
synchronized
关键字或Lock
接口实现的各种锁(如ReentrantLock
)实现
乐观锁:基于数据不会被频繁修改的假设,每次去更新数据时,它都会假设没有其他线程对这些数据进行修改
- 乐观锁通常会使用版本号或时间戳来实现
- Java 中使用
Atomic
类(如AtomicInteger
,AtomicReference
)提供的原子操作实现乐观锁
使用场景比较:
悲观锁更适合写操作多的场景,可以避免数据更新的冲突
乐观锁适用于读操作多的场景,减少了锁的开销,可以提高查询性能,但需要处理更新失败的情况