Exploring ClickHouse: Connecting Kafka and ClickHouse
- Computer Hardware
- 2025-08-18 01:45:02

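Install Kafka

Create a user

Create a dedicated kafka user, add it to the sudo group, and switch to it:

sudo adduser kafka
sudo adduser kafka sudo
sudo su -l kafka

Install the JDK

sudo apt-get install openjdk-8-jre

Download and extract Kafka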
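You can find the version you want under downloads.apache.org/kafka/. Note: do not download a package whose path contains src (that is the source release); otherwise you will get errors such as "Classpath is empty".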
mkdir ~/Downloads
curl "https://downloads.apache.org/kafka/3.5.1/kafka_2.13-3.5.1.tgz" -o ~/Downloads/kafka.tgz
mkdir ~/kafka && cd ~/kafka
tar -xvzf ~/Downloads/kafka.tgz --strip 1

Configuration

Configure Kafka

vim ~/kafka/config/server.properties

Add the following line to the end of the file:
# ~/kafka/config/server.properties
delete.topic.enable=true

Also change the log directory (log.dirs is where Kafka stores its partition data):
# ~/kafka/config/server.properties
log.dirs=/home/kafka/logs

Create the ZooKeeper service

sudo vim /etc/systemd/system/zookeeper.service

Put the following content into that file:
[Unit]
Requires=network.target remote-fs.target
After=network.target remote-fs.target

[Service]
Type=simple
User=kafka
ExecStart=/home/kafka/kafka/bin/zookeeper-server-start.sh /home/kafka/kafka/config/zookeeper.properties
ExecStop=/home/kafka/kafka/bin/zookeeper-server-stop.sh
Restart=on-abnormal

[Install]
WantedBy=multi-user.target

Create the Kafka service

sudo vim /etc/systemd/system/kafka.service

Put the following content into that file:
[Unit]
Requires=zookeeper.service
After=zookeeper.service

[Service]
Type=simple
User=kafka
ExecStart=/bin/sh -c '/home/kafka/kafka/bin/kafka-server-start.sh /home/kafka/kafka/config/server.properties > /home/kafka/kafka/kafka.log 2>&1'
ExecStop=/home/kafka/kafka/bin/kafka-server-stop.sh
Restart=on-abnormal

[Install]
WantedBy=multi-user.target

Start Kafka

# Start the service
sudo systemctl start kafka

Check the status

sudo systemctl status kafka

● kafka.service
     Loaded: loaded (/etc/systemd/system/kafka.service; enabled; vendor preset: enabled)
     Active: active (running) since Thu 2023-09-28 03:09:39 UTC; 4s ago
   Main PID: 3561758 (sh)
      Tasks: 42 (limit: 2143)
     Memory: 292.4M
        CPU: 2.768s
     CGroup: /system.slice/kafka.service
             ├─3561758 /bin/sh -c "/home/kafka/kafka/bin/kafka-server-start.sh /home/kafka/kafka/config/server.properties > /home/kafka/kafka/kafka.log 2>&1"
             └─3561760 java -Xmx1G -Xms1G -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -XX:MaxInlineLevel=15 -Djava.awt.headless=true -Xloggc:/>

Sep 28 03:09:39 ubuntua systemd[1]: Started kafka.service.
As shown, Kafka is now in the running state.
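Because both unit files share the same layout, they can also be generated from a small data structure, which keeps paths such as /home/kafka/kafka in one place. The following is a minimal Python sketch under that assumption; render_unit, KAFKA_HOME and the dict layout are hypothetical names introduced here, not part of systemd or Kafka.

```python
# Minimal sketch: render systemd unit files from plain dicts.
# render_unit and KAFKA_HOME are hypothetical names, not part of any tool.

def render_unit(sections: dict) -> str:
    """Render {section: {key: value}} as systemd unit-file text."""
    lines = []
    for section, entries in sections.items():
        if lines:
            lines.append("")  # blank line between sections
        lines.append(f"[{section}]")
        for key, value in entries.items():
            lines.append(f"{key}={value}")
    return "\n".join(lines) + "\n"


KAFKA_HOME = "/home/kafka/kafka"

zookeeper_unit = {
    "Unit": {
        "Requires": "network.target remote-fs.target",
        "After": "network.target remote-fs.target",
    },
    "Service": {
        "Type": "simple",
        "User": "kafka",
        "ExecStart": f"{KAFKA_HOME}/bin/zookeeper-server-start.sh {KAFKA_HOME}/config/zookeeper.properties",
        "ExecStop": f"{KAFKA_HOME}/bin/zookeeper-server-stop.sh",
        "Restart": "on-abnormal",
    },
    "Install": {"WantedBy": "multi-user.target"},
}

kafka_unit = {
    "Unit": {
        # Requires= pulls zookeeper.service in when kafka starts;
        # After= makes kafka start only after zookeeper has been started.
        "Requires": "zookeeper.service",
        "After": "zookeeper.service",
    },
    "Service": {
        "Type": "simple",
        "User": "kafka",
        "ExecStart": (
            f"/bin/sh -c '{KAFKA_HOME}/bin/kafka-server-start.sh "
            f"{KAFKA_HOME}/config/server.properties > {KAFKA_HOME}/kafka.log 2>&1'"
        ),
        "ExecStop": f"{KAFKA_HOME}/bin/kafka-server-stop.sh",
        "Restart": "on-abnormal",
    },
    "Install": {"WantedBy": "multi-user.target"},
}

if __name__ == "__main__":
    print(render_unit(kafka_unit))
```

Writing the printed text to /etc/systemd/system/kafka.service (as root) and running sudo systemctl daemon-reload would then be equivalent to editing the file by hand.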
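Testing

Create a topic

~/kafka/bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic TutorialTopic

Note that --bootstrap-server takes the Kafka broker address (localhost:9092), not the ZooKeeper port 2181.

Send a message

echo "Hello, World" | ~/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic TutorialTopic > /dev/null

Subscribe to the topic

Open a new terminal and run the following command: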
~/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic TutorialTopic --from-beginning

It receives the message sent above:
Hello, World
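If the topic or console commands hang or cannot connect, a quick first check is whether anything is listening on the broker port (9092) and the ZooKeeper port (2181). The Python sketch below only tests that a TCP connection succeeds; it does not verify that the listener actually speaks the Kafka protocol. The function name is_port_open is hypothetical.

```python
import socket


def is_port_open(host: str, port: int, timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to (host, port) succeeds within timeout.

    Only proves that some process accepts TCP connections on that port;
    it does not check that the process is a Kafka broker or ZooKeeper.
    """
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:  # covers connection refused and timeouts
        return False
```

For example, is_port_open("localhost", 9092) should return True once the kafka service is active with its default listener.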
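Connecting

Create a table

Use the Kafka engine to map the Kafka stream to a table. We use the data from "Exploring ClickHouse: Using Projections to Accelerate Queries" as an example.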
Start clickhouse-client with the stream_like_engine_allow_direct_select setting enabled, which allows SELECT queries to read directly from stream-like engines such as Kafka:

clickhouse-client --stream_like_engine_allow_direct_select 1

Then create the table:

CREATE TABLE uk_price_paid_from_kafka
(
    uuid_string String,
    price_string String,
    time String,
    postcode String,
    a String,
    b String,
    c String,
    addr1 String,
    addr2 String,
    street String,
    locality String,
    town String,
    district String,
    county String,
    d String,
    e String
)
ENGINE = Kafka
SETTINGS
    kafka_broker_list = 'localhost:9092',
    kafka_topic_list = 'TutorialTopic',
    kafka_group_name = 'clickhouse',
    kafka_format = 'CSV',
    kafka_skip_broken_messages = 1,
    kafka_num_consumers = 1;

Query id: 07a063e9-6a61-42c0-8fec-1fe2f119ee28

Ok.

0 rows in set. Elapsed: 0.008 sec.
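With 16 columns, this DDL is easy to mistype. A hypothetical Python helper like the one below can generate the same statement from a column list and a settings dict; COLUMNS, SETTINGS, quote and kafka_table_ddl are illustrative names assumed here, and the output is simply text to paste into clickhouse-client.

```python
# Hypothetical helper: build the Kafka engine CREATE TABLE statement
# from a column list and a settings dict (all columns typed as String).
COLUMNS = [
    "uuid_string", "price_string", "time", "postcode", "a", "b", "c",
    "addr1", "addr2", "street", "locality", "town", "district", "county", "d", "e",
]

SETTINGS = {
    "kafka_broker_list": "localhost:9092",
    "kafka_topic_list": "TutorialTopic",
    "kafka_group_name": "clickhouse",
    "kafka_format": "CSV",
    "kafka_skip_broken_messages": 1,
    "kafka_num_consumers": 1,
}


def quote(value) -> str:
    """Render a setting value: integers as-is, strings as escaped single-quoted literals."""
    if isinstance(value, int):
        return str(value)
    return "'" + str(value).replace("\\", "\\\\").replace("'", "\\'") + "'"


def kafka_table_ddl(table: str, columns: list, settings: dict) -> str:
    """Return a CREATE TABLE ... ENGINE = Kafka statement."""
    cols = ",\n".join(f"    {name} String" for name in columns)
    opts = ",\n".join(f"    {key} = {quote(val)}" for key, val in settings.items())
    return f"CREATE TABLE {table}\n(\n{cols}\n)\nENGINE = Kafka\nSETTINGS\n{opts};"


if __name__ == "__main__":
    print(kafka_table_ddl("uk_price_paid_from_kafka", COLUMNS, SETTINGS))
```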
Send messages to Kafka

~/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic TutorialTopic

This enters message-input mode. Send the following two messages:

"{F887F88E-7D15-4415-804E-52EAC2F10958}","70000","1995-07-07 00:00","MK15 9HP","D","N","F","31","","ALDRICH DRIVE","WILLEN","MILTON KEYNES","MILTON KEYNES","MILTON KEYNES","A","A"
"{40FD4DF2-5362-407C-92BC-566E2CCE89E9}","44500","1995-02-03 00:00","SR6 0AQ","T","N","F","50","","HOWICK PARK","SUNDERLAND","SUNDERLAND","SUNDERLAND","TYNE AND WEAR","A","A"

ClickHouse receives the messages

Run the following in the clickhouse-client interactive terminal:
select * from uk_price_paid_from_kafka;

The content previously sent to the Kafka topic has been received by ClickHouse.
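Because kafka_format is 'CSV', each Kafka message is parsed as one CSV row whose fields are assigned to the table's columns by position. The sketch below mimics that mapping with Python's csv module for the two messages sent above; it is an illustration, not ClickHouse's own parser, and parse_message is a hypothetical name. A row with the wrong number of fields is rejected here, which is roughly the kind of schema-incompatible message that kafka_skip_broken_messages = 1 lets ClickHouse tolerate (one per block).

```python
import csv
import io

# Column order of uk_price_paid_from_kafka; CSV fields map onto it by position.
COLUMNS = [
    "uuid_string", "price_string", "time", "postcode", "a", "b", "c",
    "addr1", "addr2", "street", "locality", "town", "district", "county", "d", "e",
]

MESSAGES = [
    '"{F887F88E-7D15-4415-804E-52EAC2F10958}","70000","1995-07-07 00:00","MK15 9HP","D","N","F","31","","ALDRICH DRIVE","WILLEN","MILTON KEYNES","MILTON KEYNES","MILTON KEYNES","A","A"',
    '"{40FD4DF2-5362-407C-92BC-566E2CCE89E9}","44500","1995-02-03 00:00","SR6 0AQ","T","N","F","50","","HOWICK PARK","SUNDERLAND","SUNDERLAND","SUNDERLAND","TYNE AND WEAR","A","A"',
]


def parse_message(line: str) -> dict:
    """Split one CSV message into {column: value}; reject wrong field counts."""
    fields = next(csv.reader(io.StringIO(line)))
    if len(fields) != len(COLUMNS):
        raise ValueError(f"expected {len(COLUMNS)} fields, got {len(fields)}")
    return dict(zip(COLUMNS, fields))


if __name__ == "__main__":
    for msg in MESSAGES:
        row = parse_message(msg)
        print(row["price_string"], row["postcode"], row["town"])
```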
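Problem

Afterwards, querying the table again in clickhouse-client returned no data, even after sending more messages to the topic. Part of this is expected: the ClickHouse documentation notes that each message in a Kafka engine table can be read only once, so a direct SELECT is mainly useful for debugging. In a follow-up article, "Exploring ClickHouse: Using a MaterializedView to Store Data Delivered by Kafka", we will explain how to use a MaterializedView to clean and persist the data coming from Kafka.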
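The following is a deliberately simplified toy model in Python, not ClickHouse's implementation, of why a second SELECT can come back empty and why a materialized view helps: the consumer group's committed offset advances after each read, so each message is delivered once, and a view that copies every consumed batch into its own storage keeps the rows around. Topic, KafkaEngineTable and MaterializedView are hypothetical classes.

```python
# Toy model only: illustrates "each message is read once" semantics.


class Topic:
    """Append-only message log, standing in for a Kafka topic."""

    def __init__(self):
        self.log = []

    def produce(self, message):
        self.log.append(message)


class KafkaEngineTable:
    """Stream-like table: reading returns unread messages, then commits the offset."""

    def __init__(self, topic):
        self.topic = topic
        self.committed = 0  # offset of the next message this consumer group reads

    def consume(self):
        batch = self.topic.log[self.committed:]
        self.committed += len(batch)
        return batch


class MaterializedView:
    """Copies every consumed batch into a persistent target list."""

    def __init__(self, source):
        self.source = source
        self.target = []

    def refresh(self):
        self.target.extend(self.source.consume())
```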
"Exploring ClickHouse: Connecting Kafka and ClickHouse" was published in the Computer Hardware column of 讯客互联. Sharing it on personal websites or social media is welcome, but please credit the source when reposting.