1. Connector 能做什么
OpenSearch SQL Connector 是一个 Sink(写入端):
- 支持 Batch Sink
- 支持 Streaming Sink
- 支持 Streaming 的 Append & Upsert Mode
关键规则很简单:
- DDL 定义了
PRIMARY KEY:Sink 走Upsert,能承接带 UPDATE/DELETE 的 changelog 流 - DDL 没定义主键:只能Append(仅 INSERT),无法正确消费 UPDATE/DELETE
2. 版本与依赖:Flink 2.2 的注意点
文档明确写了:Flink 2.2 目前还没有(yet)可用的 connector 发行物,并且 OpenSearch connector 不在 Flink 二进制发行包里。
工程上意味着两件事:
- 你跑集群任务时,必须把 connector 依赖打进 uber-jar 或放进 Flink 的
lib/目录(让集群全局可见) - 如果你使用的是 Flink 2.2,要提前确认你实际环境里能拿到对应版本的 connector jar(很多团队会固定到一个已发布可用的 Flink/connector 组合)
3. 5 分钟上手:创建 OpenSearch Sink 表
最小可用 DDL(写入到静态 index):
CREATETABLEmyUserTable(user_id STRING,user_name STRING,uvBIGINT,pvBIGINT,PRIMARYKEY(user_id)NOTENFORCED)WITH('connector'='opensearch','hosts'='http://localhost:9200','index'='users');然后你就可以把任意上游表的结果写进去:
INSERTINTOmyUserTableSELECTuser_id,user_name,uv,pvFROMsome_agg_table;4. Append vs Upsert:一切从主键开始
这类“写搜索引擎”的 connector,最容易踩的坑就是:上游 SQL 产生了更新流,但下游表没主键,导致写入语义对不上。
怎么判断上游是不是会产生 UPDATE/DELETE?
- 聚合(尤其是持续聚合)、去重、TopN、Join 等都很容易产出 changelog
- 只要你的结果不是纯 INSERT-only,就强烈建议为 sink 表定义主键,让 connector 进入 Upsert 模式
5. 文档 ID 生成规则:主键会变成_id
OpenSearch connector 会用主键来计算 document id:
- 会把所有主键字段按 DDL 顺序拼接成一个字符串
- 拼接分隔符由
document-id.key-delimiter控制,默认_ - document id 有限制:最大 512 bytes、不能有空白字符
- 一些类型不适合做主键(例如 BYTES、ROW、ARRAY、MAP 等),因为字符串表示不稳定或不直观
示例:复合主键 + 自定义分隔符
CREATETABLEuserMetrics(tenant_id STRING,user_id STRING,uvBIGINT,pvBIGINT,PRIMARYKEY(tenant_id,user_id)NOTENFORCED)WITH('connector'='opensearch','hosts'='http://localhost:9200','index'='users','document-id.key-delimiter'='$');最终 id 会长这样:tenantA$user123
实战建议:
- 主键字段尽量短、稳定、无空白、无易变格式
- 如果主键很长(尤其拼接后可能超过 512 bytes),要在业务侧设计更短的 key(例如 hash、短码、数值 id)
6. Dynamic Index:按字段/按日期路由写入
index支持静态与动态两种写法:
- 静态:
'users' - 动态:
'index-{field_name}' - 动态日期格式:
'users-{log_ts|yyyy-MM-dd}' - 甚至可以用
now():'users-{now()|yyyy-MM-dd}'
6.1 基于字段时间分索引(推荐)
典型日志场景:
CREATETABLEods_logs_opensearch(log_id STRING,log_tsTIMESTAMP(3),levelSTRING,msg STRING,PRIMARYKEY(log_id)NOTENFORCED)WITH('connector'='opensearch','hosts'='http://localhost:9200','index'='logs-{log_ts|yyyy-MM-dd}');这样同一天的数据会落在同一天的索引里,便于冷热分层、生命周期管理。
6.2 基于 now() 的动态索引有硬限制
文档强调:如果动态 index 由系统时间now()生成,对于 changelog 流无法保证“同一个主键每次更新落在同一个 index”,因此这种写法只适合 append-only 流。
一句话:你只要需要 Upsert,就别用now()来决定 index。
7. 交付语义:NONE / AT_LEAST_ONCE / EXACTLY_ONCE
OpenSearch connector 提供sink.delivery-guarantee:
NONE:尽力而为AT_LEAST_ONCE:至少一次(可能重复)EXACTLY_ONCE:故障恢复下也保证恰好一次
同时还有一个强相关开关:sink.flush-on-checkpoint(默认 true)
- 开着:checkpoint 时会等待 pending 的请求被 OpenSearch ack,才能推进检查点
- 关掉:checkpoint 不等 ack,也就谈不上强的至少一次保证(文档明确说会失去强保证)
实战建议(偏工程视角):
- 你想要 AT_LEAST_ONCE 或 EXACTLY_ONCE:
sink.flush-on-checkpoint不要关 - EXACTLY_ONCE 往往意味着 checkpoint、端到端幂等/事务语义、以及更严格的资源与延迟成本,要结合业务可接受的重复/延迟做取舍
8. Bulk 写入与性能调参:3 个 flush 开关 + 重试策略
OpenSearch 写入核心靠 bulk:
sink.bulk-flush.max-actions:每个 bulk 最大 action 数(默认 1000,设为 0 可禁用)sink.bulk-flush.max-size:每个 bulk 最大内存大小(默认 2mb,设为 0 可禁用)sink.bulk-flush.interval:定时 flush(默认 1s,设为 0 可禁用)
常见配置套路:
- 低延迟:interval 小一点、max-actions 小一点
- 高吞吐:max-actions/max-size 提高,interval 适当放大(但要关注 OOM 与下游写入压力)
失败重试(应对临时性错误):
sink.bulk-flush.backoff.strategy:DISABLED/CONSTANT/EXPONENTIALsink.bulk-flush.backoff.max-retriessink.bulk-flush.backoff.delay
提醒:重试会拉长 flush 时间,也可能拉长 checkpoint 等待时间,拥塞期要重点观察 checkpoint duration。
9. 连接与安全:HTTPS、认证、前缀与超时
OpenSearch 常见生产配置点:
hosts:支持多个 host,用;分隔username+password:OpenSearch 默认带安全组件,启用认证时就需要它allow-insecure:允许 HTTPS 但不校验证书(只建议测试环境)connection.path-prefix:反向代理或网关场景常用(例如统一挂在/v1下)超时族参数:
connection.request-timeoutconnection.timeoutsocket.timeout
经验建议:
- 网关/代理链路较长时,把 socket timeout 配得略宽松,避免大 bulk 被误判超时
- allow-insecure 不要带进生产(证书校验关闭属于“埋雷型”配置)
10. format:默认 json,但要保证产物是“合法 JSON 文档”
OpenSearch 把文档当 JSON 存储,connector 的format默认就是内置json。你的上游输出必须能被序列化成有效 JSON 文档,否则要么写失败要么被 failure-handler 处理掉。
11. 上线前自检清单
- 你的 sink 表是否声明了主键?是否需要 Upsert?
- 你是否使用了动态 index?如果用了
now(),是否确保是 append-only? - 是否需要 EXACTLY_ONCE?是否开启了 checkpoint?是否保持
sink.flush-on-checkpoint=true? - bulk flush 三件套(actions/size/interval)是否匹配你的“吞吐 vs 延迟”目标?
- 拥塞期怎么处理?是否启用 backoff?最大重试次数是否会拖垮 checkpoint?
- OpenSearch 是否启用了安全认证?用户名密码/HTTPS/证书策略是否正确?
- connector jar 是否随任务一起发布到集群(uber-jar 或 Flink lib)?