一个py文件搞定mysql查询+Json转换+表数据提取+根据数据条件生成excel文件+打包运行一条龙
- 手机
- 2025-09-16 23:18:02

import os import argparse import pymssql import json import pandas as pd from datetime import datetime from pandas.io.formats.excel import ExcelFormatter import openpyxl # 投注类型映射字典 BET_MAPPING = { 1: 'WIN', 2: 'PLA', 3: 'QIN', 4: 'QPL', 5: 'DBL', 6: 'TCE', 7: 'QTT', 8: 'D-Q', 9: 'TBL', 10: 'T-T', 11: '6UP', 12: 'D-T', 13: 'TRI', 14: 'FCT', 17: 'F-F' } Meetingloc = {'ST':1, 'HV':2, 'S1':5, 'S2':6, 'S3':7, 'S4':8, 'S5':9} def convert_pool_bitmap(pool_list): """将数字列表转换为投注类型""" return [BET_MAPPING.get(num, f'未知({num})') for num in pool_list] def convert_meeting_loc(venue_list): """将数字列表转换为 meeting 类型""" return Meetingloc.get(venue_list) def mains(date): # 数据库配置(需要根据实际情况修改) server = 'your server' user = 'your user' password = 'your pwd' database = 'your DB name' port = 'your port' # 准备数据容器 output_data = [] try: # 建立数据库连接 conn = pymssql.connect( server=server, user=user, password=password, database=database, port=port ) sql = """ SELECT * FROM wc2wca_col_log WHERE msg_details LIKE %s AND msg_code = %s AND biz_date = %s AND msg_type = %s ORDER BY msg_time desc """ # 定义查询参数 for i in Meetingloc.keys(): like_pattern = '%"meetingID":{"meetingLoc":' + str(Meetingloc.get(i)) + ',"meetingDayCode":[1-9]},"raceNo":1%' # like_pattern = '%"meetingID":{"meetingLoc":8,"meetingDayCode":[1-9]},"raceNo":1%' params = ( like_pattern, # LIKE参数 '1120', # msg_code date, # biz_date '1' # msg_type ) # 创建游标执行查询 with conn.cursor(as_dict=True) as cursor: cursor.execute(sql,params) for row in cursor: try: # 解析JSON数据 json_data = json.loads(row['msg_details']) # 提取目标字段 race_no = json_data['Data']['raceNo'] pool_bitmap = json_data['Data']['poolBitmap'] msg_time = row['msg_time'] # 转换投注类型 converted_pool = convert_pool_bitmap(pool_bitmap) # 存储到数据集 output_data.append({ 'meeting':i, 'time': msg_time.strftime('%Y-%m-%d %H:%M:%S.%f'), # 格式化时间 'interval': '', 'raceNo': race_no, 'pool': ', '.join(converted_pool) # 列表转字符串 }) except (KeyError, json.JSONDecodeError) as e: print(f"数据处理异常: {str(e)}") continue # 数据处理: 将 S1 S2 S3 S4 S5 S6 ST HV 数据分组 if output_data: groups = {} for item in output_data: meeting = item.get('meeting') if meeting in Meetingloc.key(): if meeting not in groups: groups[meeting] = [] groups[meeting].append(item) # 自定义输出路径 output_dir = "./reports" # 自定义输出路径 os.makedirs(output_dir, exist_ok=True) filename = f"RaceData_{datetime.now().strftime('%Y%m%d_%H%M%S')}.xlsx" # 生成文件名(带时间戳) filename = os.path.join(output_dir, filename) # 创建Excel文件 with pd.ExcelWriter(filename, engine='openpyxl') as writer: for meeting_name, records in groups.items(): df = pd.DataFrame(records, columns=['meeting','time','interval', 'raceNo', 'pool']) #指定顺序 df = df[['meeting','time','interval', 'RaceNo', 'Pool']] df["time"] = pd.to_datetime(df["time"]) # 转换为时间类型 # df = df.sort_values(by="time").reset_index(drop=True) # 计算时间差(秒) df["Next_Time"] = df["time"].shift(-1) df["interval"] = (df["Next_Time"] - df["time"]).dt.total_seconds() df.drop("Next_Time", axis=1, inplace=True) # 处理最后一条记录的NaN值 df["interval"] = df["interval"].fillna(0) df.to_excel( writer, sheet_name=meeting_name+str(date), index=False, startrow=0, startcol=0) # 获取工作表对象writer # worksheet = writer.sheets.get(meeting_name) worksheet = writer.sheets[meeting_name+str(date)] # 设置列宽(单位:字符) worksheet.column_dimensions['A'].width = 10 # MEETING worksheet.column_dimensions['B'].width = 30 # 时间列 worksheet.column_dimensions['C'].width = 10 # 间隔 worksheet.column_dimensions['D'].width = 10 # RaceNo列 worksheet.column_dimensions['E'].width = 60 # Pool类型列 # 设置标题行样式 header_style = { 'font': {'bold': True, 'color': 'FFFFFF'}, 'fill': {'fill_type': 'solid', 'start_color': '4F81BD'}, 'alignment': {'horizontal': 'center'} } for cell in worksheet[1]: # 第一行为标题行 cell.font = openpyxl.styles.Font(**header_style['font']) cell.fill = openpyxl.styles.PatternFill(**header_style['fill']) cell.alignment = openpyxl.styles.Alignment(**header_style['alignment']) # 自动调整行高 for row in worksheet.iter_rows(): for cell in row: worksheet.row_dimensions[cell.row].height = 20 print(f"数据已成功导出到 {filename}") except pymssql.Error as e: print(f"数据库错误: {str(e)}") finally: if conn: conn.close() def process_date(date_str): """ 处理日期参数的函数 :param date_str: 必传参数(格式示例:20240325) """ # 验证参数格式(可选) if len(date_str) != 8 or not date_str.isdigit(): raise ValueError("日期格式应为8位数字,例如:20240325") # 示例处理结果 year = date_str[:4] month = date_str[4:6] day = date_str[6:] print(f"解析结果:{year}年{month}月{day}日") # 这里添加你的业务逻辑 print(f"成功接收日期参数:{date_str}") print("正在处理...") # 这里添加你的业务逻辑 mains(date_str) def main(): # [!] 关键点:确保这里没有参数 # 创建命令行参数解析器 parser = argparse.ArgumentParser(description='日期处理程序') parser.add_argument('date', type=str, help='必传的日期参数(8位数字,示例:20240325)') args = parser.parse_args() process_date(args.date) if __name__ == "__main__": process_date('20240328')
最后运行:
一、cmd直接运行脚本测试:
python date_app.py 20240325二、打包:打包问题:‘“indexerror: tuple index out of range” 可以参考连接 , 一般问题可以参考连接
pyinstaller --onefile date_app.py三、生成dist文件后,会生成exe文件 在dist文件夹里面新建reports文件夹:报告文件夹
四、测试打包后的程序 1、新建start.bat空文件 2、放入代码:
start PMUCOL_LOG.exe 202403283、运行start.bat
五、生成报告:.\reports\xxxx.xlsx
备注:至于生成Excel文件的代码,想看数据格式的。可以参考
一个py文件搞定mysql查询+Json转换+表数据提取+根据数据条件生成excel文件+打包运行一条龙由讯客互联手机栏目发布,感谢您对讯客互联的认可,以及对我们原创作品以及文章的青睐,非常欢迎各位朋友分享到个人网站或者朋友圈,但转载请说明文章出处“一个py文件搞定mysql查询+Json转换+表数据提取+根据数据条件生成excel文件+打包运行一条龙”
上一篇
机器学习数学通关指南