python监控ES索引数量变化
- 软件开发
- 2025-08-17 23:45:02

文章目录 1, datafram根据相同的key聚合2, 数据合并:获取采集10,20,30分钟es索引数据脚本测试验证 1, datafram根据相同的key聚合 # 创建df1 ==> json {'key':'A', 'value':1 } {'key':'B', 'value':2 } data1 = {'key': ['A', 'B'], 'value': [1, 2]} df1 = pd.DataFrame(data1) # 创建df2 ==> {'key':'A', 'value':11 } {'key':'B', 'value':22 } data2 = {'key': ['A', 'B'], 'value': [11, 22]} df2 = pd.DataFrame(data2) # 创建df3 ==>{'key':'A', 'value':111 } {'key':'B', 'value':222 } {'key':'C', 'value':333 } data3 = {'key': ['A', 'B', 'c'], 'value': [111, 222, 333]} df3 = pd.DataFrame(data3) #### 聚合两个dataframe #==> {'key':'A', 'value_x':1, 'value_y':11 } {'key':'B', 'value_x':2, 'value_y':22 } >>> mdf1=pd.merge(df1, df2, on='key') >>> mdf1 key value_x value_y 0 A 1 11 1 B 2 22 #### 再聚合两个dataframe #==> {'key':'A', 'value_x':1, 'value_y':11 , 'value':111 } {'key':'B', 'value_x':2, 'value_y':22 , 'value':222 } mdf = pd.merge(pd.merge(df1, df2, on='key'), df3, on='key') >>> mdf2=pd.merge(mdf1, df3, on='key') >>> mdf2 key value_x value_y value 0 A 1 11 111 1 B 2 22 222 2, 数据合并:获取采集10,20,30分钟es索引数据 [root@localhost ] # cat es-indices-monitor.py import json import time import requests import os import sys import glob import pandas as pd def deloldfile(workdir): # 获取目录下所有的文件 all_files = glob.glob(os.path.join(workdir, '*')) # 将文件名和访问时间存入列表 file_list = [] for file in all_files: file_list.append((file, os.path.getatime(file))) # 根据访问时间排序 file_list.sort(key=lambda x: x[1], reverse=False) # 删除旧文件,只保留最新的文件 for file in file_list[:-3]: # 排除最后三个文件,因为它是最新的 os.remove(file[0]) def createfile(workdir,fileName): if not os.path.exists(workdir): os.makedirs(workdir) #os.system("find {}/*.json -type f -ctime +1 -delete".format(workdir) ) #for fileName in os.listdir(workdir): file=open(workdir+fileName,'w',encoding="utf-8") return file def readfile(workdir): if not os.path.exists(workdir): os.makedirs(workdir) # 获取目录下所有的文件 all_files = glob.glob(os.path.join(workdir, '*')) # 将文件名和访问时间存入列表 file_list = [] for file in all_files: file_list.append((file, os.path.getatime(file))) # 根据访问时间排序 files=[] file_list.sort(key=lambda x: x[1], reverse=False) for file in file_list: # 排除最后两个文件,因为它是最新的 files.append(file[0]) return files def writejson(file,jsonArr): for js in jsonArr: jstr=json.dumps(js)+"\n" file.write(jstr) file.close() #3,json转字符串 def getdata(domain,password): url = "http://"+domain+"/_cat/indices?format=json" # 设置认证信息 auth = ('elastic', password) # 发送GET请求,并在请求中添加认证信息 response = requests.get(url, auth=auth) # 检查响应状态码,如果成功则打印响应内容 if response.status_code == 200: #遍历返回的json数组,提取需要的字段 jsonArr=json.loads(response.text) df = pd.json_normalize(jsonArr) dfnew = df.drop(["uuid","docs.deleted"], axis=1) #print(dfnew) #保存_cat/es/indices数据到json文件 workdir="/data/es-indices/" workdir_tmp=workdir+"tmp/" f_time = time.strftime("%Y-%m-%d_%H-%M-%S",time.localtime()) filename="es-data-{}.json".format(f_time) filename_tmp="tmp-{}.json".format(f_time) file=createfile(workdir_tmp,filename_tmp) writejson(file,jsonArr) #删除旧文件,只保留2个最新的 deloldfile(workdir_tmp) deloldfile(workdir) files=readfile(workdir_tmp) #df1=pd.read_json(files[0],lines=True,convert_dates=False) if len(files) > 1: print(files[0]) print(files[1]) df1=pd.read_json(files[0],lines=True) df2=pd.read_json(files[1],lines=True) #"health","status","index","uuid","pri","rep","docs.count","docs.deleted","store.size","pri.store.size" df1 = df1.drop(["health","status","uuid","pri","rep","docs.deleted","store.size","pri.store.size"], axis=1) df2 = df2.drop(["health","status","uuid","pri","rep","docs.deleted","store.size","pri.store.size"], axis=1) mdf = pd.merge(df1, df2, on='index', how='outer') #print(df1) else: mdf=dfnew #聚合3条数据,查看索引文档数量是否变化: 近10分钟的数量为doc.count, 前10分钟的数量为doc.count_x, 前20分钟的数量为doc.count_y, #print(mdf) mdf2 = pd.merge(dfnew, mdf, on='index', how='outer') mdf2 = mdf2.rename(columns={"docs.count_x":"docs.count_30", "docs.count_y":"docs.count_20"}) #print(mdf2) file=createfile(workdir,filename) for idx,row in mdf2.iterrows(): jstr=row.to_json() file.write(jstr+"\n") file.close() else: print('请求失败,状态码:', response.status_code) domain="196.1.0.106:9200" password="123456" getdata(domain,password) 脚本测试验证 [root@localhost] # python3 es-indices-monitor.py /data/es-indices/tmp/tmp-2023-09-28_13-56-12.json /data/es-indices/tmp/tmp-2023-09-28_14-11-47.json #查看结果 [root@localhost] # /appset/ldm/script # ll /data/es-indices/ total 148 -rw------- 1 root root 46791 Sep 28 13:56 es-data-2023-09-28_13-56-12.json -rw------- 1 root root 46788 Sep 28 14:11 es-data-2023-09-28_14-11-47.json -rw------- 1 root root 46788 Sep 28 14:12 es-data-2023-09-28_14-12-07.json drwx------ 2 root root 4096 Sep 28 14:12 tmp [root@localhost] # /appset/ldm/script # ll /data/es-indices/tmp/ total 156 -rw------- 1 root root 52367 Sep 28 13:56 tmp-2023-09-28_13-56-12.json -rw------- 1 root root 52364 Sep 28 14:11 tmp-2023-09-28_14-11-47.json -rw------- 1 root root 52364 Sep 28 14:12 tmp-2023-09-28_14-12-07.json #核对文档数量 [root@localhost] # /appset/ldm/script # head -n 2 /data/es-indices/es-data-2023-09-28_13-56-12.json |grep 2023_09 |grep count {"health":"green","status":"open","index":"test_2023_09","pri":"3","rep":"1","docs.count":"14393","store.size":"29.7mb","pri.store.size":"13.9mb","docs.count_30":14391.0,"docs.count_20":14393.0} [root@localhost] # /appset/ldm/script # head -n 2 /data/es-indices/es-data-2023-09-28_14-11-47.json |grep 2023_09 |grep count {"health":"green","status":"open","index":"test_2023_09","pri":"3","rep":"1","docs.count":"14422","store.size":"33.5mb","pri.store.size":"15.8mb","docs.count_30":14391.0,"docs.count_20":14393.0} [root@localhost] # /appset/ldm/script # head -n 2 /data/es-indices/es-data-2023-09-28_14-12-07.json |grep 2023_09 |grep count {"health":"green","status":"open","index":"test_2023_09","pri":"3","rep":"1","docs.count":"14427","store.size":"33.5mb","pri.store.size":"15.8mb","docs.count_30":14393.0,"docs.count_20":14422.0}
python监控ES索引数量变化由讯客互联软件开发栏目发布,感谢您对讯客互联的认可,以及对我们原创作品以及文章的青睐,非常欢迎各位朋友分享到个人网站或者朋友圈,但转载请说明文章出处“python监控ES索引数量变化”
下一篇
前端开发网站推荐