go-zero(十七)结合DTM :实现分布式事务

1. 基础概念介绍

1.1 什么是分布式事务

在微服务架构中,一个业务操作常常需要调用多个服务来完成。例如,在电商系统中下单时,需要同时操作订单服务和库存服务。这种跨服务的操作就需要分布式事务来保证数据一致性。

分布式事务面临以下挑战:

  • 多个服务间如何协调
  • 服务失败如何回滚
  • 如何保证最终一致性
  • 网络故障、服务不可用等问题处理

1.2 什么是DTM

DTM (Distributed Transaction Manager) 是一个开源的分布式事务管理器,它支持多种事务模式,如TCC、SAGA、XA等。DTM的特点包括:

  • 简单易用的接口
  • 支持多种事务模式
  • 支持多种编程语言
  • 高可用、高性能设计

DTM让开发者可以像使用本地事务一样简单地使用分布式事务,大大降低了开发难度。

1.3 分布式事务模式简介

分布式事务有多种实现模式,本教程将介绍两种常用模式:

SAGA模式
将一个大事务拆分为多个小事务,每个小事务都有对应的补偿操作。如果某个小事务失败,则执行已完成事务的补偿操作,实现回滚。

TCC模式
将一个事务分为三个阶段:Try(尝试)、Confirm(确认)、Cancel(取消)。Try阶段进行资源检查和预留,Confirm阶段确认执行,Cancel阶段在Try失败后进行取消操作。

这两种模式各有优缺点,适用于不同的业务场景。

1.4分布式事务模式对比

让我们了解一下DTM支持的几种事务模式:

事务模式使用场景优点缺点
SAGA长事务、低一致性要求实现简单,可靠性高最终一致性,补偿逻辑复杂
TCC对一致性要求高强一致性,性能好实现复杂,需要多个接口
XA标准事务处理强一致性,使用简单性能较差,锁定资源时间长
2阶段(2pc)消息可靠消息投递简单易用,适合异步最终一致性

2. 项目概述

2.1 业务场景

本教程将实现一个简化的电商下单场景,涉及两个核心服务:

  1. 订单服务:负责创建和管理订单
  2. 库存服务:负责管理商品库存

业务流程如下:

  • 用户下单时,需要创建订单并扣减库存
  • 如果任一操作失败,需要回滚所有操作,保证数据一致性

我们将使用两种不同的分布式事务模式来实现这个场景:

  • 使用TCC模式处理订单服务
  • 使用SAGA模式处理库存服务

3. 环境准备

3.1 前置环境

首先,要先把以下环境部署好:

  1. Go(1.16+)
  2. Go-Zero
  3. MySQL(5.7+):用于存储数据和DTM事务信息
  4. etcd:用于服务发现

3.2DTM服务部署

Docker快速启动
DTM 服务可以通过 Docker 快速启动:

# 拉取 DTM 镜像
docker pull yedf/dtm:latest# 启动 DTM 服务
docker run -it --name dtm -p 36789:36789 -p 36790:36790 -e STORE_DRIVER=mysql -e STORE_HOST=mysql:3306 -e STORE_USER=root -e STORE_PASSWORD=123456 -e STORE_PORT=3306 yedf/dtm:latest

注意:如果环境都是使用docker部署,网络要使用同一个。

本地部署
或者拉取源码启动:

git clone https://github.com/dtm-labs/dtm

源码拉取成功后,我们需要修改conf.sample.yml文件,把服务注册到ETCD,把下面的设置取消注释:

 MicroService: # gRPC/HTTP based microservice configDriver: 'dtm-driver-gozero' # name of the driver to handle register/discoverTarget: 'etcd://localhost:2379/dtmservice' # register dtm server to this urlEndPoint: 'localhost:36790'Store:Driver: 'mysql'Host: 'localhost'User: 'root'Password: '123456'Port: 3306DB: 'dtm'  

然后使用这个配置启动服务

go run mian.go -c conf.sample.yml

部署完成后,可以通过浏览器访问 http://localhost:36789 ,来查看管理页面 ,这个页面可以很直观的查看到,分布式事务是否成功,以及使用的哪些事务模式。

4.使用SAGA模式

我们先从简单的SAGA模式开始,实现库存服务的扣减和补偿操作。

4.1 SAGA模式介绍

SAGA模式是一种分布式事务解决方案,将一个大事务拆分为多个小事务,每个小事务都有对应的补偿操作。

SAGA的流程如下:

  1. 执行一系列正向操作(如扣减库存)
  2. 如果任一操作失败,则执行已完成操作的补偿操作(如增加库存)

SAGA适用于可以通过补偿操作回滚的业务场景。

4.2 数据库设计

为了演示分布式事务,我们需要创建三个数据库:订单数据库、库存数据库和DTM数据库。

订单数据库:

CREATE DATABASE IF NOT EXISTS dtm_order;
USE dtm_order;CREATE TABLE `orders` (`id` bigint(20) NOT NULL AUTO_INCREMENT,`user_id` bigint(20) NOT NULL COMMENT '用户ID',`product_id` bigint(20) NOT NULL COMMENT '产品ID',`amount` int(11) NOT NULL COMMENT '数量',`money` decimal(10,2) NOT NULL COMMENT '金额',`status` tinyint(4) NOT NULL DEFAULT '0' COMMENT '状态: 0-创建 1-完成',`create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,`update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,PRIMARY KEY (`id`),KEY `idx_user_id` (`user_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

库存数据库:

CREATE DATABASE IF NOT EXISTS dtm_stock;
USE dtm_stock;CREATE TABLE `stock` (`id` bigint(20) NOT NULL AUTO_INCREMENT,`product_id` bigint(20) NOT NULL COMMENT '产品ID',`amount` int(11) NOT NULL COMMENT '库存数量',`create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,`update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,PRIMARY KEY (`id`),UNIQUE KEY `uniq_product_id` (`product_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;-- 插入测试数据
INSERT INTO `stock` (`product_id`, `amount`) VALUES (1, 100);
INSERT INTO `stock` (`product_id`, `amount`) VALUES (2, 200);

DTM数据库:

CREATE DATABASE IF NOT EXISTS dtm;

DTM会自动创建所需的表,无需手动创建。

注意: 创建的数据库要和dtm配置文件中指定的数据库要一致

在这里插入图片描述

4.3 库存服务实现

首先,创建库存服务的Proto定义文件stock/stock.proto:

syntax = "proto3";package stock;
option go_package="./stock";// 库存服务
service Stock {// 扣减库存rpc DeductStock(DeductStockReq) returns (DeductStockResp);// 补偿库存rpc CompensateStock(CompensateStockReq) returns (CompensateStockResp);
}// 扣减库存请求
message DeductStockReq {int64 productId = 1;  // 产品IDint32 amount = 2;     // 数量
}// 扣减库存响应
message DeductStockResp {bool success = 1;     // 是否成功string message = 2;   // 消息
}// 补偿库存请求
message CompensateStockReq {int64 productId = 1;  // 产品IDint32 amount = 2;     // 数量
}// 补偿库存响应
message CompensateStockResp {bool success = 1;     // 是否成功string message = 2;   // 消息
}

切换到 rpc/stock/目录下,使用goctl生成代码:

cd stock
goctl rpc protoc stock.proto --go_out=. --go-grpc_out=. --zrpc_out=.

修改库存服务的配置文件stock/etc/stock.yaml:

Name: stock.rpc
ListenOn: 0.0.0.0:9002
Etcd:Hosts:- 127.0.0.1:2379Key: stock.rpcDB:DataSource: root:123456@tcp(localhost:3306)/dtm_stock?charset=utf8mb4&parseTime=true&loc=Asia%2FShanghaiDTM: etcd://localhost:2379/dtmservice

修改配置结构stock/internal/config/config.go:

package configimport ("github.com/zeromicro/go-zero/zrpc"
)type Config struct {zrpc.RpcServerConf//增加数据库字段DB struct {DataSource string}DTM string // DTM服务地址
}

修改服务上下文stock/internal/svc/service_context.go:

package svcimport ("github.com/Mikaelemmmm/gozerodtm/stock/internal/config""github.com/Mikaelemmmm/gozerodtm/stock/internal/model""github.com/zeromicro/go-zero/core/stores/sqlx"
)type ServiceContext struct {Config     config.ConfigStockModel model.StockModel  // 库存模型
}func NewServiceContext(c config.Config) *ServiceContext {return &ServiceContext{Config:     c,StockModel: model.NewStockModel(sqlx.NewMysql(c.DB.DataSource)),  // 初始化库存模型}
}

切换到 stock/model 目录下,使用goctl工具生成model代码:

goctl model mysql datasource -url="root:123456@tcp(127.0.0.1:33069)/dtm_stock" -table="stock" -dir .

生成代码后,到stock/model/stockmodel.go,增加DeductStockAddStock方法:

package modelimport ("database/sql""fmt""time""github.com/zeromicro/go-zero/core/stores/sqlx"
)var _ StockModel = (*customStockModel)(nil)/*
....
*/type (// StockModel 是stock表的模型StockModel interface {// DeductStock 扣减库存DeductStock(productId int64, amount int32) error//AddStock 增加库存(用于补偿)AddStock(productId int64, amount int32) error}// DeductStock 扣减库存
func (m *customStockModel) DeductStock(productId int64, amount int32) error {query := fmt.Sprintf("update %s set amount = amount - ? where product_id = ? and amount >= ?", m.table)result, err := m.conn.Exec(query, amount, productId, amount)if err != nil {return err}affected, err := result.RowsAffected()if err != nil {return err}if affected == 0 {return fmt.Errorf("库存不足")}return nil
}// AddStock 增加库存(用于补偿)
func (m *customStockModel) AddStock(productId int64, amount int32) error {query := fmt.Sprintf("update %s set amount = amount + ? where product_id = ?", m.table)_, err := m.conn.Exec(query, amount, productId)return err
}

实现扣减库存逻辑stock/internal/logic/deduct_stock_logic.go:

package logicimport ("context""github.com/dtm-labs/dtm/client/dtmcli""google.golang.org/grpc/codes""google.golang.org/grpc/status""dtm_demo/rpc/stock/internal/svc""dtm_demo/rpc/stock/stock""github.com/zeromicro/go-zero/core/logx"
)
/*
....
*/
// 扣减库存
func (l *DeductStockLogic) DeductStock(in *stock.DeductStockReq) (*stock.DeductStockResp, error) {// todo: add your logic here and delete this lineerr := l.svcCtx.StockModel.DeductStock(in.ProductId, in.Amount)if err != nil {logx.Errorf("扣减库存失败: %v", err)return nil, status.Error(codes.Aborted, dtmcli.ResultFailure) // 直接返回错误}return &stock.DeductStockResp{Success: true,Message: "扣减库存成功",}, nil
}

需要注意的是, 如果事务执行失败,返回的是RPC的状态码 codes.Aborted 和 事务结果dtmcli.ResultFailure ,如果直接返回错误的,DTM是不会进行事务补偿的。

return nil, status.Error(codes.Aborted, dtmcli.ResultFailure) 
  • Aborted:表示操作被中止,通常是由于并发问题,如序列检查失败、事务中止等。

  • ResultFailuredtmimp.ResultFailure)表示事务或事务分支执行失败。当事务或事务分支在执行过程中遇到错误,例如数据库插入失败、服务调用超时等,会返回失败结果。这个结果会触发全局事务管理器进行相应的处理,如回滚事务。

后面会单独介绍下gRPC状态码和dtmcli的事务状态。

实现补偿库存逻辑stock/internal/logic/compensate_stock_logic.go:

package logicimport ("context""fmt""dtm_demo/rpc/stock/internal/svc""dtm_demo/rpc/stock/stock""github.com/zeromicro/go-zero/core/logx"
)/*
....
*/// 补偿库存
func (l *CompensateStockLogic) CompensateStock(in *stock.CompensateStockReq) (*stock.CompensateStockResp, error) {// todo: add your logic here and delete this linefmt.Println("收到补偿库存请求:", in.ProductId, in.Amount)err := l.svcCtx.StockModel.AddStock(in.ProductId, in.Amount)if err != nil {logx.Errorf("补偿库存失败: %v", err)return &stock.CompensateStockResp{Success: false,Message: err.Error(),}, nil}return &stock.CompensateStockResp{Success: true,Message: "补偿库存成功",}, nil
}

4.4 SAGA调用示例

创建test_saga.go,演示如何使用SAGA模式调用库存服务:

package mainimport ("dtm_demo/rpc/stock/stock""log""github.com/dtm-labs/client/dtmgrpc""github.com/zeromicro/go-zero/core/discov""github.com/zeromicro/go-zero/zrpc"
)func main() {// 初始化客户端stockRpcConf := zrpc.RpcClientConf{Etcd: discov.EtcdConf{Hosts: []string{"localhost:2379"},Key:   "stock.rpc",},}client := zrpc.MustNewClient(stockRpcConf)client.Conn()// DTM服务地址dtmServer := "etcd://localhost:2379/dtmservice"// 创建请求stockReq := &stock.DeductStockReq{ProductId: 1,Amount:    10,  //设置库存}// 生成全局事务IDgid := dtmgrpc.MustGenGid(dtmServer)// 创建SAGA事务saga := dtmgrpc.NewSagaGrpc(dtmServer, gid)//stockRpcBusiServer 实际上就是自动获取库存服务地址 etcd://localhost:2379/stock.rpcstockRpcBusiServer, err := stockRpcConf.BuildTarget()if err != nil {log.Fatal(err)}saga.Add(stockRpcBusiServer+"/stock.Stock/DeductStock", stockRpcBusiServer+"/stock.Stock/CompensateStock", stockReq)// 提交事务err = saga.Submit()if err != nil {log.Fatalf("SAGA事务提交失败: %v", err)}
}

需要注意的是 /stock.Stock/DeductStock/stock.Stock/CompensateStock 这个路径是区分大小写的,一定要和goctl工具生成的一致

启动库存服务:

go run stock.go 

运行测试程序:

go run test_saga.go

如果一切正常,库存将被成功扣减。如果服务出现异常(例如库存不足),DTM会自动执行补偿操作,增加库存。

当我把 Amount 设置为 100 ,库存服务提示库存不足,并自动进行补偿

在这里插入图片描述

5. 使用TCC模式

接下来,我们使用更复杂的TCC模式实现订单服务。

5.1 TCC模式介绍

TCC(Try-Confirm-Cancel)是一种更灵活的分布式事务模式,它将一个事务分为三个阶段:

  1. Try:资源检查和预留,但不真正执行业务操作
  2. Confirm:确认执行业务操作,只有当所有Try都成功时执行
  3. Cancel:取消操作,当任一Try失败时执行

TCC模式比SAGA模式更为灵活,但也更复杂,需要业务自行实现三个阶段的处理逻辑。

5.2 数据库设计

orderstock,仍然使用SAGA演示中的数据库表,在dtm_stock数据库中添加库存冻结表stock_tcc.sql:


USE dtm_stock;
CREATE TABLE stock_tcc (id bigint(20) NOT NULL AUTO_INCREMENT,product_id bigint(20) NOT NULL COMMENT '产品ID',freeze_amount int(11) NOT NULL COMMENT '冻结数量',transaction_id varchar(64) NOT NULL COMMENT '事务ID',create_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,update_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,PRIMARY KEY (`id`),UNIQUE KEY uniq_trans_product (`transaction_id`, `product_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

5.3 Tcc库存服务实现

因为我们现在要实现TCC模式,需要使用到 try , confirm ,cancel 三个方法,所以需要重新定义库存服务:

syntax = "proto3";package pb;
option go_package="./pb";// 库存查询请求
message ProductStockReq {int64 product_id = 1;
}// 库存查询响应
message ProductStockResp {int64 product_id = 1;int32 amount = 2;
}// 扣减库存请求
message DeductStockReq {int64 product_id = 1;int32 amount = 2;
}// 扣减库存响应
message DeductStockResp {bool success = 1;
}// TCC事务相关
message TccReq {string transaction_id = 1;int64 product_id = 2;int32 amount = 3;float money = 4;
}message TccResp {bool success = 1;
}// 库存服务
service Stock {// 查询商品库存rpc GetProductStock(ProductStockReq) returns (ProductStockResp);// 普通扣减库存(非TCC模式)//rpc DeductStock(DeductStockReq) returns (DeductStockResp);// TCC模式相关接口rpc TryDeductStock(TccReq) returns (TccResp);     // Try阶段rpc ConfirmDeductStock(TccReq) returns (TccResp); // Confirm阶段rpc CancelDeductStock(TccReq) returns (TccResp);  // Cancel阶段
}

切换到/stock下,使用goctl生成代码:

 goctl rpc protoc stock.proto --go_out=. --go-grpc_out=. --zrpc_out=.

切换到 stock_svr/internal/model 目录下,生成model代码:

goctl model mysql datasource -url="root:123456@tcp(127.0.0.1:33069)/dtm_stock" -table="stock" -dir .goctl model mysql datasource -url="root:123456@tcp(127.0.0.1:33069)/dtm_stock" -table="stock_tcc" -dir .

修改stock.yaml:

Name: stock.rpc
ListenOn: 0.0.0.0:8080
Etcd:Hosts:- 127.0.0.1:2379Key: stock.rpcDB:DataSource: root:123456@tcp(127.0.0.1:33069)/dtm_stock?charset=utf8mb4&parseTime=true&loc=Asia%2FShanghai

修改stock_svr/internal/config/config.go:

type Config struct {zrpc.RpcServerConfDB struct {DataSource string}
}

修改stock_svr/internal/svc/servicecontext.go:

type ServiceContext struct {Config        config.ConfigStockModel    model.StockModel // 库存模型StockTccModel model.StockTccModel
}func NewServiceContext(c config.Config) *ServiceContext {return &ServiceContext{Config:        c,StockModel:    model.NewStockModel(sqlx.NewMysql(c.DB.DataSource)),    // 初始化库存模型StockTccModel: model.NewStockTccModel(sqlx.NewMysql(c.DB.DataSource)), // 初始化库存模型}
}

修改stock_svr/internal/model/stocktccmodel.go 增加新方法:

type (StockTccModel interface {stockTccModelwithSession(session sqlx.Session) StockTccModel//增加DeleteByTransProductId 方法DeleteByTransProductId(ctx context.Context, transactionId string, productId int64) error}
/***/// DeleteByTransProductId 根据事务ID和商品ID删除记录
func (m *customStockTccModel) DeleteByTransProductId(ctx context.Context, transactionId string, productId int64) error {query := fmt.Sprintf("delete from %s where transaction_id = ? and product_id = ?", m.table)_, err := m.conn.ExecCtx(ctx, query, transactionId, productId)return err
}

修改stock_svr/internal/logic/trydeductstocklogic.go 实现Try接口:


// TCC模式相关接口
func (l *TryDeductStockLogic) TryDeductStock(in *pb.TccReq) (*pb.TccResp, error) {// todo: add your logic here and delete this linefmt.Println("尝试扣减库存")//查询商品库存stockInfo, err := l.svcCtx.StockModel.FindOneByProductId(l.ctx, in.ProductId)if err != nil && err != model.ErrNotFound {//return &pb.TccResp{Success: false}, fmt.Errorf("没有该商品")//return nil, status.Error(codes.Aborted, dtmcli.ResultFailure)//!一般数据库不会错误不需要dtm回滚,就让他一直重试//这时候就不要返回codes.Aborted, dtmcli.ResultFailure ,具体自己把控!!!return nil, status.Error(codes.Internal, err.Error())}// 检查库存是否足够if stockInfo.Amount < int64(in.Amount) {//return &pb.TccResp{Success: false}, fmt.Errorf("库存不足")//如果库存不足,直接返回失败,让dtm回滚//返回 codes.Aborted, dtmcli.ResultFailure 才能回滚return nil, status.Error(codes.Aborted, dtmcli.ResultFailure)}//如果钱小于100,直接返回失败,让dtm回滚if in.Money < 100 {return nil, status.Error(codes.Aborted, dtmcli.ResultFailure)}// 扣减库存stockInfo.Amount -= int64(in.Amount)err = l.svcCtx.StockModel.Update(l.ctx, stockInfo)if err != nil {//return &pb.TccResp{Success: false}, fmt.Errorf("库存扣除失败")//如果库存扣除失败,直接返回失败,不要让它一直重试,让dtm回滚 ,return nil, status.Error(codes.Aborted, dtmcli.ResultFailure)}// 记录冻结库存,用于回滚freeze := &model.StockTcc{ProductId:     in.ProductId,FreezeAmount:  int64(in.Amount),TransactionId: in.TransactionId,}_, err = l.svcCtx.StockTccModel.Insert(l.ctx, freeze)if err != nil {// 如果冻结记录失败,回滚库存stockInfo.Amount += int64(in.Amount)_ = l.svcCtx.StockModel.Update(l.ctx, stockInfo)//return &pb.TccResp{Success: false}, fmt.Errorf("记录冻结库存失败: %v", err)return nil, status.Error(codes.Aborted, dtmcli.ResultFailure)}return &pb.TccResp{Success: true}, nil
}

修改stock_svr/internal/logic/confirmdeductstocklogic.go,实现 confirm 接口:

// Confirm阶段:确认扣减库存
func (l *ConfirmDeductStockLogic) ConfirmDeductStock(in *pb.TccReq) (*pb.TccResp, error) {// todo: add your logic here and delete this linefmt.Println("Confirm阶段:确认扣减库存")err := l.svcCtx.StockTccModel.DeleteByTransProductId(l.ctx, in.TransactionId, in.ProductId)if err != nil {logx.Errorf("确认扣减库存失败: %v", err)// 这里返回失败,DTM会进行重试//return &pb.TccResp{Success: false}, errreturn nil, status.Error(codes.Aborted, dtmcli.ResultFailure)}return &pb.TccResp{Success: true}, nil
}

修改stock_svr/internal/logic/canceldeductstocklogic.go,实现 cancel方法:


func (l *CancelDeductStockLogic) CancelDeductStock(in *pb.TccReq) (*pb.TccResp, error) {// todo: add your logic here and delete this linefmt.Println("取消扣减库存")// 查找冻结记录freeze, err := l.svcCtx.StockTccModel.FindOneByTransactionIdProductId(l.ctx, in.TransactionId, in.ProductId)if err != nil {if err == model.ErrNotFound {// 如果没有找到冻结记录,可能是已经处理过了,直接返回成功return &pb.TccResp{Success: true}, nil}/*在 TCC(Try-Confirm-Cancel)事务模式里,Cancel 方法本身的作用就是进行事务回滚,所以一般不需要再在 Cancel 方法里额外执行事务回滚操作*/return &pb.TccResp{Success: false}, fmt.Errorf("查询冻结记录失败: %v", err)//return nil, status.Error(codes.Aborted, dtmcli.ResultFailure)}// 恢复库存stockInfo, err := l.svcCtx.StockModel.FindOneByProductId(l.ctx, in.ProductId)if err != nil {return &pb.TccResp{Success: false}, fmt.Errorf("查询商品库存失败: %v", err)}stockInfo.Amount += freeze.FreezeAmounterr = l.svcCtx.StockModel.Update(l.ctx, stockInfo)if err != nil {return &pb.TccResp{Success: false}, fmt.Errorf("恢复库存失败: %v", err)}// 删除冻结记录err = l.svcCtx.StockTccModel.Delete(l.ctx, freeze.Id)if err != nil {logx.Errorf("删除冻结记录失败: %v", err)// 虽然删除冻结记录失败,但库存已经恢复,下次重试时不会重复恢复库存}return &pb.TccResp{Success: true}, nil
}

注意错误返回方法:

  1. codes.Aborted 一般表示操作被中止,而 dtmcli.ResultFailure 告知 DTM 操作失败,需要回滚整个事务。当 Confirm 阶段出现问题,且希望 DTM 回滚整个事务时,适合用这个返回。

适用场景:
确认操作无法完成,比如数据库记录不存在、数据状态不一致等,这种情况下需要 DTM 触发 Cancel 阶段来回滚事务。

  1. status.Error(codes.Internal, err.Error())
    codes.Internal 表示服务器内部出错,一般意味着系统出现了临时故障,DTM 会持续重试该操作,直至成功或者达到最大重试次数。

适用场景:
操作失败是由临时问题引起的,像数据库连接暂时中断、网络抖动等,这种情况下希望 DTM 重试操作,直至成功。

5.4 订单服务Api实现

首先,创建订单服务的Proto定义文件order/order.api:

```syntax = "v1"type (CreateOrderReq {UserId    int64 `json:"userId"`ProductId int64 `json:"productId"`Amount    int   `json:"amount"`Money      float64 `json:"money"`}CreateOrderResp {OrderId int64 `json:"orderId"`}OrderInfo {Id         int64   `json:"id"`UserId     int64   `json:"userId"`ProductId  int64   `json:"productId"`Amount     int     `json:"amount"`Money      float64 `json:"money"`Status     int     `json:"status"`CreateTime string  `json:"createTime"`}GetOrderReq {OrderId int64 `json:"orderId"`}GetOrderResp {Order OrderInfo `json:"order"`}
)service Order {@handler CreateOrderpost /api/order/create (CreateOrderReq) returns (CreateOrderResp)@handler GetOrderpost /api/order/get (GetOrderReq) returns (GetOrderResp)
}

切换到api/order/下,使用goctl生成代码:

goctl api go -api .\order.api -dir .    

切换到api/order/internal/model下, 生成model代码:

goctl model mysql datasource -url="root:123456@tcp(127.0.0.1:33069)/dtm_order" -table="orders" -dir .

修改order.yaml:

Name: Order
Host: 0.0.0.0
Port: 8888StockRpcConf:Etcd:Hosts:- 127.0.0.1:2379Key: stock.rpcDB:DataSource:  root:123456@tcp(127.0.0.1:33069)/dtm_order?charset=utf8mb4&parseTime=true&loc=Asia%2FShanghaiDTM: etcd://127.0.0.1:2379/dtmservice

修改order/internal/config/config.go:


type Config struct {rest.RestConfStockRpcConf zrpc.RpcClientConfDTM stringDB  struct {DataSource string}
}

修改order/internal/svc/servicecontext.go:


type ServiceContext struct {Config config.ConfigOrderModel model.OrdersModelStockRpc   stock.Stock
}func NewServiceContext(c config.Config) *ServiceContext {return &ServiceContext{Config:     c,OrderModel: model.NewOrdersModel(sqlx.NewMysql(c.DB.DataSource)),StockRpc:   stock.NewStock(zrpc.MustNewClient(c.StockRpcConf)),}
}

修改order/internal/logic/createorderlogic.go :


func (l *CreateOrderLogic) CreateOrder(req *types.CreateOrderReq) (resp *types.CreateOrderResp, err error) {// todo: add your logic here and delete this line//使用tcc,让try去检查并扣减库存,confirm和cancel去处理库存的回滚/*// 查询商品库存stockResp, err := l.svcCtx.StockRpc.GetProductStock(l.ctx, &pb.ProductStockReq{ProductId: req.ProductId,})if err != nil && !errors.Is(err, model.ErrNotFound) {return nil, fmt.Errorf("查询商品库存失败: %v", err)}fmt.Println(stockResp.Amount)if stockResp.Amount < int32(req.Amount) {return nil, fmt.Errorf("库存不足")}*/order := &model.Orders{UserId:    req.UserId,ProductId: req.ProductId,Amount:    int64(req.Amount),Money:     req.Money,Status:    0, // 0-创建中}result, err := l.svcCtx.OrderModel.Insert(l.ctx, order)if err != nil {return nil, fmt.Errorf("创建订单失败: %v", err)}orderId, err := result.LastInsertId()if err != nil {return nil, fmt.Errorf("获取订单ID失败: %v", err)}// 4. 使用DTM的TCC模式进行分布式事务处理dtmServer := l.svcCtx.Config.DTM // DTM服务地址stockServer, err := l.svcCtx.Config.StockRpcConf.BuildTarget()if err != nil {}// 生成全局事务IDgid := dtmgrpc.MustGenGid(dtmServer)// 创建TCC事务err = dtmgrpc.TccGlobalTransaction(dtmServer, gid, func(tcc *dtmgrpc.TccGrpc) error {req := &pb.TccReq{ProductId: req.ProductId,Amount:    int32(req.Amount),Money:     float32(req.Money),}r := &emptypb.Empty{} // 用于接收响应err := tcc.CallBranch(req,stockServer+"/pb.Stock/TryDeductStock",stockServer+"/pb.Stock/ConfirmDeductStock",stockServer+"/pb.Stock/CancelDeductStock",r)return err})if err != nil {return nil, fmt.Errorf("创建订单失败: %v", err)}// 5. 更新订单状态为已创建orderUpdate, err := l.svcCtx.OrderModel.FindOne(l.ctx, orderId)if err != nil {logx.Errorf("查询订单失败: %v", err)// 这里不返回错误,因为已经扣减了库存,订单状态可以通过其他方式修复} else {orderUpdate.Status = 1 // 1-已完成err = l.svcCtx.OrderModel.Update(l.ctx, orderUpdate)if err != nil {logx.Errorf("更新订单状态失败: %v", err)// 同上,不返回错误}}return &types.CreateOrderResp{OrderId: orderId,}, nil
}

5.5 测试TCC

先启动 stock 服务

go run stock.go

再启动order api

go run order.go

创建请求:

curl -X POST -H "Content-Type: application/json" -d '{"userId": 1, "productId": 1, "amount": 1 , "money" :200.10}' http://localhost:8888/api/order/create

在这里插入图片描述

去stock服务那边查看日志,可以看到 先启动try ,然后使用confirm
在这里插入图片描述

解下模拟失败请求,钱设置为99.9:

curl -X POST -H "Content-Type: application/json" -d '{"userId": 1, "productId": 1, "amount": 999, "money" :99.9}' http://localhost:8888/api/order/create

事务启动cancel 进行回滚
在这里插入图片描述

6.DTM屏障 ,Barrier机制

6.1 Barrier机制介绍

DTM的Barrier机制主要解决以下问题:

  1. 空补偿问题:在Saga模式中,如果正向操作还未执行,补偿操作就执行了,这是空补偿。
  2. 空回滚问题:在TCC模式中,如果Try操作未执行,Cancel操作就执行了,这是空回滚。
  3. 悬挂问题:在分布式事务中,可能由于网络延迟等原因,先收到了Cancel/Confirm操作,后收到Try操作,此时Try操作应该被拒绝,这是悬挂。
  4. 幂等: 由于任何一个请求都可能出现网络异常,出现重复请求,所有的分布式事务分支操作,都需要保证幂等性

Barrier的主要原理是在数据库中添加一个barrier表,并使用数据库事务保证原子性,记录每个全局事务、分支事务、操作类型的执行情况,从而防止以上问题的发生。

用Dtm官方提供的dtmcli.barrier.mysql.sql sql语句来创建dtm_barrier

create database if not exists dtm_barrier
/*!40100 DEFAULT CHARACTER SET utf8mb4 */
;
drop table if exists dtm_barrier.barrier;
create table if not exists dtm_barrier.barrier(id bigint(22) PRIMARY KEY AUTO_INCREMENT,trans_type varchar(45) default '',gid varchar(128) default '',branch_id varchar(128) default '',op varchar(45) default '',barrier_id varchar(45) default '',reason varchar(45) default '' comment 'the branch type who insert this record',create_time datetime DEFAULT now(),update_time datetime DEFAULT now(),key(create_time),key(update_time),UNIQUE key(gid, branch_id, op, barrier_id)
) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4;

6.2 使用Barrier改进服务

我们以之前的sage的例子,做简单的修改。

需要注意的是,在rpc的业务中,如果使用了barrier的话,那么在model中与db交互时候必须要用事务,并且一定要跟barrier用同一个事务

,到stock/model/stockmodel.go,修改DeductStockAddStock方法:

package modelimport ("database/sql""fmt""time""github.com/zeromicro/go-zero/core/stores/sqlx"
)var _ StockModel = (*customStockModel)(nil)/*
....
*/type (// StockModel 是stock表的模型StockModel interface {// DeductStock 扣减库存DeductStock(tx *sql.Tx, productId int64, amount int32) (sql.Result, error)//AddStock 增加库存(用于补偿)AddStock(tx *sql.Tx, productId int64, amount int32)  (sql.Result, error)}// DeductStock 扣减库存
func (m *customStockModel) DeductStock(tx *sql.Tx, productId int64, amount int32) (sql.Result, error) {query := fmt.Sprintf("update %s set amount = amount - ? where product_id = ? and amount >= ?", m.table)return tx.Exec(query, amount, productId, amount)
}// AddStock 增加库存(用于补偿)
func (m *customStockModel) AddStock(tx *sql.Tx, productId int64, amount int32)  (sql.Result, error){query := fmt.Sprintf("update %s set amount = amount + ? where product_id = ?", m.table)return tx.Exec(query, amount, productId)}

实现扣减库存逻辑stock/internal/logic/deduct_stock_logic.go:

package logicimport ("context""github.com/dtm-labs/dtm/client/dtmcli""google.golang.org/grpc/codes""google.golang.org/grpc/status""dtm_demo/rpc/stock/internal/svc""dtm_demo/rpc/stock/stock""github.com/zeromicro/go-zero/core/logx"
)
/*
....
*/
// 扣减库存func (l *DeductStockLogic) DeductStock(in *stock.DeductStockReq) (*stock.DeductStockResp, error) {// todo: add your logic here and delete this line//barrier防止空补偿、空悬挂等具体看dtm官网即可,别忘记加barrier表在当前库中,因为判断补偿与要执行的sql一起本地事务barrier, err := dtmgrpc.BarrierFromGrpc(l.ctx)db, err := sqlx.NewMysql(l.svcCtx.Config.DB.DataSource).RawDB()if err != nil {//!!!一般数据库不会错误不需要dtm回滚,就让他一直重试,这时候就不要返回codes.Aborted, dtmcli.ResultFailure 就可以了,具体自己把控!!!return nil, status.Error(codes.Internal, err.Error())}if err := barrier.CallWithDB(db, func(tx *sql.Tx) error {_,err := l.svcCtx.StockModel.DeductStock(tx,in.ProductId, in.Amount)if err != nil {logx.Errorf("扣减库存失败: %v", err)return nil, status.Error(codes.Aborted, dtmcli.ResultFailure) // 直接返回错误}return nil}); err != nil {//!!!一般数据库不会错误不需要dtm回滚,就让他一直重试,这时候就不要返回codes.Aborted, dtmcli.ResultFailure 就可以了,具体自己把控!!!return nil, status.Error(codes.Internal, err.Error())}return &stock.DeductStockResp{Success: true,Message: "扣减库存成功",}, nil
}

修改补偿库存逻辑stock/internal/logic/compensate_stock_logic.go:

package logicimport ("context""fmt""dtm_demo/rpc/stock/internal/svc""dtm_demo/rpc/stock/stock""github.com/zeromicro/go-zero/core/logx"
)/*
....
*/// 补偿库存
func (l *CompensateStockLogic) CompensateStock(in *stock.CompensateStockReq) (*stock.CompensateStockResp, error) {// todo: add your logic here and delete this linefmt.Println("收到补偿库存请求:", in.ProductId, in.Amount)//barrier防止空补偿、空悬挂等具体看dtm官网即可,别忘记加barrier表在当前库中,因为判断补偿与要执行的sql一起本地事务barrier, err := dtmgrpc.BarrierFromGrpc(l.ctx)db, err := sqlx.NewMysql(l.svcCtx.Config.DB.DataSource).RawDB()if err != nil {//!!!一般数据库不会错误不需要dtm回滚,就让他一直重试,这时候就不要返回codes.Aborted, dtmcli.ResultFailure 就可以了,具体自己把控!!!return nil, status.Error(codes.Internal, err.Error())}if err := barrier.CallWithDB(db, func(tx *sql.Tx) error {_, err := l.svcCtx.StockModel.AddStock(tx, in.ProductId, in.Amount)if err != nil {logx.Errorf("补偿库存失败: %v", err)return nil, status.Error(codes.Aborted, dtmcli.ResultFailure) // 直接返回错误}return nil}); err != nil {//!!!一般数据库不会错误不需要dtm回滚,就让他一直重试,这时候就不要返回codes.Aborted, dtmcli.ResultFailure 就可以了,具体自己把控!!!return nil, status.Error(codes.Internal, err.Error())}return &stock.CompensateStockResp{Success: true,Message: "补偿库存成功",}, nil
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/81089.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

2025 简易Scrum指南(简体中文版)

Scrum是一个轻量级的、以团队为中心的框架&#xff0c;用于解决复杂的问题并创造价值。Scrum有意保持非完整性&#xff0c;Scrum的设计初衷旨在依靠使用者的集体智慧来不断演进构建。 Scrum建立在实验主义和精益思想的基础上&#xff0c;它赋能团队灵活巧妙地工作&#xff0c;…

2025最新福昕PDF编辑器,PDF万能处理工具

软件介绍 Foxit PDF Editor Pro 2025 中文特别版&#xff08;以前称为 Foxit PhantomPDF Business&#xff09;是一款专为满足各种办公需求而设计的业务就绪的PDF工具包。 软件特点 1. 强大的PDF编辑能力 创建新文档&#xff1a;用户可以从无到有地构建PDF文档&#xff0c;添…

ollama的若干实践

1. 本地ollama 1.1 本地安装ollama 方法 1&#xff1a;手动检查最新版本并下载 访问 Ollama 的 GitHub Releases 页面&#xff1a; 打开 https://github.com/ollama/ollama/releases 查看最新的稳定版本&#xff08;如 v0.7.0 或更高&#xff09; 手动下载最新版本&#xff08…

Spring Security源码解析

秒懂SpringBoot之全网最易懂的Spring Security教程 SpringBoot整合Spring-Security 认证篇&#xff08;保姆级教程&#xff09; SpringBoot整合Spring Security【超详细教程】 spring security 超详细使用教程&#xff08;接入springboot、前后端分离&#xff09; Security 自…

LeetCode 3392.统计符合条件长度为 3 的子数组数目:一次遍历模拟

【LetMeFly】3392.统计符合条件长度为 3 的子数组数目&#xff1a;一次遍历模拟 力扣题目链接&#xff1a;https://leetcode.cn/problems/count-subarrays-of-length-three-with-a-condition/ 给你一个整数数组 nums &#xff0c;请你返回长度为 3 的 子数组&#xff0c;满足…

读论文笔记-CoOp:对CLIP的handcrafted改进

读论文笔记-Learning to Prompt for Vision-Language Models Problems 现有基于prompt engineering的多模态模型在设计合适的prompt时有很大困难&#xff0c;从而设计了一种更简单的方法来制作prompt。 Motivations prompt engineering虽然促进了视觉表示的学习&#xff0c…

从零构建 MCP Server 与 Client:打造你的第一个 AI 工具集成应用

目录 &#x1f680; 从零构建 MCP Server 与 Client&#xff1a;打造你的第一个 AI 工具集成应用 &#x1f9f1; 1. 准备工作 &#x1f6e0;️ 2. 构建 MCP Server&#xff08;服务端&#xff09; 2.1 初始化服务器 &#x1f9e9; 3. 添加自定义工具&#xff08;Tools&…

Django 自定义celery-beat调度器,查询自定义表的Cron表达式进行任务调度

学习目标&#xff1a; 通过自定义的CronScheduler调度器在兼容标准的调度器的情况下&#xff0c;查询自定义任务表去生成调度任务并分配给celery worker进行执行 不了解Celery框架的小伙伴可以先看一下我的上一篇文章&#xff1a;Celery框架组件分析及使用 学习内容&#xff…

蓝桥杯 1. 确定字符串是否包含唯一字符

确定字符串是否包含唯一字符 原题目链接 题目描述 实现一个算法来识别一个字符串的字符是否是唯一的&#xff08;忽略字母大小写&#xff09;。 若唯一&#xff0c;则输出 YES&#xff0c;否则输出 NO。 输入描述 输入一行字符串&#xff0c;长度不超过 100。 输出描述 输…

a-upload组件实现文件的上传——.pdf,.ppt,.pptx,.doc,.docx,.xls,.xlsx,.txt

实现下面的上传/下载/删除功能&#xff1a;要求支持&#xff1a;【.pdf,.ppt,.pptx,.doc,.docx,.xls,.xlsx,.txt】 分析上面的效果图&#xff0c;分为【上传】按钮和【文件列表】功能&#xff1a; 解决步骤1&#xff1a;上传按钮 直接上代码&#xff1a; <a-uploadmultip…

.NET Core 数据库ORM框架用法简述

.NET Core ORM框架用法简述 一、主流.NET Core ORM框架概述 在.NET Core生态系统中&#xff0c;主流的ORM(Object-Relational Mapping)框架包括&#xff1a; ​​Entity Framework Core (EF Core)​​ - 微软官方推出的ORM框架​​Dapper​​ - 轻量级微ORM​​Npgsql.Entit…

halcon打开图形窗口

1、dev_open_window 参数如下&#xff1a; 1&#xff09;Row(输入参数) y方向上&#xff0c;图形窗口距离左上角顶端的像素个数 2&#xff09;Column(输入参数) x方向上&#xff0c;距离左上角左边的像素个数 3&#xff09;Width(输入参数) 图形窗口宽度 4&#xff09;He…

2025东三省D题深圳杯D题数学建模挑战赛数模思路代码文章教学

完整内容请看文章最下面的推广群 一、问题一&#xff1a;混合STR图谱中贡献者人数判定 问题解析 给定混合STR图谱&#xff0c;识别其中的真实贡献者人数是后续基因型分离与个体识别的前提。图谱中每个位点最多应出现2n个峰&#xff08;n为人数&#xff09;&#xff0c;但由…

iView Table 组件跨页选择功能实现文档

iView Table 组件跨页选择功能实现文档 功能概述 实现基于 iView Table 组件的多选功能&#xff0c;支持以下特性&#xff1a; ✅ 跨页数据持久化选择✅ 当前页全选/取消全选✅ 自动同步选中状态显示✅ 分页切换状态保持✅ 高性能大数据量支持 实现方案 技术栈 iView UI 4…

家庭服务器IPV6搭建无限邮箱系统指南

qq邮箱操作 // 邮箱配置信息 // 注意&#xff1a;使用QQ邮箱需要先开启IMAP服务并获取授权码 // 设置方法&#xff1a;登录QQ邮箱 -> 设置 -> 账户 -> 开启IMAP/SMTP服务 -> 生成授权码 服务器操作 fetchmail 同步QQ邮箱 nginx搭建web显示本地同步过来的邮箱 ssh…

Tauri v1 与 v2 配置对比

本文档对比 Tauri v1 和 v2 版本的配置结构和内容差异&#xff0c;帮助开发者了解版本变更并进行迁移。 配置结构变化 v1 配置结构 {"package": { ... },"tauri": { "allowlist": { ... },"bundle": { ... },"security":…

对js的Date二次封装,继承了原Date的所有方法,增加了自己扩展的方法,可以实现任意时间往前往后推算多少小时、多少天、多少周、多少月;

封装js时间工具 概述 该方法继承了 js 中 Date的所有方法&#xff1b;同时扩展了一部分自用方法&#xff1a; 1、任意时间 往前推多少小时&#xff0c;天&#xff0c;月&#xff0c;周&#xff1b;参数1、2必填&#xff0c;参数3可选beforeDate(num,formatter,dateVal); befo…

TimeDistill:通过跨架构蒸馏的MLP高效长期时间序列预测

原文地址&#xff1a;https://arxiv.org/abs/2502.15016 发表会议&#xff1a;暂定&#xff08;但是Star很高&#xff09; 代码地址&#xff1a;无 作者&#xff1a;Juntong Ni &#xff08;倪浚桐&#xff09;, Zewen Liu &#xff08;刘泽文&#xff09;, Shiyu Wang&…

DeepSeek最新大模型发布-DeepSeek-Prover-V2-671B

2025 年 4 月 30 日&#xff0c;DeepSeek 开源了新模型 DeepSeek-Prover-V2-671B&#xff0c;该模型聚焦数学定理证明任务&#xff0c;基于混合专家架构&#xff0c;使用 Lean 4 框架进行形式化推理训练&#xff0c;参数规模达 6710 亿&#xff0c;结合强化学习与大规模合成数据…

如何用AI生成假期旅行照?

以下是2025年最新AI生成假期旅行照片的实用工具推荐及使用指南&#xff0c;结合工具特点、研发背景和适用场景进行综合解析&#xff1a; 一、主流AI旅行照片生成工具推荐与对比 1. 搜狐简单AI&#xff08;国内工具&#xff09; • 特点&#xff1a; • 一键优化与背景替换&…