KafkaConsumer消费逻辑
- 人工智能
- 2025-08-13 07:00:01

版本:kafka-clients-2.0.1.jar
之前想写个插件修改 kafkaConsumer 消费者的逻辑,根据 header 过滤一些消息。于是需要了解一下 kafkaConsumer 具体是如何拉取消费消息的,确认在消费之前过滤掉消息是否会有影响。 下面是相关的源码,并通过注释的方式进行说明。
先结论:kafkaConsumer 拉取消息的 offset 是存本地的,根据 offset 拉取消息。开启自动提交时,会自动提交 offset 到 broker(在一些场景下会手动检查是否需要提交),防止重启或reblance时 offset 丢失。而本地保存的 offset 是本地拉取到消息时就更新的,所以自动提交的场景下,在消费前过滤掉消息没有影响。
拉取消息 KafkaConsumer#poll private ConsumerRecords<K, V> poll(final long timeoutMs, final boolean includeMetadataInTimeout) { // note: 获取轻锁同时检查非多线程环境,并检查 consumer 开启状态 (可以close的) acquireAndEnsureOpen(); try { if (timeoutMs < 0) throw new IllegalArgumentException("Timeout must not be negative"); // note: subscriptions:SubscriptionState 维护了当前消费者订阅的主题列表的状态信息(组、offset等) // 方法判断是否未订阅或未分配分区 if (this.subscriptions.hasNoSubscriptionOrUserAssignment()) { throw new IllegalStateException("Consumer is not subscribed to any topics or assigned any partitions"); } // poll for new data until the timeout expires long elapsedTime = 0L; do { // note: 是否触发了唤醒操作 (调用了当前对象的 wakeup 方法) 通过抛异常的方式退出当前方法,(这里是while循环,可能一直在拉取消息,(无新消息时)) client.maybeTriggerWakeup(); final long metadataEnd; if (includeMetadataInTimeout) { final long metadataStart = time.milliseconds(); // note: 更新分区分配元数据以及offset, remain是用来算剩余时间的 // 内部逻辑: // 1 协调器 ConsumerCoordinator.poll 拉取协调器事件(期间会发送心跳、自动提交) // 2 updateFetchPositions 更新positions,(但本地有positions数据就不更新,更新完pos后,如果还有缺的,就先使用reset策略,最后异步设置pos) if (!updateAssignmentMetadataIfNeeded(remainingTimeAtLeastZero(timeoutMs, elapsedTime))) { return ConsumerRecords.empty(); } metadataEnd = time.milliseconds(); elapsedTime += metadataEnd - metadataStart; } else { while (!updateAssignmentMetadataIfNeeded(Long.MAX_VALUE)) { log.warn("Still waiting for metadata"); } metadataEnd = time.milliseconds(); } //note: 这里终于开始拉取消息了,下面单独讲一下 final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollForFetches(remainingTimeAtLeastZero(timeoutMs, elapsedTime)); if (!records.isEmpty()) { //note: 翻译:返回之前,发送下一个拉取的请求避免阻塞response // before returning the fetched records, we can send off the next round of fetches // and avoid block waiting for their responses to enable pipelining while the user // is handling the fetched records. // // NOTE: since the consumed position has already been updated, we must not allow // wakeups or any other errors to be triggered prior to returning the fetched records. if (fetcher.sendFetches() > 0 || client.hasPendingRequests()) { client.pollNoWakeup(); } //note: 这里使用拦截器拦截一下,这里可以对消息进行修改或过滤,但需要注意commit的问题 return this.interceptors.onConsume(new ConsumerRecords<>(records)); } final long fetchEnd = time.milliseconds(); elapsedTime += fetchEnd - metadataEnd; } while (elapsedTime < timeoutMs); return ConsumerRecords.empty(); } finally { release(); } }关于 pollForFetches 的逻辑
pollForFetches private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollForFetches(final long timeoutMs) { final long startMs = time.milliseconds(); long pollTimeout = Math.min(coordinator.timeToNextPoll(startMs), timeoutMs); // note: 先获取已经拉取了的消息,存在就直接返回 // fetcher 内部有一个 completedFetches 暂存预拉取的请求,可解析出 nextLineRecords 用于暂存预拉取的消息 // 从 nextLineRecords 获取消息时,先判断一下状态(如assigned、paused、position), // 然后获取到消息后,再更新 subscriptions 中的 position 位置(值为下一个的offset), 注意这个时候还没commit // if data is available already, return it immediately final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords(); if (!records.isEmpty()) { return records; } // note: 没有预拉取的消息,发送拉取请求(实际没发) // 先找到partition的leader,检查可用,检查没有待处理的请求,然后从 subscriptions 获取 position,构建ClientRequest暂存 // 以及设置listener (成功则处理结果入队列completedFetches) // send any new fetches (won't resend pending fetches) fetcher.sendFetches(); // We do not want to be stuck blocking in poll if we are missing some positions // since the offset lookup may be backing off after a failure // NOTE: the use of cachedSubscriptionHashAllFetchPositions means we MUST call // updateAssignmentMetadataIfNeeded before this method. if (!cachedSubscriptionHashAllFetchPositions && pollTimeout > retryBackoffMs) { pollTimeout = retryBackoffMs; } // note: 轮询等待,详见下文 client.poll(pollTimeout, startMs, () -> { // since a fetch might be completed by the background thread, we need this poll condition // to ensure that we do not block unnecessarily in poll() return !fetcher.hasCompletedFetches(); }); // after the long poll, we should check whether the group needs to rebalance // prior to returning data so that the group can stabilize faster if (coordinator.rejoinNeededOrPending()) { return Collections.emptyMap(); } return fetcher.fetchedRecords(); } ConsumerNetworkClient#poll /** * Poll for any network IO. * @param timeout timeout in milliseconds * @param now current time in milliseconds * @param disableWakeup If TRUE disable triggering wake-ups */ public void poll(long timeout, long now, PollCondition pollCondition, boolean disableWakeup) { // note: 触发已完成的请求的回调处理器 (有一个pendingCompletion的队列) // there may be handlers which need to be invoked if we woke up the previous call to poll firePendingCompletedRequests(); lock.lock(); try { // note: 处理断开的连接 (pendingDisconnects队列) // Handle async disconnects prior to attempting any sends handlePendingDisconnects(); // note: 实际上这里才真正发出请求。。 前面那个feature只是构建request // 前面准备的 ClientRequest 放在一个 UnsentRequests (内部map, key:Node,val: requests)中 // 这里面取出来进行发送, kafkaClient.ready -> send // send all the requests we can send now long pollDelayMs = trySend(now); timeout = Math.min(timeout, pollDelayMs); // note: 这里主要是判断是否需要阻塞 poll (timeout是否为0) 如果没有待完成且判断应该阻塞(completedFetches为空)则阻塞 // poll 里面是从 sockets 里面读写数据 // check whether the poll is still needed by the caller. Note that if the expected completion // condition becomes satisfied after the call to shouldBlock() (because of a fired completion // handler), the client will be woken up. if (pendingCompletion.isEmpty() && (pollCondition == null || pollCondition.shouldBlock())) { // if there are no requests in flight, do not block longer than the retry backoff if (client.inFlightRequestCount() == 0) timeout = Math.min(timeout, retryBackoffMs); client.poll(Math.min(maxPollTimeoutMs, timeout), now); now = time.milliseconds(); } else { client.poll(0, now); } // note: 检查断开的链接,判断node连接是否断开,是则从unset中取出对应requests,构建response加到completedFetches中 // handle any disconnects by failing the active requests. note that disconnects must // be checked immediately following poll since any subsequent call to client.ready() // will reset the disconnect status checkDisconnects(now); if (!disableWakeup) { // trigger wakeups after checking for disconnects so that the callbacks will be ready // to be fired on the next call to poll() maybeTriggerWakeup(); } // throw InterruptException if this thread is interrupted maybeThrowInterruptException(); // note: 再发一次请求,推测是可能部分 node 的连接在第一次没有ready (没ready会进行初始化,并返回false) // try again to send requests since buffer space may have been // cleared or a connect finished in the poll trySend(now); // fail requests that couldn't be sent if they have expired failExpiredRequests(now); // clean unsent requests collection to keep the map from growing indefinitely unsent.clean(); } finally { lock.unlock(); } // called without the lock to avoid deadlock potential if handlers need to acquire locks firePendingCompletedRequests(); } 自动提交提交 offset 是为了防止重启或 rebalance 后,导致本地 position 丢失无法正常拉取后面的消息。
入口是 ConsumerCoordinator#maybeAutoCommitOffsetsAsync
触发逻辑主要是
KafkaConsumer#poll 拉消息-> KafkaConsumer#updateAssignmentMetadataIfNeeded-> ConsumerCoordinator#poll -> maybeAutoCommitOffsetsAsync (也是先构建请求存 unset 里面,等拉消息的时候再发出去) public void maybeAutoCommitOffsetsAsync(long now) { // 这里用来判断是否满足自动提交的间隔 if (autoCommitEnabled && now >= nextAutoCommitDeadline) { this.nextAutoCommitDeadline = now + autoCommitIntervalMs; doAutoCommitOffsetsAsync(); } }KafkaConsumer消费逻辑由讯客互联人工智能栏目发布,感谢您对讯客互联的认可,以及对我们原创作品以及文章的青睐,非常欢迎各位朋友分享到个人网站或者朋友圈,但转载请说明文章出处“KafkaConsumer消费逻辑”