↖  Java服务并发开发中的线程池-2..


-loading- -loading- -loading-

2022-06-10 , 3892 , 116 , 104


Tomcat自定义ThreadPoolExecutor

Tomcat自定义线程池继承于java.util.concurrent.ThreadPoolExecutor,并新增了一些成员变量来更高效地统计已经提交但尚未完成的任务数量(submittedCount),包括已经在队列中的任务和已经交给工作线程但还未开始执行的任务。


Tomcat在自定义线程池ThreadPoolExecutor中重写了execute()方法,并实现对提交执行的任务进行submittedCount加一。Tomcat在自定义ThreadPoolExecutor中,当线程池抛出RejectedExecutionException异常后,会调用force()方法再次向TaskQueue中进行添加任务的尝试。如果添加失败,则submittedCount减一后,再抛出RejectedExecutionException。


Tomcat自定义任务队列

在Tomcat中重新定义了一个阻塞队列TaskQueue,它继承于LinkedBlockingQueue。在Tomcat中,核心线程数默认值为10,最大线程数默认为200,为了避免线程到达核心线程数后后续任务放入队列等待,Tomcat通过自定义任务队列TaskQueue重写offer方法实现了核心线程池数达到配置数后线程的创建。

具体地,从线程池任务调度机制实现可知,当offer方法返回false时,线程池将尝试创建新新线程,从而实现任务的快速响应。TaskQueue核心实现代码如下:


Tomcat自定义任务线程

Tomcat中通过自定义任务线程TaskThread实现对每个线程创建时间的记录;使用静态内部类WrappingRunnable对Runnable进行包装,用于对StopPooledThreadException异常类型的处理。


思考&小结


Tomcat为什么要自定义线程池和任务队列实现?

JUC原生线程池在提交任务时,当工作线程数达到核心线程数后,继续提交任务会尝试将任务放入阻塞队列中,只有当前运行线程数未达到最大设定值且在任务队列任务满后,才会继续创建新的工作线程来处理任务,因此JUC原生线程池无法满足Tomcat快速响应的诉求。

Tomcat为什么使用无界队列?
Tomcat在EndPoint中通过acceptCount和maxConnections两个参数来避免过多请求积压。其中maxConnections为Tomcat在任意时刻接收和处理的最大连接数,当Tomcat接收的连接数达到maxConnections时,Acceptor不会读取accept队列中的连接;这时accept队列中的线程会一直阻塞着,直到Tomcat接收的连接数小于maxConnections(maxConnections默认为10000,如果设置为-1,则连接数不受限制)。acceptCount为accept队列的长度,当accept队列中连接的个数达到acceptCount时,即队列满,此时进来的请求一律被拒绝,默认值是100(基于Tomcat 8.5.43版本)。因此,通过acceptCount和maxConnections两个参数作用后,Tomcat默认的无界任务队列通常不会造成OOM。


最佳实践


避免用Executors 的创建线程池



Executors常用方法有以下几个:

newCachedThreadPool():创建一个可缓存的线程池,调用 execute 将重用以前构造的线程(如果线程可用)。如果没有可用的线程,则创建一个新线程并添加到线程池中。终止并从缓存中移除那些已有 60 秒钟未被使用的线程。CachedThreadPool适用于并发执行大量短期耗时短的任务,或者负载较轻的服务器;

newFiexedThreadPool(int nThreads):创建固定数目线程的线程池,线程数小于nThreads时,提交新的任务会创建新的线程,当线程数等于nThreads时,提交新的任务后任务会被加入到阻塞队列,正在执行的线程执行完毕后从队列中取任务执行,FiexedThreadPool适用于负载略重但任务不是特别多的场景,为了合理利用资源,需要限制线程数量;

newSingleThreadExecutor() 创建一个单线程化的 Executor,SingleThreadExecutor适用于串行执行任务的场景,每个任务按顺序执行,不需要并发执行;

newScheduledThreadPool(int corePoolSize) 创建一个支持定时及周期性的任务执行的线程池,多数情况下可用来替代 Timer 类。ScheduledThreadPool中,返回了一个ScheduledThreadPoolExecutor实例,而ScheduledThreadPoolExecutor实际上继承了ThreadPoolExecutor。从代码中可以看出,ScheduledThreadPool基于ThreadPoolExecutor,corePoolSize大小为传入的corePoolSize,maximumPoolSize大小为Integer.MAX_VALUE,超时时间为0,workQueue为DelayedWorkQueue。实际上ScheduledThreadPool是一个调度池,其实现了schedule、scheduleAtFixedRate、scheduleWithFixedDelay三个方法,可以实现延迟执行、周期执行等操作;

newSingleThreadScheduledExecutor() 创建一个corePoolSize为1的ScheduledThreadPoolExecutor;

newWorkStealingPool(int parallelism)返回一个ForkJoinPool实例,ForkJoinPool 主要用于实现“分而治之”的算法,适合于计算密集型的任务。

Executors类看起来功能比较强大、用起来还比较方便,但存在如下弊端:

-loading- -loading--loading-


FiexedThreadPool和SingleThreadPool任务队列长度为Integer.MAX_VALUE,可能会堆积大量的请求,从而导致OOM;

UfqiLong

CachedThreadPool和ScheduledThreadPool允许创建的线程数量为Integer.MAX_VALUE,可能会创建大量的线程,从而导致OOM;

使用线程时,可以直接调用 ThreadPoolExecutor 的构造函数来创建线程池,并根据业务实际场景来设置corePoolSize、blockingQueue、RejectedExecuteHandler等参数。


避免使用局部线程池

使用局部线程池时,若任务执行完后没有执行shutdown()方法或有其他不当引用,极易造成系统资源耗尽。


合理设置线程池参数

在工程实践中,通常使用下述公式来计算核心线程数:

nThreads=(w+c)/c*n*u=(w/c+1)*n*u

其中,w为等待时间,c为计算时间,n为CPU核心数(通常可通过 Runtime.getRuntime().availableProcessors()方法获取),u为CPU目标利用率(取值区间为[0, 1]);在最大化CPU利用率的情况下,当处理的任务为计算密集型任务时,即等待时间w为0,此时核心线程数等于CPU核心数。

上述计算公式是理想情况下的建议核心线程数,而不同系统/应用在运行不同的任务时可能会有一定的差异,因此最佳线程数参数还需要根据任务的实际运行情况和压测表现进行微调。

增加异常处理

为了更好地发现、分析和解决问题,建议在使用多线程时增加对异常的处理,异常处理通常有下述方案:

在任务代码处增加try...catch异常处理

如果使用的Future方式,则可通过Future对象的get方法接收抛出的异常

为工作线程设置setUncaughtExceptionHandler,在uncaughtException方法中处理异常

优雅关闭线程池


为了实现优雅停机的目标,我们应当先调用shutdown方法,调用这个方法也就意味着,这个线程池不会再接收任何新的任务,但是已经提交的任务还会继续执行。之后我们还应当调用awaitTermination方法,这个方法可以设定线程池在关闭之前的最大超时时间,如果在超时时间结束之前线程池能够正常关闭则会返回true,否则,超时会返回false。通常我们需要根据业务场景预估一个合理的超时时间,然后调用该方法。

如果awaitTermination方法返回false,但又希望尽可能在线程池关闭之后再做其他资源回收工作,可以考虑再调用一下shutdownNow方法,此时队列中所有尚未被处理的任务都会被丢弃,同时会设置线程池中每个线程的中断标志位。shutdownNow并不保证一定可以让正在运行的线程停止工作,除非提交给线程的任务能够正确响应中断。


----


ThreadLocal线程变量概述


什么是ThreadLocal

ThreadLocal类提供了线程本地变量(thread-local variables),这些变量不同于普通的变量,访问线程本地变量的每个线程(通过其get或set方法)都有其自己的独立初始化的变量副本,因此ThreadLocal没有多线程竞争的问题,不需要单独进行加锁。

ThreadLocal使用场景

每个线程都需要有属于自己的实例数据(线程隔离);

框架跨层数据的传递;

需要参数全局传递的复杂调用链路的场景;

数据库连接的管理,在AOP的各种嵌套调用中保证事务的一致性;

ThreadLocal的原理与实践

对于ThreadLocal而言,常用的方法有get/set/initialValue 3个方法。

众所周知,在java中SimpleDateFormat有线程安全问题,为了安全地使用SimpleDateFormat,除了1)创建SimpleDateFormat局部变量;和2)加同步锁 两种方案外,我们还可以使用3)ThreadLocal的方案:

/**
* 使用 ThreadLocal 定义一个全局的 SimpleDateFormat
*/
private static ThreadLocal<SimpleDateFormat> simpleDateFormatThreadLocal = new
ThreadLocal<SimpleDateFormat>() {
@Override
protected SimpleDateFormat initialValue() {
return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
}
};
// 用法
String dateString = simpleDateFormatThreadLocal.get().format(calendar.getTime());

ThreadLocal原理

Thread 内部维护了一个  ThreadLocal.ThreadLocalMap 实例(threadLocals),ThreadLocal 的操作都是围绕着 threadLocals 来操作的。

threadLocal.get()方法


ThreadLocalMap

从JDK源码可见,ThreadLocalMap中的Entry是弱引用类型的,这就意味着如果这个ThreadLocal只被这个Entry引用,而没有被其他对象强引用时,就会在下一次GC的时候回收掉。


ThreadLocal示例

鹰眼链路ThreadLocal的使用

EagleEye(鹰眼)作为全链路监控系统在集团内部被广泛使用,traceId、rpcId、压测标等信息存储在EagleEye的ThreadLocal变量中,并在HSF/Dubbo服务调用间进行传递。EagleEye通过Filter将数据初始化到ThreadLocal中,部分相关代码如下:


Bad case:XX项目权益领取失败问题


在某权益领取原有链路中,通过app打开一级页面后才能发起权益领取请求,请求经过淘系无线网关(Mtop)后到达服务端,服务端通过mtop sdk获取当前会话信息。

-loading- -loading--loading-


UfqiLong

在XX项目中,对权益领取链路进行了升级改造,在一级页面请求时,通过服务端同时发起权益领取请求。具体地,服务端在处理一级页面请求时,同时通过调用hsf/dubbo接口来进行权益领取,因此在发起rpc调用时需要携带用户当前会话信息,在服务提供端将会话信息进行提取并注入到mtop上下文,从而才能通过mtop sdk获取到会话id等信息。某开发同学在实现时,因ThreadLocal使用不当造成下述问题:

问题1:因ThreadLocal初始化时机不当,造成获取不到会话信息,进而导致权益领取失败;

问题2:请求完成时,因未清理ThreadLocal中的变量值,导致脏数据;


【问题1:权益领取失败分析】

在权益领取服务中,该应用构建了一套高效和线程安全的依赖注入框架,基于该框架的业务逻辑模块通常抽象为xxxModule形式,Module间为网状依赖关系,框架会按依赖关系自动调用init方法(其中,被依赖的module 的init方法先执行)。

在应用中,权益领取接口的主入口为CommonXXApplyModule类,CommonXXApplyModule依赖XXSessionModule。当请求来临时,会按依赖关系依次调用init方法,因此XXSessionModule的init方法会优先执行;而开发同学在CommonXXApplyModule类中的init方法中通过调用recoverMtopContext()方法来期望恢复mtop上下文,因recoverMtopContext()方法的调用时机过晚,从而导致XXSessionModule模块获取不到正确的会话id等信息而导致权益领取失败。



【问题2:脏数据分析】

权益领取服务在处理请求时,若当前线程曾经处理过权益领取请求,因ThreadLocal变量值未被清理,此时XXSessionModule通过mtop SDK获取会话信息时得到的是前一次请求的会话信息,从而造成脏数据。

【解决方案】

在依赖注入框架入口处AbstractGate#visit(或在XXSessionModule中)通过recoverMtopContext方法注入mtop上下文信息,并在入口方法的finally代码块清理当前请求的threadlocal变量值。


思考&小结

ThreadLocalMap中的Entry为什么要设计为弱引用类型?

若使用强引用类型,则threadlocal的引用链为:Thread -> ThreadLocal.ThreadLocalMap -> Entry[] -> Entry -> key(threadLocal对象)和value;在这种场景下,只要这个线程还在运行(如线程池场景),若不调用remove方法,则该对象及关联的所有强引用对象都不会被垃圾回收器回收。

使用static和不使用static修饰threadlocal变量有和区别?

若使用static关键字进行修饰,则一个线程仅对应一个线程变量;否则,threadlocal语义变为perThread-perInstance,容易引发内存泄漏,如下述示例:


----


最佳实践


ThreadLocal变量值初始化和清理建议成对出现


如果不执行清理操作,则可能会出现:

内存泄漏:由于ThreadLocalMap的中key是弱引用,而Value是强引用。这就导致了一个问题,ThreadLocal在没有外部对象强引用时,发生GC时弱引用Key会被回收,而Value不会回收,从而Entry里面的元素出现<null,value>的情况。如果创建ThreadLocal的线程一直持续运行,那么这个Entry对象中的value就有可能一直得不到回收,这样可能会导致内存泄露。

脏数据:由于线程复用,在用户1请求时,可能保存了业务数据在ThreadLocal中,若不清理,则用户2的请求进来时,可能会读到用户1的数据。

建议使用try...finally 进行清理。


ThreadLocal变量建议使用static进行修饰

我们在使用ThreadLocal时,通常期望的语义是perThread,若不使用static进行修饰,则语义变为perThread-perInstance;在线程池场景下,若不用static进行修饰,创建的线程相关实例可能会达到 M * N个(其中M为线程数,N为对应类的实例数),易造成内存泄漏(https://errorprone.info/bugpattern/ThreadLocalUsage)。

谨慎使用ThreadLocal.withInitial

在应用中,谨慎使用ThreadLocal.withInitial(Supplier<? extends S> supplier)这个工厂方法创建ThreadLocal对象,一旦不同线程的ThreadLocal使用了同一个Supplier对象,那么隔离也就无从谈起了,如:

// 反例,实际上使用了共享对象obj而并未隔离,
private static ThreadLocal<Obj> threadLocal = ThreadLocal.withIntitial(() -> obj);


总结

在java工程实践中,线程池和线程变量被广泛使用,因线程池和线程变量的不当使用经常造成安全生产事故,因此,正确使用线程池和线程变量是每一位开发人员必须修炼的基本功。本文从线程池和线程变量的使用出发,简要介绍了线程池和线程变量的原理和使用实践,各开发人员可结合最佳实践和实际应用场景,正确地使用线程和线程变量,构建出稳定、高效的java应用服务。


+线程 +任务 +队列 +方法 +调用

本页Url

↖回首页 +当前续 +尾续 +修订 +评论✍️


👍6 仁智互见 👎0
  • 还没有评论. → +评论
  • -loading- -loading- -loading-


    🔗 连载目录

    🤖 智能推荐

    +
    AddToFav   
    新闻 经典 官宣