JUC
1.0 传统的生产者消费者问题
使用synchronized实现生产者消费者模式
package com.juclearn;public class JucTest {public static void main(String[] args) throws Exception{Data data=new Data();new Thread(()->{try {for(int i=1;i<=10;i++){data.productor();}} catch (Exception e) {throw new RuntimeException(e);}},"productor").start();new Thread(()->{try {while (true){data.consumer();}} catch (Exception e) {e.printStackTrace();}},"consumer").start();}
}
class Data{private int number=0;public synchronized void productor() throws Exception{while (number>0){this.wait();}number+=1;System.out.println("生产了一个,number="+number);this.notifyAll();}public synchronized void consumer() throws Exception{while (number==0){this.wait();}number-=1;System.out.println("消费了一个,number="+number);this.notifyAll();}
}
使用juc的Lock 实现的生产者消费者
package com.juclearn;import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;public class JucTest {public static void main(String[] args) throws Exception{Data data=new Data();new Thread(()->{try {for(int i=1;i<=10;i++){data.productor();}} catch (Exception e) {throw new RuntimeException(e);}},"productor").start();new Thread(()->{try {while (true){data.consumer();}} catch (Exception e) {e.printStackTrace();}},"consumer").start();}
}
class Data{private int number=0;private final Lock lock= new ReentrantLock();private final Condition condition= lock.newCondition();public void productor() throws Exception{try {lock.lock();while (number>0){condition.await();}number+=1;System.out.println("生产了一个,number="+number);condition.signalAll();} catch (InterruptedException e) {throw new RuntimeException(e);} finally {lock.unlock();}}public void consumer() throws Exception{try {lock.lock();while (number==0){condition.await();}number-=1;System.out.println("消费了一个,number="+number);condition.signalAll();} catch (InterruptedException e) {throw new RuntimeException(e);} finally {lock.unlock();}}
}
1.1 使用condition实现精准唤醒
对于上面的代码,可能我们会疑惑为什么要用lock和condition, 使用这个可以实习精准唤醒
package com.juclearn;import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;public class JucTest {public static void main(String[] args) throws Exception{Data data=new Data();new Thread(()->{try {while (true){data.printA();}} catch (Exception e) {e.printStackTrace();}},"A").start();new Thread(()->{try {while (true){data.printB();}} catch (Exception e) {e.printStackTrace();}},"B").start();new Thread(()->{try {while (true){data.printC();}} catch (Exception e) {e.printStackTrace();}},"C").start();}
}
class Data{private int number=1;private final Lock lock= new ReentrantLock();private final Condition conditionA= lock.newCondition();private final Condition conditionB= lock.newCondition();private final Condition conditionC= lock.newCondition();public void printA() throws Exception{try {lock.lock();while (number!=1){conditionA.await();}System.out.println("number="+number);number+=1;conditionB.signal();} catch (InterruptedException e) {throw new RuntimeException(e);} finally {lock.unlock();}}public void printB() throws Exception{try {lock.lock();while (number!=2){conditionB.await();}System.out.println("number="+number);number+=1;conditionC.signal();} catch (InterruptedException e) {throw new RuntimeException(e);} finally {lock.unlock();}}public void printC() throws Exception{try {lock.lock();while (number!=3){conditionC.await();}System.out.println("number="+number);number=1;conditionA.signal();} catch (InterruptedException e) {throw new RuntimeException(e);} finally {lock.unlock();}}
}
此时实现精准唤醒 使用Lock的方式锁对象就很清晰,就是我们定义的那个lock对象
1.2集合的线程安全问题
package com.juclearn;import java.util.ArrayList;
import java.util.List;
import java.util.UUID;public class Safe {public static void main(String[] args) {List<String> list=new ArrayList<>();for(int i=1;i<=500;i++){new Thread(()->{list.add(UUID.randomUUID().toString().substring(0,3));System.out.println(list);},String.valueOf(i)).start();}}
}
ArrayList是线程不安全的
- 在多线程并发写入时,可能出现:
- 数据丢失(两个线程同时写入,其中一个被覆盖)
- 数组越界异常(
ArrayIndexOutOfBoundsException) - 内部结构损坏(比如
size和实际元素不一致) - 无限循环或 null 值出现在奇怪位置
我们可以使用线程安全的集合
List<String> list=new Vector<>();List<String> list=new CopyOnWriteArrayList<>();List<String> list=Collections.synchronizedList(new ArrayList<>());
这三类都可以
CopyOnWriteArrayList 利用的是可重入锁,在读取时读取的是同一个List ,读取不加锁,在写入时复制一个List 在复制的这个list中写,最后合并,写入时加锁,从而解决线程安全问题,
Vector利用的是互斥锁,在读取和写入时都加锁,所以效率低
| 场景 | Vector |
CopyOnWriteArrayList |
|---|---|---|
| 高并发读 | ❌ 慢(每次读都要竞争锁) | ✅ 极快(完全无锁) |
| 频繁写 | ✅ 相对高效(只改原数组) | ❌ 极慢(每次写都复制整个数组) |
| 读多写少 | 一般 | ✅ 非常适合 |
| 写多读少 | ✅ 更合适 | ❌ 不适合(内存和 CPU 开销大) |
同理Set也是线程不安全的,多线程下也会出现并发修改异常,此时也可也使用工具类将其变的安全,也可以使用
CopyOnWriteSet
Map也是线程不安全的,并发下安全的Map有Collections提供的,还有JUC提供的CurrentHashMap
1.3Callable
FutureTask的内部有状态机
会维护这个FutureTask是否有被执行,如果有被执行过或者正在被执行,就不会再次执行
此时调用两个线程执行同一个FutureTask也只会执行一次
获取FutureTask返回值的get方法是阻塞方法
1.4 CountDownLatch
这是一个线程计数器,提供了阻塞的await方法,只有计数器的值归零后才继续执行
package com.juclearn;import java.util.*;
import java.util.concurrent.CountDownLatch;public class Safe {public static void main(String[] args) throws Exception{CountDownLatch countDownLatch=new CountDownLatch(10);for(int i=1;i<=10;i++){new Thread(()->{countDownLatch.countDown();System.out.println(Thread.currentThread().getName());},String.valueOf(i)).start();}countDownLatch.await();System.out.println("等待结束");}
}
countDownLatch.countDown();
是计数器减一的方法
1.5 CyclicBarrier
CyclicBarrier 和CountDownLatch的功能差不多
package com.juclearn;import java.util.*;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;public class Safe {public static void main(String[] args) throws Exception{CyclicBarrier cyclicBarrier=new CyclicBarrier(8,()->{System.out.println("线程计数完成");});for(int i=1;i<=8;i++){new Thread(()->{System.out.println(Thread.currentThread().getName()+"执行");try {cyclicBarrier.await();System.out.println(Thread.currentThread().getName()+"继续执行");} catch (InterruptedException e) {throw new RuntimeException(e);} catch (BrokenBarrierException e) {throw new RuntimeException(e);}},String.valueOf(i)).start();}}
}
定义CyclicBarrier的过程中加了一个计数总数和一个计数完成后执行的线程
在后续的子线程中,调用
cyclicBarrier.await(); 触发计数器减一,并将当前线程阻塞,当计数器减到0 时,解除所有
线程的阻塞,并且执行我们一开始设置的线程
我们这里的代码如果设置大于计数器计数上限的数,也就是这里的8的线程数,后续来的线程就会一直阻塞
1.6 SemaPhore
SemaPhore可以用于做限流操作
package com.juclearn;import java.util.*;
import java.util.concurrent.*;public class Safe {public static void main(String[] args) throws Exception{Semaphore semaphore=new Semaphore(3);for(int i=1;i<=8;i++){new Thread(()->{try {semaphore.acquire();System.out.println(Thread.currentThread().getName()+"执行");TimeUnit.SECONDS.sleep(4);System.out.println(Thread.currentThread().getName()+"执行结束");} catch (InterruptedException e) {throw new RuntimeException(e);}finally {semaphore.release();}},String.valueOf(i)).start();}}
}
使用过程和锁类似,semaphore.acquire触发时将计数器减一,如果计数器为0就阻塞,直到有值可减
当我们执行的方法结束后,使用release就会将计数器加1 ,这是不是和我们的互斥锁有点像,只不过限定了锁的
数量
1.7 ReadWriteLock
读写锁是一种更加细粒度的锁
package com.juclearn;import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;public class Safe {public static void main(String[] args) throws Exception{//测试读写锁Cache cache = new Cache();//写入for (int i = 1; i <= 5; i++) {final int temp = i;new Thread(() -> {cache.put("" + temp, temp);}, String.valueOf(i)).start();}}
}class Cache {private final Map<String, Object> map = new HashMap<>();private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();public void put(String key, Object value) {readWriteLock.writeLock().lock();try {System.out.println(Thread.currentThread().getName() + " 正在写入 " + key);map.put(key, value);System.out.println(Thread.currentThread().getName() + " 写入完成 ");} finally {readWriteLock.writeLock().unlock();}}public Object get(String key) {readWriteLock.readLock().lock();try {System.out.println(Thread.currentThread().getName() + " 正在读取 " + key);Object value = map.get(key);System.out.println(Thread.currentThread().getName() + " 读取完成 ");return value;} finally {readWriteLock.readLock().unlock();}}
}
读锁获取后可以被多个线程重复获取,但读锁未释放时不能获取写锁
写锁是一个纯粹的互斥锁,写锁一旦被获取其他读和写操作都不能获取锁
1.8 阻塞队列
阻塞队列根据添加和弹出的最终形式,分为四组api
第一组就是 add 和remove 当阻塞队列满了去add或者没有元素去弹出会抛出异常
使用element方法获取队首元素时如果没用元素也会抛出异常
package com.juclearn;import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;public class Safe {public static void main(String[] args) throws Exception {ArrayBlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(4);for (int i = 1; i <= 5; i++) {System.out.println(blockingQueue.add(String.valueOf(i)));}for(int i=1;i<=5;i++){System.out.println(blockingQueue.remove());}}
}
第二组是使用offer 和poll
此时添加时已满会返回false ,弹出时没值会返回null,获取队首的方法是peek 该方法同理
package com.juclearn;import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;public class Safe {public static void main(String[] args) throws Exception {ArrayBlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(4);for (int i = 1; i <= 4; i++) {System.out.println(blockingQueue.offer(String.valueOf(i)));}for(int i=1;i<=5;i++){System.out.println(blockingQueue.poll());}}
}
如果想要阻塞 那就是 put take
如果想要设置阻塞时间,就在offer 和poll里加时间,设置等待时间
1.9 同步队列
同步队列没有容量,进去一个元素就不能再加
必须先将这个元素取出来才能继续加,如果没取直接添加就会阻塞
没用元素直接取也会阻塞
package com.juclearn;import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;public class Safe {public static void main(String[] args) throws Exception {SynchronousQueue<String> synchronousQueue = new SynchronousQueue<>();for(int i=1;i<=1;i++) {new Thread(() -> {try {synchronousQueue.put("1");} catch (InterruptedException e) {throw new RuntimeException(e);}}).start();}TimeUnit.SECONDS.sleep(1);for(int i=1;i<=1;i++){new Thread(() -> {try {synchronousQueue.take();} catch (InterruptedException e) {throw new RuntimeException(e);}}).start();}}
}
2.0 线程池
2.0.1 三大方法
在创建线程池的过程中存在三大方法
package com.juclearn;import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;public class Safe {public static void main(String[] args) throws Exception {//创建单个线程的线程池//ExecutorService threadPool=Executors.newSingleThreadExecutor();//创建固定数量线程的线程池ExecutorService threadPool =Executors.newFixedThreadPool(4);//创建可伸缩数量线程的线程池//ExecutorService threadPool = Executors.newCachedThreadPool();try {for (int i = 1; i <=10 ; i++) {threadPool.execute(()->{System.out.println(Thread.currentThread().getName()+" ok");});}} finally {threadPool.shutdown();}}
}
2.0.2 七大参数
我们查看三大方法创建的底层代码
发现底层都是直接调用ThreadPoolExecutor
在阿里巴巴java开发手册中也写过最好直接使用ThreadPoolExecutor来创建线程,而不是使用Excutor ,因为
Excutor使用的一些默认参数不安全
我们来看一下怎么用 ThreadPoolExecutor 创建线程池
package com.juclearn;import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;public class Safe {public static void main(String[] args) throws Exception {ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(2,5,3,TimeUnit.SECONDS,new LinkedBlockingDeque<>(3),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());try {for (int i = 1; i <=10 ; i++) {poolExecutor.execute(()->{System.out.println(Thread.currentThread().getName()+" ok");});}} finally {poolExecutor.shutdown();}}
}
其中 第一个参数是核心线程数,也就是一直开放的线程数,第二个参数是最大线程数,当核心线程全部被使用切
后面的阻塞队列被填满时就会开放这几个线程
第三个参数是非核心线程的存活时间,也就是对于非核心线程,如果这么长时间后还没用被使用,就会被销毁
第四个参数是上一个时间参数的单位
第五个参数是用于存放未被获得线程的任务的阻塞队列
第六个参数是线程工厂,用来产生线程
第七个参数是拒绝策略,也就是当阻塞队列满了,所有线程都开放了,还有任务进来时的处理策略,当前的策略是
还有任务就报错
对于第七个参数,我们还有其他的拒绝策略:
new ThreadPoolExecutor.CallerRunsPolicy()
这种策略是当当前线程池的阻塞队列和所有线程都开放后,如果再加入新的任务,则由我们调用excuate方法的线
程,一般是主线程来处理
另一种策略:
new ThreadPoolExecutor.DiscardPolicy()
当前这种策略当线程池的阻塞队列和所有线程都开放后,加入新任务时直接不执行,也不抛出异常
new ThreadPoolExecutor.DiscardOldestPolicy()
当前策略有新任务加入会替换掉阻塞队列中最老的任务,然后尝试进行任务提交,不会对已经获取线程在执行的任
务进行替换,需要注意的是,第二次尝试提交也不一定能提交成功,也要看当前是否有空位,因为我们刚才腾出的
空位可能会被其他的任务占用
最大线程池大小设置规范
对于 Cpu密集型任务:
对于该类任务,应该设置的最大线程池大小为 cpu核心数量+1
这是为了防止某个线程因为缺页中断(Page Fault)或其他原因导致 CPU 暂停,额外的那个线程可以补上,保证
CPU 时钟周期不被浪费。
对于IO密集型任务:
应该设置设置为
WT是线程等待时间(IO,锁等待时间,网络通信等)
ST是线程真正使用CPU进行计算的时间
对于核心线程池大小设计
对于流量平稳的核心服务,应该将核心线程池大小和最大线程池大小设置的相等
对于流量波动大,但允许一定延迟的任务,应该将核心线程池大小按照平均流量计算,最大线程池大小根据峰值流
量计算