Seata TC端协调全局事务

 1、Seata server注册器

//来自RM分支事务注册
super.registerProcessor(MessageType.TYPE_BRANCH_REGISTER, onRequestProcessor, messageExecutor);
//开启全局事务
super.registerProcessor(MessageType.TYPE_GLOBAL_BEGIN, onRequestProcessor, messageExecutor);
//提交全局事务
super.registerProcessor(MessageType.TYPE_GLOBAL_COMMIT, onRequestProcessor, messageExecutor);
//回滚全局事务
super.registerProcessor(MessageType.TYPE_GLOBAL_ROLLBACK, onRequestProcessor, messageExecutor);
//RM分支事务提交
super.registerProcessor(MessageType.TYPE_BRANCH_COMMIT_RESULT, onResponseProcessor, messageExecutor);
//RM分支事务回滚
super.registerProcessor(MessageType.TYPE_BRANCH_ROLLBACK_RESULT, onResponseProcessor, messageExecutor);

2、DefaultCoordinator协调者

TC开启全局事务
io.seata.server.coordinator.DefaultCoordinator#doGlobalBegin
@Override
public String begin(String applicationId, String transactionServiceGroup, String name, int timeout)throws TransactionException {GlobalSession session = GlobalSession.createGlobalSession(applicationId, transactionServiceGroup, name,timeout);MDC.put(RootContext.MDC_KEY_XID, session.getXid());session.addSessionLifecycleListener(SessionHolder.getRootSessionManager());session.begin();// transaction start eventeventBus.post(new GlobalTransactionEvent(session.getTransactionId(), GlobalTransactionEvent.ROLE_TC,session.getTransactionName(), applicationId, transactionServiceGroup, session.getBeginTime(), null, session.getStatus()));return session.getXid();}
TC提交全局事务
io.seata.server.coordinator.DefaultCore#doGlobalCommit   @Overridepublic boolean doGlobalCommit(GlobalSession globalSession, boolean retrying) throws TransactionException {boolean success = true;// start committing eventeventBus.post(new GlobalTransactionEvent(globalSession.getTransactionId(), GlobalTransactionEvent.ROLE_TC,globalSession.getTransactionName(), globalSession.getApplicationId(), globalSession.getTransactionServiceGroup(),globalSession.getBeginTime(), null, globalSession.getStatus()));if (globalSession.isSaga()) {success = getCore(BranchType.SAGA).doGlobalCommit(globalSession, retrying);} else {Boolean result = SessionHelper.forEach(globalSession.getSortedBranches(), branchSession -> {// if not retrying, skip the canBeCommittedAsync branchesif (!retrying && branchSession.canBeCommittedAsync()) {return CONTINUE;}BranchStatus currentStatus = branchSession.getStatus();if (currentStatus == BranchStatus.PhaseOne_Failed) {globalSession.removeBranch(branchSession);return CONTINUE;}try {BranchStatus branchStatus = getCore(branchSession.getBranchType()).branchCommit(globalSession, branchSession);switch (branchStatus) {case PhaseTwo_Committed:globalSession.removeBranch(branchSession);return CONTINUE;case PhaseTwo_CommitFailed_Unretryable:if (globalSession.canBeCommittedAsync()) {LOGGER.error("Committing branch transaction[{}], status: PhaseTwo_CommitFailed_Unretryable, please check the business log.", branchSession.getBranchId());return CONTINUE;} else {SessionHelper.endCommitFailed(globalSession);LOGGER.error("Committing global transaction[{}] finally failed, caused by branch transaction[{}] commit failed.", globalSession.getXid(), branchSession.getBranchId());return false;}default:if (!retrying) {globalSession.queueToRetryCommit();return false;}if (globalSession.canBeCommittedAsync()) {LOGGER.error("Committing branch transaction[{}], status:{} and will retry later",branchSession.getBranchId(), branchStatus);return CONTINUE;} else {LOGGER.error("Committing global transaction[{}] failed, caused by branch transaction[{}] commit failed, will retry later.", globalSession.getXid(), branchSession.getBranchId());return false;}}} catch (Exception ex) {StackTraceLogger.error(LOGGER, ex, "Committing branch transaction exception: {}",new String[] {branchSession.toString()});if (!retrying) {globalSession.queueToRetryCommit();throw new TransactionException(ex);}}return CONTINUE;});// Return if the result is not nullif (result != null) {return result;}//If has branch and not all remaining branches can be committed asynchronously,//do print log and return falseif (globalSession.hasBranch() && !globalSession.canBeCommittedAsync()) {LOGGER.info("Committing global transaction is NOT done, xid = {}.", globalSession.getXid());return false;}}// if it succeeds and there is no branch, retrying=true is the asynchronous state when retrying. EndCommitted is executed to improve concurrency performance, and the global transaction ends..if (success && globalSession.getBranchSessions().isEmpty() && retrying) {SessionHelper.endCommitted(globalSession);// committed eventeventBus.post(new GlobalTransactionEvent(globalSession.getTransactionId(), GlobalTransactionEvent.ROLE_TC,globalSession.getTransactionName(), globalSession.getApplicationId(), globalSession.getTransactionServiceGroup(),globalSession.getBeginTime(), System.currentTimeMillis(), globalSession.getStatus()));LOGGER.info("Committing global transaction is successfully done, xid = {}.", globalSession.getXid());}return success;}@Overridepublic GlobalStatus commit(String xid) throws TransactionException {GlobalSession globalSession = SessionHolder.findGlobalSession(xid);if (globalSession == null) {return GlobalStatus.Finished;}globalSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());// just lock changeStatusboolean shouldCommit = SessionHolder.lockAndExecute(globalSession, () -> {// Highlight: Firstly, close the session, then no more branch can be registered.globalSession.closeAndClean();if (globalSession.getStatus() == GlobalStatus.Begin) {if (globalSession.canBeCommittedAsync()) {globalSession.asyncCommit();return false;} else {globalSession.changeStatus(GlobalStatus.Committing);return true;}}return false;});if (shouldCommit) {boolean success = doGlobalCommit(globalSession, false);//If successful and all remaining branches can be committed asynchronously, do async commit.if (success && globalSession.hasBranch() && globalSession.canBeCommittedAsync()) {globalSession.asyncCommit();return GlobalStatus.Committed;} else {return globalSession.getStatus();}} else {return globalSession.getStatus() == GlobalStatus.AsyncCommitting ? GlobalStatus.Committed : globalSession.getStatus();}}
TC回滚全局事务
io.seata.server.coordinator.DefaultCoordinator#doGlobalRollback@Overridepublic boolean doGlobalRollback(GlobalSession globalSession, boolean retrying) throws TransactionException {boolean success = true;// start rollback eventeventBus.post(new GlobalTransactionEvent(globalSession.getTransactionId(),GlobalTransactionEvent.ROLE_TC, globalSession.getTransactionName(),globalSession.getApplicationId(),globalSession.getTransactionServiceGroup(), globalSession.getBeginTime(),null, globalSession.getStatus()));if (globalSession.isSaga()) {success = getCore(BranchType.SAGA).doGlobalRollback(globalSession, retrying);} else {Boolean result = SessionHelper.forEach(globalSession.getReverseSortedBranches(), branchSession -> {BranchStatus currentBranchStatus = branchSession.getStatus();if (currentBranchStatus == BranchStatus.PhaseOne_Failed) {globalSession.removeBranch(branchSession);return CONTINUE;}try {BranchStatus branchStatus = branchRollback(globalSession, branchSession);switch (branchStatus) {case PhaseTwo_Rollbacked:globalSession.removeBranch(branchSession);LOGGER.info("Rollback branch transaction successfully, xid = {} branchId = {}", globalSession.getXid(), branchSession.getBranchId());return CONTINUE;case PhaseTwo_RollbackFailed_Unretryable:SessionHelper.endRollbackFailed(globalSession);LOGGER.info("Rollback branch transaction fail and stop retry, xid = {} branchId = {}", globalSession.getXid(), branchSession.getBranchId());return false;default:LOGGER.info("Rollback branch transaction fail and will retry, xid = {} branchId = {}", globalSession.getXid(), branchSession.getBranchId());if (!retrying) {globalSession.queueToRetryRollback();}return false;}} catch (Exception ex) {StackTraceLogger.error(LOGGER, ex,"Rollback branch transaction exception, xid = {} branchId = {} exception = {}",new String[] {globalSession.getXid(), String.valueOf(branchSession.getBranchId()), ex.getMessage()});if (!retrying) {globalSession.queueToRetryRollback();}throw new TransactionException(ex);}});// Return if the result is not nullif (result != null) {return result;}// In db mode, there is a problem of inconsistent data in multiple copies, resulting in new branch// transaction registration when rolling back.// 1. New branch transaction and rollback branch transaction have no data association// 2. New branch transaction has data association with rollback branch transaction// The second query can solve the first problem, and if it is the second problem, it may cause a rollback// failure due to data changes.GlobalSession globalSessionTwice = SessionHolder.findGlobalSession(globalSession.getXid());if (globalSessionTwice != null && globalSessionTwice.hasBranch()) {LOGGER.info("Rollbacking global transaction is NOT done, xid = {}.", globalSession.getXid());return false;}}if (success) {SessionHelper.endRollbacked(globalSession);// rollbacked eventeventBus.post(new GlobalTransactionEvent(globalSession.getTransactionId(),GlobalTransactionEvent.ROLE_TC, globalSession.getTransactionName(),globalSession.getApplicationId(),globalSession.getTransactionServiceGroup(),globalSession.getBeginTime(), System.currentTimeMillis(),globalSession.getStatus()));LOGGER.info("Rollback global transaction successfully, xid = {}.", globalSession.getXid());}return success;}@Overridepublic GlobalStatus rollback(String xid) throws TransactionException {GlobalSession globalSession = SessionHolder.findGlobalSession(xid);if (globalSession == null) {return GlobalStatus.Finished;}globalSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());// just lock changeStatusboolean shouldRollBack = SessionHolder.lockAndExecute(globalSession, () -> {globalSession.close(); // Highlight: Firstly, close the session, then no more branch can be registered.if (globalSession.getStatus() == GlobalStatus.Begin) {globalSession.changeStatus(GlobalStatus.Rollbacking);return true;}return false;});if (!shouldRollBack) {return globalSession.getStatus();}doGlobalRollback(globalSession, false);return globalSession.getStatus();}

RM注册分支事务到TC

io.seata.server.coordinator.DefaultCoordinator#doBranchRegister    
@Overrideprotected void doBranchRegister(BranchRegisterRequest request, BranchRegisterResponse response,RpcContext rpcContext) throws TransactionException {MDC.put(RootContext.MDC_KEY_XID, request.getXid());response.setBranchId(core.branchRegister(request.getBranchType(), request.getResourceId(), rpcContext.getClientId(),request.getXid(), request.getApplicationData(), request.getLockKey()));}

RM提交分支事务到TC

RM回滚分支事务到TC

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/620758.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

QEMU源码全解析 —— PCI设备模拟(9)

接前一篇文章: 上一回介绍了虚拟机如何通过北桥的MMIO来读写PCI设备的配置空间。PCI设备的配置空间中有MMIO的地址,也就是BAR信息,里面存放有BAR的基址,虚拟机可以通过读写这些BAR来与设备通信。然而,QEMU在设备初始化…

crackmapexec工具详解

下载地址:https://github.com/Porchetta-Industries/CrackMapExec wiki:https://www.crackmapexec.wiki/ 1.安装(MAC) 1.1.python3.9 pipx 安装(运行软件有警告,推荐 python3.11 pipx 安装) …

记录下载安装rabbitmq(Linux) 并整合springboot--详细版(全)

下载rabbitmq(Linux): erlang压缩包: https://share.weiyun.com/TGhfV8eZ rabbitMq-server压缩包: https://share.weiyun.com/ZXbUwWHD (因为RabbitMQ采用 Erlang 实现的工业级的消息队列(MQ)服务器&#…

五、带登录窗体的demo

做了一个简单的带登录窗体的demo,有用户名和密码不能为空的验证,原理是在main.cpp的主函数入口处: 1、将默认的MainWindow主窗体注释。 2、新建一个formlogin登录窗体,在主函数中先运行登录窗体。 3、在登录窗体中引用MainWind…

Redis学习指南(7)-Redis的字符串类型介绍

字符串类型特点 Redis的字符串类型是一种简单而灵活的数据结构,其特点如下: 任意数据类型: 字符串类型可以存储任意数据,包括文本、二进制数据等。直接访问和修改: 可以直接通过键名访问和修改整个字符串,而不需要像其他数据结构…

C++ 并发编程 | 管理线程

一、管理线程 1、启动线程 线程是通过构造std::thread对象来开始的&#xff0c;该对象指定了线程上要运行的任务&#xff0c;std::thread可以与任何可调用类型一起工作&#xff0c;例如&#xff1a; #include <iostream> #include <thread> using namespace std;v…

LeetCode第152题 - 乘积最大子数组

题目 解答 public class Solution {public int maxProduct(int[] nums) {if (nums null || nums.length 0) {return 0;}if (nums.length 1) {return nums[0];}int max nums[0];int min nums[0];int res nums[0];for (int i 1; i < nums.length; i) {int tmp max;m…

【算法题】51. N 皇后

题目 按照国际象棋的规则&#xff0c;皇后可以攻击与之处在同一行或同一列或同一斜线上的棋子。 n 皇后问题 研究的是如何将 n 个皇后放置在 nn 的棋盘上&#xff0c;并且使皇后彼此之间不能相互攻击。 给你一个整数 n &#xff0c;返回所有不同的 n 皇后问题 的解决方案。 …

ZZULIOJ 1126: 布尔矩阵的奇偶性

题目描述 一个布尔方阵具有奇偶均势特性&#xff0c;当且仅当 每行、每列总和为偶数&#xff0c;即包含偶数个1。如下面这个4*4的矩阵就具有奇偶均势特性&#xff1a; 1 0 1 0 0 0 0 0 1 1 1 1 0 1 0 1 编写程序&#xff0c;读入一个n阶方阵并检查它是否具有奇偶均势特性。如果…

杨中科 .NETCORE EFCORE 第一部分 基本使用

一 、什么是EF Core 什么是ORM 1、说明: 本课程需要你有数据库、SOL等基础知识。 2、ORM: ObjectRelational Mapping。让开发者用对象操作的形式操作关系数据库 比如插入: User user new User(Name"admin"Password"123”; orm.Save(user);比如查询: Book b…

mac上部署单体hbase

1. 简介 HBase 是一个开源的、分布式的、版本化的典型非关系型数据库。它是 Google BigTable 的开源实现&#xff0c;并且是 Apache 基金会的 Hadoop 项目的一部分1。HBase 在 Hadoop Distributed File System (HDFS) 上运行&#xff0c;作为一个列式存储非关系数据库管理系统…

yolov8在进行目标追踪时,model.track()中persist参数的含义

《博主简介》 小伙伴们好&#xff0c;我是阿旭。专注于人工智能、AIGC、python、计算机视觉相关分享研究。 ✌更多学习资源&#xff0c;可关注公-仲-hao:【阿旭算法与机器学习】&#xff0c;共同学习交流~ &#x1f44d;感谢小伙伴们点赞、关注&#xff01; 《------往期经典推…

Unity 面试篇|(五)热更新与Lua语言篇 【全面总结 | 持续更新】

目录 1.什么是热更新&#xff1f;2.主流的代码热更方案有哪些&#xff1f;3.AssetBundle介绍4.AssetBundle的具体开发流程5.AssetBundle的压缩格式6.AssetBundle对象的加载方式7.ssetBundle资源卸载8.资源如何打包&#xff1f;依赖项列表如何生成&#xff1f;9.如何解析版本文件…

华为OD机试2024年最新题库(Python)

我是一名软件开发培训机构老师&#xff0c;我的学生已经有上百人通过了华为OD机试&#xff0c;学生们每次考完试&#xff0c;会把题目拿出来一起交流分享。 重要&#xff1a;2024年1月-5月&#xff0c;考的都是OD统一考试&#xff08;C卷&#xff09;&#xff0c;题库已经整理…

Flutter之运行错误:this and base files have different roots

运行时报错&#xff1a; this and base files have different roots: E:\Demolpro\waqu\build\flutter-plugin-_android_lifecycle and C:\Users\78535\AppData\Local\Pub\Cache\hosted\pub.dev\flutter_pulgin_android_lifecycle-2.0.17\android 如图&#xff1a; 这种情况…

【JavaSE】P33~P113 方法,重载,数组,对象,构造器,this关键字,数组和对象的内存图,JavaBean

练习 1 方法方法定义及调用JShell简单使用&#xff08;要求Java9以上&#xff09;方法语句流程控制及注意事项选择循环 方法的重载 2 数组三种初始化方式数组在内存中的存储内存图 3 对象对象内存图Getter/Setter快捷写法this 关键字构造方法JavaBean的四个标准对象数组 二、编…

微信小程序支付之V2支付

一、引入maven <dependency><groupId>com.github.binarywang</groupId><artifactId>weixin-java-pay</artifactId><version>3.3.4.B</version><exclusions><exclusion><artifactId>qrcode-utils</artifactId&…

生成视频 图像拖拽生成视频

目录 motionctrl DragNUWA 预训练模型&#xff1a; motionctrl 该工具的多功能性使其成为视频制作领域的一项重要创新。它不仅能与现有的视频生成工具如LVDM和VideoCrafter1无缝集成&#xff0c;还能与SVD等其他视频生成方法兼容&#xff0c;为视频创造提供更多可能性。此外…

带F/V变换的反馈型相位控制电路D211,内置过载限制功能、电压监测功能

D211是一块相位控制集成电路&#xff0c;该电路内部具有F-V转换接口、控制放大器、过载限制、软启动、自动重触发、电压监视、电压电流同步等功能。主要应用于电动工具中马达转速的控制。 主要特点&#xff1a; 内置F-V转换接口 外控制集成放大器 内置过载限制功能 …

【数据结构】八大排序之计数排序算法

&#x1f984;个人主页:修修修也 &#x1f38f;所属专栏:数据结构 ⚙️操作环境:Visual Studio 2022 目录 一.计数排序简介及思想 二.计数排序代码实现 三.计数排序复杂度分析 &#x1f4cc;时间复杂度 &#x1f4cc;空间复杂度 结语 一.计数排序简介及思想 计数排序(Cou…