分布式事务的几种实现(阿里seata)
标签: 分布式事务的几种实现(阿里seata) 博客 51CTO博客
2023-04-03 18:23:45 214浏览
seata的部署和配置
TCC模式的实现
通过阿里的Seata去运用TCC模式
一个分布式的全局事务,整体是 两阶段提交 的模型。全局事务是由若干分支事务组成的,分支事务要满足 两阶段提交 的模型要求,即需要每个分支事务都具备自己的:
- 一阶段 prepare 行为
- 二阶段 commit 或 rollback 行为
定义TCC接口
由于我们使用的是SpringCloud+Feign,Feign的调用基于http,因此此处我们使用LocalTCC便可。值得注意的是,@LocalTCC一定需要注解在接口上,此接口可以是寻常的业务接口,只要实现了TCC的两阶段提交对应方法便可。
- @LocalTCC 适用于SpringCloud+Feign模式下的TCC
- @TwoPhaseBusinessAction 注解try方法,其中name为当前tcc方法的bean名称,写方法名便可(记得全局唯一),commitMethod指向提交方法,rollbackMethod指向事务回滚方法。指定好三个方法之后,seata会根据全局事务的成功或失败,去帮我们自动调用提交方法或者回滚方法。
- @BusinessActionContextParameter 注解可以将参数传递到二阶段(commitMethod/rollbackMethod)的方法。
BusinessActionContext 便是指TCC事务上下文
下面根据模型定义Service和实现,定义三个方法,prepare行为,commit和rollback方法。
@LocalTCC
public interface TccService {
/**
* 定义两阶段提交prepare方法
* name = 该tcc的bean名称,全局唯一
* commitMethod = commit 为二阶段确认方法
* rollbackMethod = rollback 为二阶段取消方法
* BusinessActionContextParameter注解 传递参数到二阶段中
*
* @return String
*/
@TwoPhaseBusinessAction(name = "insert", commitMethod = "commitTcc", rollbackMethod = "cancel")
String insert(Long productId,Integer count);
/**
* commit方法、可以另命名,但要保证与commitMethod一致
* context可以传递try方法的参数
*
* @param context 上下文
* @return boolean
*/
boolean commitTcc(BusinessActionContext context);
/**
* 二阶段rollback方法
*
* @param context 上下文
* @return boolean
*/
boolean cancel(BusinessActionContext context);
}
@Slf4j
@Service
public class TccServiceImpl implements TccService {
@Autowired
private ResourceStorageService resourceStorageService;
/**
* tcc服务t(try)方法
* 根据实际业务场景选择实际业务执行逻辑或者资源预留逻辑
*
* @return String
*/
@Override
@PostMapping("/tcc-insert")
@Transactional(rollbackFor = Exception.class, propagation = Propagation.REQUIRED)
public String insert(Long productId,Integer count) {
log.info("xid = " + RootContext.getXID());
resourceStorageService.decrease(productId, count);
//放开以下注解抛出异常
// throw new RuntimeException("服务tcc测试回滚");
return "success";
}
/**
* tcc服务 confirm方法
* 若一阶段采用资源预留,在二阶段确认时要提交预留的资源
*
* @param context 上下文
* @return boolean
*/
@Override
public boolean commitTcc(BusinessActionContext context) {
log.info("xid = " + context.getXid() + "提交成功");
// 若try成功,一阶段资源预留,这里则要提交资源
return true;
}
/**
* tcc 服务 cancel方法
*
* @param context 上下文
* @return boolean
*/
@Override
public boolean cancel(BusinessActionContext context) {
//todo 这里写回滚操作
System.out.println("please manually rollback this data:" + context.getActionContext("params"));
return true;
}
}
提供调用方法:
@Api(tags = "资源库存")
@RestController
@RequestMapping("/storage")
public class StorageController {
@Autowired
private TccService tccService;
/**
* 扣减库存
*/
@RequestMapping("/decrease")
public CommonResult decrease(Long productId, Integer count) {
tccService.insert(productId,count);
return new CommonResult("扣减库存成功!",200);
}
}
下面通过feign调用,这里写调用方:
@Api(tags = "分布式-接口测试demo")
@RestController
@RequestMapping(value = "/v1/distributed")
public class DistributedController {
/**
* 库存服务,减少库存
*/
@Resource
private FeignStorageService feignStorageService;
/**
* 增加一条主题内容
*/
@Resource
private TopicService topicService;
@GetMapping("/transaction")
@ApiOperation(value = "分布式事务seata测试")
@GlobalTransactional(name = "fsp-tra", rollbackFor = Exception.class)
public CommonResult transaction() {
//减少库存
feignStorageService.decrease(1L, 1);
//增加一条topic
TopicDO topicDO = new TopicDO();
topicDO.setName("购物热");
topicService.save(topicDO);
//执行出错
int a = 1 / 0;
return CommonResult.success("");
}
}
库存的feign调用客户端
/**
* @author haitao.li
* @description: 库存
* @date 2021/10/25 10:26
*/
@FeignClient(value = "tcly-storage")
public interface FeignStorageService {
/**
* 扣减库存
* @param productId 查询id
* @param count 产品数量
* @return 扣除结果
*/
@RequestMapping("/storage/decrease")
CommonResult<Object> decrease(@RequestParam Long productId,@RequestParam Integer count);
}
上面的代码,调用的时候,通过@GlobalTransactional(name = “fsp-tra”, rollbackFor = Exception.class)进行全局事务管理。
下面根据以上代码进行测试。
调用到最后的时候,一行代码出错: int a = 1 / 0;
正常情况 下,扣减库存和插入topic都会失败
执行前的storage:
执行前的topic
执行后结果:
topic没有添加,但是库存扣减成功了(因为我这里的cancel方法里没有做实际的回滚操作,看下面,如果实际执行了cancel则说明TCC正常)。
下面是库存服务 中的两阶段结果:
全局事务执行成功,并且库存服务的两阶段模型执行了第二阶段的cancel操作。
如果开启了@LocalTCC,那么必须通过cancel方法进行回滚。
XA/AT(2PC)
一个下订单的例子,进行扣减库存,生成订单,余额扣除。
通过业务模块business进行分布式 事务全局管控,storage,account,order三个微服务之间通过feign调用并处理事务。
样例场景是 Seata 经典的,涉及库存、订单、账户 3 个微服务的商品订购业务。
在样例中,上层编程模型与 AT 模式完全相同。只需要修改数据源代理,即可实现 XA 模式与 AT 模式之间的切换。
@Bean("dataSource")
public DataSource dataSource(DruidDataSource druidDataSource) {
// DataSourceProxy for AT mode
// return new DataSourceProxy(druidDataSource);
// DataSourceProxyXA for XA mode
return new DataSourceProxyXA(druidDataSource);
}
完整代码(下载后根据以下步骤):
下载demo:seata-xa
准备工作
- 执行
sql/all_in_one.sql
- 下载最新版本的 Seata Sever
- 解压并启动 Seata server
unzip seata-server-xxx.zip
cd distribution
sh ./bin/seata-server.sh 8091 file
- 启动 AccountXA, OrderXA, StorageXA, BusinessXA 服务
测试
- 无错误成功提交
curl http://127.0.0.1:8084/purchase
具体调用参数请结合 BusinessController 的代码。
数据初始化逻辑,参见 BusinessService#initData() 方法。
基于初始化数据,和默认的调用逻辑,purchase 将可以被成功调用 3 次。
每次账户余额扣减 3000,由最初的 10000 减少到 1000。
第 4 次调用,因为账户余额不足,purchase 调用将失败。相应的:库存、订单、账户都回滚。
XA 模式与 AT 模式
只要切换数据源代理类型,该样例即可在 XA 模式和 AT 模式之间切换。
DataSourceConfiguration
XA 模式使用 DataSourceProxyXA
public class DataSourceProxy {
@Bean("dataSourceProxy")
public DataSource dataSource(DruidDataSource druidDataSource) {
// DataSourceProxyXA for XA mode
return new DataSourceProxyXA(druidDataSource);
}
}
AT 模式使用 DataSourceProxy
public class DataSourceProxy {
@Bean("dataSourceProxy")
public DataSource dataSource(DruidDataSource druidDataSource) {
// DataSourceProxyXA for AT mode
return new DataSourceProxy(druidDataSource);
}
}
当然,AT 模式需要在数据库中建立 undo_log 表。(XA 模式是不需要这个表的)
CREATE TABLE `undo_log` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`branch_id` bigint(20) NOT NULL,
`xid` varchar(100) NOT NULL,
`context` varchar(128) NOT NULL,
`rollback_info` longblob NOT NULL,
`log_status` int(11) NOT NULL,
`log_created` datetime NOT NULL,
`log_modified` datetime NOT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
saga模式
好博客就要一起分享哦!分享海报
此处可发布评论
评论(0)展开评论
展开评论