用RabbitMQ(TTL+DLX(死信队列)) 实现延迟队列

延迟队列,即消息进入队列后不会立即被消费,只有到达指定时间后,才会被消费。

虽然RabbitMQ中没有提供延迟队列功能,但是可以使用TTL+死信队列组合实现延迟队列效果
图片来源------黑马程序员
1.首先创建生产者的maven工程,导入依赖

<dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-context</artifactId>
        <version>5.1.9.RELEASE</version>
    </dependency>

    <dependency>
        <groupId>org.springframework.amqp</groupId>
        <artifactId>spring-rabbit</artifactId>
        <version>2.1.8.RELEASE</version>
    </dependency>

    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.12</version>
    </dependency>

    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-test</artifactId>
        <version>5.1.9.RELEASE</version>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.8.0</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
    </plugins>
</build>

2.创建rabbitmq.properties

rabbitmq.host=X.X.X.X
rabbitmq.port=5672
rabbitmq.username=XXX
rabbitmq.password=XX
rabbitmq.virtual-host=/

这里叉一下如何Linux(centos7)下创建rabbitmq用户
创建用户名admin,密码abc的用户:

rabbitmqctl add_user admin abc

设置admin为超级管理员

rabbitmqctl set_user_tags admin administrator

授权远程访问(也可以登录后,可视化配置)

rabbitmqctl set_permissions -p / admin "." "." ".*"

创建完成后,重启RabbitMQ

systemctl restart rabbitmq-server

也可在服务器安装好rabbitmq后
启用管理平台插件,启用插件后,可以可视化管理RabbitMQ。
注意这里如果开启了rabbitmq服务要先关闭再执行下面命令

rabbitmq-plugins enable rabbitmq_management

再启动rabbitmq服务

systemctl start rabbitmq-server

即可访问控制台页面
1、访问地址
http://X.X.X.X(这里是你装有rabbitmq的服务器ip):15672

注意!!防火墙是否放行了5672的端口
防火墙放行5672端口(centos7)

systemctl-cmd --zone=public --add-port=5672/tcp --permanent

放行之后即可访问到控制台页面
在这里插入图片描述
2、用户登录
默认账号密码都是guest,但是如果使用guest登录,会出现报错如下:

user can only log in localhost

原因是RabbitMQ3.3以后,guest账号只能在本机登录。这里就不去修改相应配置了,而是另外创建其他登录账号。

因为我们之前创建了一个rabbitmq的用户,所以用那个用户直接登录
登录后,用图形化界面创建用户
在这里插入图片描述
在这里插入图片描述
再回到生产者的maven
配置基于spring的rabbitmq的xml文件
创建一个spring-rabbitmq-producer.xml文件

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xmlns:raabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.springframework.org/schema/context
       https://www.springframework.org/schema/context/spring-context.xsd
       http://www.springframework.org/schema/rabbit
       http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
    <!--加载配置文件-->
    <context:property-placeholder location="classpath:rabbitmq.properties"/>


    <!-- 定义rabbitmq connectionFactory -->
    <rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
                               port="${rabbitmq.port}"
                               username="${rabbitmq.username}"
                               password="${rabbitmq.password}"
                               virtual-host="${rabbitmq.virtual-host}"/>

    <!--定义rabbitTemplate对象操作可以在代码中方便发送消息-->
    <rabbit:template id="rabbitTemplate" connection-factory="connectionFactory" />

    <!--定义管理交换机、队列-->
    <rabbit:admin connection-factory="connectionFactory"/>
    
    <!--
        延迟队列:
        1.定义正常交换机(order_exchange)和队列(order_queue)
        2.定义死信交换机()和队列
        3.绑定,设置正常队列过期时间为30分钟
    -->
    <!--定义正常队列-->
    <raabbit:queue id="order_queue" name="order_queue">
        <!--绑定,设置正常队列过期时间为30分钟-->
        <rabbit:queue-arguments>
            <!--绑定死信交换机名称-->
            <entry key="x-dead-letter-exchange" value="order_exchange_dlx"></entry>
            <!--绑定死信队列的路由key(routingKey)-->
            <entry key="x-dead-letter-routing-key" value="dlx.order.cancel"></entry>
            <!--设置正常队列的过期时间   这边做演示就设置为10秒-->
            <entry key="x-message-ttl" value="10000" value-type="java.lang.Integer"></entry>
        </rabbit:queue-arguments>

    </raabbit:queue>
    <!--定义正常交换机(order_exchange)-->
    <rabbit:topic-exchange id="order_exchange" name="order_exchange">
        <!--绑定队列-->
        <rabbit:bindings>
            <rabbit:binding pattern="order.#" queue="order_queue"></rabbit:binding>
        </rabbit:bindings>
    </rabbit:topic-exchange>


    <!--定义死信队列-->
    <raabbit:queue id="order_queue_dlx" name="order_queue_dlx"></raabbit:queue>
    <!--定义死信交换机(order_exchange_dlx)-->
    <rabbit:topic-exchange id="order_exchange_dlx" name="order_exchange_dlx">
        <!--绑定死信队列-->
        <rabbit:bindings>
            <rabbit:binding pattern="dlx.order.#" queue="order_queue_dlx"></rabbit:binding>
        </rabbit:bindings>
    </rabbit:topic-exchange>

</beans> 

创建测试类ProducerTest

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring-rabbitmq-producer.xml")
public class ProducerTest {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testDelay() throws InterruptedException {
        //发送订单消息
        rabbitTemplate.convertAndSend("order_exchange","order.msg","订单信息:id=1,time=×××");

        for (int i = 10; i >0 ; i--) {
            System.out.println(i+"秒...");  //为了看到10秒延迟的效果 手动打印秒数
            Thread.sleep(1000);
        }
    }
}

创建一个消费者Consumer的maven工程
同样也是先配置rabbitmq.properties(同上就不说了)和spring-rabbitmq-consumer.xml文件
创建spring-rabbitmq-consumer.xm配置文件

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.springframework.org/schema/context
       https://www.springframework.org/schema/context/spring-context.xsd
       http://www.springframework.org/schema/rabbit
       http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
    <!--加载配置文件-->
    <context:property-placeholder location="classpath:rabbitmq.properties"/>

<!--这里是扫描监听类的包名   写上自己的包名路径-->
    <context:component-scan base-package=""/>

    <!-- 定义rabbitmq connectionFactory -->
    <rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
                               port="${rabbitmq.port}"
                               username="${rabbitmq.username}"
                               password="${rabbitmq.password}"
                               virtual-host="${rabbitmq.virtual-host}"/>

    <!--定义rabbitTemplate对象操作可以在代码中方便发送消息-->
    <rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"/>


    <!--定义监听器容器-->
    <rabbit:listener-container connection-factory="connectionFactory" auto-declare="true">
        <!--定义监听器,注意:这里是监听死信队列!!-->
        <rabbit:listener ref="orderListener" queue-names="order_queue_dlx"/>
    </rabbit:listener-container>

</beans>

创建监听类OrderListener

@Component //加上扫描包注解
public class OrderListener implements ChannelAwareMessageListener {
    @Override
    public void onMessage(Message message, Channel channel) {
            //1.接收转换消息
            System.out.println(new String(message.getBody()));

            //2.处理业务逻辑    这边做模拟就打印控制台 不操作数据库
            System.out.println("service层处理具体业务逻辑.....");
            System.out.println("根据订单id去查询状态.....");
            System.out.println("判断用户是否支付成功");
            System.out.println("取消订单回滚库存");
    }
}

最后创建consumer测试类(ConsumerTest)

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring-rabbitmq-consumer.xml")
public class ConsumerTest {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    //启动测试类开启监听
    @Test
    public void test() {
    //开启循环启动监听类
        while (true) {
        }
    }
}

我们首先启动Consumer测试类的test方法开启监听
在这里插入图片描述
再打开生产者ProducerTest的tstDelay方法去模拟发送订单消息
在这里插入图片描述
可以看到再控制台打印输出10秒后,我们回到consumerTest类
在这里插入图片描述
处于监听状态的consumer监听到了过期的订单消息
即实现了延迟队列

有小伙伴再跑consumerTest的监听方法时报错
在这里插入图片描述
这是因为我们配置文件里的队列(queue)和交换机(exchange)还没初始化 以至于consumer监听不到而导致报错
这里我们可以先写一个方法,先发送一条消息让rabbitmq完成
所有交换机和队列的初始化
在spring-rabbitmq-producer中加入,使用topic类型的交换机给consumer发送一条消息

 <raabbit:queue id="test_queue" name="test_queue" auto-declare="true"/>
    <rabbit:topic-exchange id="test_exchange" name="test_exchange" auto-declare="true">
        <rabbit:bindings>
            <rabbit:binding pattern="test.#" queue="test_queue"></rabbit:binding>
        </rabbit:bindings>
    </rabbit:topic-exchange>

在ProducerTest测试类中添加方法

@Test
    public void testSend(){
        rabbitTemplate.convertAndSend("test_exchange","test.ghc","Hello.test");
    }

执行此方法,可以看到
在这里插入图片描述
在这里插入图片描述
此时我们再开启消费者测试类的监听
在这里插入图片描述
可以看到是一个持续监听状态了,并且不会报错
此时我们再开启生产者测试类发送订单消息-------ok成功了
以上即是用java完成的RabbitMQ(TTL+死信队列)实现延迟队列功能!


版权声明:本文为qq_52955784原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
THE END
< <上一篇
下一篇>>