消息中间件Kafka快速入门
- 创业
- 2025-08-21 01:27:02

前言
Kafka是基于zookeeper管理的,所以要先安装zookeeper,如果是单机模式,zookeeper安装比较简单,本文就介绍一下单机如何搭建kafka,以及基本的java demo。
环境搭建 Zookeeper 安装http://mirrors nic /apache/zookeeper/ 下载zookeeper 解压安装 解压tar –zxvf 修改zookeeper config 把cfg改成 zoo.cfg 单机zookeeper不需要配置 启动
bin/zkServer.sh start kafka安装下载 http://kafka.apache.org/downloads.html 解压安装 解压tar –zxvf 启动
sh kafka-server-start.sh ../config/server.properties &没有异常就算是成功的了。
消息测试打开2个crt客户端,一个做producer,一个做consumer,在producer里面发送消息,就可以立即在consumer里面接收到。
1.生产消息producer Sh kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic test 2.消费消息方consumer Sh bin/kafka-console-consumer.sh --zookeeper 127.0.0.1:2181 --topic test --from-beginning 3.创建topic命令 sh bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic message对于每一个topic都会在/tmp/kafka-logs/ 生成一个文
4.获取topic 列表 sh kafka-topics.sh --list --zookeeper localhost:2181 test Java Demo需要依赖zookeeper和kafka的jar
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.11</artifactId> <version>0.9.0.0</version> </dependency> <dependency> <groupId>com.101tec</groupId> <artifactId>zkclient</artifactId> <version>0.7</version> </dependency> 发送消息 public class KafKaProducer { //zookeeper 地址 public static String zookeeperConnect = "10.101.14.230:2181"; public static String topicName = "message"; //topic name private static Producer createProducer() { Properties properties = new Properties(); properties.put("zookeeper.connect", zookeeperConnect);// 声明zk properties.put("serializer.class", "kafka.serializer.StringEncoder"); //配置value的序列化类 properties.put("key.serializer.class", "kafka.serializer.StringEncoder"); //配置key的序列化类 properties.put("metadata.broker.list", "10.101.14.230:9092");// 声明kafka return new Producer<Integer, String>(new ProducerConfig(properties)); } public static void main(String[] args) { try { Producer producer = createProducer(); producer.send(new KeyedMessage<Integer, String>(topicName, "message:hello")); TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } } } 接收消息注意接收消息,是客户端主动去pull,没有消息时就会阻塞
public class KafkaConsumer { public static String zookeeperConnect = "10.101.14.230:2181"; private final ConsumerConnector consumer; private KafkaConsumer() { Properties props = new Properties(); props.put("zookeeper.connect", zookeeperConnect);//声明zk props.put("group.id", "jd-group2"); //group 代表一个消费组 props.put("zookeeper.session.timeout.ms", "4000"); //zk连接超时 props.put("zookeeper.sync.time.ms", "200"); props.put("auto mit.interval.ms", "1000"); props.put("auto.offset.reset", "smallest"); props.put("serializer.class", "kafka.serializer.StringEncoder");//序列化类 ConsumerConfig config = new ConsumerConfig(props); consumer = kafka.consumer.Consumer.createJavaConsumerConnector(config); } void consume() { Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); topicCountMap.put(KafKaProducer.topicName, new Integer(1)); StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties()); StringDecoder valueDecoder = new StringDecoder(new VerifiableProperties()); Map<String, List<KafkaStream<String, String>>> consumerMap = consumer.createMessageStreams(topicCountMap,keyDecoder,valueDecoder); KafkaStream<String, String> stream = consumerMap.get(KafKaProducer.topicName).get(0); ConsumerIterator<String, String> it = stream.iterator(); while (it.hasNext()){//该地方没有消息时会阻塞 System.out.printf("接受到信息:"); System.out.println(it.next().message()); } } public static void main(String[] args) { //更好的方法是启动一个线程 new KafkaConsumer().consume(); } }可以看到结果,只要新建一个group,都会把历史消息也接受到
遇到的问题用java 发送的遇到了这样的一个问题,网上搜了很久,看到说只要是在服务器上安装都有这样的问题,本地的服务就没有问题,按照帖子的方法修改kafka服务器的host.name也没解决。后来在使用hbase的时候,发现也是链接不上,然后 Debug hbase的时候发现,连接zookeeper里面有这样一段代码。
InetSocketAddress remoteAddr=new InetSocketAddress("e010101014230.zmf",37556); if(remoteAddr.isUnresolved()){ // 表示链接不上。 System.out.print("isUnresolved"); }else{ System.out.print("resolved"); } properties.put("zookeeper.connect",” 10.101.14.230:2181”);// 声明zk properties.put("zookeeper.connect",” e010101014230.zmf:2181”);//机器名称我就感觉zookeeper 的作用在hbase 和 kafka 是类似的,所以两个应该是同一个问题,果然debug了一遍就解决了。不管你在开始设置zookeeper地址的时候是用ip地址,还是机器名称,到了InetSocketAddress这里他都会转换成机器名称,所以就链接不上。解决办法就是:在hosts文件里加上 10.101.14.230 e010101014230.zmf
kafka原理kafka原理中最重要的一点是,他的消息推送,是消费者主动pull的,这样实现起来相对简单,而且还有个好处就是,消费者按需按能力消费,消费服务器不会有积压问题。
Broker一台kafka服务器就是一个broker。一个集群由多个broker组成。一个broker可以容纳多个topic。
Partition就是分块的意思,一个topic可以分为多个 partition,可以分布到不同的机子上。同一个topic的partition在一台机子上只能有一个。
ReplicationPartition的多个备份
Topic话题,队列,消费
ProducerProducer 客户端通过zookeeper 获取到所有broker,可以动态更新 Producer直接通过socket发送消息到broker 消息被路由到哪个partition上,有producer客户端决定 Consumer与topic关系以及机制 Consumer是主动pull topic,没有消息时会阻塞。 每个consumer属于一个consumer group,对于同一条消息一个group只接收到一次。 一个group 有多个consumer,那么topic就会负载均衡的发送到每个consumer里面去。 如果所有的consumer都是属于不同的group,那所有的人都会接收到消息
Zookeeper 的作用Producer端使用zookeeper用来"发现"broker列表, 以及和Topic下每个partition leader建立socket连接并发送消息.(如果partition失效了,就接收到消息) 2) Broker端使用zookeeper用来注册broker信息,已经监测partition leader存活性.
Consumer端使用zookeeper用来注册consumer信息,其中包括consumer消费的partition列表等,同时也用来发现broker列表,并和partition leader建立socket连接,并获取消息. 总结这个教程比较简单,适合刚刚使用过消息队列,还不了解其原理的人看看。也提供了简单的java demo,对理解复杂的notify 框架还是挺有帮助的。
推荐2篇比较好的文章: [1] http://blog.csdn.net/derekjiang/article/details/9053863/ [2] http://blog.csdn.net/hmsiwtv/article/details/46960053
消息中间件Kafka快速入门由讯客互联创业栏目发布,感谢您对讯客互联的认可,以及对我们原创作品以及文章的青睐,非常欢迎各位朋友分享到个人网站或者朋友圈,但转载请说明文章出处“消息中间件Kafka快速入门”
下一篇
蓝桥杯-飞行员兄弟