SpringBoot2.x+RabbitMQ实现消息发送状态监听
前言
springboot-amqp可以使用默认的rabbitTemplate实现消息的发送,由于rabbitTemplate.convertAndSend方法默认的超时时间为5秒,如果需要接受超时时间后的发送响应(注意此处为发送后消息服务器端的响应,并非消息被消费者消费后的响应,要接受消息被消费后的响应可以使用rabbitTemplate.sendAndReceive方法,该方法类似RPC调用过程),需要在rabbitmqTemplate中提前设定回调动作。
具体实现方式如下:
依赖说明
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>2.3.0.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
<optional>true</optional>
<version>2.3.0.RELEASE</version>
</dependency>
rabbitmqTemplate支持confirm消息和return消息发送状态监听,需要在application.properties中加入以下配置:
#发送者消息确认方式,默认为none,即不需要确认回调
spring.rabbitmq.publisher-confirm-type=correlated
#如果你的spring-rabbit版本小于2.1则使用下面的配置
#spring.rabbitmq.publisherConfirms=true
#是否接收return消息的配置,true为接受confirm消息,默认为false
spring.rabbitmq.template.mandatory=true
#是否接收return消息的配置,true为接受return消息,默认为false
#spring.rabbitmq.publisher-returns=true
另外需要在rabbitTempldate中加入相应的发送回调,以下为confirm消息回调示例代码
@Component
public class RabbitSender {
@Resource
private RabbitTemplate rabbitTemplate;
private RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
// 此处编写回调
System.out.println("correlationData:" + correlationData);
System.out.println("send" + cause);
}
};
public void send(Object message, Map<String, Object> properties) {
rabbitTemplate.setConfirmCallback(confirmCallback);
rabbitTemplate.convertAndSend...
...
}
}
rabbitTemplate.setConfirmCallback后不可变更,变更callback会抛出异常,源码如下:
public void setConfirmCallback(ConfirmCallback confirmCallback) {
Assert.state(this.confirmCallback == null || this.confirmCallback == confirmCallback,
"Only one ConfirmCallback is supported by each RabbitTemplate");
this.confirmCallback = confirmCallback;
}
版权声明:本文为techa原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。