JUC并发编程
1.什么是JUC
JUC是java.util.concurrent的缩写,用于多线程并发编程,目的是为了支持更好的高并发任务。
2.进程和线程概念
何为进程?
进程是程序的一次执行过程,是系统运行程序的基本单位,因此进程是动态的。系统运行一个程序包括一个进程的创建,运行和销毁过程。
在Java程序中启动main函数实质启动了一个JVM进程,在这里插入代码片
而main函数所在的线程就是这个进程的一个线程,称为主线程。
可以通过查看任务管理器来查看正在运行的进程。
使用ESC+shift+ctrl
Java程序中默认有几个线程?
答: 两个,分别是main主线程和GC线程。
何为线程?
线程和进程相似,但线程是比进程更小的执行单位。一个进程中往往可以有多个线程。相当于解答一道数学题,数学题好比一个进程,一个个的解答步骤好比一个个的线程,通过这样一个个的解答步骤最终完成这道数学题。
与进程不同的是同类的多个线程会共享进程的堆和方法区的资源,而每个线程有一个私有的程序计数器、虚拟机栈和本地方法栈,因此系统会产生一个线程,或是各个线程之间切换工作时,负担要比进程小得多,线程因此也被成为轻量级进程。
Java程序天生就是多线程程序,通过JMX
来看一下一个普通的java程序有哪些线程。
JMX(Java Management Extensions),中文意思是Java管理拓展,用于管理和监视Java程序。最常用的就是对于JVM的检测和管理,比如JVM内存,CPU使用率、线程数、垃圾收集情况等。
通过上面可知:一个Java程序的运行是由main线程和多个其他线程同时运行的。
线程和进程的关系,区别及优缺点
基于JVM的角度分析,首先查看下图
线程是进程划分的更小的运行单位,线程和进程最大的不同在于各个进程运行是独立的,而各个线程则不一定,因为同一进程的线程极有可能会相互影响。线程的执行开销小,但不利于资源的管理和保护,而进程恰恰相反。
Java真的可以开启线程吗?
开不了,因为开启的线程的底层原理是C++,需要使用本地方法,而Java是无法直接操作本地资源的。
并行与并发的区别
- 并发: 两个及两个以上的作业在同一时间段内执行
- 并行: 两个及两个以上的作业在同一时间执行
为什么要使用多线程呢?
从总体来看:
-
从计算机底层来说: 线程可以比作是轻量级的进程,是程序执行的最小单位,线程间的切换和调度的成本远远小于进程。另外,多核 CPU 时代意味着多个线程可以同时运行,这减少了线程上下文切换的开销。
-
从当前互联网时代来看: 现在开发出来的系统动不动就是百万级乃至千万级的并发量,而多线程并发编程正是开发高并发系统的基础,利用好多线程并发机制可以大大提高系统的并发能力及性能。
从计算机底层探究:
-
单核时代: 单核时代多线程主要是了提高进程利用CPU和IO系统的效率。假设只能运行一个 Java 进程的情况,当我们请求 IO 的时候,如果 Java 进程中只有一个线程,此线程被 IO 阻塞则整个进程被阻塞。CPU 和 IO 设不能同时运行,那么可以简单地说系统整体效率只有 50%。当使用多线程的时候,一个线程被 IO 阻塞,其他线程还可以继续使用 CPU。从而提高了 Java 进程利用系统资源的整体效率。
-
多核时代: 多核时代的多线程是是为了提高进程利用多核CPU的能力。:假如我们要计算一个复杂的任务,我们只用一个线程的话,不论系统有几个 CPU 核心,都只会有一个 CPU 核心被利用到。而创建多个线程,这些线程可以被映射到底层多个 CPU 上执行,在任务中的多个线程没有资源竞争的情况下,任务执行的效率会有显著性的提高,约等于(单核时执行时间/CPU 核心数)。
使用多线程可能带来什么问题?
并发编程的目的是为了提高程序的执行效率提高程序运行速度,但是并发编程并不总是运行速度,而且并发编程会遇到很多问题,比如:内存泄漏,死锁,线程不安全等等。
线程的状态
查看线程内部的State枚举类型
public enum State {
/**
* Thread state for a thread which has not yet started.
*/
NEW, 新建状态
/**
* Thread state for a runnable thread. A thread in the runnable
* state is executing in the Java virtual machine but it may
* be waiting for other resources from the operating system
* such as processor.
*/
RUNNABLE, 运行中状态
/**
* Thread state for a thread blocked waiting for a monitor lock.
* A thread in the blocked state is waiting for a monitor lock
* to enter a synchronized block/method or
* reenter a synchronized block/method after calling
* {@link Object#wait() Object.wait}.
*/
BLOCKED, 阻塞状态
/**
* Thread state for a waiting thread.
* A thread is in the waiting state due to calling one of the
* following methods:
* <ul>
* <li>{@link Object#wait() Object.wait} with no timeout</li>
* <li>{@link #join() Thread.join} with no timeout</li>
* <li>{@link LockSupport#park() LockSupport.park}</li>
* </ul>
*
* <p>A thread in the waiting state is waiting for another thread to
* perform a particular action.
*
* For example, a thread that has called <tt>Object.wait()</tt>
* on an object is waiting for another thread to call
* <tt>Object.notify()</tt> or <tt>Object.notifyAll()</tt> on
* that object. A thread that has called <tt>Thread.join()</tt>
* is waiting for a specified thread to terminate.
*/
WAITING,等待状态
/**
* Thread state for a waiting thread with a specified waiting time.
* A thread is in the timed waiting state due to calling one of
* the following methods with a specified positive waiting time:
* <ul>
* <li>{@link #sleep Thread.sleep}</li>
* <li>{@link Object#wait(long) Object.wait} with timeout</li>
* <li>{@link #join(long) Thread.join} with timeout</li>
* <li>{@link LockSupport#parkNanos LockSupport.parkNanos}</li>
* <li>{@link LockSupport#parkUntil LockSupport.parkUntil}</li>
* </ul>
*/
TIMED_WAITING, 限时等待状态
/**
* Thread state for a terminated thread.
* The thread has completed execution.
*/
TERMINATED; 终止状态
}
由上图可以看出:线程创建之后它将处于 NEW(新建) 状态,调用 start() 方法后开始运行,线程这时候处于 READY(可运行) 状态。可运行状态的线程获得了 CPU 时间片(timeslice)后就处于 RUNNING(运行) 状态。
在操作系统中层面线程有 READY 和 RUNNING 状态,而在 JVM 层面只能看到 RUNNABLE 状态(图源:HowToDoInJava:Java Thread Life Cycle and Thread States),所以 Java 系统一般将这两个状态统称为 RUNNABLE(运行中) 状态 。
为什么 JVM 没有区分这两种状态呢?
现在的时分(time-sharing)多任务(multi-task)操作系统架构通常都是用所谓的“时间分片(time quantum or time slice)”方式进行抢占式(preemptive)轮转调度(round-robin式)。这个时间分片通常是很小的,一个线程一次最多只能在 CPU 上运行比如 10-20ms 的时间(此时处于 running 状态),也即大概只有 0.01 秒这一量级,时间片用后就要被切换下来放入调度队列的末尾等待再次调度。(也即回到 ready 状态)。线程切换的如此之快,区分这两种状态就没什么意义了。
线程执行 wait()方法之后,线程进入 WAITING(等待) 状态。进入等待状态的线程需要依靠其他线程的通知才能够返回到运行状态,而 TIMED_WAITING(超时等待) 状态相当于在等待状态的基础上增加了超时限制,比如通过 sleep(long millis)方法或 wait(long millis)方法可以将 Java 线程置于 TIMED_WAITING 状态。当超时时间到达后 Java 线程将会返回到 RUNNABLE 状态。当线程调用同步方法时,在没有获取到锁的情况下,线程将会进入到 BLOCKED(阻塞) 状态。线程在执行 Runnable 的run()方法之后将会进入到 TERMINATED(终止) 状态
什么是上下文切换?
线程在执行过程中有自己的运行条件和状态(称为上下文),比如上文中所说的程序计数器,栈信息等。 当出现以下情况,线程会从占有CPU状态中退出。
- 主动让出 CPU,比如调用了 sleep(), wait() 等。
- 时间片用完,因为操作系统要防止一个线程或者进程长时间占用CPU导致其他线程或者进程阻塞。
- 调用了阻塞类型的系统中断,比如请求 IO,线程被阻塞。
- 被终止或结束运行
这其中前三种都会发生线程切换,线程切换意味着需要保存当前线程的上下文,留待线程下次占用 CPU 的时候恢复现场。并加载下一个将要占用 CPU 的线程上下文。这就是所谓的 上下文切换。
上下文切换是现代操作系统的基本功能,因其每次需要保存信息恢复信息,这将会占用 CPU,内存等系统资源进行处理,也就意味着效率会有一定损耗,如果频繁切换就会造成整体效率低下。
什么是死锁?如何避免死锁
死锁是多个线程为了抢夺某一资源而进入堵塞的情况,而该资源由于不足或者是不合理分配,导致程序无法正常终止。
如下图所示,线程 A 持有资源 2,线程 B 持有资源 1,他们同时都想申请对方的资源,所以这两个线程因为得不到对方的资源就会互相等待而进入死锁状态。
产生死锁的四个条件
- 互斥条件: 该资源在任何一个时刻都归一个线程占用
- 请求与保持条件: 一个线程因请求资源堵塞,其已持有的资源不会释放。
- 不剥夺条件: 线程已获得的资源在未使用完之前不能被其他线程强行剥夺,只能由自己使用完毕后才释放资源。
- 循环等待条件: 若干线程形成头尾相接的循环等待某资源。
破坏死锁的条件
- 破环请求与保持条件: 一次性申请到所有需要的资源
- 破坏不剥夺条件: 占用部分的资源进一步申请其他的资源,若申请不到,则释放占有的资源。
- 破环循环等待条件: 靠按序申请资源来预防。按某一顺序申请资源,释放资源则反序释放。破坏循环等待条件。
如何避免死锁
避免死锁就是在资源分配时,借助于算法(比如银行家算法)对资源分配进行合理估算,使其进入安全状态。
sleep()方法和wait()方法的区别和共同点?
- sleep方法是Thread的方法,而wait方法任意对象都持有的方法。
- 二者主要区别的是:sleep()方法不会释放锁,而wait()方法会释放锁。
- 二者都可以暂停线程的执行,但wait()被用于线程之间的交互/通信,而sleep()方法主要用于暂停执行。
- wait方法使用后,需要其他的线程调用同一对象来唤醒,使用notify()或者notifyall()方法。sleep()方法被执行完成后,线程会自动苏醒。或者使用wait(long timeout)超时后线程也会自动苏醒。
- wait不需要捕获异常,而sleep需要捕获异常。
为什么我们要调用start()方法会执行run()方法,而不能直接调用run()方法?
调用start()方法方可启动线程并使其进入就绪状态,这时只要获取cpu执行就可进入运行状态,此时为多线程工作。而直接调用run()方法,实质上是调用main线程的一个普通方法,不是一个多线程下的工作。
3. Lock锁
传统synchronized
/**
* 真正的多线程开发,公司中的开发,降低耦合性
* 线程就是一个单独的资源类,没有任何附属的操作!
* 1、 属性、方法
*/
package com.liang;
public class SaleTicketDemo1 {
public static void main(String[] args) {
//启动主线程后,开启两个线程,多线程操作一个资源类
final Ticket ticket = new Ticket();
// new Thread(()->{
// for (int i = 1; i < 40; i++) {
// ticket.sale();
// }
// },"A").start();
//
// new Thread(()->{
// for (int i = 1; i < 40; i++) {
// ticket.sale();
// }
// },"B").start();
//使用更简洁lambada表达式,清晰易懂
new Thread(()-> {for (int i = 1; i < 40; i++) ticket.sale();
},"A").start();
new Thread(()-> {
for (int i = 1; i < 40; i++) ticket.sale();
},"B").start();
new Thread(()->{for(int i =0 ;i<40 ;i++) ticket.sale();
},"B").start();
}
}
//创建一个票类,拥有售票功能
class Ticket{
//票数,为每个对象的成员变量
private int ticket = 30;
//synchronized保证代码的同步问题
public synchronized void sale()
{
if(ticket>0){
System.out.println(Thread.currentThread().getName()+"取得了第"+(ticket--)+"票");
}
}
}
Lock接口
Lock位于JUC下的接口类,实现类有三种,可重入锁,读锁,写锁,默认为非公平锁,但可以设置为公平锁。
公平锁: 按照先来后到的顺序,依次执行线程,不允许插队现象。
非公平锁: 允许插队现象,先到的线程可能后面执行,显得不公平。
使用
package com.liang;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class SaleTicketDemo2 {
//获取资源对象
public static void main(String[] args) {
Ticket2 ticket = new Ticket2();
//创建线程
new Thread(()->{for(int i =1 ;i<40;i++) ticket.sale();},"A").start();
new Thread(()->{for(int i =1 ;i<40;i++) ticket.sale();},"B").start();
new Thread(()->{for(int i =1 ;i<40;i++) ticket.sale();},"C").start();
}
}
//资源对象,使用锁机制Lock
class Ticket2{
private int number = 30;
//创建锁
Lock lock = new ReentrantLock(); //可重入锁
//new ReentrantReadWriteLock(); 读写锁
//操作普通方法,其内部加锁
public void sale()
{
try{
//加锁
lock.lock();
if(number>0){
System.out.println(Thread.currentThread().getName()+"取到了第"+(number--)+"票");
}
}catch (Exception e){
e.printStackTrace();
}finally {
//使用完lock对象要释放
lock.unlock();
}
}
}
Sychronized和Lock的区别
1.Sychronized是内置的Java关键字,Lock是Java的一个接口。
2.Sychronized不需要手动的释放锁,而Lock需要手动的释放锁。
3.Sychronized无法判断获取锁的动态,而Lock可以判断是否获取到了锁。
4.Sychronized未获取到的锁会一直等待获取锁,而Lock会使用tryLock()方法取尝试获取锁。
5.Sychronzied是可重入锁,不可以中断,非公平。Lock也是同步锁,可以判读锁,默认为非公平(可以自己设置)。
5. Sychronized适用于锁少量的代码同步问题,而Lock适用于锁大量的代码同步问题(因为Sychronized更消耗系统的资源)。
通过代码查看电脑核数
//Auditor 核数
public class Auditor {
public static void main(String[] args) {
//获取电脑核数
System.out.println(Runtime.getRuntime().availableProcessors());
}
}
4.生产者和消费者
面试常考题: 单例模式,排序算法,生产者和消费者,死锁
生产者和消费者Synchronized
/**
*
* 线程之间的通信: 生产者和消费者问题! 等待唤醒,通知唤醒
* 线程交替运行,通过同一变量控制
*/
public class PCTest {
public static void main(String[] args) {
//1.创建要操作的资源对象
Data data = new Data();
//2.创建两个线程
new Thread(()->{
try {
for (int i = 0; i < 10; i++) {
data.add();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
},"A").start();
new Thread(()->{
try {
for (int i = 0; i < 10; i++) {
data.remove();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
},"B").start();
}
}
class Data{ //资源类
private int number =0;
//同步生产操作
public synchronized void add() throws InterruptedException {
//资源不为空,则进入等待
if(number!=0){
this.wait();
}
System.out.println(Thread.currentThread().getName()+"=>"+(number++));
//此时唤醒消费进程
this.notify();
}
//同步消费操作
public synchronized void remove() throws InterruptedException {
if(number ==0){
this.wait(); //资源为空则要进行生产,把执行权力腾出
}
System.out.println(Thread.currentThread().getName()+"=>"+(number--));
//唤醒生产线程
this.notify();
}
}
两个线程是可以满足以上的要求,实现线程之间的通信,一个负责消费,一个负责生产,二者之间只能运行一个,可以唤醒对方的线程。
问题存在
当生产者和消费者问题,使用4个线程启动时,会出现数据混乱的状态,以上代码不能满足预期要求。
虚假唤醒就是在多线程执行过程中,线程间的通信未按照我们幻想的顺序唤醒,故会出现数据不一致等不符合我们预期的结果。比如 我的想法是:加1和减1交替执行,他却出现了2甚至3这种数。
为什么会出现虚假唤醒?
虚假唤醒时出现与预期结果不一样的数据,由于我们使用的if判断语句,第一次线程进入等待状态就已经执行过了,所以线程再次唤醒时,就会跳出if方法,从而得到了超出了1的数据。虚假唤醒存在唤醒不当,不会按照我们的想法去唤醒。
虚假唤醒解决
将if判断为while判断
解释:
while是为了再一次循环判断刚刚争抢到锁的线程是否满足继续执行下去的条件,条件通过才可以继续执行下去,不通过的线程只能再次进入wait状态,由其他活着的、就绪状态的线程进行争抢锁。
JUC版本的生产者和消费者问题(基于Lock)
JUC下的线程等待和线程唤醒时通过Condition实现类来实现的,相比于对象内部包含线程等待和线程唤醒的方法,JUC的线程等待和线程唤醒面向接口。
实现
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* JUC下的生产者和消费者问题
* 基于Lock实现,其中使用到Condition条件接口
* JUC下的线程等待和线程唤醒时通过Condition来实现的
*/
public class PCTest1 {
public static void main(String[] args) {
//获取线程资源类
final Data2 data2 = new Data2();
new Thread(()->{for (int i = 0; i < 10; i++)data2.add();},"A").start();
new Thread(()->{for (int i = 0; i < 10; i++)data2.remove();},"B").start();
new Thread(()->{for (int i = 0; i < 10; i++)data2.add();},"C").start();
new Thread(()->{for (int i = 0; i < 10; i++)data2.remove();},"D").start();
}
}
class Data2{
private int number = 0;
Lock lock = new ReentrantLock(); //可重入锁
//获取条件
Condition condition = lock.newCondition();
//普通方法内部加锁
public void add()
{
try{
//加锁
lock.lock();
while(number!=0){
condition.await(); //还有资源,调用condition的等待方法
}
System.out.println(Thread.currentThread().getName()+"=>"+(++number));
//这里唤醒别的线程
condition.signalAll();
}catch (Exception e){
}finally {
lock.unlock();
}
}
public void remove()
{
try{
//加锁
lock.lock();
while(number==0){
condition.await(); //没有资源,调用condition的等待方法
}
System.out.println(Thread.currentThread().getName()+"=>"+(--number));
//这里唤醒别的线程
condition.signalAll();
}catch (Exception e){
}finally {
lock.unlock();
}
}
}
signal和signalAll方法的区别
void signal() :Wakes up one waiting thread. 唤醒在等待的单个线程
void signalAll():Wakes up all waiting threads. 唤醒等待的所有线程
唤醒的单个进程可能未满足执行条件,会继续等待。
Condition精确的通知和唤醒线程
上面输出的结果,Condition未按照我们想要的顺序执行,可以通过Condition精确的通知和唤醒线程
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
//Condition精确的通知唤醒线程
public class PCCondition {
public static void main(String[] args) {
Data3 data3 = new Data3();
new Thread(()->{for (int i = 0; i < 10; i++)data3.a();},"A").start();
new Thread(()->{for (int i = 0; i < 10; i++)data3.b();},"B").start();
new Thread(()->{for (int i = 0; i < 10; i++)data3.c();},"C").start();
}
}
class Data3{
private int number = 1;
Lock lock = new ReentrantLock(); //可重入锁
//获取条件,需要三个条件类
Condition condition1 = lock.newCondition();
Condition condition2 = lock.newCondition();
Condition condition3 = lock.newCondition();
//普通方法内部加锁
public void a()
{
//加锁
lock.lock();
try{
while(number!=1){
condition1.await(); //还有资源,调用condition的等待方法
}
System.out.println(Thread.currentThread().getName()+"=>"+number);
number =2;
//这里唤醒别的线程
condition2.signal();
}catch (Exception e){
e.printStackTrace();
}finally {
lock.unlock();
}
}
public void b()
{
//加锁
lock.lock();
try{
while(number!=2){
condition2.await();
}
System.out.println(Thread.currentThread().getName()+"=>"+number);
//这里唤醒别的线程
number =3;
condition3.signal();
}catch (Exception e){
e.printStackTrace();
}finally {
lock.unlock();
}
}
public void c()
{
//加锁
lock.lock();
try{
while(number!=3){
condition3.await(); //没有资源,调用condition的等待方法
}
System.out.println(Thread.currentThread().getName()+"=>"+(number));
//这里唤醒别的线程
number =1;
condition1.signal();
}catch (Exception e){
e.printStackTrace();
}finally {
lock.unlock();
}
}
}
注意: 在其他线程通过Condition来指明那个线程将要唤醒
5.八锁的现象
如何判断锁的是谁?
深度理解锁
八锁其实是锁的八个问题,关于八个问题如下:
1.标准情况下,两个线程先打印还是打电话?
2.发短息延迟4秒,两个线程先发短信还是打电话?
此时锁的都是同一资源对象
3.增加了一个普通方法后,先执行发短信还是Hello普通方法?(普通方法不需要关注锁)
4.两个对象,两个同步方法,此时先发短信还是打电话(发短信内部有延迟4s)
锁对象是两个不同的资源对象,因此谁先快执行谁执行。
5.增加两个静态同步方法,只有一个对象,先发短信还是打电话?
6.两个对象!增加两个静态的同步方法,先发短信还是打电话?
两个对象的Class模板只有一个,static静态方法加载的是Class模板。
7.1个静态的同步方法,1个普通的同步方法,一个对象,先发短信还是打电话
两个不同的锁对象,一个是class模板,一个是当前的资源对象
8…1个静态的同步方法,1个普通的同步方法,两个对象,先发短信还是打电话
小结
new this 具体的一个手机对象
static Class 唯一的一个模板
6.集合类的不安全及措施
List不安全
List的实现类ArrayList是线程不安全的,在多线程中会出现并发时修改错误
List为何是线程不安全的
ArrayList的add()是没有进行同步的方法,因此是线程不安全的。
List不安全解决方案
-
将ArrayList替换为Vector
vector是使用了synchronized关键字来保证同步。 -
使用Collections工具类转化为同步的list
-
使用CopyOnWriteArrayList
CopyOnWrite写入时复制 COW是计算机设计策略的一种优化策略。
多个线程调用的时候,list,读取的时候是固定的,写入(覆盖),在写入时编避免覆盖,造成数据问题,读写分离。
相比于Vector,CopyOnWriteArrayList的优势在哪?
Vector和CopyOnWriteArrayList都是线程安全的List,底层都是数组实现的,Vector的每个方法都进行了加锁,而CopyOnWriteArrayList的读操作是不加锁的,因此CopyOnWriteArrayList的读性能远高于Vector,Vector每次扩容的大小都是原来数组大小的2倍,而CopyOnWriteArrayList不需要扩容,通过COW思想就能使得数组容量满足要求。
在资源竞争不激烈的情形下,使用Lock性能稍微比synchronized差点点。但是当同步非常激烈的时候,synchronized的性能一下子能下降好几十倍。
set
在多线程同样也会出现并发时修改错误,因为它是普通方法,没有实现数据的同步。
解决方案:
1.将HashSet通过使用Collections工具类,将其转化为同步的Set
2.使用CopyOnWriteArraySet
HashSet的底层原理就是map,key是无法重复的
Map不安全
HashMap是线程不安全的,其底层没有加任何的同步措施,同样在多线程会出现并发时修改错误。
解决方案:
使用线程安全的Map类,例如ConcurrentHashMap
ConcurrentHash类的put方法中使用了synchronized来保证同步。
7.Callable(可调用的)
Callable是一个函数式接口,用于创建线程。返回结果并可能引发异常的任务。实现者只需要去实现Callable接口,并重写它的call即可,和Runable相似。
Runable没有返回值并且不会抛出被检查出来的异常。
Executors包含的实例方法,从其他普通形式转化为Callable接口类。
相比于Runable接口,Callable接口
- 可以有返回值
- 可以抛出异常
- 方法不同,call()
Java创建线程的三种方式
- 继承Thread类
- 实现Runable接口
- 实现Callable接口和Future类
具体实现
public class CallableTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
MyThread myThread =new MyThread(); //创建一个自己写的线程类
//FutureTask是JDK并发包为Future接口提供的一个实现,代表一个支持取消操作(cancel)的异步计算任务。
FutureTask futureTask = new FutureTask(myThread); //适配器类
new Thread(futureTask,"A").start();
new Thread(futureTask,"B").start();
Integer o = (Integer) futureTask.get(); //get方法可能会堵塞,得到计算的结果
System.out.println(o);
}
}
class MyThread implements Callable<Integer> {
@Override
public Integer call() throws Exception {
System.out.println("进入call方法");
return 1024; //可以有返回值
}
}
FutureTask有三种执行状态。
未启动:在FutureTask.run()还没执行之前,FutureTask处于未启动状态。当创建一个FutureTask对象,并且run()方法未执行之前,FutureTask处于未启动状态。
2、已启动:FutureTask对象的run方法启动并执行的过程中,FutureTask处于已启动状态。
3、已完成:FutureTask正常执行结束,或者FutureTask执行被取消(FutureTask对象cancel方法),或者FutureTask对象run方法执行抛出异常而导致中断而结束,FutureTask都处于已完成状态。
FutureTask的get和cancel的执行示意图如下:
细节:
1.有缓存
2.结果可能需要等待,会堵塞。
8.CountDownLatch、CyclicBarrier、Semaphore
CountDownLatch
实现
public class CountDownLatchDemo {
public static void main(String[] args) {
//减法计算器(参数为线程数目)
CountDownLatch countDownLatch = new CountDownLatch(6);
for (int i = 1; i <=6; i++) {
new Thread(()->{
System.out.println(Thread.currentThread().getName()+"go out");
countDownLatch.countDown(); //数量减1
},String.valueOf(i)).start();
}
//直到计算器为0,才向下执行
try {
countDownLatch.await();
System.out.println("close door");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
原理:
- countDownLatch.countDown(); // 数量-1
- countDownLatch.await(); // 等待计数器归零,然后再向下执行
每次有线程调用 countDown() 数量-1,假设计数器变为0,countDownLatch.await() 就会被唤醒,继续
执行!
CyclicBarrier(加法计算器)
public static void main(String[] args) {
/**
* 集齐7颗龙珠召唤神龙
*/
// 召唤龙珠的线程
CyclicBarrier cyclicBarrier = new CyclicBarrier(7,()->{
System.out.println("召唤神龙成功");
});
for (int i = 1; i <= 7; i++) {
final int temp = i;
new Thread(()->{
System.out.println(Thread.currentThread().getName()+"集结了"+temp+"个龙珠");
try {
cyclicBarrier.await(); //集结完毕进入等待状态
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}).start();
}
}
Semaphore(信号量)
实现
原理
-
semaphore.acquire() 获得,假设如果最大线程数已经满了,等待,等待被释放为止!
-
semaphore.release(); 释放,会将当前的信号量释放 + 1,然后唤醒等待的线程!
作用:多个共享资源互斥的使用! 并发限流,控制最大的线程数!
9.读写锁(ReadWriteLock)
ReentrantReadWriteLock是ReadWriteLock的一个实现类,ReadWriteLock维护者一对关联的locks,一个用于读操作,一个用于写操作。
获取读锁时读的时候多个线程同时读,获取写锁时写的时候只有一个线程写。
一个线程成功获取读锁时,会得到之前最后一次写锁时更新的内容。
读写锁允许访问共享数据时的并发性高于互斥锁锁允许的并发性。
演示
不使用读写锁,此时写会同时有多个线程,不符合实际。
public class ReadWriteTest {
public static void main(String[] args) {
MyCache myCache = new MyCache();
//创建多个线程
for (int i = 1; i <=5; i++) {
final int temp =i; //保证temp变量的不可变
new Thread(()->{
myCache.put(temp+"",temp);
},String.valueOf(i)).start();
}
for (int i = 1; i <= 5; i++) {
final int temp =i;
new Thread(()->{
myCache.get(temp+"");
},String.valueOf(i)).start();
}
}
}
//不加锁的自定义缓存,写入的时候可以有多个线程在写
class MyCache{
//添加volatile保证可见性
private volatile Map<String,Object> map = new HashMap<>();
//写入
public void put(String key,Object value){
System.out.println(Thread.currentThread().getName()+"写入值"+key);
map.put(key, value);
System.out.println(Thread.currentThread().getName()+"写入成功!");
}
//读取
public void get(String key){
System.out.println(Thread.currentThread().getName()+"读取"+key);
Object o = map.get(key);
System.out.println(Thread.currentThread().getName()+"读取成功!值为:"+o);
}
}
使用了读写锁,此时只有线程写,其他的线程都在读。
public class ReadWriteTest {
public static void main(String[] args) {
MyLockCache myCache = new MyLockCache();
//创建多个线程
for (int i = 1; i <=5; i++) {
final int temp =i; //保证temp变量的不可变
new Thread(()->{
myCache.put(temp+"",temp);
},String.valueOf(i)).start();
}
for (int i = 1; i <= 5; i++) {
final int temp =i;
new Thread(()->{
myCache.get(temp+"");
},String.valueOf(i)).start();
}
}
}
class MyLockCache{
//使用volatile为了保证性可见性
private volatile Map<String,Object> map = new HashMap<>();
//使用读写锁,更加细粒度的控制
private ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
//写入
public void put(String key,Object value){
//获取写锁
readWriteLock.writeLock().lock();
try{
System.out.println(Thread.currentThread().getName()+"写入"+key);
map.put(key, value);
System.out.println(Thread.currentThread().getName()+"写入成功");
}catch (Exception e){
e.printStackTrace();
}finally {
readWriteLock.writeLock().unlock();
}
}
//读取
public void get(String key){
//获取读锁
readWriteLock.readLock().lock();
try {
System.out.println(Thread.currentThread().getName()+"读取"+key);
Object o = map.get(key);
System.out.println(Thread.currentThread().getName()+"读取成功!值为:"+o);
}catch (Exception e){
e.printStackTrace();
}finally {
readWriteLock.readLock().unlock();
}
}
}
可以控制只有线程在写,其他的线程不能写。
10.阻塞队列
阻塞队列用来存放当前未得到执行权利的线程,队列有大小限制。
写入: 如果队列满了,就必须阻塞等待
取: 如果队列是空的,必须阻塞等待生产。
阻塞队列是集合的一种
什么情况下使用阻塞队列: 多线程并发处理,线程池!
阻塞队列的使用
四组API
方式 | 抛出异常 | 有返回值,不抛出异常 | 阻塞等待 | 超时等待 |
---|---|---|---|---|
添加 | add() | offer() | put() | offer(,) |
移除 | remove() | poll() | take() | poll(.,) |
检查队首元素 | element | peek() | - | - |
1.抛出异常
public class BlockingQueueTest {
public static void main(String[] args) {
//创建阻塞队列,使用ArrayBlockingQueue
ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(3);//阻塞队列容量未3
//超出容量使用以下方法会抛出异常
System.out.println(queue.add("a"));
System.out.println(queue.add("b"));
System.out.println(queue.add("c"));
System.out.println(queue.add("d")); //抛出队列已满异常
System.out.println(queue.remove("a"));
System.out.println(queue.remove("b"));
System.out.println(queue.remove("c"));
//System.out.println(queue.remove("d"));
}
// 检查队首元素
System.out.println(queue.element());
抛出队列已满异常
2.不抛出异常,有返回值的
System.out.println(queue.offer("a"));
System.out.println(queue.offer("b"));
System.out.println(queue.offer("c"));
System.out.println(queue.offer("a"));
System.out.println(queue.poll());
System.out.println(queue.poll());
System.out.println(queue.poll());
System.out.println(queue.poll());
//检查队首元素
System.out.println(queue.peek());
不会有返回值,且有返回值false
3.阻塞,等待
private static void t3(ArrayBlockingQueue<String> queue) throws InterruptedException {
queue.put("a");
queue.put("b");
queue.put("c");
queue.put("d");
System.out.println(queue.take());
System.out.println(queue.take());
System.out.println(queue.take());
System.out.println(queue.take());
}
超过线程容量,不会抛出异常,会一直等待。
4.阻塞,限时等待
System.out.println(queue.offer("a"));
System.out.println(queue.offer("b"));
System.out.println(queue.offer("c"));
System.out.println(queue.offer("d",2, TimeUnit.SECONDS));
System.out.println(queue.poll());
System.out.println(queue.poll());
System.out.println(queue.poll());
System.out.println(queue.poll(2,TimeUnit.SECONDS));
有时限等待,和第二种一致,只是有时间限制。
SynchronousQueue同步队列
同步队列,没有容量大小,存入了值必须取出来才可以再存入。
实现
/**
* /**
* * 同步队列
* * 和其他的BlockingQueue 不一样, SynchronousQueue 不存储元素
* * put了一个元素,必须从里面先take取出来,否则不能在put进去值!
* */
public static void main(String[] args) {
BlockingQueue<String> queue = new SynchronousQueue<>();
//存入值
new Thread(()->{
try {
System.out.println(Thread.currentThread().getName()+"put 1");
queue.put("1");
System.out.println(Thread.currentThread().getName()+"put 2");
queue.put("2");
System.out.println(Thread.currentThread().getName()+"put 3");
queue.put("3");
} catch (InterruptedException e) {
e.printStackTrace();
}
},"A").start();
new Thread(()->{
try {
TimeUnit.SECONDS.sleep(3);
System.out.println(Thread.currentThread().getName()+"获得"+queue.take());
TimeUnit.SECONDS.sleep(3);
System.out.println(Thread.currentThread().getName()+"获得"+queue.take());
TimeUnit.SECONDS.sleep(3);
System.out.println(Thread.currentThread().getName()+"获得"+queue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
},"B").start();
}
11.线程池(重点)
掌握三大方法,7大参数,4种拒绝策略
池化技术:事先准备一些资源,可以拿来使用,但使用完要归还。
我们都知道对象的创建和销毁,十分的耗内耗和性能,因此争对于频繁的创建对象和销毁对象,JVM做了优化,使用各种各样的池子来管理对象的创建和销毁,在线程中使用线程池来管理线程,优化资源的使用。
线程池的好处
- 降低资源消耗
- 提高响应速度
- 方便管理
小结: 线程复用、可以控制最大并发数、管理线程。
线程池
public class ThreadPoolTest {
//Executors工具类的3大方法
public static void main(String[] args) {
ExecutorService threadPool = Executors.newSingleThreadExecutor(); //创建单个线程池
ExecutorService threadPool1 = Executors.newFixedThreadPool(4); //创建固定线程池大小
ExecutorService threadPool2 = Executors.newCachedThreadPool(); //创建可伸缩线程池
ScheduledExecutorService threadPool3 = Executors.newScheduledThreadPool(2); //创建一个指定核心线程数的计划线程池
try{
for (int i = 0; i < 100; i++) {
//执行线程池
threadPool.execute(()->{
System.out.println(Thread.currentThread().getName()+"ok");
});
}
}catch (Exception e){
e.printStackTrace();
}finally {
threadPool.shutdown(); //关闭线程池
}
}
}
7大参数
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(5, 5,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
// 本质ThreadPoolExecutor()
public ThreadPoolExecutor(int corePoolSize, // 核心线程池大小
int maximumPoolSize, // 最大核心线程池大小
long keepAliveTime, // 超时了没有人调用就会释放
TimeUnit unit, // 超时单位
BlockingQueue<Runnable> workQueue, // 阻塞队列
ThreadFactory threadFactory, // 线程工厂:创建线程的,一般
不用动
RejectedExecutionHandler handle // 拒绝策略) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
- int corePoolSize, // 核心线程池大小
- int maximumPoolSize, // 最大核心线程池大小
- long keepAliveTime, // 超时了没有人调用就会释放
- TimeUnit unit, // 超时单位
- BlockingQueue workQueue, // 阻塞队列
- ThreadFactory threadFactory, // 线程工厂:创建线程的,一般不用动
- RejectedExecutionHandler handle // 拒绝策略
手动创建一个线程池
public class MyThreadPool {
/**
* 线程池四种拒绝策略
* AbortPolicy -- 当任务添加到线程池被拒绝时,它将抛出RejectedExecutionException异常
* CallerRunsPolicy -- 当任务添加线程池被拒绝时,会在线程池当前正在运行的Thread线程池中处理被拒绝的任务
* DiscardOldestPolicy --当任务添加到线程池被拒绝时,线程池会放弃等待队列中最旧的未处理的任务。然后将被拒绝的任务添加到等待队列中。
* DiscardPolicy --当任务添加到线程池被拒绝时,线程池将会丢弃被拒绝的任务。
* @param args
*/
public static void main(String[] args) {
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
2, //核心线程数
5, //最大线程数
3, //活跃时间,
TimeUnit.SECONDS, //时间单位
new LinkedBlockingQueue<>(3), //链表阻塞队列
Executors.defaultThreadFactory(), // 默认的线程池工厂
new ThreadPoolExecutor.AbortPolicy() //拒绝策略
);
try{
for (int i = 0; i < 10; i++) {
threadPool.execute(()->{
System.out.println(Thread.currentThread().getName()+"ok");
});
}
}catch (Exception e){
e.printStackTrace();
}finally {
threadPool.shutdown();
}
}
}
四种拒绝策略
- AbortPolicy – 当任务添加到线程池被拒绝时,它将抛出RejectedExecutionException
- CallerRunsPolicy – 当任务添加线程池被拒绝时,会在线程池当前正在运行的Thread线程池中处理被拒绝
- DiscardOldestPolicy --当任务添加到线程池被拒绝时,线程池会放弃等待队列中最旧的未处理的任务。然后将
- DiscardPolicy --当任务添加到线程池被拒绝时,线程池将会丢弃被拒绝的任务。
小结与拓展
池的最大线程数目的大小如何设置
- CPU密集型 :池的最大线程数目就是CPU的核数,保持CPU效率最高!
- IO密集型:池的最大线程数目大于程序中十分耗IO的线程。
CPU密集型
public class MyThreadPool01 {
//CPU密集型,定义线程池中最大线程数目为核数
public static void main(String[] args) {
System.out.println(Runtime.getRuntime().availableProcessors());
//CPU密集型
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(2,
Runtime.getRuntime().availableProcessors(),
3,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(3),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.DiscardOldestPolicy());
try{
for(int i=1 ;i<=9; i++){
threadPool.execute(()->{
System.out.println(Thread.currentThread().getName()+"ok");
});
}
}catch (Exception e){
e.printStackTrace();
}finally {
threadPool.shutdown();
}
}
}
IO密集型则需要去判断程序中耗IO的线程总数,然后线程池线程数大于即可。
12.四大函数式接口
新时代的程序员:lambda表达式,链式编程,函数式接口,stream流计算。
函数式接口: 只有一个方法的接口
foreach里面的参数底层是一个消费者函数式接口
Function函数式接口(有传入参数T和返回类型R)
/**
* 函数式接口
*/
public class FunctionDemo {
public static void main(String[] args) {
//使用function接口(需传入参数和指定返回值类型)
Function<String,String> f1 = (str)->{return str;}; //函数式接口和lambda表达式整合
System.out.println(f1.apply("liang"));
}
}
Predicate断定式接口(有输入参数,有返回值类型但是是布尔值)
实践
Consumer消费者接口(只有输入参数,没有返回值类型)
实践
Supplier 供给型接口:只有输出,没有输入
实践
13.Stream流式计算
什么是Stream计算
大数据: 存储+计算
集合、MySQL本质就是存储数据的。
计算交给流来实现。
实现
14.分支合并(ForkJoin)
什么是ForkJoin
Fork/Join 框架是 Java7 提供了的一个用于并行执行任务的框架, 是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架。
使用的算法
工作窃取算法
工作窃取(work-stealing)算法是指某个线程从其他队列里窃取任务来执行。
那么为什么需要使用工作窃取算法呢?假如我们需要做一个比较大的任务,我们可以把这个任务分割为若干互不依赖的子任务,为了减少线程间的竞争,于是把这些子任务分别放到不同的队列里,并为每个队列创建一个单独的线程来执行队列里的任务,线程和队列一一对应,比如 A 线程负责处理 A 队列里的任务。但是有的线程会先把自己队列里的任务干完,而其他线程对应的队列里还有任务等待处理。干完活的线程与其等着,不如去帮其他线程干活,于是它就去其他线程的队列里窃取一个任务来执行。而在这时它们会访问同一个队列,所以为了减少窃取任务线程和被窃取任务线程之间的竞争,通常会使用双端队列,被窃取任务线程永远从双端队列的头部拿任务执行,而窃取任务的线程永远从双端队列的尾部拿任务执行。
工作窃取算法的优点是充分利用线程进行并行计算,并减少了线程间的竞争,其缺点是在某些情况下还是存在竞争,比如双端队列里只有一个任务时。并且消耗了更多的系统资源,比如创建多个线程和多个双端队列。
设计ForkJoin框架
第一步分割任务。首先我们需要有一个 fork 类来把大任务分割成子任务,有可能子任务还是很大,所以还需要不停的分割,直到分割出的子任务足够小。
第二步执行任务并合并结果。分割的子任务分别放在双端队列里,然后几个启动线程分别从双端队列里获取任务执行。子任务执行完的结果都统一放在一个队列里,启动一个线程从队列里拿数据,然后合并这些数据。
Fork/Join 使用两个类来完成以上两件事情:
ForkJoinTask:我们要使用 ForkJoin 框架,必须首先创建一个 ForkJoin 任务。它提供在任务中执行 fork() 和 join() 操作的机制,通常情况下我们不需要直接继承 ForkJoinTask 类,而只需要继承它的子类,Fork/Join 框架提供了以下两个子类:
RecursiveAction:用于没有返回结果的任务。
RecursiveTask :用于有返回结果的任务。
ForkJoinPool :ForkJoinTask 需要通过 ForkJoinPool 来执行,任务分割出的子任务会添加到当前工作线程所维护的双端队列中,进入队列的头部。当一个工作线程的队列里暂时没有任务时,它会随机从其他工作线程的队列的尾部窃取一个任务。
Fork/Join框架的异常处理
ForkJoinTask 在执行的时候可能会抛出异常,但是我们没办法在主线程里直接捕获异常,所以 ForkJoinTask 提供了 isCompletedAbnormally() 方法来检查任务是否已经抛出异常或已经被取消了,并且可以通过 ForkJoinTask 的 getException 方法获取异常。
if(task.isCompletedAbnormally())
{
System.out.println(task.getException());
}
ForkJoin框架的实现原理
ForkJoinPool是由ForkJoinTask数组和ForkJoinWorkerThread数组组成。
ForkJoinTask数组负责存放程序提交给ForkJoinPool的任务。而ForkJoinWorkerThread数组负责执行这些任务。
原理探究
ForkJoinTask 的 fork 方法实现原理。当我们调用 ForkJoinTask 的 fork 方法时,程序会调用 ForkJoinWorkerThread 的队列里的 push 方法异步的执行这个任务,然后立即返回结果。代码如下:
push方法把当前任务存放在 ForkJoinTask 数组 queue 里。然后再调用 ForkJoinPool 的 signalWork() 方法唤醒或创建一个工作线程来执行任务。代码如下:
ForkJoinTask 的 join 方法实现原理。Join 方法的主要作用是阻塞当前线程并等待获取结果。让我们一起看看 ForkJoinTask 的 join 方法的实现,代码如下:
首先,它调用了 doJoin() 方法,通过 doJoin() 方法得到当前任务的状态来判断返回什么结果,任务状态有四种:已完成(NORMAL),被取消(CANCELLED),信号(SIGNAL)和出现异常(EXCEPTIONAL)。
- 如果任务状态是已完成,则直接返回任务结果。
- 如果任务状态是被取消,则直接抛出 CancellationException。
- 如果任务状态是抛出异常,则直接抛出对应的异常。
在 doJoin() 方法里,首先通过查看任务的状态,看任务是否已经执行完了,如果执行完了,则直接返回任务状态,如果没有执行完,则从任务数组里取出任务并执行。如果任务顺利执行完成了,则设置任务状态为 NORMAL,如果出现异常,则纪录异常,并将任务状态设置为 EXCEPTIONAL。
实现
1.创建一个ForkJoin任务的实现类
package com.forkjoin;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;
/**
* 如何使用forkjoin
* 计算类需要继承ForkJoinTask
* ForkJoinTask的实现类:RecursiveActive,RecursiveTask
*/
public class ForkJoinDemo extends RecursiveTask<Long> {
private Long start;
private Long end;
//临界值
private Long temp = 10000L;
//构造方法
public ForkJoinDemo(Long start, Long end) {
this.start = start;
this.end = end;
}
//计算方法
@Override
protected Long compute() {
//介于临界值分为两种情况
Long sum = 0L;
if ((end - start) < temp) {
for (Long i = start; i <=end; i++) {
sum += i;
}
return sum;
} else {//使用forkjoin计算
Long middle = (start + end)/2; //中间值
ForkJoinTask<Long> task1 = new ForkJoinDemo(start, middle);
task1.fork(); //将任务放到ForkJoinPool中的ForkJoin任务数组中,ForkJoinPool唤醒线程来执行它。
ForkJoinTask<Long> task2 = new ForkJoinDemo(middle + 1, end);
task2.fork();
return task1.join() + task2.join(); //join方法获取每个子任务的返回结果
}
}
}
2.执行任务不同做法
public class ForkJoinTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
//m1(); // 普通方法
//m2(); // 使用forkjoin
m3(); // 使用stream流,特别快
}
private static void m3() {
long start = System.currentTimeMillis();
//stream并行流计算
long sum = LongStream.rangeClosed(1L, 10_0000_0000L)
.parallel() //并行计算
.reduce(0, Long::sum);//Long::sum java新特性,更简洁方法调用
long end = System.currentTimeMillis();
System.out.println("sum = "+sum+",时间: "+(end-start));
}
private static void m2() throws InterruptedException, ExecutionException {
Long start = System.currentTimeMillis();
//创建和使用ForkJoinPool
ForkJoinPool forkJoinPool = new ForkJoinPool();
//分割任务
ForkJoinTask<Long> task =new ForkJoinDemo(1L, 10_0000_0000L);
//使用线程池提交任务。
ForkJoinTask<Long> submit = forkJoinPool.submit(task);
//得到执行的结果
Long sum = submit.get();
Long end = System.currentTimeMillis();
System.out.println("使用forkJoin返回结果:"+sum+"执行结果"+(end-start));
}
private static void m1() {
Long sum = 0L;
//开始时间
Long start = System.currentTimeMillis();
//结束时间
for(Long i= 1L; i< 10_0000_0000L;i++)
{
sum += i;
}
Long end = System.currentTimeMillis();
System.out.println("普通方法消耗时间为:"+(end-start));
}
}
15.异步回调
回调分为同步和异步。
同步回调 : 我们常用的一些请求都是同步回调的,同步回调是阻塞的,单个的线程需要等待结果的返回才能继续执行。
异步回调: 有的时候,我们不希望程序在某个执行方法上一直阻塞,需要先执行后续的方法,那就是这里的异步回调。我们在调用一个方法时,如果执行时间比较长,我们可以传入一个回调的方法,当方法执行完时,让被调用者执行给定的回调方法。
Future设计的初衷:对将来的某个事件的结果进行建模。
异步执行: CompletableFuture//异步执行: 不论执行成功或者失败,都会回调。
异步回调分为有返回值和没有返回值
runAsync()没有返回值
supplyAsync()有返回值
whenCompleteAsync的构造参数(消费型接口变量,异常类变量)
演示
CompletableFuture<Integer> supplyAsync = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + "调用supplyAsync()方法,有返回值");
int i=10/0;
return 200;
});
System.out.println(supplyAsync.whenComplete((t,u)->{
System.out.println("t => "+t); //获取返回的结果
System.out.println("u =>"+u); //获取抛出的异常
//异常捕获
}).exceptionally((e)->{
System.out.println(e.getMessage());
return 404; //可以获取错误返回的结果
}).get());
16.JMM
谈谈对volatile的理解
volatile是Java虚拟机保证轻量级的同步机制
1.保证可见性
2不保证原子性
3.禁止指令重排
什么是JMM
JMM:Java内存模型,其抽象了线程和主内存之间的关系。
关于JMM的一些同步约定:
- 线程解锁前,必须吧共享变量立刻刷回内存。
- 线程加锁前,必须读取主存中的共享变量最新值到工作内存!
- 加锁和解锁是同一把锁
围绕着线程、工作内存、主内存的8种操作
加上lock,unlock八种
JMM使用的八种指令规定
问题:程序不知道主内存共享变量的值已经被修改过了
17. volatile
volatile保证可见性
public class volatileDemo {
private volatile static int num = 0;
//visibility 可见性
//volatile保证可见性, 若不加volatile会进入死循环
public static void main(String[] args) {
//创建一个次线程
new Thread(()->{
while (num==0){ //次线程不知道主线程是否修改了共享变量的值
;
}
System.out.println(Thread.currentThread().getName()+": num ="+num);
},"A").start();
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
num =1;
System.out.println(Thread.currentThread().getName()+": num ="+num);
}
}
volatile不保证原子性
原子性:是指一个操作是不可分割,要么都执行,要么都不执行。
private volatile static int num = 0;
//visibility 可见性
//volatile保证可见性, 若不加volatile会进入死循环
public static void main(String[] args) {
// visibility(); //测试volatile可见性
//atomicity
for(int i =1 ;i<=20 ;i++)
{
new Thread(()->{
for(int j=0;j<1000;j++){
add();
}
}).start();
}
while(Thread.activeCount()>2){
Thread.yield(); //活着的线程超过两个线程让步
}
//使用volatile同样数据丢失
System.out.println(Thread.currentThread().getName()+" "+num);
}
public static void add()
{
num++;
}
使用原子性,解决原子性问题
public class volatileDemo {
//volatile不能保证原子性,使用原子类Integer
private volatile static AtomicInteger num = new AtomicInteger();
//visibility 可见性
//volatile保证可见性, 若不加volatile会进入死循环
public static void main(String[] args) {
// visibility(); //测试volatile可见性
//atomicity
for(int i =1 ;i<=20 ;i++)
{
new Thread(()->{
for(int j=0;j<1000;j++){
add();
}
}).start();
}
while(Thread.activeCount()>2){
Thread.yield(); //活着的线程超过两个线程让步
}
//使用volatile同样数据丢失
System.out.println(Thread.currentThread().getName()+" "+num);
}
public static void add()
{
// num++;
num.getAndIncrement();
}
这些类的底层都直接和操作系统挂钩!在内存中修改值!Unsafe类是一个很特殊的存在!
volatle禁止指令重排序
指令重排序: 就是你写的程序,计算机可能不会按照你想的顺序执行。
源代码–>编译器优化的重排–>指令并行也可能会重排–>内存系统也会重排–>执行
非计算机专业
Volatile是可以保持可见性,不能保证原子性,由于内存屏障,可以保证避免指令重排的现象发生。
18.深入单调模式
饿汉式,DCL懒汉式,深究!
饿汉式
//饿汉单例模式
public class Hungry {
//可能会浪费空间
private byte[] data1 = new byte[1024];
private byte[] data2 = new byte[1024];
private byte[] data3 = new byte[1024];
private byte[] data4 = new byte[1024];
private Hungry(){
}
//类一加载就初始化对象
private final static Hungry HUNGRY = new Hungry();
public Hungry getInstance(){
return HUNGRY;
}
}
DCL懒汉模式(双重锁懒汉模式)
package com.singtle;
//DCL懒汉单例模式:可以使用反射破坏
import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
public class LazyMan {
//防止反射破坏措施,为了实现实例化只调用一次,加一对布尔值进行控制,防止反射二次创建对象
private static boolean flag = false;
public LazyMan(){
if(flag == false){
flag =true; //标记第一次初始化变量修改
}else{
throw new RuntimeException("不要试图通过反射创建对象");
}
}
//双重检测模式
private volatile static LazyMan lazyMan;
//懒汉模式,加载类方法才初始化对象
public LazyMan getInstance(){
if(lazyMan == null){ //外层if提高效率
synchronized (lazyMan){
if(lazyMan == null){ //内层循环确保lazyMan未初次化
lazyMan = new LazyMan();
}
}
}
return lazyMan;
}
//反射,但是还是可以通过修改布尔值去获取多个不同的实例对象
public static void main(String[] args) throws NoSuchFieldException, NoSuchMethodException, InvocationTargetException, InstantiationException, IllegalAccessException {
//获取布尔变量的属性
Field flag = LazyMan.class.getDeclaredField("flag");
flag.setAccessible(true); //设置可以访问
//获取初始化构造器
Constructor<LazyMan> declaredConstructor = LazyMan.class.getDeclaredConstructor(null);//获取默认的初始化构造器
declaredConstructor.setAccessible(true);
//获取构造器创建的实例
LazyMan lazyMan = declaredConstructor.newInstance();
flag.set(lazyMan,false); //去修改该对象的flag值
LazyMan lazyMan1 = declaredConstructor.newInstance();
System.out.println(lazyMan);
System.out.println(lazyMan1);
}
}
1.分配内存空间
2.执行构造方法,初始化对象
3.把这个对象指向这个空间
静态内部类
/**
* 静态内部类,把对象的实例化交给内部类
*/
public class Holder {
public Holder(){
}
public Holder getInstance(){
return Inter.HOLDER;
}
public static class Inter{
public static final Holder HOLDER = new Holder();
}
}
枚举类型(单例不安全,反射)
public static void main(String[] args) throws NoSuchMethodException, InvocationTargetException, InstantiationException, IllegalAccessException {
EnumSingle instance1 = EnumSingle.INSTANCE;
Constructor<EnumSingle> declaredConstructor = EnumSingle.class.getDeclaredConstructor(null);
declaredConstructor.setAccessible(true);
EnumSingle instance2 = declaredConstructor.newInstance();
// 通过反射创建和实例化是否是同一对象
System.out.println(instance1);
System.out.println(instance2);
}
枚举类型的最终反编译源码:
19.深入理解CAS
CAS,乐观锁就是使用这个机制。
CAS是一个compareandset:比较并交换
CAS使用
public class CASDemo {
public static void main(String[] args) {
//使用原子类,保证原子性
AtomicInteger atomicInteger = new AtomicInteger(2020);
//交换atomicInteger的值
System.out.println(atomicInteger.compareAndSet(2020, 2021));
System.out.println(atomicInteger.get());
atomicInteger.getAndIncrement(); //获取并自增
System.out.println(atomicInteger.compareAndSet(2020, 2021));
System.out.println(atomicInteger.get());
}
}
结果
底层 Unsafe类
CAS:比较当前工作现场内存的值和主内存的值,,如果这个值是期望的,那么则执行操作!如果不是就一直循环!
缺点
- 循环耗时
- 一次性只能保证一个共享变量的原子性
- ABA问题
CBA问题
如果我期望的值达到了,那么就更新,否则,就不更新, CAS 是CPU的并发原语。
public class CASDemo2 {
public static void main(String[] args) {
AtomicInteger atomicInteger = new AtomicInteger(2020);
//捣乱的线程
System.out.println(atomicInteger.compareAndSet(2020, 2021));
System.out.println(atomicInteger.get());
System.out.println(atomicInteger.compareAndSet(2021, 2020));
System.out.println(atomicInteger.get());
//期望的线程
System.out.println(atomicInteger.compareAndSet(2020, 6666));
System.out.println(atomicInteger.get());
}
}
20.原子引用
AtomicInteger、AtomicBoolean、AtomicLong、AtomicReference 这些原子类型,它们无一例外都采用了基于 volatile 关键字 +CAS 算法无锁的操作方式来确保共享数据在多线程操作下的线程安全性。
- volatile关键字保证了线程间的可见性,当某线程操- 作了被volatile关键字修饰的变量,其他线程可以立即看到该共享变量的变化。
- CAS算法,即对比交换算法,是由UNSAFE提供的,实质上是通过操作CPU指令来得到保证的。- -CAS算法提供了一种快速失败的方式,当某线程修改已经被改变的数据时会快速失败。
- 当CAS算法对共享数据操作失败时,因为有自旋算法的加持,我们对共享数据的更新终究会得到计算。
总之,原子类型用自旋锁+CAS的无锁操作保证了共享变量的线程安全性和原子性
ABA问题
绝大多数情况下,CAS算法并没有什么问题,但是在需要关心变化值的操作中会存在 ABA 的问题,比如一个值原来是A,变成了B,后来又变成了A,那么CAS检查时会发现它的值没有发生变化,但是实际上却是发生了变化的。
如何避免CAS算法带来的ABA问题呢?
针对乐观锁在并发情况下的操作,我们通常会增加版本号,比如数据库中关于乐观锁的实现方式,以此来解决并发操作带来的ABA问题。在Java原子包中也提供了这样的实现AtomicStampedReference。
注意:
Interger对象的复用
Integer使用了对象的缓存机制,使用valueof会使用缓存,而new一定会创建新的对象分配新的内存空间。
相同包装类型之间的比较
实践
public class AtomicReference {
//常规的使用CAS方法进行无锁自加或者更换栈的表头,出现ABA问题。
//AtomicStampReference 注意,如果是一个包装类,注意对象之间的引用问题。
//使用AtomicStampReference是为了解决CAS中的ABA问题,
//CAS只比较当前值和内存值是否相等,而AtomicStampReference还会比较引用是否相等,接着比较值是否相等,从而避免ABA问题。
static AtomicStampedReference<Integer> atomicStampedReference = new AtomicStampedReference<>(1,1); //初始化引用和版本号
public static void main(String[] args) {
//A线程 :捣乱线程
new Thread(()->{
int stamp = atomicStampedReference.getStamp();//获取版本号
System.out.println("stamp = "+stamp);
//线程睡眠1s
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
//使用引用和版本号进行CAS
atomicStampedReference.compareAndSet(1, 2, atomicStampedReference.getStamp(),atomicStampedReference.getStamp()+1 );
System.out.println("a2 = "+ atomicStampedReference.getStamp());
System.out.println(atomicStampedReference.compareAndSet(2,1, atomicStampedReference.getStamp()+1,atomicStampedReference.getStamp()));
System.out.println("a3 = "+atomicStampedReference.getStamp());
},"A").start();
//B线程:预期线程
new Thread(()->{
int stamp = atomicStampedReference.getStamp(); //获取版本号
System.out.println("b1 = "+stamp);
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(atomicStampedReference.compareAndSet(1, 6, stamp, stamp + 1));
System.out.println(" b2 = "+atomicStampedReference.getStamp());
},"B").start();
}
}
21.锁讲解
公平锁和非公平锁
公平锁:非常公平,必须先来后到,依次执行。
非公平锁: 非常不公平,可以插队(默认都是非公平锁)
可重入锁(递归锁)
- 可重入锁: 当线程获取某个锁后,还可以继续获取该锁,可以递归调用,而且不会产生死锁。
- 不可重入锁与重入锁相反,获取锁后不可重复获取,否则会产生死锁(自己锁自己)
可重入锁
synchronized
/**
* 可重入锁 Synchronized
*/
public class ReentrantLock {
public static void main(String[] args) {
Phone phone = new Phone();
//手机资源
new Thread(()->{phone.send();},"A").start();
new Thread(()->{phone.reception();},"B").start();
}
}
//资源类
class Phone{
public synchronized void send(){
System.out.println("发送消息");
reception(); //这里也有锁,可重入锁递归调用
}
public synchronized void reception(){
System.out.println("接收消息");
}
}
Lock版
/**
* 可重入锁: lock
*/
public class ReentrantLockDemo2 {
public static void main(String[] args) {
Phone1 phone = new Phone1();
new Thread(()->{phone.send();},"A").start();
new Thread(()->{phone.reception();},"A").start();
}
}
//资源类
class Phone1{
//创建一个可重入锁
private Lock lock = new ReentrantLock();
public synchronized void send(){
//加锁
lock.lock();
//再次加锁
lock.lock();
try {
System.out.println("发送消息");
reception(); //这里也有锁,可重入锁递归调用
}catch (Exception e){
e.printStackTrace();
}finally {
lock.unlock(); //锁了几次就要解锁几次
lock.unlock();
}
}
public synchronized void reception(){
lock.lock();
try{
System.out.println("接收消息");
}catch (Exception e){
e.printStackTrace();
}finally {
lock.unlock(); //解锁
}
}
}
不可重入锁
基于notify/wait实现不可重入锁
/**
* 不可重入锁 : wait,notify
*/
public class ReentrantForbiddenLock {
private Thread owner; //拥有锁的线程,为空表示无人占有
public synchronized void lock() throws InterruptedException {
Thread thread = Thread.currentThread(); //获取当前线程
//使用While,可以防止虚假唤醒
while(owner != null){
System.out.println(String.format("%s等待%s 释放锁",thread.getName(),owner.getName()));
wait(); //进入阻塞状态
}
System.out.println(thread.getName()+"获得了锁");
owner = thread;
}
public synchronized void unlock(){
if(Thread.currentThread() != owner) //只有持有锁的线程才有释放锁的权利
{
throw new IllegalMonitorStateException();
}
System.out.println(owner.getName()+"释放了锁");
owner =null;
notify();
}
public static void main(String[] args) throws InterruptedException {
ReentrantForbiddenLock lock = new ReentrantForbiddenLock();
lock.lock(); //获得锁
lock.lock(); //再次获取锁
}
}
main线程未释放锁,又再次调用锁,出现死锁.
自旋锁
基于自旋锁实现不可重入锁
自旋锁,即获取锁的线程在锁被占用时,不会停下来等待,而是不断的尝试,直到获取锁成功。
- 好处: 线程比较活跃,减少线程上下文切换的开销。
- 坏处: 很耗cpu,特别是等待时间很长时。适应于多核系统。
/**
* 不可重入锁 : cas
*/
public class ReentrantForbiddenLockDemo {
//原子引用: 持有锁的线程,为空表示无人占有
private AtomicReference<Thread> owner = new AtomicReference<>();
/**
* 使用cas原子操作,而不使用synchronized同步了
*/
public void lock(){
// compareAndSet:原子操作,依赖于操作系统实现
Thread thread = Thread.currentThread(); //获取当前线程
//被其他的线程修改了,则放弃本次操作,继续尝试操作
while(!owner.compareAndSet(null, thread)){
System.out.println(String.format("%s等待%s释放锁", thread.getName(),owner.get().getName()));
}
System.out.println(thread.getName()+"获取到了锁");
}
public void unlock(){
Thread thread = Thread.currentThread();
//如果能进行cas,则说明线程可以释放
if(owner.compareAndSet(thread, null)){
System.out.println(thread.getName()+"释放了锁");
return;
}
throw new IllegalMonitorStateException();
}
public static void main(String[] args) {
ReentrantForbiddenLockDemo lock = new ReentrantForbiddenLockDemo();
lock.lock();
lock.lock();
}
}
死锁
死锁:就是锁已经其他线程(或自身)占有了,而自己又占有对方线程的锁,互相不释放各自所持有的锁,导致死锁的现象。
/**
* 死锁
*/
public class DeadLock {
public static void main(String[] args) {
String lockA = "lockA";
String lockB = "lockB";
new Thread(new MyThread(lockA, lockB)).start();
new Thread(new MyThread(lockB, lockA)).start();
}
}
class MyThread implements Runnable{
//两个锁资源
private String lockA;
private String lockB;
public MyThread(String lockA, String lockB) {
this.lockA = lockA;
this.lockB = lockB;
}
@Override
public void run() {
synchronized (lockA){
System.out.println(Thread.currentThread().getName()+"lock:"+lockA+" =>" +lockB);
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (lockB){
System.out.println(Thread.currentThread().getName()+"lock"+lockB+"=>get"+lockA);
}
}
}}