“幂等”不等于“分布式锁”,也不得不考虑返回值
1. 概览
在分布式系统中,幂等是一个非常重要的概念,常常与“重试”一起出现。当调用一个远程服务发生超时,调用方并不知道请求是否执行成功,这就是典型的“第三态”问题。对于这个问题最常见的解决方案便是进行主动重试,假如该操作是一个数据库插入操作,重试将对系统产生副作用(创建多条记录),这时我们常常会说,被调用接口需要保障幂等。
1.1. 背景
幂等可以简单定义如下:任意多次执行所产生的影响均与第一次执行的影响相同。
【注】从幂等定义上看,重心放在了操作之后的影响,及多次操作不会破坏内部状态。但在实际工作当中,除了内部状态外,接口的返回值也是一个重要要素,多次重复操作返回相同的结果往往更符合使用者的预期。
举个例子,在订单系统中,用户使用优惠券下单后会调用冻结接口对优惠券进行冻结操作,站着订单系统的视角,当进行冻结重试时你期望:
- 优惠券抛出异常,告知订单优惠券已经冻结;
- 优惠券直接返回上次的结果 “冻结成功;
大家可以思考下两种方案在下游使用时的异同,当然最好提供两种机制,由使用方根据场景进行定制。
在不同的场景下,幂等保护的方案是不同的,常见幂等处理策略有:
- 天然具有幂等性,不需要保护。比如读操作、按主键数据删除等;
- 使用上游信息作为下游主键或唯一键的插入场景,由于有键的约束,可以保障幂等。比如以 userId 作为 Card(名片)的主键;
- 数据库字段的直接更新操作也具有一定的幂等性。比如用户修改姓名;
- 直接使用外部存储,以幂等键为标识,对方法执行情况进行记录,并以此为判断依据完成幂等保护;
由于其他策略与场景强绑定,idempotent 重心放在方案4上,已覆盖更多的业务场景。
1.2. 目标
快速为非幂等接口增加幂等保护。
- 基于“能力声明化”的方式,为接口快速添加幂等保护;
- 支持常见的两种幂等保护策略;
- 直接返回上次的执行结果;
- 抛出异常告知重复提交;
- 支持存储层的扩展,并提供常见的存储实现,包括:
- redis 实现,适用于一般通用场景,性能好但缓存过期后可能存在幂等不生效的情况;
- db 实现,适用于比较严格的场景,比如与订单、金额相关的业务;
2. 快速入门
2.1. 准备工作
首先,引入 lego starter,在 maven pom 中添加如下信息:
com.geekhalo.lego
lego-starter
0.1.15-idempotent-SNAPSHOT
然后,以 JpaRepository 为例实现对 IdempotentExecutorFactory 的配置,具体如下:
@Configuration
public?class?IdempotentConfiguration?extends?IdempotentConfigurationSupport?{
????@Bean("dbExecutorFactory")
????public?IdempotentExecutorFactory?redisExecutorFactory(JpaBasedExecutionRecordRepository?recordRepository){
????????return?createExecutorFactory(recordRepository);
????}
}
其中,
IdempotentConfigurationSupport 已经提供 idempotent 所需的很多 Bean,同时提供 createExecutorFactory(repository) 方法,用以完成 IdempotentExecutorFactory 的创建。
使用 Jpa 需要调整 EnableJpaRepositories 相关配置,具体如下:
@Configuration
@EnableJpaRepositories(basePackages?=?{
????????"com.geekhalo.lego.core.idempotent.support.repository"
},?repositoryFactoryBeanClass?=?JpaBasedQueryObjectRepositoryFactoryBean.class)
public?class?SpringDataJpaConfiguration?{
}
其中,
com.geekhalo.lego.core.idempotent.support.repository 为固定包名,指向 Jpa 默认实现
JpaBasedExecutionRecordRepository,Spring Data Jpa 会自动生成实现的代理对象。
最后,在数据库中增加 幂等所需表,sql 如下:
CREATE?TABLE?`idempotent_execution_record`?(
???`id`?bigint(20)?NOT?NULL?AUTO_INCREMENT,
???`type`?int(11)?NOT?NULL,
???`unique_key`?varchar(64)?NOT?NULL,
???`status`?int(11)?NOT?NULL,
???`result`?varchar(1024)?DEFAULT?NULL,
???`create_date`?datetime?DEFAULT?NULL,
???`update_date`?datetime?DEFAULT?NULL,
???PRIMARY?KEY?(`id`),
???UNIQUE?KEY?`unq_type_key`?(`type`,`unique_key`)
)?ENGINE=InnoDB;
至此,便完成了基本配置。
【注】关于 Spring data jpa 配置,可以自行到网上进行检索。
2.2. 初识幂等保护
在方法上增加 @Idempotent 注解便可以使其具备幂等保护,示例如下:
@Idempotent(executorFactory?=?"dbExecutorFactory",?group?=?1,?keyEl?=?"#key",
????????handleType?=?IdempotentHandleType.RESULT)
@Transactional
public?Long?putForResult(String?key,?Long?data){
????return?put(key,?data);
}
其中 @Idempotent 为核心配置,详细信息如下:
- executorFactory 为 IdempotentExecutorFactory,及在 IdempotentConfiguration 中配置的bean,默认为 DEFAULT_EXECUTOR_FACTORY
- group 为组信息,用于区分不同的业务场景,同一业务场景使用相同的配置;
- keyEl 为提取幂等键所用的 SpringEl 表达式,#key 说明入参的 key 将作为幂等键,group + key 为一个完整的幂等键,唯一识别一次请求;
- handleType 是处理类型,及重复提交时如何处理请求
- RESULT,直接返回上次的执行结果
- ERROR,直接抛出 RepeatedSubmitException 异常
编写简单的测试用例如下:
@Test
void?putForResult()?{
????BaseIdempotentService?idempotentService?=?getIdempotentService();
????String?key?=?String.valueOf(RandomUtils.nextLong());
????Long?value?=?RandomUtils.nextLong();
????{???//?第一次操作,返回值和最终结果符合预期
????????Long?result?=?idempotentService.putForResult(key,?value);
????????Assertions.assertEquals(value,?result);
????????Assertions.assertEquals(value,?idempotentService.getValue(key));
????}
????{???//?第二次操作,返回值和最终结果?与第一次一致(直接获取返回值,没有执行业务逻辑)
????????Long?valueNew?=?RandomUtils.nextLong();
????????Long?result?=?idempotentService.putForResult(key,?valueNew);
????????Assertions.assertEquals(value,?result);
????????Assertions.assertEquals(value,?idempotentService.getValue(key));
????}
}
运行测试用例,测试通过,可得出如下结论:
- 第一次操作,与正常方法一致,成功返回结果值;
- 第二次操作,逻辑方法未执行,直接返回第一次的运行结果;
这是最常见的一种工作模式,除直接返回上次执行结果外,当发生重复提交时也可以抛出异常中断流程,只需将 handleType 设置为 ERROR 即可,具体如下:
@Idempotent(executorFactory?=?"dbExecutorFactory",?group?=?1,?keyEl?=?"#key",
????handleType?=?IdempotentHandleType.ERROR)
@Transactional
public?Long?putForError(String?key,?Long?data){
????return?put(key,?data);
}
编写测试用例,具体如下:
@Test
void?putForError()?{
????BaseIdempotentService?idempotentService?=?getIdempotentService();
????String?key?=?String.valueOf(RandomUtils.nextLong());
????Long?value?=?RandomUtils.nextLong();
????{?//?第一次操作,返回值和最终结果符合预期
????????Long?result?=?idempotentService.putForError(key,?value);
????????Assertions.assertEquals(value,?result);
????????Assertions.assertEquals(value,?idempotentService.getValue(key));
????}
????{?//?第二次操作,直接抛出异常,结果与第一次一致
????????Assertions.assertThrows(RepeatedSubmitException.class,?()?->{
????????????Long?valueNew?=?RandomUtils.nextLong();
????????????idempotentService.putForError(key,?valueNew);
????????});
????????Assertions.assertEquals(value,?idempotentService.getValue(key));
????}
}
运行测试用例,测试通过,可以得出:
- 第一次操作,与正常方法一致,成功返回结果值;
- 第二次操作,直接抛出 RepeatedSubmitException 异常,同时方法未执行,结果与第一次调用一致;
2.3. 幂等与异常
异常是一种特殊的返回值!!!
如果将异常看做是一种特殊的返回值,那幂等接口在第二次请求时同样需要抛出异常,示例代码如下:
@Idempotent(executorFactory?=?"dbExecutorFactory",?group?=?1,?keyEl?=?"#key",
????????handleType?=?IdempotentHandleType.RESULT)
@Transactional
public?Long?putExceptionForResult(String?key,?Long?data)?{
????return?putException(key,?data);
}
protected?Long?putException(String?key,?Long?data){
????this.data.put(key,?data);
????throw?new?IdempotentTestException();
}
@Idempotent 注解没有变化,只是在 putException 方法执行后抛出 IdempotentTestException 异常。
编写简单测试用例如下:
@Test
void?putExceptionForResult(){
????BaseIdempotentService?idempotentService?=?getIdempotentService();
????String?key?=?String.valueOf(RandomUtils.nextLong());
????Long?value?=?RandomUtils.nextLong();
????{???//?第一次操作,抛出异常
????????Assertions.assertThrows(IdempotentTestException.class,
????????????????()->idempotentService.putExceptionForResult(key,?value));
????????Assertions.assertEquals(value,?idempotentService.getValue(key));
????}
????{???//?第二次操作,返回值和最终结果?与第一一致(直接获取返回值,没有执行业务逻辑)
????????Long?valueNew?=?RandomUtils.nextLong();
????????Assertions.assertThrows(IdempotentTestException.class,
????????????????()->idempotentService.putExceptionForResult(key,?valueNew));
????????Assertions.assertEquals(value,?idempotentService.getValue(key));
????}
}
运行测试用例,用例通过,可知:
- 第一次操作,与方法逻辑一致,更新数据并抛出 IdempotentTestException 异常;
- 第二次操作,直接抛出 IdempotentTestException 异常,同时方法未执行,结果与第一次一致;
2.4. 并发保护
如果上一个请求执行尚未结束,新的请求已经开启,那会如何?
这就是最常见的并发场景,idempotent 对其也进行了支持,当出现并发请求时会直接抛出
ConcurrentRequestException,用于中断处理。
首先,使用 sleep 模拟一个耗时的方法,具体如下:
@Idempotent(executorFactory?=?"dbExecutorFactory",?group?=?1,?keyEl?=?"#key",
????????????handleType?=?IdempotentHandleType.RESULT)
@Transactional
public?Long?putWaitForResult(String?key,?Long?data)?{
????return?putForWait(key,?data);
}
protected?Long?putForWait(String?key,?Long?data){
????try?{
????????TimeUnit.SECONDS.sleep(3);
????}?catch?(InterruptedException?e)?{
????????e.printStackTrace();
????}
????return?put(key,?data);
}
putWaitForResult 方法调用时会主动 sleep 3 秒,然后才执行真正的逻辑。
编写测试代码如下:
@Test
void?putWaitForResult(){
????String?key?=?String.valueOf(RandomUtils.nextLong());
????Long?value?=?RandomUtils.nextLong();
????//?主线程抛出?ConcurrentRequestException?
????Assertions.assertThrows(ConcurrentRequestException.class,?()?->
????????testForConcurrent(baseIdempotentService?->
????????????baseIdempotentService.putWaitForResult(key,?value))
????);
}
private?void?testForConcurrent(Consumer?consumer)?throws?InterruptedException?{
????//?启动一个线程执行任务,模拟并发场景
????Thread?thread?=?new?Thread(()?->?consumer.accept(getIdempotentService()));
????thread.start();
????//?主线程?sleep?1?秒,与异步线程并行执行任务
????TimeUnit.SECONDS.sleep(1);
????consumer.accept(getIdempotentService());
}
运行单元测试,测试通过,核心测试逻辑如下:
- 创建一个线程,执行耗时方法;
- 等待 1 秒后,主线程也执行耗时方法;
- 此时,两个线程并发执行耗时方法,后进入的主线程直接抛出 ConcurrentRequestException;
2.5. Redis 支持
DB 具有非常好的一致性,但性能存在一定的问题。在一致性要求不高,性能要求高的场景,可以使用 Redis 作为 ExecutionRecord 的存储引擎。
引入 redis 非常简单,大致分两步:
- 在 IdempotentConfiguration 中注册 redisExecutorFactory bean;
- @Idempotent 注解中使用 redisExecutorFactory 即可;
添加 redisExecutorFactory Bean,具体如下:
@Configuration
public?class?IdempotentConfiguration?extends?IdempotentConfigurationSupport?{
????@Bean("redisExecutorFactory")
????public?IdempotentExecutorFactory?redisExecutorFactory(ExecutionRecordRepository?executionRecordRepository){
????????return?createExecutorFactory(executionRecordRepository);
????}
????@Bean
????public?ExecutionRecordRepository?executionRecordRepository(RedisTemplate?recordRedisTemplate){
????????return?new?RedisBasedExecutionRecordRepository("ide-%s-%s",?Duration.ofDays(7),?recordRedisTemplate);
????}
????@Bean
????public?RedisTemplate?recordRedisTemplate(RedisConnectionFactory?redisConnectionFactory){
????????RedisTemplate?redisTemplate?=?new?RedisTemplate();
????????redisTemplate.setConnectionFactory(redisConnectionFactory);
????????redisTemplate.setKeySerializer(new?StringRedisSerializer());
????????ObjectMapper?objectMapper?=?new?ObjectMapper();
????????objectMapper.configure(FAIL_ON_UNKNOWN_PROPERTIES,?false);
????????Jackson2JsonRedisSerializer?executionRecordJackson2JsonRedisSerializer?=?new?Jackson2JsonRedisSerializer<>(ExecutionRecord.class);
????????executionRecordJackson2JsonRedisSerializer.setObjectMapper(objectMapper);
????????redisTemplate.setValueSerializer(executionRecordJackson2JsonRedisSerializer);
????????return?redisTemplate;
????}
}
@Idempotent 注解调整如下:
@Idempotent(executorFactory?=?"redisExecutorFactory",?group?=?1,?keyEl?=?"#key",
????????handleType?=?IdempotentHandleType.RESULT)
@Override
public?Long?putForResult(String?key,?Long?data){
????return?put(key,?data);
}
这样,所有的幂等信息都会存储在 redis 中。
【注】一般 redis 不会对数据进行持久存储,只能保障在一段时间内的幂等性,超出时间后,由于 key 被自动清理,幂等将不再生效。对于业务场景不太严格但性能要求较高的场景才可使用,比如为过滤系统中由于 retry 机制造成的重复请求。
3. 设计&扩展
3.1. 整体结构
image
整体设计比较简单,运行流程如下:
- IdempotentInterceptor 会对 @Idempotent 注解标记的方法进行拦截;
- 当方法第一次被调用时,会读取 @Idempotent 注解上的配置信息,使用 IdempotentExecutorFactoris 为每个方法创建一个 IdempotentExecutor 实例;
- 在方法调用时,将请求直接路由到 IdempotentExecutor 实例,由 IdempotentExecutor 完成核心流程;
- 其中,IdempotentExecutorFactories 拥有多个 IdempotentExecutorFactory 实例,并根据 @Idempotent 上配置的 executorFactory 属性使用对应的实例完成创建工作;
从设计上看,系统中可以同时配置多个 IdempotentExecutorFactory,然后根据不同的业务场景设置不同的 executorFactory。
3.2. 核心流程
image
IdempotentExecutor处理核心流程如下:
- 通过 SpringEL 表达式从入参中提取 unique key 信息;
- 根据 group 和 unique key 从 ExecutionRecordRepository 中读取执行记录 ExecutionRecord;
- 如果 ExecutionRecord 为已完成状态,则根据配置直接返回 ExecutionRecord 的执行结果 或者 直接抛出 RepeatedSubmitException 异常;
- 如果 ExecutionRecord 为执行中,则出现并发问题,直接抛出 ConcurrentRequestException 异常;
- 如果 ExecutionRecord 为未执行,先执行方法获取返回值,然后使用 ExecutionRecordRepository 对 ExecutionRecord 进行更新,然后返回执行结果;
4. 项目信息
项目仓库地址:
https://gitee.com/litao851025/lego
项目文档地址:
https://gitee.com/litao851025/lego/wikis/support/idempotent