【RabbitMQ实战】09客户端连接集群生产和消费消息
- 创业
- 2025-08-17 11:30:03

一、部署一个三节点集群
下面的链接是最快最简单的一种集群部署方法 3分钟部署一个RabbitMQ集群 上的的例子中,没有映射端口,所以没法从宿主机外部连接容器,下面的yml文件中,暴露了端口。 每个容器应用都映射了宿主机的端口,分别是5602,5612,5622 docker compse文件如下
version: '3' services: stats: image: bitnami/rabbitmq environment: - RABBITMQ_NODE_TYPE=stats - RABBITMQ_NODE_NAME=rabbit@stats - RABBITMQ_ERL_COOKIE=s3cr3tc00ki3 ports: - '15672:15672' - '5602:5672' volumes: - 'rabbitmqstats_data:/bitnami/rabbitmq/mnesia' queue-disc1: image: bitnami/rabbitmq environment: - RABBITMQ_NODE_TYPE=queue-disc - RABBITMQ_NODE_NAME=rabbit@queue-disc1 - RABBITMQ_CLUSTER_NODE_NAME=rabbit@stats - RABBITMQ_ERL_COOKIE=s3cr3tc00ki3 ports: - '5612:5672' volumes: - 'rabbitmqdisc1_data:/bitnami/rabbitmq/mnesia' queue-ram1: image: bitnami/rabbitmq environment: - RABBITMQ_NODE_TYPE=queue-ram - RABBITMQ_NODE_NAME=rabbit@queue-ram1 - RABBITMQ_CLUSTER_NODE_NAME=rabbit@stats - RABBITMQ_ERL_COOKIE=s3cr3tc00ki3 ports: - '5622:5672' volumes: - 'rabbitmqram1_data:/bitnami/rabbitmq/mnesia' volumes: rabbitmqstats_data: driver: local rabbitmqdisc1_data: driver: local rabbitmqram1_data: driver: local通过docker-compose up命令,就可以启动三个集群的容器了
[root@localhost mycompose]# docker-compose up 二、配置文件原来的单节点只配置host和port,现在集群节点,就要配置addresses了,如下所示:
server: port: 8080 spring: application: name: rabbitmq-demo #配置rabbitMq 服务器 rabbitmq: #单节点直接可以写host和port # host: 192.168.56.201 # port: 5672 #集群连接写ip和端口 addresses: 192.168.56.202:5602,192.168.56.202:5612,192.168.56.202:5622 username: user password: bitnami #虚拟host virtual-host: virtual01 template: mandatory: true #当mandatory设置为true时,交换器无法根据自身的类型和路由键找到一个符合条件的队列,那么RabbitMQ会调用Basic.Return命令将消息返回给生产者。当为false时,则直接丢弃消息 publisher-confirm-type: correlated #生产者回调确认机制,由回调来确定消息是否发布成功 publisher-returns: true #是否开启生产者returns listener: simple: acknowledge-mode: manual #手动回复方式,一般建议手动回复,即需要我们自己调用对应的ACK方法 prefetch: 10 #每个消费者可拉取的,还未ack的消息数量 concurrency: 3 #消费端(每个Listener)的最小线程数 max-concurrency: 10 #消费端(每个Listener)的最大线程数 三、代码 生产者和单节点的发送和消费代码一致,没有变化
@Slf4j @RestController @RequestMapping("/rabbit") public class RabbitSendController implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback { private static final String EXCHANGE_NAME = "my_exchange"; private static final String ROUTING_KEY = "my_routing"; @Autowired private RabbitTemplate rabbitTemplate; /** * 正常发送并被broker接收 * @return */ @RequestMapping("send") public String send() { for (int i = 0; i < 10; i++) { OrderInfo orderInfo = new OrderInfo(); orderInfo.setAddress("成都市高新区"); orderInfo.setOrderId(String.valueOf(i)); orderInfo.setProductName("华为P60:" + i); //设置回调关联的一个id String messageId = UUID.randomUUID().toString(); log.info("开始发送消息,当前消息关联id为:{}", messageId); CorrelationData correlationData = new CorrelationData(messageId); MessageProperties messageProperties = new MessageProperties(); messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT); Message message = MessageBuilder.withBody(new Gson().toJson(orderInfo).getBytes(StandardCharsets.UTF_8)) .andProperties(messageProperties).build(); //设置ack回调 rabbitTemplate.setConfirmCallback(this); //退回消息的回调 rabbitTemplate.setReturnCallback(this); rabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTING_KEY, message, correlationData); } return "ok"; } /** * 设置一个非法的路由键,模拟消息被broker退回的情况,前提是 * spring.rabbitmq.template.mandatory=true 当mandatory设置为true时,交换器无法根据自身的类型和路由键找到一个符合条件的队列,那么RabbitMQ会调用Basic.Return命令将消息返回给生产者。当为false时,则直接丢弃消息 * <p> * spring.rabbitmq.publisher-returns=true 生产者回调确认机制,由回调来确定消息是否发布成功 * * @return */ @RequestMapping("send-return") public String sendAndReturn() { OrderInfo orderInfo = new OrderInfo(); orderInfo.setAddress("成都市高新区"); orderInfo.setOrderId("111"); orderInfo.setProductName("小米13"); //设置回调关联的一个id String messageId = UUID.randomUUID().toString(); log.info("开始发送消息,当前消息关联id为:{}", messageId); CorrelationData correlationData = new CorrelationData(messageId); MessageProperties messageProperties = new MessageProperties(); messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT); Message message = MessageBuilder.withBody(new Gson().toJson(orderInfo).getBytes(StandardCharsets.UTF_8)) .andProperties(messageProperties).build(); //设置ack回调 rabbitTemplate.setConfirmCallback(this); //退回消息的回调 rabbitTemplate.setReturnCallback(this); //下面这个RoutingKey是没有绑定的,所以发不出去 rabbitTemplate.convertAndSend(EXCHANGE_NAME, "error.routing", message, correlationData); return "ok"; } @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { if (correlationData == null) { return; } String messageId = correlationData.getId(); if (ack) { log.info("【confirm回调方法】,消息发布成功,messageId={}", messageId); } else { log.info("【confirm回调方法】,消息发布失败,messageId={}", messageId); } } @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { log.info("【returnedMessage回调方法】,消息被退回,message={},replyCode:{},replyText:{},exchange:{},routingKey:{}", new String(message.getBody()), replyCode, replyText, exchange, routingKey); } } 消费者 @Slf4j @Component public class RabbitOrderConsumer { private static final String EXCHANGE_NAME = "my_exchange"; private static final String QUEUE_NAME = "my_queue"; private static final String ROUTING_KEY = "my_routing"; @RabbitListener(bindings = {@QueueBinding(value = @Queue(value = QUEUE_NAME, durable = "true"), exchange = @Exchange(value = EXCHANGE_NAME, type = "topic", durable = "true"), key = ROUTING_KEY)}) public void handleMessage(Message message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException { //上面这个tag是这么写的么,为什么每次传过来都是1?导致channel被重新创建 log.info("接收到消息:{},deliveryTag:{}", new String(message.getBody(), StandardCharsets.UTF_8), tag); channel.basicAck(tag, false); } }访问地址:http://localhost:8080/rabbit/send,然后就可以发送消息了,输出日志如下:
开始发送消息,当前消息关联id为:18049efe-a624-4288-a8f0-9c28fd776773 开始发送消息,当前消息关联id为:83d93f90-62f4-41cf-af02-03d496812561 开始发送消息,当前消息关联id为:f83257b2-95b6-408e-a5b9-74d0ec9f30b0 开始发送消息,当前消息关联id为:16a7e471-23ba-408b-9095-6add9ad1e270 开始发送消息,当前消息关联id为:152b0fb0-3a22-452d-93fe-662252c2fd8c 开始发送消息,当前消息关联id为:ade4f703-6075-485f-8e34-ec9b95bf59de 开始发送消息,当前消息关联id为:e4511f82-476a-4f4c-b704-4399baadeaf4 接收到消息:{"orderId":"1","productName":"华为P60:1","address":"成都市高新区"},deliveryTag:1 接收到消息:{"orderId":"0","productName":"华为P60:0","address":"成都市高新区"},deliveryTag:1 开始发送消息,当前消息关联id为:d8cd2dd6-bb9e-4d46-bc42-0d96df70748f 开始发送消息,当前消息关联id为:76950a93-5887-43c1-adef-edc1e29e2fab 开始发送消息,当前消息关联id为:f08a7a68-60da-4c5d-b1b8-c9e4d9453969 【confirm回调方法】,消息发布成功,messageId=18049efe-a624-4288-a8f0-9c28fd776773 【confirm回调方法】,消息发布成功,messageId=83d93f90-62f4-41cf-af02-03d496812561 接收到消息:{"orderId":"3","productName":"华为P60:3","address":"成都市高新区"},deliveryTag:2 接收到消息:{"orderId":"2","productName":"华为P60:2","address":"成都市高新区"},deliveryTag:1 接收到消息:{"orderId":"6","productName":"华为P60:6","address":"成都市高新区"},deliveryTag:3 接收到消息:{"orderId":"5","productName":"华为P60:5","address":"成都市高新区"},deliveryTag:2 接收到消息:{"orderId":"9","productName":"华为P60:9","address":"成都市高新区"},deliveryTag:4 接收到消息:{"orderId":"4","productName":"华为P60:4","address":"成都市高新区"},deliveryTag:2 接收到消息:{"orderId":"7","productName":"华为P60:7","address":"成都市高新区"},deliveryTag:3 接收到消息:{"orderId":"8","productName":"华为P60:8","address":"成都市高新区"},deliveryTag:3 【confirm回调方法】,消息发布成功,messageId=f83257b2-95b6-408e-a5b9-74d0ec9f30b0 【confirm回调方法】,消息发布成功,messageId=16a7e471-23ba-408b-9095-6add9ad1e270 【confirm回调方法】,消息发布成功,messageId=152b0fb0-3a22-452d-93fe-662252c2fd8c 【confirm回调方法】,消息发布成功,messageId=ade4f703-6075-485f-8e34-ec9b95bf59de 【confirm回调方法】,消息发布成功,messageId=e4511f82-476a-4f4c-b704-4399baadeaf4 【confirm回调方法】,消息发布成功,messageId=d8cd2dd6-bb9e-4d46-bc42-0d96df70748f 【confirm回调方法】,消息发布成功,messageId=76950a93-5887-43c1-adef-edc1e29e2fab 【confirm回调方法】,消息发布成功,messageId=f08a7a68-60da-4c5d-b1b8-c9e4d9453969上述代码仓库: gitee /syk1234/mqdmo
四、后台管理登录管理后台页面:http://192.168.56.202:15672/
共有三个节点,两个磁盘节点,一个内存节点。如果你还不清楚什么是磁盘节点,什么是内存节点,可以参考【RabbitMQ 实战】08 集群原理剖析
查看连接情况,发现是连接的是节点rabbit@stats节点 查看队列的情况,队列是在rabbit@stats节点上
【RabbitMQ实战】09客户端连接集群生产和消费消息由讯客互联创业栏目发布,感谢您对讯客互联的认可,以及对我们原创作品以及文章的青睐,非常欢迎各位朋友分享到个人网站或者朋友圈,但转载请说明文章出处“【RabbitMQ实战】09客户端连接集群生产和消费消息”
上一篇
20231009-学习笔记