本文将从三个方面详细介绍在使用RabbitMQ时如何确保消息不丢失的方法:
@Bean
public Queue myQueue() {
return new Queue("queue-name", true, true, false);
}
@Bean
public DirectExchange myExchange() {
return new DirectExchange("exchange-name", true, false);
}
Message message = MessageBuilder.withBody("hello".getBytes(StandardCharsets.UTF_8)).setDeliveryMode(MessageDeliveryMode.PERSISTENT).build();
rabbitTemplate.convertAndSend("exchange-name", "routing-key", message);
我们需要手动开启 Confirm 机制,可以在配置文件中添加以下配置:
spring:
rabbitmq:
publisher-confirm-type: correlated # MQ异步回调方式接收回执消息,效率高于simple
# publisher-confirm-type: none # 关闭confirm机制
# publisher-confirm-type: simple # 同步阻塞并等待MQ的回执消息,效率很低
publisher-returns: true
rabbitTemplate.setConfirmCallback((CorrelationData correlationData, boolean ack, String cause) -> {
if (ack) {
log.info("message delivery to exchange, message id: {}", correlationData.getId());
// 处理消息确认
} else {
log.error("message not delivery to exchange, cause: {}", cause);
// 处理消息未确认
}
});
rabbitTemplate.setReturnsCallback(returnedMessage -> {
log.error("message routing failed: exchange({}), routing({}), replyCode({}), replyText({}), message({})",
returnedMessage.getExchange(), returnedMessage.getRoutingKey(), returnedMessage.getReplyCode(),
returnedMessage.getReplyText(), returnedMessage.getMessage());
// 处理消息发送到Queue失败
});
RabbitMQ消费者处理消息成功后可以向MQ发送 ack 回执,MQ收到 ack 后会在队列中删除该消息,从而确保消息不会丢失。若消费者在处理消息中出现异常,则会发送 nack 回执,MQ收到 nack 后会再次投递消息。
AMQP的 ack 回执处理方式有以下几种:
最好使用手动 ack 回执,这样消费者在处理完消息后,可以决定是否确认收到消息。如果在处理消息时发生错误,可以选择重新将消息放回队列,或者拒绝这条消息,这样可以防止丢失并且减少重复处理的情况。
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: manual #手动ack
retry:
enabled: true #开启重试机制
虽然我们进行消息持久化机制、生产者 Confirm 异步回调机制、消费者手动 ack 回执机制,等一系列操作,但是由于 RabbitMQ 的持久化过程是异步的,所以无法保证消息在传递过程中做到100%不丢失。
若业务需要做到消息100%不丢失,可以引入本地消息表,通过轮询(或其他方式)的方式来进行消息的重新投递。
因篇幅问题不能全部显示,请点此查看更多更全内容