1.共享模型之不可变
1.1 日期转换的问题
存在线程安全问题
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd"); for (int i = 0; i < 10; i++) { new Thread(()->{ try { log.info("{}", sdf.parse("2024-12-1")); } catch (ParseException e) { e.printStackTrace(); } }).start(); } /////////////////////////////////////////// java.lang.NumberFormatException: multiple points at sun.misc.FloatingDecimal.readJavaFormatString(FloatingDecimal.java:1890) at sun.misc.FloatingDecimal.parseDouble(FloatingDecimal.java:110) at java.lang.Double.parseDouble(Double.java:538) at java.text.DigitList.getDouble(DigitList.java:169) at java.text.DecimalFormat.parse(DecimalFormat.java:2089) at java.text.SimpleDateFormat.subParse(SimpleDateFormat.java:1869) at java.text.SimpleDateFormat.parse(SimpleDateFormat.java:1514) at java.text.DateFormat.parse(DateFormat.java:364) at com.renex.c8.test6.lambda$main$0(test6.java:16) at java.lang.Thread.run(Thread.java:750)
- 更换使用
DateTimeFormatter类,解决线程安全问题
DateTimeFormatter stf = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); for (int i = 0; i < 10; i++) { new Thread(()->{ TemporalAccessor parse = stf.parse("2020-12-31 23:59:59"); log.info("{}",parse); }).start(); } //////////////////////////////////////// [Thread-0] INFO test6 - {},ISO resolved to 2020-12-31T23:59:59 [Thread-7] INFO test6 - {},ISO resolved to 2020-12-31T23:59:59 [Thread-5] INFO test6 - {},ISO resolved to 2020-12-31T23:59:59 [Thread-2] INFO test6 - {},ISO resolved to 2020-12-31T23:59:59 [Thread-9] INFO test6 - {},ISO resolved to 2020-12-31T23:59:59
分析DateTimeFormatter类
public final class DateTimeFormatter { /** * The printer and/or parser to use, not null. */ private final CompositePrinterParser printerParser; /** * The locale to use for formatting, not null. */ private final Locale locale; //....
我们能够看到这个类中大多都是使用final进行修饰的,这保证了唯一性
1.2 不可变设计
众所周知,String类是不可变的,以它为例
public final class String implements java.io.Serializable, Comparable<String>, CharSequence { /** The value is used for character storage. */ private final char value[];
1.2.1 final的使用
发现该类,类中的所有属性都是final的
- 属性用final修饰保证了改属性是只读的,不能修改
- 类用final修饰保证了该类中的方法不能被覆盖,防止子类无意间破坏不可变性
1.2.2 保护性拷贝
public String substring(int beginIndex) { if (beginIndex < 0) { throw new StringIndexOutOfBoundsException(beginIndex); } int subLen = value.length - beginIndex; if (subLen < 0) { throw new StringIndexOutOfBoundsException(subLen); } return (beginIndex == 0) ? this : new String(value, beginIndex, subLen); }
以substring为例,它们这种方法是如何做到线程安全的呢?
- 可以发现,内部都是调用String的构造方法创建了一个新的字符串,再进入这个构造看看,是否对final char[] value做出了修改
public String(char value[], int offset, int count) { if (offset < 0) { throw new StringIndexOutOfBoundsException(offset); } if (count <= 0) { if (count < 0) { throw new StringIndexOutOfBoundsException(count); } if (offset <= value.length) { this.value = "".value; return; } } // Note: offset or count might be near -1>>>1. if (offset > value.length - count) { throw new StringIndexOutOfBoundsException(offset + count); } this.value = Arrays.copyOfRange(value, offset, offset+count); }
结果发现也没有,构造新字符串对象时,会产生新的char[] value,对内容进行复制。这种通过创建副本对象来避免共享的手段称之为【保护性拷贝】
2. 共享模型之享元
享元模式(FlyWeight),属于结构型设计模式,主要解决实例化大量相同的对象,从而导致可能的内存泄漏的问题。
为了解决这个问题,享元模式提出的解决办法是将相同的对象保存在内存中,且仅保存一个对象,因此该对象应该是不可被修改的,当需要获取该对象实例时,直接从内存中读取即可,从而避免了相同对象的重复创建。
定义:运用共享技术有效地支持大量细粒度的对象。当需要重用数量优先的同一类对象时,可以使用该设计模式
2.1 体现
2.1.1 包装类
在JDK中Boolean、Byte、Integer、Long、Character等包装类提供了valueOf()方法,例如Long的valueOf会缓存-128~127之间的Long对象,在这个范围之间会重用对象,大于这个范围,才会新建Long对象
public static Long valueOf(long l) { final int offset = 128; if (l >= -128 && l <= 127) { // will cache return LongCache.cache[(int)l + offset]; } return new Long(l); }
Byte、Short、Long缓存的范围都是-128~127
Character缓存的范围是0~127
Integer的默认范围是-128~127,最小值不能变,但最大值可以通过调整JVM参数来改变
Boolean缓存了 TRUE和FALSE
2.1.2 String串池
2.1.3 BigDeciaml.BigInteger
2.2 final原理
2.2.1 设置final变量的原理
public class TestFinal{ final int a = 20; }
字节码:
Classfile /E:/TestFinal.class Last modified 2024年12月8日; size 257 bytes MD5 checksum e29e78101c9b3e7772eec3fdaf7e9530 Compiled from "TestFinal.java" public class TestFinal minor version: 0 major version: 55 flags: (0x0021) ACC_PUBLIC, ACC_SUPER this_class: #3 // TestFinal super_class: #4 // java/lang/Object interfaces: 0, fields: 1, methods: 1, attributes: 1 Constant pool: #1 = Methodref #4.#15 // java/lang/Object."<init>":()V #2 = Fieldref #3.#16 // TestFinal.a:I #3 = Class #17 // TestFinal #4 = Class #18 // java/lang/Object #5 = Utf8 a #6 = Utf8 I #7 = Utf8 ConstantValue #8 = Integer 20 #9 = Utf8 <init> #10 = Utf8 ()V #11 = Utf8 Code #12 = Utf8 LineNumberTable #13 = Utf8 SourceFile #14 = Utf8 TestFinal.java #15 = NameAndType #9:#10 // "<init>":()V #16 = NameAndType #5:#6 // a:I #17 = Utf8 TestFinal #18 = Utf8 java/lang/Object { final int a; descriptor: I flags: (0x0010) ACC_FINAL ConstantValue: int 20 public TestFinal(); descriptor: ()V flags: (0x0001) ACC_PUBLIC Code: stack=2, locals=1, args_size=1 0: aload_0 1: invokespecial #1 // Method java/lang/Object."<init>":()V 4: aload_0 5: bipush 20 7: putfield #2 // Field a:I // ------------------写屏障 10: return LineNumberTable: line 1: 0 line 2: 4 } SourceFile: "TestFinal.java"
发现final变量的赋值也会用过putfield指令来完成,同样在这条指令之后也会加入写屏障,保证在其他线程读到它的值时不会出现为0的情况
2.2.2 读final变量的值
实际上,在字节码中,会将final变量的值复制到方法区域的栈中,拿着这个赋值的值进行计算
2.3 无状态
在web阶段学习时,设计Servlet时为了保证其线程安全,都会有这样的建议,不要为Servlet设置成员变量,这种没有任何成员变量的类是线程安全的
因为成员变量保存的数据也可以称为状态信息,因此没有成员变量就称之为【无状态】
3. 自定义线程池
package com.renex.c8; import lombok.extern.slf4j.Slf4j; import lombok.val; import java.util.ArrayDeque; import java.util.Deque; import java.util.HashSet; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; @Slf4j(topic = "TestPool") public class TestPool { public static void main(String[] args) throws InterruptedException { ThreadPool threadPool = new ThreadPool(1, 1000, TimeUnit.MICROSECONDS, 1,(queue,task)->{ // 1. 一直等待 // queue.put(task);// 死等策略 // 2. 带超时等待 // queue.offer(task,1000,TimeUnit.MILLISECONDS); // 3. 放弃任务执行 // log.error("放弃:{}",task);// 当出现问题后,会一直持续运行,等待接收信息 // 4. 让调用者抛出异常 // throw new RuntimeException("任务执行失败"+task); // 当出现问题后,会结束进程 // 5. 让调用者自己执行任务 task.run(); }); for (int i = 0; i < 5; i++) { int j = i; threadPool.execute(()->{ try { Thread.sleep(1000L); } catch (InterruptedException e) { throw new RuntimeException(e); } log.info("{}",j); }); } } } /** * 拒绝策略 * - 将拒绝的各种策略抽象出来,让调用者进行选择 */ @FunctionalInterface interface RejectPolicy<T>{ void reject(BlockingQueue<T> queue,T task) throws InterruptedException; } @Slf4j(topic = "ThreadPool") class ThreadPool{ // 任务队列 private BlockingQueue<Runnable> taskQueue; // 线程集合 private HashSet<Runnable> works= new HashSet<>(); // 核心线程数 private int coreSize; // 获取任务的超时时间数 private long timeout; private TimeUnit timeUnit; private RejectPolicy<Runnable> rejectPolicy; /** * 初始化线程池 * @param coreSize * @param timeout * @param timeUnit * @param queueCapcity */ public ThreadPool(int coreSize, int timeout, TimeUnit timeUnit, int queueCapcity,RejectPolicy<Runnable> rejectPolicy) { this.coreSize = coreSize; this.timeout = timeout; this.timeUnit = timeUnit; this.taskQueue = new BlockingQueue<>(queueCapcity); this.rejectPolicy = rejectPolicy; } // 执行任务 public void execute(Runnable task) throws InterruptedException { // 当任务数没有超过coreSize时,直接交给worker对象执行 // 如果任务数超过coreSize时,加入任务队列暂存 synchronized (works){ if (works.size() < coreSize){ Worker worker = new Worker(task); log.info("新增Worker:{},{}",worker,task); works.add(worker); worker.start(); }else { log.info("加入任务队列,{}",task); taskQueue.put(task); // 1. 一直等待 // 2. 带超时等待 // 3. 放弃任务执行 // 4. 让调用者抛出异常 // 5. 让调用者自己执行任务 taskQueue.tryPut(rejectPolicy,task); } } } class Worker extends Thread{ private Runnable task; public Worker(Runnable task) { this.task = task; } @Override public void run() { // 执行任务 // 1. 当task不为空,执行任务 // 2. 当task执行完毕,再接着从任务队列获取任务并执行 // 使用take()方法,就是阻塞状态 // while (task!=null || (task = taskQueue.take()) != null){ // 使用poll()方法,即可设置超时时间 while (task!=null || (task = taskQueue.poll(1000,TimeUnit.MICROSECONDS)) != null){ try { log.info("正在执行...{}",task); task.run(); }catch (Exception e){ e.printStackTrace(); }finally { task = null; } } synchronized (works){ log.info("worker被移除:{}",this); works.remove(this); } } } } /** * 阻塞队列 * @param <T> */ @Slf4j(topic = "BlockingQueue") class BlockingQueue<T> { // 1. 任务队列 private Deque<T> deque = new ArrayDeque<>(); // 2. 锁 private ReentrantLock lock = new ReentrantLock(); // 3. 生产者条件变量 private Condition fulWaitSet = lock.newCondition(); // 4. 消费者条件变量 private Condition emptyWaitSet = lock.newCondition(); // 5. 容量 private int capcity; public BlockingQueue(int queueCapcity) { this.capcity = queueCapcity; } /** * 超时阻塞获取 * @return */ public T poll(long timeout, TimeUnit unit){ lock.lock(); try { // 将超时时间转换成 纳秒 long nanos = unit.toNanos(timeout); while (deque.isEmpty()){ try { // 返回的是剩余的时间 if (nanos<=0){ return null; } nanos = emptyWaitSet.awaitNanos(nanos); } catch (InterruptedException e) { throw new RuntimeException(e); } } T t = deque.removeFirst(); return t; } finally { lock.unlock(); } } // 阻塞获取 public T take(){ lock.lock(); try { while (deque.isEmpty()){ try { emptyWaitSet.await(); } catch (InterruptedException e) { throw new RuntimeException(e); } } T t = deque.removeFirst(); return t; } finally { lock.unlock(); } } // 带超时时间队列添加 public boolean offer(T task,long timeout,TimeUnit timeUnit) throws InterruptedException { lock.lock(); try { long nanos = timeUnit.toNanos(timeout); while (deque.size()==capcity){ try { log.info("等待加入任务队列:{}...",task); if (nanos<=0){ return false; } fulWaitSet.awaitNanos(nanos); } catch (InterruptedException c) { throw new RuntimeException(c); } } deque.addLast(task); emptyWaitSet.signal(); return true; } finally { lock.unlock(); } } // 阻塞添加 public void put(T e) throws InterruptedException { lock.lock(); try { while (deque.size()==capcity){ try { log.info("正在等待任务队列:{}",e); fulWaitSet.await(); } catch (InterruptedException c) { throw new RuntimeException(c); } } deque.addLast(e); emptyWaitSet.signal(); } finally { lock.unlock(); } } // 获取大小 public int size(){ lock.lock(); try{ return deque.size(); }finally { lock.unlock(); } } public void tryPut(RejectPolicy<T> rejectPolicy, T task) { lock.lock(); try { // 判断队列是否溢出 if (deque.size() == capcity){ // 使用拒绝策略,来针对不同场景使用不同的策略 rejectPolicy.reject(this,task); }else { // 没有溢出 log.info("加入任务队列:{}",task); deque.addLast(task); emptyWaitSet.signal(); } } catch (InterruptedException e) { throw new RuntimeException(e); } finally { lock.unlock(); } } }
4. JDK的线程池策略
4.1 ThreadPoolExecutor
4.1.1 线程池状态
ThreadPoolExecutor使用int的高3位来表示线程池状态,低29位表示线程数量
| 状态名 | 高3位 | 接收新任务 | 处理阻塞队列任务 | 说明 |
| RUNNING | 111 | Y | Y | |
| SHUTDOWN | 000 | N | Y | 不会接收新任务,但会处理阻塞队列剩余任务 |
| STOP | 001 | N | N | 会终端真正该执行的任务,并抛弃阻塞队列任务 |
| TIDYING | 010 | - | - | 任务全部执行完毕,活动线程为0即将进入中介 |
| TERMINATED | 011 | - | - | 终结状态 |
这些信息存储在一个原子变量ctl中,目的是将线程吃状态与线程个数合二为一,这样就可以用一次cas原子操作进行赋值
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); // c为旧值,ctlOf 返回结果为新值 ctl.compareAndSet(c,ctlOf(targetState,workerCountOf(c))); // rs为高3位代表线程池状态,wc为低29位代表线程个数,ctl是合并它们 private static int ctlOf(int rs,int wc){return rs | wc;}
4.1.2 构造方法
/** * Creates a new ThreadPoolExecutor with the given initial parameters and default thread factory. */ public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,RejectedExecutionHandler handler) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, defaultHandler,handler); }
- corePoolSize 核心线程数目(最多保留的线程数)
- maximumPoolSize 最大线程数目
- keepAliveTime 生存时间-针对救急线程
- unit 时间单位-针对救急线程
- workQueue 阻塞队列
- threadFactory 线程工厂-可以为线程创建时起名称
- handler 拒绝策略
工作方式:
- 线程池中刚开始没有线程,当一个任务提交给线程池后,线程池会创建一个新线程来执行任务
- 当线程数达到corePoolSize并没有线程空闲,这时再加入任务,新加的任务会被加入workQueue队列排队,直到有空闲的线程
- 如果队列选择了有界队列,而任务超过了队列大小时,会创建
maximumPoolSize - corePoolSize数目的应急线程来救急。
而应急当然就是救急用的,所以当高峰过去而没有任务做,就会被结束,由keepAliveTime 和 unit 来控制。 - 如果线程达到
maximumPoolSize仍然有新任务,这时会执行拒绝策略。拒绝策略JDK提供了4种实现,其他著名框架也提供了实现
- AbotPolicy 让调用者抛出 RejectExecutionException 异常(默认策略)
- CallerRunsPolicy 让调用者运行任务
- DiscardPolicy 放弃本次任务
- DiscardOldestPolicy 放弃队列中最早的任务,本任务取而代之
- Dubbo 的实现,在抛出 RejectExecutionException 异常之前会记录日志,并dump线程栈信息,方便定位问题
- Netty 的实现,是创建一个新线程来执行任务
- ActiveMQ 的实现,带超时等待(60s)尝试放入队列,类似我们之前的拒绝策略
- PinPoint的实现,它使用了一个拒绝策略链,会逐一尝试策略链中每种拒绝策略
- 当高峰过去后,
超过corePoolSize的救急线程如果一段时间没有任务做,需要结束节省资源,这个时间由 keepAliveTime 和 unit 来控制
根据这个构造方法,JDK Executors类中提供了众多工厂方法来创建各种用途的线程池
4.1.3 newFixedThreadPool
- 固定大小线程池
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory); }
特点
- 核心线程数 == 最大线程数(没有救急线程被创建),因此也无需超时时间
- 阻塞队列是无界的,可以放任意数量的任务
适用于任务量已知,相对耗时的任务
演示
public static void main(String[] args) { ExecutorService pool = Executors.newFixedThreadPool(2,new ThreadFactory() { private final AtomicInteger atomicInteger = new AtomicInteger(); // 重写该方法,可以更改线程名称 @Override public Thread newThread(Runnable r) { return new Thread(r,"pool_t"+atomicInteger.getAndIncrement()); } }); pool.execute(()->{ log.info("1"); }); pool.execute(()->{ log.info("2"); }); pool.execute(()->{ log.info("3"); }); pool.shutdown(); }
4.1.4 newCachedThreadPool
- 带缓冲线程池
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
特点
- 核心线程数是0,最大线程数是 Integer.MAX_VALUE,救急线程的空闲生存时间是60s,意味着
- 全部都是救急线程(60s后可以回收)
- 救急线程可以无限创建
- 队列采用了SynchronousQueue实现,特点是:它没有容量,没有线程来取是放不进去的。
{ SynchronousQueue<Integer> que = new SynchronousQueue<>(); new Thread(()->{ try { log.info("putting {}",1); que.put(1); log.info("{} putted...",1); log.info("putting {}",2); que.put(2); log.info("{} putted...",2); } catch (InterruptedException e) { e.printStackTrace(); } },"t1").start(); Thread.sleep(1); new Thread(()->{ try { log.info("taking {}",1); que.take(); } catch (InterruptedException e) { e.printStackTrace(); } },"t2").start(); new Thread(()->{ try { log.info("taking {}",2); que.take(); } catch (InterruptedException e) { e.printStackTrace(); } },"t3").start(); } ////////////////////////////////////////////// [t1] INFO test7 - putting 1 [t2] INFO test7 - taking 1 [t1] INFO test7 - 1 putted... [t3] INFO test7 - taking 2 [t1] INFO test7 - putting 2 [t1] INFO test7 - 2 putted...
整个线程池表现为:线程会根据任务量不断增长,没有上限,当任务执行完毕,空闲1分钟后释放线程
适合任务数比较密集,但每个任务执行时间较短的情况
4.1.5 newSingleThreadExecutor
- 单线程线程池
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }
使用场景:
- 希望多个任务排队执行
- 线程数固定为1,任务数多于1时,会放入无界队列排队。
任务执行完毕后,这个线程也不会得到释放
区别:
- 自己创建一个但线程串行执行任务,如果任务执行失败而终止,那么没有任何补救措施,而线程池还会新建一个线程,保证池的正常工作
- Executors.newSingleThreadExceutor() 线程个数始终为1,不能修改
- FinalizableDelegateExecutorService应用的是装饰器模式,只对外暴露了ExecutorService接口,因此不能调用ThreadPoolExecutor中特有的方法
- Executors.newFixedThreadPool(1)初始时为1,以后还可以修改
- 对外暴露的是ThreadPoolExecutor对象,可以强转后调用 setCorePoolSize 等方法进行修改
4.1.6 提交任务
// 执行任务 void execute(Runnable command); // 提交任务task,用返回值Future获得任务执行结果 <T> Future<T> submit(Callable<T> task); // 提交tasks中所有任务 <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException; // 提交tasks中所有任务,带超时时间 <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,long timeout,TimeUnit unit) throws InterruptedException; // 提交tasks中所有任务,哪个任务先成功执行完毕,返回此任务执行结果,其他任务取消 <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException,ExecutionException; // 提交tasks中所有任务,哪个任务先成功执行完毕,返回此任务执行结果,其他任务取消,带超时时间 <T> T invokeAny(Collection<? extends Callable<T>> tasks,long timeout,TimeUnit unit) throws InterruptedException,ExecutionException;
- submit()演示代码:
ExecutorService pool = Executors.newFixedThreadPool(2); Future<String> future = pool.submit(new Callable<String>() { @Override public String call() throws Exception { log.info("running..."); Thread.sleep(1000); return "ok"; } }); log.info("{}",future.get());
- invokeAll()演示代码:
ExecutorService pool = Executors.newFixedThreadPool(2); List<Future<String>> futures = pool.invokeAll(Arrays.asList( (Callable<String>) () -> { log.info("hello_1"); Thread.sleep(1000); return "1"; }, (Callable<String>) () -> { log.info("hello_2"); Thread.sleep(1000); return "2"; }, (Callable<String>) () -> { log.info("hello_3"); Thread.sleep(1000); return "3"; } ));
- invokeAny()演示代码:
ExecutorService pool = Executors.newFixedThreadPool(2); String futures = pool.invokeAny(Arrays.asList( (Callable<String>) () -> { log.info("hello_1"); Thread.sleep(1000); return "1"; }, (Callable<String>) () -> { log.info("hello_2"); Thread.sleep(1000); return "2"; }, (Callable<String>) () -> { log.info("hello_3"); Thread.sleep(1000); return "3"; } )); log.info("{}", futures);
4.1.7 关闭线程池
4.1.7.1 shutdown()
/** 线程池状态变为 SHUTDOWN - 不会接收新任务 - 但已提交任务会执行完 - 此方法不会阻塞调用线程的执行 */ void shutdown();
public void shutdown() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); // 修改线程池状态 advanceRunState(SHUTDOWN); // 仅会打断空闲线程 interruptIdleWorkers(); onShutdown(); // 扩展点 ScheduledThreadPoolExecutor } finally { mainLock.unlock(); } // 尝试终结(没有运行的线程可以立刻终结,如果还有运行的线程也不会等) tryTerminate(); }
再深入onShutdown()方法
void onShutdown() { BlockingQueue<Runnable> q = super.getQueue(); boolean keepDelayed = getExecuteExistingDelayedTasksAfterShutdownPolicy(); boolean keepPeriodic = getContinueExistingPeriodicTasksAfterShutdownPolicy(); if (!keepDelayed && !keepPeriodic) { for (Object e : q.toArray()) if (e instanceof RunnableScheduledFuture<?>) ((RunnableScheduledFuture<?>) e).cancel(false); q.clear(); } else { // Traverse snapshot to avoid iterator exceptions for (Object e : q.toArray()) { if (e instanceof RunnableScheduledFuture) { RunnableScheduledFuture<?> t = (RunnableScheduledFuture<?>)e; if ((t.isPeriodic() ? !keepPeriodic : !keepDelayed) || t.isCancelled()) { // also remove if already cancelled if (q.remove(t)) t.cancel(false); } } } } tryTerminate(); }
4.1.7.2 shutdownNow()
/** 线程池状态变为STOP - 不会接收新任务 - 会将队列中的任务返回 - 并用interrupt的方式中断正在执行的任务 */ List<Runnable> shutdownNow();
public List<Runnable> shutdownNow() { List<Runnable> tasks; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); // 修改线程池状态 advanceRunState(STOP); // 打断所有线程 interruptWorkers(); // 获取队列中剩余任务 tasks = drainQueue(); } finally { mainLock.unlock(); } // 尝试终结 tryTerminate(); return tasks; }
4.1.7.3 其他方法
// 不在RUNNING状态的线程池,此方法就返回true boolean isShutdown(); // 线程池状态是否是 TERMINATED boolean isTerminated(); // 调用shutdown后,由于调用线程并不会等待所有任务运行结束,因此如果它想在线程池TERMINATED后做些事情,可以利用此方法等待 boolean awaitTermination(long timeout,TimeUnit unit) throws InterruptedException;
4.2 设计模式之 工作线程
4.2.1 定义
让有限的工作线程(Worker Thread)来轮流异步处理无限多的任务
也可以将其归类为分工模式,它的典型实现就是线程池,也体现了经典设计模式中的享元模式
享元模式(FlyWeight),属于结构型设计模式,主要解决实例化大量相同的对象,从而导致可能的内存泄漏的问题。
为了解决这个问题,享元模式提出的解决办法是将相同的对象保存在内存中,且仅保存一个对象,因此该对象应该是不可被修改的,当需要获取该对象实例时,直接从内存中读取即可,从而避免了相同对象的重复创建。
定义:运用共享技术有效地支持大量细粒度的对象。
例子:一个线程轮流处理多个任务,如果每个任务都有对应的线程来进行处理,那么成本就太高了,
不同的任务类型应该使用不同的线程池,这样能够彼岸饥饿,并能提升效率
或许我们能这样想,让合适的线程做与它更合适的任务,就好比岗位中职位的区分,例如一个餐馆中的工人,可以分为服务员和厨师,这两种职业分别就对应着不同的线程,他们所干的活就对应着不同的任务类型。
4.2.2 饥饿
固定大小线程池会有饥饿现象
- 两个工具人是同一个线程池中的两个线程
- 他们要做的事情是:为客人点餐和后厨做菜,这是两个阶段的工作
- 客人点餐:必须先点完餐,等菜做好,上菜,在此期间处理点餐的工人必须等待
- 后厨做菜:直接做
- 例子:工人A处理了点餐任务,接下来它要等着工人B把才做好,然后上菜,他俩也配合的很好
- 但现在同时来了两个客人,这时候工人A和工人B都去处理点餐了,没人做菜,产生死锁
——————————————产生死锁(饥饿)现象——————————
static List<String> list = Arrays.asList("三鲜","锅包肉","糖醋里脊","油爆大虾"); static Random random= new Random(); static String cooking() {return list.get(random.nextInt(list.size()));}; public static void main(String[] args) { ExecutorService pool = Executors.newFixedThreadPool(2); pool.execute( ()->{ log.info("处理点餐..."); Future<String> future = pool.submit(() -> { log.info("做菜"); return cooking(); }); try { log.info("上菜:{}",future.get()); } catch (InterruptedException e) { throw new RuntimeException(e); } catch (ExecutionException e) { throw new RuntimeException(e); } } ); pool.execute( ()->{ log.info("处理点餐..."); Future<String> future = pool.submit(() -> { log.info("做菜"); return cooking(); }); try { log.info("上菜:{}",future.get()); } catch (InterruptedException e) { throw new RuntimeException(e); } catch (ExecutionException e) { throw new RuntimeException(e); } } ); }
饥饿现象,是否可以通过增加线程数量来解决呢?
- 可以,但治标不治本
如果再往后又出现了多个任务,难不成每次都需要扩充线程数量来解决吗?
根本解决方式是用不同的线程池来进行处理
static List<String> list = Arrays.asList("三鲜","锅包肉","糖醋里脊","油爆大虾"); static Random random= new Random(); static String cooking() {return list.get(random.nextInt(list.size()));}; public static void main(String[] args) { ExecutorService pool1 = Executors.newFixedThreadPool(1); // 多用一个线程池 ExecutorService pool2 = Executors.newFixedThreadPool(1); /** * pool1 - 线程池1做任务的分发 * pool2 - 线程池2做任务的执行 */ pool1.execute( ()->{ log.info("处理点餐..."); Future<String> future = pool2.submit(() -> { log.info("做菜"); return cooking(); }); try { log.info("上菜:{}",future.get()); } catch (InterruptedException e) { throw new RuntimeException(e); } catch (ExecutionException e) { throw new RuntimeException(e); } } ); pool1.execute( ()->{ log.info("处理点餐..."); Future<String> future = pool2.submit(() -> { log.info("做菜"); return cooking(); }); try { log.info("上菜:{}",future.get()); } catch (InterruptedException e) { throw new RuntimeException(e); } catch (ExecutionException e) { throw new RuntimeException(e); } } ); }
一个线程池做任务分发
另外的线程池就做任务的执行
线程池之间运行的流程不同,处理的方式不同,所占的内存也不同
pool2只需要关注池中有无需要处理的任务,而无需操心其他事务
4.2.3 创建多少线程池合适
- 过小会导致程序不能充分的利用系统资源、容易导致饥饿
- 过大会导致更多的线程上下文切换,占用更多的内存
4.2.3.1 CPU密集型运算
通常采用 cpu核心数+1 能够实现最优的CPU利用率
+1是保证当线程由于页缺失故障(操作系统)或其他原因导致暂停时,额外的这个线程就能顶上去,保证CPU时钟周期不被浪费
4.2.3.2 I/O 密集运算
CPU不总是处于繁忙状态,例如,当你执行业务计算时,这时候会使用CPU资源
但当你执行I/O操作时、远程RPC调用时,包括进行数据库操作时,这时候CPU就闲下来了,你可以利用多线程提高它的利用率
经验公式如下:
线程数=CPU核心数*期望CPU利用率*总时间(CPU计算时间+等待时间)/CPU计算时间
4.3 任务调度线程池
在【任务调度线程池】功能加入之前,可以使用 java.util.Timer 来实现定时功能
当需要在一个时间周期内重复执行某个任务时使用
Timer的优点在于
- 简单
- 易用
但由于所有的任务都是由一个线程来调度,因此所有任务都是串行执行的,同一时间只能有一个任务在执行,前一个任务的延迟或异常都将会影响到之后的任务
Timer timer = new Timer(); TimerTask timerTask = new TimerTask() { @Override public void run() { log.info("task 1"); try { Thread.sleep(200); } catch (InterruptedException e) { throw new RuntimeException(e); } } }; TimerTask timerTask2 = new TimerTask() { @Override public void run() { log.info("task 2"); } }; log.info("start..."); timer.schedule(timerTask, 1000); timer.schedule(timerTask2, 1000); ///////////////////////////////////////////// [main] INFO com.renex.c9.test1 - start... [Timer-0] INFO com.renex.c9.test1 - task 1 [Timer-0] INFO com.renex.c9.test1 - task 2
可以看到task2本应该先执行,但是由于task1的延迟,导致task的执行时间被延后
并且,当出现异常后,会停止执行任务!
使用ScheduledThreadPoolExecutor延时执行
ScheduledExecutorService po = Executors.newScheduledThreadPool(2); po.schedule(()->{ log.info("tast1"); try { Thread.sleep(200); } catch (InterruptedException e) { throw new RuntimeException(e); } },1, TimeUnit.SECONDS); po.schedule(()->{ log.info("tast2"); },1, TimeUnit.SECONDS); //////////////////////////////////////////// [pool-1-thread-2] INFO com.renex.c9.test1 - tast2 [pool-1-thread-1] INFO com.renex.c9.test1 - tast1
当线程池线程数量大于任务数量,那么就会遵守延时效应
而如果任务数量打过线程数量,就还是串行执行
而当出现异常后,并不会任务的执行流程并不会被停止
4.3.1 周期性执行
指定一个间隔时间,重复执行任务
- scheduleAtFixedRate() 方法
ScheduledExecutorService pool = Executors.newScheduledThreadPool(2); log.info("start..."); pool.scheduleAtFixedRate(()->{ log.info("running... 1"); }, 1, 1, TimeUnit.SECONDS); ////////////////////////////// [main] INFO com.renex.c9.test1 - start... [pool-1-thread-1] INFO com.renex.c9.test1 - running... 1 [pool-1-thread-1] INFO com.renex.c9.test1 - running... 1 [pool-1-thread-1] INFO com.renex.c9.test1 - running... 1 [pool-1-thread-1] INFO com.renex.c9.test1 - running... 1
固定间隔时间
- scheduleWithFixedDelay() 方法
pool.scheduleWithFixedDelay(()->{ log.info("running... 1"); }, 1, 2, TimeUnit.SECONDS);
从每个周期的结束时间开始计算下一个周期的开始时间
4.3.2 线程池应用-定时任务
/** * 如何让每周四 18:00:00 定时执行任务 * @param args */ public static void main(String[] args) { ScheduledExecutorService pool = Executors.newScheduledThreadPool(1); long period = 1000*60*24*7; // 当前时间 LocalDateTime now = LocalDateTime.now(); // 周几 LocalDateTime time = LocalDateTime.now().withHour(18).withSecond(0).withMinute(0).withNano(0).with(DayOfWeek.THURSDAY); //如果当前时间> 本周周四,必须找到下周四 if (now.compareTo(time)>0){ time = time.plusWeeks(1); } long delay = Duration.between(now, time).toMillis(); pool.scheduleAtFixedRate(()->{ log.info("running..."); },delay,period, TimeUnit.SECONDS); }
4.4 Tomcat 线程池
Tomcat 在哪里用到了线程池呢?
- LimitLatch 用来限流,可以控制最大连接个数,类似J.U.C中的Semaphore
- Acceptor只负责【接收新的socket连接】
- Poller只负责监听socket channel是否有【可读的I/O事件】
- 一旦可读,封装一个任务对象(socketProcessor),提交给Executor线程池处理
- Exectuor线程池中的工作线程最终负责【处理请求】
能够看到,每一个线程池都负责了不同的工作,分工明确
Tomcat线程池扩展了ThreadPoolExecutor,行为稍有不同
- 如果总线程数达到maximumPoolSize
- 这时不会立刻抛出RejectedExecutionException异常
- 而是再次尝试将任务放入队列,如果还失败,才抛出RejectedExecutionException异常
4.5 Fork/Join
4.5.1 概念
Fork/Join是JDK1.7加入的新的线程池实现,它体现的是一种分治思想,适用于能够进行任务拆分的CPU密集型运算
所谓的任务拆分,是将一个大任务拆分为算法上相同的小任务,直至不能拆分可以直接求解。
Fork/Join在分治的基础上加入了多线程,可以把每个任务的分解和合并交给不同的线程来完成,进一步提升了运算效率
Fork/Join默认会创建于CPU核心数大小相同的线程池
4.5.2 使用
提交给Fork/Join线程池的任务需要继承Recursivetask(有返回值)或RecursiveAction(没有返回值)
public class test3 { public static void main(String[] args) { ForkJoinPool pool = new ForkJoinPool(4); System.out.println(pool.invoke(new MyTask(5))); } } class MyTask extends RecursiveTask<Integer>{ private int n; public MyTask(int n){ this.n = n; } @Override protected Integer compute() { if (n==1){ log.info("join(){}",n); return 1; } MyTask t1 = new MyTask(n - 1); t1.fork(); log.info("fork(){}+{}",n,t1); Integer result = t1.join(); log.info("join(){}+{}={}",n,t1,result); return result; } } ////////////////////////////////////////////// [ForkJoinPool-1-worker-2] INFO com.renex.c9.MyTask - fork()4+com.renex.c9.MyTask@713c7c65 [ForkJoinPool-1-worker-3] INFO com.renex.c9.MyTask - fork()3+com.renex.c9.MyTask@71261eae [ForkJoinPool-1-worker-0] INFO com.renex.c9.MyTask - fork()2+com.renex.c9.MyTask@770db75a [ForkJoinPool-1-worker-3] INFO com.renex.c9.MyTask - join()1 [ForkJoinPool-1-worker-1] INFO com.renex.c9.MyTask - fork()5+com.renex.c9.MyTask@511e0595 [ForkJoinPool-1-worker-0] INFO com.renex.c9.MyTask - join()2+com.renex.c9.MyTask@770db75a=1 [ForkJoinPool-1-worker-3] INFO com.renex.c9.MyTask - join()3+com.renex.c9.MyTask@71261eae=1 [ForkJoinPool-1-worker-2] INFO com.renex.c9.MyTask - join()4+com.renex.c9.MyTask@713c7c65=1 [ForkJoinPool-1-worker-1] INFO com.renex.c9.MyTask - join()5+com.renex.c9.MyTask@511e0595=1
5. 👍JUC 专栏 - 前篇回顾👍
- 带你重新认识进程与线程!!让你深层次了解线程运行的睡眠与打断!!
- 共享问题解决与synchronized对象锁分析!全程干货!!快快收藏!!
- 常见的设计模式概念分析与多把锁使用场景!!理解线程状态转换条件!带你深入JUC!
- 从JMM内存模型的角度来分析CAS并发性问题
- 带你研究共享模型通过无锁的方式解决并发问题!本文分析对原子对象的使用!