共计 11241 个字符,预计需要花费 29 分钟才能阅读完成。
背景
资产的全生命周期管理过程中,监控告警和巡检都是不可缺失的两个组成部分。
监控告警适用于资源故障后的提醒,而巡检则是提前检测可能发生的风险。
下文会介绍如何使用 Python 实现资源水位巡检。
实践
监控使用 Prometheus 技术栈,巡检则从 prometheus 中获取监控指标,定义比实时告警更低的阈值。定期执行并通知到对应人员。
- 定义巡检 PromQL
- 调用 Prometheus API
- 获取数据并格式化
- 通过不同渠道发送通知
main.py
代码介绍由AI生成:
该脚本是一个资源水位巡检工具,主要用于监控和报告基础资源的状态。它通过Prometheus API获取数据,并将数据格式化为Excel和HTML文件,同时发送包含关键信息的Webhook通知。以下是脚本的详细说明:
- 导入依赖库:
-
json
:用于处理JSON数据。 -
requests
:用于发送HTTP请求。 -
xlsxwriter
:用于生成Excel文件。 -
datetime
:用于处理日期和时间。 -
pandas
:用于数据处理和分析,特别是读取和写入Excel文件。
-
- 设置常量:
-
PROM_URL
:Prometheus API的URL,用于查询数据。 -
WEBHOOK_URL
:Webhook的URL,用于发送通知。这里有两个URL,一个用于测试群,另一个用于巡检群,但巡检群的URL是被激活的。
-
- 定义函数:
-
send_email(markdown_messages, timestamp)
:该函数用于发送电子邮件,但当前被注释掉了。它接受Markdown格式的消息和时间戳作为参数,并构造一封包含资源水位巡检信息的电子邮件。 -
send_webhook(rules)
:该函数用于发送Webhook通知。它遍历规则列表,根据每条规则的PromQL查询获取数据,并生成Markdown格式的消息。然后,它通过POST请求将消息发送到指定的Webhook URL。 -
generate_xlsx(rules, timestamp)
:该函数用于生成Excel文件。它遍历规则列表,获取详细数据,并将数据写入Excel工作表。每个规则对应一个工作表,工作表的名称由规则中的sheet_name
字段指定。 -
generate_html(rules, timestamp, xlsx_file)
:该函数用于生成HTML文件。它首先根据Excel文件中的数据判断哪些类别有风险发生,然后为每个有风险发生的类别生成HTML表格。最后,它将所有HTML内容连接成一个完整的HTML文档并写入文件。
-
- 读取配置文件: 脚本从
config.json
文件中读取规则列表。这些规则定义了如何查询Prometheus、如何格式化数据以及如何发送通知等。 - 执行主逻辑:
- 获取当前时间戳并格式化为字符串。
- 调用
generate_xlsx
函数生成Excel文件。 - 调用
generate_html
函数生成HTML文件。 - 调用
send_webhook
函数发送Webhook通知。
需要注意的是,虽然脚本中包含了send_email
函数的定义,但在当前代码中该函数并未被调用。这可能是因为电子邮件发送功能在某些场景下不是必需的,或者需要额外的配置才能启用。
import json
import requests
import xlsxwriter
from datetime import datetime
import pandas as pd
# Prometheus 地址
PROM_URL = "http:/***/select/0/prometheus/api/v1/query"
# 巡检群 Webhook 地址
WEBHOOK_URL ="https://***/api/v1/webhook/send?key=***"
# 测试群 Webhook 地址
#WEBHOOK_URL = "https://***/api/v1/webhook/send?key=***"
def send_email(markdown_messages, timestamp):
sender = "****"
receivers = ["****","****"]
subject = f"基础资源水位巡检 - {now.strftime('%Y-%m-%d')}"
html_body = "<h2>资源水位巡检</h2>" + ''.join(markdown_messages) + "</h4><br><br>详细资源见附件表格。"
msg = MIMEMultipart()
msg['Subject'] = subject
msg['From'] = sender
msg['To'] = ', '.join(receivers)
msg.attach(MIMEText(html_body, 'html'))
with open(f'./output/{timestamp}.xlsx', 'rb') as f:
excel_data = f.read()
excel_attachment = MIMEApplication(excel_data)
excel_attachment.add_header('Content-Disposition', 'attachment', filename=f'{timestamp}.xlsx')
msg.attach(excel_attachment)
smtp_server = "****"
smtp_port = 465
smtp_password = "****"
# with smtplib.SMTP_SSL(smtp_server, smtp_port) as server:
# server.login(sender, smtp_password)
# server.sendmail(sender, receivers, msg.as_string())
# print("邮件已发送")
def send_webhook(rules):
# 存储 Webhook 的 Markdown 消息
webhook_markdown_messages = []
current_type = None
# 遍历每条规则
for rule in rules:
webhook_promql = rule.get('webhook_promql', '')
if webhook_promql == 'count' and rule['promql_details_webhook']:
# 获取详细数据
params = {'query': rule['promql_details_webhook']['query']}
details_data = requests.get(PROM_URL, params=params).json()
print(f"详细数据 ({rule['rules_name']}):")
print(details_data)
# 统计计数
count = len(details_data['data']['result']) if details_data['status'] == 'success' and details_data['data']['result'] else 0
# 生成 Webhook 的 Markdown 消息
if count > 0:
if current_type != rule['type']:
if current_type is not None:
webhook_markdown_messages.append("</h4>\n")
current_type = rule['type']
webhook_markdown_messages.append(f"<h4>资源类别:<font color='blue'>{rule['type']}</font></h4>\n\n")
message = f"- <b>{rule['rules_name']}</b> 的资源个数:<font color='red'>{count}</font> 个<br>\n\n"
webhook_markdown_messages.append(message)
elif webhook_promql == 'details' and rule['promql_details_webhook']:
# 获取详细数据
params = {'query': rule['promql_details_webhook']['query']}
details_data = requests.get(PROM_URL, params=params).json()
print(f"详细数据 ({rule['rules_name']}):")
print(details_data)
# 生成 Webhook 的 Markdown 消息
if details_data['status'] == 'success' and details_data['data']['result']:
if current_type != rule['type']:
if current_type is not None:
webhook_markdown_messages.append("</h4>\n")
current_type = rule['type']
webhook_markdown_messages.append(f"<h4>资源类别:<font color='blue'>{rule['type']}</font></h4>\n\n")
message = f"- <b>{rule['rules_name']}</b> 的资源清单:<br>\n"
for data in details_data['data']['result']:
labels = data['metric']
if rule['promql_details_webhook']['format'] == 'percent':
value = f"{float(data['value'][1]):.2%}"
elif rule['promql_details_webhook']['format'] == 'count':
value = int(float(data['value'][1]))
else:
value = data['value'][1]
label_values = [v for v in labels.values()]
message += ' '.join(label_values) + ' ' + '<font color=\'red\'>' + str(value) + '</font>' + '<br>'
message += "<br>\n"
webhook_markdown_messages.append(message)
# webhook_markdown_messages.append("\n查看邮件获取更多资源清单")
html_link = f"https://****/{timestamp}.html"
xlsx_link = f"https://****/{timestamp}.xlsx"
webhook_markdown_messages.append(f"\n 在线查看:[资源水位巡检表]({html_link})")
webhook_markdown_messages.append(f"\n\n 表格下载:[资源水位巡检表]({xlsx_link})")
# 发送 Markdown 消息到 Webhook
markdown_text = "### 基础资源水位巡检\n" + ''.join(webhook_markdown_messages)
print(f"最终 Markdown 文本:\n{markdown_text}")
payload = {
"msgtype": "markdown",
"markdown": {
"text": markdown_text
}
}
response = requests.post(WEBHOOK_URL, json=payload)
print(f"发送到 Webhook 的响应: {response.text}")
def generate_xlsx(rules, timestamp):
# 创建一个 Excel 文件
xlsx_file = f'./output/{timestamp}.xlsx'
with xlsxwriter.Workbook(xlsx_file) as workbook:
worksheets = {} # 用于存储已创建的工作表
worksheet_rows = {} # 用于存储每个工作表的当前行号
# 遍历每条规则
for rule in rules:
# 获取邮件详细数据
if rule['promql_details_mail']:
params = {'query': rule['promql_details_mail']['query']}
details_data = requests.get(PROM_URL, params=params).json()
print(f"详细数据 ({rule['rules_name']}):")
print(details_data)
# 只有在有数据的情况下才创建工作表
if details_data['status'] == 'success' and details_data['data']['result']:
# 将详细数据保存到 Excel 工作表
if rule['sheet_name'] not in worksheets:
worksheet = workbook.add_worksheet(rule['sheet_name'])
worksheets[rule['sheet_name']] = worksheet
worksheet_rows[rule['sheet_name']] = 0
row = 0
col = 0
all_keys = set()
for data in details_data['data']['result']:
all_keys.update(data['metric'].keys())
all_keys = sorted(all_keys)
worksheet.write(row, col, 'rules_name') # 写入字段名称
col += 1
for key in all_keys:
worksheet.write(row, col, key)
col += 1
worksheet.write(row, col, 'values') # 写入字段名称
row += 1
worksheet_rows[rule['sheet_name']] = row
else:
worksheet = worksheets[rule['sheet_name']]
row = worksheet_rows[rule['sheet_name']]
# 按照 value 的值进行排序
sorted_data = sorted(details_data['data']['result'], key=lambda x: float(x['value'][1]), reverse=True)
for data in sorted_data:
col = 0
worksheet.write(row, col, rule['rules_name']) # 写入规则名称
col += 1
for key in all_keys:
value = data['metric'].get(key, '')
worksheet.write(row, col, value)
col += 1
if rule['promql_details_mail']['format'] == 'percent':
value = f"{float(data['value'][1]):.2%}"
elif rule['promql_details_mail']['format'] == 'count':
value = int(float(data['value'][1]))
else:
value = data['value'][1]
worksheet.write(row, col, value)
row += 1
worksheet_rows[rule['sheet_name']] = row
return xlsx_file
def generate_html(rules, timestamp, xlsx_file):
# 存储所有工作表的 HTML 内容
html_contents = []
# 创建一个字典,用于存储每个类别的风险状态
category_risk_status = {}
# 遍历每条规则
for rule in rules:
# 获取规则所属的类别(使用 sheet_name)
category = rule['sheet_name']
# 如果该类别还没有出现过,则将其加入字典,默认为未发生风险
if category not in category_risk_status:
category_risk_status[category] = False
# 获取邮件详细数据
if rule['promql_details_mail']:
params = {'query': rule['promql_details_mail']['query']}
details_data = requests.get(PROM_URL, params=params).json()
# 如果有数据,则将该类别标记为有风险发生
if details_data['status'] == 'success' and details_data['data']['result']:
category_risk_status[category] = True
# 读取 Excel 文件中的所有工作表名称
sheet_names = pd.ExcelFile(xlsx_file).sheet_names
# 遍历每个类别
for category, has_risk in category_risk_status.items():
# 生成类别标题
category_title = f"<h2 style='text-align: left;'>{category}</h2>"
html_contents.append(category_title)
# 如果该类别有风险发生,并且相应的工作表存在,则读取 Excel 文件并生成表格
if has_risk and category in sheet_names:
df = pd.read_excel(xlsx_file, sheet_name=category)
# 计算每列的最大内容长度
column_widths = df.astype(str).apply(lambda x: x.str.len()).max()
# 将 DataFrame 转换为 HTML 表格
html_table = df.to_html(index=False, classes='table', justify='left', border=0)
# 生成列宽样式
column_styles = []
for column, width in column_widths.items():
column_styles.append(f".table th:nth-child({df.columns.get_loc(column) + 1}), .table td:nth-child({df.columns.get_loc(column) + 1}) {{ width: {width + 2}ch; }}")
# 包装 HTML 内容并美化表格样式
html_content = f"""
<style>
table {{
border-collapse: collapse;
margin: 0;
font-size: 14px;
table-layout: auto;
}}
th, td {{
border: 1px solid black;
padding: 8px;
text-align: left;
white-space: nowrap;
}}
th {{
background-color: #f2f2f2;
}}
{' '.join(column_styles)}
</style>
{html_table}
"""
else:
# 如果该类别没有风险发生,或者相应的工作表不存在,则生成提示信息
html_content = "<p style='color: green;'>该类别未发生风险</p>"
html_contents.append(html_content)
html_contents.append("<hr>")
# 将所有 HTML 内容连接成一个字符串
html_output = "\n".join(html_contents)
# 获取当前时间
current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
# 添加 HTML 文档结构
html_output = f"""
<!DOCTYPE html>
<html>
<head>
<title>资源水位巡检</title>
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<style>
body {{
font-family: Arial, sans-serif;
margin: 20px;
}}
.left {{
text-align: left;
}}
</style>
</head>
<body>
<p class="left">巡检时间: {current_time}</p>
{html_output}
</body>
</html>
"""
# 将 HTML 内容写入文件
html_file = f'./output/{timestamp}.html'
with open(html_file, 'w', encoding='utf-8') as f:
f.write(html_output)
print(f"HTML 文件已生成: {html_file}")
return html_file
# 读取配置文件
with open('config.json', 'r') as f:
rules = json.load(f)
# 获取当前时间戳
now = datetime.now()
timestamp = now.strftime("%Y-%m-%d_%H-%M")
# 生成 Excel 文件
xlsx_file = generate_xlsx(rules, timestamp)
# 生成 HTML 文件
html_file = generate_html(rules, timestamp, xlsx_file)
# 发送 Webhook
send_webhook(rules)
format-exceltojson.py
由于巡检规则较多,且规则会一直迭代更新,直接在配置文件中维护不方便。
于是将规则存储在表格文件中,通过代码将其自动转换为json。
- 表格截取部分,如下图:
- config.json 截取部分:
[
{
"rules_name": "内存使用率 24小时max > 90%",
"type": "服务器",
"promql_details_mail": {
"query": "max by (region,host_ip,team_id,apps_runon,groups,az,resource) (max_over_time(node_memory:usage_percentage{stype='node',os_platform='Linux',apps_runon!~'kvm|openstack|k8s',resource!~'vm|cloud_machine'}[1d])) /100 / on(team_id) group_left(team_name) enterprise_projects > 0.9",
"format": "percent"
},
"promql_details_webhook": {
"query": "max by (region,host_ip,team_id,apps_runon,groups,az,resource) (max_over_time(node_memory:usage_percentage{stype='node',os_platform='Linux',apps_runon!~'kvm|openstack|k8s',resource!~'vm|cloud_machine'}[1d])) /100 / on(team_id) group_left(team_name) enterprise_projects > 0.9",
"format": "percent"
},
"webhook_promql": "count",
"sheet_name": "服务器内存"
},
{
"rules_name": "CPU使用率 >90% 24小时内持续30分钟次数 >1",
"type": "服务器",
"promql_details_mail": {
"query": "sum (count_over_time((min_over_time(node_cpu:usage_percentage{stype='node',os_platform='Linux',apps_runon!~'kvm|openstack',resource!~'vm|cloud_machine'}[30m]) > 90)[24h:30m])) by (cloud_provider,region,host_ip,team_id,apps_runon,groups,az,resource) > 1",
"format": "raw"
},
"promql_details_webhook": {
"query": "sum (count_over_time((min_over_time(node_cpu:usage_percentage{stype='node',os_platform='Linux',apps_runon!~'kvm|openstack',resource!~'vm|cloud_machine'}[30m]) > 90)[24h:30m])) by (cloud_provider,region,host_ip,team_id,apps_runon,groups,az,resource) > 1",
"format": "raw"
},
"webhook_promql": "count",
"sheet_name": "服务器CPU"
}
]
- format-exceltojson.py
import json
from openpyxl import load_workbook
# 加载 Excel 文件
workbook = load_workbook(filename='基础资源水位巡检规则.xlsx')
sheet = workbook.active
# 初始化规则列表
rules = []
# 遍历 Excel 表格中的每一行
for row in sheet.iter_rows(min_row=2, values_only=True):
rule = {
'rules_name': row[0],
'type': row[1],
'promql_details_mail': {
'query': row[2],
'format': row[3]
},
'promql_details_webhook': {
'query': row[4],
'format': row[5]
},
'webhook_promql': row[6],
'sheet_name': row[7]
}
rules.append(rule)
# 将规则列表转换为 JSON 格式
json_data = json.dumps(rules, ensure_ascii=False, indent=4)
# 将 JSON 数据写入文件
with open('config.json', 'w', encoding='utf-8') as f:
f.write(json_data)
print("配置文件生成完成")
效果预览
在 企业IM 中,仅通知概览:
同时会生成 在线html 或 xlsx工作表,便于查询详情和归档。
本文属于专题:Prometheus 监控
- 使用 Redis Exporter 监控 Redis
- Grafana 备份、迁移与升级
- 云原生监控 Kube-Prometheus
- Prometheus 集成 Nginx 监控
- Prometheus 查询指定时间范围内的峰值或均值
- 云监控接入本地Prometheus
- 使用 MySQL Exporter 监控MySQL
- 个人微信接收夜莺告警消息
- PushGateway 报错:too many open files
- Blackbox 网络监控
- node_exporter 添加自定义指标
- Python 实现资源水位巡检
- Prometheus 替代方案:VictoriaMetrics
- 使用 node_exporter 实现路由器监控
正文完