JAVA操作RabbitMQ

一、 linux环境安装

RabbitMQ介绍:使用Erlang语言开发的开源消息队列系统,基于AMQP协议来实现。AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。AMQP协议更多用在企业系统内对数据一致性、稳定性和可靠性要求很高的场景,对性能和吞吐量的要求还在其次。

rabbitMQ官网: https://www.rabbitmq.com/download.html
在这里插入图片描述
安装步骤如下:
1. 安装erlang环境, rpm -ivh erlang-22.3.4.12-1.el7.x86_64.rpm
2. 安装rabbitMQ(需要有外网),yum install -y rabbitmq-server-3.7.18-1.el7.noarch.rpm
3. 拷贝配置文件,cp /usr/share/doc/rabbitmq-server-3.7.18/rabbitmq.config.example /etc/rabbitmq/rabbitmq.config
4.、修改rabbitmq.config文件,开启允许来宾用户登录访问页面(用户密码guest/guest)
5启动rabbitMQ插件管理,rabbitmq-plugins enable rabbitmq_management
6启动RabbitMQ服务(关闭、重启、查看状态命令)
systemctl start rabbitmq-server
systemctl stop rabbitmq-server
systemctl restart rabbitmq-server
systemctl status rabbitmq-server
7关闭防火墙
[root@xupeng ~]# systemctl disable firewalld
[root@xupeng ~]# systemctl stop firewalld
8访问页面,端口是15672
在这里插入图片描述
9、页面介绍
在这里插入图片描述
10、错误排查,因为我之前安装过一遍,所以没有问题,初次安装可能会出现如下两个错误
10.1. 提示socat依赖被需要,则执行yum -y install socat
10.2. rabbitMQ启动不起来,可以尝试再次调用重启命令,还是起不来 journalctl -xe 命令查看错误日志。我的日志里有这么一条错误ERROR: epmd error for host 192: badarg (unknown POSIX error) ,大概意思就是主机名称不能有数字,调整命令如下

# 查看下自己主机名称
[root@xupeng ~]# hostname
xupeng
# 修改下主机名称
[root@xupeng ~]# set-hostname xupeng

.
11、下面是实际操作,

[root@xupeng test]# ll
总用量 32200
drwxr-xr-x. 2 root root      222 94 12:09 bin
-rw-r--r--. 1 root root 19991728 98 09:39 erlang-22.3.4.12-1.el7.x86_64.rpm
-rw-r--r--. 1 root root 10494431 98 09:39 rabbitmq-server-3.7.18-1.el7.noarch.rpm
drwxrwxr-x. 7 root root     4096 104 2021 redis-6.2.6
-rw-r--r--. 1 root root  2476542 412 22:58 redis-6.2.6.tar.gz

[root@xupeng test]# rpm -ivh erlang-22.3.4.12-1.el7.x86_64.rpm 
警告:erlang-22.3.4.12-1.el7.x86_64.rpm: 头V4 RSA/SHA1 Signature, 密钥 ID 6026dfca: NOKEY
准备中...                          ################################# [100%]
正在升级/安装...
   1:erlang-22.3.4.12-1.el7           ################################# [100%]
[root@xupeng test]# yum list | grep erlang
erlang.x86_64                             22.3.4.12-1.el7              installed
[root@xupeng test]# yum -y remove erlang-*
已加载插件:fastestmirror, langpacks
参数 erlang-22.3.4.12-1.el7.x86_64.rpm 没有匹配
不删除任何软件包

[root@xupeng test]# yum install -y rabbitmq-server-3.7.18-1.el7.noarch.rpm
已加载插件:fastestmirror, langpacks
正在检查 rabbitmq-server-3.7.18-1.el7.noarch.rpm: rabbitmq-server-3.7.18-1.el7.noarch
rabbitmq-server-3.7.18-1.el7.noarch.rpm 将被安装
正在解决依赖关系
There are unfinished transactions remaining. You might consider running yum-complete-transaction, or "yum-complete-transaction --cleanup-only" and "yum history redo last", first to finish them. If those don't work you'll have to try removing/installing packages by hand (maybe package-cleanup can help).
--> 正在检查事务
---> 软件包 rabbitmq-server.noarch.0.3.7.18-1.el7 将被 安装
--> 解决依赖关系完成

依赖关系解决

===============================================================================================================================================================================================================================
 Package                                            架构                                      版本                                               源                                                                       大小
===============================================================================================================================================================================================================================
正在安装:
 rabbitmq-server                                    noarch                                    3.7.18-1.el7                                       /rabbitmq-server-3.7.18-1.el7.noarch                                     11 M

事务概要
===============================================================================================================================================================================================================================
安装  1 软件包

总计:11 M
安装大小:11 M
Downloading packages:
Running transaction check
Running transaction test
Transaction test succeeded
Running transaction
警告:RPM 数据库已被非 yum 程序修改。
  正在安装    : rabbitmq-server-3.7.18-1.el7.noarch                                                                                                                                                                        1/1 
  验证中      : rabbitmq-server-3.7.18-1.el7.noarch                                                                                                                                                                        1/1 

已安装:
  rabbitmq-server.noarch 0:3.7.18-1.el7                                                                                                                                                                                        
完毕!


[root@xupeng test]# ll /usr/share/doc/rabbitmq-server-3.7.18/rabbitmq*
-rw-r--r--. 1 root root 33235 916 2019 /usr/share/doc/rabbitmq-server-3.7.18/rabbitmq.config.example
[root@xupeng test]# cp /usr/share/doc/rabbitmq-server-3.7.18/rabbitmq.config.example /etc/rabbitmq/rabbitmq.config

修改配置文件
在这里插入图片描述
继续回到linux

[root@xupeng rabbitmq]# rabbitmq-plugins enable rabbitmq_management
Enabling plugins on node rabbit@xupeng:
rabbitmq_management
The following plugins have been configured:
  rabbitmq_management
  rabbitmq_management_agent
  rabbitmq_web_dispatch
Applying plugin configuration to rabbit@xupeng...
The following plugins have been enabled:
  rabbitmq_management
  rabbitmq_management_agent
  rabbitmq_web_dispatch
set 3 plugins.
Offline change; changes will take effect at broker restart.

[root@xupeng ~]# systemctl disable firewalld
[root@xupeng ~]# systemctl stop firewalld
[root@xupeng ~]# systemctl start rabbitmq-server

二、模型演示

最左侧圆点是生产者,最右侧圆点是消费者,红色的是队列,深蓝的圆点是路由
大致使用就是生产者的消息可以通过路由或直接向队列里添加,消费者从队列里获取
在这里插入图片描述

<dependencies>
    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>5.7.2</version>
    </dependency>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.11</version>
    </dependency>
    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>5.7.2</version>
    </dependency>
</dependencies>

2.1.HelloWord模型(直连)

消费者直接放消息到队列,生产者取走。只允许一个消费者对一生产者。
首先在管理页面创建虚拟主机,配置一个用户访问虚拟主机
图一↓
在这里插入图片描述
图二↓在这里插入图片描述
图三↓
在这里插入图片描述
代码测试↓
1.启动Customer类main方法
2.启动Provider类main方法
3.管理页面可以看到当前队列信息
在这里插入图片描述

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
public class Provider {
    public static void main(String[] args) throws IOException {
        Connection connection = RabbitMQUtils.getConnection();
        // 从连接中获取通道
        Channel channel = connection.createChannel();
        // 参数一:让通道与消息队列绑定,这样才知道发送给哪个消息队列,队列名称(没有自己创建)
        // 参数二:队列是否要持久化,true开启。
        // 参数三:是否独占队列,true独占
        // 参数四:是否消费完成后自动删除队列,true自动删除,
        // 参数五:额外参数
        channel.queueDeclare("hello", false, false, false, null);
        // 发布消息
        // 参数1:交换机名称  参数2:队列名称  参数3:额外参数  参数4:消息
        channel.basicPublish("","hello",null,"我要向队列中传递消息了".getBytes());
        RabbitMQUtils.closeChannlAndConnection(channel, connection);
    }
}

import com.rabbitmq.client.*;
import java.io.IOException;
public class Customer {
    public static void main(String[] args) throws Exception {
        Connection connection = RabbitMQUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare("hello", false, false, false, null);
        // 消费消息
        // 参数1:消费的消息队列名称  参数2:开启消息的自动确认机制 参数3:回调函数
        channel.basicConsume("hello",true, new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("body是:" + new String(body));
            }
        });
        // 因为basicConsume函数属于异步消费,正常需要开启线程去监听消息队列,如果关闭会导致主线程退出
        // 消费者就因该一直阻塞,等待消费
        // channel.close();
        // connection.close();
    }
}


/**
* 获取连接工具类,防止导错类,类路径也给出来了
*/
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class RabbitMQUtils {
    // 创建链接MQ的工厂
    private static ConnectionFactory connectionFactory;
    static {
        connectionFactory = new ConnectionFactory();
        // 设置地址
        connectionFactory.setHost("192.168.10.132");
        // 端口不是web控制页面的那个端口
        connectionFactory.setPort(5672);
        // 虚拟主机
        connectionFactory.setVirtualHost("ems");
        // 用户和密码
        connectionFactory.setUsername("ems");
        connectionFactory.setPassword("ems");
    }
    public static Connection getConnection(){
        try {
            // 获取链接对象
            Connection connection = connectionFactory.newConnection();
            return connection;
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }
    public static void closeChannlAndConnection(Channel channel, Connection connection){
        try {
            channel.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
        try {
            connection.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

细节补充
queueDeclare方法的第二个参数是否持久化队列,设置为true的话,在重启MQ时会保存在磁盘,下次启动时恢复队列。开启持久化的话,标志是个D,但是消息队列里的消息不会持久化,依然丢失。
持久化消息通过channel.basicPublish方法配置MessageProperties.PERSISTENT_TEXT_PLAIN
//channel.basicPublish(“”,“hello”, MessageProperties.PERSISTENT_TEXT_PLAIN,“你好呀11”.getBytes());
在这里插入图片描述

2.2.WorkQueue(任务模型)

直连模型如果消费速度比生产速度慢,可能会导致任务队列的任务越来越多,并且得不到消费。任务模型可以让多个消费者绑定到一个队列。共同消费队列中的消息。队列中的消息一旦消费,就会消失,因此任务是不会被重复执行的。

2.2.1 平均分配 - 假设有两个消费者,十个任务,每个线程会各分配5个,并不会因为某一线程处理速度快而多消费
启动下图的Customer1、Customer2,然后启动Provider向队列中添加消息查看效果。
因为开启了自动确认机制,如果消息消费过程中出现异常,该消息并不会放回至队列中,导致消息消失。自动确认机制是预先从队列获取好任务并删除队列中的任务,实际上此时还并为消费。

public class Provider {
    public static void main(String[] args) throws IOException {
        Connection connection = RabbitMQUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare("work", true, false, false, null);
        for (int i = 0; i < 10; i++) {
            channel.basicPublish("","work", null,("你好呀" + i).getBytes());
        }
        RabbitMQUtils.closeChannlAndConnection(channel, connection);
    }
}
public class Customer1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = RabbitMQUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare("work", true, false, false, null);
        channel.basicConsume("work",true, new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者1:" + new String(body));
            }
        });
    }
}
public class Customer2 {// 与消费者1代码一至}

2.2.2 能者多劳 - 处理速度快的线程会多消费任务
修改消费者代码,关闭消息自动确认机制,由于消费者2方法必然会报错导致消息消费失败,会将消息重新放回队列。如果不调用channel.basicAck或channel.basicNack,方法线程无法判断消息是否处理完成导致线程一直阻塞。从而无法重新去消息队列获取数据。

public class Customer1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = RabbitMQUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare("work", true, false, false, null);
        // 表示一次只消费一个消息
        channel.basicQos(1);
        // 关闭任务自动确认机制
        channel.basicConsume("work",false, new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者1:" + new String(body));
                try {
                    Thread.sleep(10);
                    // 此处需要手动确认任务完成,否则不会接取下一个任务,队列任务中会出现未确认的数量
                    channel.basicAck(envelope.getDeliveryTag(), false);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
    }
}

public class Customer2 {
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = RabbitMQUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare("work", true, false, false, null);
        // 表示一次只消费一个消息
        channel.basicQos(1);
        // 关闭自动确认机制
        channel.basicConsume("work",false, new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                try {
                    System.out.println("消费者2:" + new String(body));
                    Thread.sleep(40);
                    int x = 10 / 0;
                    // 此处需要手动确认任务完成,否则不会接取下一个任务,队列任务中会出现未确认的数量
                    channel.basicAck(envelope.getDeliveryTag(), false);
                } catch (Exception e) {
                    e.printStackTrace();
                    channel.basicNack(envelope.getDeliveryTag(), false, true);
                }
            }
        });
    }
}

2.3.fanout模型(广播)

Publish/Subscribe:广播模型,可以有多个消费者,并且每个消费者有自己的队列,生产者将消息发送给交换机,交换机来决定放入哪个队列当中
下图代码:消费者启动时创建了两个临时队列,生产者将信息传入到logs交换机中,交换机将消息放入到了两个队列中,所以消费者1与消费者2均能消费同样的信息

public class Provider {
    public static void main(String[] args) throws IOException {
        Connection connection = RabbitMQUtils.getConnection();
        Channel channel = connection.createChannel();
        // 指定交换机,交换机名称,交换机类型, fanout = 广播类型
        channel.exchangeDeclare("logs","fanout");
        // 交换机指向logs,第二个参数是路由key参数,此时还没有意义,可以为空
        channel.basicPublish("logs","work", null,"你好呀".getBytes());
        RabbitMQUtils.closeChannlAndConnection(channel, connection);
    }
}
public class Customer1 {
    public static void main(String[] args) throws Exception {
        Connection connection = RabbitMQUtils.getConnection();
        Channel channel = connection.createChannel();
        // 绑定交换机
        channel.exchangeDeclare("logs","fanout");
        // 获取临时队列,并绑定临时队列
        String queueName = channel.queueDeclare().getQueue();
        channel.queueBind(queueName, "logs", "");
        // 消费消息
        channel.basicConsume(queueName,true, new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者1:" + new String(body));
            }
        });
    }
}
public class Customer2 {
  //....一样的代码....
}

在这里插入图片描述
在这里插入图片描述

2.4.Direct模型(路由直连)

广播模式增强版,队列与交换机的绑定时,要指定一个RoutingKey (路由key),Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey与队列的Routing key完全一致,才会接收到消息。

public class Provider {
    public static void main(String[] args) throws IOException {
        Connection connection = RabbitMQUtils.getConnection();
        Channel channel = connection.createChannel();
        // 指定交换机,交换机类型
        channel.exchangeDeclare("logs2","direct");
        // 交换机指向logs,指定路由参数
        channel.basicPublish("logs2","info", null,"我是info信息".getBytes());
        channel.basicPublish("logs2","error", null,"我是error信息".getBytes());
        RabbitMQUtils.closeChannlAndConnection(channel, connection);
    }
}
public class Customer1 {
    public static void main(String[] args) throws Exception {
        Connection connection = RabbitMQUtils.getConnection();
        Channel channel = connection.createChannel();
        // 绑定交换机,指定模式为路由直连
        channel.exchangeDeclare("logs2","direct");
        // 获取临时队列,并绑定临时队列, routingKey
        String queueName = channel.queueDeclare().getQueue();
        channel.queueBind(queueName, "logs2", "error");
        // 消费消息
        channel.basicConsume(queueName,true, new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者1:" + new String(body));
            }
        });
    }
}	
public class Customer2 {
    public static void main(String[] args) throws Exception {
        Connection connection = RabbitMQUtils.getConnection();
        Channel channel = connection.createChannel();
        // 绑定交换机,指定模式为路由直连
        channel.exchangeDeclare("logs2","direct");
        // 获取临时队列,并绑定临时队列, routingKey
        String queueName = channel.queueDeclare().getQueue();
        channel.queueBind(queueName, "logs2", "info");
        // 消费消息
        channel.basicConsume(queueName,true, new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者1:" + new String(body));
            }
        });
    }
}

2.5.Topic模型(订阅/动态路由模型)

路由模式的路由key是一个固定值,而Topic模型的路由值可以用通配符
通配符*:一个或多个词语,比如log.*,可以匹配log.a.b,log.aaaa.bbb
通配符#:只能匹配一个词,比如log.#能陪陪log.a,log.b
PS: 也可以俩通配符一起用,比如: *.log.#

public class Provider {
    public static void main(String[] args) throws IOException {
        Connection connection = RabbitMQUtils.getConnection();
        Channel channel = connection.createChannel();
        // 指定交换机,交换机名称,交换机类型, topic= 动态路由模式
        channel.exchangeDeclare("topics","topic");
        // 发送消息
        channel.basicPublish("topics","user.save", null,("我是AAAAA").getBytes());
        RabbitMQUtils.closeChannlAndConnection(channel, connection);
    }
}

public class Customer1 {
    public static void main(String[] args) throws Exception {
        Connection connection = RabbitMQUtils.getConnection();
        Channel channel = connection.createChannel();
        // 绑定交换机
        channel.exchangeDeclare("topics","topic");
        // 获取临时队列,并绑定临时队列, routingKey
        String queueName = channel.queueDeclare().getQueue();
        channel.queueBind(queueName, "topics", "user.#");
        channel.queueBind(queueName, "topics", "*.user.#");
        // 消费消息
        channel.basicConsume(queueName,true, new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者1:" + new String(body));
            }
        });
    }
}

三、springboot整合

依赖与配置文件

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
server.port=80
spring.application.name=rabbitMQboot
spring.rabbitmq.host=192.168.14.130
spring.rabbitmq.port=5672
spring.rabbitmq.username=ems
spring.rabbitmq.password=123456
spring.rabbitmq.virtual-host=/ems

3.1 直连模型

/**
*	用于向消息队列添加数据
*/
@Controller
@RequestMapping("/test")
public class TestController {
    // 注入RabbitTemplate
    @Autowired
    RabbitTemplate rabbitTemplate;
    @RequestMapping("/hello")
    public void helloWord(){
        rabbitTemplate.convertAndSend("hello","我是消息");
    }
}
// 表示监听的队列,declare 表示是否持久化队列,是否自动删除
// 默认持久化 非独占 自动删除
@Component
@RabbitListener(queuesToDeclare = @Queue(name = "hello",declare = "false",autoDelete = "true"))
public class CustomerController {
    // 表示这个是处理消息的回调函数
    @RabbitHandler
    public void receivel(String msg){
        System.out.println("收到消息:" + msg);
    }
}

3.2 任务模型

@Controller
@RequestMapping("/test")
public class TestController {
    // 注入RabbitTemplate
    @Autowired
    RabbitTemplate rabbitTemplate;
    @RequestMapping("/work")
    public void work(){
        for (int i = 0; i < 10; i++) {
            rabbitTemplate.convertAndSend("work","work模型" + i);
        }
    }
}
@Component
public class WorkController {
    @RabbitListener(queuesToDeclare = @Queue("work"))
    public void receivel1(String msg){
        System.out.println("消费者1" + msg);
    }
    @RabbitListener(queuesToDeclare = @Queue("work"))
    public void receivel2(String msg){
        System.out.println("消费者2" + msg);
    }
}

3.3 广播模型

@Controller
@RequestMapping("/test")
public class TestController {
    @Autowired
    RabbitTemplate rabbitTemplate;
    @RequestMapping("/fanout")
    public void fanout(){
        // 路由key不写
        rabbitTemplate.convertAndSend("logs","","广播数据");
    }
}

@Component
public class FanoutController {
    @RabbitListener(bindings =@QueueBinding(
            // 绑定一个临时队列,绑定交换机
            value = @Queue,exchange = @Exchange(value = "logs",type = "fanout")
        ))
    public void receivel1(String msg){
        System.out.println("消费者1" + msg);
    }
    @RabbitListener(bindings =@QueueBinding(
            // 绑定一个临时队列,绑定交换机
            value = @Queue,exchange = @Exchange(value = "logs",type = "fanout")
    ))
    public void receivel2(String msg){
        System.out.println("消费者2" + msg);
    }
}

3.4 路由模型

@Controller
@RequestMapping("/test")
public class TestController {
    // 注入RabbitTemplate
    @Autowired
    RabbitTemplate rabbitTemplate;
    @RequestMapping("/route")
    public void route(){
        rabbitTemplate.convertAndSend("directs","info","发送info级别的路由信息");
    }
}

@Component
public class RouteController {
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue,//临时队列
            exchange = @Exchange(value = "directs",type = "direct"),
            key = {"info","error"} //路由key
    ))
    public void receivel1(String msg){
        System.out.println("消费者1" + msg);
    }

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue,//临时队列
            exchange = @Exchange(value = "directs",type = "direct"),
            key = {"info"} //路由key
    ))
    public void receive2(String msg){
        System.out.println("消费者2" + msg);
    }
}

3.4 、订阅(动态路由)

@Controller
@RequestMapping("/test")
public class TestController {
    // 注入RabbitTemplate
    @Autowired
    RabbitTemplate rabbitTemplate;
    @RequestMapping("/topic")
    public void topic(){
        rabbitTemplate.convertAndSend("topics","rizhi.info","动态路由");
    }
}
@Component
public class TopicController {
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue,
            exchange = @Exchange(type = "topic",name = "topics"),
            key = {"user.#","rizhi.info"}
    ))
    public void recevicel(String msg){
        System.out.println("消费者1" + msg);
    }
}

版权声明:本文为weixin_44080194原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
THE END
< <上一篇
下一篇>>