I. Log Compaction Core Concepts
1. What is Log Compaction?
Log compaction is a per-topic retention mechanism: instead of discarding data purely by age or size, Kafka rewrites the log so that at least the most recent value for every message key is kept, as the diagram below illustrates.

```mermaid
graph TB
    A[Original log] --> B[Key-value message stream]
    B --> C{Log compaction}
    C --> D[Keep the latest value for each key]
    C --> E[Discard superseded old values]
    D --> F[Compacted log]
    subgraph "Message example"
        G[key=A, value=1, offset=0]
        H[key=B, value=2, offset=1]
        I[key=A, value=3, offset=2]
        J[key=C, value=4, offset=3]
        K[key=A, value=5, offset=4]
    end
    subgraph "After compaction"
        L[key=B, value=2]
        M[key=C, value=4]
        N[key=A, value=5]
    end
```
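Compaction is enabled per topic through the `cleanup.policy` setting. As a quick illustration (a minimal sketch: the topic name `user-profiles`, partition count, and broker address are placeholders, not values from this article), a compacted topic can be created with the standard tooling:

```bash
# Create a topic whose log is compacted instead of deleted by age/size
kafka-topics.sh --bootstrap-server localhost:9092 \
  --create --topic user-profiles \
  --partitions 3 --replication-factor 1 \
  --config cleanup.policy=compact \
  --config min.cleanable.dirty.ratio=0.5
```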
II. Log Compaction Implementation Mechanics
1. Core Data Structures
```java
public class LogCleaner implements Runnable {
    // Cleaner state tracking
    private final Map<TopicPartition, LogCleanerStats> cleanerStats = new ConcurrentHashMap<>();
    private final Map<TopicPartition, CleanerCheckpoint> checkpoints = new ConcurrentHashMap<>();

    // Queue of pending compaction tasks
    private final BlockingQueue<CleanerTask> taskQueue = new LinkedBlockingQueue<>();

    // Compaction configuration
    private final long maxMessageSize;
    private final double maxIoBytesPerSecond;
    private final int numThreads;

    public class CleanerTask {
        private final TopicPartition topicPartition;
        private final long startOffset;   // first offset to compact
        private final long endOffset;     // last offset to compact
        private final NavigableMap<Long, Segment> segments;

        // Run one compaction pass
        public void compact() {
            try {
                // 1. Build the OffsetMap (key -> latest offset)
                OffsetMap offsetMap = buildOffsetMap();
                // 2. Select the dirty segments to clean
                List<Segment> dirtySegments = selectDirtySegments();
                // 3. Rewrite them, keeping only the latest value per key
                List<Segment> cleanedSegments = doClean(dirtySegments, offsetMap);
                // 4. Atomically swap the old segments for the cleaned ones
                replaceSegments(dirtySegments, cleanedSegments);
                // 5. Record the new cleaner checkpoint
                updateCheckpoint();
            } catch (Exception e) {
                logger.error("Clean failed for {}", topicPartition, e);
            }
        }
    }
}
```

2. Compaction Algorithm
```java
public class LogCleaner {

    /**
     * Build the OffsetMap: record the latest offset seen for each key.
     */
    private OffsetMap buildOffsetMap(LogSegment headSegment, long startOffset) {
        OffsetMap offsetMap = new SkimpyOffsetMap(memory);
        // Scan forward from startOffset and remember the newest position of every key
        for (RecordBatch batch : headSegment.batchesFrom(startOffset)) {
            for (Record record : batch) {
                if (record.hasKey()) {
                    // Map the key (by hash) to its latest offset
                    long offset = batch.baseOffset() + record.offsetDelta();
                    offsetMap.put(record.key(), offset);
                }
            }
        }
        return offsetMap;
    }

    /**
     * Rewrite the dirty segments, keeping only the records that survive compaction.
     */
    private List<LogSegment> doClean(List<LogSegment> dirtySegments,
                                     OffsetMap offsetMap) throws IOException {
        List<LogSegment> cleanedSegments = new ArrayList<>();
        LogSegment currentCleaned = LogSegment.create();   // start with an empty target segment

        // Process dirty segments in offset order
        for (LogSegment dirtySegment : dirtySegments) {
            for (RecordBatch batch : dirtySegment.batches()) {
                for (Record record : batch) {
                    if (!record.hasKey()) {
                        // Records without a key are not compacted; keep them
                        currentCleaned.append(record);
                        continue;
                    }

                    long lastOffset = offsetMap.get(record.key());
                    long currentOffset = batch.baseOffset() + record.offsetDelta();

                    if (lastOffset == -1) {
                        // First time this key appears: keep it
                        offsetMap.put(record.key(), currentOffset);
                        currentCleaned.append(record);
                    } else if (currentOffset >= lastOffset) {
                        // This is the latest version of the key: keep it
                        offsetMap.put(record.key(), currentOffset);
                        currentCleaned.append(record);
                    } else {
                        // An older version of the key: drop it
                        cleanerStats.skippedRecords().increment();
                    }
                }
            }

            // Roll to a new target segment once the current one reaches the size limit
            if (currentCleaned.size() >= segmentSize) {
                cleanedSegments.add(currentCleaned);
                currentCleaned = LogSegment.create();
            }
        }

        if (currentCleaned.size() > 0) {
            cleanedSegments.add(currentCleaned);
        }
        return cleanedSegments;
    }
}
```

3. Compaction Trigger Conditions
```java
public class LogCleanerManager {

    // Decide whether a partition needs a compaction pass
    public boolean needsCleaning(TopicPartition tp, Log log) {
        // 1. Compaction must be enabled for the topic
        if (!log.config.compact) {
            return false;
        }

        // 2. Compute the dirty ratio
        long firstDirtyOffset = log.firstDirtyOffset();
        long logEndOffset = log.logEndOffset();
        if (firstDirtyOffset == logEndOffset) {
            return false;   // nothing dirty to clean
        }
        // uncleaned span relative to the whole log (real Kafka measures this in bytes)
        double dirtyRatio = (double) (logEndOffset - firstDirtyOffset)
                / (logEndOffset - log.logStartOffset());

        // 3. Clean only once the dirty ratio exceeds the configured threshold
        double thresholdRatio = log.config.minCleanableRatio;
        return dirtyRatio > thresholdRatio;
    }

    // Pick the partitions to clean, dirtiest first
    public List<TopicPartition> selectPartitionsToClean() {
        return allLogs.entrySet().stream()
                .filter(entry -> needsCleaning(entry.getKey(), entry.getValue()))
                .sorted((a, b) -> {
                    double ratioA = calculateDirtyRatio(a.getValue());
                    double ratioB = calculateDirtyRatio(b.getValue());
                    return Double.compare(ratioB, ratioA);
                })
                .map(Map.Entry::getKey)
                .collect(Collectors.toList());
    }
}
```

III. Deletion Policy Implementation
1. Time-Based Deletion
```java
public class LogManager {

    // Time-based retention: delete segments older than retention.ms
    public void deleteOldSegmentsByTime() {
        long now = time.milliseconds();

        for (Log log : allLogs.values()) {
            // Check the age of every segment
            for (LogSegment segment : log.segments()) {
                long segmentAge = now - segment.created();

                // Segment has outlived the retention window
                if (segmentAge > log.config.retentionMs) {
                    // Only delete segments that are safe to remove (e.g. not the active segment)
                    if (canDeleteSegment(segment, log)) {
                        deleteSegment(segment);
                    }
                }
            }
        }
    }

    // More precise retention check using the segment's record timestamps
    private boolean shouldDeleteByTime(Log log, LogSegment segment,
                                       long retentionMs, long currentTime) {
        // Fall back to the file's last-modified time
        long lastTimestamp = segment.lastModified();

        // Prefer the timestamps carried by the records when available
        if (log.config.messageTimestampType == TimestampType.LOG_APPEND_TIME) {
            lastTimestamp = segment.maxTimestamp();
        }
        return currentTime - lastTimestamp > retentionMs;
    }
}
```

2. Size-Based Deletion
```java
public class LogManager {

    // Size-based retention: delete the oldest segments once the log exceeds retention.bytes
    public void deleteOldSegmentsBySize() {
        for (Log log : allLogs.values()) {
            long totalSize = log.size();
            long retentionSize = log.config.retentionSize;

            if (retentionSize < 0 || totalSize <= retentionSize) {
                continue;   // size-based retention disabled or limit not exceeded
            }

            // How many bytes need to be removed
            long bytesToDelete = totalSize - retentionSize;
            long bytesDeleted = 0;

            // Delete from the oldest segment forward
            for (LogSegment segment : log.segments()) {
                if (bytesDeleted >= bytesToDelete) {
                    break;
                }
                if (canDeleteSegment(segment, log)) {
                    long segmentSize = segment.size();
                    deleteSegment(segment);
                    bytesDeleted += segmentSize;
                }
            }
        }
    }
}
```
3. Offset-Based Deletion
```java
public class LogManager {

    // Offset-based retention: drop every segment below the minimum offset to retain
    public void deleteSegmentsToRetainMinOffset() {
        for (Log log : allLogs.values()) {
            long minOffsetToRetain = calculateMinOffsetToRetain(log);

            // Segments are ordered by base offset, so stop at the first one that must be kept
            for (LogSegment segment : log.segments()) {
                if (segment.baseOffset() < minOffsetToRetain) {
                    if (canDeleteSegment(segment, log)) {
                        deleteSegment(segment);
                    }
                } else {
                    break;
                }
            }
        }
    }

    // Work out the smallest offset that must be kept
    private long calculateMinOffsetToRetain(Log log) {
        // Factors considered:
        // 1. Consumer lag
        // 2. Transaction state
        // 3. Leader/replica state
        long minConsumerOffset = Long.MAX_VALUE;

        // Smallest committed offset across all consumers
        for (ConsumerState consumer : log.consumers()) {
            long offset = consumer.committedOffset();
            minConsumerOffset = Math.min(minConsumerOffset, offset);
        }

        // Transactional control messages must stay at or below the last stable offset
        long transactionControlOffset = log.lastStableOffset();

        // The minimum offset that must remain in the log
        return Math.max(log.logStartOffset(),
                Math.min(minConsumerOffset, transactionControlOffset));
    }
}
```

IV. Advanced Features and Optimizations
1. Incremental Compaction and Checkpoints
```java
public class LogCleaner {

    // Tracks how far each partition has already been cleaned
    private class CleanerCheckpoint {
        private final Map<TopicPartition, Long> lastCleanOffset = new HashMap<>();

        // Persist the checkpoint to disk
        public synchronized void saveCheckpoint(TopicPartition tp, long cleanOffset) {
            lastCleanOffset.put(tp, cleanOffset);
            try (FileOutputStream fos = new FileOutputStream(checkpointFile)) {
                Properties props = new Properties();
                lastCleanOffset.forEach((k, v) ->
                        props.setProperty(k.toString(), String.valueOf(v)));
                props.store(fos, "Log cleaner checkpoint");
            } catch (IOException e) {
                logger.error("Failed to save cleaner checkpoint", e);
            }
        }

        // Reload the checkpoint after a restart
        public synchronized void loadCheckpoint() {
            try (FileInputStream fis = new FileInputStream(checkpointFile)) {
                Properties props = new Properties();
                props.load(fis);
                props.forEach((k, v) -> {
                    TopicPartition tp = parseTopicPartition(k.toString());
                    lastCleanOffset.put(tp, Long.parseLong(v.toString()));
                });
            } catch (IOException e) {
                logger.error("Failed to load cleaner checkpoint", e);
            }
        }
    }

    // Incremental compaction: only clean data appended since the last checkpoint
    public void incrementalClean() {
        for (TopicPartition tp : partitionsToClean) {
            Log log = getLog(tp);

            // Where the previous pass stopped
            long lastCleanOffset = checkpoints.getOrDefault(tp, 0L);

            // Only the newly dirty range needs work
            long dirtyStart = Math.max(lastCleanOffset, log.firstDirtyOffset());
            long dirtyEnd = log.logEndOffset();

            if (dirtyStart < dirtyEnd) {
                CleanerTask task = new CleanerTask(tp, dirtyStart, dirtyEnd);
                taskQueue.add(task);
            }
        }
    }
}
```

2. Compaction Policy Configuration
```properties
# Example Kafka broker configuration
############################# Log Compaction #############################

# Enable the log cleaner
log.cleaner.enable=true

# Number of cleaner threads
log.cleaner.threads=8

# Total dedupe buffer memory for the cleaner (128 MB)
log.cleaner.dedupe.buffer.size=134217728

# Dirty ratio that triggers compaction
log.cleaner.min.cleanable.ratio=0.5

# Deletion policy: 7 days, 1 GB, check every 5 minutes
log.retention.hours=168
log.retention.bytes=1073741824
log.retention.check.interval.ms=300000

# Segments: 1 GB, roll at least every 7 days, delay file deletion by 1 minute
log.segment.bytes=1073741824
log.roll.ms=604800000
log.segment.delete.delay.ms=60000
```
3. Compaction Performance Tuning
```java
public class LogCleaner {

    // Memory optimization: clean a large log in batches that fit the dedupe buffer
    private List<LogSegment> cleanLargeLog(Log log, List<LogSegment> dirtySegments,
                                           long maxBufferSize) {
        List<LogSegment> cleanedSegments = new ArrayList<>();
        List<LogSegment> batch = new ArrayList<>();
        long currentBufferUsage = 0;

        for (LogSegment segment : dirtySegments) {
            long segmentSize = segment.size();

            // Flush the current batch before it would overflow the buffer
            if (!batch.isEmpty() && currentBufferUsage + segmentSize > maxBufferSize) {
                cleanedSegments.addAll(doClean(batch));
                batch.clear();
                currentBufferUsage = 0;
            }
            batch.add(segment);
            currentBufferUsage += segmentSize;
        }

        // Clean whatever remains in the last batch
        if (!batch.isEmpty()) {
            cleanedSegments.addAll(doClean(batch));
        }
        return cleanedSegments;
    }

    // IO optimization: zero-copy transfer and batched index updates
    private void optimizeIOPerformance() throws IOException {
        // Use the sendfile system call via FileChannel.transferTo
        FileChannel sourceChannel = dirtySegment.fileChannel();
        FileChannel targetChannel = cleanedSegment.fileChannel();

        long position = 0;
        long count = dirtySegment.size();

        // Zero-copy transfer from the dirty segment to the cleaned segment
        sourceChannel.transferTo(position, count, targetChannel);

        // Update the index entries in one batch
        cleanedSegment.updateIndex(records);
    }
}
```

V. Monitoring and Operations
1. Compaction Metrics
```java
public class LogCleanerMetrics {

    // Key metrics to watch
    private final Meter compactionRate = new Meter();
    private final Histogram compactionLatency = new Histogram();
    private final Gauge<Double> dirtyRatio = new Gauge<>();
    private final Counter deletedMessages = new Counter();
    private final Counter retainedMessages = new Counter();

    // Expose the metrics over JMX
    public void registerMBeans() {
        try {
            MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
            ObjectName name = new ObjectName(
                    "kafka.log:type=LogCleaner,name=" + topicPartition);

            mbs.registerMBean(new LogCleanerMXBean() {
                @Override
                public double getCompactionRate() {
                    return compactionRate.getOneMinuteRate();
                }

                @Override
                public double getAvgCompactionLatency() {
                    return compactionLatency.getMean();
                }

                @Override
                public double getDirtyRatio() {
                    return dirtyRatio.getValue();
                }

                @Override
                public long getDeletedMessages() {
                    return deletedMessages.getCount();
                }

                @Override
                public long getRetainedMessages() {
                    return retainedMessages.getCount();
                }
            }, name);
        } catch (Exception e) {
            logger.error("Failed to register LogCleaner MBean", e);
        }
    }
}
```

2. Operational Commands and Tools
```bash
# Inspect a compacted topic's configuration
kafka-topics.sh --describe --topic my-compacted-topic --bootstrap-server localhost:9092

# Enable compaction on an existing topic (the cleaner picks it up on its next pass)
kafka-configs.sh --bootstrap-server localhost:9092 \
  --entity-type topics \
  --entity-name my-compacted-topic \
  --alter --add-config "cleanup.policy=compact"

# Check compaction progress on a broker: the cleaner logs each pass to
# log-cleaner.log, and records the last cleaned offset per partition in the
# cleaner-offset-checkpoint file inside every log directory
tail -n 50 /path/to/kafka/logs/log-cleaner.log
cat /tmp/kafka-logs/cleaner-offset-checkpoint

# Dump a segment file to inspect its records
kafka-dump-log.sh --files /tmp/kafka-logs/my-topic-0/00000000000000000000.log \
  --print-data-log

# Set the deletion policy for a topic
kafka-configs.sh --bootstrap-server localhost:9092 \
  --entity-type topics \
  --entity-name my-topic \
  --alter \
  --add-config "retention.ms=604800000,retention.bytes=1073741824"
```
3. Fault Diagnosis and Repair
```java
public class LogCompactionValidator {

    // Validate the integrity of a compacted partition
    public void validateCompaction(TopicPartition tp) {
        Log log = getLog(tp);

        // 1. Check key ordering: a key's versions must appear in increasing offset order
        Map<Bytes, Long> keyOffsets = new HashMap<>();
        for (LogSegment segment : log.segments()) {
            for (RecordBatch batch : segment.batches()) {
                for (Record record : batch) {
                    if (record.hasKey()) {
                        Bytes key = Bytes.wrap(record.key());
                        Long prevOffset = keyOffsets.get(key);
                        long currentOffset = batch.baseOffset() + record.offsetDelta();

                        if (prevOffset != null && currentOffset > prevOffset) {
                            // A newer version of an already-seen key: expected
                        } else if (prevOffset != null && currentOffset < prevOffset) {
                            // Compaction error: an older version appears after a newer one
                            logger.error("Compaction error: found older version of key {} "
                                    + "at offset {} than previous offset {}",
                                    key, currentOffset, prevOffset);
                        }
                        keyOffsets.put(key, currentOffset);
                    }
                }
            }
        }

        // 2. Check segment continuity
        long prevEndOffset = -1;
        for (LogSegment segment : log.segments()) {
            if (prevEndOffset != -1 && segment.baseOffset() != prevEndOffset) {
                logger.error("Segment gap found: prevEndOffset={}, baseOffset={}",
                        prevEndOffset, segment.baseOffset());
            }
            prevEndOffset = segment.baseOffset() + segment.sizeInRecords();
        }
    }
}
```
VI. Best Practices and Configuration Recommendations
1. Choosing a Compaction Strategy
```yaml
# Recommended settings by use case

# Scenario 1: change data capture (CDC)
cleanup.policy: compact,delete
compression.type: snappy
retention.ms: 604800000            # 7 days
min.compaction.lag.ms: 3600000     # wait 1 hour before a record may be compacted

# Scenario 2: session store
cleanup.policy: compact,delete
delete.retention.ms: 86400000      # keep delete markers (tombstones) for 24 hours
segment.ms: 3600000                # roll a new segment every hour
min.cleanable.dirty.ratio: 0.1     # compact aggressively

# Scenario 3: long-term archive
cleanup.policy: compact
retention.ms: -1                   # no time-based deletion
retention.bytes: 1099511627776     # 1 TB
segment.bytes: 5368709120          # 5 GB segments

# Scenario 4: high-throughput logging
cleanup.policy: delete
retention.ms: 604800000            # 7 days
segment.bytes: 1073741824          # 1 GB
segment.ms: 604800000              # 7 days
```
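All of the settings above are topic-level configurations, so a profile can be applied to an individual topic with `kafka-configs.sh`. As a sketch (the topic name `user-profiles` is a placeholder), applying the long-term archive profile from scenario 3 might look like this:

```bash
# Apply the long-term archive profile (scenario 3) to an existing topic
kafka-configs.sh --bootstrap-server localhost:9092 \
  --entity-type topics \
  --entity-name user-profiles \
  --alter \
  --add-config "cleanup.policy=compact,retention.ms=-1,retention.bytes=1099511627776,segment.bytes=5368709120"
```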
2. Performance Tuning Configuration
```properties
# Recommended production settings

# Memory: 256 MB dedupe buffer, 512 KB IO buffers, cap cleaner IO at 100 MB/s
log.cleaner.dedupe.buffer.size=268435456
log.cleaner.io.buffer.size=524288
log.cleaner.io.max.bytes.per.second=104857600

# Threads: size log.cleaner.threads to the available CPU cores
log.cleaner.threads=4
num.io.threads=8
num.network.threads=3

# Segments: 1 GB segments, index entry every 4 KB, flush every 10000 messages or 1 s
log.segment.bytes=1073741824
log.index.interval.bytes=4096
log.flush.interval.messages=10000
log.flush.interval.ms=1000

# Deletion: check retention every 5 minutes, delay file deletion by 1 minute,
# back off 15 s after a failed cleaning pass
log.retention.check.interval.ms=300000
log.segment.delete.delay.ms=60000
log.cleaner.backoff.ms=15000
```
3. Common Problems and Solutions
Problem 1: compaction is too slow
Solutions:
1. Increase the number of cleaner threads: log.cleaner.threads=8
2. Give the cleaner more memory: log.cleaner.dedupe.buffer.size=536870912
3. Use smaller segments: log.segment.bytes=536870912

Problem 2: running out of disk space
Solutions:
1. Shorten retention: retention.ms=172800000 (2 days)
2. Enable message compression: compression.type=lz4
3. Use plain deletion where compaction is not required: log.cleanup.policy=delete

Problem 3: compaction drives CPU usage up
Solutions:
1. Throttle cleaner IO: log.cleaner.io.max.bytes.per.second=52428800
2. Raise the compaction threshold: log.cleaner.min.cleanable.ratio=0.75
3. Schedule heavy compaction work for off-peak hours

Problem 4: older key versions disappear and look like data loss
Solutions:
1. Keep delete markers longer: delete.retention.ms=86400000
2. Monitor compaction progress via the cleaner logs and JMX metrics
3. Periodically validate data integrity
VII. Summary
Kafka's log compaction and deletion policies provide flexible data lifecycle management:
Core advantages:
Space efficiency: superseded values are removed automatically, saving storage
Data completeness: at least the latest value of every key is guaranteed to be retained
Performance: incremental compaction keeps IO overhead low
Flexible configuration: time-, size-, and offset-based deletion policies are supported
Key configuration points:
cleanup.policy=compact: enables compaction
min.cleanable.dirty.ratio: controls the threshold that triggers compaction
retention.ms / retention.bytes: configure the deletion policy
delete.retention.ms: how long delete markers (tombstones) are retained
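As one illustration of how these knobs combine (the values shown are common defaults or examples taken from this article, not recommendations for any specific workload), a compacted topic's settings might look like:

```properties
# Illustrative topic-level settings for a compacted topic
cleanup.policy=compact
min.cleanable.dirty.ratio=0.5
min.compaction.lag.ms=0
delete.retention.ms=86400000
segment.ms=604800000
```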
Best practices:
Choose the cleanup policy that matches the business scenario
Monitor compaction status and disk usage
Periodically verify data integrity
Test configuration changes thoroughly before rolling them out to production
With log compaction and deletion policies configured correctly, a Kafka cluster can preserve data completeness while significantly improving performance and storage efficiency.