简识MQ之Kafka、ActiveMQ、RabbitMQ、RocketMQ传递机制
- 电脑硬件
- 2025-08-24 13:30:02

四种主流消息队列(Kafka、ActiveMQ、RabbitMQ、RocketMQ)的生产者与消费者传递信息的机制说明,以及实际使用中的注意事项和示例:
1. Apache Kafka 传递机制 模型:基于 发布-订阅模型,生产者向 主题(Topic) 发送消息,消费者订阅主题并消费消息。核心流程: 生产者将消息发送到 Kafka 集群的 Broker,根据 分区策略(如轮询、哈希)将消息写入对应的分区(Partition)。消费者通过消费者组(Consumer Group)订阅主题,每个分区的数据会被分配给组内的消费者(通过 Rebalance 机制)。消费者从分区中拉取消息(poll 方式)并处理。 示例代码(Kafka 生产者) Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("key.serializer", "org.apache.kafka mon.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka mon.serialization.StringSerializer"); Producer<String, String> producer = new KafkaProducer<>(props); producer.send(new ProducerRecord<>("my-topic", "key", "message")); producer.close(); 示例代码(Kafka 消费者) Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("group.id", "my-group"); props.put("key.deserializer", "org.apache.kafka mon.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka mon.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList("my-topic")); while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); for (ConsumerRecord<String, String> record : records) { System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); } } 注意事项 分区与顺序性: Kafka 不保证跨分区的消息顺序,但单个分区内的消息按顺序存储。示例:发送订单创建事件时,需将同一用户的消息发送到同一分区(通过 key)。 消费者组与 Rebalance: 消费者组内成员变化时(如新增消费者),会触发分区重新分配(Rebalance),可能导致短暂消息不可读。建议:避免频繁增减消费者实例。 消息持久化: 生产者可通过 acks=all 确保消息写入所有副本后返回成功,但会增加延迟。适用场景:对消息可靠性要求极高的场景(如金融交易)。
2. Apache ActiveMQ 传递机制 模型:支持 点对点(Queue) 和 发布-订阅(Topic) 模型。核心流程: 生产者发送消息到队列或主题。消息通过 异步/同步 方式传递给消费者(默认异步)。可启用 持久化,消息存储到磁盘以防 Broker宕机。 示例代码(ActiveMQ 生产者) ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616"); Connection connection = connectionFactory.createConnection(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Queue queue = session.createQueue("my-queue"); MessageProducer producer = session.createProducer(queue); TextMessage message = session.createTextMessage("Hello ActiveMQ!"); producer.send(message); connection.close(); 示例代码(ActiveMQ 消费者) ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616"); Connection connection = connectionFactory.createConnection(); Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); Queue queue = session.createQueue("my-queue"); MessageConsumer consumer = session.createConsumer(queue); TextMessage message = (TextMessage) consumer.receive(); System.out.println("Received: " + message.getText()); consumer.acknowledge(); // 手动确认消息 connection.close(); 注意事项 消息持久化: 需设置 DeliveryMode.PERSISTENT,否则消息可能丢失。示例:关键业务消息(如订单支付通知)必须持久化。 事务支持: 生产者和消费者可通过事务确保消息的原子性(发送/接收一致性)。风险:长事务可能导致性能下降。 死信队列(DLQ): 配置 deadLetterExchange 和 deadLetterRoutingKey 处理无法消费的消息。示例:超过重试次数的消息自动进入 DLQ。
3. RabbitMQ 传递机制 模型:灵活的消息路由模型,基于 交换器(Exchange) 和 绑定(Binding)。核心流程: 生产者将消息发送到交换器,并附带路由键(Routing Key)。交换器根据类型(如 Direct、Topic、Headers)将消息路由到绑定的队列。消费者从队列中拉取消息。 示例代码(RabbitMQ 生产者Producers) ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); try (Connection connection = factory.newConnection()) { Channel channel = connection.createChannel(); String exchangeName = "direct-exchange"; channel.exchangeDeclare(exchangeName, "direct"); String routingKey = "user.login"; AMQP.BasicProperties props = new AMQP.BasicProperties.Builder() .contentType("text/plain") .deliveryMode(2) // 持久化 .build(); channel.basicPublish(exchangeName, routingKey, props, "Login Event".getBytes()); } 示例代码(RabbitMQ 消费者Consumers) ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); try (Connection connection = factory.newConnection()) { Channel channel = connection.createChannel(); String queueName = "user_queue"; channel.queueDeclare(queueName, true, false, false, null); String exchangeName = "direct-exchange"; channel.queueBind(queueName, exchangeName, "user.login"); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println("Received: " + message); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); }; channel.basicConsume(queueName, false, deliverCallback, consumerTag -> {}); } 注意事项 消息确认机制: 消费者需发送 ACK 确认消息处理,避免重复消费。示例:使用 channel.basicAck() 或 channel.basicNack()。 死信队列配置: 在队列声明时配置 x-dead-letter-exchange 和 x-dead-letter-routing-key。示例:处理失败的消息进入专用队列。 内存限制: RabbitMQ 默认限制队列大小为内存中的一定比例,需根据业务调整 vm_memory_high_watermark。
4. RocketMQ 传递机制 模型:基于 主题(Topic) 和 队列(Queue) 的分布式模型。核心流程: 生产者Producers发送消息到主题,主题将消息路由到多个队列(负载均衡)。消费者Consumers通过消费者组(Consumer Group)订阅主题,从队列中拉取消息。 顺序消息:同一队列内的消息按顺序消费。广播消息:消费者组内每个消费者都收到同一条消息(仅限 Topic 模型)。 示例代码(RocketMQ 生产者Producers) DefaultMQProducer producer = new DefaultMQProducer("my-group"); producer.setNamesrvAddr("localhost:9876"); producer.start(); Message msg = new Message("my-topic", "Order-123".getBytes(), "JSON".getBytes()); SendResult sendResult = producer.send(msg); System.out.println("Send Result: " + sendResult); producer.shutdown(); 示例代码(RocketMQ 消费者Consumers) DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("my-group"); consumer.setNamesrvAddr("localhost:9876"); consumer.subscribe("my-topic", "*"); // 订阅所有队列 consumer.registerMessageListener(new MessageListener() { @Override public void consume(Message msg, ConsumeContext context) throws Exception { System.out.println("Received: " + new String(msg.getBody())); context mitMessage(msg); // 提交消费位移 } }); consumer.start(); 注意事项 事务消息: 生产者和消费者可通过事务确保消息的最终一致性。示例:订单创建成功后,发送支付通知(若失败则回滚)。 消息顺序性: 严格顺序场景需指定 MessageQueueSelector,确保同一订单的所有消息进入同一队列。 消息堆积: 消费者处理能力不足时,消息会堆积在队列中,需监控并扩容消费者实例。
总结对比 特性KafkaActiveMQRabbitMQRocketMQ模型发布-订阅(仅 Topic)支持点对点和发布-订阅灵活路由(多种交换器)主题+队列(顺序/广播)持久化支持分区副本支持消息持久化和事务支持队列和消息持久化支持消息持久化和事务顺序性单分区有序不保证(除非事务)可通过队列保证单队列严格有序适用场景高吞吐、日志/事件流通用、企业级消息系统复杂路由、多协议支持高可靠、顺序消息、分布式事务
通用注意事项 消息幂等性:防止重复消费(如订单支付场景)。监控与告警:关注队列长度、消息堆积、消费者延迟。序列化与压缩:选择高效的序列化方式(如 Protobuf)和压缩算法(如 GZIP)。连接池管理:避免频繁创建/关闭连接,影响性能。 5、注意MQ的Kafka、ActiveMQ、RabbitMQ、RocketMQ区别;
URL: 浅识MQ的 Kafka、ActiveMQ、RabbitMQ、RocketMQ区别-CSDN博客
6、注意:持久化策略URL:浅聊MQ之Kafka、RabbitMQ、ActiveMQ、RocketMQ持久化策略-CSDN博客
(望各位潘安、各位子健/各位彦祖、于晏不吝赐教!多多指正!🙏)
简识MQ之Kafka、ActiveMQ、RabbitMQ、RocketMQ传递机制由讯客互联电脑硬件栏目发布,感谢您对讯客互联的认可,以及对我们原创作品以及文章的青睐,非常欢迎各位朋友分享到个人网站或者朋友圈,但转载请说明文章出处“简识MQ之Kafka、ActiveMQ、RabbitMQ、RocketMQ传递机制”