JUC,指的是 JDK 的三个并发包:java.util.concurrent、java.util.concurrent.atomic、java.util.concurrent.locks
一、谈谈你对 volatile 的理解
1、volatile 是 Java 虚拟机提供的轻量级的同步机制
- 保证可见性
- 不保证原子性
- 禁止指令重排
2、谈谈 JMM(线程安全性获得保证)
JMM(Java 内存模型)本身是一种抽象的概念,并不真实存在,它描述的是一组规则或者规范。通过这组规范定义了程序中各个变量(包括实例字段,静态字段和构成数组对象的元素)的访问方式。三种性质:可见性、原子性、有序性。
可见性
JMM 关于同步的规定:
- 线程解锁前,必须把共享变量的值刷新回主内存;
- 程序加锁前,必须读取主内存的最新值到自己的工作内存;
- 加锁解锁是同一把锁。
由于 JVM 运行程序的实体是线程,而每个线程创建时 JVM 都会为其创建一个工作内存(有些地方称为栈空间),工作内存是每个线程的私有数据区域,而 Java 内存模型中规定所有变量都存储在主内存,主内存是共享内存区域,所有线程都可以访问,但线程对变量的操作(读取赋值等)必须在工作内存中进行,首先要将变量从主内存拷贝到自己的工作内存空间,然后对变量进行操作,操作完成后再将变量写回主内存,不能直接操作主内存中的变量,各个线程中的工作内存中存储着主内存中的变量副本拷贝,因此不同的线程间无法访问对方的工作内存,线程间的通信(传值)必须通过主内存来完成,如图:
原子性
不可分割,number++ 在多线程下是非线程安全的,如何不加 synchronized 解决?使用 juc 包下的 AtomicInteger。
有序性
计算机在执行程序时,为了提高性能,编译器和处理器的常常会对指令做重排,一般分以下 3 种:源代码 → 编译器优化的重排 → 指令并行的重排 → 内存系统的重排 → 最终执行的指令。
处理器在进行指令重排时必须要考虑指令之间的数据依赖性。单线程环境里指令重排不影响最终执行结果。多线程环境中线程交替执行,由于编译器优化重排的存在,两个线程中使用的变量能否保证一致性是无法确定的。
volatile 能够实现禁止指令重排优化,从而避免多线程环境下程序出现乱序执行的现象。内存屏障(Memory Barrier)又称内存栅栏,是一个 CPU 指令,它的作用有两个:
- 保证特定操作的执行顺序;
- 保证某些变量的内存可见性(利用该特性实现 volatile 的内存可见性)。
由于编译器和处理器都能执行指令重排优化。如果在指令间插入一条 Memory Barrier 则会告诉编译器和 CPU,不管什么指令都不能和这条 Memory Barrier 指令重排序,也就是说通过插入内存屏障禁止在内存屏障前后的指令执行重排序优化。内存屏障另外一种作用是强制刷新各种 CPU的缓存数据,因此任何 CPU 上的线程都能读取到这些数据的最新版本。
3、你在哪些地方用到过 volatile
单例模式 DCL 代码。
二、CAS
1、什么是 CAS,以及底层原理
CAS(compareAndSet)比较并交换,它是一条 CPU 并发原语。它的功能是判断内存某个位置的值是否为预期值,如果是则更改为新的值,这个过程是原子的。
public class Demo2 {
public static void main(String[] args) {
AtomicInteger atomicInteger = new AtomicInteger(5);
// 当 atomicInteger 中的值为 5 时,将其更新为 19。底层调用 unsafe.compareAndSwapInt
System.out.println(atomicInteger.compareAndSet(5, 19) + "\t data:" + atomicInteger.get());
System.out.println(atomicInteger.compareAndSet(5, 19) + "\t data:" + atomicInteger.get());
}
}
运行结果:
true data:19
false data:19
atomicInteger.getAndIncrement() 的源码:
public final int getAndIncrement() {
return unsafe.getAndAddInt(this, valueOffset, 1);
}
/**
* 比较当前工作内存中的值和主内存中的值,如果相同则执行规定操作。否则继续执行直到值一致为止。
* var1 AtomicInteger 对象本身
* var2 该对象值得引用地址
* var4 需要变动的数量
* var5 是用 var1 和 var2 从主内存中取到的最新值
* 用该值与 var5 进行比较
* 如果相同,则更新 var5+var4并且返回 true
* 不同则继续取值再比较
*/
public final int getAndAddInt(Object var1, long var2, int var4) {
int var5;
do {
var5 = this.getIntVolatile(var1, var2);
} while (!this.compareAndSwapInt(var1, var2, var5, var5 + var4));
return var5;
}
2、你对 UnSafe 的理解
UnSafe 是 CAS 的核心类,由于 Java 方法无法直接访问底层系统,需要通过本地(native)方法来访问,UnSafe 相当于一个后门,基于该类可以直接操作特定内存的数据。UnSafe 类存在于 sum.misc 包中,其内部方法操作可以像 C 的指针一样直接操作内存,因此 Java 中 CAS 操作的执行依赖于 UnSafe 类的方法。
3、CAS 的缺点
- 循环时间长开销很大:不成功一直执行;
- 只能保证一个共享变量的原子操作:对于多个共享变量操作时,循环 CAS 就无法保证操作的原子性,这个时候就需要锁来保证原子性;
- 引出 ABA 问题。
三、谈谈原子类 AtomicInteger 的 ABA 问题,原子更新引用知道吗?
1、ABA 问题怎么产生?
CAS 算法实现一个重要前提需要取出内存中某时刻的数据并在当下时刻比较并替换,那么在这个时间差内,数据可能发生变化。
比如:一个线程 one 从内存位置 V 中取出 A,这时候另一个线程 two 也从内存中取出 A,并且线程 two 进行了一些操作将值变成了 B,然后线程 two 又将 V 位置的数据变成 A,这时候线程 one 进行 CAS 操作发现内存中仍然是 A,然后线程 one 操作成功。尽管线程 one 的 CAS 操作成功,但是不代表这个过程没问题。这就是 ABA 问题。
2、原子引用
public class Demo2 {
static AtomicReference<Integer> atomicReference = new AtomicReference<>(100);
public static void main(String[] args) {
new Thread(() -> {
atomicReference.compareAndSet(100, 101);
atomicReference.compareAndSet(101, 100);
}, "t1").start();
new Thread(() -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(atomicReference.compareAndSet(100, 2021) + "\t data:" + atomicReference.get());
}, "t2").start();
}
}
运行结果为 true data:2021
存在 ABA 问题。
3、时间戳原子引用
利用 AtomicStampedReference 来解决 ABA 问题,原子引用 + 版本号(类似于时间戳)。
public class Demo2 {
static AtomicStampedReference<Integer> atomicStampedReference = new AtomicStampedReference<>(100, 1);
public static void main(String[] args) {
new Thread(() -> {
int stamp = atomicStampedReference.getStamp();
System.out.println(Thread.currentThread().getName() + "\t 第一次版本号:" + stamp);
// 暂停 1 秒让线程 2 先获取当前版本
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
atomicStampedReference.compareAndSet(100, 101, atomicStampedReference.getStamp(), atomicStampedReference.getStamp() + 1);
System.out.println(Thread.currentThread().getName() + "\t 第二次版本号:" + atomicStampedReference.getStamp());
atomicStampedReference.compareAndSet(101, 100, atomicStampedReference.getStamp(), atomicStampedReference.getStamp() + 1);
System.out.println(Thread.currentThread().getName() + "\t 第三次版本号:" + atomicStampedReference.getStamp());
}, "t1").start();
new Thread(() -> {
int stamp = atomicStampedReference.getStamp();
System.out.println(Thread.currentThread().getName() + "\t 第一次版本号:" + stamp);
// 暂停 3 秒,让线程 1 进行 ABA 操作
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
boolean result = atomicStampedReference.compareAndSet(100, 2021, stamp, atomicStampedReference.getStamp() + 1);
System.out.println(Thread.currentThread().getName() + "\t 修改是否成功:" + result + "\t 当前最新版本号" + atomicStampedReference.getStamp());
System.out.println(Thread.currentThread().getName() + "\t 当前最新值:" + atomicStampedReference.getReference());
}, "t2").start();
}
}
运行结果:
t1 第一次版本号:1
t2 第一次版本号:1
t1 第二次版本号:2
t1 第三次版本号:3
t2 修改是否成功:false 当前最新版本号3
t2 当前最新值:100
四、ArrayList 是线程不安全的,请编写一个不安全的案例并给出解决方案
不安全的案例:
public class Demo2 {
public static void main(String[] args) {
List<String> list = new ArrayList<>();
for (int i = 0; i < 6; i++) {
new Thread(() -> {
list.add(UUID.randomUUID().toString().substring(0, 8));
System.out.println(list);
}, String.valueOf(1)).start();
}
}
}
故障现象
java.util.ConcurrentModificationException
导致原因
并发争抢修改导致。
解决办法
为什么不安全?因为 add 等方法,没有加 synchronized 关键字。
- new Vector<>():Vector 其实就是加 synchronized 关键字的 ArrayList;
- Collections.synchronizedList(new ArrayList<>()):
- new CopyOnWriteArrayList<>():
CopyOnWrite 容器即写时复制的容器,往一个容器添加元素的时候,不直接往当前容器 Object[] 添加,而是先将当前容器 Object[] 进行 Copy,复制出一个新的容器 Object[] newElements,然后往新的容器 Object[] newElements 里添加元素,添加完元素之后,再将原容器的引用指向新容器 setArray(newElements);。这样做的好处是可以对 CopyonWrite 容器进行并发的读,而不需要加锁,因为当前容器不会添加任何元素。所以 CopyOnWrite 容器也是一种读写分离的思想,读和写不同的容器。
但是 CopyOnWriteArrayList 有其缺陷:
- 内存占用:在写操作时需要复制一个新的数组,使得内存占用为原来的两倍左右;
- 数据不一致:读操作不能读取实时性的数据,因为部分写操作的数据还未同步到读数组中。
- 不适合内存敏感以及对实时性要求很高的场景。
public boolean add(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
Object[] elements = getArray();
int len = elements.length;
Object[] newElements = Arrays.copyOf(elements, len + 1);
newElements[len] = e;
setArray(newElements);
return true;
} finally {
lock.unlock();
}
}
Set
HashSet 的底层是 HashMap,add 方法为什么只需给一个元素?值为一个 Object 的常量。
public boolean add(E e) {
return map.put(e, PRESENT)==null;
}
HashSet 也是线程不安全的,解决办法:
- Collections.synchronizedSet(new HashSet<>()):
- new CopyOnWriteArraySet<>():低层是 CopyOnWriteArrayList;
Map
HashMap 也是线程不安全的,解决办法:
- Collections.synchronizedMap(new HashMap<>());
- ConcurrentReaderHashMap();
五、公平锁/非公平锁/可重入锁/递归锁/自旋锁/读写锁
1、公平锁和非公平锁
并发包中 ReentrantLock 的创建可以指定构建函数的 boolean 类型来得到公平锁或非公平锁,默认是 false 非公平锁。非公平锁的优点在于吞吐量比公平锁大,synchronized 是一种非公平锁。
- 公平锁是指多个线程按照申请锁的顺序来获取锁,类似排队打饭,先来后到。类似于 FIFO 的规则。
- 非公平锁是指多个线程获取的顺序并不是按照申请锁的顺序,有可能后申请的线程比先申请的线程优先获取锁。在高并发的情况下,有可能会造成优先级反转或者饥饿现象。
2、可重入锁(递归锁)
可重入锁指的是同一线程外层函数获得锁之后,内层递归函数仍然能获取该锁的代码。在同一个线程在外层方法获取锁的时候,在进入内层方法会自动获取锁。也就是说,线程可以进入任何一个它已经拥有的锁所同步着的代码块。其最大的作用就是可以避免死锁。
synchronized 的重入锁实现机制
每个锁对象拥有一个锁计数器和一个指向持有该锁的线程的指针。当执行 monitorenter 时,如果目标锁对象的计数器为零,那么说明它没有被其他线程所持有,Java 虚拟机会将该锁对象的持有线程设置为当前线程,并且将其计数器加 1。在目标锁对象的计算器不为零的情况下,如果锁对象的持有线程是当前线程,那么 Java 虚拟机可以将其计数器加 1,否则需要等待,直至持有线程释放锁。当执行 monitorexit 时,Java 虚拟机则需要将锁对象的计数器减 1。计数器为零代表锁已被释放。
两个 synchronized 用的是同一把锁。
public class Demo2 {
public static synchronized void sendSMS() {
System.out.println(Thread.currentThread().getName() + "\t sendSMS");
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
sendEmail();
}
public static synchronized void sendEmail() {
System.out.println(Thread.currentThread().getName() + "\t sendEmail");
}
public static void main(String[] args) {
new Thread(() -> {
sendSMS();
}, "t1").start();
new Thread(() -> {
sendEmail();
}, "t2").start();
}
}
运行结果:
t1 sendSMS
t1 sendEmail
t2 sendEmail
两个 lock 用的是同一把锁。
public class Demo2 implements Runnable {
public static void main(String[] args) {
Demo2 demo = new Demo2();
Thread t1 = new Thread(demo, "t1");
Thread t2 = new Thread(demo, "t2");
t1.start();
t2.start();
}
Lock lock = new ReentrantLock();
@Override
public void run() {
get();
}
public void get() {
lock.lock();
try {
System.out.println(Thread.currentThread().getName() + "\t get()");
set();
} finally {
lock.unlock();
}
}
public void set() {
lock.lock();
try {
System.out.println(Thread.currentThread().getName() + "\t set()");
} finally {
lock.unlock();
}
}
}
运行结果:
t1 get()
t1 set()
t2 get()
t2 set()
下面代码也能正常运行,但要求解锁和加锁出现的次数相同。
public void get() {
lock.lock();
lock.lock();
try {
System.out.println(Thread.currentThread().getName() + "\t get()");
set();
} finally {
lock.unlock();
lock.unlock();
}
}
3、自旋锁(spinlock)
自旋锁是指尝试获取锁的线程不会立即阻塞,而是采用循环的方式去尝试获取锁,这样的好处是减少线程上下文切换的消耗,缺点是循环会消耗 CPU。
通过 CAS 操作完成自旋锁,A 线程先进来调用 myLock 方法,自己持有锁5 秒钟。B 随后进来后发现当前有线程持有锁,不是 null。所以只能通过自旋等待,直到 A 释放锁后 B 随后抢到。
public class Demo2 {
// 原子引用线程
AtomicReference<Thread> atomicReference = new AtomicReference<>();
public void myLock() {
Thread thread = Thread.currentThread();
System.out.println(Thread.currentThread().getName() + "\t myLock");
while (!atomicReference.compareAndSet(null, thread)) {
}
}
public void myUnlock() {
Thread thread = Thread.currentThread();
atomicReference.compareAndSet(thread, null);
System.out.println(Thread.currentThread().getName() + "\t myUnlock");
}
public static void main(String[] args) {
Demo2 demo = new Demo2();
new Thread(() -> {
demo.myLock();
// 暂停一会儿线程
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
demo.myUnlock();
}, "t1").start();
new Thread(() -> {
demo.myLock();
demo.myUnlock();
}, "t2").start();
}
}
运行结果:
t1 myLock
t2 myLock
t1 myUnlock
t2 myUnlock
4、读写锁
独占锁指该锁一次只能被一个线程所持有。对 ReentrantLock 和 synchronized 而言都是独占锁。共享锁指该锁可被多个线程锁持有。对于 ReentrantReadWriteLock 其读锁是共享锁,其写锁是独占锁。读锁的共享锁可保证并发读是非常高效的,读写,写读,写写的过程是互斥的。
不使用锁的情况:
public class Demo2 {
private volatile Map<String, Object> map = new HashMap<>();
public void put(String key, Object value) {
System.out.println(Thread.currentThread().getName() + "\t 正在写入" + key);
try {
TimeUnit.MICROSECONDS.sleep(300);
} catch (InterruptedException e) {
e.printStackTrace();
}
map.put(key, value);
System.out.println(Thread.currentThread().getName() + "\t 写入完成");
}
public void get(String key) {
System.out.println(Thread.currentThread().getName() + "\t 正在读取");
try {
TimeUnit.MICROSECONDS.sleep(300);
} catch (InterruptedException e) {
e.printStackTrace();
}
Object result = map.get(key);
System.out.println(Thread.currentThread().getName() + "\t 读取完成" + result);
}
public static void main(String[] args) {
Demo2 demo = new Demo2();
for (int i = 0; i < 5; i++) {
final int tmp = i;
new Thread(() -> {
demo.put(tmp + "", tmp + "");
}, String.valueOf(i)).start();
}
for (int i = 0; i < 5; i++) {
final int tmp = i;
new Thread(() -> {
demo.get(tmp + "");
}, String.valueOf(i)).start();
}
}
}
运行后,发现写操作被拆分,读操作有时读到了 null。写操作应该是原子+独占,整个过程必须是一个完整的统一体,中间不许被分割,被打断。使用读写锁的情况:
public class Demo2 {
private volatile Map<String, Object> map = new HashMap<>();
private ReentrantReadWriteLock reentrantReadWriteLock = new ReentrantReadWriteLock();
public void put(String key, Object value) {
reentrantReadWriteLock.writeLock().lock();
try {
System.out.println(Thread.currentThread().getName() + "\t 正在写入" + key);
try {
TimeUnit.MICROSECONDS.sleep(300);
} catch (InterruptedException e) {
e.printStackTrace();
}
map.put(key, value);
System.out.println(Thread.currentThread().getName() + "\t 写入完成");
} catch (Exception e) {
e.printStackTrace();
} finally {
reentrantReadWriteLock.writeLock().unlock();
}
}
public void get(String key) {
reentrantReadWriteLock.readLock().lock();
try {
System.out.println(Thread.currentThread().getName() + "\t 正在读取");
try {
TimeUnit.MICROSECONDS.sleep(300);
} catch (InterruptedException e) {
e.printStackTrace();
}
Object result = map.get(key);
System.out.println(Thread.currentThread().getName() + "\t 读取完成" + result);
} catch (Exception e) {
e.printStackTrace();
} finally {
reentrantReadWriteLock.readLock().unlock();
}
}
public static void main(String[] args) {
Demo2 demo = new Demo2();
for (int i = 0; i < 5; i++) {
final int tmp = i;
new Thread(() -> {
demo.put(tmp + "", tmp + "");
}, String.valueOf(i)).start();
}
for (int i = 0; i < 5; i++) {
final int tmp = i;
new Thread(() -> {
demo.get(tmp + "");
}, String.valueOf(i)).start();
}
}
}
六、CountDownLatch/CyclicBarrier/Semaphore
1、CountDownLatch
CountDownLatch 让一些线程阻塞直到另一个线程完成一系列操作后才被唤醒。CountDownLatch 主要有两个方法,当一个或多个线程调用 await 方法时,调用线程会被阻塞。其它线程调用 countDown 方法会将计数器减 1(调用 countDown 方法的线程不会阻塞),当计数器的值变为零时,因调用 await 方法被阻塞的线程会被唤醒,继续执行。
public class Demo2 {
public static void main(String[] args) throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(6);
for (int i = 0; i < 6; i++) {
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + "\t 上完自习,离开教室");
countDownLatch.countDown();
}, String.valueOf(i)).start();
}
countDownLatch.await();
System.out.println(Thread.currentThread().getName() + "\t 班长最后关门走人");
}
}
与枚举类结合使用:
public class Demo2 {
public static void main(String[] args) throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(6);
for (int i = 1; i <= 6; i++) {
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + "\t 国,被灭");
countDownLatch.countDown();
}, CountryEnum.getRetMessage(i).getRetMessage()).start();
}
countDownLatch.await();
System.out.println(Thread.currentThread().getName() + "\t 秦国,统一华夏");
}
}
enum CountryEnum {
ONE(1, "齐"), TWO(2, "楚"), THREE(3, "燕"), FOUR(4, "赵"), FIVE(5, "魏"), SIX(6, "韩");
private int retCode;
private String retMessage;
CountryEnum(int retCode, String retMessage) {
this.retCode = retCode;
this.retMessage = retMessage;
}
public static CountryEnum getRetMessage(int index) {
CountryEnum[] myArray = CountryEnum.values();
for (CountryEnum countryEnum : myArray) {
if (index == countryEnum.getRetCode()) {
return countryEnum;
}
}
return null;
}
public int getRetCode() {
return retCode;
}
public String getRetMessage() {
return retMessage;
}
}
2、CyclicBarrier
CyclicBarrier 的字面意思是可循环(Cyclic)使用的屏障(Barrier)。它要做的事情是,让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续干活,线程进入屏障通过 CyclicBarrier 的 await() 方法。
public class Demo2 {
public static void main(String[] args) {
CyclicBarrier countDownLatch = new CyclicBarrier(7, () -> {
System.out.println("召唤神龙");
});
for (int i = 1; i <= 7; i++) {
final int tmp = i;
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + "\t 收集到第:" + tmp + "龙珠");
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}, String.valueOf(i)).start();
}
}
}
3、Semaphore
信号量(Semaphore)主要用于两个目的,一个是多个共享资源的互斥使用,另一个用于并发线程数的控制。
public class Demo2 {
public static void main(String[] args) {
// 假如有 3 个停车位
Semaphore semaphore = new Semaphore(3);
for (int i = 1; i <= 6; i++) {
new Thread(() -> {
try {
// 获取资源
semaphore.acquire();
System.out.println(Thread.currentThread().getName() + "\t 抢到了车位");
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "\t 停车 3 秒后离开车位");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 释放资源
semaphore.release();
}
}, String.valueOf(i)).start();
}
}
}
七、阻塞队列
阻塞队列在数据结构中所起的作用大致如下图所示:
线程 1 往阻塞队列中添加元素,而线程 2 从阻塞队列中移除元素:
- 当阻塞队列是空时,从队列中获取元素的操作将会被阻塞;
- 当阻塞队列是满时,往队列里添加元素的操作将会被阻塞。
在多线程领域中,所谓阻塞,是指在某些情况下被挂起的线程。待条件满足时,被挂起的线程又会自动被唤醒。
好处
在 concurrent 包发布以前,在多线程环境下,我们每个程序员都必须去自己控制这些细节,尤其还要兼顾效率和线程安全。BlockingQueue 会控制什么时候需要阻塞线程,什么时候需要唤醒线程。
1、BlockingQueue 的核心方法
方法类型 | 抛出异常 | 特殊值 | 阻塞 | 超时 |
---|---|---|---|---|
插入 | add(e) | offer(e) | put(e) | offer(e,time,unit) |
移除 | remove() | poll() | take() | poll(time,unit) |
检查 | element() | peek() | 不可用 | 不可用 |
- 抛出异常:
- 当阻塞队列满时,再往队列里 add 插入元素会抛出 java.lang.IllegalStateException: Queue full;
- 当阻塞队列空时,在往队列里 remove 移除元素会抛出 java.util.NoSuchElementException。
- 特殊值:
- 插入方法,成功 true 失败 false;
- 移除方法,方法返回出列的元素,队列里面没有就返回 null。
- 一直阻塞:
- 当阻塞队列满时,生产者线程继续往队列里 put 元素,队列会一直阻塞生产线程直到 put 数据或响应中断退出;
- 当阻塞队列空时,消费者线程试图从队列里 take 元素,队列会一直阻塞消费者线程知道队列可用。
- 超时退出:
- 当阻塞队列满时,队列会阻塞生产者线程一定时间,超过后限时后生产者线程会退出。
2、常用子类
- ArrayBlockingQueue:由数组结构组成的有界阻塞队列,此队列按 FIFO(先进先出)原则对元素进行排序;
- LinkedBlockingQueue:由链表结构组成的有界(但大小默认值为 Integer.NAX_VALUE)阻塞队列,此队列按 FIFO 排序元素;
- PriorityBlockingQueue:支持优先级排序的无界阻塞队列;
- DelayQueue:使用优先级队列实现的延迟无界阻塞队列;
- SynchronousQueue:不存储元素的阻塞队列,即单个元素的队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态;
- LinkedTransferQueue:由链表结构组成的无界阻塞队列;
- LinkedBlockingDeque:由链表结构组成的双向阻塞队列;
用在哪里
- 生产者消费者模式:传统版、阻塞版
- 线程池
- 消息中间件
八、生产者消费者模式
一个初始值为零的变量,两个线程对其交替进行加减操作。lock->await->singal 替代 sync->wait->notify。唤醒线程后必须使用 while 进行判断,如果使用 if 进行判断,两个以上的线程就会出现问题。
- await():会释放当前锁,进入等待状态;
- signal():会唤醒某个等待线程;
- signalAll():会唤醒所有等待线程;
- 唤醒线程从 await() 返回后需要重新获得锁。
1、普通版
public class ProdConsumer {
public static void main(String[] args) {
ShareData shareData = new ShareData();
new Thread(() -> {
for (int i = 1; i <= 5; i++) {
try {
shareData.increment();
} catch (Exception e) {
e.printStackTrace();
}
}
}, "AA").start();
new Thread(() -> {
for (int i = 1; i <= 5; i++) {
try {
shareData.decrement();
} catch (Exception e) {
e.printStackTrace();
}
}
}, "BB").start();
}
}
class ShareData {
private int number = 0;
private Lock lock = new ReentrantLock();
private Condition condition = lock.newCondition();
public void increment() {
lock.lock();
try {
// 判断
while (number != 0) {
// 等待
condition.await();
}
// 干活
number++;
System.out.println(Thread.currentThread().getName() + "\t" + number);
// 通知唤醒
condition.signalAll();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void decrement() {
lock.lock();
try {
// 判断
while (number == 0) {
// 等待
condition.await();
}
// 干活
number--;
System.out.println(Thread.currentThread().getName() + "\t" + number);
// 通知唤醒
condition.signalAll();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
2、阻塞队列版
public class Demo2 {
public static void main(String[] args) {
MyResource myResource = new MyResource(new ArrayBlockingQueue<>(10));
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + "\t 生产线程启动");
try {
myResource.myProd();
} catch (Exception e) {
e.printStackTrace();
}
}, "Prod").start();
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + "\t 消费线程启动");
try {
myResource.myConsumer();
} catch (Exception e) {
e.printStackTrace();
}
}, "Consumer").start();
try {
TimeUnit.SECONDS.sleep(7);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println();
System.out.println();
System.out.println();
System.out.println("5 秒钟时间到,main 线程叫停,活动结束");
myResource.stop();
}
}
class MyResource {
// 默认开启,进行生产 + 消费
private volatile boolean FLAG = true;
private AtomicInteger atomicInteger = new AtomicInteger();
BlockingQueue<String> blockingQueue = null;
public MyResource(BlockingQueue<String> blockingQueue) {
this.blockingQueue = blockingQueue;
System.out.println(blockingQueue.getClass().getName());
}
public void myProd() throws Exception {
String data = null;
boolean retValue;
while (FLAG) {
data = atomicInteger.incrementAndGet() + "";
retValue = blockingQueue.offer(data, 2L, TimeUnit.SECONDS);
if (retValue) {
System.out.println(Thread.currentThread().getName() + "\t 插入队列" + data + "成功");
} else {
System.out.println(Thread.currentThread().getName() + "\t 插入队列" + data + "失败");
}
TimeUnit.SECONDS.sleep(1);
}
System.out.println(Thread.currentThread().getName() + "\t 大老板叫停了,表示 FLAG=false,生产动作结束");
}
public void myConsumer() throws Exception {
String result = null;
while (FLAG) {
result = blockingQueue.poll(2L, TimeUnit.SECONDS);
if (result == null || result.equals("")) {
FLAG = false;
System.out.println(Thread.currentThread().getName() + "\t 超过两秒没有取到蛋糕,消费退出");
return;
}
System.out.println(Thread.currentThread().getName() + "\t 消费队列蛋糕" + result + "成功");
}
}
public void stop() {
this.FLAG = false;
}
}
九、线程同步
多线程之间按顺序调用,实现 A->B->C 三个线程启动,要求如下:AA 打印 5 次、BB 打印 10 次、CC 打印 15 次。
public class ShareResource {
/**
* A:1 B:2 C:3
*/
private int number = 1;
private Lock lock = new ReentrantLock();
private Condition c1 = lock.newCondition();
private Condition c2 = lock.newCondition();
private Condition c3 = lock.newCondition();
public void print5() {
lock.lock();
try {
// 判断
while (number != 1) {
c1.await();
}
// 干活
for (int i = 1; i <= 5; i++) {
System.out.println(Thread.currentThread().getName() + "\t" + i);
}
// 通知
number = 2;
c2.signal();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void print10() {
lock.lock();
try {
// 判断
while (number != 2) {
c2.await();
}
// 干活
for (int i = 1; i <= 10; i++) {
System.out.println(Thread.currentThread().getName() + "\t" + i);
}
// 通知
number = 3;
c3.signal();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void print15() {
lock.lock();
try {
// 判断
while (number != 3) {
c3.await();
}
// 干活
for (int i = 1; i <= 15; i++) {
System.out.println(Thread.currentThread().getName() + "\t" + i);
}
// 通知
number = 1;
c1.signal();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public static void main(String[] args) {
ShareResource shareResource = new ShareResource();
new Thread(() -> {
for (int i = 1; i < 10; i++) {
shareResource.print5();
}
}, "A").start();
new Thread(() -> {
for (int i = 1; i < 10; i++) {
shareResource.print10();
}
}, "B").start();
new Thread(() -> {
for (int i = 1; i < 10; i++) {
shareResource.print15();
}
}, "C").start();
}
}
十、synchronized 和 Lock 的区别
原始构成
- synchronized 是关键字属于 JVM 层面:monitorenter 和 monitorexit底层是通过 monitor 对象来完成,其实 wait/notify 等方法也依赖于 monitor 对象。
- Lock 是具体类 java.util.concurrent.locks.Lock 是 api 层面的锁。
使用方法
- synchronized 不需要用户去手动释放锁,当 synchronized 代码执行完后系统会自动让线程释放对锁的占用。
- ReentrantLock 则需要用户去手动释放锁,若没有主动释放锁,就有可能导致出现死锁现象。
等待是否中断
- synchronized 不可中断,除非抛出异常或者正常运行完成。
- ReentrantLock 可中断:设置超时方法 tryLock(long timeout, TimeUnit unit)、lockInterruptibly() 房代码块中,调用 Thread.interrupted() 方法可中断。
加锁是否公平
- synchronized 是非公平锁。
- ReentrantLock 两者都可以,默认公平锁,通过构造方法传 boolean 值,true 为公平锁,false 为非公平锁。
锁绑定多个条件 Condition
- synchronized 没有。
- ReentrantLock 用来实现分组唤醒需要唤醒的线程们,可以精确唤醒,而不是像 synchronized 要么随机唤醒一个线程要么唤醒全部线程。
十一、线程池用过吗?谈谈你对 ThreadPoolExecutor 的理解
1、创建线程的四种方法
(1)Thread 类
继承 Thread 类并覆盖 run() 方法:run() 为线程类的核心方法,相当于主线程的 main 方法,是每个线程的入口。一个线程调用两次 start() 方法将会抛出线程状态异常(IllegalThreadStateException),也就是的 start() 只可以被调用一次。
public class MyThread1 extends Thread {
@Override
public void run() {
System.out.println("子线程启动,ID为:" + Thread.currentThread().getId() + ",名字为:" + Thread.currentThread().getName());
}
}
class Test {
public static void main(String[] args) {
// 创建一个线程并开启线程
MyThread1 thread = new MyThread1();
thread.start();
// 多创建几个线程
new MyThread1().start();
new MyThread1().start();
new MyThread1().start();
}
}
(2)Runnable 接口
实现 Runnable 接口并覆盖 run() 方法,实现多线程可以避免单继承局限性。
public class MyRunnable implements Runnable {
@Override
public void run() {
System.out.println("子线程启动,ID为:" + Thread.currentThread().getId() + ",名字为" + Thread.currentThread().getName());
}
}
class Test1 {
public static void main(String[] args) {
// 通过将Runnable对象传入Thread构造函数来创建线程,并开启线程
Runnable runnable = new MyRunnable();
Thread thread1 = new Thread(runnable, "线程1");
thread1.start();
// 一个Runnable对象可以用来创建多个线程
new Thread(runnable, "线程2").start();
new Thread(runnable, "线程3").start();
new Thread(runnable, "线程4").start();
}
}
(3)Callable 接口
实现 Callable 接口并覆盖 call() 方法:call() 有返回值,可以向上抛异常。futureTask.get() 只有在线程执行完才能返回结果,否则会阻塞。
public class MyThread2 implements Callable<String> {
@Override
public String call() throws Exception {
System.out.println("子线程启动,ID为:" + Thread.currentThread().getId() + ",名字为" + Thread.currentThread().getName());
return Thread.currentThread().getName();
}
}
class Test2 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
FutureTask<String> futureTask = new FutureTask<>(new MyThread2());
new Thread(futureTask, "线程1").start();
// 多次创建只会执行一次
new Thread(futureTask, "线程2").start();
System.out.println(futureTask.get());
}
}
使用同一个 FutureTask 创建多个线程,只会执行一次。运行结果:
子线程启动,ID为:12,名字为线程1
线程1
(4)线程池
Java 中的线程池是通过 Executor 框架实现,线程池通过 ThreadPoolExecutor 创建。
线程池的优势
线程池做的工作主要是控制运行的线程的数量,处理过程中将任务放入队列,然后在线程创建后启动这些任务。如果线程数量超过了最大数量,超过数据的线程排队等候,等其它线程执行完毕,再从线程中取出任务来执行。
特点:线程复用、控制最大并发数、管理线程。
- 降低资源消耗,通过重复利用已创建的线程,降低线程创建和销毁造成的消耗;
- 提高响应速度,当任务到达时,任务可以不需要等待线程创建就能立即执行;
- 提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。
创建线程池的常见方法:
- Executors.newScheduledThreadPool(int);:有计划性的线程池,就是在给定的延迟之后运行或周期性地执行。
- Executors.newWorkStealingPool();:java 8新增,它不是ThreadPoolExecutor 的扩展,它是新的线程池类 ForkJoinPool 的扩展。能够合理的使用 CPU 进行对任务并行操作(使用目前机器上可用的处理器作为它的并行级别),所以适合使用在很耗时的任务中
- Executors.newFixedThreadPool(int);:创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。执行长期的任务,性能好很多。
- Executors.newSingleThreadExecutor();:创建一个单线程的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序执行。一个任务一个任务执行的场景
- Executors.newCachedThreadPool();:创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空线程,若无可回收,则新建线程。适用:执行很多短期异步的小程序或者负载较轻的服务器。
public class MyThreadPool {
public static void main(String[] args) {
// 创建 5 个线程
// ExecutorService threadPool = Executors.newFixedThreadPool(5);
// 单个线程
// ExecutorService threadPool = Executors.newSingleThreadExecutor();
// N 个线程
ExecutorService threadPool = Executors.newCachedThreadPool();
try {
// 模拟 10 个 用户来办理业务,每个用户就是一个来自外部的请求线程
for (int i = 1; i <= 10; i++) {
threadPool.execute(() -> {
System.out.println(Thread.currentThread().getName() + "\t 办理业务");
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
threadPool.shutdown();
}
}
}
不同方法的运行结果:
newFixedThreadPool 方法的运行结果
pool-1-thread-2 办理业务
pool-1-thread-1 办理业务
pool-1-thread-4 办理业务
pool-1-thread-5 办理业务
pool-1-thread-5 办理业务
pool-1-thread-5 办理业务
pool-1-thread-3 办理业务
pool-1-thread-4 办理业务
pool-1-thread-1 办理业务
pool-1-thread-2 办理业务
newSingleThreadExecutor 方法的运行结果
pool-1-thread-1 办理业务
pool-1-thread-1 办理业务
pool-1-thread-1 办理业务
pool-1-thread-1 办理业务
pool-1-thread-1 办理业务
pool-1-thread-1 办理业务
pool-1-thread-1 办理业务
pool-1-thread-1 办理业务
pool-1-thread-1 办理业务
pool-1-thread-1 办理业务
newCachedThreadPool 方法的运行结果
pool-1-thread-2 办理业务
pool-1-thread-4 办理业务
pool-1-thread-3 办理业务
pool-1-thread-1 办理业务
pool-1-thread-5 办理业务
pool-1-thread-6 办理业务
pool-1-thread-7 办理业务
pool-1-thread-8 办理业务
pool-1-thread-9 办理业务
pool-1-thread-2 办理业务
2、创建线程池的七个参数的意思
参数意义如下:
- corePoolSize:线程池中的常驻核心线程数。
- maximumPoolSize:线程池能够容纳,同时执行的最大线程数,此值必须大于等于 1。
- keepAliveTime:多余的空闲线程的存活时间。当前线程池数量超过 corePoolSize 时,当空闲时间达到 keepAliveTime 值时,多余空闲线程会被销毁知到只剩下 corePoolSize 个线程为止。
- unit:keepAliveTime 的单位
- workQueue:任务队列,被提交但尚未被执行的任务。
- threadFactory:表示生成线程池中工作线程的线程工厂,用于创建线程,一般用默认的即可。
- handler:拒绝策略,表示当队列满时,并且工作线程大于等于线程池的最大线程(maximumPoolSize)时,如何来拒绝请求执行的 runnable 的策略。
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
- 在创建了线程池后,等待提交过来的任务请求;
- 当调用 execute() 方法添加一个请求任务时,线程池会做如下判断:
- 如果正在运行的线程数量小于 corePoolSize,那么马上创建线程运行这个任务;
- 如果正在运行的线程数大于等于 corePoolSize,那么将这个任务放入队列;
- 如果队列满了且正在运行的线程数量还小于 maximumPoolSize,那么要创建非核心线程立即运行这个任务;
- 如果队列满了且正在运行的线程数量大于等于 maximumPoolSize,那么线程池会启动饱和拒绝策略来执行。
- 当一个线程完成任务时,它会从队列中取下一个任务来执行。
- 当一个线程无事可做,超过一定的时间(keepAliveTime)时,线程池会判断:如果当前运行的线程数大于 corePoolSize,那么这个线程就被停掉。所以线程池的所有任务完成后,它最终会收缩到 corePoolSize 的大小。
3、线程池的拒绝策略
当等待队列已经排满,再也塞不下新任务了。同时,线程池中的 max 线程也达到了,无法继续为新任务服务。这时,需要拒绝策略机制合理的处理这些任务。
- AbortPolicy(默认):直接抛出 RejectedExecutionException 异常阻止系统正常运行;
- CallerRunsPolicy:“调用者运行”一种调节机制,该策略既不会抛弃任务,也不会抛出异常,而是将某些任务回退到调用者,从而降低新任务的流量。
- DiscardOldestPolicy:抛弃队列中等待最久的任务,然后把当前任务加入队列中尝试再次提交当前任务。
- DiscardPolicy:直接丢弃任务,不给予任何处理也不抛出异常。如果允许任务丢失,这是最好的方案。
public class MyThreadPool {
public static void main(String[] args) {
ExecutorService threadPool = new ThreadPoolExecutor(
2,
5,
1L,
TimeUnit.SECONDS,
new LinkedBlockingDeque<>(3),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy());
try {
// 模拟 10 个 用户来办理业务,每个用户就是一个来自外部的请求线程
for (int i = 1; i <= 10; i++) {
threadPool.execute(() -> {
System.out.println(Thread.currentThread().getName() + "\t 办理业务");
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
threadPool.shutdown();
}
}
}
AbortPolicy 拒绝策略的运行结果,抛出异常阻止系统正常运行。
pool-1-thread-1 办理业务
pool-1-thread-5 办理业务
pool-1-thread-4 办理业务
pool-1-thread-3 办理业务
pool-1-thread-2 办理业务
pool-1-thread-4 办理业务
pool-1-thread-5 办理业务
pool-1-thread-1 办理业务
java.util.concurrent.RejectedExecutionException: Task spring.Demo.ThreadDemo.MyThreadPool$$Lambda$1/186276003@7506e922 rejected from java.util.concurrent.ThreadPoolExecutor@4ee285c6[Running, pool size = 5, active threads = 2, queued tasks = 0, completed tasks = 6]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
at spring.Demo.ThreadDemo.MyThreadPool.main(MyThreadPool.java:19)
CallerRunsPolicy 拒绝策略的运行结果,让调用者来运行服务器。
pool-1-thread-1 办理业务
pool-1-thread-5 办理业务
pool-1-thread-4 办理业务
pool-1-thread-4 办理业务
pool-1-thread-3 办理业务
pool-1-thread-2 办理业务
main 办理业务
pool-1-thread-5 办理业务
pool-1-thread-1 办理业务
pool-1-thread-5 办理业务
DiscardOldestPolicy 和 DiscardPolicy拒绝策略的运行结果,两种策略处理非常相似。DiscardOldestPolicy 丢弃的是当前队列中最老的任务;DiscardPolicy 策略会直接将溢出任务丢弃。
pool-1-thread-1 办理业务
pool-1-thread-4 办理业务
pool-1-thread-5 办理业务
pool-1-thread-2 办理业务
pool-1-thread-3 办理业务
pool-1-thread-5 办理业务
pool-1-thread-4 办理业务
pool-1-thread-1 办理业务
4、工作中使用哪种方式创建线程
一个都不用,生产上只使用自定义的线程池。线程池不建议使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式去创建,这样可以规避资源耗尽的风险。Executors 返回的线程池对象的弊端如下:
- FixedThreadPool 和 SingleThreadExecutor:允许的请求队列长度 Integer.MAX_VALUE,可能会堆积大量的请求,从而导致 OOM。
- CachedThreadPool 和 ScheduledThreadPool:允许的创建线程数量为 Integer.MAX_VALUE,可能会创建大量的线程,从而导致 OOM。
5、怎样合理配置线程池
获取 CPU 核数:Runtime.getRuntime().availableProcessors();
CPU 密集
CPU 密集的意思是该任务需要大量的运算,而没有阻塞,CPU 一直全速运行。CPU 密集任务只有在真正的多核 CPU 上才可能得到加速(通过多线程),在单核 CPU 上,无论你开几个模拟的多线程,该任务都不可能得到加速,因为 CPU 总的运算能力就那些。
CPU 密集型任务配置尽可能少的线程数量:$CPU 核数+1个线程得到线程池$。
IO密集型
IO 密集型,即该任务需要大量的 IO,即大量的阻塞。在单线程上运行 IO 密集型的任务会导致,大量的 CPU 运算能力浪费在等待。所以在 IO 密集型任务中使用多线程可以大大的加速程序运行,即使在单核 CPU 上,这种加速主要就是利用了被浪费掉的阻塞时间。
IO 密型型时,大部分线程都阻塞,故需要多配置线程数:参考公式:$CPU 核数/(1-阻塞系数)$,阻塞系数在 0.8~0.9 之间,比如:8 核 CPU $8/(1-0.9)=80个线程数$。
十二、死锁编码及定位分析
死锁是指两个或两个以上的线程在执行过程中,因争夺资源而造成的一种互相等待的现象,若无外力干涉那它们都将无法推进下去。如果系统资源充足,线程的资源请求都能够得到满足,死锁出现的可能性就很低,否则就会因争夺有限的资源而陷入死锁。
- 系统资源不足;
- 线程运行推荐的顺序不合适;
- 资源分配不当。
会发生死锁的代码:
public class HoldLockThread implements Runnable {
private String lockA;
private String lockB;
public HoldLockThread(String lockA, String lockB) {
this.lockA = lockA;
this.lockB = lockB;
}
@Override
public void run() {
synchronized (lockA) {
System.out.println(Thread.currentThread().getName() + "\t 自己持有:" + lockA + "\t 尝试获取:" + lockB);
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (lockB) {
System.out.println(Thread.currentThread().getName() + "\t 自己持有:" + lockB);
}
}
}
}
class Test3 {
public static void main(String[] args) {
String lockA = "lockA";
String lockB = "lockB";
new Thread(new HoldLockThread(lockA, lockB), "ThreadAAA").start();
new Thread(new HoldLockThread(lockB, lockA), "ThreadBBB").start();
}
}
使用 jps 命令定位进程号,jstack 查找死锁。
C:\software\IntelliJ IDEA Workspace\springBoot\src\main\java\spring\Demo\ThreadDemo>jps -l
30736
28900 sun.tools.jps.Jps
44932 finalshell.jar
16360 org.jetbrains.jps.cmdline.Launcher
20092 spring.Demo.ThreadDemo.Test3
45820 org.jetbrains.kotlin.daemon.KotlinCompileDaemon
C:\software\IntelliJ IDEA Workspace\springBoot\src\main\java\spring\Demo\ThreadDemo>jstack 20092
...
Java stack information for the threads listed above:
===================================================
"ThreadBBB":
at spring.Demo.ThreadDemo.HoldLockThread.run(HoldLockThread.java:25)
- waiting to lock <0x00000000d6a9c478> (a java.lang.String)
- locked <0x00000000d6a9c4b0> (a java.lang.String)
at java.lang.Thread.run(Thread.java:748)
"ThreadAAA":
at spring.Demo.ThreadDemo.HoldLockThread.run(HoldLockThread.java:25)
- waiting to lock <0x00000000d6a9c4b0> (a java.lang.String)
- locked <0x00000000d6a9c478> (a java.lang.String)
at java.lang.Thread.run(Thread.java:748)
Found 1 deadlock.
十三、LockSupport
LockSupport 是用来创建锁和其他同步类的基本线程阻塞原语。park() 和 unpark() 的作用分别是阻塞线程和解除阻塞线程。
线程等待唤醒机制
- 使用 Object 中的 wait/notify 方法实现。
- Condition 接口中的 await/signal 方法实现。
- LockSupport 类中的 park/unpark 方法实现。
1、wait/notify
使用 Object 中的 wait() 方法让线程等待,使用 Object 中的 notify() 方法唤醒线程。
- wait 和 notify 不在同步代码块中,会报错。
- notify 在 wait 方法前执行会无法唤醒。
public static void main(String[] args) throws InterruptedException {
new Thread(() -> {
// 睡 3 秒,让 B 线程先唤醒。
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (object) {
System.out.println(Thread.currentThread().getName() + "\t" + "-----执行");
try {
object.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "\t" + "-----被唤醒");
}
}, "A").start();
new Thread(() -> {
synchronized (object) {
object.notify();
System.out.println(Thread.currentThread().getName() + "\t" + "-----唤醒");
}
}, "B").start();
}
2、await/signal
使用 JUC 包中的 Condition 的 await() 方法让线程等待,使用 signal() 方法唤醒线程。也会出现 wait/notify 相同的问题。
static Lock lock = new ReentrantLock();
static Condition condition = lock.newCondition();
public static void main(String[] args) throws InterruptedException {
new Thread(() -> {
// 睡 3 秒,让 B 线程先唤醒。
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
lock.lock();
try {
System.out.println(Thread.currentThread().getName() + "\t" + "-----执行");
condition.await();
System.out.println(Thread.currentThread().getName() + "\t" + "-----被唤醒");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}, "A").start();
new Thread(() -> {
lock.lock();
try {
condition.signal();
System.out.println(Thread.currentThread().getName() + "\t" + "-----通知");
} finally {
lock.unlock();
}
}, "B").start();
}
传统的 synchronized 和 lock 实现等待唤醒通知的约束
- 线程先要获得并持有锁,必须在锁块(synchronized 或 lock)中。
- 必须要先等待后唤醒,线程才能够被唤醒。
3、LockSupport 类
LockSupport 类可以阻塞当前线程以及唤醒指定被阻塞的线程。所有的方法都是静态方法,可以让线程在任意位置阻塞,阻塞之后也有对应的唤醒方法。
LockSupport 提供 park() 和 unpark() 方法实现阻塞线程和解除线程阻塞的过程。LockSupport 和每个使用它的线程都有一个许可(permit)关联,permit 相当于(1, 0)开关,默认是 0。
调用 park 时:
park() / park(Object blocker):阻塞当前线程/阻塞传入的具体线程。permit 默认是 0,所有一开始调用 park() 方法,当前线程会阻塞,直到别的线程将当前线程的 permit 设置为 1 时,park 方法会被唤醒,然后会将 permit 再次设置为 0 并返回。
线程阻塞需要消耗凭证(permit),这个凭证最多只有一个:
- 如果有凭证,则会直接消耗掉这个凭证然后正常退出;
- 如果无凭证,就必须阻塞等待凭证可用。
public static void park() {
UNSAFE.park(false, 0L);
}
调用 unpark 时:
unpark(Thread thread):唤醒处于阻塞状态的指定线程。调用 unpark(thread) 方法后,就会将 thread 线程的许可 permit 设置成 1(多次调用 unpark 方法,不会累加,permit 值还是 1)会自动唤醒 thread 线程,即之前阻塞中的 LockSupport.park() 方法会立即返回。LockSupport 支持先唤醒后等待。
public static void main(String[] args) throws InterruptedException {
Thread a = new Thread(() -> {
// 睡 3 秒,让 B 线程先唤醒。
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "\t" + "-----执行");
LockSupport.park();
System.out.println(Thread.currentThread().getName() + "\t" + "-----被唤醒");
}, "A");
a.start();
new Thread(() -> {
LockSupport.unpark(a);
System.out.println(Thread.currentThread().getName() + "\t" + "-----通知");
}, "B").start();
}
因为凭证数量最多为 1,连续调用两次 unpark 和调用一次 unpark 效果一样,只会增加一个凭证。而调用两次 park 却需要消费两个凭证,所以下面代码会阻塞。
public static void main(String[] args) throws InterruptedException {
Thread a = new Thread(() -> {
// 睡 3 秒,让 B 线程先唤醒。
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "\t" + "-----执行");
LockSupport.park();
LockSupport.park();
System.out.println(Thread.currentThread().getName() + "\t" + "-----被唤醒");
}, "A");
a.start();
new Thread(() -> {
LockSupport.unpark(a);
LockSupport.unpark(a);
System.out.println(Thread.currentThread().getName() + "\t" + "-----通知");
}, "B").start();
}
十四、AbstractQueuedSynchronizer(AQS)
AQS 是抽象的队列同步器,用来构建锁或者其他同步器组件的重量级基础框架及整个 JUC 体系的基石,通过内置的 FIFO 队列来完成资源获取线程的排队工作,并通过一个 int 类型变量表示持有锁的状态。
- 锁:面向锁的使用者:定义了程序员和锁交互的使用 API,隐藏了实现细节。
- 同步器:面向锁的实现者:提供统一规范并简化锁的实现,屏蔽了同步状态管理、阻塞线程排队和通知、唤醒机制等。
1、AQS 的作用
加锁会导致阻塞:有阻塞就需要排队,实现排队必然需要某种形式的队列来进行管理。抢到资源的线程直接使用处理业务逻辑,抢不到资源的必然涉及一种排队等候机制。抢占资源失败的线程继续等待,但等待线程仍然保留获取锁的可能且获取锁流程扔在继续。
CLH 是一个单向链表,AQS 中的队列是 CLH 变体的虚拟双向队列 FIFO。
和 AQS 有关的类:ReentrantLock、CountDownLatch、ReentrantReadWriteLock、Semaphore 等。
2、AQS 的内部体系架构
如果共享资源被占用,就需要一定的阻塞等待唤醒机制来保证锁分配。这个机制主要用的是 CLH 队列的变体实现的,将暂时获取不到锁的线程加入到队列中,这个队列就是 AQS 的抽象表现。它将请求共享资源的线程封装成队列的结点(Node)。通过 CAS 自旋以及 LockSupport.park() 的方式,维护 state 变量的状态,使并发达到同步的控制效果。
AQS 使用一个 volatile 的 int 类型的成员变量来表示同步状态,通过内置的 FIFO 队列来完成资源获取的排队工作将每条要去抢占资源的线程封装成一个 Node 节点来实现锁的分配,通过 CAS 完成对 State 值的修改。
- AQS 的同步状态 state 成员变量:0 表示没有线程,可以执行;大于等于 1 表示有线程占用,其他线程等待。
- AQS 的 CLH 队列:队列为一个双向队列,存储等待的线程。通过自旋等待,state 变量判断是否阻塞,从队尾入队,从队首出队。
- 内部类 Node(AQS 类的内部类):成员变量 waitStates 表示当前线程的等待状态。
AQS 同步队列的基本结构
3、ReentrantLock 中的非公平锁和公平锁
Lock 接口的实现类,基本都是通过聚合了一个队列同步器的子类完成线程访问控制的。
(1)非公平锁
NonfairSync 类的非公平锁获取同步状态。
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
(2)公平锁
FairSync 类的公平锁获取同步状态。
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
(3)非公平锁和公平锁的不同
可以明显看出公平锁与非公平锁的 lock() 方法唯一的区别就在于公平锁在获取同步状态时多了一个限制条件 !hasQueuedPredecessors()
,它是公平锁获取锁前,判断等待队列中是否存在有效节点的方法。
public final boolean hasQueuedPredecessors() {
// 判断是否需要排队,保证了不论是新的线程还是已经排队的线程都顺序使用锁
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
- 非公平锁在调用 lock 后,首先就会调用 CAS 进行一次抢锁,如果这个时候恰巧锁没有被占用,那么直接就获取到锁返回了。
- 非公平锁在 CAS 失败后,和公平锁一样都会进入到 tryAcquire 方法,在 tryAcquire 方法中,如果发现锁这个时候被释放了(state == 0),非公平锁会直接 CAS 抢锁,但是公平锁会判断等待队列是否有线程处于等待状态,如果有则不去抢锁,乖乖排到后面。
公平锁和非公平锁就这两点区别,如果这两次 CAS 都不成功,那么后面非公平锁和公平锁是一样的,都要进入到阻塞队列等待唤醒。相对来说,非公平锁会有更好的性能,因为它的吞吐量比较大。当然,非公平锁让获取锁的时间变得更加不确定,可能会导致在阻塞队列中的线程长期处于饥饿状态。
4、ReentrantLock 类的 lock 方法
ReentrantLock 类的加锁过程:(源码以非公平锁为例)
- 尝试加锁;
- 加锁失败,线程入队列;
- 线程入队了后,进入阻塞状态。
final void lock() {
// 当前线程进行抢占锁
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
// 排队线程
acquire(1);
}
(1)acquire 方法
当前线程锁抢占失败,执行 acquire 方法。
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
(2)tryAcquire 方法
然后执行 tryAcquire,再次尝试获取锁。
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
// 获取锁的状态
int c = getState();
// 如果锁没被占用则进行抢占
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 如果当前锁被占用,但是是自己占用,则对 state++ (acquires 值为 1) ,可重入锁实现原理
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
(3)addWaiter 方法
没抢到锁,则返回 false,然后执行 addWaiter(Node.EXCLUSIVE),线程开始入队。
private Node addWaiter(Node mode) {
// 线程入队,创建 Node 节点
Node node = new Node(Thread.currentThread(), mode);
// 获取尾节点
Node pred = tail;
// 尾节点为 null ,表示队列没一个节点
if (pred != null) {
// 不为空则直接添加
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 创建队列
enq(node);
return node;
}
private Node enq(final Node node) {
for (;;) {
Node t = tail;
// 判断尾指针是否为 null
if (t == null) {
// 初始化队列,创建一个空节点作为头节点(占位)
if (compareAndSetHead(new Node()))
tail = head;
} else {
// 将当前节点入队
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
(4)acquireQueued 方法
线程进入阻塞队列后执行 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
// 自旋
for (;;) {
// 获取当前节点的前驱
final Node p = node.predecessor();
// 判断前驱是否为头节点(保证先进先出 FIFO),tryAcquire 再次去请求锁
if (p == head && tryAcquire(arg)) {
// 初始化当前节点,并设置为头节点
setHead(node);
// 将头节点的 next 指针断开,等待 GC
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
// 防止抛异常
if (failed)
cancelAcquire(node);
}
}
// 获取前驱
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 获取前驱节点的状态
int ws = pred.waitStatus;
// 如果为 Node.SIGNAL 则直接返回,即等待被占用资源释放,返回 true
// 接着调用 parkAndCheckInterrupt 方法
if (ws == Node.SIGNAL)
return true;
// 大于 0 说明是 CANCELLED 状态
if (ws > 0) {
// 循环判断前驱节点的前驱节点是否也为 CANCELLED 状态,忽略该状态的节点,重新连接队列
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 将当前节点得到前驱节点设置为 Node.SIGNAL 状态,用于后续唤醒操作
// 程序第一次执行到这返回 false,第二次执行值为 Node.SIGNAL,返回 true
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
// 多次争抢锁没抢到,则阻塞当前线程
private final boolean parkAndCheckInterrupt() {
// 线程挂起,程序不会继续执行
LockSupport.park(this);
// 根据 park 方法 API 描述,程序在下述三种情况下会继续执行
// 1. 被 unpark 2. 被中断(interrupt)3. 其他不合逻辑的返回才会继续向下执行
// 返回当前线程的中断状态,并情况中断状态。如果由于被中断,该方法会返回 true
return Thread.interrupted();
}
5、ReentrantLock 类的 unlock 方法
public void unlock() {
sync.release(1);
}
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
// 如果锁已经释放,则判断头节点是否为空,状态是否为 0,即判断是否有线程阻塞
if (h != null && h.waitStatus != 0)
// 开始唤醒线程
unparkSuccessor(h);
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {
// 更改当前锁的状态
int c = getState() - releases;
// 当前线程必须为拿锁的线程
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
// 锁是否完全释放,可能是重入锁,只更改状态
boolean free = false;
// 状态为 0,则设置当前锁没有线程占用
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
private void unparkSuccessor(Node node) {
// 将头节点状态设置为 0
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
// 获取头结点的下一个节点
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
// 获取下一个合法的线程进行唤醒(从后往前),这里为什么不从前往后找呢?
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
// 唤醒 s 线程
if (s != null)
LockSupport.unpark(s.thread);
}
为什么不从前往后找呢?
通常要 unpark 的线程就在下一个节点(这也是我们认为从前往后找的事实依据),但是这里只是一个判断,一般不会出现。但当出现后继是 null 的情况,我只能从后面找一个真正的后继。(在这里要是 tail.prev 莫名其妙为 null 了,那 s 就为null,就不做 unpark 了。
标题:高并发面试题总结
作者:Yi-Xing
地址:http://zyxwmj.top/articles/2021/02/21/1613874576713.html
博客中若有不恰当的地方,请您一定要告诉我。前路崎岖,望我们可以互相帮助,并肩前行!