目录
RabbitMQ保证消息可靠性
生产者丢失消息
MQ丢失消息
消费端丢失了数据
Kakfa的消息可靠性
生产者的消息可靠性
Kakfa的消息可靠性
消费者的消息可靠性
RabbitMQ保证消息可靠性
生产者丢失消息
1.事务消息保证
生产者在发送消息之前,开启事务消息随后生产者发送消息,消息发送之后,如果消息没有被MQ接收到的话,生产者会收到异常报错,生产者回滚事务,然后重试消息,如果收到了消息,就能提交事务了
@Autowired
private RabbitTemplate rabbitTemplate;public void sendTransactionalMessage() {ConnectionFactory connectionFactory = rabbitTemplate.getConnectionFactory();Channel channel = connectionFactory.createConnection().createChannel(false);try {channel.txSelect(); // 开启事务channel.basicPublish("exchange", "routing.key", null, "message".getBytes());channel.txCommit(); // 提交事务} catch (Exception e) {channel.txRollback(); // 出错回滚}
}
2.使用confirm机制
- 普通confirm机制,就是发送消息之后,等待服务器confirm之后再发送下一个消息
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {if (ack) {System.out.println("消息成功发送到Broker");} else {System.out.println("消息发送失败,原因:" + cause);}
});rabbitTemplate.convertAndSend("exchange", "routing.key", "message");
- 批量confirm机制,每发送一批消息之后,等待服务器confirm
Channel channel = connection.createChannel(false);
channel.confirmSelect();for (int i = 0; i < 100; i++) {channel.basicPublish("exchange", "routing.key", null, ("msg" + i).getBytes());
}
channel.waitForConfirms(); // 等待所有消息确认
- 异步confirm机制,服务器confirm一个或者多个消息之后,客户端(生产者)能够通过回调函数来确定消息是否被confirm(推荐)
SortedSet<Long> pendingSet = Collections.synchronizedSortedSet(new TreeSet<>());
channel.confirmSelect();channel.addConfirmListener(new ConfirmListener() {public void handleAck(long tag, boolean multiple) {if (multiple) pendingSet.headSet(tag + 1).clear();else pendingSet.remove(tag);}public void handleNack(long tag, boolean multiple) {System.err.println("未确认消息:" + tag);if (multiple) pendingSet.headSet(tag + 1).clear();else pendingSet.remove(tag);}
});while (true) {long seq = channel.getNextPublishSeqNo();channel.basicPublish("demo.exchange", "demo.key",MessageProperties.PERSISTENT_TEXT_PLAIN, "hello".getBytes());pendingSet.add(seq);
}
MQ丢失消息
防止MQ的丢失数据的话,方法就是开启RabbitMQ的持久化,消息写入之后(也就是到了MQ之后)就直接持久化到磁盘中,即使Rabbimq自己挂了之后,会恢复数据。
设置持久化步骤
- 创建queue的时候直接设置持久化,此时就能持久化queue的元数据(不是消息)
@Bean
public Queue durableQueue() {return new Queue("myQueue", true); // true 表示持久化
}
- 发送消息的时候指定消息为deliveryMode设置为2,也就是设置消息为持久化,此时消息可以持久化磁盘上
MessageProperties props = new MessageProperties();
props.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
Message message = new Message("content".getBytes(), props);
rabbitTemplate.send("exchange", "routing.key", message);
极端情况:
消息写到RabbitMQ之后,但是还没有持久化到磁盘之后直接挂了,导致内存中消息丢失。
解决方法:持久化与生产者的confirm机制配合,当且仅当持久化了消息之后,再confirm,避免数据与消息丢失,此时生产者收不到ack,也是可以自己重发
消费端丢失了数据
意思就是消息已经拉取到了信息,还没有处理(注意这是已经告诉MQ我拉取到数据了),结果进程挂了,重启之后继续消费下一条消息,导致中间的这一条没有消费到,此时数据丢失了。
利用ack机制处理
取消RabbiMQ的自动ack,也就是一个api,可以在消费端消费完了消息之后再调用api告诉MQ我们收到并且处理了该消息。如果没有返回ack,RabbitMQ会把该消息分配给其他的consumer处理,消息不会丢失。通过配置处理
spring:rabbitmq:listener:simple:acknowledge-mode: manual
Kakfa的消息可靠性
生产者的消息可靠性
在kafka中,可以在producer(生产段)设置一个参数,也就是ack=all,要求每个数据,必须写入所有的replica(也就是所有该分区的副本),才认为是接收成功。该参数设置的是你的leader接收到消息后,所有的follower都同步到消息后才认为写成功
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all"); // 等待所有副本确认
props.put("retries", Integer.MAX_VALUE); // 无限重试
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");KafkaProducer<String, String> producer = new KafkaProducer<>(props);
ProducerRecord<String, String> record = new ProducerRecord<>("my-topic", "key", "value");producer.send(record, (metadata, exception) -> {if (exception == null) {System.out.println("发送成功:" + metadata.offset());} else {exception.printStackTrace();}
});
producer.close();
Kakfa的消息可靠性
kafka默认是会将消息持久化到磁盘上的,但是还是有情况会导致丢失数据
kafka某个broker宕机,随后重新选举partition的leader。倘若在该broker中的partition中的leader副本中的消息,还没有被其他broker中的follower同步,此时同步缺失的数据就丢失了,也就是少了一些数据
解决方法:
- 给 topic 设置
replication.factor
参数:这个值必须大于 1,要求每个 partition 必须有至少 2 个副本。 - 在 Kafka 服务端设置
min.insync.replicas
参数:这个值必须大于 1,这个是要求一个 leader 至少感知到有至少一个 follower 还跟自己保持联系。 - 在 producer 端设置
acks=all
:这个是要求每条数据,必须是写入所有 replica 之后,才能认为是写成功了。 - 在 producer 端设置
retries=MAX
(很大很大很大的一个值,无限次重试的意思):这个是要求一旦写入失败,就无限重试,卡在这里了。
按照上面的配置之后,leader的切换就不会导致数据缺失了。
消费者的消息可靠性
唯一可能也是类似于RabbitMQ中的,也就是说你消费到该消息的时候,消费者自动提交offset,让kafka以为你消费好了该消息,但是自己还没处理就宕机后,会导致重启后没有消费该消息。
解决方法:
关闭kafka默认的自动提交offset,通过消费端业务逻辑处理完消息后,再手动提交offset,当然这里就是会导致重复消费了,这里就是幂等性的问题了。比如你刚处理完,还没提交 offset,结果自己挂了,此时肯定会重复消费一次
手动提交api:consumer.commitSync();
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "my-group");
props.put("enable.auto.commit", "false"); // 关闭自动提交
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("my-topic"));while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : records) {// 处理消息System.out.println("处理消息:" + record.value());}// 手动提交 offsetconsumer.commitSync();
}