主页 > 游戏开发  > 

Kafka-Java四:Spring配置Kafka消费者提交Offset的策略

Kafka-Java四:Spring配置Kafka消费者提交Offset的策略
一、Kafka消费者提交Offset的策略

Kafka消费者提交Offset的策略有

自动提交Offset: 消费者将消息拉取下来以后未被消费者消费前,直接自动提交offset。自动提交可能丢失数据,比如消息在被消费者消费前已经提交了offset,有可能消息拉取下来以后,消费者挂了手动提交Offset 消费者在消费消息时/后,再提交offset,在消费者中实现手动提交Offset分为:手动同步提交、手动异步提交什么是Offset 参考文章:Linux:【Kafka三】组件介绍 二、自动提交策略

        Kafka消费者默认是自动提交Offset的策略

        可设置自动提交的时间间隔

package com.demo.lxb.kafka; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka mon.serialization.StringSerializer; import java.time.Duration; import java.util.Arrays; import java.util.Properties; /** * @Description: kafka消费者消费消息,自动提交offset * @Author: lvxiaobu * @Date: 2023-10-24 16:26 **/ public class MyConsumerAutoSubmitOffset { private final static String CONSUMER_GROUP_NAME = "GROUP1"; private final static String TOPIC_NAME = "topic0921"; public static void main(String[] args) { Properties props = new Properties(); // 一、设置参数 // 配置kafka地址 // props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, // "192.168.151.28:9092"); // 单机配置 props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.154.128:9092,192.168.154.128:9093,192.168.154.128:9094"); // 集群配置 // 配置消息 键值的序列化规则 props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringSerializer.class.getName()); // 配置消费者组 props.put(ConsumerConfig.GROUP_ID_CONFIG,CONSUMER_GROUP_NAME); // 设置消费者offset的提交方式 // 自动提交:默认配置 props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"true"); // 自动提交offset的时间间隔 props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000"); // 二、创建消费者 KafkaConsumer<String,String> consumer = new KafkaConsumer<String,String>(props); // 三、消费者订阅主题 consumer.subscribe(Arrays.asList(TOPIC_NAME)); // 四、拉取消息,开始消费 while (true){ // 从kafka集群中拉取消息 ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000)); // 消费消息,当前是自动提交模式,在消息上一行消息被拉取下来以后,offset就自动被提交了,下面的代码如果出错,或者此时 // 消费者挂掉了,那么消费其实是没有进行消费的(也就是业务逻辑处理) for (ConsumerRecord<String, String> record : records) { System.out.println("接收到的消息: 分区: " + record.partition() + ", offset: " + record.offset() + ", key值: " + record.key() + " , value值: "+record.value()); } } } }

上述代码中的如下代码是自动提交策略的相关设置 

// 设置消费者offset的提交方式 // 自动提交:默认配置 props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"true"); // 自动提交offset的时间间隔 props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000"); 三、手动提交策略

3.1、手动同步提交策略

        手动同步提交,会在提交offset处阻塞。当消费者接收到 kafka集群返回的消费者提交offset成功的ack后,才开始执行消费者中后续的代码。

        因为使用异步提交容易丢失消息,固一般使用同步提交,在同步提交后不要再做其他逻辑处理。

package com.demo.lxb.kafka; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka mon.serialization.StringSerializer; import java.time.Duration; import java.util.Arrays; import java.util.Properties; /** * @Description: kafka消费者消费消息,手动同步提交offset * @Author: lvxiaobu * @Date: 2023-10-24 16:26 **/ public class MyConsumerMauSubmitOffset { private final static String CONSUMER_GROUP_NAME = "GROUP1"; private final static String TOPIC_NAME = "topic0921"; public static void main(String[] args) { Properties props = new Properties(); // 一、设置参数 // 配置kafka地址 // props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, // "192.168.151.28:9092"); // 单机配置 props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.154.128:9092,192.168.154.128:9093,192.168.154.128:9094"); // 集群配置 // 配置消息 键值的序列化规则 props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringSerializer.class.getName()); // 配置消费者组 props.put(ConsumerConfig.GROUP_ID_CONFIG,CONSUMER_GROUP_NAME); // 设置消费者offset的提交方式 // 手动提交offset props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false"); // 自动提交offset的时间间隔:此时不再需要设置该值 // props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000"); // 二、创建消费者 KafkaConsumer<String,String> consumer = new KafkaConsumer<String,String>(props); // 三、消费者订阅主题 consumer.subscribe(Arrays.asList(TOPIC_NAME)); // 四、拉取消息,开始消费 while (true){ // 从kafka集群中拉取消息 ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000)); // 业务逻辑处理 for (ConsumerRecord<String, String> record : records) { System.out.println("接收到的消息: 分区: " + record.partition() + ", offset: " + record.offset() + ", key值: " + record.key() + " , value值: "+record.value()); } // 当for循环业务逻辑处理结束以后,再手动提交offset // 同步方式提交,此时会产生阻塞,当kafka集群返回了提交成功的ack以后,才会消除阻塞,进行后续的代码逻辑。 // 一般使用同步提交,在同步提交后不再做其他逻辑处理 consumer mitAsync(); // do anything } } }

3.2、手动异步提交策略

        异步提交,不会在提交offset代码处阻塞,即消费者提交了offset后,不需要等待kafka集群返回的ack即可继续执行后续代码。但是在提交offset时需要提供一个回调方法,供kafka集群回调,来告诉消费者提交offset的结果。

package com.demo.lxb.kafka; import com.alibaba.fastjson.JSONObject; import org.apache.kafka.clients.consumer.*; import org.apache.kafka mon.TopicPartition; import org.apache.kafka mon.serialization.StringSerializer; import java.time.Duration; import java.util.Arrays; import java.util.Map; import java.util.Properties; /** * @Description: kafka消费者消费消息,手动异步提交offset * @Author: lvxiaobu * @Date: 2023-10-24 16:26 **/ public class MyConsumerMauSubmitOffset2 { private final static String CONSUMER_GROUP_NAME = "GROUP1"; private final static String TOPIC_NAME = "topic0921"; public static void main(String[] args) { Properties props = new Properties(); // 一、设置参数 // 配置kafka地址 // props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, // "192.168.151.28:9092"); // 单机配置 props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.154.128:9092,192.168.154.128:9093,192.168.154.128:9094"); // 集群配置 // 配置消息 键值的序列化规则 props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringSerializer.class.getName()); // 配置消费者组 props.put(ConsumerConfig.GROUP_ID_CONFIG,CONSUMER_GROUP_NAME); // 设置消费者offset的提交方式 // 手动提交offset props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false"); // 自动提交offset的时间间隔:此时不再需要设置该值 // props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000"); // 二、创建消费者 KafkaConsumer<String,String> consumer = new KafkaConsumer<String,String>(props); // 三、消费者订阅主题 consumer.subscribe(Arrays.asList(TOPIC_NAME)); // 四、拉取消息,开始消费 while (true){ // 从kafka集群中拉取消息 ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000)); for (ConsumerRecord<String, String> record : records) { System.out.println("接收到的消息: 分区: " + record.partition() + ", offset: " + record.offset() + ", key值: " + record.key() + " , value值: "+record.value()); } // 异步提交,不影响后续的内容。 // new OffsetCommitCallback是kafka集群会回调的方法,告诉消费者提交offset的结果 consumer mitAsync(new OffsetCommitCallback() { @Override public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e) { if(e != null){ // 可将提交失败的消息记录到日志 System.out.println("记录提交offset失败的消息到日志"); System.out.println("消费者提交offset抛出异常:" + Arrays.toString(e.getStackTrace())); System.out.println("消费者提交offset异常的消息信息:" + JSONObject.toJSONString(map)); } } }); // 后续逻辑处理,不需要等到kafka集群返回了提交成功的ack以后才开始处理。 //do anything } } }

标签:

Kafka-Java四:Spring配置Kafka消费者提交Offset的策略由讯客互联游戏开发栏目发布,感谢您对讯客互联的认可,以及对我们原创作品以及文章的青睐,非常欢迎各位朋友分享到个人网站或者朋友圈,但转载请说明文章出处“Kafka-Java四:Spring配置Kafka消费者提交Offset的策略