启动你的RocketMQ之旅(四)-Producer启动和发送流程(下)
- IT业界
- 2025-09-17 17:00:01

前言: 👏作者简介:我是笑霸final。 📝个人主页: 笑霸final的主页2 📕系列专栏:java专栏 📧如果文章知识点有错误的地方,请指正!和大家一起学习,一起进步👀 🔥如果感觉博主的文章还不错的话,👍点赞👍 + 👀关注👀 + 🤏收藏🤏
上一章节:启动你的RocketMQ之旅(三)-Producer启动和发送流程(上)
目录 基础接口MQAdmin接口字段详解MQProducer接口详解 producer发送底层消息分析同步消息异步消息延时消息(实现在broker)事务消息发送流程总结消息request具体传递的信息发送消息流程总结选择消息队列的规避策略 基础接口在上文可知我们都是用的DefaultMQProducer来创建的生产者,现在我们来看看DefaultMQProducer的继承关系 可见 MQAdmin接口是基础接口,下面就来看看 MQAdmin有哪些字段。
MQAdmin接口字段详解 /** * 创建主题 * key、newTopic、queueNum */ void createTopic(final String key, final String newTopic, final int queueNum) throws MQClientException; /** * 创建主题 * key、newTopic、queueNum、topicSysFlag */ void createTopic(String key, String newTopic, int queueNum, int topicSysFlag) throws MQClientException; /** * 根据时间戳从队列中查找消息偏移量 */ long searchOffset(final MessageQueue mq, final long timestamp) throws MQClientException; /** * 查找消息队列中最大偏移量 */ long maxOffset(final MessageQueue mq) throws MQClientException; /** * 查找消息队列中最小的偏移量 */ long minOffset(final MessageQueue mq) throws MQClientException; /** * 查找消息队列中最早的存储消息时间 */ long earliestMsgStoreTime(final MessageQueue mq) throws MQClientException; /** * 根据消息id查找消息 */ MessageExt viewMessage(final String offsetMsgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException; /** * 根据条件查找消息 */ QueryResult queryMessage(final String topic, final String key, final int maxNum, final long begin, final long end) throws MQClientException, InterruptedException; /** * 根据主题和消息id查找消息 */ MessageExt viewMessage(String topic, String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException;这些方法主要提供了对 RocketMQ 消息队列的管理能力,包括主题创建、消息偏移量查询、消息查找等功能。以下是这些方法的主要用途分类:
主题管理: createTopic:创建新的主题。消息队列管理: searchOffset:根据时间戳查找偏移量。 maxOffset 和 minOffset:获取队列的最大和最小偏移量。 earliestMsgStoreTime:获取最早存储消息的时间。消息查询: viewMessage:根据消息 ID 查找消息。 queryMessage:根据条件批量查询消息。 MQProducer接口详解 public interface MQProducer extends MQAdmin { /** * 启动生产者 */ void start() throws MQClientException; /** * 关闭生产者 */ void shutdown(); /** * 查找指定主题下的所有消息 */ List<MessageQueue> fetchPublishMessageQueues(final String topic) throws MQClientException; /** * 发送同步消息 */ SendResult send(final Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException; SendResult send(final Message msg, final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException; SendResult send(final Message msg, final MessageQueue mq) throws MQClientException, RemotingException, MQBrokerException, InterruptedException; SendResult send(final Message msg, final MessageQueue mq, final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException; SendResult send(final Message msg, final MessageQueueSelector selector, final Object arg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException; SendResult send(final Message msg, final MessageQueueSelector selector, final Object arg, final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException; /** * 发送异步消息 */ void send(final Message msg, final SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException; void send(final Message msg, final SendCallback sendCallback, final long timeout) throws MQClientException, RemotingException, InterruptedException; void send(final Message msg, final MessageQueue mq, final SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException; void send(final Message msg, final MessageQueue mq, final SendCallback sendCallback, long timeout) throws MQClientException, RemotingException, InterruptedException; void send(final Message msg, final MessageQueueSelector selector, final Object arg, final SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException; void send(final Message msg, final MessageQueueSelector selector, final Object arg, final SendCallback sendCallback, final long timeout) throws MQClientException, RemotingException, InterruptedException; /** * 发送单向消息 */ void sendOneway(final Message msg) throws MQClientException, RemotingException, InterruptedException; void sendOneway(final Message msg, final MessageQueue mq) throws MQClientException, RemotingException, InterruptedException; void sendOneway(final Message msg, final MessageQueueSelector selector, final Object arg) throws MQClientException, RemotingException, InterruptedException; /** * 发送事务消息 */ TransactionSendResult sendMessageInTransaction(final Message msg, final LocalTransactionExecuter tranExecuter, final Object arg) throws MQClientException; TransactionSendResult sendMessageInTransaction(final Message msg, final Object arg) throws MQClientException; /** * 批量发送消息 */ SendResult send(final Collection<Message> msgs) throws MQClientException, RemotingException, MQBrokerException, InterruptedException; SendResult send(final Collection<Message> msgs, final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException; SendResult send(final Collection<Message> msgs, final MessageQueue mq) throws MQClientException, RemotingException, MQBrokerException, InterruptedException; SendResult send(final Collection<Message> msgs, final MessageQueue mq, final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException; void send(final Collection<Message> msgs, final SendCallback sendCallback) throws MQClientException, RemotingException, MQBrokerException, InterruptedException; void send(final Collection<Message> msgs, final SendCallback sendCallback, final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException; void send(final Collection<Message> msgs, final MessageQueue mq, final SendCallback sendCallback) throws MQClientException, RemotingException, MQBrokerException, InterruptedException; void send(final Collection<Message> msgs, final MessageQueue mq, final SendCallback sendCallback, final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException; //for rpc Message request(final Message msg, final long timeout) throws RequestTimeoutException, MQClientException, RemotingException, MQBrokerException, InterruptedException; void request(final Message msg, final RequestCallback requestCallback, final long timeout) throws MQClientException, RemotingException, InterruptedException, MQBrokerException; Message request(final Message msg, final MessageQueueSelector selector, final Object arg, final long timeout) throws RequestTimeoutException, MQClientException, RemotingException, MQBrokerException, InterruptedException; void request(final Message msg, final MessageQueueSelector selector, final Object arg, final RequestCallback requestCallback, final long timeout) throws MQClientException, RemotingException, InterruptedException, MQBrokerException; Message request(final Message msg, final MessageQueue mq, final long timeout) throws RequestTimeoutException, MQClientException, RemotingException, MQBrokerException, InterruptedException; void request(final Message msg, final MessageQueue mq, final RequestCallback requestCallback, long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException; }MQProducer 是 RocketMQ 中生产者的核心接口,继承自 MQAdmin,提供了消息生产和管理的能力。总的来说有一下特点:
生产者生命周期管理
启动生产者: void start() throws MQClientException; 启动生产者实例,初始化与 Broker 的连接。关闭生产者:oid shutdown();关闭生产者实例,释放资源。主题相关操作
查找指定主题下的所有消息队列:List fetchPublishMessageQueues(final String topic) throws MQClientException;获取指定主题下的所有消息队列列表。消息发送方式
同步消息发送
发送消息后会等待服务器返回确认结果提供多种参数组合,支持指定消息队列、选择器、超时时间等。异步消息发送
发送消息后不会阻塞线程,通过回调函数处理发送结果。提供多种参数组合,支持指定消息队列、选择器、超时时间等。单向消息发送
不关心发送结果,只负责将消息发送到 Broker。适用于对可靠性要求不高的场景。事务消息发送
支持分布式事务,确保消息发送与本地事务的一致性。批量消息发送(支持一次发送多条消息,减少网络开销,提高吞吐量。)
同步批量发送:异步批量发送:RPC 请求(请求-响应模式)[提供基于消息的请求-响应模式,适用于需要即时反馈的场景]
同步请求:异步请求 producer发送底层消息分析上面我们已经对 1 消息的的检查、2 查找路由 3、选择队列 4、消息发送流程做了大概了解,现在我们看看消息如何发送的。
1、sendKernelImpl 方法:这是实际执行消息发送的核心逻辑所在。它会根据消息类型(普通消息、事务消息等)和同步/异步模式来决定如何处理消息。
private SendResult sendDefaultImpl( Message msg,//消息 final CommunicationMode communicationMode,//通讯方式 final SendCallback sendCallback, //回调函数 final long timeout//超时时间 ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException2、在 sendKernelImpl 方法内部,会进一步调用 MQClientAPIImpl.sendMessage() ,这方法负责与Broker建立网络连接,并将消息数据序列化后通过Netty或其他网络通信框架发送给Broker。
SendResult sendMessage( String addr, Message msg, boolean block, SendCallback sendCallback, long timeout, CommunicationMode communicationMode, SendMessageContext context, SocketAddress storeHost, Boolean checkImmunity) ------------------------具体的方法----------------- public SendResult sendMessage( final String addr, final String brokerName, final Message msg, final SendMessageRequestHeader requestHeader,//请求头 final long timeoutMillis, final CommunicationMode communicationMode, final SendCallback sendCallback, final TopicPublishInfo topicPublishInfo, final MQClientInstance instance, final int retryTimesWhenSendFailed, final SendMessageContext context, final DefaultMQProducerImpl producer ) throws RemotingException, MQBrokerException, InterruptedException { long beginStartTime = System.currentTimeMillis(); RemotingCommand request = null; String msgType = msg.getProperty(MessageConst.PROPERTY_MESSAGE_TYPE); // 是否是回应消息 //在RocketMQ的请求-响应模式中,生产者发送一个带有特定标记的消息到Broker, // 然后等待Broker返回一个带有相同标记的回复消息。这个 isReply 变量就是用来在消息处理逻辑中识别回复消息的关键标志。 boolean isReply = msgType != null && msgType.equals(MixAll.REPLY_MESSAGE_FLAG); if (isReply) { if (sendSmartMsg) { SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader); request = RemotingCommand.createRequestCommand(RequestCode.SEND_REPLY_MESSAGE_V2, requestHeaderV2); } else { request = RemotingCommand.createRequestCommand(RequestCode.SEND_REPLY_MESSAGE, requestHeader); } } else { //非回复消息,普通消息 if (sendSmartMsg || msg instanceof MessageBatch) { SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader); request = RemotingCommand.createRequestCommand(msg instanceof MessageBatch ? RequestCode.SEND_BATCH_MESSAGE : RequestCode.SEND_MESSAGE_V2, requestHeaderV2); } else { // 并将Producer要发送的消息信息填充进去,如Topic、消息Key、消息Tag等。 request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader); } } request.setBody(msg.getBody()); //根据通信方式选择 switch (communicationMode) { case ONEWAY: //单向消息 this.remotingClient.invokeOneway(addr, request, timeoutMillis); return null;//单向消息不用返回 case ASYNC: //异步消息 final AtomicInteger times = new AtomicInteger(); long costTimeAsync = System.currentTimeMillis() - beginStartTime; if (timeoutMillis < costTimeAsync) { throw new RemotingTooMuchRequestException("sendMessage call timeout"); } this.sendMessageAsync(addr, brokerName, msg, timeoutMillis - costTimeAsync, request, sendCallback, topicPublishInfo, instance, retryTimesWhenSendFailed, times, context, producer); return null;//异步消息直接反悔null case SYNC: //同步消息 long costTimeSync = System.currentTimeMillis() - beginStartTime; if (timeoutMillis < costTimeSync) { throw new RemotingTooMuchRequestException("sendMessage call timeout"); } //同步消息会返回 return this.sendMessageSync(addr, brokerName, msg, timeoutMillis - costTimeSync, request); default: assert false; break; } return null; }根据消息是回复类型还是 普通消息,然后看启用智能发送功能没有,最后根据这些不同来构造不同的请求头,然后把消息放进请求体。最终,根据消息是单向,异步,还是同步消息来进行发送。 这里请求头作为参数传进来的: requestHeader请求头封装,查看DefaultMQproducerImpl的sendKernelIlmpl方法可知requestHeader有如下参数 1.设置生产者组名,2.主题名称,3.目标消息队列的ID,4.系统标志位,5.是否批量消息 6.消息重试次数,7.消息最大重试次数 8.Properties(比如延迟消息) 9.标志位(事务消息=4) 然后将上面的信息写入 请求标头中 封装成request
同步消息最终调用的如下代码
public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis) throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException { /** * 每个发送到Broker的请求都会携带一个唯一的 opaque 值,Broker在处理请求后, * 会将该值原封不动地返回到Producer或Consumer端,这样客户端就可以通过 opaque 值 * 匹配到对应的请求和响应,进而进行后续的处理工作 */ // opaque 请求id final int opaque = request.getOpaque(); try { //创建一个ResponseFuture对象,用于保存本次请求的响应信息,包括响应的Channel、 // 请求的不透明标识符opaque、超时时间、以及将来填充的响应结果和异常原因 final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, null, null); //将ResponseFuture对象放入responseTable中,以opaque作为键,方便后续根据opaque找到对应的响应 this.responseTable.put(opaque, responseFuture); //获取远程服务器地址 final SocketAddress addr = channel.remoteAddress(); /** * 使用Netty的writeAndFlush方法异步发送请求,并添加一个ChannelFutureListener监听器。 * 监听器的operationComplete方法会在写操作完成后被调用。 */ channel.writeAndFlush(request).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture f) throws Exception { if (f.isSuccess()) { //如果写操作成功,设置responseFuture的sendRequestOK为true,表示请求已成功发送至远程服务器。 responseFuture.setSendRequestOK(true); return; } else { //如果写操作失败,从responseTable中移除对应opaque的ResponseFuture,并设置错误原因和响应结果为空,同时输出日志警告。 responseFuture.setSendRequestOK(false); } responseTable.remove(opaque); responseFuture.setCause(f.cause()); responseFuture.putResponse(null); log.warn("send a request command to channel <" + addr + "> failed."); } }); // ··等待服务器的响应··,如果在指定超时时间内没有收到响应,则返回null RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis); if (null == responseCommand) { if (responseFuture.isSendRequestOK()) { throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis, responseFuture.getCause()); } else { throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause()); } } return responseCommand; } finally { this.responseTable.remove(opaque); } }最后rpc远程调用的时候,发送的还是request,请求体里面只有msg.body 总的来说,这段代码实现了客户端向服务器发送请求,并等待服务器响应的过程,其中包括了异步发送、超时处理、异常处理等功能。
异步消息进入带有回调对象作为参数的 …client.impl.MQclientAPIImpl.sendMessage(…)可以发现 在netty异步调用完成时会把结果写入注册的回调方法。
延时消息(实现在broker)在消息里面设置延时等级 可见延迟消息的字段在property中 然后会把这个property(map结构转换为String)然后传给Broker 主要实现在Broker端的处理消息存储的过程中
事务消息发送流程旧是调用的 defaultMQProducerImpl.sendMessageInTransaction(msg, null, arg);方法。详细看下面代码 事务消息中不支持延迟发送。即使设置了也会把延迟等级的属性给删除
//检查是否有事务监听器 TransactionListener transactionListener = getCheckListener(); if (null == localTransactionExecuter && null == transactionListener) { throw new MQClientException("tranExecutor is null", null); } // ignore DelayTimeLevel parameter // 忽略延迟等级 删除msg中Property的延时key if (msg.getDelayTimeLevel() != 0) { MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_DELAY_TIME_LEVEL); } // 检查消息 Validators.checkMessage(msg, this.defaultMQProducer); SendResult sendResult = null; //添加msg中Property的 TRAN_MSG 为true (半消息为 true) MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true"); // 添加生产者组 MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup()); try { //发送消息 sendResult = this.send(msg); } catch (Exception e) { throw new MQClientException("send message Exception", e); } //发送完毕 设置localTransactionState的状态为 UNKNOW LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW; Throwable localException = null; switch (sendResult.getSendStatus()) { case SEND_OK: { // 其他代码... //todo 执行本地事务 localTransactionState = transactionListener.executeLocalTransaction(msg, arg); // 其他代码... } break; case FLUSH_DISK_TIMEOUT: case FLUSH_SLAVE_TIMEOUT: case SLAVE_NOT_AVAILABLE: //只要不是SEND_OK 就 设置 localTransactionState=ROLLBACK_MESSAGE回滚消息 localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE; break; default: break; } try { //todo 方法的主要作用是根据本地事务的执行结果向RocketMQ Broker报告事务的最终状态 this.endTransaction(msg, sendResult, localTransactionState, localException); } catch (Exception e) { log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e); } TransactionSendResult transactionSendResult = new TransactionSendResult(); transactionSendResult.setSendStatus(sendResult.getSendStatus()); transactionSendResult.setMessageQueue(sendResult.getMessageQueue()); transactionSendResult.setMsgId(sendResult.getMsgId()); transactionSendResult.setQueueOffset(sendResult.getQueueOffset()); transactionSendResult.setTransactionId(sendResult.getTransactionId()); transactionSendResult.setLocalTransactionState(localTransactionState); return transactionSendResult;上面代码总结: ● 检查是否有本地事务执行器和事务监听器 ● 忽略延迟消息 ● 把消息设置为半事务消息 (properties的k=TRAN_MSG , V=true)和生产者组 ● 发送消息 ● 根据消息的结果 选择执行本地事务操作或者本地事务的回滚操作。 ● 根据本地事务执行的结果向RocketMQ Broker报告事务的最终状态,以便Broker根据事务结果决定消息的最终处理
然后发送消息时调用如下代码 事务消息 是同步消息 标志位设置为4 方法的主要作用是根据本地事务的执行结果向RocketMQ Broker报告事务的最终状态
总结 消息request具体传递的信息● 请求头:
1.设置生产者组名,2.主题名称,3.目标消息队列的ID,4.系统标志位,5.是否批量消息6.消息重试次数,7.消息最大重试次数8.Properties(比如延迟消息)9.标志位(事务消息=4)● 请求体:msg.body消息的字节码
发送消息流程总结1、检查消息 ● topic的名称和长度(不大于127)是否符合规范 ● 消息是否不为空和大小有没有超过4m ● 当前topic是否允许生产者发送消息
2、查找路由 ● 现在本地缓存中查找,如果没有再从namesrv查找。 ● 如果找打的路由信息不可用则强制再从namesrv查找 ● 返回路由
3、选择队列 ● 如果没有开启发送延迟容忍(默认没开启)则走轮询,但是会规避上次发送失败的broker ● 如果开启了,就会根据当前broker的可用时间发送,如果都不可用,则选择一个适合的broker。如果没有适合的broker则走默认的轮询机制。
4、发送消息 使用netty实现网络通信,发送消息,它的通信协议是 自定义的协议。消息传递的基本协议单元包含以下重要信息 ● 命令标识code:用于标识具体的命令 ● 语言标记languang:表示使用的语言 ● 版本信息version:协议版本 ● 唯一标识opaque:请求id ● 序列化类型 serializeTypeCurrentRPC ● 消息正文Body ● 自定义标头customHeader:生产者组、主题、消息队列、properties、标志位
选择消息队列的规避策略 消息发送成功后会记录发送时间,然后会将当前broker加了故障表故障表记录了broker名称,延迟时间、broker可用时间broker可用时间,会根据当前延时时间来选择 固定的间隔时间(mq内部有一个数组,根据延迟时间返回对应的间隔时间),然后broker可用时间=存入故障表的时间+间隔时间如果消息发送失败,会用延迟时间为30000ms来计算可用时间。默认轮询有参数传入的是 上次不可用的broker名
在选择消息队列时,使用ThreadLocal来针对当前线程inde自增,然后用inde当前路由信息的总队列数取模运算,然后得到消息队列,在返回队列之前,会规避掉上次故障的broker(故障broker只能记录最后一次异常的roker)【循环找】开启发送延迟故障容忍 :
先是使用ThreadLocal来针对当前线程inde自增,得到inde然后也是循环用inde对总队列数取模运算,得到一个mq然后根据当前mq所在的broker是否在可用时间(当前时间>=broker可用时间),如果可用就返回消息队列。不可用,就循环找。都找不到就选择一个适合的Broker【在故障表中找到所有的Broker,然后根据一定的条件排序,组后返回中间附近的broker返回】。如果Broker有队列数,就随机找一个mq,如果没有队列数,走无参的轮询。启动你的RocketMQ之旅(四)-Producer启动和发送流程(下)由讯客互联IT业界栏目发布,感谢您对讯客互联的认可,以及对我们原创作品以及文章的青睐,非常欢迎各位朋友分享到个人网站或者朋友圈,但转载请说明文章出处“启动你的RocketMQ之旅(四)-Producer启动和发送流程(下)”
上一篇
修改DOSBox的窗口大小
下一篇
绪论(3)