Unable to access an InheritableThreadLocal object from the parent thread in a Hystrix fallback method

Problem description

I have an InheritableThreadLocal<ConcurrentHashMap<String, Object>> that is initialized when a request comes in through the filter, and a transaction_id is stored in it.

Now, at the service layer, I make 10 different API calls via CompletableFuture. Every API service class has an execute method that uses RestTemplate to make the API call, and I put @HystrixCommand on that execute method.

The execute method returns void, but it puts the API response into the InheritableThreadLocal object.

The problem is that when an API call fails, Hystrix calls the fallback method, and when I put the error response into the InheritableThreadLocal there, I am not able to send that error response to the client.

ThreadLocalUtil.class

public class ThreadLocalUtil {

    private static InheritableThreadLocal<ConcurrentHashMap<String, Object>> transmittableThreadLocal = new InheritableThreadLocal<>();

    public static void addDataToThreadLocalMap(String key, Object value) {
        Map<String, Object> existingDataMap = transmittableThreadLocal.get();
        if (value != null) {
            existingDataMap.put(key, value);
        }
    }

    public static Object getDataFromThreadLocalMap(String key) {
        Map<String, Object> existingDataMap = transmittableThreadLocal.get();
        return existingDataMap.get(key);
    }

    public static void clearThreadLocalDataMap() {
        if (transmittableThreadLocal != null)
            transmittableThreadLocal.remove();
    }

    public static Object getRequestData(String key) {
        Map<String, Object> existingDataMap = transmittableThreadLocal.get();
        if (existingDataMap != null) {
            return existingDataMap.get(key);
        }
        return "-1";
    }

    public static void initThreadLocals() {
        ConcurrentHashMap<String, Object> dataForDataMap = new ConcurrentHashMap<String, Object>();
        String requestId = "REQUEST_ID_" + System.currentTimeMillis();
        dataForDataMap.put("REQUEST_ID", requestId);
        transmittableThreadLocal.set(dataForDataMap);
    }
}

CommonFilter.class

@Component
@Order(1)
public class CommonFilter extends OncePerRequestFilter {

  @Override
  protected void doFilterInternal(HttpServletRequest request, HttpServletResponse response, FilterChain filterChain)
          throws ServletException, IOException {
      try {
          ThreadLocalUtil.initThreadLocals();
          filterChain.doFilter(request, response);
      } catch (Exception e) {
          if (e instanceof ServletException) {
              throw (ServletException) e;
          }
      } finally {
          ThreadLocalUtil.clearThreadLocalDataMap();
      }

  }
}

EmployeeService.class

@Component
public abstract class EmployeeService {

    @Autowired
    private ThreadLocalUtil threadLocalUtil;

    public abstract void getEmployee(int employeeId);

    public void fallbackMethod(int employeeid) {
        threadLocalUtil.addDataToThreadLocalMap("ErrorResponse", "Fallback response:: No employee details available temporarily");
    }
}

EmployeeServiceImpl.class

@Service
public class EmployeeServiceImpl extends EmployeeService {

    @HystrixCommand(fallbackMethod = "fallbackMethod", commandProperties = {
            @HystrixProperty(name = "execution.isolation.thread.timeoutInMilliseconds", value = "900"),
            @HystrixProperty(name = "circuitBreaker.errorThresholdPercentage", value = "10") })
    public void getEmployee(int employeeId) {
        System.out.println("Getting Employee details for " + employeeId + ", threadLocalUtil : " + threadLocalUtil.getDataFromThreadLocalMap("EMPLOYEE_ID"));
        String response = restTemplate.exchange("http://localhost:8011/findEmployeeDetails/{employeeid}",
                HttpMethod.GET, null, new ParameterizedTypeReference<String>() {
                }, employeeId).getBody();

        threadLocalUtil.addDataToThreadLocalMap("Response", response);
    }

    @Autowired
    RestTemplate restTemplate;

    @Autowired
    private ThreadLocalUtil threadLocalUtil;
}

Recommended answer

First of all, Hystrix internally uses a ThreadPoolExecutor, whose threads are created once and then reused, so InheritableThreadLocal is the wrong tool here: inherited values are copied only when a thread is created, not each time a pooled thread picks up a task.
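To make the thread-reuse point concrete, here is a minimal plain-Java sketch. It does not use Hystrix at all; the class name PooledInheritanceDemo, the method observe, and the values "request-1" and "request-2" are made up for illustration. A single-thread pool stands in for the Hystrix thread pool.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical demo class, not part of the original question or of Hystrix.
final class PooledInheritanceDemo {

    private static final InheritableThreadLocal<String> CONTEXT = new InheritableThreadLocal<>();

    /** Returns what the single pooled worker sees for two successive "requests". */
    static String[] observe() {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        try {
            Callable<String> readContext = CONTEXT::get;

            CONTEXT.set("request-1");
            // The worker thread is created during this first submit,
            // so it inherits a copy of "request-1" from the calling thread.
            String first = pool.submit(readContext).get();

            CONTEXT.set("request-2");
            // The same worker is reused; no new thread is created,
            // so nothing is inherited again.
            String second = pool.submit(readContext).get();

            return new String[] { first, second };
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
            CONTEXT.remove();
        }
    }

    public static void main(String[] args) {
        String[] seen = observe();
        System.out.println(seen[0] + ", " + seen[1]); // prints "request-1, request-1"
    }
}
```

The second task still sees the first request's value, which is why an inherited map can silently leak between requests once threads are pooled.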

From the above question and what you asked on my blog, I understand that your problem is:

InheritableThreadLocal becomes null in the Hystrix fallback method

Further adding to this (you may verify this):

InheritableThreadLocal becomes null in the Hystrix fallback method only in the case of a timeout, not in the case of any other exception

I would recommend that others refer to my blog. In the case of a timeout, the Hystrix fallback runs on the hystrix-timer thread. You can verify this by logging Thread.currentThread().getName() in the fallback method.

Since the parent of the hystrix-timer thread is not your calling thread, transmittableThreadLocal.get() returns null there.
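The effect can be reproduced without Hystrix. The following is only a simulation under stated assumptions: a ScheduledExecutorService plays the role of the hystrix-timer thread, and the names TimerThreadDemo, valueSeenOnTimerThread, and "txn-42" are invented for the example.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; the scheduler is a stand-in for the hystrix-timer thread.
final class TimerThreadDemo {

    private static final InheritableThreadLocal<String> CONTEXT = new InheritableThreadLocal<>();

    /** Returns the value a pre-existing timer thread sees for the caller's context. */
    static String valueSeenOnTimerThread() {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        try {
            // Force the timer thread to be created now, before any context exists.
            timer.submit(() -> { }).get();

            // The "request" sets its context only after the timer thread is alive.
            CONTEXT.set("txn-42");

            // The "fallback" runs on the timer thread, which never inherited the value.
            Callable<String> fallback = CONTEXT::get;
            return timer.schedule(fallback, 10, TimeUnit.MILLISECONDS).get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            timer.shutdownNow();
            CONTEXT.remove();
        }
    }

    public static void main(String[] args) {
        System.out.println(valueSeenOnTimerThread()); // prints "null"
    }
}
```

In the question's code the same null comes back from transmittableThreadLocal.get() inside the fallback, so the following existingDataMap.put(...) fails before the error response is ever stored.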

To solve this, I would recommend using HystrixCommandExecutionHook together with HystrixRequestVariableDefault. With these you can implement hooks such as onStart, onExecutionStart, and onFallbackStart, in which you get and set the thread-local variables. For more details, refer to the last section of the blog.
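The idea behind the hooks is a capture-and-restore hand-off: read the map on the calling thread, then install the same map on whichever thread runs the command or the fallback. Here is a rough plain-Java sketch of that pattern only. It is not the Hystrix API; ContextHandOffDemo, withCallerContext, and fallbackResultVisibleToCaller are hypothetical names.

```java
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical demo class illustrating capture-and-restore, without Hystrix.
final class ContextHandOffDemo {

    private static final ThreadLocal<Map<String, Object>> CONTEXT = new ThreadLocal<>();

    /** Captures the caller's map now and installs it on the thread that later runs the task. */
    static <T> Callable<T> withCallerContext(Callable<T> task) {
        Map<String, Object> captured = CONTEXT.get();      // evaluated on the calling thread
        return () -> {
            Map<String, Object> previous = CONTEXT.get();  // evaluated on the worker thread
            CONTEXT.set(captured);
            try {
                return task.call();
            } finally {
                // Leave the worker thread as it was, so pooled threads do not leak data.
                if (previous == null) {
                    CONTEXT.remove();
                } else {
                    CONTEXT.set(previous);
                }
            }
        };
    }

    /** Runs a "fallback" on another thread and reads its result back on the caller. */
    static Object fallbackResultVisibleToCaller() {
        ExecutorService worker = Executors.newSingleThreadExecutor();
        try {
            CONTEXT.set(new ConcurrentHashMap<>());
            Callable<Void> fallback = () -> {
                CONTEXT.get().put("ErrorResponse", "fallback");
                return null;
            };
            worker.submit(withCallerContext(fallback)).get();
            // Both threads share the same map instance, so the caller sees the entry.
            return CONTEXT.get().get("ErrorResponse");
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            worker.shutdownNow();
            CONTEXT.remove();
        }
    }

    public static void main(String[] args) {
        System.out.println(fallbackResultVisibleToCaller()); // prints "fallback"
    }
}
```

In hook terms, the capture step corresponds roughly to what onStart does, and the install step to what onExecutionStart and onFallbackStart do. Because the map reference itself is shared, a plain ThreadLocal is enough and no inheritance is needed.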

Update: For your use case, you can modify your code as follows:

ThreadLocalUtil.java

public class ThreadLocalUtil {

    private static ThreadLocal<ConcurrentHashMap<String, Object>> transmittableThreadLocal = new ThreadLocal<>();

    public static ConcurrentHashMap<String, Object> getThreadLocalData() {
        return transmittableThreadLocal.get();
    }

    public static void setThreadLocalData(ConcurrentHashMap<String, Object> data) {
        transmittableThreadLocal.set(data);
    }

    public static void addDataToThreadLocalMap(String key, Object value) {
        Map<String, Object> existingDataMap = transmittableThreadLocal.get();
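        // Skip when no map is bound to this thread, instead of throwing a NullPointerException
        if (existingDataMap != null && value != null) {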
            existingDataMap.put(key, value);
        }
    }

    public static Object getDataFromThreadLocalMap(String key) {
        Map<String, Object> existingDataMap = transmittableThreadLocal.get();
        return existingDataMap.get(key);
    }

    public static void clearThreadLocalDataMap() {
        if (transmittableThreadLocal != null)
            transmittableThreadLocal.remove();
    }

    public static Object getRequestData(String key) {
        Map<String, Object> existingDataMap = transmittableThreadLocal.get();
        if (existingDataMap != null) {
            return existingDataMap.get(key);
        }
        return "-1";
    }



    public static void initThreadLocals() {
        transmittableThreadLocal.set(new ConcurrentHashMap<>());
        String requestId = "REQUEST_ID_" + System.currentTimeMillis();
        addDataToThreadLocalMap("REQUEST_ID", requestId);
    }
}

EmployeeService.java

@Component
public abstract class EmployeeService {
    public abstract void getEmployee(int employeeId);

    public void fallbackMethod(int employeeid) {
        // Static call: this class no longer declares a threadLocalUtil field
        ThreadLocalUtil.addDataToThreadLocalMap("ErrorResponse", "Fallback response:: No employee details available temporarily");
    }
}

EmployeeServiceImpl.java

@Service
public class EmployeeServiceImpl extends EmployeeService {

    @HystrixCommand(fallbackMethod = "fallbackMethod", commandProperties = {
            @HystrixProperty(name = "execution.isolation.thread.timeoutInMilliseconds", value = "900"),
            @HystrixProperty(name = "circuitBreaker.errorThresholdPercentage", value = "10") })
    public void getEmployee(int employeeId) {
        System.out.println("Getting Employee details for " + employeeId + ", threadLocalUtil : " + ThreadLocalUtil.getDataFromThreadLocalMap("EMPLOYEE_ID"));
        String response = restTemplate.exchange("http://localhost:8011/findEmployeeDetails/{employeeid}",
                HttpMethod.GET, null, new ParameterizedTypeReference<String>() {
                }, employeeId).getBody();

        ThreadLocalUtil.addDataToThreadLocalMap("Response", response);
    }

    @Autowired
    RestTemplate restTemplate;
}

HystrixHook.java

public class HystrixHook extends HystrixCommandExecutionHook {

    private HystrixRequestVariableDefault<ConcurrentHashMap<String, Object>> hrv = new HystrixRequestVariableDefault<>();

    @Override
    public <T> void onStart(HystrixInvokable<T> commandInstance) {
        HystrixRequestContext.initializeContext();
        getThreadLocals();
    }

    @Override
    public <T> void onExecutionStart(HystrixInvokable<T> commandInstance) {
        setThreadLocals();
    }


    @Override
    public <T> void onFallbackStart(HystrixInvokable<T> commandInstance) {
        setThreadLocals();
    }


    @Override
    public <T> void onSuccess(HystrixInvokable<T> commandInstance) {
        HystrixRequestContext.getContextForCurrentThread().shutdown();
        super.onSuccess(commandInstance);
    }

    @Override
    public <T> Exception onError(HystrixInvokable<T> commandInstance, HystrixRuntimeException.FailureType failureType, Exception e) {
        HystrixRequestContext.getContextForCurrentThread().shutdown();
        return super.onError(commandInstance, failureType, e);
    }

    private void getThreadLocals() {
        hrv.set(ThreadLocalUtil.getThreadLocalData());
    }

    private void setThreadLocals() {
        ThreadLocalUtil.setThreadLocalData(hrv.get());
    }
}

AbcApplication.java

public class AbcApplication {
    public static void main(String[] args) {
        HystrixPlugins.getInstance().registerCommandExecutionHook(new HystrixHook());
        SpringApplication.run(AbcApplication.class, args);
    }
}

Hope this helps.

