本文介绍了Spring AOP - 正确配置重试建议的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我是 Spring AOP 的新手并且一直在尝试.

I am new to Spring AOP and have been experimenting a bit.

我正在尝试设置 Retry &我的一个项目通过 Spring AOP 进行速率限制.用例是这样的:-

I am trying to setup Retry & Rate Limiter through Spring AOP for one of my project.The use case is like this:-

  1. 检查 TPS 是否可用.如果不是,则抛出 ThrotledException
  2. 如果抛出 ThrottledExceptionRetry.

我遇到的问题是:这个限制&重试组合进入无限循环(如果 TPS = 0).也就是说,重试不会在x"次尝试后停止.

The issue I am running into is: This throttling & retry combo is running into an infinite loop (if TPS = 0). That is, retry is not stopping after 'x' attempts.

我的节流拦截器(在高级别)是这样的:

My Throttling Interceptor is (at a high level) like this:

@Before("<pointcut>")
public void invoke() throws ThrottlingException {
        if (throttler.isThrottled(throttleKey)) {
            throw new ThrottlingException("Call Throttled");
    }
}

我的重试拦截器是这样的:

My Retry Interceptor is like this:

@AfterThrowing(pointcut="execution(* com.company.xyz.method())", throwing="exception")
public Object invoke(JoinPoint jp, ThrottlingException exception) throws Throwable {
    return RetryingCallable.newRetryingCallable(new Callable<Object>() {

        @Override
        public Object call() throws Exception {
                MethodSignature  signature = (MethodSignature) p.getSignature();
                Method method = signature.getMethod();
                return method.invoke(jp.getThis(), (Object[]) null);
        }

    }, retryPolicy).call();
}

这里的 RetryingCallable 是一个简单的实现(由我公司的某个人编写的内部库),它接受一个 RetryAdvice 并应用它.

Here RetryingCallable is a simple implementation (internal library written by someone in my company) that takes in a RetryAdvice and applies that.

我相关的spring-config如下:

My relevant spring-config is as follows:

<bean id="retryInterceptor" class="com.company.xyz.RetryInterceptor">
    <constructor-arg index="0"><ref bean="retryPolicy"/></constructor-arg>
</bean>


<bean id="throttlingInterceptor" class="com.company.xyz.ThrottlingInterceptor">
    <constructor-arg><value>throttleKey</value></constructor-arg>
</bean>
<context:component-scan base-package="com.company.xyz">
  <context:include-filter type="annotation" expression="org.aspectj.lang.annotation.Aspect"/>
</context:component-scan>
<aop:aspectj-autoproxy/>

正如我所见,这里的问题是,在每个 ThrottlingException 上都应用了一个新的 Retry Advice,而不是前一个生效.

The issue here, as I see, is that on each ThrottlingException a new Retry Advice is being applied instead of the previous one being coming into affect.

有关如何解决此问题的任何意见?

Any inputs on how to fix this?

推荐答案

免责声明:不是 Spring 用户,因此我将提供一个纯粹的 AspectJ 解决方案这里.不过,它应该在 Spring AOP 中以相同的方式工作.您唯一需要更改的是从 @DeclarePresedence 切换到 @Order 以进行方面优先级配置,如 Spring AOP 手册.

Disclaimer: I am not a Spring user, thus I am going to present a pure AspectJ solution here. It should work the same way in Spring AOP though. The only thing you need to change is switch from @DeclarePresedence to @Order for aspect precedence configuration as described in the Spring AOP manual.

驱动程序应用:

package de.scrum_master.app;

public class Application {
    public static void main(String[] args) {
        new Application().doSomething();
    }

    public void doSomething() {
        System.out.println("Doing something");
    }
}

限制异常类:

package de.scrum_master.app;

public class ThrottlingException extends RuntimeException {
    private static final long serialVersionUID = 1L;

    public ThrottlingException(String arg0) {
        super(arg0);
    }
}

节流拦截器:

为了模拟节流情况,我创建了一个辅助方法 isThrottled(),它在 3 种情况下的 2 种情况下随机返回 true.

In order to emulate the throttling situation I created a helper method isThrottled() which returns true randomly in 2 out of 3 cases.

package de.scrum_master.aspect;

import java.util.Random;

import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Before;

import de.scrum_master.app.ThrottlingException;

@Aspect
public class ThrottlingInterceptor {
    private static final Random RANDOM = new Random();

    @Before("execution(* doSomething())")
    public void invoke(JoinPoint thisJoinPoint) throws ThrottlingException {
        System.out.println(getClass().getSimpleName() + " -> " + thisJoinPoint);
        if (isThrottled()) {
            throw new ThrottlingException("call throttled");
        }
    }

    private boolean isThrottled() {
        return RANDOM.nextInt(3) > 0;
    }
}

重试拦截器:

请注意,AspectJ 注释 @DeclarePrecedence("RetryInterceptor, *") 表示此拦截器将在任何其他拦截器之前执行.请将其替换为两个拦截器类上的 @Order 注释.否则 @Around 建议无法捕获节流拦截器抛出的异常.

Please note that the AspectJ annotation @DeclarePrecedence("RetryInterceptor, *") says that this interceptor is to be executed before any other ones. Please replace it with @Order annotations on both interceptor classes. Otherwise the @Around advice cannot catch exceptions thrown by the throttling interceptor.

另外值得一提的是,这个拦截器不需要任何反射来实现重试逻辑,它直接使用重试循环中的连接点来重试thisJoinPoint.proceed().这可以很容易地分解为实现不同类型重试行为的辅助方法或辅助类.只需确保使用 ProceedingJoinPoint 作为参数而不是 Callable.

Also worth mentioning is that this interceptor does not need any reflection in order to implement the retry logic, it directly uses the joinpoint within a retry loop in order to retry thisJoinPoint.proceed(). This can easily be factored out into a helper method or helper class implementing different kinds of retry behaviour. Just make sure to use the ProceedingJoinPoint as a parameter instead of a Callable.

package de.scrum_master.aspect;

import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.DeclarePrecedence;

import de.scrum_master.app.ThrottlingException;

@Aspect
@DeclarePrecedence("RetryInterceptor, *")
public class RetryInterceptor {
    private static int MAX_TRIES = 5;
    private static int WAIT_MILLIS_BETWEEN_TRIES = 1000;

    @Around("execution(* doSomething())")
    public Object invoke(ProceedingJoinPoint thisJoinPoint) throws Throwable {
        System.out.println(getClass().getSimpleName() + " -> " + thisJoinPoint);
        ThrottlingException throttlingException = null;
        for (int i = 1; i <= MAX_TRIES; i++) {
            try {
                return thisJoinPoint.proceed();
            }
            catch (ThrottlingException e) {
                throttlingException = e;
                System.out.println("  Throttled during try #" + i);
                Thread.sleep(WAIT_MILLIS_BETWEEN_TRIES);
            }
        }
        throw throttlingException;
    }
}

成功重试的控制台日志:

RetryInterceptor -> execution(void de.scrum_master.app.Application.doSomething())
ThrottlingInterceptor -> execution(void de.scrum_master.app.Application.doSomething())
  Throttled during try #1
ThrottlingInterceptor -> execution(void de.scrum_master.app.Application.doSomething())
  Throttled during try #2
ThrottlingInterceptor -> execution(void de.scrum_master.app.Application.doSomething())
Doing something

重试失败的控制台日志:

RetryInterceptor -> execution(void de.scrum_master.app.Application.doSomething())
ThrottlingInterceptor -> execution(void de.scrum_master.app.Application.doSomething())
  Throttled during try #1
ThrottlingInterceptor -> execution(void de.scrum_master.app.Application.doSomething())
  Throttled during try #2
ThrottlingInterceptor -> execution(void de.scrum_master.app.Application.doSomething())
  Throttled during try #3
ThrottlingInterceptor -> execution(void de.scrum_master.app.Application.doSomething())
  Throttled during try #4
ThrottlingInterceptor -> execution(void de.scrum_master.app.Application.doSomething())
  Throttled during try #5
Exception in thread "main" de.scrum_master.app.ThrottlingException: call throttled
    at de.scrum_master.aspect.ThrottlingInterceptor.invoke(ThrottlingInterceptor.aj:19)
    at de.scrum_master.app.Application.doSomething_aroundBody0(Application.java:9)
    at de.scrum_master.app.Application.doSomething_aroundBody1$advice(Application.java:22)
    at de.scrum_master.app.Application.doSomething(Application.java:1)
    at de.scrum_master.app.Application.main(Application.java:5)

请随时提出与我的回答相关的任何后续问题.

Feel free to ask any follow-up questions related to my answer.

更新:我不知道你的 RetryingCallableRetryPolicy 类/接口是如何工作的,你没有告诉我太多.但我编造了一些东西并让它像这样工作:

Update: I have no idea how your RetryingCallable and RetryPolicy classes/interfaces work, you did not tell me much about it. But I made up something and got it working like this:

package de.scrum_master.app;

import java.util.concurrent.Callable;

public interface RetryPolicy<V> {
    V apply(Callable<V> callable) throws Exception;
}
package de.scrum_master.app;

import java.util.concurrent.Callable;

public class DefaultRetryPolicy<V> implements RetryPolicy<V> {
    private static int MAX_TRIES = 5;
    private static int WAIT_MILLIS_BETWEEN_TRIES = 1000;

    @Override
    public V apply(Callable<V> callable) throws Exception {
        Exception throttlingException = null;
        for (int i = 1; i <= MAX_TRIES; i++) {
            try {
                return callable.call();
            }
            catch (ThrottlingException e) {
                throttlingException = e;
                System.out.println("  Throttled during try #" + i);
                Thread.sleep(WAIT_MILLIS_BETWEEN_TRIES);
            }
        }
        throw throttlingException;
    }
}
package de.scrum_master.app;

import java.util.concurrent.Callable;

public class RetryingCallable<V> {
    private RetryPolicy<V> retryPolicy;
    private Callable<V> callable;

    public RetryingCallable(Callable<V> callable, RetryPolicy<V> retryPolicy) {
        this.callable = callable;
        this.retryPolicy = retryPolicy;
    }

    public static <V> RetryingCallable<V> newRetryingCallable(Callable<V> callable, RetryPolicy<V> retryPolicy) {
        return new RetryingCallable<V>(callable, retryPolicy);
    }

    public V call() throws Exception {
        return retryPolicy.apply(callable);
    }
}

现在像这样更改重试拦截器:

Now change the retry interceptor like this:

package de.scrum_master.aspect;

import java.util.concurrent.Callable;

import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.DeclarePrecedence;

import de.scrum_master.app.DefaultRetryPolicy;
import de.scrum_master.app.RetryPolicy;
import de.scrum_master.app.RetryingCallable;

@Aspect
@DeclarePrecedence("RetryInterceptor, *")
public class RetryInterceptor {
    private RetryPolicy<Object> retryPolicy = new DefaultRetryPolicy<>();

    @Around("execution(* doSomething())")
    public Object invoke(ProceedingJoinPoint thisJoinPoint) throws Throwable {
        System.out.println(getClass().getSimpleName() + " -> " + thisJoinPoint);
        return RetryingCallable.newRetryingCallable(
            new Callable<Object>() {
                @Override
                public Object call() throws Exception {
                    return thisJoinPoint.proceed();
                }
            },
            retryPolicy
        ).call();
    }
}

日志输出将与您之前看到的非常相似.对我来说这很好用.

The log output will be pretty much similar to what you saw before. For me this works nicely.

这篇关于Spring AOP - 正确配置重试建议的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

09-06 03:53