- 2023-06-30 18:31:08
- 7289 热度
- 0 评论
前言
上篇文章《Spring Cloud中Hystrix 线程隔离导致ThreadLocal数据丢失》我们对ThreadLocal数据丢失进行了详细的分析,并通过代码的方式复现了这个问题。
在上篇文章的末尾我也说了思路给大家提供了,如果需要能够在Hystrix 为线程隔离模式也能正确传递数据的话,需要我们自己去修改。
我这边以Zuul中自定义负载均衡策略来进行讲解,在Zuul中需要实现灰度发布的功能,需要在Filter中将请求的用户信息传递到自定的负载策略中,Zuul中整合了Hystrix,从Zuul Filter的请求到Ribbon的策略类中,线程已经发生了变化,变成了Hystrix提供的线程池来执行(配置隔离模式为线程)。这个时用ThreadLocal就会出问题了,数据传输会错乱。也就是我们前面分析的问题。
关于修改我说下自己分析问题的一些思路,ransmittable-thread-local可以解决这个问题,可以对线程或者线程池进行修饰,其实最终的原理就是对线程进行包装,在线程run之前和之后做一些处理来保证数据的正确传递。
改造思路
首先我想的就是改掉Hystrix中的线程池或者线程,只有这样才能让ransmittable-thread-local来接管线程中数据的传递。
通过调试的方式找到com.netflix.hystrix.HystrixThreadPool是Hystrix线程池的接口,里面定义了一个获取ExecutorService方法,代码如下:
public interface HystrixThreadPool { |
通过查找接口的实现类,发现只有一个默认的实现com.netflix.hystrix.HystrixThreadPool.HystrixThreadPoolDefault,实现也在接口中,是一个静态类。实现的方法如下:
@Override |
threadPool是类中的一个变量,主要是通过touchConfig方法来设置线程的参数,touchConfig代码如下:
private void touchConfig() { |
这是最外层获取线程池的地方,可以根据代码一步步进去看,最终获取线程池的代码在com.netflix.hystrix.strategy.concurrency.HystrixConcurrencyStrategy.getThreadPool方法中。
上面是线程池的源码分析,我们可以改造源码,将线程池用ransmittable-thread-local进行修饰。
改造线程方式
另外一种是改造线程的方式,在Hystrix将命令丢入线程池的时候对线程进行修饰也可以解决此问题,因为ransmittable-thread-local对线程池进行修饰,其原理也是改造了线程,通过源码可以看出:
public static ExecutorService getTtlExecutorService(ExecutorService executorService) { |
重点在TtlRunnable.get()
改造Hystrix中线程的方式,可以通过HystrixContextScheduler进行入手,Hystrix通过HystrixContextScheduler的ThreadPoolScheduler把命令submit到ThreadPoolExecutor中去执行。
通过上面的分析,最终可以定位到提交命令的代码如下:
private static class ThreadPoolWorker extends Worker { |
核心代码在schedule方法中,只需要将schedule中的sa进行修饰即可。
改造后的代码如下:
public Subscription schedule(final Action0 action) { |
改源码还涉及到重新打包等问题,每个项目都得用修改后的jar包,比较麻烦,最简单的做法就是在项目中建一个同样的HystrixContextScheduler类,包名也要和之前一样,让jvm优先加载,这样就能用这个修改的类来代替Hystrix原始的类。
最后我们来验证下这样的改动是否正确,首先我们在Zuul的Filter中进行值的传递:
RibbonFilterContextHolder是基于InheritableThreadLocal做的值传递,代码如下:
public class RibbonFilterContextHolder { |
private static AtomicInteger ac = new AtomicInteger(); |
通过AtomicInteger 进行数字的累加操作,后面测试的时候用10个线程并发测试,如如果在Ribbon的自定义负载策略中接收的值是0-9的话表示正确,否则错误。
接下来定义一个负载策略类,输出接收的值:
public class GrayPushRule extends AbstractLoadBalancerRule { |
然后增加配置,使用自定义的策略,还需要将Hystrix的线程池数量改小一点,这样才可以线程复用
fsh-house.ribbon.NFLoadBalancerRuleClassName=com.fangjia.fsh.api.rule.GrayPushRule |
启动服务,用ab进行测试:
ab -n 10 -c 10 http://192.168.10.170:2103/fsh-house/house/1 |
输出结果如下:
hystrix-RibbonCommand-3:10 |
很多数据都重复了,这就是线程复用导致的问题,接下来我们用上面讲的方式进行改造
需要将RibbonFilterContextHolder中的InheritableThreadLocal改成TransmittableThreadLocal
private static final TransmittableThreadLocal<RibbonFilterContext> contextHolder = new TransmittableThreadLocal<RibbonFilterContext>() { |
然后在项目中新建一个HystrixContextScheduler类,包名必须是com.netflix.hystrix.strategy.concurrency,代码就按上面贴的进行改,主要是对线程进行修饰:
FutureTask<?> f = (FutureTask<?>) executor.submit(TtlRunnable.get(sa)); |
再次启动服务,进行测试,结果如下:
hystrix-RibbonCommand-2:10 |
现在的结果已经是正确的
改造线程池方式
上面介绍了改造线程的方式,并且通过建一个同样的Java类来覆盖Jar包中的实现,感觉有点投机取巧,其实不用这么麻烦,Hystrix默认提供了HystrixPlugins类,可以让用户自定义线程池,下面来看看怎么使用:
在启动之前调用进行注册自定义实现的逻辑:
HystrixPlugins.getInstance().registerConcurrencyStrategy(new ThreadLocalHystrixConcurrencyStrategy()); |
ThreadLocalHystrixConcurrencyStrategy就是我们自定义的创建线程池的类,需要继承HystrixConcurrencyStrategy,前面也有讲到通过调试代码发现最终获取线程池的代码就在HystrixConcurrencyStrategy中。
我们只需要重写getThreadPool方法即可完成对线程池的改造,由于TtlExecutors只能修饰ExecutorService和Executor,而HystrixConcurrencyStrategy中返回的是ThreadPoolExecutor,我们需要对ThreadPoolExecutor进行包装一层,最终在execute方法中对线程修饰,也就相当于改造了线程池。
public class ThreadLocalHystrixConcurrencyStrategy extends HystrixConcurrencyStrategy { |
在doGetThreadPool方法中就返回包装的线程池,代码如下:
return new ThreadLocalThreadPoolExecutor(dynamicCoreSize, dynamicMaximumSize, keepAliveTime.get(), unit, workQueue, |
最后就是ThreadLocalThreadPoolExecutor的代码:
public class ThreadLocalThreadPoolExecutor extends ThreadPoolExecutor { |
完整源码参考:https://github.com/yinjihuan/spring-cloud/tree/master/fangjia-fsh-api
- Spring(403)
- Boot(208)
- Spring Boot(187)
- Java(82)
- Cloud(82)
- Spring Cloud(82)
- Security(60)
- Spring Security(54)
- Boot2(51)
- Spring Boot2(51)
- Redis(31)
- SQL(29)
- Mysql(25)
- IDE(24)
- Dalston(24)
- JDBC(22)
- IDEA(22)
- mongoDB(22)
- MVC(22)
- Web(21)
- CLI(20)
- Alibaba(19)
- SpringMVC(19)
- SpringBoot(17)
- Docker(17)
- Eclipse(16)
- Vue(16)
- Git(16)
- JPA(15)
- Apache(15)
- ORA(15)
- Oracle(14)
- jdk(14)
- Tomcat(14)
- Linux(14)
- HTTP(14)
- Mybatis(14)
- XML(13)
- JdbcTemplate(13)
- OAuth(13)
- Nacos(13)
- Pro(13)
- JSON(12)
- OAuth2(12)
- Data(12)
- stream(11)
- int(11)
- Myeclipse(11)
- not(10)
- Bug(10)
- maven(9)
- Map(9)
- Hystrix(9)
- ast(9)
- APP(8)
- Bit(8)
- API(8)
- session(8)
- Window(8)
- Swagger(8)
- JavaMail(7)
- Cache(7)
- File(7)
- IntelliJ(7)
- mail(7)
- windows(7)
- too(7)
- HTML(7)
- Github(7)
- Excel(6)
- Log4J(6)
- pushlet(6)
- apt(6)
- read(6)
- Freemarker(6)
- WebFlux(6)
- JSP(6)
- Bean(6)
- error(6)
- Server(6)
- nginx(6)
- ueditor(6)
- jar(6)
- ehcache(6)
- UDP(6)
- RabbitMQ(6)
- star(6)
- and(6)
- Struts(5)
- string(5)
- script(5)
- Syntaxhighlighter(5)
- Tool(5)
- Controller(5)
- swagger2(5)
- ldquo(5)
- input(5)
- Servlet(5)
- Config(5)
- discuz(5)