在bean初始化阶段,通过代理方式将其适配为自定义的 IJobHandler 实现对象XxlJobTraceWrapper。
真正 invoke时,从beanFactory中获取Tracing对象开启新的Span。
XxlJobBeanPostProcessor implements BeanPostProcessor, BeanFactoryAware:
import java.lang.reflect.Modifier;
import org.aopalliance.aop.Advice;
import org.springframework.aop.framework.AopConfigException;
import org.springframework.aop.framework.ProxyFactoryBean;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.stereotype.Component;
import com.xxl.job.core.handler.IJobHandler;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
/**
* xxjob入口增加trace日志
*
* @author admin
*
*/
@Component
@Slf4j
public class XxlJobBeanPostProcessor implements BeanPostProcessor, BeanFactoryAware {
@Setter
private BeanFactory beanFactory;
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
/**
* 不能直接用XxjobHnadlerTraceWrapper进行包装,返出去的类型不匹配。只能通过代理方式
*/
return bean instanceof IJobHandler ? wrap(bean) : bean;
}
private Object wrap(Object bean) {
boolean classFinal = Modifier.isFinal(bean.getClass().getModifiers());
boolean cglibProxy = !classFinal;
IJobHandler job = (IJobHandler) bean;
try {
return createProxy(bean, cglibProxy, new XxlJobMethodInterceptor<IJobHandler>(job, this.beanFactory));
} catch (AopConfigException ex) {
if (cglibProxy) {
if (log.isDebugEnabled()) {
log.debug("Exception occurred while trying to create a proxy, falling back to JDK proxy", ex);
}
return createProxy(bean, false, new XxlJobMethodInterceptor<IJobHandler>(job, this.beanFactory));
}
throw ex;
}
}
Object createProxy(Object bean, boolean cglibProxy, Advice advice) {
ProxyFactoryBean factory = new ProxyFactoryBean();
factory.setProxyTargetClass(cglibProxy);
factory.addAdvice(advice);
factory.setTarget(bean);
return factory.getObject();
}
}
XxlJobMethodInterceptor:
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import org.aopalliance.intercept.MethodInterceptor;
import org.aopalliance.intercept.MethodInvocation;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.util.ReflectionUtils;
import com.xxl.job.core.handler.IJobHandler;
import lombok.AllArgsConstructor;
@AllArgsConstructor
public class XxlJobMethodInterceptor<T extends IJobHandler> implements MethodInterceptor {
private final T delegate;
private final BeanFactory beanFactory;
@Override
public Object invoke(MethodInvocation invocation) throws Throwable {
IJobHandler jobHandler = new XxlJobTraceWrapper(this.beanFactory, this.delegate);
Method methodOnTracedBean = getMethod(invocation, jobHandler);
if (methodOnTracedBean != null) {
try {
return methodOnTracedBean.invoke(jobHandler, invocation.getArguments());
} catch (InvocationTargetException ex) {
Throwable cause = ex.getCause();
throw (cause != null) ? cause : ex;
}
}
return invocation.proceed();
}
private Method getMethod(MethodInvocation invocation, Object object) {
Method method = invocation.getMethod();
return ReflectionUtils.findMethod(object.getClass(), method.getName(), method.getParameterTypes());
}
}
XxlJobTraceWrapper :
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.NoSuchBeanDefinitionException;
import org.springframework.cloud.sleuth.DefaultSpanNamer;
import org.springframework.cloud.sleuth.SpanNamer;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.handler.IJobHandler;
import brave.ScopedSpan;
import brave.Tracing;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@RequiredArgsConstructor
public class XxlJobTraceWrapper extends IJobHandler {
private final BeanFactory beanFactory;
private final IJobHandler delegate;
private Tracing tracing;
private SpanNamer spanNamer;
@Override
public ReturnT<String> execute(String param) throws Exception {
if (this.tracing == null) {
try {
this.tracing = this.beanFactory.getBean(Tracing.class);
} catch (NoSuchBeanDefinitionException e) {
return this.delegate.execute(param);
}
}
return doExecute(param);
}
private ReturnT<String> doExecute(String param) throws Exception {
ScopedSpan span = this.tracing.tracer().startScopedSpanWithParent(spanNamer().name(delegate, "XXJOB"),
this.tracing.currentTraceContext().get());
try {
return this.delegate.execute(param);
} catch (Exception | Error e) {
span.error(e);
throw e;
} finally {
span.finish();
}
}
// due to some race conditions trace keys might not be ready yet
private SpanNamer spanNamer() {
if (this.spanNamer == null) {
try {
this.spanNamer = this.beanFactory.getBean(SpanNamer.class);
} catch (NoSuchBeanDefinitionException e) {
log.warn("SpanNamer bean not found - will provide a manually created instance");
return new DefaultSpanNamer();
}
}
return this.spanNamer;
}
}