为什么要用多线程
当我们需要做类似 IO 这种慢操作,可以开多个线程,尽量不要让 CPU 空闲下来,提高系统的资源利用率。如果是计算型的操作,本来 CPU 就不空闲,还开很多的线程就不对了(有多线程就会有线程切换的问题,线程切换都是需要耗费资源的)
多线程是为了同步完成多项任务,不是为了提高程序运行效率,而是通过提高资源使用效率来提高系统的效率。
并发编程三要素
可见性
一个线程对共享变量的修改,能够被其他线程看到。Java 通过 volatile
、final
、锁(如 synchronized
)来实现可见性。
原子性
原子性是指操作是不可分的,要么全部一起执行,要么不执行。
比如 int a=0
这个操作是不可分割的,那么我们说这个操作时原子操作。再比如 a++
这个操作实际是 a=a+1
是可分割的,所以它不是一个原子操作。
非原子操作都会存在线程安全问题,需要我们使用同步技术(sychronized,CAS)来让它变成一个原子操作。
有序性
有序性指的是程序按照代码的先后顺序执行。
为了性能优化,编译器和处理器会进行指令重排序,有时候会改变程序语句的先后顺序。
int a = 1; //1
int b = 20; //2
int c = a + b; //3
编译器优化后可能变成
int b = 20; //1
int a = 1; //2
int c = a + b; //3
上面这个例子中,编译器调整了语句的顺序,但是不影响程序的最终结果。
线程的创建
- 继承 Thread 类
- 使用继承 Thread 类的方法来创建线程类时,多个线程之间无法共享线程类的实例变量
// 定义 Thread 类的子类
public class MyThread extends Thread {
// 重写 Thread 类中的 run() 方法,线程执行体
public void run() {
}
}
public class Demo {
public static void main(String[] args) {
Thread t = new MyThread(); // 创建 Thread 子类的对象
t.start(); // 调用线程对象的 start() 方法来启动该线程
}
}
// 使用匿名内部类的方式创建
new Thread() {
public void run() {
}
}.start();
- 实现 Runnable 接口
- 采用 Runnable 接口的方式创建的多个线程可以共享同一个 target 对象的实例变量
- void run():使用实现接口 Runnable 的对象创建一个线程时,启动该线程将导致在独立执行的线程中调用对象的 run 方法
// 定义 Runnable 接口的实现类
public class MyRunnable implements Runnable {
// 重写 Runnable 接口中的 run() 方法,线程执行体
public void run() {
}
}
public class Demo {
public static void main(String[] args) {
// 创建 Runnable 实现类的对象 target
Runnable target = new MyRunnable();
// 将 target 作为运行目标来创建创建 Thread 类的对象
Thread t = new Thread(target, "线程名");
// 调用线程对象的 start() 方法来启动该线程
t.start();
}
}
// 使用匿名内部类的方式创建
new Thread(new Runnable() {
public void run() {
}
}).start();
- 实现 Callable 接口
- Callable 接口提供了一个 call() 方法(可以有返回值,可以声明抛出异常)可以作为线程执行体,Callable 接口里的泛型形参类型与 call() 方法返回值类型相同
- V call():计算结果,如果无法计算结果,则抛出一个异常
//实现Callable接口
public class MyThread implements Callable<Boolean> {
private int num = 50;
@Override
public Boolean call() {
while (num > 0) {
eat();
}
return true;
}
// 同步方法
private synchronized void eat() {
if (num > 0) {
System.out.println(Thread.currentThread().getName() + " 吃了编号为 " + num-- + " 的苹果");
}
}
}
//启用多线程
@Test
public void ExecutorServiceTest() throws ExecutionException, InterruptedException {
MyThread t1 = new MyThread();
MyThread t2 = new MyThread();
MyThread t3 = new MyThread();
//创建线程池
ExecutorService ser = Executors.newFixedThreadPool(3);
//向线程池提交任务
Future<Boolean> r1 = ser.submit(t1);
Future<Boolean> r2 = ser.submit(t2);
Future<Boolean> r3 = ser.submit(t3);
//获取结果
boolean rs1 = r1.get();
boolean rs2 = r2.get();
boolean rs3 = r3.get();
//关闭线程池
ser.shutdown();
}
- 使用 FutureTask 类
@Test
public void futureTaskTest() {
//创建FutureTask对象
FutureTask<Boolean> futureTask = new FutureTask<>(new MyThread());
//创建线程
Thread thread = new Thread(futureTask);
thread.start();
try {
//等待任务执行完毕,并得到返回值
Boolean result = futureTask.get();
System.out.println(result);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
线程的生命周期
- 新建 (NEW):新建但没有调用 start() 方法的线程处于此状态。
- 运行 (RUNNABLE):包含就绪(READY)和运行中(RUNNING)两种状态。线程调用 start() 方法会进入就绪(READY)状态,等待获取 CPU 时间片。如果成功获取到 CPU 时间片,则会进入运行中(RUNNING)状态。
- 阻塞 (BLOCKED):线程在进入同步方法 / 同步块(synchronized)时被阻塞,等待同步锁的线程处于此状态。
- 等待 (WAITING):线程无限期等待另一个线程执行特定操作,需要被显式唤醒,否则会一直等待下去。例如对于 Object.wait(),需要等待另一个线程执行 Object.notify() 或 Object.notifyAll();对于 Thread.join(),则需要等待指定的线程终止。
- 超时等待 (TIMED_WAITING):跟 WAITING 类似,区别在于该状态有超时时间参数,在超时时间到了后会自动唤醒,避免了无期限的等待。
- 终止 (TERMINATED):表示该线程已经执行完毕。
- 线程在给定的时间点只能处于一种状态。这些状态是虚拟机状态,不反映任何操作系统线程状态。
控制线程
join
等待线程结束:线程 T1 需要等待 T2、T3 完成之后才能继续执行,那么在 T1 线程中需要分别调用 T2 和 T3 的 join() 方法。
比如,主线程等待 t 线程执行结束
//主线程
{
//t 线程执行
t.start();
//在主线程调用 t 线程的 join 方法,主线程等待 t 线程执行结束
t.join();
}
sleep
- 调用 sleep 会让当前线程从 Running 进入 Timed Waiting 状态(阻塞)
- 其它线程可以使用 interrupt 方法打断正在睡眠的线程,那么被打断的线程这时就会抛出 InterruptedException 异常【注意:这里打断的是正在休眠的线程,而不是其它状态的线程】
- 睡眠结束后的线程未必会立刻得到执行 (需要分配到 cpu 时间片)
- 建议用 TimeUnit 的 sleep() 代替 Thread 的 sleep() 来获得更好的可读性
sleep 与 wait 的区别
- sleep() 方法是 Thread 的静态方法,而 wait 是 Object 实例方法;
- wait() 方法必须要在同步方法或者同步块中调用,也就是必须已经获得对象锁。而 sleep() 方法没有这个限制可以在任何地方种使用。另外,wait() 方法会释放占有的对象锁,使得该线程进入等待池中,等待下一次获取资源。而 sleep() 方法只是会让出 CPU 并不会释放掉对象锁;
- sleep() 方法在休眠时间达到后如果再次获得 CPU 时间片就会继续执行,而 wait() 方法必须等待 Object.notift/Object.notifyAll 通知后,才会离开等待池,并且再次获得 CPU 时间片才会继续执行。
yeild
当前线程出让 CPU,但需要注意的是,让出 CPU 并不是说不让当前线程执行了,当前线程在让出 CPU 后,还会进行 CPU 资源的争夺,但是能否再抢到 CPU 的执行权就不一定了。因此,对 Thread.yield() 方法的调用好像就是在说:我已经完成了一些主要的工作,我可以休息一下了,可以让 CPU 给其他线程一些工作机会了。
如果觉得一个线程不太重要,或者优先级比较低,而又担心此线程会过多的占用 CPU 资源,那么可以在适当的时候调用一下 Thread.yield() 方法,给与其他线程更多的机会。
interrupt
- 通知目标线程中断,也就是设置中断标志位为 true,中断标志位表示当前线程已经被中断了。
- 线程中断并不会使线程立即退出,而是给线程发送一个通知,告知目标线程,有人希望你中断。至于目标线程接收到通知之后如何处理,则完全由目标线程自己决定。
public void interrupt() //设置中断标志位为true public boolean interrupted() //判断线程是否被中断
public static boolean interrupted() //判断线程是否被中断,并清除当前中断状态(置为false)
中断线程的三种方法
- 监听中断标志
public class OneThread implements Runnable {
@Override
public void run() {
while (true) {
for (int i = 0; i < 50; i++) {
System.out.println("thread is running..." + i + " interrupted:" + Thread.currentThread().isInterrupted());
}
// 监听中断标志
if (!Thread.interrupted()) {
System.out.println("thread is interrupting...");
break;
}
}
}
}
// mian
public static void main(String[] args) throws InterruptedException {
OneThread oneThread = new OneThread();
Thread thread = new Thread(oneThread);
thread.start();
TimeUnit.SECONDS.sleep(1);
// 中断
thread.interrupt();
System.out.println("================= end =====================");
}
- 通过变量控制
public class OneThread implements Runnable {
public volatile boolean isStop = false;
@Override
public void run() {
while (true) {
for (int i = 0; i < 50; i++) {
System.out.println("thread is running..." + i + " interrupted:" + isStop);
}
if (isStop) {
System.out.println("thread is interrupting...");
break;
}
}
}
}
// main
public static void main(String[] args) throws InterruptedException {
OneThread oneThread = new OneThread();
Thread thread = new Thread(oneThread);
thread.start();
TimeUnit.SECONDS.sleep(1);
// 中断
oneThread.isStop = true;
System.out.println("================= end =====================");
}
- 抛出异常
错误示范
public class OneThread implements Runnable {
@Override
public void run() {
while (true) {
try {
// 睡眠
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException e) {
// sleep() 中断而抛出异常后
// 线程的中断标志会被清除(置为false)
// 所以 finally 的代码永远无法执行
} finally {
if (Thread.interrupted()) {
System.out.println("thread is interrupting...");
break;
}
}
}
}
}
// main
public static void main(String[] args) throws InterruptedException {
OneThread oneThread = new OneThread();
Thread thread = new Thread(oneThread);
thread.start();
TimeUnit.SECONDS.sleep(1);
// 中断
thread.interrupt();
System.out.println("================= end =====================");
}
sleep() 由于中断而抛出异常之后,线程的中断标志会被清除(置为 false),所以在异常中需要执行 this.interrupt(),将中断标志位置为 true
public class OneThread implements Runnable {
@Override
public void run() {
while (true) {
try {
// 睡眠
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
if (Thread.interrupted()) {
System.out.println("thread is interrupting...");
break;
}
}
}
}
}
// main
public static void main(String[] args) throws InterruptedException {
OneThread oneThread = new OneThread();
Thread thread = new Thread(oneThread);
thread.start();
TimeUnit.SECONDS.sleep(1);
// 中断
thread.interrupt();
System.out.println("================= end =====================");
}
挂起(suspend)和继续执行(resume)
系统不推荐使用 suspend() 方法去挂起线程,因为 suspend() 方法导致线程暂停的同时,并不会释放任何锁资源。此时,其他任何线程想要访问被它占用的锁时,都会被牵连,导致无法正常运行。直到在对应的线程上进行了 resume() 方法操作,被挂起的线程才能继续,其他所有阻塞在相关锁上的线程也可以继续执行。
但是,如果 resume() 方法操作意外地在 suspend() 方法前就被执行了,那么被挂起的线程可能很难有机会被继续执行了。更严重的是,线程所占用的锁不会被释放,因此可能会导致整个系统工作不正常。而且,被挂起的线程还是 Runnable 状态,这也会影响我们对系统当前状态的判断。
守护线程与用户线程
Java 中的线程分为两类:
- 用户线程(User Thread)
- 守护线程(Daemon Thread)
默认情况下,Java 进程需要等待所有的线程结束后才会停止,但是有一种特殊的线程,叫做守护线程。在其他线程全部结束的时候,即使守护线程未执行完,Java 进程也会停止。普通线程 t 可以调用 t.setDeamon(true) 方法变成守护线程。
main 方法所在的线程就是一个用户线程,同时 Java 虚拟机还会启动很多守护线程,例如:垃圾回收线程。
线程安全
在多线程下代码执行的结果与预期正确的结果不一致,该代码就是线程不安全的,否则则是线程安全的。出现线程安全的问题一般是因为主内存和工作内存数据不一致性和重排序导致的。
线程同步
JMM(Java 内存模型)
Java 线程之间的通信由 Java 内存模型(JMM)控制,JMM 决定一个线程对共享变量的写入何时对另一个线程可见。线程之间的共享变量存储在主内存中,每个线程都有自己的工作内存,工作内存中保存了主内存中变量的副本。线程对变量的所有操作(读取、写入)都在工作内存中进行,最后再将结果同步回主内存。
从上图中可以看出,线程 A 需要和线程 B 通信,必须要经历下面 2 个步骤:
- 首先,线程 A 把本地内存 A 中更新过的共享变量刷新到主内存中去
- 然后,线程 B 到主内存中去读取线程 A 之前已更新过的共享变量
主内存和工作内存之间的交互(JMM 中的八种操作)
操作 | 作用对象 | 解释 |
---|---|---|
lock | 主内存 | 把一个变量标识为一个线程独占的状态 |
unlock | 主内存 | 把一个处于锁定状态的变量释放出来,释放后才可被其他线程锁定 |
read | 主内存 | 把一个变量的值从主内存传输到线程工作内存中,以便 load 操作使用 |
load | 工作内存 | 把 read 操作从主内存中得到的变量值放入工作内存的变量副本中 |
use | 工作内存 | 把工作内存中一个变量的值传递给执行引擎,每当虚拟机遇到一个需要使用到变量值的字节码指令时将会执行这个操作 |
assign | 工作内存 | 把一个从执行引擎接收到的值赋接到收到的值赋予工作内存的变量,每当虚拟机遇到一个给变量赋值的字节码指令时执行这个操作 |
store | 工作内存 | 把工作内存中的一个变量的值送到主内存中,以便 write 操作 |
write | 工作内存 | 把 store 操作从工作内存中得到的变量的值放入主内存的变量中 |
happens-before
JMM 定义了一组 happens-before 规则,用来确定操作之间的顺序,确保内存可见性和有序性。
如果一个操作 happens-before 另一个操作,那么第一个操作的执行结果将对第二个操作可见,并且第一个操作的执行顺序排在第二个操作之前,无论这两个操作是否在同一个线程里。
如果重排序之后的执行结果,与按 happens-before 关系来执行的结果一致,那么 JMM 也允许这样的重排序。
-
程序次序规则
- 一个线程内,按照代码顺序,书写在前面的操作 happens-before 于书写在后面的操作。
-
监视锁规则
- 对一个锁的解锁操作 happens-before 于后续对这个锁的加锁操作。
-
volatile 变量规则
- 对一个 volatile 变量的写操作,happens-before 于后续对这个变量的读操作。说白了就是对 volatile 变量的写操作的结果对于发生于其后的任何操作都是可见的。
-
传递规则
- 如果 A happens-before B,并且 B happens-before C,则 A happens-before C
-
线程启动规则
- 如果线程 A 执行操作 ThreadB.start()(启动线程 B),那么 A 线程的 ThreadB.start() 操作 happens-before 于线程 B 中的任意操作。
-
线程终结规则
- 如果线程 A 执行操作 ThreadB.join() 并成功返回,那么线程 B 中的任意操作 happens-before 于线程 A 从 ThreadB.join() 操作成功返回。
-
线程中断规则
- 对线程 interrupt() 方法的调用 happens-before 于被中断线程的代码检测到中断事件的发生。
-
对象终结原则
- 一个对象的初始化完成 happens-before 于它的 finalize() 方法的开始。
Reference
volatile
volatile
是 Java 中的一个关键字,用于声明变量,它的主要作用在于实现变量的可见性和禁止指令重排序。
- 保证可见性
- 当一个线程修改了一个
volatile
变量的值,这个变化对所有其他线程都是立即可见的。这是因为volatile
变量的读取和写入操作会被强制同步到主内存中,从而确保了变量的最新值能够被所有线程感知。
- 当一个线程修改了一个
- 禁止指令重排
- 在多线程环境中,编译器和处理器可能会为了优化性能而重新安排指令的执行顺序,这在某些情况下会导致不正确的结果。
volatile
关键字确保了与该变量相关的操作会按照代码的顺序执行,从而避免了因重排序导致的问题。- 单例模式(Singleton) 使用
volatile
修饰对象的原因,就是防止实例化对象时指令重排。
- 不保证原子性
- 虽然
volatile
变量提供了可见性和有序性,但它并不保证复合操作的原子性。
- 虽然
public class VolatileTest {
public volatile int inc = 0;
public void increase() {
inc++;
}
public static void main(String[] args) throws InterruptedException {
final VolatileTest test = new VolatileTest();
for (int i = 0; i < 10; i++) {
new Thread(() -> {
for (int j = 0; j < 1000; j++)
test.increase();
}).start();
}
//保证前面的线程都执行完
Thread.sleep(3000);
System.out.println(test.inc);
}
}
内存屏障
JMM 通过内存屏障阻止指令重排序。
JMM 内存屏障分为四类:
- StoreStore 屏障:禁止上面的普通写和下面的 volatile 写重排序;
- StoreLoad 屏障:防止上面的 volatile 写与下面可能有的 volatile 读/写重排序
- LoadLoad 屏障:禁止下面所有的普通读操作和上面的 volatile 读重排序
- LoadStore 屏障:禁止下面所有的普通写操作和上面的 volatile 读重排序
为了实现 volatile 内存语义,编译器在生成字节码时,会在指令序列中插入内存屏障来禁止特定类型的处理器重排序。
- 在每个 volatile 写操作的前面插入一个 StoreStore 屏障;
- 在每个 volatile 写操作的后面插入一个 StoreLoad 屏障;
- 在每个 volatile 读操作的后面插入一个 LoadLoad 屏障;
- 在每个 volatile 读操作的后面插入一个 LoadStore 屏障。
需要注意的是:volatile 写是在前面和后面分别插入内存屏障,而 volatile 读操作是在后面插入两个内存屏障
线程通信
wait & notify
- wait() 方法、wait(long timeoutMillis) 方法、wait(long timeoutMillis, int nanos) 方法、notify() 方法和 notifyAll() 方法都是 Object 类的方法,用于线程等待和唤醒
- 必须放在同步代码(必须在 synchronized 内部执行)中执行,需要先获取锁
- 线程唤醒的方法(notify、notifyAll)需要在等待的方法(wait)之后执行,等待中的线程才可能会被唤醒,否则无法唤醒
wait
当一个线程调用共享变量的 wait 系列方法时,这个线程进入等待状态,并释放共享变量的锁,直到使用下面两种方式才会被唤醒:
- 其他线程调用该共享变量的 notify 系列方法(notify() 方法或者 notifyAll() 方法)
- 其他线程调用该共享变量所在的线程的 interrupt() 方法后,该线程抛出 InterruptedException 异常返回
线程需要获取到共享变量的监视器锁才能调用 wait 方法,否则会抛出 IllegalMonitorStateException 异常
举个栗子
- 在同步方法中使用
//共享变量
private Object lock = new Object();
public synchronized void method1() {
System.out.println(Thread.currentThread().getName() + ":获取到锁");
try {
System.out.println(Thread.currentThread().getName() + "等待...");
lock.wait();
System.out.println(Thread.currentThread().getName() + "被唤醒...");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
- 在同步代码块使用
//共享变量
private Object lock = new Object();
public void method2() {
synchronized (lock) {
System.out.println(Thread.currentThread().getName() + ":获取到锁");
try {
System.out.println(Thread.currentThread().getName() + "等待...");
lock.wait();
System.out.println(Thread.currentThread().getName() + "被唤醒...");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
notify
- 当线程上调用共享变量的 notify 方法后,会唤醒这个共享变量上调用 wait 系列方法后进入等待状态的线程,具体唤醒哪个等待的线程是随机的。
- 被唤醒的线程需要再次获取锁,才能返回执行后面的代码。
- 只有当前线程获取到共享变量的监视器锁后,才能调用该共享变量的 notify 系列方法,否则会抛出 IllegalMonitorStateException 异常。
区别
- sleep,join,yield,interrupted 是 Thread 类中的方法
- wait/notify 是 Object 中的方法
- sleep 不释放锁,释放 cpu;join 释放锁,抢占 cpu;yiled 不释放锁,释放 cpu;wait 释放锁,释放 cpu
生产者 & 消费者案例
/**
* @author huangyunwu
* @date 2020/11/12 11:31
* @decription 菜鸟驿站 - 容器
*/
public class CaiNiao {
private Object lock = new Object();
private final Queue<Object> packageList = new LinkedList<>();
private final int MAX_SIZE = 10;
public void customerEntran() {
synchronized(lock) {
System.out.println(Thread.currentThread().getName() + "来菜鸟驿站");
if (packageList.size() == 0) {
System.out.println("当前菜鸟驿站没有包裹," + Thread.currentThread().getName() + "回去等待");
try {
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
takePackage();
}
}
public synchronized void courierEntran() {
synchronized(lock) {
System.out.println(Thread.currentThread().getName() + "来菜鸟驿站");
if (packageList.size() == MAX_SIZE) {
System.out.println("菜鸟驿站装满了," + Thread.currentThread().getName() + "回去等待");
try {
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
putPackage();
}
}
private void takePackage() {
System.out.println("取包裹");
packageList.poll();
//菜鸟驿站通知快递员放包裹
lock.notifyAll();
}
private void putPackage() {
System.out.println("放包裹");
packageList.offer(new Package());
//菜鸟驿站通知用户取包裹
lock.notifyAll();
}
}
/**
* @author huangyunwu
* @date 2020/11/12 15:54
* @decription 快递员 - 生产者
*/
public class Courier implements Runnable {
private CaiNiao caiNiao;
public Courier(CaiNiao caiNiao) {
this.caiNiao = caiNiao;
}
@Override
public void run() {
for (int i = 0; i < 10; i++) {
caiNiao.courierEntran();
}
System.out.println("======" + Thread.currentThread().getName() + "完成了任务,开心地下班======");
}
}
/**
* @author huangyunwu
* @date 2020/11/12 15:47
* @decription 取快递的人 - 消费者
*/
public class Customer implements Runnable {
private CaiNiao caiNiao;
public Customer(CaiNiao caiNiao) {
this.caiNiao = caiNiao;
}
@Override
public void run() {
for (int i = 0; i < 3; i++) {
caiNiao.customerEntran();
}
System.out.println("======" + Thread.currentThread().getName() + "取完了他的包裹,一本满足的离开菜鸟驿站======");
}
}
@Test
public void caiNiaoTest() throws InterruptedException {
CaiNiao caiNiao = new CaiNiao();
Courier courier1 = new Courier(caiNiao);
Courier courier2 = new Courier(caiNiao);
Courier courier3 = new Courier(caiNiao);
new Thread(courier1, "快递员1").start();
new Thread(courier2, "快递员2").start();
new Thread(courier3, "快递员3").start();
for (int i = 0; i < 10; i++) {
new Thread(new Customer(caiNiao), "顾客"+ i).start();
}
//主线程
Thread.sleep(1500);
}
使用 Condition 实现等待通知机制
- 使用 synchronized 结合 Object 上的 wait 和 notify 方法可以实现线程间的等待通知机制。ReentrantLock 结合 Condition 接口同样可以实现这个功能。而且相比前者使用起来更清晰也更简单。
- Condition 接口在使用前必须先调用 ReentrantLock 的 lock() 方法获得锁。之后调用 Condition 接口的 await() 将释放锁,并且在该 Condition 上等待,直到有其他线程调用 Condition 的 signal() 方法唤醒线程。如果 signal() 方法先于 await() 方法之前调用,那么线程无法被唤醒。
简单使用案例
public class ConditionTest {
static ReentrantLock lock = new ReentrantLock();
static Condition condition = lock.newCondition();
public static void main(String[] args) throws InterruptedException {
lock.lock();
new Thread(new SignalThread()).start();
System.out.println("主线程等待通知");
try {
condition.await();
} finally {
lock.unlock();
}
System.out.println("主线程恢复运行");
}
static class SignalThread implements Runnable {
@Override
public void run() {
lock.lock();
try {
condition.signal();
System.out.println("子线程通知");
} finally {
lock.unlock();
}
}
}
}
线程间定制化通信案例
- 启动三个线程,让线程进行一定的顺序操作
- AA 打印 5 此,BB 打印 10 次,CC 打印 15 次,一共进行 10 轮
//第一步 创建资源类
class ShareResource {
//定义标志位
private int flag = 1; // 1 AA 2 BB 3 CC
//创建Lock锁
private Lock lock = new ReentrantLock();
//创建三个condition
private Condition c1 = lock.newCondition();
private Condition c2 = lock.newCondition();
private Condition c3 = lock.newCondition();
//打印5次,参数第几轮
public void print5(int loop) throws InterruptedException {
//上锁
lock.lock();
try {
//判断
while(flag != 1) {
//等待
c1.await();
}
//干活
for (int i = 1; i <=5; i++) {
System.out.println(Thread.currentThread().getName()+" :: "+i+" :轮数:"+loop);
}
//通知
flag = 2; //修改标志位 2
c2.signal(); //通知BB线程
}finally {
//释放锁
lock.unlock();
}
}
//打印10次,参数第几轮
public void print10(int loop) throws InterruptedException {
lock.lock();
try {
while(flag != 2) {
c2.await();
}
for (int i = 1; i <=10; i++) {
System.out.println(Thread.currentThread().getName()+" :: "+i+" :轮数:"+loop);
}
//修改标志位
flag = 3;
//通知CC线程
c3.signal();
}finally {
lock.unlock();
}
}
//打印15次,参数第几轮
public void print15(int loop) throws InterruptedException {
lock.lock();
try {
while(flag != 3) {
c3.await();
}
for (int i = 1; i <=15; i++) {
System.out.println(Thread.currentThread().getName()+" :: "+i+" :轮数:"+loop);
}
//修改标志位
flag = 1;
//通知AA线程
c1.signal();
}finally {
lock.unlock();
}
}
}
public class ThreadDemo3 {
public static void main(String[] args) {
ShareResource shareResource = new ShareResource();
new Thread(()->{
for (int i = 1; i <=10; i++) {
try {
shareResource.print5(i);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"AA").start();
new Thread(()->{
for (int i = 1; i <=10; i++) {
try {
shareResource.print10(i);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"BB").start();
new Thread(()->{
for (int i = 1; i <=10; i++) {
try {
shareResource.print15(i);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"CC").start();
}
}
使用 Condition 实现简单的阻塞队列
阻塞队列是一种特殊的先进先出队列,它有以下几个特点:
- 入队和出队线程安全。
- 当队列满时,入队线程会被阻塞;当队列为空时,出队线程会被阻塞。
public class MyBlockingQueue<E> {
int size;//阻塞队列最大容量
ReentrantLock lock = new ReentrantLock();
LinkedList<E> list=new LinkedList<>();//队列底层实现
Condition notFull = lock.newCondition();//队列满时的等待条件
Condition notEmpty = lock.newCondition();//队列空时的等待条件
public MyBlockingQueue(int size) {
this.size = size;
}
public void enqueue(E e) throws InterruptedException {
lock.lock();
try {
while (list.size() ==size)//队列已满,在notFull条件上等待
notFull.await();
list.add(e);//入队:加入链表末尾
System.out.println("入队:" +e);
notEmpty.signal(); //通知在notEmpty条件上等待的线程
} finally {
lock.unlock();
}
}
public E dequeue() throws InterruptedException {
E e;
lock.lock();
try {
while (list.size() == 0)//队列为空,在notEmpty条件上等待
notEmpty.await();
e = list.removeFirst();//出队:移除链表首元素
System.out.println("出队:"+e);
notFull.signal();//通知在notFull条件上等待的线程
return e;
} finally {
lock.unlock();
}
}
}
多线程锁
自旋锁
如果持有锁的线程能在很短时间内释放锁资源,那么等待竞争锁的线程只需等持锁释放后即可立即获取锁,这样就避免用户线程和内核的切换的消耗。
线程一直自旋会浪费处理器资源,所以自旋等待的时间有一定的限度,如果自旋超过了限定次数(默认是 10 次,可以使用 -XX: PreBlockSpin 来更改)没有成功获得锁,线程就会进入阻塞状态。
jdk1.5 这个限度是写死的,在 1.6 引入了适应性自旋锁,这意味着自旋的时间(次数)不再固定,而是由前一次在同一个锁上的自旋时间以及锁的拥有者的状态来决定。如果在同一个锁对象上,自旋等待刚刚成功获得过锁,并且持有锁的线程正在运行中,那么虚拟机就会认为这次自旋也是很有可能再次成功,进而它将允许自旋等待持续相对更长的时间。如果对于某个锁,自旋很少成功获得过,那在以后尝试获取这个锁时将可能省略掉自旋过程,直接阻塞线程,避免浪费处理器资源。
悲观锁(对象锁)
悲观锁总是假设自己在使用数据的时候,有别的线程来修改数据。所以,在获取数据的时候会先加锁,确保数据不会被别的线程修改。
synchronized
- 使用 synchronized 关键字来修饰某个方法,就相当于给调用该方法的对象加了锁
- 对于实例方法,同步方法的同步监视器是 this,即调用该方法的对象
- 对于类方法,同步方法的同步监视器是当前方法所在类的字节码对象(如 ArrayUtil.class)
- 不要使用 synchronized 修饰 run() 方法,而是把需要同步的操作定义在一个新的同步方法中,再在 run() 方法中调用该方法
synchronized 锁升级过程
在 Java 中,启用对象锁的方式是使用 synchronized 关键字。对象锁有四种状态:无锁,偏向锁,轻量级锁,重量级锁。
synchronized 关键字就像是汽车的自动档,一脚油门踩下去,synchronized 会从无锁升级为偏向锁,再升级为轻量级锁,最后升级为重量级锁,就像自动换挡一样。
初次执行到 synchronized 代码块的时候,锁对象变成偏向锁(通过 CAS 修改对象头里的锁标志位),字面意思是“偏向于第一个获得它的线程”的锁。执行完同步代码块后,线程并不会主动释放偏向锁。当第二次到达同步代码块时,线程会判断此时持有锁的线程是否就是自己(持有锁的线程 ID 也在对象头里),如果是则正常往下执行。由于之前没有释放锁,这里也就不需要重新加锁。如果自始至终使用锁的线程只有一个,很明显偏向锁几乎没有额外开销,性能极高。
一旦有第二个线程加入锁竞争,偏向锁就升级为轻量级锁(自旋锁)。这里要明确一下什么是锁竞争:如果多个线程轮流获取一个锁,但是每次获取锁的时候都很顺利,没有发生阻塞,那么就不存在锁竞争。只有当某线程尝试获取锁的时候,发现该锁已经被占用,只能等待其释放,这才发生了锁竞争。
在轻量级锁状态下继续锁竞争,没有抢到锁的线程将自旋,即不停地循环判断锁是否能够被成功获取。获取锁的操作,其实就是通过 CAS 修改对象头里的锁标志位。先比较当前锁标志位是否为“释放”,如果是则将其设置为“锁定”,比较并设置是原子性发生的。这就算抢到锁了,然后线程将当前锁的持有者信息修改为自己。
长时间的自旋操作是非常消耗资源的,一个线程持有锁,其他线程就只能在原地空耗 CPU,执行不了任何有效的任务,这种现象叫做忙等(busy-waiting)。如果多个线程用一个锁,但是没有发生锁竞争,或者发生了很轻微的锁竞争,那么 synchronized 就用轻量级锁,允许短时间的忙等现象。这是一种折衷的想法,短时间的忙等,换取线程在用户态和内核态之间切换的开销。
显然,此忙等是有限度的(有个计数器记录自旋次数,默认允许循环 10 次,可以通过虚拟机参数更改)。如果锁竞争情况严重,某个达到最大自旋次数的线程,会将轻量级锁升级为重量级锁(依然是 CAS 修改锁标志位,但不修改持有锁的线程 ID)。当后续线程尝试获取锁时,发现被占用的锁是重量级锁,则直接将自己挂起(而不是忙等),等待将来被唤醒。在 JDK 1.6 之前,synchronized 直接加重量级锁,很明显现在得到了很好的优化。
一个锁只能按照偏向锁、轻量级锁、重量级锁的顺序逐渐升级(也有叫锁膨胀的),不允许降级。
无锁
无锁没有对资源进行锁定,所有的线程都能访问并修改同一个资源,但同时只有一个线程能修改成功。
无锁的特点就是修改操作在循环内进行,线程会不断的尝试修改共享资源。如果没有冲突就修改成功并退出,否则就会继续循环尝试。如果有多个线程修改同一个值,必定会有一个线程能修改成功,而其他修改失败的线程会不断重试直到修改成功。上面我们介绍的 CAS 原理及应用即是无锁的实现。无锁无法全面代替有锁,但无锁在某些场合下的性能是非常高的。
偏向锁
偏向锁是指一段同步代码一直被一个线程所访问,那么该线程会自动获取锁,降低获取锁的代价。
在 Mark Word 中,当锁标志位是 01,那么判断倒数第三个 bit 是否为 1,如果是 1,代表当前对象的锁状态为偏向锁,于是再去读 Mark Word 的前 23 个 bit,这 23 个 bit 就是线程 ID,通过线程 ID 来确认想要获得对象锁的线程是不是“被偏爱的线程”。
偏向锁只有遇到其他线程尝试竞争偏向锁时,持有偏向锁的线程才会释放锁,线程不会主动释放偏向锁。
假如对象发现目前不只有一个线程,而是有多个线程正在竞争锁,那么偏向锁将会升级为轻量级锁。
轻量级锁
当锁是偏向锁的时候,被另外的线程所访问,偏向锁就会升级为轻量级锁,其他线程会通过自旋的形式尝试获取锁,不会阻塞,从而提高性能。
在代码进入同步块的时候,如果同步对象锁状态为无锁状态(锁标志位为“01”状态,是否为偏向锁为“0”),虚拟机首先将在当前线程的栈帧中建立一个名为锁记录(Lock Record)的空间,用于存储锁对象目前的 Mark Word 的拷贝,然后拷贝对象头中的 Mark Word 复制到锁记录中。
拷贝成功后,虚拟机将使用 CAS 操作尝试将对象的 Mark Word 更新为指向 Lock Record 的指针,并将 Lock Record 里的 owner 指针指向对象的 Mark Word。
如果这个更新动作成功了,那么这个线程就拥有了该对象的锁,并且对象 Mark Word 的锁标志位设置为“00”,表示此对象处于轻量级锁定状态。
如果轻量级锁的更新操作失败了,虚拟机首先会检查对象的 Mark Word 是否指向当前线程的栈帧,如果是就说明当前线程已经拥有了这个对象的锁,那就可以直接进入同步块继续执行,否则说明多个线程竞争锁。
若当前只有一个等待线程,则该线程通过自旋进行等待。但是当自旋超过一定的次数,或者一个线程在持有锁,一个在自旋,又有第三个来访时,轻量级锁升级为重量级锁。
重量级锁
如果对象锁状态被标记为重量级锁,那就需要通过 Monitor 来对线程进行控制,此时将会使用同步原语来锁定资源,对线程的控制也最为严格。
这里要介绍一样新事物——Monitor。 Monitor 常常被翻译成监视器或管程。你可以把它想像成一个只能容纳一名客人房间,而把想要获取对象锁的线程想像成想要进入这个房间的客人。一个线程进入了 Monitor,那么其他线程只能等待,只有当这个线程退出,其他线程才有机会进入。
-
Entry Set 中聚集了一些想要进入 Monitor 的线程,它们处于 waiting 状态。
-
假设某个名为 A 线程成功进入了 Monitor,那么它就处于 active 状态。
-
此时 A 线程执行途中,遇到一个判断条件,需要它暂时让出执行权,那么它将进入 Wait Set,状态也被标记为 waiting。
-
这时 Entry Set 中的其他线程就有机会进入 Monitor,假设一个线程 B 成功进入并且顺利完成,那么它可以通过 notify 的形式来唤醒 Wait Set 中的线程 A,让线程 A 再次进入 Monitor,执行完成后便退出。
ReentranLock
ReentrantLock 是可重入锁
- 允许同一个线程多次获取同一把锁
ReentrantLock 获取锁的过程是可中断的
- 从发起获取锁请求到还未获取到锁,这段时间内是可以被中断的
ReentrantLock 可以实现公平锁和非公平锁
- 公平锁:谁等的时间最长,谁就先获取锁
- 非公平锁:CPU 随机分配
- new ReentrantLock(true) // 公平锁
- new ReentrantLock() 或 new ReentrantLock(false) // 非公平锁
ReentrantLock 限时等待
注意
- lock() 必须紧跟 try 代码块
- unlock() 要放到 finally 最后一行,保证不管程序是否有异常,锁必定会释放
- lock() 和 unlock() 需要成对出现,锁了几次,也要释放几次
阿里巴巴开发规范
在使用阻塞等待获取锁的方式中,必须在 try 代码块之外,并且在加锁方法与 try 代码块之间没有任何可能抛出异常的方法调用,避免加锁成功后,在 finally 中无法解锁。
说明一:如果在 lock 方法与 try 代码块之间的方法调用抛出异常,那么无法解锁,造成其它线程无法成功获取锁。
说明二:如果 lock 方法在 try 代码块之内,可能由于其它方法抛出异常,导致在 finally 代码块中, unlock 对未加锁的对象解锁,它会调用 AQS 的 tryRelease 方法(取决于具体实现类),抛出 IllegalMonitorStateException 异常。
说明三:在 Lock 对象的 lock 方法实现中可能抛出 unchecked 异常,产生的后果与说明二相同。
非公平锁案例
public class Apple implements Runnable {
private int num = 50;
private final Lock lock = new ReentrantLock();
@Override
public void run() {
while (num > 0) {
//获取锁
lock.lock();
//处理业务
try {
if (num > 0) {
System.out.println(Thread.currentThread().getName() + "吃了第" + num-- + "个苹果");
}
} finally {
//释放锁
lock.unlock();
}
try {
//吃完后,休息10ms
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
@Test
public void ReentranLockTest() throws InterruptedException {
Apple apple = new Apple();
new Thread(apple, "线程1").start();
new Thread(apple, "线程3").start();
new Thread(apple, "线程2").start();
//苹果被吃完,主线程才停止
Thread.sleep(1000);
}
响应中断
ReentrantLock 中的 lockInterruptibly() 方法使得线程可以在被阻塞时响应中断,比如一个线程 t 1 通过 lockInterruptibly() 方法获取到一个可重入锁,并执行一个长时间的任务,另一个线程通过 interrupt() 方法就可以立刻打断 t 1 线程的执行,来获取 t 1 持有的那个可重入锁。而通过 ReentrantLock 的 lock() 方法或者 Synchronized 持有锁的线程是不会响应其他线程的 interrupt() 方法的,直到该方法主动释放锁之后才会响应 interrupt() 方法。
public class ReentrantLockTest {
static Lock lock1 = new ReentrantLock();
static Lock lock2 = new ReentrantLock();
public static void main(String[] args) throws InterruptedException {
Thread thread = new Thread(new ThreadDemo(lock1, lock2));//该线程先获取锁1,再获取锁2
Thread thread1 = new Thread(new ThreadDemo(lock2, lock1));//该线程先获取锁2,再获取锁1
thread.start();
thread1.start();
thread.interrupt();// 中断第一个线程
}
static class ThreadDemo implements Runnable {
Lock firstLock;
Lock secondLock;
public ThreadDemo(Lock firstLock, Lock secondLock) {
this.firstLock = firstLock;
this.secondLock = secondLock;
}
@Override
public void run() {
try {
firstLock.lockInterruptibly();
TimeUnit.MILLISECONDS.sleep(10);//更好的触发死锁
secondLock.lockInterruptibly();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
firstLock.unlock();
secondLock.unlock();
System.out.println(Thread.currentThread().getName()+"正常结束!");
}
}
}
}
限时等待
- boolean tryLock(long timeout, TimeUnit unit)
- ReentrantLock 还给我们提供了获取锁限时等待的方法 tryLock(),可以选择传入时间参数,表示等待指定的时间。
- 无参则表示立即返回锁申请的结果:true 表示获取锁成功,false 表示获取锁失败。
- 我们可以使用该方法,配合失败重试机制来更好的解决死锁问题。
public class ReentrantLockTest {
static Lock lock1 = new ReentrantLock();
static Lock lock2 = new ReentrantLock();
public static void main(String[] args) throws InterruptedException {
Thread thread = new Thread(new ThreadDemo(lock1, lock2));//该线程先获取锁1,再获取锁2
Thread thread1 = new Thread(new ThreadDemo(lock2, lock1));//该线程先获取锁2,再获取锁1
thread.start();
thread1.start();
}
static class ThreadDemo implements Runnable {
Lock firstLock;
Lock secondLock;
public ThreadDemo(Lock firstLock, Lock secondLock) {
this.firstLock = firstLock;
this.secondLock = secondLock;
}
@Override
public void run() {
try {
while(!lock1.tryLock()){
TimeUnit.MILLISECONDS.sleep(10);
}
while(!lock2.tryLock()){
lock1.unlock();
TimeUnit.MILLISECONDS.sleep(10);
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
firstLock.unlock();
secondLock.unlock();
System.out.println(Thread.currentThread().getName()+"正常结束!");
}
}
}
}
线程通过调用 tryLock() 方法获取锁,第一次获取锁失败时会休眠 10 毫秒,然后重新获取,直到获取成功。第二次获取失败时,首先会释放第一把锁,再休眠 10 毫秒,然后重试直到成功为止。线程获取第二把锁失败时将会释放第一把锁,这是解决死锁问题的关键,避免了两个线程分别持有一把锁然后相互请求另一把锁。
获取锁的 4 种方法对比
Lock 和 synchronized 的区别
-
Lock 是一个接口;synchronized 是 Java 中的关键字;
-
Lock 在发生异常时,如果没有主动通过 unLock() 去释放锁,很可能会造成死锁现象,因此使用 Lock 时需要在 finally 块中释放锁;synchronized 不需要手动获取锁和释放锁,在发生异常时会自动释放锁,因此不会导致死锁;
-
Lock 的使用更加灵活,可以有响应中断、有超时时间等;而 synchronized 却不行,使用 synchronized 时,等待的线程会一直等待下去,直到获取到锁;
-
通过 Lock 可以知道有没有成功获取锁,而 synchronized 却无法办到;
-
在性能上,随着近些年 synchronized 的不断优化,Lock 和 synchronized 在性能上已经没有很明显的差距了,所以性能不应该成为我们选择两者的主要原因。官方推荐尽量使用 synchronized,除非 synchronized 无法满足需求时,则可以使用 Lock。
乐观锁
乐观锁不会在数据读取时立即加锁,而是假设在读取和修改数据的过程中,数据不会被其他线程修改。
因此,乐观锁在数据更新时才检查数据是否已被其他线程修改,如果数据未被修改,则更新成功;如果数据已被修改,则更新失败。
乐观锁在 Java 中是通过使用无锁编程来实现,最常采用的是 CAS 算法。
CAS
- CAS 是英文单词 CompareAndSwap 的缩写,中文意思是:比较并替换。
- CAS 操作包含三个操作数
- 内存值(V)
- 旧的预期值(A)
- 新值(B)
- 如果内存值等于原值,CAS 通过原子方式用新值来更新内存值 ,否则不会执行任何操作。
- 一般情况下,“更新”是一个不断重试的操作(自旋)
- Java 的原子类内部使用了 CAS 操作来实现线程安全的更新。
- CAS 的底层是调用的 Unsafe 类中的方法,都是操作系统提供的,由其他语言实现。
CAS 存在的问题
- ABA 问题
- 自旋时间长开销大
- 只能保证一个共享变量的原子操作
ABA 问题
因为 CAS 需要在操作值的时候,检查值有没有发生变化,没有变化则更新。如果一个值原来是 A,变成了 B,又变成了 A,那么使用 CAS 进行检查时会发现值没有发生变化,但是实际上是有变化的。
Java 并发包为了解决这个问题,提供了一个带有标记的原子引用类“AtomicStampedReference”,它可以通过控制变量值的版本来保证 CAS 的正确性。因此,在使用 CAS 前要考虑清楚“ABA”问题是否会影响程序并发的正确性,如果需要解决 ABA 问题,改用传统的互斥同步可能会比原子类更高效。
自旋时间长开销大
CAS 操作如果长时间不成功,会导致其一直自旋,给 CPU 带来非常大的开销。
只能保证一个共享变量的原子操作
当对一个共享变量执行操作时,我们可以使用循环 CAS 的方式来保证原子操作。但是对多个共享变量操作时,CAS 就无法保证操作的原子性。
解决办法:
- 使用互斥锁来保证原子性;
- 将多个共享变量封装成对象,通过 AtomicReference 来保证原子性。
乐观锁 VS 悲观锁
- 悲观锁适合写操作多的场景,先加锁可以保证写操作时数据正确
- 乐观锁适合读操作多的场景,不加锁的特点能够使其读操作的性能大幅提升
UnSafe 类
Unsafe 是位于 sun.misc 包下的一个类,主要提供一些用于执行低级别、不安全操作的方法,如直接访问系统内存资源、自主管理内存资源等,这些方法在提升 Java 运行效率、增强 Java 语言底层资源操作能力方面起到了很大的作用。但由于 Unsafe 类使 Java 语言拥有了类似 C 语言指针一样操作内存空间的能力,这无疑也增加了程序发生相关指针问题的风险。在程序中过度、不正确使用 Unsafe 类会使得程序出错的概率变大,使得 Java 这种安全的语言变得不再“安全”,因此对 Unsafe 的使用一定要慎重。
Java 高并发中主要涉及到类位于 java.util.concurrent 包中,简称 JUC,JUC 中大部分类都是依赖于 Unsafe 来实现的,主要用到了 Unsafe 中的 CAS、线程挂起、线程恢复等相关功能。
参考链接