欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 汽车 > 维修 > JUC并发编程详解

JUC并发编程详解

2025/6/19 15:16:41 来源:https://blog.csdn.net/m0_52796363/article/details/144103234  浏览:    关键词:JUC并发编程详解

JUC

线程和进程

进程:一个程序,QQ.exe Music.exe程序的集合

一个进程可包含多个线程,至少包含一个

Java默认有两个线程,main,GC

线程:开了一个进程Typora,写字,自动保存(线程负责)

对于java而言:Thread,Runable ,Callable

Java可以开启一个线程吗?不能

    
public synchronized void start() {/*** This method is not invoked for the main method thread or "system"* group threads created/set up by the VM. Any new functionality added* to this method in the future may have to also be added to the VM.** A zero status value corresponds to state "NEW".*/if (threadStatus != 0)throw new IllegalThreadStateException();
​/* Notify the group that this thread is about to be started* so that it can be added to the group's list of threads* and the group's unstarted count can be decremented. */group.add(this);
​boolean started = false;try {start0();started = true;} finally {try {if (!started) {group.threadStartFailed(this);}} catch (Throwable ignore) {/* do nothing. If start0 threw a Throwable thenit will be passed up the call stack */}}}
//本地方法,调用底层c++,Java无法直接操作硬件private native void start0();

并发,并行

并发(多个线程操作同一个资源)

并行(多个人一同行走)

  • CPU多核,多个线程可同时执行

public class Test1 {public static void main(String[] args) {//获取cpu核数//cpu密集型,IO密集型System.out.println(Runtime.getRuntime().availableProcessors());}
}

并发编程的本质:充分利用CPU资源

线程状态

//线程新生
NEW,
​
//运行RUNNABLE,
​
//阻塞BLOCKED,
​//等待 ,死等WAITING,
​//超时等待,过时不候TIMED_WAITING,
​//终止TERMINATED;}
 

wait(),sleep()区别

  1. 来自不同的类 wait=>object

    sleep=>Thread

  2. 关于锁的释放 wait会释放锁,sleep不会

  3. 使用范围不同 wait只能在同步代码块中 sleep任何地方都可以

  4. 是否需要捕获异常 wait不需要捕获异常 sleep需要捕获异常

Lock锁

sysnchronized

公平锁:十分公平,先来后到

非公平锁:十分不公平,可以插队(默认)

package com.dahan.demo1;
​
​
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
​
/*
* 线程是一个单独的资源类,没有任何附属操作
* 1. 属性,方法
* */
public class Test1 {public static void main(String[] args) {//并发:多线程操作同一个资源final Ticket ticket = new Ticket();
​new Thread(() ->{for (int i = 0; i <60 ; i++) {ticket.sale();}},"a").start();new Thread(() ->{for (int i = 0; i <60 ; i++) {ticket.sale();}},"b").start();new Thread(() ->{for (int i = 0; i <60 ; i++) {ticket.sale();}},"c").start();
​
​}
}
class Ticket{//属性,方法private int number = 50;Lock lock = new ReentrantLock();//买票的方式public void sale(){lock.lock();try {}catch (Exception e){if (number>0){System.out.println(Thread.currentThread().getName()+"卖出了第"+(number--)+"票,剩余"+number);}}finally {lock.unlock();}}
}

synchronized和lock的区别

  1. synchronized内置的java关键字,lock是一个Java类

  2. synchronized无法判断获取的状态,lock可以判断是否获取到了锁

  3. synchronized可自动释放锁,lock必须手动释放锁,如果不释放会造成死锁

  4. synchronized线程1(获得锁,阻塞),线程2(等待);lock锁不一定会等下去

  5. synchronized可重入(可重入就是说某个线程已经获得某个锁,可以再次获取锁而不会出现死锁)锁,不可终端的,非公平,lock可重入,可判断锁,非公平(可自己设置)

  6. synchronized适合锁少量的代码同步问题,lock锁适合锁大量的同步代码

锁是什么,如何判断锁的是谁

生产者和消费者问题

package com.dahan.pc;
​
import java.util.Date;
​
/*
* 
* */
public class Test2 {public static void main(String[] args) {Data data = new Data();new Thread(()->{for (int i = 0; i <10 ; i++) {try {data.increment();}catch (Exception e){e.printStackTrace();}}},"A").start();new Thread(()->{for (int i = 0; i <10 ; i++) {try {data.decrement();}catch (Exception e){e.printStackTrace();}}},"B").start();}  
}
​
//判断等待,业务,通知
class Data{//数字,资源类private int number=0;//+1public synchronized void increment() throws InterruptedException {if (number!=0){this.wait();}number++;// 通知其他线程,我+1完毕this.notify();}//-1public synchronized void decrement() throws InterruptedException {if (number==0){//等待this.wait();}number--;// 通知其他线程,我-1完毕this.notify();}
}

问题存在 A,B,C,D4个线程!虚假唤醒

if只会判断一次,应使用while

线程可以唤醒,而不会被通知,中断或超时,即所谓的虚假唤醒。等待应该总是出现在循环中 如:

synchronized(obj){while(<condition does not hold>)obj.wait(timeout);
}
if 改为while判断package com.dahan.pc;
​
import java.util.Date;
​
/*
* 
* */
public class Test2 {public static void main(String[] args) {Data data = new Data();new Thread(()->{for (int i = 0; i <10 ; i++) {try {data.increment();}catch (Exception e){e.printStackTrace();}}},"A").start();new Thread(()->{for (int i = 0; i <10 ; i++) {try {data.decrement();}catch (Exception e){e.printStackTrace();}}},"B").start();}  
}
​
//判断等待,业务,通知
class Data{//数字,资源类private int number=0;//+1public synchronized void increment() throws InterruptedException {while (number!=0){this.wait();}number++;// 通知其他线程,我+1完毕this.notify();}//-1public synchronized void decrement() throws InterruptedException {while (number==0){//等待this.wait();}number--;// 通知其他线程,我-1完毕this.notify();}
}

lock版生产者与消费者问题

package com.dahan.pc;
​
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
​
public class TestLock {public static void main(String[] args) {Data2 data = new Data2();new Thread(()->{for (int i = 0; i < 10; i++) {try {data.increment();}catch (Exception e){e.printStackTrace();}}},"a").start();new Thread(()->{for (int i = 0; i < 10; i++) {try {data.decrement();}catch (Exception e){e.printStackTrace();}}},"b").start();new Thread(()->{for (int i = 0; i < 10; i++) {try {data.increment();}catch (Exception e){e.printStackTrace();}}},"c").start();new Thread(()->{for (int i = 0; i < 10; i++) {try {data.decrement();}catch (Exception e){e.printStackTrace();}}},"d").start();
​}
}
​
//判断等待,业务,通知
class Data2{//数字,资源类private int number=0;Lock lock = new ReentrantLock();Condition condition = lock.newCondition();//+1public  void increment() throws InterruptedException {
//        condition.await();//等待
//        condition.signalAll();//唤醒全部lock.lock();try{while (number!=0){//等待condition.await();}number++;System.out.println(Thread.currentThread().getName()+"=>"+number);condition.signalAll();}catch (Exception e){e.printStackTrace();}finally {lock.unlock();}}//-1public  void decrement() throws InterruptedException {lock.lock();try {while (number==0){condition.await();}number--;System.out.println(Thread.currentThread().getName()+"=>"+number);condition.signalAll();}catch (Exception e){e.printStackTrace();}finally {lock.unlock();}}
}
​

condition实现精准唤醒

package com.dahan.pc;
​
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
​
//a执行完调用b,b执行完调用c,c执行完调用a
public class TestContidion {public static void main(String[] args) {Data3 data = new Data3();new Thread(()->{for (int i = 0; i < 10; i++) {try {data.print1();}catch (Exception e){e.printStackTrace();}}},"a").start();new Thread(()->{for (int i = 0; i < 10; i++) {try {data.print2();}catch (Exception e){e.printStackTrace();}}},"b").start();new Thread(()->{for (int i = 0; i < 10; i++) {try {data.print3();}catch (Exception e){e.printStackTrace();}}},"c").start();
​}
}
​
//判断等待,业务,通知
class Data3{//数字,资源类private int number=1;//1执行a,2b,3cprivate Lock lock = new ReentrantLock();Condition condition1 = lock.newCondition();Condition condition2 = lock.newCondition();Condition condition3 = lock.newCondition();
​//+1public  void print1() throws InterruptedException {lock.lock();try{while (number!=1){//等待condition1.await();}System.out.println(Thread.currentThread().getName()+"=>"+number);//唤醒指定的人number=2;condition2.signal();}catch (Exception e){e.printStackTrace();}finally {lock.unlock();}}//-1public  void print2() throws InterruptedException {lock.lock();try {while (number!=2){condition2.await();}System.out.println(Thread.currentThread().getName()+"=>"+number);number=3;condition3.signal();}catch (Exception e){e.printStackTrace();}finally {lock.unlock();}}public  void print3() throws InterruptedException {lock.lock();try {while (number!=3){condition3.await();}System.out.println(Thread.currentThread().getName()+"=>"+number);number=1;condition1.signal();}catch (Exception e){e.printStackTrace();}finally {lock.unlock();}}
}
​
​

集合不安全

list不安全

package com.dahan.pc;
​
import java.util.*;
import java.util.concurrent.CopyOnWriteArrayList;
​
//java.util.ConcurrentModificationException 并发修改异常
public class unsafe {public static void main(String[] args) {//并发下ArrayList不安全/*解决方案1.List<String> list = new Vector<>();2. List<String> list = Collections.synchronizedList(new ArrayList<>());*///CopyOnWrite 写入时复制List<String> list = new CopyOnWriteArrayList<>();for (int i = 0; i < 10; i++) {new Thread(()->{list.add(UUID.randomUUID().toString().substring(0,5));System.out.println(list);},String.valueOf(i)).start();}}
}

set不安全

package com.dahan.unsagfe;
​
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CopyOnWriteArraySet;
​
/*
java.util.ConcurrentModificationException
1.Set<String> set = Collections.synchronizedSet(new HashSet<>());
2.Set<String> set = new CopyOnWriteArraySet<>();*/
public class TestSet {public static void main(String[] args) {// Set<String> set = new HashSet<>();Set<String> set = new CopyOnWriteArraySet<>();for (int i = 0; i < 20; i++) {new Thread(()->{set.add(UUID.randomUUID().toString().substring(0,5));System.out.println(set);},String.valueOf(i)).start();}}
}
hashset的底层是什么public HashSet() {map = new HashMap<>();
}
​
//add set 本质就是map key是无法重复的
public boolean add(E e) {return map.put(e, PRESENT)==null;}
​
private static final Object PRESENT =  new Object();//不变的值

HashMap

package com.dahan.unsagfe;
​
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
​
public class TestMap {public static void main(String[] args) {
//        Map<String, String> map = new HashMap<>();Map<String,String> map = new ConcurrentHashMap<>();
​for (int i = 0; i < 10; i++) {new Thread(()->{map.put(Thread.currentThread().getName(), UUID.randomUUID().toString().substring(0,5));System.out.println(map);},String.valueOf(i)).start();}}
}

Callable

Callable

Callable接口类似Runnable,因为他们都是为其实例可能由另一个线程执行的类设计的,然而Runnable不返回结果,也不抛出异常

  1. 有返回值

  2. 可以抛出异常

  3. 方法不同,run()/call()

代码测试

package com.dahan.CallAble;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
​
public class CallableTest {public static void main(String[] args) throws ExecutionException, InterruptedException {// new Thread(new Mythread()).start();new Thread().start();Mythread mythread = new Mythread();FutureTask futureTask = new FutureTask(mythread);//适配类new Thread(futureTask,"a").start();new Thread(futureTask,"b").start();//执行两个线程时,结果会被缓存,效率高Integer o = (Integer)futureTask.get();//获取callable的返回结果,get()方法可能会产生阻塞,应把它放在最后System.out.println(o);}
}
class Mythread implements Callable<Integer> {
​@Overridepublic Integer call(){System.out.println("call()");return 1024;}
}

常用辅助类

8.1CountDownlatch

package com.dahan.add;
​
import java.util.concurrent.CountDownLatch;
​
public class CountDownLatchDemo {public static void main(String[] args) throws InterruptedException {//倒计时CountDownLatch countDownLatch = new CountDownLatch(6);for (int i = 1; i <=6 ; i++) {new Thread(()->{System.out.println(Thread.currentThread().getName()+"Go");countDownLatch.countDown();//数量-1},String.valueOf(i)).start();}countDownLatch.await();System.out.println("关门");}
}
countDownLatch.countDown()//数量-1countDownLatch.await()//等待计数器归零,然后再向下执行

8.2 CycliBarrier

加法计数器

package com.dahan.add;
​
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
​
public class CycliBarrierDemo {public static void main(String[] args) {
​//召唤龙珠的线程CyclicBarrier cyclicBarrier = new CyclicBarrier(7,()->System.out.println("召唤神龙成功"));
​for (int i = 0; i < 7; i++) {final int temp = i;new Thread(()->{
​System.out.println(Thread.currentThread().getName()+"收集"+temp+"个龙珠");try {cyclicBarrier.await();} catch (InterruptedException e) {e.printStackTrace();} catch (BrokenBarrierException e) {e.printStackTrace();} finally {}}).start();}}
}

8.3 Semaphore

package com.dahan.add;
​
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
​
public class SemaphoreDemo {public static void main(String[] args) {//线程数量Semaphore semaphore = new Semaphore(3);for (int i = 0; i < 6; i++) {new Thread(()->{try {//acquire 获得semaphore.acquire();System.out.println(Thread.currentThread().getName()+"获得车位");TimeUnit.SECONDS.sleep(2);System.out.println(Thread.currentThread().getName()+"离开车位");} catch (InterruptedException e) {e.printStackTrace();}finally { // release 释放semaphore.release();}}).start();
​}}
}

原理

semaphore.acquire();获得,假设已经满了,等待,等待被释放

semaphore.release();释放,会将当前的信号量释放+1 然后唤醒等待的线程

作用:多个共享资源互斥的使用,并发限流,控制最大的线程数

读写锁

ReadWriteLock

读可以被多个线程同时读,写的时候只能有一个线程去写

package com.dahan.rw;
​
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
​
public class ReadWriteLockDeom {public static void main(String[] args) {MyCacheLock myCache = new MyCacheLock();for (int i = 0; i < 5; i++) {final int temp = i;new Thread(()->{myCache.put(temp+"",temp+"");},String.valueOf(i)).start();
​}for (int i = 0; i < 5; i++) {final int temp = i;new Thread(()->{myCache.get(temp+"");},String.valueOf(i)).start();
​}}
}
​
/*
加锁的*/
class MyCacheLock{private volatile Map<String,Object> map = new HashMap<>();//读写锁 更加细粒度的控制private ReadWriteLock readWriteLock = new ReentrantReadWriteLock();//存,写的时候,只希望有一个线程写public void put(String key,Object value){readWriteLock.writeLock().lock();
​
​try {System.out.println(Thread.currentThread().getName()+"写入"+key);map.put(key, value);System.out.println(Thread.currentThread().getName()+"写入ok");} catch (Exception e) {e.printStackTrace();} finally {readWriteLock.writeLock().unlock();}
​}//读,取,所有人都可以读public void get(String key){readWriteLock.readLock().lock();
​try {System.out.println(Thread.currentThread().getName()+"读取"+key);Object o = map.get(key);System.out.println(Thread.currentThread().getName()+"读取ok");} catch (Exception e) {e.printStackTrace();} finally {readWriteLock.readLock().unlock();}
​}
}
​
​
​
/*
自定义缓存*/
//class MyCache{
//    private volatile Map<String,Object> map = new HashMap<>();
//
//    //存,写
//    public void put(String key,Object value){
//        System.out.println(Thread.currentThread().getName()+"写入"+key);
//        map.put(key, value);
//        System.out.println(Thread.currentThread().getName()+"写入ok");
//    }
//    //读,取
//    public void get(String key){
//        System.out.println(Thread.currentThread().getName()+"读取"+key);
//        Object o = map.get(key);
//        System.out.println(Thread.currentThread().getName()+"读取ok");
//    }
//}

阻塞队列

四组API

方式抛出异常不会抛出异常阻塞等待超时等待
添加add(底层调用的还是offer)offerputoffer(,,)
移除removepolltakepoll(,)
检测队首元素elementpeek

  1. 抛出异常

  2. 不会抛出异常

  3. 阻塞等待

  4. 超时等待

同步队列

SynchronousQueue同步队列

没有容量,进去一个元素,必须等待取出来之后才能再往里面放一个元素

put take

package com.dahan.dq;
​
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
​
public class SynchronousQueueDemo {public static void main(String[] args) {BlockingQueue<String> blockingQueue = new SynchronousQueue<>();new Thread(()->{
​try {System.out.println(Thread.currentThread().getName()+"put1");blockingQueue.put("1");System.out.println(Thread.currentThread().getName()+"put1");blockingQueue.put("2");System.out.println(Thread.currentThread().getName()+"put1");blockingQueue.put("3");} catch (InterruptedException e) {e.printStackTrace();}},"a").start();
​new Thread(()->{try {TimeUnit.SECONDS.sleep(3);System.out.println(Thread.currentThread().getName()+blockingQueue.take());TimeUnit.SECONDS.sleep(3);System.out.println(Thread.currentThread().getName()+blockingQueue.take());TimeUnit.SECONDS.sleep(3);System.out.println(Thread.currentThread().getName()+blockingQueue.take());} catch (InterruptedException e) {e.printStackTrace();}}).start();}
}

线程池

线程池的好处:

  1. 降低资源的消耗

  2. 提高响应速度

  3. 方便管理 线程复用,可以控制最大并发数,管理线程

线程池:三大方法

package com.dahan.pool;
​
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
​
// Executors 工具类 3大方法
public class Tset01 {public static void main(String[] args) {Executors.newSingleThreadExecutor();//单个线程Executors.newFixedThreadPool(5);//创建一个固定的线程池的大小ExecutorService service =Executors.newCachedThreadPool();//可伸缩的,遇强则强
​try {for (int i = 0; i <10 ; i++) {//使用线程池来创建线程service.execute(()->{System.out.println(Thread.currentThread().getName()+"ok");});}} catch (Exception e) {e.printStackTrace();} finally {//线程池用完,程序结束,关闭线程池service.shutdown();}}
}

七大参数

方法源码分析

public static ExecutorService newSingleThreadExecutor() {return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()));
}
​
public static ExecutorService newFixedThreadPool(int nThreads) {return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());}
​
public static ExecutorService newCachedThreadPool() {return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());}
​
//本质 ThreadPoolExecutor
public ThreadPoolExecutor(int corePoolSize, //核心线程池大小int maximumPoolSize,//最大核心线程池大小long keepAliveTime,//超时了没有人调用就会释放TimeUnit unit,//超时单位BlockingQueue<Runnable> workQueue) {//阻塞队列this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,Executors.defaultThreadFactory(),//线程工厂,创建线程的,一般不用defaultHandler);//拒绝策略}

手动创建一个线程池,四种拒绝策略

package com.dahan.pool;
​
import java.util.concurrent.*;
​
public class Tset01 {public static void main(String[] args) {//自定义线程工作ExecutorService threadPoolExecutor = new ThreadPoolExecutor(2,5,3, TimeUnit.SECONDS,new LinkedBlockingDeque<>(3),Executors.defaultThreadFactory(),
//                new ThreadPoolExecutor.AbortPolicy()//银行满了 还有人进,不处理这个人
//                new ThreadPoolExecutor.CallerRunsPolicy()//哪来的去哪里
//                new ThreadPoolExecutor.DiscardPolicy()//队列满了 不会抛出异常new ThreadPoolExecutor.DiscardOldestPolicy()//队列满了 尝试和最早的竞争,也不会抛出异常);
​try {//超出最大 抛出 RejectedExecutionExceptionfor (int i = 0; i <10 ; i++) {//使用线程池来创建线程threadPoolExecutor.execute(()->{System.out.println(Thread.currentThread().getName()+"ok");});}} catch (Exception e) {e.printStackTrace();} finally {//线程池用完,程序结束,关闭线程池threadPoolExecutor.shutdown();}}
}

小结 和拓展

//最大线程如何定义
//1. CPU 密集型 几核就是几,可以保持CPU的效率最高
//2。 IO 密集型 > 判断程序中十分耗io的线程
//    程序 15个大型任务,io十分占用资源

四大函数式接口

Function接口

package com.dahan.function;
​
import java.util.function.Function;
​
//函数型接口
//只要是函数式接口 可以用lambda表达式简化
public class Demo01 {public static void main(String[] args) {
//        Function function = new Function<String,String>(){
//            @Override
//            public String apply(String str){
//                return str;
//            }
//        };Function function = (str)->{return str};System.out.println(function.apply("asd"));}//工具类 输出输入的值}

Predicate 接口

package com.dahan.function;
​
import java.util.function.Predicate;
​
//断定型接口 有一个输入参数 返回值只能是布尔值
public class Demo02 {public static void main(String[] args) {
//       Predicate predicate = new Predicate<String>() {
//            @Override
//            public boolean test(String o) {
//                if (o.isEmpty());
//                return false;
//            }
//        };Predicate<String> predicate = (o)->{return o.isEmpty();};System.out.println(predicate.test(""));}
}

Consumer消费性接口

package com.dahan.function;
​
​
import java.util.function.Consumer;
​
//消费性接口 只有输入没有返回值
public class Demo03 {public static void main(String[] args) {
//        Consumer consumer = new Consumer<String>() {
//            @Override
//            public void accept(String o) {
//                System.out.println(o);
//            }
//        };Consumer<String> consumer = (o)->{System.out.println(o);};consumer.accept("dssd");}
}

Supplier 供给型接口

package com.dahan.function;
​
import java.util.function.Supplier;
​
​
//Supplier 供给型接口,没有参数,只有返回值
public class Demo04 {public static void main(String[] args) {
//        Supplier<String> objectSupplier = new Supplier<String>(){
//
//            @Override
//            public String get() {
//                System.out.println("get()");
//                return "1024";
//            }
//        };Supplier<String> objectSupplier = ()->{return "1024";};System.out.println(objectSupplier.get());}
}

Stream流式计算

什么是Stream流式计算

package com.dahan.Stream;
​
import java.lang.reflect.Array;
import java.util.Arrays;
import java.util.List;
​
/*
1. id必须是偶数
2. 年龄必须大于23
3. 用户名转化为大写
4. 用户名字母倒着排序
5. 只输出一个用户*/
​
public class Test {public static void main(String[] args) {User user1 = new User(1,"a",21);User user2 = new User(2,"b",22);User user3 = new User(3,"c",23);User user4 = new User(4,"d",24);User user5 = new User(5,"e",25);
​//集合就是管理存储List<User> list = Arrays.asList(user1,user2,user3,user4,user5);
​//计算交给Stream流list.stream().filter(u->{return u.getId()%2==0;}).filter(u->{return u.getAge()>23;}).map(u->{return u.getName().toUpperCase();}).sorted((u1,u2)->{return u2.compareTo(u1);}).limit(1).forEach(System.out::println);}
}

Forkjoin

什么是Forkjoin

并行执行任务,提高效率,大数据量

把一个大任务拆成多个小任务并行执行

ForkJoinPool线程池可以把一个大任务分拆成小任务并行执行,任务类必须继承自RecursiveTaskRecursiveAction

使用Fork/Join模式可以进行并行计算以提高效率。

Forkjion特点:工作窃取

public class Main {public static void main(String[] args) throws Exception {// 创建2000个随机数组成的数组:long[] array = new long[2000];long expectedSum = 0;for (int i = 0; i < array.length; i++) {array[i] = random();expectedSum += array[i];}System.out.println("Expected sum: " + expectedSum);// fork/join:ForkJoinTask<Long> task = new SumTask(array, 0, array.length);long startTime = System.currentTimeMillis();Long result = ForkJoinPool.commonPool().invoke(task);long endTime = System.currentTimeMillis();System.out.println("Fork/join sum: " + result + " in " + (endTime - startTime) + " ms.");}
​static Random random = new Random(0);
​static long random() {return random.nextInt(10000);}
}
​
class SumTask extends RecursiveTask<Long> {static final int THRESHOLD = 500;long[] array;int start;int end;
​SumTask(long[] array, int start, int end) {this.array = array;this.start = start;this.end = end;}
​@Overrideprotected Long compute() {if (end - start <= THRESHOLD) {// 如果任务足够小,直接计算:long sum = 0;for (int i = start; i < end; i++) {sum += this.array[i];// 故意放慢计算速度:try {Thread.sleep(1);} catch (InterruptedException e) {}}return sum;}// 任务太大,一分为二:int middle = (end + start) / 2;System.out.println(String.format("split %d~%d ==> %d~%d, %d~%d", start, end, start, middle, middle, end));SumTask subtask1 = new SumTask(this.array, start, middle);SumTask subtask2 = new SumTask(this.array, middle, end);invokeAll(subtask1, subtask2);Long subresult1 = subtask1.join();Long subresult2 = subtask2.join();Long result = subresult1 + subresult2;System.out.println("result = " + subresult1 + " + " + subresult2 + " ==> " + result);return result;}
}

异步回调

Future 设计的初衷:对将来的某个事件的结果进行建模

package com.dahan.future;
​
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
​
public class Test1 {public static void main(String[] args) throws ExecutionException, InterruptedException {//没有返回值的
//        CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(()->{
//            try {
//                TimeUnit.SECONDS.sleep(2);
//            } catch (InterruptedException e) {
//                e.printStackTrace();
//            }
//            System.out.println(Thread.currentThread().getName()+"runAsync=>Void");
//        });
//        System.out.println("111");
//        completableFuture.get();//获取阻塞执行结果//有返回值的supplyAsync异步回调CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(()->{return 1024;});completableFuture.whenComplete((t,u)->{System.out.println("t:"+t);//正常的返回结果System.out.println("u:"+u);//错误信息}).exceptionally((e)->{e.printStackTrace();return 233;//可以获取到错误的返回结果});}
}

JMM

对volatile的理解

volatile是java虚拟机提供轻量级的同步机制

  1. 保证可见性

  2. 不保证原子性

  3. 禁止指令重排

什么是JMM

JMM: Java内存模型,不存在的东西,是一个概念

关于JMM的一些同步的约定:

  1. 线程解锁前,必须把共享变量立刻刷回主存

  2. 线程加锁前,必须读取主存中的最新值到工作内存中

  3. 加锁和解锁是同一把锁

线程,工作内存,主内存

八种操作

内存交互操作有8种,虚拟机实现必须保证每一个操作都是原子的,不可再分的

    • lock(锁定):作用于主内存的变量,把一个变量标识为线程独占状态

    • unlock(解锁):作用于主内存的变量,把一个处于锁定状态的变量释放出来,释放后的变量才可以被其他线程锁定

    • read(读取):作用于主内存变量,他把一个变量的值从主内存传输到线程的工作内存中,以便随后的load动作使用

    • load(载入):作用于工作内存的变量,他把read操作 从主内存中变量放入工作内存中

    • use(使用):作用于工作内存的变量,他把工作内存中的变量传输给执行引擎,每当虚拟机遇到一个需要使用到变量的值,就会使用到这个指令

    • assign(赋值):作用于工作内存的变量,把一个从执行引擎中接受到的值放入工作内存的变量副本中

    • store(存储):作用于主内存中的变量,他把一个冲工作内存中一个变量的值传送到主内存中,以便后续的write使用

    • write(写入):作用于主内存中的变量,他把store操作从工作内存中得到的变量的放入主内存的变量中

  • JMM对这八种指令的使用,制定了如下规则

Volatile

保证可见性

package com.dahan.future;
​
import java.util.concurrent.TimeUnit;
​
public class JMMDemo {//加volatile可以保证可见性private static volatile int number = 0;
​public static void main(String[] args) {new Thread(()->{//线程1while (number==0){
​}}).start();try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}number=1;System.out.println(number);}
}

不保证原子性

原子性:不可分割

线程A在执行任务时不能被打扰,也不能被分割,要么同时成功要么同时失败

package com.dahan.future;
​
//不保证原子性
public class VDemo1 {private static volatile int number=0;
​public static void add(){number++;//不是一个原子性操作}
​public static void main(String[] args) {for (int i = 0; i < 20; i++) {new Thread(()->{for (int j = 0; j < 1000; j++) {add();}}).start();
​}while (Thread.activeCount()>2){//Java中默认main gc两个线程一直在运行 则线程数大于2时Thread.yield();}System.out.println(number);}
}

如果不加lock和synchronized 怎么保证原子性

使用原子类 解决原子性问题

package com.dahan.future;
​
import java.util.concurrent.atomic.AtomicInteger;
​
​
public class VDemo1 {private static volatile AtomicInteger number=new AtomicInteger();
​public static void add(){number.getAndIncrement();//+1方法 底层用的CAS}
​public static void main(String[] args) {for (int i = 0; i < 20; i++) {new Thread(()->{for (int j = 0; j < 1000; j++) {add();}}).start();
​}while (Thread.activeCount()>2){//Java中默认main gc两个线程一直在运行 则线程数大于2时Thread.yield();}System.out.println(number);}
}

这些类的底层都直接和操作系统挂钩,在内存中修改值,Unsafe类是一个特殊存在

指令重排

什么是指令重排:你写的程序,计算机并不是按照你写的那样去执行的

处理器在进行指令重排的时候,考虑:数据之间的依赖性

线程A线程B
x=ay=b
b=1a=2

正常的结果:x=0 b=0,但是可能由于指令重排

线程A线程B
b=1a=2
x=ay=b

指令重排导致的诡异结果;x=2;y=1;

volatile可以避免指令重排

内存屏障,CPU指令,作用:

  1. 保持特定的操作的执行顺序

  2. 可以保持某些变量的内存可见性(利用这些特性volatile实现了可见性)

volatile可以保证可见性,不能保证原子性。由于内存屏障,可以保证避免指令重排的现象产生

单例模式

单例模式属于创建型模式,它提供了一种创建对象的最佳方式。

这种模式涉及到一个单一的类,该类负责创建自己的对象,同时确保只有单个对象被创建,这个类提供了一种访问其唯一的对象的方式,可以直接访问,不需要实例化该类的对象

注意:

  1. 单例类只能有一个实例

  2. 单例类必须自己创建自己的唯一实例

  3. 单例类必须给其他对象提供这一实例

优点:

  1. 在内存里只有一个实例,减少了内存的开销,尤其是频繁的创建和销毁实例

  2. 避免对资源的多重占用

缺点:没有接口,不能继承,与单一职责原则冲突,一个类应该只关心内部逻辑,而不关心外面怎么样来实例化

getInstance()方法:调用getInstance方法来得到对象,而getInstance保证了每次调用都返回相同的对象。

懒汉式

是否Lazy初始化:是

是否多线程安全:是

实现难度:易

描述:这种方式具备很好的lazy loading,能够在多线程中很好的工作,但是效率低

有点:第一次调用才初始化,避免内存浪费

缺点:必须加锁synchronized才能保证单例,但加锁会影响效率

getInstance()的性能对应用程序不是很关键(该方法使用不太频繁)

单例的实现主要是通过以下两个步骤:

  1. 将该类的构造方法定义为私有方法,这样其他处的代码就无法通过调用该类的构造方法来实例化该类的对象,只有通过该类提供的静态方法来得到该类的唯一实例;

  2. 在该类内提供一个静态方法,当我们调用这个方法时,如果类持有的引用不为空就返回这个引用,如果类保持的引用为空就创建该类的实例并将实例的引用赋予该类保持的引用。

public class Singleton{// 指向自己实例的私有静态引用private static Singleton instance;// 私有的构造方法private Singletion(){}// 以自己实例为返回值的静态的公有方法,静态工厂方法public static synchronized Singletion getInstance(){// 被动创建,在真正需要使用时才去创建if(instance==null){instance = new Singletion();}return instance;}
}

饿汉式

是否Lazy初始化:否

是否多线程安全:是

实现难度:易

描述:这种方式比较常用,但容易产生垃圾对象

优点:没有加锁,执行效率会提高。

缺点:类加载时就初始化,浪费内存

它基于 classloader 机制避免了多线程的同步问题,不过,instance 在类装载时就实例化,虽然导致类装载的原因有很多种,在单例模式中大多数都是调用 getInstance 方法, 但是也不能确定有其他的方式(或者其他的静态方法)导致类装载,这时候初始化 instance 显然没有达到 lazy loading 的效果。

public class Singleton{// 指向自己实例的私有静态引用,主动创建private static Singleton instance = new Singleton();private Singleton(){}public static Singleton getInstance(){return instance;}
}

双重校验锁(DCL)

是否Lazy初始化:是

是否多线程安全:是

实现难度:较复杂

描述:采用双锁机制,安全且在多线程情况下能保持高性能

getInstance()的性能对应用程序很关键

public class Singleton {  //程序运行时创建一个静态只读的进程辅助对象private volatile static Singleton singleton;  private Singleton (){}  public static Singleton getSingleton() {  //先判断是否存在,不存在再加锁处理if (singleton == null) {  //在同一个时刻加了锁的那部分程序只有一个线程可以进入synchronized (Singleton.class) {  if (singleton == null) {  singleton = new Singleton();  }  }  }  return singleton;  }  
}

登记式/静态内部类

是否 Lazy 初始化:

是否多线程安全:

实现难度:一般

描述:这种方式能达到双检锁方式一样的功效,但实现更简单。对静态域使用延迟初始化,应使用这种方式而不是双检锁方式。这种方式只适用于静态域的情况,双检锁方式可在实例域需要延迟初始化时使用。 这种方式同样利用了 classloader 机制来保证初始化 instance 时只有一个线程,它跟第 3 种方式不同的是:第 3 种方式只要 Singleton 类被装载了,那么 instance 就会被实例化(没有达到 lazy loading 效果),而这种方式是 Singleton 类被装载了,instance 不一定被初始化。因为 SingletonHolder 类没有被主动使用,只有通过显式调用 getInstance 方法时,才会显式装载 SingletonHolder 类,从而实例化 instance。想象一下,如果实例化 instance 很消耗资源,所以想让它延迟加载,另外一方面,又不希望在 Singleton 类加载时就实例化,因为不能确保 Singleton 类还可能在其他的地方被主动使用从而被加载,那么这个时候实例化 instance 显然是不合适的。这个时候,这种方式相比第 3 种方式就显得很合理。

public class Singleton {  private static class SingletonHolder {  //在第一次引用类的任何成员时创建实例,公共语言运行库负责处理变量初始化private static final Singleton INSTANCE = new Singleton();  }  private Singleton (){}  public static final Singleton getInstance() {  return SingletonHolder.INSTANCE;  }  
}

深入理解CAS

什么是CAS

package com.dahan.cas;
​
import java.util.concurrent.atomic.AtomicInteger;
​
public class CASDemp {
​
​
​public static void main(String[] args) {AtomicInteger atomicInteger = new AtomicInteger(2020);//期望,更新//public final boolean compareAndSet(int expect, int update)//如果我期望的值达到了 那么就更新 否则 就不更新 CAS是CPU的并发原语System.out.println(atomicInteger.compareAndSet(2020, 2021));//返回trueSystem.out.println(atomicInteger.get());atomicInteger.getAndIncrement();
​
​System.out.println(atomicInteger.compareAndSet(2020, 2021));//返回falseSystem.out.println(atomicInteger.get());}
}

CAS:比较当前工作内存中的值和主内存中的值,如果这个值是期望的,那么则执行操作,如果不是就一直循环

缺点:

  1. 循环会耗时

  2. 一次性只能保证一个共享变量的原子性

  3. ABA问题

CAS:ANA问题(狸猫换太子)

package com.dahan.cas;
​
import java.util.concurrent.atomic.AtomicInteger;
​
public class CASDemp {
​
​
​public static void main(String[] args) {AtomicInteger atomicInteger = new AtomicInteger(2020);//期望,更新//public final boolean compareAndSet(int expect, int update)//如果我期望的值达到了 那么就更新 否则 就不更新 CAS是CPU的并发原语//===========捣乱的线程============System.out.println(atomicInteger.compareAndSet(2020, 2021));//返回trueSystem.out.println(atomicInteger.get());System.out.println(atomicInteger.compareAndSet(2021, 2020));//返回trueSystem.out.println(atomicInteger.get());
​//===========期望的线程============System.out.println(atomicInteger.compareAndSet(2020, 2021));//返回falseSystem.out.println(atomicInteger.get());}
}

原子引用

带版本号的原子操作

package com.dahan.cas;
​
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicStampedReference;
​
public class CASDemp {
​
​
​public static void main(String[] args) {AtomicStampedReference<Integer> atomic = new AtomicStampedReference<>(123, 1);new Thread(()->{int stamp = atomic.getStamp();//获得版本号System.out.println("a1=>"+stamp);try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}atomic.compareAndSet(123,111,atomic.getStamp(),atomic.getStamp()+1);System.out.println("a2=>"+atomic.getStamp());atomic.compareAndSet(111,123,atomic.getStamp(),atomic.getStamp()+1);System.out.println("a3=>"+atomic.getStamp());},"a").start();new Thread(()->{int stamp = atomic.getStamp();//获得版本号System.out.println("b1=>"+stamp);
​try {TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {e.printStackTrace();}atomic.compareAndSet(123,121,stamp,atomic.getStamp());System.out.println("b2=>"+atomic.getStamp());},"b").start();}
}

注意

各种锁的理解

公平锁 非公平锁

公平锁:不能插队,必须先来后到

非公平锁:可以插队,默认就是非公平锁

public ReentrantLock(){sync = new NonfairSync();
}
public ReentrantLock(boolean fair){sync = fair ? new FairSync() : new NonfairSync();
}

可重入锁

可重入锁(递归锁)

自旋锁

自定义一个锁

package com.dahan.lock;
​
import java.util.concurrent.atomic.AtomicReference;
​
//自旋锁
public class SpinlockDemo {
​AtomicReference<Thread> atomicReference = new AtomicReference<>();
​
​
​public void mylock(){Thread thread = Thread.currentThread();System.out.println(Thread.currentThread().getName()+"==>mylock");
​//自旋锁while (!atomicReference.compareAndSet(null,thread)){
​}}//解锁public void myUnlock(){Thread thread = Thread.currentThread();System.out.println(Thread.currentThread().getName()+"=>myUnlock");atomicReference.compareAndSet(thread,null);}
​
}

测试

package com.dahan.lock;
​
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
​
public class TestSpinLock {public static void main(String[] args) throws InterruptedException {
//        ReentrantLock reentrantLock = new ReentrantLock();
//        reentrantLock.lock();
//        reentrantLock.unlock();
​SpinlockDemo spinlockDemo = new SpinlockDemo();
​new Thread(()->{spinlockDemo.mylock();try {TimeUnit.SECONDS.sleep(3);} catch (Exception e) {e.printStackTrace();} finally {spinlockDemo.myUnlock();}},"a").start();TimeUnit.SECONDS.sleep(1);new Thread(()->{
​spinlockDemo.mylock();try {TimeUnit.SECONDS.sleep(1);} catch (Exception e) {e.printStackTrace();} finally {spinlockDemo.myUnlock();}},"b").start();}
}

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com

热搜词