主页 > 软件开发  > 

Flink中kafkabroker缩容导致Task一直重启

Flink中kafkabroker缩容导致Task一直重启
背景

Flink版本 1.12.2 Kafka 客户端 2.6.1 在公司的Flink平台运行了一个读Kafka计算DAU的流程序,由于公司Kafka的缩容,直接导致了该程序一直在重启,重启了一个小时都还没恢复(具体的所容操作是下掉了四台kafka broker,而当时flink配置了12台kafka broker),当时具体的现场如下:

JobManaer上的日志如下: 2023-10-07 10:02:52.975 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: TableSourceScan(table=[[default_catalog, default_database, ubt_start, watermark=[-(LOCALTIMESTAMP, 1000:INTERVAL SECOND)]]]) (34/64) (e33d9ad0196a71e8eb551c181eb779b5) switched from RUNNING to FAILED on container_e08_1690538387235_2599_01_000010 @ task-xxxx-shanghai.emr.aliyuncs (dataPort=xxxx). org.apache.flink.streaming.connectors.kafka.internals.Handover$ClosedException: null at org.apache.flink.streaming.connectors.kafka.internals.Handover.close(Handover.java:177) at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.cancel(KafkaFetcher.java:164) at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.cancel(FlinkKafkaConsumerBase.java:945) at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.lambda$createAndStartDiscoveryLoop$2(FlinkKafkaConsumerBase.java:913) at java.lang.Thread.run(Thread.java:750) 对应的 TaskManager(task-xxxx-shanghai.emr.aliyuncs )上的日志如下: 2023-10-07 10:02:24.604 WARN org.apache.kafka.clients.NetworkClient - [Consumer clientId=xxxx] Connection to node 46129 (sh-bs-b1-303-i14-kafka-129-46.ximalaya.local/192.168.129.46:9092) could not be established. Broker may not be available. 2023-10-07 10:02:52.939 WARN org.apache.flink.runtime.taskmanager.Task - Source: TableSourceScan(t) (34/64)#0 (e33d9ad0196a71e8eb551c181eb779b5) switched from RUNNING to FAILED. org.apache.flink.streaming.connectors.kafka.internals.Handover$ClosedException: null at org.apache.flink.streaming.connectors.kafka.internals.Handover.close(Handover.java:177) at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.cancel(KafkaFetcher.java:164) at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.cancel(FlinkKafkaConsumerBase.java:945) at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.lambda$createAndStartDiscoveryLoop$2(FlinkKafkaConsumerBase.java:913) at java.lang.Thread.run(Thread.java:750) 2023-10-07 10:04:58.205 WARN org.apache.kafka.clients.NetworkClient - [Consumer clientId=xxx, groupId=xxxx] Connection to node -4 (xxxx:909) could not be established. Broker may not be available. 2023-10-07 10:04:58.205 WARN org.apache.kafka.clients.NetworkClient - [Consumer clientId=xxx, groupId=xxxx] Bootstrap broker sxxxx:909 (id: -4 rack: null) disconnected 2023-10-07 10:04:58.206 WARN org.apache.kafka.clients.NetworkClient - [Consumer clientId=xxx, groupId=xxxxu] Connection to node -5 (xxxx:9092) could not be established. Broker may not be available. 2023-10-07 10:04:58.206 WARN org.apache.kafka.clients.NetworkClient - [Consumer clientId=xxx, groupId=xxxxu] Bootstrap broker xxxx:9092 (id: -5 rack: null) disconnected 2023-10-07 10:08:15.541 WARN org.apache.flink.runtime.taskmanager.Task - Source: TableSourceScan(xxx) switched from RUNNING to FAILED. org.apache.kafka mon.errors.TimeoutException: Timeout expired while fetching topic metadata

当时Flink中kafka source的相关配置如下:

scan.topic-partition-discovery.interval 300000 restart-strategy.type fixed-delay restart-strategy.fixed-delay.attempts 50000000 jobmanager.execution.failover-strategy region 结论以及解决

目前在kafka 消费端有两个参数default.api.timeout.ms(默认60000),request.timeout.ms(默认30000),这两个参数来控制kakfa的客户端从服务端请求超时,也就是说每次请求的超时时间是30s,超时之后可以再重试,如果在60s内请求没有得到任何回应,则会报TimeOutException,具体的见如下分析, 我们在flink kafka connector中通过设置如下参数来解决:

`properties.default.api.timeout.ms` = '600000', `properties.request.timeout.ms` = '5000', // max.block.ms是设置kafka producer的超时 `properties.max.block.ms` = '600000', 分析

在Flink中对于Kafka的Connector的DynamicTableSourceFactory是KafkaDynamicTableFactory,这里我们只讨论kafka作为source的情况, 而该类的方法createDynamicTableSource最终会被调用,至于具体的调用链可以参考Apache Hudi初探(四)(与flink的结合)–Flink Sql中hudi的createDynamicTableSource/createDynamicTableSink/是怎么被调用–只不过把Sink改成Source就可以了,所以最终会到KafkaDynamicSource类:

@Override public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) { final DeserializationSchema<RowData> keyDeserialization = createDeserialization(context, keyDecodingFormat, keyProjection, keyPrefix); final DeserializationSchema<RowData> valueDeserialization = createDeserialization(context, valueDecodingFormat, valueProjection, null); final TypeInformation<RowData> producedTypeInfo = context.createTypeInformation(producedDataType); final FlinkKafkaConsumer<RowData> kafkaConsumer = createKafkaConsumer(keyDeserialization, valueDeserialization, producedTypeInfo); return SourceFunctionProvider.of(kafkaConsumer, false); }

该类的getScanRuntimeProvider方法会被调用,所有kafka相关的操作都可以追溯到FlinkKafkaConsumer类(继承FlinkKafkaConsumerBase)中,对于该类重点的方法如下:

@Override public final void initializeState(FunctionInitializationContext context) throws Exception { OperatorStateStore stateStore = context.getOperatorStateStore(); this.unionOffsetStates = stateStore.getUnionListState( new ListStateDescriptor<>( OFFSETS_STATE_NAME, createStateSerializer(getRuntimeContext().getExecutionConfig()))); ... } @Override public void open(Configuration configuration) throws Exception { // determine the offset commit mode this.offsetCommitMode = OffsetCommitModes.fromConfiguration( getIsAutoCommitEnabled(), enableCommitOnCheckpoints, ((StreamingRuntimeContext) getRuntimeContext()).isCheckpointingEnabled()); // create the partition discoverer this.partitionDiscoverer = createPartitionDiscoverer( topicsDescriptor, getRuntimeContext().getIndexOfThisSubtask(), getRuntimeContext().getNumberOfParallelSubtasks()); this.partitionDiscoverer.open(); subscribedPartitionsToStartOffsets = new HashMap<>(); final List<KafkaTopicPartition> allPartitions = partitionDiscoverer.discoverPartitions(); if (restoredState != null) { ... } else { // use the partition discoverer to fetch the initial seed partitions, // and set their initial offsets depending on the startup mode. // for SPECIFIC_OFFSETS and TIMESTAMP modes, we set the specific offsets now; // for other modes (EARLIEST, LATEST, and GROUP_OFFSETS), the offset is lazily // determined // when the partition is actually read. switch (startupMode) { 。。。 default: for (KafkaTopicPartition seedPartition : allPartitions) { subscribedPartitionsToStartOffsets.put( seedPartition, startupMode.getStateSentinel()); } } if (!subscribedPartitionsToStartOffsets.isEmpty()) { switch (startupMode) { ... case GROUP_OFFSETS: LOG.info( "Consumer subtask {} will start reading the following {} partitions from the committed group offsets in Kafka: {}", getRuntimeContext().getIndexOfThisSubtask(), subscribedPartitionsToStartOffsets.size(), subscribedPartitionsToStartOffsets.keySet()); } } else { LOG.info( "Consumer subtask {} initially has no partitions to read from.", getRuntimeContext().getIndexOfThisSubtask()); } } this.deserializer.open( RuntimeContextInitializationContextAdapters.deserializationAdapter( getRuntimeContext(), metricGroup -> metricGroup.addGroup("user"))); } @Override public void run(SourceContext<T> sourceContext) throws Exception { if (subscribedPartitionsToStartOffsets == null) { throw new Exception("The partitions were not set for the consumer"); } // initialize commit metrics and default offset callback method this.successfulCommits = this.getRuntimeContext() .getMetricGroup() .counter(COMMITS_SUCCEEDED_METRICS_COUNTER); this.failedCommits = this.getRuntimeContext().getMetricGroup().counter(COMMITS_FAILED_METRICS_COUNTER); final int subtaskIndex = this.getRuntimeContext().getIndexOfThisSubtask(); this.offsetCommitCallback = new KafkaCommitCallback() { @Override public void onSuccess() { successfulCommits.inc(); } @Override public void onException(Throwable cause) { LOG.warn( String.format( "Consumer subtask %d failed async Kafka commit.", subtaskIndex), cause); failedCommits.inc(); } }; // mark the subtask as temporarily idle if there are no initial seed partitions; // once this subtask discovers some partitions and starts collecting records, the subtask's // status will automatically be triggered back to be active. if (subscribedPartitionsToStartOffsets.isEmpty()) { sourceContext.markAsTemporarilyIdle(); } LOG.info( "Consumer subtask {} creating fetcher with offsets {}.", getRuntimeContext().getIndexOfThisSubtask(), subscribedPartitionsToStartOffsets); // from this point forward: // - 'snapshotState' will draw offsets from the fetcher, // instead of being built from `subscribedPartitionsToStartOffsets` // - 'notifyCheckpointComplete' will start to do work (i.e. commit offsets to // Kafka through the fetcher, if configured to do so) this.kafkaFetcher = createFetcher( sourceContext, subscribedPartitionsToStartOffsets, watermarkStrategy, (StreamingRuntimeContext) getRuntimeContext(), offsetCommitMode, getRuntimeContext().getMetricGroup().addGroup(KAFKA_CONSUMER_METRICS_GROUP), useMetrics); if (!running) { return; } if (discoveryIntervalMillis == PARTITION_DISCOVERY_DISABLED) { kafkaFetcher.runFetchLoop(); } else { runWithPartitionDiscovery(); } } @Override public final void snapshotState(FunctionSnapshotContext context) throws Exception { ... HashMap<KafkaTopicPartition, Long> currentOffsets = fetcher.snapshotCurrentState(); if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) { // the map cannot be asynchronously updated, because only one checkpoint call // can happen // on this function at a time: either snapshotState() or // notifyCheckpointComplete() pendingOffsetsToCommit.put(context.getCheckpointId(), currentOffsets); } for (Map.Entry<KafkaTopicPartition, Long> kafkaTopicPartitionLongEntry : currentOffsets.entrySet()) { unionOffsetStates.add( Tuple2.of( kafkaTopicPartitionLongEntry.getKey(), kafkaTopicPartitionLongEntry.getValue())); } ... } } @Override public final void notifyCheckpointComplete(long checkpointId) throws Exception { ... fetcher mitInternalOffsetsToKafka(offsets, offsetCommitCallback); ... }

主要是initializeState,open,run,snapshotState,notifyCheckpointComplete这四个方法,下面带着问题逐一介绍一下: 注意:对于initializeState和open方法的先后顺序,可以参考StreamTask类,其中如下的调用链:

invoke() || \/ beforeInvoke() || \/ operatorChain.initializeStateAndOpenOperators || \/ FlinkKafkaConsumerBase.initializeState || \/ FlinkKafkaConsumerBase.open

就可以知道 initializeState方法的调用是在open之前的

initializeState方法

这里做的事情就是从持久化的State中恢复kafkaTopicOffset信息,我们这里假设是第一次启动

open方法 offsetCommitMode offsetCommitMode = OffsetCommitModes.fromConfiguration 这里获取设置的kafka offset的提交模式,这里会综合enable.auto mit的配置(默认是true),enableCommitOnCheckpoints默认是true,checkpointing设置为true(默认是false),综合以上得到的值为OffsetCommitMode.ON_CHECKPOINTSpartitionDiscoverer 这里主要是进行kafka的topic的分区发现,主要路程是 partitionDiscoverer.discoverPartitions,这里的涉及的流程如下:AbstractPartitionDiscoverer.discoverPartitions || \/ AbstractPartitionDiscoverer.getAllPartitionsForTopics || \/ KafkaPartitionDiscoverer.kafkaConsumer.partitionsFor || \/ KafkaConsumer.partitionsFor(topic, Duration.ofMillis(defaultApiTimeoutMs)) //这里的defaultApiTimeoutMs 来自于*default.api.timeout.ms* || \/ Fetcher.getTopicMetadata //这里面最后抛出 new TimeoutException("Timeout expired while fetching topic metadata"); || \/ Fetcher.sendMetadataRequest => NetworkClient.leastLoadedNode //这里会随机选择配置的broker的节点 || \/ client.poll(future, timer) => NetworkClient.poll => selector.poll(Utils.min(timeout, metadataTimeout, defaultRequestTimeoutMs)); // 这里的 *defaultRequestTimeoutMs* 来自配置*request.timeout.ms* 综上所述,discoverPartitions做的就是随机选择配置的broker节点,对每个节点进行请求,request.timeout.ms超时后,再随机选择broker,直至总的时间达到了配置的default.api.timeout.ms,这里默认default.api.timeout.ms 为60秒,request.timeout.ms为30秒subscribedPartitionsToStartOffsets 根据startupMode模式,默认是StartupMode.GROUP_OFFSETS(默认从上次消费的offset开始消费),设置开启的kafka offset,这在kafkaFetcher中会用到 run方法 设置一些指标successfulCommits/failedCommitsKafkaFetcher 这里主要是从kafka获取数据以及如果有分区发现则循环进kafka的topic分区发现,这里会根据配置scan.topic-partition-discovery.interval默认配置为0,实际中设置的为300000,即5分钟。该主要的流程为在方法runWithPartitionDiscovery: private void runWithPartitionDiscovery() throws Exception { final AtomicReference<Exception> discoveryLoopErrorRef = new AtomicReference<>(); createAndStartDiscoveryLoop(discoveryLoopErrorRef); kafkaFetcher.runFetchLoop(); // make sure that the partition discoverer is waked up so that // the discoveryLoopThread exits partitionDiscoverer.wakeup(); joinDiscoveryLoopThread(); // rethrow any fetcher errors final Exception discoveryLoopError = discoveryLoopErrorRef.get(); if (discoveryLoopError != null) { throw new RuntimeException(discoveryLoopError); } }

createAndStartDiscoveryLoop 这个会启动单个线程以while sleep方式实现以scan.topic-partition-discovery.interval为间隔来轮询进行Kafka的分区发现,注意这里会吞没Execption,并不会抛出异常

private void createAndStartDiscoveryLoop(AtomicReference<Exception> discoveryLoopErrorRef) { discoveryLoopThread = new Thread( ... while (running) { ... try { discoveredPartitions = partitionDiscoverer.discoverPartitions(); } catch (AbstractPartitionDiscoverer.WakeupException | AbstractPartitionDiscoverer.ClosedException e) { break; } if (running && !discoveredPartitions.isEmpty()) { kafkaFetcher.addDiscoveredPartitions(discoveredPartitions); } if (running && discoveryIntervalMillis != 0) { try { Thread.sleep(discoveryIntervalMillis); } catch (InterruptedException iex) { break; } } } } catch (Exception e) { discoveryLoopErrorRef.set(e); } finally { // calling cancel will also let the fetcher loop escape // (if not running, cancel() was already called) if (running) { cancel(); } } }, "Kafka Partition Discovery for " + getRuntimeContext().getTaskNameWithSubtasks()); discoveryLoopThread.start(); }

这里的kafkaFetcher.addDiscoveredPartitions(discoveredPartitions);中subscribedPartitionStates变量会把发现分区信息保存起来,这在kafkaFetcher.runFetchLoop中会设置已经提交的offset信息,并且会在snapshotState会用到

kafkaFetcher.runFetchLoop 这里会从kafka拉取数据,并设置kafka的offset,具体的流程如下:

runFetchLoop || \/ subscribedPartitionStates 这里会获取*subscribedPartitionStates*变量 || \/ partitionConsumerRecordsHandler || \/ emitRecordsWithTimestamps || \/ emitRecordsWithTimestamps || \/ partitionState.setOffset(offset);

这里的offset就是从消费的kafka记录中获取的

snapshotState方法

这里会对subscribedPartitionStates中的信息进行处理,主要是加到pendingOffsetsToCommit变量中

offsetCommitMode 这里上面说到是OffsetCommitMode.ON_CHECKPOINTS,如果是ON_CHECKPOINTS,则会从fetcher.snapshotCurrentState获取subscribedPartitionStates 并加到pendingOffsetsToCommit,并持久化到unionOffsetStates中,这实际的kafka offset commit操作在notifyCheckpointComplete中, notifyCheckpointComplete方法

获取到要提交的kafka offset信息,并持久化保存kafka中

参考 open 和 initailizeState的初始化顺序A single failing Kafka broker may cause jobs to fail indefinitely with TimeoutException: Timeout expired while fetching topic metadata
标签:

Flink中kafkabroker缩容导致Task一直重启由讯客互联软件开发栏目发布,感谢您对讯客互联的认可,以及对我们原创作品以及文章的青睐,非常欢迎各位朋友分享到个人网站或者朋友圈,但转载请说明文章出处“Flink中kafkabroker缩容导致Task一直重启