ClickHouse UPDATE 机制详解 - 若

news/2025/9/24 11:54:06/文章来源:https://www.cnblogs.com/zhanchenjin/p/19108881

ClickHouse UPDATE 机制详解

问题现象

在使用ClickHouse进行UPDATE操作时,经常会遇到这样的现象:

UPDATE ethereum.block_tasks SETstatus = 'pending', owner = 'consumer-1_1758676754070328000', assigned_at = '2025-09-24 09:19:14.07', updated_at = '2025-09-24 09:19:14.07'
WHERE start_block = 12345;

执行结果:

  • RowsAffected = 0
  • 但通过SELECT查询却能查到更新后的数据 ✅

这种看似矛盾的现象让很多开发者困惑,实际上是ClickHouse UPDATE机制的正常行为。

ClickHouse UPDATE机制原理

1. 异步Mutations机制

ClickHouse的UPDATE操作不是传统的就地更新,而是通过mutations机制异步处理:

传统数据库UPDATE:
[数据] → [直接修改] → [立即生效]ClickHouse UPDATE:
[数据] → [创建mutation] → [后台异步处理] → [最终生效]

2. 执行流程

UPDATE table SET column = 'value' WHERE condition;

执行步骤:

  1. 提交mutation:ClickHouse立即返回,但实际更新在后台进行
  2. 异步处理:后台进程处理mutation,重写相关数据块
  3. 最终一致性:查询时总是返回最新数据

3. 为什么RowsAffected = 0

  • RowsAffected = 0 表示mutation已成功提交到队列
  • 不表示实际影响的行数
  • 实际更新在后台异步进行
  • 这是ClickHouse的设计特性,不是错误

监控Mutation状态

1. 查看Mutation队列

-- 查看所有mutations
SELECT mutation_id,table,command,create_time,is_done,latest_failed_part,latest_fail_reason
FROM system.mutations 
WHERE table = 'block_tasks' 
ORDER BY create_time DESC 
LIMIT 10;

2. 监控执行进度

-- 查看未完成的mutations
SELECT count() as pending_mutations
FROM system.mutations 
WHERE table = 'block_tasks' 
AND is_done = 0;-- 查看最近的mutation详情
SELECT mutation_id,create_time,is_done,elapsed_time
FROM system.mutations 
WHERE table = 'block_tasks' 
ORDER BY create_time DESC 
LIMIT 1;

3. 检查Mutation性能

-- 查看mutation性能统计
SELECT table,count() as total_mutations,sum(is_done) as completed_mutations,avg(elapsed_time) as avg_elapsed_time
FROM system.mutations 
WHERE table = 'block_tasks'
GROUP BY table;

等待时间估算

1. 影响因子

因素 影响程度 说明
数据量 数据越多,处理时间越长
数据块数量 每个块需要单独处理
系统负载 CPU、内存、磁盘I/O
UPDATE复杂度 子查询、批量更新
并发度 其他mutation的竞争

2. 典型等待时间

小表(<10万行):     5-30秒
中等表(10万-1000万行):  1-10分钟
大表(>1000万行):   10-60分钟

3. 实际测试数据

-- 测试不同规模的UPDATE时间
-- 100万行数据,简单UPDATE:约2-5分钟
-- 1000万行数据,批量UPDATE:约10-30分钟
-- 复杂子查询UPDATE:时间增加2-5倍

代码实现方案

1. Go语言等待实现

package mainimport ("fmt""time""gorm.io/gorm"
)// 等待mutation完成的通用函数
func waitForMutation(db *gorm.DB, tableName string, timeout time.Duration) error {start := time.Now()ticker := time.NewTicker(1 * time.Second)defer ticker.Stop()for {select {case <-ticker.C:var count int64err := db.Raw("SELECT count() FROM system.mutations WHERE table = ? AND is_done = 0", tableName).Scan(&count).Errorif err != nil {return err}if count == 0 {fmt.Printf("Mutation completed in %v\n", time.Since(start))return nil}// 检查超时if time.Since(start) > timeout {return fmt.Errorf("mutation timeout after %v", timeout)}case <-time.After(timeout):return fmt.Errorf("mutation timeout after %v", timeout)}}
}// 使用示例
func updateBlockTask(db *gorm.DB, taskID int64) error {// 执行UPDATEerr := db.Exec("UPDATE block_tasks SET status = 'finished' WHERE id = ?", taskID).Errorif err != nil {return err}// 等待mutation完成return waitForMutation(db, "block_tasks", 5*time.Minute)
}

2. 带进度显示的等待

// 显示mutation进度的等待函数
func waitForMutationWithProgress(db *gorm.DB, tableName string, timeout time.Duration) error {start := time.Now()ticker := time.NewTicker(2 * time.Second)defer ticker.Stop()for {select {case <-ticker.C:var mutations []struct {MutationID string    `gorm:"column:mutation_id"`CreateTime time.Time `gorm:"column:create_time"`IsDone     bool      `gorm:"column:is_done"`}err := db.Raw(`SELECT mutation_id, create_time, is_done FROM system.mutations WHERE table = ? AND is_done = 0 ORDER BY create_time DESC LIMIT 5`, tableName).Scan(&mutations).Errorif err != nil {return err}if len(mutations) == 0 {fmt.Printf("All mutations completed in %v\n", time.Since(start))return nil}// 显示进度elapsed := time.Since(start)fmt.Printf("Mutation in progress for %v, %d pending...\n", elapsed, len(mutations))// 检查超时if elapsed > timeout {return fmt.Errorf("mutation timeout after %v", timeout)}case <-time.After(timeout):return fmt.Errorf("mutation timeout after %v", timeout)}}
}

3. 批量更新优化

// 批量更新,减少mutation数量
func batchUpdateTasks(db *gorm.DB, tasks []*model.BlockTask) error {return db.Transaction(func(tx *gorm.DB) error {for _, task := range tasks {err := tx.Table(model.BlockTaskTableName).Where("start_block = ? AND end_block = ?", task.StartBlock, task.EndBlock).Updates(map[string]interface{}{"status":     task.Status,"updated_at": time.Now(),}).Errorif err != nil {return err}}return nil})
}

最佳实践

1. 设置合理的超时时间

// 根据表大小动态设置超时时间
func getMutationTimeout(tableSize int64) time.Duration {switch {case tableSize < 100000:return 1 * time.Minutecase tableSize < 1000000:return 5 * time.Minutecase tableSize < 10000000:return 15 * time.Minutedefault:return 30 * time.Minute}
}

2. 异步处理策略

// 对于非关键更新,使用异步处理
func asyncUpdate(db *gorm.DB, taskID int64) {go func() {err := db.Exec("UPDATE block_tasks SET status = 'finished' WHERE id = ?", taskID).Errorif err != nil {log.Printf("Async update failed: %v", err)}}()
}

3. 错误处理和重试

// 带重试的更新函数
func updateWithRetry(db *gorm.DB, taskID int64, maxRetries int) error {for i := 0; i < maxRetries; i++ {err := db.Exec("UPDATE block_tasks SET status = 'finished' WHERE id = ?", taskID).Errorif err == nil {// 等待mutation完成err = waitForMutation(db, "block_tasks", 5*time.Minute)if err == nil {return nil}}if i < maxRetries-1 {time.Sleep(time.Duration(i+1) * time.Second) // 指数退避}}return fmt.Errorf("update failed after %d retries", maxRetries)
}

常见问题解决

1. Mutation卡住不动

-- 检查是否有失败的mutations
SELECT mutation_id,latest_failed_part,latest_fail_reason
FROM system.mutations 
WHERE table = 'block_tasks' 
AND is_done = 0 
AND latest_failed_part != '';

2. 性能优化

-- 优化mutation性能的设置
ALTER TABLE block_tasks MODIFY SETTING number_of_mutations_to_throw = 100,number_of_mutations_to_delay = 50;

3. 监控和告警

// 监控mutation积压
func monitorMutationBacklog(db *gorm.DB) {ticker := time.NewTicker(30 * time.Second)go func() {for range ticker.C {var count int64db.Raw("SELECT count() FROM system.mutations WHERE is_done = 0").Scan(&count)if count > 10 {log.Printf("Warning: %d mutations pending", count)}}}()
}

总结

ClickHouse的UPDATE机制具有以下特点:

  1. 异步处理:UPDATE立即返回,实际更新在后台进行
  2. 最终一致性:查询时总是返回最新数据
  3. RowsAffected不可靠:不能依赖此值判断更新是否成功
  4. 需要等待机制:通过监控system.mutations表等待完成
  5. 性能考虑:大表更新可能需要较长时间

关键要点:

  • 理解异步机制,不要被RowsAffected = 0误导
  • 实现等待机制,确保更新完成
  • 设置合理的超时时间
  • 监控mutation状态,及时发现问题
  • 考虑异步处理,提高系统响应性

这种机制虽然增加了复杂性,但提供了更好的并发性能和最终一致性保证。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/915669.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Jetpack Room 从入门到精通 - 实践

pre { white-space: pre !important; word-wrap: normal !important; overflow-x: auto !important; display: block !important; font-family: "Consolas", "Monaco", "Courier New", …

网站建设unohacha免费北京网站建设

导言 在 Rust 中&#xff0c;互斥器&#xff08;Mutex&#xff09;是一种用于在多个线程之间共享数据的并发原语。互斥器提供了一种安全的方式&#xff0c;允许多个线程访问共享数据&#xff0c;但每次只允许一个线程进行写操作。本篇博客将详细介绍 Rust 中互斥器的使用方法&…

不干胶网站做最好的wordpress关闭

应用场景&#xff1a;一个游戏可能会衍生出其他APP或小程序之类的软件&#xff0c;例如王者营地是王者荣耀的官方APP&#xff0c;王者营地提供资讯、赛事、社区、战绩等功能。所以游戏端会和衍生出来的软件端做一些数据互通。这里把软件端称为中台系统。 Get请求和Post请求的区…

自建个网站怎么做网站建设与管理是干什么的

Windows 下本地 Docker RAGFlow 部署指南 环境要求部署步骤1. 克隆代码仓库2. 配置 Docker 镜像加速(可选)3. 修改端口配置(可选)4. 启动服务5. 验证服务状态6. 访问服务7. 登录系统8. 配置模型8.1 使用 Ollama 本地模型8.2 使用在线 API 服务9. 开始使用10. 常见问题处理端…

ClickHouse index_granularity 详解 - 若

ClickHouse index_granularity 详解 什么是 index_granularity index_granularity 是ClickHouse中一个重要的性能配置参数,它定义了索引的粒度(granularity),即每多少个数据行会创建一个索引标记(index mark)。 …

PADS笔记

PADS笔记PCB设计流程准备--功能确定、元器件选型 元件库建立-元器件符号、器件封装 绘制原理图-根据电路功能,将元器件符号进行连接 导出网络表--将元器件的连接关系,以及元器件的信息导出一个文件,以方便导入到其他…

【2025最新教程】Claude Code国内使用_保姆级新手安装使用教程_最强AI编程工具

【2025最新教程】Claude Code国内使用_保姆级新手安装使用教程_最强AI编程工具什么是 Claude Code Claude Code 是 Anthropic 推出的一个 agentic 编码工具 (agentic coding tool),可以在命令行(terminal)中运行,或…

如何计算sequence粒度的负载均衡损失 - 教程

pre { white-space: pre !important; word-wrap: normal !important; overflow-x: auto !important; display: block !important; font-family: "Consolas", "Monaco", "Courier New", …

P13885 [蓝桥杯 2023 省 Java/Python A] 反异或 01 串

发现操作完后必定为一个回文串,并且至多消去区间一半数量的 \(1\),求最长回文串即可。

获取网站缩略图的asp代码wordpress的cms主题

防火墙在解决方案及典型项目中的应用 防火墙作为基础安全防护产品&#xff0c;在各种解决方案、业务场景中配套应用&#xff0c;本节给出各类方案资料链接方便查阅。 防火墙在华为网络解决方案中的应用 解决方案 文档 主要应用 CloudFabric云数据中心网解决方案 资料专区…

怎样的网站打开速度块北京房产网二手房出售

这一部分开始&#xff0c;我们将讲解Python中的组合数据类型&#xff0c;这里的知识十分基础而且重要&#xff0c;也已经与C语言的框架愈差愈远。 目录 序列和索引 1、概念 2、切片操作 3、序列的其他操作 列表 1、概念 2、创建与删除 3、列表的操作 4、列表生成式 …

网站开发 有哪些优化功能4p营销理论

随着电商行业的快速发展&#xff0c;个性化服务已经成为提升用户体验和增加用户粘性的关键。基于API的电商平台数据定制和推荐系统是实现这一目标的重要技术手段。 未来&#xff0c;个性化服务可能会朝以下几个方向发展&#xff1a; 更精准的用户画像&#xff1a;通过API接口…

clickhouse轻量级更新 - 若

轻量级更新(Lightweight Updates)是ClickHouse中的一个重要特性,让我详细解释一下: 什么是轻量级更新 轻量级更新是ClickHouse提供的一种高效的UPDATE机制,它允许在不重写整个数据块的情况下更新数据。 传统更新 …

西电PCB设计指南第3章学习笔记

西电PCB设计指南第3章学习笔记 三、PCB的设计与规范画图前的准备确定外轮廓(在机械层核对尺寸,安装孔位,定义PCB边界轮廓)设置layerstack(节点厚度和属性)话说我好像安装了专门算这个的软件?嘿嘿嘿:happy:那么为…

Vitrualbox、kali、metaspolitable2下载安装

太多资源看不过眼,整理了几个下载比较快、安装教程比较实用的链接。 这里下的是Virtualbox7.2.2和7.2.2版本的扩展包、kali2025.3和metasplotable2,这里直接用的最新的kali和最新稳定版本的Virtualbox版本。 注意下载…

有域名了如何建网站ftp服务器上传不了wordpress

题解&#xff1a; 我发现拉格朗日乘数法真是个好东西。。 我是不会说我数学竞赛求最值都是用这个东西的 由于我不太会打那个符号就用li代表通常偏导数中的lanmuda 。。。 这题里化简一下就可以得到 2 li * ki * ​(vi​−vi′​)* vi^2​1 然后一旦li确定 我们会发现这个三次函…

LazyLLM端到端实战:用RAG+Agent实现自动出题与学习计划的个性化学习助手智能体

1. 为什么做这个学习助手Agent? 最近,我在写一本关于Git和开源的技术书,这本书未来有个推广方向,就是面向高校作为教材使用。所以我需要在每一章结束在之后,设计若干道练习题,然后还需要为这本书编写配套的PPT以…

补充图

最小生成树(K算法和P算法)(1)K 算法(每次找最小边,判断是否存在环)TIPS:使用并查集实现合并、查询等操作。(常数级别),这里暂未使用。(2)P 算法

域名+邮件推送+事件总线=实现每天定时邮件!

需求:十二点之前我就要睡觉了,我希望给自己一个提醒,但是这个提醒不能是闹钟,因为我一旦在闹钟之前睡去,这闹钟反而要一直响个不停了。 根据这个需求,我盯上了自动邮件,发现目前市场上的自动邮件服务都是付费居…

llm入门环境

Jupyter Notebook安装 官网 https://jupyter.org/install 命令安装 $ pip install jupyterlab 启动 $ jupyter-lab Langchain安装 命令 $ pip install langchain$ conda inatall langchain -c conda-forge 也可以使用V…