python脚本将mysql数据写入doris
- 游戏开发
- 2025-09-19 17:39:01

flink-cdc连接的FE,数据写入时正常的, 但通过python脚本使用stream load方式直接连接FE,提示NOT_AUTHORIZED]no valid Basic authorization,如果改用直接连接BE节点,写入又提示[DATA_QUALITY_ERROR]too many filtered rows 这个就是搞技术最麻烦的地方,提示的异常,有没有具体信息,根本就不知道原因是啥,要猜。
from sqlalchemy import create_engine import pandas as pd import math import requests from loguru import logger import gzip import io import uuid from requests.auth import HTTPBasicAuth import json # MySQL 配置 ziping_config = { 'user': 'root', 'password': '123456', 'host': 'localhost', 'port': 3306, 'database': 'ziping' } # 创建 SQLAlchemy 引擎 # ziping_engine = create_engine( # "mysql+pymysql://{user}:{password}@{host}:{port}/{database}?charset=utf8mb4".format(**ziping_config) # ) def sync_zp_bazi_info(): # 从 MySQL 读取数据 ziping_engine = create_engine( "mysql+pymysql://{user}:{password}@{host}:{port}/{database}".format(**ziping_config) ,connect_args={'charset': 'utf8mb4'} ) logger.info("连接 MySQL 成功") query = "SELECT id, year, month, day, hour, year_xun, month_xun, day_xun, hour_xun, " \ "yg, yz, mg, mz, dg, dz, hg, hz, year_shu, month_shu, day_shu, hour_shu, yz_cs, " \ "mz_cs, dz_cs, hz_cs, yg_sk, yz_sk, mg_sk, mz_sk, dz_sk, hg_sk, hz_sk, year_ny, " \ "month_ny, day_ny, hour_ny, jin_cn, mu_cn, shui_cn, huo_cn, tu_cn, zy_cn, py_cn, " \ "zc_cn, pc_cn, zg_cn, qs_cn, ss_cn, sg_cn, bj_cn, jc_cn, ym_gj, md_gj, dh_gj FROM zp_bazi_info" df = pd.read_sql(query, ziping_engine) logger.info("读取数据成功") # Stream Load 配置 stream_load_url = "http://10.101.1.31:8040/api/fay/zp_bazi_info/_stream_load" auth = HTTPBasicAuth('root', '123456') headers = { "Content-Encoding":"gzip", "Content-Type": "text/csv; charset=UTF-8", # 数据格式为 CSV "Expect": "100-continue", # 支持大文件传输 } # 分批次大小 batch_size = 100000 # 每批次 10 万条 total_rows = len(df) num_batches = math.ceil(total_rows / batch_size) # 分批次导入 for i in range(num_batches): start = i * batch_size end = min((i + 1) * batch_size, total_rows) batch_df = df[start:end] # 将批次数据转换为 CSV batch_csv = batch_df.to_csv(index=False, header=False,sep='\t').encode('utf-8') headers["Content-Length"] = str(len(batch_csv)) # 压缩 CSV # compressed_data = compress_csv(batch_csv) # 发送 Stream Load 请求 headers["label"] = str(uuid.uuid1()) response = requests.put(stream_load_url, headers=headers, data=batch_csv,auth=auth,timeout=60) # 检查导入结果 result = response.json() if result['Status'] != 'Fail': logger.info(result['Message']) logger.info(f"批次 {i + 1}/{num_batches} 导入成功") else: logger.info(f"批次 {i + 1}/{num_batches} 导入失败") logger.info("错误信息:{}", result['Message']) break def compress_csv(batch_csv): buffer = io.BytesIO() with gzip.GzipFile(fileobj=buffer, mode='wb') as f: f.write(batch_csv) compressed_data = buffer.getvalue() return compressed_data if __name__ == '__main__': sync_zp_bazi_info()从BE中可以查看到日志,csv文件中到doris中被解析为1列了。看来问题出在batch_df.to_csv(index=False, header=False,sep='\t'),需要增加sep='\t' 这是将mysql的表结构直接转成doris的表
CREATE TABLE `zp_bazi_info` ( `id` CHAR(11) NOT NULL COMMENT "ID", `year` CHAR(2) NOT NULL COMMENT "年柱", `month` CHAR(2) COMMENT "月柱", `day` CHAR(2) COMMENT "日柱", `hour` CHAR(2) COMMENT "时柱", `year_xun` CHAR(2) COMMENT "年柱旬", `month_xun` CHAR(2) COMMENT "月柱旬", `day_xun` CHAR(2) COMMENT "日柱旬", `hour_xun` CHAR(2) COMMENT "时柱旬", `yg` CHAR(1) COMMENT "年干", `yz` CHAR(1) COMMENT "年支", `mg` CHAR(1) COMMENT "月干", `mz` CHAR(1) COMMENT "月支", `dg` CHAR(1) COMMENT "日干", `dz` CHAR(1) COMMENT "日支", `hg` CHAR(1) COMMENT "时干", `hz` CHAR(1) COMMENT "时支", `year_shu` TINYINT COMMENT "年柱数", `month_shu` TINYINT COMMENT "月柱数", `day_shu` TINYINT COMMENT "日柱数", `hour_shu` TINYINT COMMENT "时柱数", `yz_cs` CHAR(2) COMMENT "年长生", `mz_cs` CHAR(2) COMMENT "月长生", `dz_cs` CHAR(2) COMMENT "日长生", `hz_cs` CHAR(2) COMMENT "时长生", `yg_sk` CHAR(1) COMMENT "年干生克", `yz_sk` CHAR(1) COMMENT "年支生克", `mg_sk` CHAR(1) COMMENT "月干生克", `mz_sk` CHAR(1) COMMENT "月支生克", `dz_sk` CHAR(1) COMMENT "日支生克", `hg_sk` CHAR(1) COMMENT "时干生克", `hz_sk` CHAR(1) COMMENT "时支生克", `year_ny` CHAR(3) COMMENT "年纳音", `month_ny` CHAR(3) COMMENT "月纳音", `day_ny` CHAR(3) COMMENT "日纳音", `hour_ny` CHAR(3) COMMENT "时纳音", `jin_cn` TINYINT COMMENT "金数量", `mu_cn` TINYINT COMMENT "木数量", `shui_cn` TINYINT COMMENT "水数量", `huo_cn` TINYINT COMMENT "火数量", `tu_cn` TINYINT COMMENT "土数量", `zy_cn` TINYINT COMMENT "正印数量", `py_cn` TINYINT COMMENT "偏印数量", `zc_cn` TINYINT COMMENT "正财数量", `pc_cn` TINYINT COMMENT "偏财数量", `zg_cn` TINYINT COMMENT "正官数量", `qs_cn` TINYINT COMMENT "七杀数量", `ss_cn` TINYINT COMMENT "食神数量", `sg_cn` TINYINT COMMENT "伤官数量", `bj_cn` TINYINT COMMENT "比肩数量", `jc_cn` TINYINT COMMENT "劫财数量", `ym_gj` CHAR(1) COMMENT "年月拱夹", `md_gj` CHAR(1) COMMENT "月时拱夹", `dh_gj` CHAR(1) COMMENT "日时拱夹" ) ENGINE=OLAP DUPLICATE KEY(`id`) DISTRIBUTED BY HASH(`id`) BUCKETS 10 PROPERTIES ( "replication_num" = "1" );但实际写入,报下面的错误,看来错误原因应该是mysql的字节与doris的字节计算不一样。
Reason: column_name[id], the length of input is too long than schema. first 32 bytes of input str: [丁丑 丁未 丁丑 丁未] schema length: 11; actual length: 27; . src line [];因为id为主键,因此执行下面的语句,会提示Execution failed: Error Failed to execute sql: java.sql.SQLException: (conn=24) errCode = 2, detailMessage = No key column left. index[zp_bazi_info]
ALTER TABLE zp_bazi_info MODIFY COLUMN `id` VARCHAR(32) NOT NULL COMMENT "ID";这里看一下,flink-cdc是怎么做的。 你可以看到varchar在doris中变成了4倍,utf8mb4 编码(这是 MySQL 默认推荐的 UTF-8 实现,支持完整的 Unicode 字符集,包括表情符号等),flink采取了最坏的保守策略。而char是保持不变。
CREATE TABLE `zp_bazi_info` ( `id` VARCHAR(27) NOT NULL COMMENT "ID", `year` VARCHAR(6) NOT NULL COMMENT "年柱", `month` VARCHAR(6) COMMENT "月柱", `day` VARCHAR(6) COMMENT "日柱", `hour` VARCHAR(6) COMMENT "时柱", `year_xun` VARCHAR(6) COMMENT "年柱旬", `month_xun` VARCHAR(6) COMMENT "月柱旬", `day_xun` VARCHAR(6) COMMENT "日柱旬", `hour_xun` VARCHAR(6) COMMENT "时柱旬", `yg` VARCHAR(9) COMMENT "年干", `yz` VARCHAR(9) COMMENT "年支", `mg` VARCHAR(9) COMMENT "月干", `mz` VARCHAR(9) COMMENT "月支", `dg` VARCHAR(9) COMMENT "日干", `dz` VARCHAR(9) COMMENT "日支", `hg` VARCHAR(9) COMMENT "时干", `hz` VARCHAR(9) COMMENT "时支", `year_shu` TINYINT COMMENT "年柱数", `month_shu` TINYINT COMMENT "月柱数", `day_shu` TINYINT COMMENT "日柱数", `hour_shu` TINYINT COMMENT "时柱数", `yz_cs` VARCHAR(6) COMMENT "年长生", `mz_cs` VARCHAR(6) COMMENT "月长生", `dz_cs` VARCHAR(6) COMMENT "日长生", `hz_cs` VARCHAR(6) COMMENT "时长生", `yg_sk` VARCHAR(9) COMMENT "年干生克", `yz_sk` VARCHAR(9) COMMENT "年支生克", `mg_sk` VARCHAR(9) COMMENT "月干生克", `mz_sk` VARCHAR(9) COMMENT "月支生克", `dz_sk` VARCHAR(9) COMMENT "日支生克", `hg_sk` VARCHAR(9) COMMENT "时干生克", `hz_sk` VARCHAR(9) COMMENT "时支生克", `year_ny` VARCHAR(9) COMMENT "年纳音", `month_ny` VARCHAR(9) COMMENT "月纳音", `day_ny` VARCHAR(9) COMMENT "日纳音", `hour_ny` VARCHAR(9) COMMENT "时纳音", `jin_cn` TINYINT COMMENT "金数量", `mu_cn` TINYINT COMMENT "木数量", `shui_cn` TINYINT COMMENT "水数量", `huo_cn` TINYINT COMMENT "火数量", `tu_cn` TINYINT COMMENT "土数量", `zy_cn` TINYINT COMMENT "正印数量", `py_cn` TINYINT COMMENT "偏印数量", `zc_cn` TINYINT COMMENT "正财数量", `pc_cn` TINYINT COMMENT "偏财数量", `zg_cn` TINYINT COMMENT "正官数量", `qs_cn` TINYINT COMMENT "七杀数量", `ss_cn` TINYINT COMMENT "食神数量", `sg_cn` TINYINT COMMENT "伤官数量", `bj_cn` TINYINT COMMENT "比肩数量", `jc_cn` TINYINT COMMENT "劫财数量", `ym_gj` VARCHAR(9) COMMENT "年月拱夹", `md_gj` VARCHAR(9) COMMENT "月时拱夹", `dh_gj` VARCHAR(9) COMMENT "日时拱夹" ) ENGINE=OLAP UNIQUE KEY(`id`) DISTRIBUTED BY HASH(`id`) BUCKETS 10 PROPERTIES ( "replication_num" = "1" );扩展后,数据写入正常了,于是我又验证了,反复执行,看看有没有问题。结果在doris中出现了两条数据。id不是key,为什么会重复写入呢? 因为在Doris中,“Duplicate Key”是一个冗余模型的特性。这个模型的数据完全按照导入文件中的数据进行存储,不会有任何聚合。即使两行数据完全相同,也都会保留。这个Duplicate模型针对日志是可以,但是针对我们的系统表是不合适的,这里就需要采用UNIQUE KEY 51万数据,写入过程日志,doris的数据写入速度还挺快。
2025-03-01 15:27:18.756 | INFO | __main__:sync_zp_bazi_info:33 - 连接 MySQL 成功 2025-03-01 15:29:06.586 | INFO | __main__:sync_zp_bazi_info:40 - 读取数据成功 2025-03-01 15:29:21.388 | INFO | __main__:sync_zp_bazi_info:74 - OK 2025-03-01 15:29:21.388 | INFO | __main__:sync_zp_bazi_info:75 - 批次 1/6 导入成功 2025-03-01 15:29:36.447 | INFO | __main__:sync_zp_bazi_info:74 - OK 2025-03-01 15:29:36.447 | INFO | __main__:sync_zp_bazi_info:75 - 批次 2/6 导入成功 2025-03-01 15:29:52.508 | INFO | __main__:sync_zp_bazi_info:74 - OK 2025-03-01 15:29:52.509 | INFO | __main__:sync_zp_bazi_info:75 - 批次 3/6 导入成功 2025-03-01 15:30:06.747 | INFO | __main__:sync_zp_bazi_info:74 - OK 2025-03-01 15:30:06.747 | INFO | __main__:sync_zp_bazi_info:75 - 批次 4/6 导入成功 2025-03-01 15:30:22.621 | INFO | __main__:sync_zp_bazi_info:74 - OK 2025-03-01 15:30:22.621 | INFO | __main__:sync_zp_bazi_info:75 - 批次 5/6 导入成功 2025-03-01 15:30:24.921 | INFO | __main__:sync_zp_bazi_info:74 - OK 2025-03-01 15:30:24.921 | INFO | __main__:sync_zp_bazi_info:75 - 批次 6/6 导入成功 Process finished with exit code 0执行select * from zp_bazi_info where year='乙丑' and month='戊子',mysql需要2.232s,而doris却只需要92ms,doris这个查询比mysql快了24倍。那么为什么doris那么快呢?
python脚本将mysql数据写入doris由讯客互联游戏开发栏目发布,感谢您对讯客互联的认可,以及对我们原创作品以及文章的青睐,非常欢迎各位朋友分享到个人网站或者朋友圈,但转载请说明文章出处“python脚本将mysql数据写入doris”