Limt1000,10 查询多少个数据呢,10个嘛.其实不是的 ,数据库做的是比较笨的操作,是查询了1010个数据返回最后的10个,因此当分库分表后,协同查询的数据是倍数增长的,因此如何解决"翻页"问题呢.这边采用 Elasticsearch)+ HBase + Binlog 构建一个实时数据通道,实现数据捕获,存储,检索实现一个快速处理数据的一个方案.
1. 核心架构设计
graph LR
A[MySQL Binlog] --> B[消息队列]
B --> C{HBase}
B --> D{Elasticsearch}
C --> E[OLAP分析]
D --> F[实时检索]
- Binlog:捕获MySQL数据变更
- 消息队列:解耦数据流(推荐Kafka)
- HBase:存储全量数据(高吞吐写入)
- Elasticsearch:提供实时检索能力
2. 关键组件部署
(1) Binlog捕获层
- 工具选择:Canal或Debezium
- 配置示例(Canal):
canal.instance.master.address=192.168.1.10:3306 canal.mq.servers=kafka:9092 canal.mq.topic=user_behavior
(2) 消息队列(Kafka)
- 创建Topic:
bin/kafka-topics.sh --create --topic user_behavior \ --partitions 32 --replication-factor 3 --bootstrap-server kafka:9092
- 分区策略:按业务主键(如
user_id
)分区,保证顺序性
(3) HBase存储层
- 表设计:
create 'user_data', {NAME => 'cf', VERSIONS => 3}, {SPLITS => ['1','2','3','4','5']} # 预分区提升写入性能
- RowKey设计:
hash(user_id)_timestamp
(避免热点)
(4) Elasticsearch索引
- 动态模板(防止字段爆炸):
{ "mappings": { "dynamic_templates": [{ "strings_as_keyword": { "match_mapping_type": "string", "mapping": { "type": "keyword" } } }] } }
3. 数据同步流程
(1) 实时写入HBase
// Kafka Consumer伪代码
while (true) {
ConsumerRecords records = consumer.poll(100);
for (record : records) {
Put put = new Put(Bytes.toBytes(record.key()));
put.addColumn("cf", "name", Bytes.toBytes(record.value().name));
hTable.put(put); // 批量提交提升吞吐
}
hTable.flushCommits();
}
优化技巧:
- 启用HBase异步写入(
setAutoFlush(false)
) - 批量提交(每1000条或200ms提交一次)
(2) ES增量索引
使用Logstash Kafka插件:
input { kafka { topics => ["user_behavior"] } }
filter {
json { source => "message" }
mutate { remove_field => ["@timestamp", "@version"] }
}
output {
elasticsearch {
hosts => ["es-node:9200"]
index => "user_behavior_%{+YYYY.MM.dd}"
document_id => "%{user_id}" # 防止重复文档
}
}
4. 性能优化策略
组件 | 优化手段 | 目标值 |
---|---|---|
Kafka | 增加分区数(≥CPU核心数) | 吞吐≥100MB/s |
HBase | 开启BulkLoad压缩(SNAPPY) | 写入TPS≥50,000 |
ES | 禁用_source 字段+分片路由 | 查询延迟<100ms |
JVM | G1GC+堆内存配置(≤32GB) | GC停顿<200ms |
5. 容灾保障机制
- 数据重放:
- Kafka保留周期:7天(
log.retention.hours=168
) - HBase启用WAL(Write-Ahead-Log)
- Kafka保留周期:7天(
- 一致性保障:
# 双写校验伪代码 def verify_consistency(user_id): hbase_data = hbase.get(user_id) es_data = es.get(user_id) if hbase_data != es_data: es.reindex(user_id, hbase_data) # 以HBase为基准修复
- 监控体系:
- Prometheus监控:Kafka堆积量、HBase RegionServer负载
- Elastic APM:跟踪端到端数据处理延迟
6. 典型查询场景
(1) 精确查询(HBase)
Get get = new Get(Bytes.toBytes("user_10086"));
Result result = table.get(get); // 平均响应时间<10ms
(2) 复杂检索(ES)
GET /user_behavior/_search
{
"query": {
"bool": {
"must": [
{ "range": { "last_login": { "gte": "now-30d" } } },
{ "term": { "status": "active" } }
]
}
},
"aggs": {
"city_stats": { "terms": { "field": "city" } }
}
}
7. 扩展方案
当数据量达亿级时:
- HBase分层存储:
- 热数据:SSD存储
- 冷数据:迁移至HDFS(通过HBase协处理器)
- ES索引优化:
- 冷热分离架构(Hot-Warm)
- 滚动索引(
ilm_policy
自动管理生命周期)
- 资源隔离:
- HBase RegionServer分组:交易组/日志组
- ES专用协调节点(coordinating only node)
关键提示:在千万级数据场景中,建议将Binlog→Kafka→HBase的延迟控制在1秒内,ES索引延迟控制在5秒内,通过横向扩展Kafka消费者和ES节点实现线性扩容。