当前位置:首页 > 技术分析 > 正文内容

性能调优实战:Spring Boot 多线程处理SQL IN语句大量值的优化方案

ruisui881个月前 (03-28)技术分析5

环境:SpringBoot3.4.0


1. 简介

当我们编写的SQL语句包含有IN语句并且包含大量值时,往往会遇到性能瓶颈,甚至可能导致数据库报错。特别是在处理大数据集时,这种问题尤为突出。大量值的IN语句不仅会增加数据库的查询负担,还可能导致内存消耗过高、查询速度下降,甚至在某些数据库中会因为值过多而直接报错。

MySQL:没有固定的限制值,更多受限于 max_allowed_packet 参数所影响的整体SQL语句大小。

SHOW VARIABLES LIKE '%max_allowed_packet%';

输出结果

当我们执行超大SQL时,将看到如下的错误:

这与你整个执行的sql大小有关

Oracle:理论上支持的 IN 子句值的数量上限为1000项,超出此数目会导致错误。

Oracle好像是不能修改此限制的?

通常我们遇到次情况时可以采取如下的方式解决:

  • 使用临时表
  • 将IN语句中的值进行分批执行

在本篇文章中,我们通过AOP结合多线程技术,自动优化因SQL IN语句包含过多值引起的错误或是导致的性能低下问题。

2. 实战案例

2.1 自定义注解

@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface SplitQuery {
  /**线程池bean名称;类型必须是Executor*/
  String executorName() default "" ;
  
  /**批处理大小*/
  int batchSize() default 100 ;
  
  /**返回值结果处理器beanName;类型必须是ResultHandler*/
  String handlerName() default "" ;
}

该注解标注了需要被处理的方法。

@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.PARAMETER)
public @interface BatchParam {
}

该注解标注方法参数中哪个参数需要被处理。

2.2 返回值处理器定义

在切面中通过多线程处理完数据后,可以将结果传递给一个具体的返回值处理器来进一步处理。通过将数据处理和结果处理分离到不同的组件中(即多线程处理逻辑和返回值处理器),系统变得更加模块化。这种设计有助于降低组件之间的耦合度。当需要更改数据处理逻辑或结果处理方式时,只需修改相应的组件即可,无需对整个切面或业务逻辑进行大规模调整。这大大提高了系统的可扩展性和灵活性。

接口定义

public interface ResultHandler {
  T process(List<Object> result) ;
}

默认实现

public class DefaultResultHandler implements ResultHandler<Object> {
  @Override
  public Object process(List<Object> result) {
    return result ;
  }
}

默认处理器,不进行任何的处理直接返回结果;我们应该根据自己的业务来实现具体的逻辑处理。

2.3 切面定义

切面中我们会根据具体IN参数(List集合)的个数与注解中配置的批次大小进行拆分成多个线程进行并发处理数据(List.size / batchSize)。

@Aspect
@Component
public class SplitQueryAspect implements ApplicationContextAware {
  private static final Logger logger = LoggerFactory.getLogger(SplitQueryAspect.class) ;
  
  /**默认使用虚拟线程*/
  private static final Executor defaultExecutor = Executors.newVirtualThreadPerTaskExecutor() ;
  
  private ApplicationContext context ;
  
  @Pointcut("@annotation(sq)")
  private void splitPc(SplitQuery sq) {}
  
  @Around("splitPc(sq)")
  public Object splitQueryAround(ProceedingJoinPoint pjp, SplitQuery sq) throws Throwable {
    int batchSize = sq.batchSize() ;
    Executor executor = getExecutor(sq.executorName()) ;
    Object[] args = pjp.getArgs() ;
    MethodSignature ms = (MethodSignature) pjp.getSignature() ;
    
    Parameter[] parameters = ms.getMethod().getParameters() ;
    int index = -1 ;
    for (int i = 0, len = parameters.length; i < len; i++) {
      Parameter param = parameters[i] ;
      BatchParam batchParam = param.getAnnotation(BatchParam.class) ;
      if (batchParam != null) {
        index = i ;
        break ;
      }
    }
    Object arg = args[index] ;
    // 这里只考虑了参数集合是List情况
    if (index == -1 
        || !List.class.isAssignableFrom(arg.getClass()) 
        || ((List) arg).size() <= batchSize) {
      logger.info("直接调用目标方法...") ;
      return pjp.proceed() ;
    }
    ResultHandler resultHandler = getResultHandler(sq.handlerName()) ;
    final int paramIndex = index ;
    List data = (List) arg ;
    // 这里我们使用的guava进行拆分集合
    List partitions = Lists.partition(data, batchSize) ;
    List<Object> result = partitions.stream().map(chunk -> {
      return CompletableFuture.supplyAsync(() -> {
        try {
          Object[] newArgs = new Object[args.length] ;
          System.arraycopy(args, 0, newArgs, 0, args.length) ;
          newArgs[paramIndex] = chunk ;
          logger.info("处理批次数据: {}", newArgs[paramIndex]) ;
          return pjp.proceed(newArgs) ;
        } catch (Throwable e) {
          return null ;
        }
      }, executor) ; // 设置线程池
    }).collect(Collectors.toList())
        .stream()
        .map(CompletableFuture::join)
        // 过滤数据为null或空的情况
        .filter(obj -> obj != null && !((List)obj).isEmpty())
        .collect(Collectors.toList()) ;
    return resultHandler.process(result) ;
  }
  
  private Executor getExecutor(String executorName) {
    if (StringUtils.hasLength(executorName)) {
      try {
        return this.context.getBean(executorName, Executor.class) ;
      } catch (Exception e) {
        logger.warn("不存beanName为: {} 的线程池,将使用默认的虚拟线程池对象", executorName);
        return defaultExecutor ;
      }
    }
    return defaultExecutor ;
  }
  private ResultHandler getResultHandler(String handlerName) {
    if (StringUtils.hasLength(handlerName)) {
      try {
        return this.context.getBean(handlerName, ResultHandler.class) ;
      } catch (Exception e) {
        logger.warn("不存beanName为: {} 的结果处理器,将使用DefaultResultHandler", handlerName);
        return new DefaultResultHandler() ;
      }
    }
    return new DefaultResultHandler() ;
  }
  @Override
  public void setApplicationContext(ApplicationContext context) throws BeansException {
    this.context = context ;
  }
}

以上我们就完成了切面的编写,接下来我们就可以进行测试了。

2.4 业务代码编写

Repository接口定义

public interface PersonRepository extends JpaRepository {
  List findByAgeAndNameContainingAndIdIn(Integer age, String name, List ids) ;
}

自定义了一个根据age,name和id进行查询的方法。

Service业务方法

@Service
public class PersonService {
  private final PersonRepository personRepository ;
  public PersonService(PersonRepository personRepository) {
    this.personRepository = personRepository;
  }
  @SplitQuery(batchSize = 2, handlerName = "personResultHandler")
  public List query(Integer age, @BatchParam List ids, String name) {
    return this.personRepository.findByAgeAndNameContainingAndIdIn(age, name, ids) ;
  }
}

这里的query方法将通过切面多线程进行处理,其中设置了返回值处理器,该处理器定义如下:

@Component("personResultHandler")
public class PersonInResultHandler implements ResultHandler<List> {
  @Override
  public List process(List<Object> result) {
    if (result == null) {
      return null ;
    }
    return result.stream()
    // 这里我们知道返回的类型,所有可以直接进行类型的转换
    .flatMap(obj -> ((List)obj).stream())
    .collect(Collectors.toList()) ;
  }
}

2.5 测试

@RestController
@RequestMapping("/persons")
public class PersonController {
  private final PersonService personService ;
  public PersonController(PersonService personService) {
    this.personService = personService;
  }
  @GetMapping("/query")
  public ResponseEntity<List> query() {
    return ResponseEntity.ok(this.personService.query(11, 
      List.of(1L, 2L, 3L, 4L, 5L), "a")) ;
  }
}

调用上面的接口最终控制台SQL输出如下:

通过3个线程执行

我们将batchSize修改为6后再进行测试:

直接调用了目标方法,因为我们的List中的值小于batchSize的个数。

扫描二维码推送至手机访问。

版权声明:本文由ruisui88发布,如需转载请注明出处。

本文链接:http://www.ruisui88.com/post/3094.html

标签: 线程优化
分享给朋友:

“性能调优实战:Spring Boot 多线程处理SQL IN语句大量值的优化方案” 的相关文章

使用cgroup限制进程资源

这里使用containerd项目中的cgroup包来实现进程资源限制。先写一个耗费一个CPU并且一秒增加10m内存的测试进程package mainimport ( "fmt" "math/rand" "time")func main() { go f...

国产操作系统上Vim的详解03--安装和使用插件 | 统信 | 麒麟 | 中科方德

原文链接:国产操作系统上Vim的详解03--使用Vundle插件管理器来安装和使用插件 | 统信 | 麒麟 | 中科方德Hello,大家好啊!今天给大家带来一篇在国产操作系统上使用Vundle插件管理器来安装和使用Vim插件的详解文章。Vundle是Vim的一款强大的插件管理器,可以帮助我们轻松地安...

vue3使用vue-router路由(路由懒加载、路由传参)

vue-router 是 vue的一个插件库1. 专门用来实现一个SPA单页面应用2 .基于vue的项目基本都会用到此库SPA的理解1) 单页Web应用(single page web application,SPA)2) 整个应用只有一个完整的页面3) 点击页面中的链接不会刷新页面, 本身也不会向...

Vue实战篇|使用路由管理用户权限(动态路由)

权限控制是后台管理系统比较常见的需求,如果我们需要对某些页面的添加权限控制的话,那我们可以在路由管理中的权限做一些校验,没有通过权限校验的给出相应的提示或者直接跳转到报错页面。跟着我一起来学vue实战篇路由管理权限吧!权限校验函数getCurrentAuthority()函数用于获取当前用户权限,一...

嵌入式实操——基于RT1170 使能SEMC配置SDRAM功能(八)

本文主要是通过迁移的思维,记录本人初次使用NXP MCUXpresso SDK API进行BSP开发MCUXpresso SDK SEMC API 接口链接  在MCUXpresso SDK 框架下提供了对SEMC DDR进行操作的接口。学习链接:https://community.nxp.com/...

微信企业号首款永久免费应用问世

7月14日,微信企业号移动办公应用领跑者——办公逸宣布:其所研发的微信办公应用将永久免费,企事业单位只要拥有微信企业号都可以免费安装办公逸各项应用,此举标志着微信办公免费时代现已到来!据悉,办公逸(www.bangongyi.com)现已推出四大微信办公套件,分别为:移动办公管理套件、客户关系管理套...