基于Spring-AOP的自定义分片工具

发布者:京东云
发布于:2022-11-22 10:34

作者:陈昌浩

1 背景

随着数据量的增长,发现系统在与其他系统交互时,批量接口会出现超时现象,发现原批量接口在实现时,没有做分片处理,当数据过大时或超过其他系统阈值时,就会出现错误。由于与其他系统交互比较多,一个一个接口做分片优化,改动量较大,所以考虑通过 AOP 解决此问题。

2 Spring-AOP

AOP (Aspect Orient Programming), 直译过来就是面向切面编程。AOP 是一种编程思想,是面向对象编程(OOP)的一种补充。面向对象编程将程序抽象成各个层次的对象,而面向切面编程是将程序抽象成各个切面。

Spring 中的 AOP 是通过动态代理实现的。 Spring AOP 不能拦截对对象字段的修改,也不支持构造器连接点,我们无法在 Bean 创建时应用通知。

3 功能实现

自定义分片处理分三个部分:自定义注解(MethodPartAndRetryer)、重试器(RetryUtil)、切面实现 (RetryAspectAop)。

3.1 MethodPartAndRetryer

源码

@Target(ElementType.METHOD)@Retention(RetentionPolicy.RUNTIME)@Documentedpublic @interface MethodPartAndRetryer {/**
* 失败重试次数
* @return
*/int times() default 3;/**
* 失败间隔执行时间 300毫秒
* @return
*/long waitTime() default 300L;/**
* 分片大小
* @return
*/int parts() default 200;
}

@interface 说明这个类是个注解。
@Target 是这个注解的作用域

 public enum ElementType {/** 类、接口(包括注释类型)或枚举声明 */TYPE,/** 字段声明(包括枚举常量) */FIELD,/** 方法声明 */METHOD,/** 正式的参数声明 */PARAMETER,/** 构造函数声明 */CONSTRUCTOR,/** 局部变量声明 */LOCAL_VARIABLE,/** 注释类型声明*/ANNOTATION_TYPE,/** 程序包声明 */PACKAGE,/**类型参数声明*/TYPE_PARAMETER,/**类型的使用*/TYPE_USE}

@Retention 注解的生命周期

public enum RetentionPolicy {/** 编译器处理完后不存储在class中*/SOURCE,/**注释将被编译器记录在类文件中,但不需要在运行时被VM保留。 这是默认值*/CLASS,/**编译器存储在class中,可以由虚拟机读取*/RUNTIME}
  • times ():接口调用失败时,重试的次数。
  • waitTime (): 接口调用失败是,间隔多长时间再次调用。
  • int parts ():进行分片时,每个分片的大小。

3.2 RetryUtil

源码

public class RetryUtil<V> {public Retryer<V> getDefaultRetryer(int times,long waitTime) {
Retryer<V> retryer = RetryerBuilder.<V>newBuilder()
.retryIfException()
.retryIfRuntimeException()
.retryIfExceptionOfType(Exception.class)
.withWaitStrategy(WaitStrategies.fixedWait(waitTime, TimeUnit.MILLISECONDS))
.withStopStrategy(StopStrategies.stopAfterAttempt(times))
.build();return retryer;
}
}

说明

  • RetryerBuilder:是用于配置和创建 Retryer 的构建器。
  • retryIfException:抛出 runtime 异常、checked 异常时都会重试,但是抛出 error 不会重试。
  • retryIfRuntimeException:只会在抛 runtime 异常的时候才重试,checked 异常和 error 都不重试。
  • retryIfExceptionOfType:允许我们只在发生特定异常的时候才重试。
  • withWaitStrategy: 等待策略,每次请求间隔。
  • withStopStrategy: 停止策略,重试多少次后停止。

3.3 RetryAspectAop

源码:

public class RetryAspectAop {public Object around(final ProceedingJoinPoint point) throws Throwable {Object result = null;
final Object[] args = point.getArgs();boolean isHandler1 = isHandler(args);if (isHandler1) {String className = point.getSignature().getDeclaringTypeName();String methodName = point.getSignature().getName();Object firstArg = args[0];
List<Object> paramList = (List<Object>) firstArg;//获取方法信息Method method = getCurrentMethod(point);//获取注解信息MethodPartAndRetryer retryable = AnnotationUtils.getAnnotation(method, MethodPartAndRetryer.class);//重试机制Retryer<Object> retryer = new RetryUtil<Object>().getDefaultRetryer(retryable.times(),retryable.waitTime());//分片List<List<Object>> requestList = Lists.partition(paramList, retryable.parts());for (List<Object> partList : requestList) {
args[0] = partList;Object tempResult = retryer.call(new Callable<Object>() {@Overridepublic Object call() throws Exception {try {return point.proceed(args);
} catch (Throwable throwable) {
log.error(String.format("分片重试报错,类%s-方法%s",className,methodName),throwable);throw new RuntimeException("分片重试出错");
}
}
});if (null != tempResult) {if (tempResult instanceof Boolean) {if (!((Boolean) tempResult)) {
log.error(String.format("分片执行报错返回类型不能转化bolean,类%s-方法%s",className,methodName));throw new RuntimeException("分片执行报错!");
}
result = tempResult;
} else if (tempResult instanceof List) {if(result ==null){
result =Lists.newArrayList();
}
((List) result).addAll((List) tempResult);
}else {
log.error(String.format("分片执行返回的类型不支持,类%s-方法%s",className,methodName));throw new RuntimeException("不支持该返回类型");
}
} else {
log.error(String.format("分片执行返回的结果为空,类%s-方法%s",className,methodName));throw new RuntimeException("调用结果为空");
}
}
} else {
result = point.proceed(args);
}return result;
}private boolean isHandler(Object[] args) {boolean isHandler = false;if (null != args && args.length > 0) {Object firstArg = args[0];//如果第一个参数是list 并且数量大于1if (firstArg!=null&&firstArg instanceof List &&((List) firstArg).size()>1) {
isHandler = true;
}
}return isHandler;
}private Method getCurrentMethod(ProceedingJoinPoint point) {try {
Signature sig = point.getSignature();
MethodSignature msig = (MethodSignature) sig;Object target = point.getTarget();return target.getClass().getMethod(msig.getName(), msig.getParameterTypes());
} catch (NoSuchMethodException e) {throw new RuntimeException(e);
}
}
}

说明:

getCurrentMethod:获取方法信息即要做分片的批量调用的接口。
isHandler1:判断是否要做分片处理,只有第一参数是 list 并且 list 的值大于 1 时才做分片处理。
around:具体分片逻辑。

  1. 获取要分片方法的参数。
  2. 判断是否要做分片处理。
  3. 获取方法。
  4. 获取重试次数、重试间隔时间和分片大小。
  5. 生成重试器。
  6. 根据设置的分片大小,做分片处理。
  7. 调用批量接口并处理结果。

4 功能使用

4.1 配置文件

4.2 代码示例

  1. @MethodPartAndRetryer(parts=100)
  2. public Boolean writeBackOfGoodsSN(List<SerialDTO> listSerial,ObCheckWorker workerData)

只要在需要做分片的批量接口方法上,加上 MethodPartAndRetryer 注解就可以,重试次数、重试间隔时间和分片大小可以在注解时设置,也可以使用默认值。

5 小结

通过自定义分片工具,可以快速的对老代码进行分片处理,而且增加了重试机制,提高了程序的可用性,提高了对老代码的重构效率。



声明:该文观点仅代表作者本人,转载请注明来自看雪