6. 基础设施层
基础设施层为数据库删除功能提供了核心的技术支撑,包括数据库连接、关系型数据库操作、缓存管理和搜索引擎等关键组件。这些组件通过契约层(Contract)和实现层(Implementation)的分离设计,确保了删除操作的可靠性、一致性和高性能。
6.1 数据库基础设施
数据库契约层
文件位置:backend/infra/contract/orm/database.go
package orm
import (
"gorm.io/gorm"
)
type DB = gorm.DB
设计作用:
- 为GORM数据库对象提供类型别名,统一数据库接口
- 作为契约层抽象,便于后续数据库实现的替换
- 为数据库相关的数据访问层提供统一的数据库连接接口
MySQL数据库实现
文件位置:backend/infra/impl/mysql/mysql.go
package mysql
import (
"fmt"
"os"
"gorm.io/driver/mysql"
"gorm.io/gorm"
)
func New() (*gorm.DB, error) {
dsn := os.Getenv("MYSQL_DSN")
db, err := gorm.Open(mysql.Open(dsn))
if err != nil {
return nil, fmt.Errorf("mysql open, dsn: %s, err: %w", dsn, err)
}
return db, nil
}
在数据库删除中的作用:
- 为
DraftDatabaseInfo
和OnlineDatabaseInfo
DAO提供数据库连接,支持数据库信息的删除操作 - 通过GORM ORM框架,执行安全的数据库信息表删除操作
- 支持事务处理,确保数据库删除过程的数据一致性和原子性
- 连接池管理,提高数据库并发删除的性能和稳定性
删除操作初始化流程:
main.go → application.Init() → appinfra.Init() → mysql.New() → DatabaseDAO注入 → 执行删除
6.2 关系型数据库操作基础设施
数据库操作契约层
文件位置:backend/infra/contract/rdb/rdb.go
package rdb
import (
"context"
)
type Service interface {
// 删除数据表
DropTable(ctx context.Context, req *DropTableRequest) (*DropTableResponse, error)
// 删除数据
DeleteData(ctx context.Context, req *DeleteDataRequest) (*DeleteDataResponse, error)
// 其他数据库操作方法...
}
MySQL实现层
文件位置:backend/infra/impl/rdb/mysql.go
// DeleteData delete data
func (m *mysqlService) DeleteData(ctx context.Context, req *rdb.DeleteDataRequest) (*rdb.DeleteDataResponse, error) {
if req == nil {
return nil, fmt.Errorf("invalid request")
}
whereClause, whereValues, err := m.buildWhereClause(req.Where)
if err != nil {
return nil, fmt.Errorf("failed to build where clause: %v", err)
}
limitClause := ""
if req.Limit != nil {
limitClause = fmt.Sprintf(" LIMIT %d", *req.Limit)
}
deleteSQL := fmt.Sprintf("DELETE FROM `%s`%s%s",
req.TableName,
whereClause,
limitClause,
)
logs.CtxInfof(ctx, "[DeleteData] execute sql is %s, value is %v, req is %v", deleteSQL, whereValues, req)
result := m.db.WithContext(ctx).Exec(deleteSQL, whereValues...)
if result.Error != nil {
return nil, fmt.Errorf("failed to delete data: %v", result.Error)
}
affectedRows := result.RowsAffected
return &rdb.DeleteDataResponse{AffectedRows: affectedRows}, nil
}
在数据库删除中的作用:
- 物理表删除:执行实际的数据库物理表删除操作,清理数据库资源
- 数据清理:提供删除数据库记录的基础操作
- 事务支持:支持在事务中执行复杂的删除操作
- SQL构建:安全构建删除SQL语句,防止SQL注入
- 日志记录:记录删除操作的SQL语句和参数,便于审计和问题排查
删除流程:
DatabaseService.DeleteDatabase → 执行事务 → 删除元数据 → DropTable → 提交事务
6.3 缓存系统基础设施
缓存契约层
文件位置:backend/infra/contract/cache/cache.go
package cache
type Cmdable interface {
Pipeline() Pipeliner
StringCmdable
HashCmdable
GenericCmdable
ListCmdable
}
type StringCmdable interface {
Set(ctx context.Context, key string, value interface{}, expiration time.Duration) StatusCmd
Get(ctx context.Context, key string) StringCmd
IncrBy(ctx context.Context, key string, value int64) IntCmd
}
Redis缓存实现
文件位置:backend/infra/impl/cache/redis/redis.go
func New() cache.Cmdable {
addr := os.Getenv("REDIS_ADDR")
password := os.Getenv("REDIS_PASSWORD")
return NewWithAddrAndPassword(addr, password)
}
func NewWithAddrAndPassword(addr, password string) cache.Cmdable {
rdb := redis.NewClient(&redis.Options{
Addr: addr,
Password: password,
PoolSize: 100,
MinIdleConns: 10,
MaxIdleConns: 30,
ConnMaxIdleTime: 5 * time.Minute,
DialTimeout: 5 * time.Second,
ReadTimeout: 3 * time.Second,
WriteTimeout: 3 * time.Second,
})
return &redisImpl{client: rdb}
}
在数据库删除中的作用:
- 权限验证缓存:缓存用户权限信息,快速验证删除数据库权限
- 数据库信息缓存:缓存待删除数据库的基本信息,减少数据库查询
- 分布式锁:防止并发删除同一数据库,确保删除操作的原子性
- 删除状态缓存:临时存储删除操作的状态,支持删除进度查询
- 事件去重:缓存已处理的删除事件ID,避免重复处理
删除操作缓存使用场景:
1. 权限缓存:user_perm:{user_id}:{space_id}:{database_id}
2. 数据库缓存:database_info:{database_id}
3. 删除锁:lock:database_delete:{database_id}
4. 删除状态:delete_status:{database_id}:{operation_id}
5. 事件去重:event_processed:{event_id}
6.4 ElasticSearch搜索基础设施
ElasticSearch契约层
文件位置:backend/infra/contract/es/es.go
package es
type Client interface {
Create(ctx context.Context, index, id string, document any) error
Update(ctx context.Context, index, id string, document any) error
Delete(ctx context.Context, index, id string) error
Search(ctx context.Context, index string, req *Request) (*Response, error)
Exists(ctx context.Context, index string) (bool, error)
CreateIndex(ctx context.Context, index string, properties map[string]any) error
}
type BulkIndexer interface {
Add(ctx context.Context, item BulkIndexerItem) error
Close(ctx context.Context) error
}
ElasticSearch实现层
文件位置:backend/infra/impl/es/es_impl.go
func New() (es.Client, error) {
version := os.Getenv("ES_VERSION")
switch version {
case "7":
return newES7Client()
case "8":
return newES8Client()
default:
return newES8Client() // 默认使用ES8
}
}
在数据库删除中的作用:
- 索引删除:将删除的数据库从ES的
coze_resource
索引中移除 - 搜索结果更新:确保删除的数据库不再出现在搜索结果中
- 关联数据清理:清理与删除数据库相关的搜索索引和元数据
- 实时同步:数据库删除后实时从搜索引擎中移除
删除操作的索引处理:
{
"operation": "delete",
"res_id": 123456789,
"res_type": 1,
"delete_time": 1703123456789,
"operator_id": 987654321,
"space_id": 111222333
}
删除索引执行流程:
1. 用户删除工作流 → API Gateway → WorkflowService.DeleteDraftWorkflow()
2. 执行数据库删除 → 发布删除事件 → ES删除处理器
3. 构建删除请求 → esClient.Delete(ctx, "coze_resource", workflowID)
4. 索引清理 → 验证删除结果 → 记录删除日志
6.5 基础设施层架构优势
依赖倒置原则
- 契约层抽象:业务层依赖接口而非具体实现
- 实现层解耦:可以灵活替换数据库、缓存、搜索引擎的具体实现
- 测试友好:通过Mock接口进行单元测试
配置驱动
- 环境变量配置:通过环境变量控制各组件的连接参数
- 版本兼容:支持ES7/ES8版本切换,数据库驱动切换
- 性能调优:连接池、超时时间等参数可配置
高可用设计
- 连接池管理:数据库和Redis连接池,提高并发性能
- 错误处理:完善的错误处理和重试机制
- 监控支持:提供性能指标和健康检查接口
扩展性支持
- 水平扩展:分布式ID生成支持多实例部署
- 存储扩展:支持分库分表、读写分离
- 搜索扩展:支持ES集群部署和索引分片
这种基础设施层的设计为工作流删除功能提供了稳定、高效、可扩展的技术底座,确保了删除操作在高并发场景下的安全性、一致性和可靠性。
7. 数据存储层
7.1 数据库表结构
bot_table_info 表设计
文件位置:helm/charts/opencoze/files/mysql/schema.sql
真实DDL结构:
CREATE TABLE IF NOT EXISTS `bot_table_info` (
`id` bigint unsigned NOT NULL AUTO_INCREMENT COMMENT '主键ID',
`table_name` varchar(255) NOT NULL COMMENT '表名',
`table_desc` varchar(500) DEFAULT NULL COMMENT '表描述',
`creator_id` bigint NOT NULL COMMENT '创建者ID',
`space_id` bigint NOT NULL COMMENT '工作空间ID',
`actual_table_name` varchar(255) NOT NULL COMMENT '实际物理表名',
`fields` longtext NOT NULL COMMENT '表字段定义JSON',
`status` tinyint NOT NULL DEFAULT 1 COMMENT '状态:0草稿 1在线 2删除',
`version` int NOT NULL DEFAULT 0 COMMENT '版本号',
`draft_id` bigint NOT NULL COMMENT '对应草稿ID',
`created_at` bigint unsigned NOT NULL DEFAULT 0 COMMENT '创建时间戳',
`updated_at` bigint unsigned NOT NULL DEFAULT 0 COMMENT '更新时间戳',
PRIMARY KEY (`id`),
UNIQUE KEY `idx_table_name_space` (`table_name`,`space_id`),
KEY `idx_creator_id` (`creator_id`),
KEY `idx_space_id` (`space_id`),
KEY `idx_draft_id` (`draft_id`)
) ENGINE=InnoDB CHARSET utf8mb4 COLLATE utf8mb4_0900_ai_ci COMMENT='数据库元数据表';
表结构特点:
- 元数据存储:存储数据库的元信息,不存储实际数据
- 空间隔离:通过
space_id
实现多租户数据隔离 - JSON存储:
fields
字段使用JSON格式存储表结构定义 - 关联关系:通过
draft_id
关联草稿表和在线表 - 索引优化:在关键查询字段上建立索引,特别是表名+空间ID的唯一索引
- 版本控制:使用
version
字段支持乐观锁机制
bot_table_info字段详解:
id
:自增主键,数据库唯一标识table_name
:数据库表名,用户可见的名称table_desc
:数据库描述信息creator_id
:创建者用户ID,用于权限验证space_id
:所属工作空间ID,用于空间隔离actual_table_name
:实际物理表名,系统内部使用fields
:表字段定义JSON,存储表结构信息status
:状态标识version
:版本号,用于乐观锁draft_id
:对应草稿表的ID,关联草稿信息created_at
/updated_at
:毫秒级时间戳,记录创建和更新时间
bot_table_info_draft 表设计
真实DDL结构:
CREATE TABLE IF NOT EXISTS `bot_table_info_draft` (
`id` bigint unsigned NOT NULL AUTO_INCREMENT COMMENT '主键ID',
`table_name` varchar(255) NOT NULL COMMENT '表名',
`table_desc` varchar(500) DEFAULT NULL COMMENT '表描述',
`creator_id` bigint NOT NULL COMMENT '创建者ID',
`space_id` bigint NOT NULL COMMENT '工作空间ID',
`actual_table_name` varchar(255) NOT NULL COMMENT '实际物理表名',
`fields` longtext NOT NULL COMMENT '表字段定义JSON',
`status` tinyint NOT NULL DEFAULT 0 COMMENT '状态:0草稿 1在线 2删除',
`version` int NOT NULL DEFAULT 0 COMMENT '版本号',
`created_at` bigint unsigned NOT NULL DEFAULT 0 COMMENT '创建时间戳',
`updated_at` bigint unsigned NOT NULL DEFAULT 0 COMMENT '更新时间戳',
PRIMARY KEY (`id`),
KEY `idx_creator_id` (`creator_id`),
KEY `idx_space_id` (`space_id`)
) ENGINE=InnoDB CHARSET utf8mb4 COLLATE utf8mb4_0900_ai_ci COMMENT='数据库草稿表';
表结构特点:
- 草稿存储:存储数据库的草稿版本信息
- 空间隔离:通过
space_id
实现多租户数据隔离 - 与在线表结构一致:保持与在线表相似的结构设计
- 索引设计:针对常用查询场景优化索引结构
bot_table_info_draft字段详解:
id
:草稿表主键ID,自增table_name
:数据库表名table_desc
:数据库描述信息creator_id
:创建者用户ID,用于权限验证space_id
:所属工作空间ID,用于空间隔离actual_table_name
:实际物理表名fields
:表字段定义JSON,存储表结构信息status
:状态标识,默认草稿状态version
:版本号,用于乐观锁created_at
/updated_at
:毫秒级时间戳,记录创建和更新时间
7.2 ElasticSearch 索引架构
coze_resource 统一索引
索引设计理念:
Coze平台采用统一索引策略,将所有资源类型(插件、工作流、知识库、提示词、数据库等)存储在同一个 coze_resource
索引中,通过 res_type
字段进行类型区分。
数据库在索引中的映射:
{
"mappings": {
"properties": {
"res_id": {
"type": "long",
"description": "资源ID,对应bot_table_info.id"
},
"res_type": {
"type": "integer",
"description": "资源类型,数据库为7"
},
"name": {
"type": "text",
"analyzer": "standard",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
},
"description": "数据库表名,支持全文搜索和精确匹配"
},
"owner_id": {
"type": "long",
"description": "所有者ID"
},
"space_id": {
"type": "long",
"description": "工作空间ID"
},
"status": {
"type": "integer",
"description": "数据库状态"
},
"create_time": {
"type": "long",
"description": "创建时间戳(毫秒)"
},
"update_time": {
"type": "long",
"description": "更新时间戳(毫秒)"
}
}
}
}
资源类型常量定义:
const (
ResTypePlugin = 1 // 插件
ResTypeWorkflow = 2 // 工作流
ResTypeKnowledge = 4 // 知识库
ResTypePrompt = 6 // 提示词
ResTypeDatabase = 7 // 数据库
)
7.3 数据同步机制
事件驱动的删除同步架构
删除同步流程:
- 删除操作触发:数据库删除操作触发删除领域事件
- 事件发布:通过事件总线发布
ResourceDomainEvent
删除事件 - 事件处理:
resourceHandlerImpl
监听并处理删除事件 - 索引清理:将删除操作同步到ElasticSearch,移除相关索引
删除同步核心代码:
// 资源删除事件处理器
type resourceHandlerImpl struct {
esClient es.Client
logger logs.Logger
}
// 处理数据库删除领域事件
func (r *resourceHandlerImpl) HandleDatabaseDeleteEvent(ctx context.Context, event *entity.ResourceDomainEvent) error {
if event.OpType != entity.Deleted {
return fmt.Errorf("invalid operation type for delete handler: %v", event.OpType)
}
// 记录删除操作日志
r.logger.InfoCtx(ctx, "Processing database delete event",
"database_id", event.ResID,
"space_id", event.SpaceID,
"operator_id", event.OperatorID)
return r.deleteFromIndex(ctx, event.ResID)
}
// 从索引中删除数据库
func (r *resourceHandlerImpl) deleteFromIndex(ctx context.Context, databaseID int64) error {
indexName := "coze_resource"
docID := conv.Int64ToStr(databaseID)
// 执行索引删除
err := r.esClient.Delete(ctx, indexName, docID)
if err != nil {
r.logger.ErrorCtx(ctx, "Failed to delete database from index",
"database_id", databaseID, "error", err)
return fmt.Errorf("delete database from ES index failed: %w", err)
}
// 验证删除结果
exists, checkErr := r.esClient.Exists(ctx, indexName, docID)
if checkErr != nil {
r.logger.WarnCtx(ctx, "Failed to verify deletion",
"database_id", databaseID, "error", checkErr)
} else if exists {
r.logger.ErrorCtx(ctx, "Database still exists in index after deletion",
"database_id", databaseID)
return fmt.Errorf("database deletion verification failed")
}
r.logger.InfoCtx(ctx, "Successfully deleted database from index",
"database_id", databaseID)
// 检查物理表资源状态
if err := r.checkPhysicalTableResource(ctx); err != nil {
return fmt.Errorf("physical table resource check failed: %w", err)
}
}
7.4 数据库删除操作存储层设计原则
数据库删除数据一致性保证
- 删除一致性:采用事件驱动模式,保证MySQL删除和ElasticSearch索引清理的最终一致性
- 删除幂等性:数据库删除操作支持重试,避免重复删除导致的异常
- 删除事务边界:数据库删除操作在同一事务中完成在线库和草稿库的删除,保证原子性
- 删除验证:数据库删除完成后验证数据确实被移除,确保删除操作的完整性
- 物理表删除:除了删除元数据,还需要删除实际的物理数据表,确保资源完全释放
数据库删除性能优化策略
- 删除索引优化:基于数据库主键ID的删除操作,具有最佳性能
- 异步删除处理:数据库索引删除事件处理采用异步模式,不阻塞删除主流程
- 删除缓存清理:及时清理数据库相关缓存,避免删除后的脏数据
- 事务隔离级别:使用适当的事务隔离级别,平衡一致性和性能
数据库删除操作扩展性考虑
- 分片删除:支持按
space_id
进行分片删除,提高大规模数据库删除的效率 - 删除队列:使用消息队列处理数据库删除事件,支持高并发删除场景
- 删除监控:独立的数据库删除操作监控,及时发现删除异常
- 物理资源管理:统一管理数据库物理表资源,支持不同存储引擎的扩展
数据库删除安全保障
- 权限验证:严格的数据库删除权限验证,确保只有授权用户可以删除
- 删除审计:完整的数据库删除操作审计日志,支持删除行为追踪
- 删除确认:重要数据库删除前的二次确认机制
- 删除恢复:通过备份支持数据库数据恢复
- 引用检查:删除前检查数据库是否被其他资源引用
7.5 数据库删除操作监控和运维
数据库删除操作监控
// 数据库删除操作监控指标
type DatabaseDeleteMetrics struct {
DatabaseDeleteSuccessCount int64 // 数据库删除成功次数
DatabaseDeleteFailureCount int64 // 数据库删除失败次数
DatabaseDeleteLatency time.Duration // 数据库删除操作延迟
LastDatabaseDeleteTime time.Time // 最后数据库删除时间
DatabaseIndexCleanupCount int64 // 数据库索引清理次数
DatabaseDeleteEventCount int64 // 数据库删除事件处理次数
DatabaseDeleteQueueSize int64 // 数据库删除队列大小
PhysicalTableDropCount int64 // 物理表删除次数
PhysicalTableDropLatency time.Duration // 物理表删除延迟
}
// 数据库删除监控指标收集
func (r *resourceHandlerImpl) collectDatabaseDeleteMetrics(ctx context.Context, startTime time.Time, databaseID int64, err error) {
latency := time.Since(startTime)
if err != nil {
metrics.DatabaseDeleteFailureCount++
log.ErrorCtx(ctx, "database delete failed",
"database_id", databaseID, "error", err, "latency", latency)
} else {
metrics.DatabaseDeleteSuccessCount++
metrics.DatabaseDeleteLatency = latency
metrics.LastDatabaseDeleteTime = time.Now()
log.InfoCtx(ctx, "database delete succeeded",
"database_id", databaseID, "latency", latency)
}
}
// 数据库删除操作健康检查
func (r *resourceHandlerImpl) databaseDeleteHealthCheck(ctx context.Context) error {
// 检查数据库连接
if err := r.db.Ping(); err != nil {
return fmt.Errorf("database connection failed: %w", err)
}
// 检查ES连接
if _, err := r.esClient.Ping(ctx); err != nil {
return fmt.Errorf("elasticsearch connection failed: %w", err)
}
// 检查数据库删除队列状态
if queueSize := r.getDatabaseDeleteQueueSize(); queueSize > 1000 {
return fmt.Errorf("database delete queue size too large: %d", queueSize)
}
数据库删除数据质量保证
- 删除一致性检查:定期验证MySQL和ElasticSearch中数据库删除数据的一致性
- 删除完整性验证:确保数据库删除操作完全清理了相关数据、索引和物理表
- 删除异常恢复:提供数据库删除失败的重试和修复机制
- 删除性能监控:监控数据库删除操作性能,特别是物理表删除的资源消耗
- 删除审计追踪:完整记录数据库删除操作的执行过程和结果
- 物理资源验证:确认物理表已成功删除,避免资源泄漏