-
Notifications
You must be signed in to change notification settings - Fork 26
/
Copy pathql_log_scan.py
270 lines (246 loc) · 13.7 KB
/
ql_log_scan.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
import json
import os
import re
import time
import sys
import subprocess
import traceback
from depend import Depend
'''
cron: 30 23 * * *
new Env('青龙日志分析 && 自动补全依赖');
########环境变量设置#########
## (非必填) 脚本唯一性检测,请在此处填写你想运行的脚本的绝对路径,其他脚本检测到路径与此变量不符将会停止运行
QL_LOG_SCAN_SCRIPT_PATH=
## (非必填)指定日志目录: 默认自动识别青龙目录,出现错误才需要手动指定日志目录
export QL_LOG_PATH="/ql/data/log/"
## (非必填)指定不扫描目录:多个请用逗号隔开
export QL_LOG_BLACK_DIR=""
## (非必填)指定不扫描日志文件:多个请用逗号隔开
export QL_LOG_BLACK_FILE=""
## (非必填)需要被扫描的最近n天的日志,0就是只分析当天的日志(最近24小时的日志)
export QL_LOG_SCAN_DEEPIN=0
## (非必填)是否尝试自动补齐日志报错里提示的依赖
export QL_LOG_AUTO_INSTALL_DEPEND=False
## (非必填)强制指定npm包管理器,有些青龙使用了pnpm而不是npm,注意鉴别
export QL_LOG_NPM="npm"
'''
class QlLogScan(Depend):
def __init__(self):
self.pyname = os.path.basename(__file__).replace(".py", "")
print(self.only_check(self.pyname, os.path.abspath(__file__),"QL_LOG_SCAN_SCRIPT_PATH"))
self.ql_log_path = self.get_env("QL_LOG_PATH", self.get_ql_path() + "log/")
self.filter_dir_list = self.not2append(["^\.tmp$", "^update$", self.pyname + "$"],
self.str2list(self.get_env("QL_LOG_BLACK_DIR")))
self.filter_log_list = self.not2append(['task_error\.log', 'start\.log'],
self.str2list(self.get_env("QL_LOG_BLACK_FILE")))
self.history_scan_deepin = self.get_env("QL_LOG_SCAN_DEEPIN", "0")
self.auto_install_depend = self.get_env("QL_LOG_AUTO_INSTALL_DEPEND", False)
self.npm = self.get_env("QL_LOG_NPM", "npm")
self.log_stat = {
"all": 0,
"nodejs_err": 0,
"python_err": 0,
"err_dict": {},
"nodejs_depend": [],
"python_depend": [],
"readlog_err" :[]
}
self.LogNameHeadList = self.generateLogNameHeadList()
self.analysisLog()
self.showAnalysisLog()
if self.auto_install_depend:
self.auto_depend()
def generateLogNameHeadList(self):
scan_list = []
for i in range((self.history_scan_deepin + 1) * 24):
scan_list.append(time.strftime("%Y-%m-%d-%H", time.localtime((int(time.time()) - (3600 * i)))))
return scan_list
def analysisLog(self):
for path, dir_list, file_list in os.walk(self.ql_log_path):
dir_name = path.replace(self.ql_log_path, "")
if not self.re_filter_list(dir_name, self.filter_dir_list):
for file_name in file_list:
if not self.re_filter_list(file_name, self.filter_log_list) and re.search(r"(.*?).log$",
file_name) and file_name[
:13] in self.LogNameHeadList:
# 读取日志
log_file = open(os.path.join(path, file_name), "r")
try:
log_text = log_file.read(2097152)
log_file.close()
# 分析日志
nodejs_err_list = re.findall(r"Error\:(.*\s?)Require stack\:", log_text)
python_err_list = re.findall(
r"Traceback \(most recent call last\):([\n\s]+File[\s\S]*?, line [\d]+, in[\s\S]*?["
r"\s\S]*?\n[\s\S]*?\n)+(.*?)\n",
log_text)
if nodejs_err_list:
self.log_stat["nodejs_err"] += len(nodejs_err_list)
self.log_stat["err_dict"][dir_name] = []
for i in nodejs_err_list:
v = i.strip()
self.log_stat["err_dict"][dir_name].append({"type": "NodeJs", "log": v})
# 依赖缺失判断
miss_depend = re.search(r"Cannot find module '([a-zA-Z\d_-]+)'", v)
if miss_depend and miss_depend.group(1) not in self.log_stat["nodejs_depend"]:
self.log_stat["nodejs_depend"].append(miss_depend.group(1))
elif python_err_list:
self.log_stat["python_err"] += len(python_err_list)
self.log_stat["err_dict"][dir_name] = []
for i in python_err_list:
v = i[-1].strip()
self.log_stat["err_dict"][dir_name].append({"type": "Python", "log": v})
# 依赖缺失判断
miss_depend = re.search(r"ModuleNotFoundError: No module named \'([a-zA-Z0-9_-]+)\'", v)
if miss_depend and miss_depend.group(1) not in self.log_stat["python_depend"]:
self.log_stat["python_depend"].append(miss_depend.group(1))
self.log_stat["all"] += 1
except Exception as e:
err_log = "读取日志" + str(os.path.join(path, file_name)) + "出现异常: " + str(e) + "\n"
self.log_stat["readlog_err"].append(err_log)
print(err_log)
@staticmethod
def format_log_date(text):
text = text.split("-")
return text[0] + "年" + text[1] + "月" + text[2] + "日" + text[3] + "时"
def showAnalysisLog(self):
len_nodejs_depend = len(self.log_stat["nodejs_depend"])
len_python_depend = len(self.log_stat["python_depend"])
# 展示分析结果
result = "📆分析 " + (
self.format_log_date(self.LogNameHeadList[0]) + " ~ " + self.format_log_date(
self.LogNameHeadList[-1]) if len(self.LogNameHeadList) != 1 else
self.LogNameHeadList[
0]) + " 的日志报告:\n"
if len(self.log_stat["readlog_err"]) != 0:
result += "🔍脚本在读取日志过程中,出现了" + str(len(self.log_stat["readlog_err"])) + "个异常,详细信息将在最后展示\n"
result += "✅正常运行脚本:" + str(self.log_stat["all"]) + " 次\n"
if self.log_stat["all"] != 0:
result += "⛔异常运行脚本:" + str(self.log_stat["nodejs_err"] + self.log_stat["python_err"]) + " 次,占比 " + str(
round(
(float(self.log_stat["nodejs_err"] + self.log_stat["python_err"]) / float(
self.log_stat["all"]) * 100),
2)) + " %\n"
result += "🧐其中:\n"
result += " 🕵️♂️Nodejs异常:" + str(self.log_stat["nodejs_err"]) + " 次,占比 " + str(
round((float(self.log_stat["nodejs_err"]) / float(self.log_stat["all"]) * 100), 2)) + " %\n"
result += " 🕵️♂️Python异常:" + str(self.log_stat["python_err"]) + " 次,占比 " + str(
round((float(self.log_stat["python_err"]) / float(self.log_stat["all"]) * 100), 2)) + " %\n"
if len_nodejs_depend > 0 or len_python_depend > 0:
result += "👮♂️依赖检测: " + (
"☢已开启自动补全依赖,将执行shell命令,请小心恶意脚本👿" if self.auto_install_depend else "❎未开启自动补全依赖,请手动补齐以下依赖🤗") + "\n"
if len_nodejs_depend > 0:
result += "👮♂️检测到缺失NodeJs依赖:\n"
result += str(self.log_stat["nodejs_depend"]) + "\n"
if len_python_depend > 0:
result += "👮♂️检测到缺失Python依赖:\n"
result += str(self.log_stat["python_depend"]) + "\n"
result += "💂♂️详细错误日志:\n\n"
for k, v in self.log_stat["err_dict"].items():
if v:
result += "🛑脚本:" + k + ":\n"
for i in v:
result += "- ⚠" + i["type"] + "错误:" + i["log"] + " \n\n\n"
if len(self.log_stat["readlog_err"]) != 0:
result += "👷♀️读取日志异常日志:\n\n"
for i in self.log_stat["readlog_err"]:
result += "⚠" + i + "\n"
send("🐲青龙日志分析", result)
return result
def auto_depend(self):
len_nodejs_depend = len(self.log_stat["nodejs_depend"])
len_python_depend = len(self.log_stat["python_depend"])
len_all_depend = len_nodejs_depend + len_python_depend
if len_nodejs_depend > 0:
for i in range(len_nodejs_depend):
shell_log = "🤖检测是否安装NodeJs依赖: " + self.log_stat["nodejs_depend"][i] + "\n"
check_result = self.check_depend(self.log_stat["nodejs_depend"][i], "nodejs")
if check_result:
shell_log += "📦" + str(check_result) + "已安装, 跳过安装\n"
else:
shell_log += "⚙当前正在自动安装NodeJs依赖: " + self.log_stat["nodejs_depend"][i] + "\n"
install_result = self.install_depend(self.log_stat["nodejs_depend"][i], "nodejs")
shell_log += "🔨执行命令: " + install_result[0] + "\n"
if install_result[2] != '':
shell_log += "⛔出错了: \n" + install_result[2] + "\n\n"
elif install_result[1] != '':
shell_log += "✅执行完成: \n" + install_result[1] + "\n\n"
send("🐲青龙自动安装依赖(" + str(i + 1) + "/" + str(len_all_depend) + ")", shell_log)
if len_python_depend > 0:
for i in range(len_python_depend):
shell_log = "🤖检测是否安装Python依赖: " + self.log_stat["python_depend"][i] + "\n"
check_result = self.check_depend(self.log_stat["python_depend"][i], "python")
if check_result:
shell_log += "📦" + str(check_result) + "已安装, 跳过安装\n"
else:
shell_log += "⚙当前正在自动安装Python依赖: " + self.log_stat["python_depend"][i] + "\n"
install_result = self.install_depend(self.log_stat["python_depend"][i], "python")
shell_log += "🔨执行命令: " + install_result[0] + "\n"
if install_result[2] != '':
shell_log += "⛔出错了: \n" + install_result[2] + "\n\n"
elif install_result[1] != '':
shell_log += "✅执行完成: \n" + install_result[1] + "\n\n"
send("🐲青龙自动安装依赖(" + str(i + 1 + len_nodejs_depend) + "/" + str(len_all_depend) + ")", shell_log)
def install_depend(self, package, package_type):
package = package.replace("+", "\+")
if package_type == "nodejs":
install_exec = 'cd /ql/ && ' + self.npm + ' install ' + package
elif package_type == "python":
install_exec = 'pip3 install ' + package
elif package_type == "docker":
install_exec = 'apk update && apk add ' + package
if install_exec:
install = subprocess.run(install_exec, shell=True, capture_output=True, text=True)
install_log = install.stdout
install_err = install.stderr
return install_exec, install_log, install_err
else:
return None
def check_depend(self, package, package_type):
package = package.replace("+", "\+")
if package_type == "nodejs":
list_exec = 'cd /ql/ && ' + self.npm + ' list|grep ' + package
list_log = subprocess.run(list_exec, shell=True, capture_output=True, text=True).stdout
npm_re = re.search(r"[\s]" + package + "@[\d.]+", list_log)
pnpm_re = re.search(r"^" + package + " [\d.]+", list_log)
if npm_re:
return npm_re.group()
elif pnpm_re:
return pnpm_re.group()
else:
return None
elif package_type == "python":
list_exec = 'pip3 list|grep ' + package
list_log = subprocess.run(list_exec, shell=True, capture_output=True, text=True).stdout
pip_re = re.search(package + "[ ]+[\d.]+", list_log)
if pip_re:
return pip_re.group()
else:
return None
elif package_type == "docker":
list_exec = 'apk list|grep ' + package
list_log = subprocess.run(list_exec, shell=True, capture_output=True, text=True).stdout
docker_re = re.search(package + "-[\d.]+", list_log)
if docker_re:
return docker_re.group()
else:
return None
else:
return None
def load_send():
global send
cur_path = os.path.abspath(os.path.dirname(__file__))
sys.path.append(cur_path)
if os.path.exists(cur_path + "/notify.py"):
try:
from notify import send
except:
send = False
print("加载通知服务失败~")
else:
send = False
print("加载通知服务失败~")
if __name__ == '__main__':
load_send()
ql = QlLogScan()