Python 实现资源水位巡检

150次阅读
没有评论

共计 11241 个字符,预计需要花费 29 分钟才能阅读完成。

背景

资产的全生命周期管理过程中,监控告警和巡检都是不可缺失的两个组成部分。

监控告警适用于资源故障后的提醒,而巡检则是提前检测可能发生的风险。

下文会介绍如何使用 Python 实现资源水位巡检。

实践

监控使用 Prometheus 技术栈,巡检则从 prometheus 中获取监控指标,定义比实时告警更低的阈值。定期执行并通知到对应人员。

  • 定义巡检 PromQL
  • 调用 Prometheus API
  • 获取数据并格式化
  • 通过不同渠道发送通知

main.py

代码介绍由AI生成:

该脚本是一个资源水位巡检工具,主要用于监控和报告基础资源的状态。它通过Prometheus API获取数据,并将数据格式化为Excel和HTML文件,同时发送包含关键信息的Webhook通知。以下是脚本的详细说明:

  1. 导入依赖库
    • json​:用于处理JSON数据。
    • requests​:用于发送HTTP请求。
    • xlsxwriter​:用于生成Excel文件。
    • datetime​:用于处理日期和时间。
    • pandas​:用于数据处理和分析,特别是读取和写入Excel文件。
  2. 设置常量
    • PROM_URL​:Prometheus API的URL,用于查询数据。
    • WEBHOOK_URL​:Webhook的URL,用于发送通知。这里有两个URL,一个用于测试群,另一个用于巡检群,但巡检群的URL是被激活的。
  3. 定义函数
    • send_email(markdown_messages, timestamp)​:该函数用于发送电子邮件,但当前被注释掉了。它接受Markdown格式的消息和时间戳作为参数,并构造一封包含资源水位巡检信息的电子邮件。
    • send_webhook(rules)​:该函数用于发送Webhook通知。它遍历规则列表,根据每条规则的PromQL查询获取数据,并生成Markdown格式的消息。然后,它通过POST请求将消息发送到指定的Webhook URL。
    • generate_xlsx(rules, timestamp)​:该函数用于生成Excel文件。它遍历规则列表,获取详细数据,并将数据写入Excel工作表。每个规则对应一个工作表,工作表的名称由规则中的sheet_name​字段指定。
    • generate_html(rules, timestamp, xlsx_file)​:该函数用于生成HTML文件。它首先根据Excel文件中的数据判断哪些类别有风险发生,然后为每个有风险发生的类别生成HTML表格。最后,它将所有HTML内容连接成一个完整的HTML文档并写入文件。
  4. 读取配置文件: 脚本从config.json​文件中读取规则列表。这些规则定义了如何查询Prometheus、如何格式化数据以及如何发送通知等。
  5. 执行主逻辑
    • 获取当前时间戳并格式化为字符串。
    • 调用generate_xlsx​函数生成Excel文件。
    • 调用generate_html​函数生成HTML文件。
    • 调用send_webhook​函数发送Webhook通知。

需要注意的是,虽然脚本中包含了send_email​函数的定义,但在当前代码中该函数并未被调用。这可能是因为电子邮件发送功能在某些场景下不是必需的,或者需要额外的配置才能启用。

import json
import requests
import xlsxwriter
from datetime import datetime
import pandas as pd

# Prometheus 地址
PROM_URL = "http:/***/select/0/prometheus/api/v1/query"

# 巡检群 Webhook 地址
WEBHOOK_URL ="https://***/api/v1/webhook/send?key=***"

# 测试群 Webhook 地址
#WEBHOOK_URL = "https://***/api/v1/webhook/send?key=***"

def send_email(markdown_messages, timestamp):
    sender = "****"
    receivers = ["****","****"]
    subject = f"基础资源水位巡检 - {now.strftime('%Y-%m-%d')}"
    html_body = "<h2>资源水位巡检</h2>" + ''.join(markdown_messages) + "</h4><br><br>详细资源见附件表格。"

    msg = MIMEMultipart()
    msg['Subject'] = subject
    msg['From'] = sender
    msg['To'] = ', '.join(receivers)

    msg.attach(MIMEText(html_body, 'html'))

    with open(f'./output/{timestamp}.xlsx', 'rb') as f:
        excel_data = f.read()

    excel_attachment = MIMEApplication(excel_data)
    excel_attachment.add_header('Content-Disposition', 'attachment', filename=f'{timestamp}.xlsx')
    msg.attach(excel_attachment)

    smtp_server = "****"
    smtp_port = 465
    smtp_password = "****"

#    with smtplib.SMTP_SSL(smtp_server, smtp_port) as server:
#        server.login(sender, smtp_password)
#        server.sendmail(sender, receivers, msg.as_string())

#    print("邮件已发送")

def send_webhook(rules):
    # 存储 Webhook 的 Markdown 消息
    webhook_markdown_messages = []
    current_type = None

    # 遍历每条规则
    for rule in rules:
        webhook_promql = rule.get('webhook_promql', '')

        if webhook_promql == 'count' and rule['promql_details_webhook']:
            # 获取详细数据
            params = {'query': rule['promql_details_webhook']['query']}
            details_data = requests.get(PROM_URL, params=params).json()
            print(f"详细数据 ({rule['rules_name']}):")
            print(details_data)

            # 统计计数
            count = len(details_data['data']['result']) if details_data['status'] == 'success' and details_data['data']['result'] else 0

            # 生成 Webhook 的 Markdown 消息
            if count > 0:
                if current_type != rule['type']:
                    if current_type is not None:
                        webhook_markdown_messages.append("</h4>\n")
                    current_type = rule['type']
                    webhook_markdown_messages.append(f"<h4>资源类别:<font color='blue'>{rule['type']}</font></h4>\n\n")
                message = f"- <b>{rule['rules_name']}</b> 的资源个数:<font color='red'>{count}</font> 个<br>\n\n"
                webhook_markdown_messages.append(message)

        elif webhook_promql == 'details' and rule['promql_details_webhook']:
            # 获取详细数据
            params = {'query': rule['promql_details_webhook']['query']}
            details_data = requests.get(PROM_URL, params=params).json()
            print(f"详细数据 ({rule['rules_name']}):")
            print(details_data)

            # 生成 Webhook 的 Markdown 消息
            if details_data['status'] == 'success' and details_data['data']['result']:
                if current_type != rule['type']:
                    if current_type is not None:
                        webhook_markdown_messages.append("</h4>\n")
                    current_type = rule['type']
                    webhook_markdown_messages.append(f"<h4>资源类别:<font color='blue'>{rule['type']}</font></h4>\n\n")
                message = f"- <b>{rule['rules_name']}</b> 的资源清单:<br>\n"
                for data in details_data['data']['result']:
                    labels = data['metric']
                    if rule['promql_details_webhook']['format'] == 'percent':
                        value = f"{float(data['value'][1]):.2%}"
                    elif rule['promql_details_webhook']['format'] == 'count':
                        value = int(float(data['value'][1]))
                    else:
                        value = data['value'][1]
                    label_values = [v for v in labels.values()]
                    message += ' '.join(label_values) + ' ' + '<font color=\'red\'>' + str(value) + '</font>' + '<br>'
                message += "<br>\n"
                webhook_markdown_messages.append(message)

#    webhook_markdown_messages.append("\n查看邮件获取更多资源清单")

    html_link = f"https://****/{timestamp}.html"
    xlsx_link = f"https://****/{timestamp}.xlsx"
    webhook_markdown_messages.append(f"\n 在线查看:[资源水位巡检表]({html_link})")
    webhook_markdown_messages.append(f"\n\n 表格下载:[资源水位巡检表]({xlsx_link})")

    # 发送 Markdown 消息到 Webhook
    markdown_text = "### 基础资源水位巡检\n" + ''.join(webhook_markdown_messages)
    print(f"最终 Markdown 文本:\n{markdown_text}")

    payload = {
        "msgtype": "markdown",
        "markdown": {
            "text": markdown_text
        }
    }

    response = requests.post(WEBHOOK_URL, json=payload)
    print(f"发送到 Webhook 的响应: {response.text}")

def generate_xlsx(rules, timestamp):
    # 创建一个 Excel 文件
    xlsx_file = f'./output/{timestamp}.xlsx'
    with xlsxwriter.Workbook(xlsx_file) as workbook:
        worksheets = {}  # 用于存储已创建的工作表
        worksheet_rows = {}  # 用于存储每个工作表的当前行号

        # 遍历每条规则
        for rule in rules:
            # 获取邮件详细数据
            if rule['promql_details_mail']:
                params = {'query': rule['promql_details_mail']['query']}
                details_data = requests.get(PROM_URL, params=params).json()
                print(f"详细数据 ({rule['rules_name']}):")
                print(details_data)

                # 只有在有数据的情况下才创建工作表
                if details_data['status'] == 'success' and details_data['data']['result']:
                    # 将详细数据保存到 Excel 工作表
                    if rule['sheet_name'] not in worksheets:
                        worksheet = workbook.add_worksheet(rule['sheet_name'])
                        worksheets[rule['sheet_name']] = worksheet
                        worksheet_rows[rule['sheet_name']] = 0
                        row = 0
                        col = 0
                        all_keys = set()
                        for data in details_data['data']['result']:
                            all_keys.update(data['metric'].keys())
                        all_keys = sorted(all_keys)
                        worksheet.write(row, col, 'rules_name')  # 写入字段名称
                        col += 1
                        for key in all_keys:
                            worksheet.write(row, col, key)
                            col += 1
                        worksheet.write(row, col, 'values')  # 写入字段名称
                        row += 1
                        worksheet_rows[rule['sheet_name']] = row
                    else:
                        worksheet = worksheets[rule['sheet_name']]
                        row = worksheet_rows[rule['sheet_name']]

                    # 按照 value 的值进行排序
                    sorted_data = sorted(details_data['data']['result'], key=lambda x: float(x['value'][1]), reverse=True)

                    for data in sorted_data:
                        col = 0
                        worksheet.write(row, col, rule['rules_name'])  # 写入规则名称
                        col += 1
                        for key in all_keys:
                            value = data['metric'].get(key, '')
                            worksheet.write(row, col, value)
                            col += 1
                        if rule['promql_details_mail']['format'] == 'percent':
                            value = f"{float(data['value'][1]):.2%}"
                        elif rule['promql_details_mail']['format'] == 'count':
                            value = int(float(data['value'][1]))
                        else:
                            value = data['value'][1]
                        worksheet.write(row, col, value)
                        row += 1
                    worksheet_rows[rule['sheet_name']] = row

    return xlsx_file

def generate_html(rules, timestamp, xlsx_file):
    # 存储所有工作表的 HTML 内容
    html_contents = []

    # 创建一个字典,用于存储每个类别的风险状态
    category_risk_status = {}

    # 遍历每条规则
    for rule in rules:
        # 获取规则所属的类别(使用 sheet_name)
        category = rule['sheet_name']

        # 如果该类别还没有出现过,则将其加入字典,默认为未发生风险
        if category not in category_risk_status:
            category_risk_status[category] = False

        # 获取邮件详细数据
        if rule['promql_details_mail']:
            params = {'query': rule['promql_details_mail']['query']}
            details_data = requests.get(PROM_URL, params=params).json()

            # 如果有数据,则将该类别标记为有风险发生
            if details_data['status'] == 'success' and details_data['data']['result']:
                category_risk_status[category] = True

    # 读取 Excel 文件中的所有工作表名称
    sheet_names = pd.ExcelFile(xlsx_file).sheet_names

    # 遍历每个类别
    for category, has_risk in category_risk_status.items():
        # 生成类别标题
        category_title = f"<h2 style='text-align: left;'>{category}</h2>"
        html_contents.append(category_title)

        # 如果该类别有风险发生,并且相应的工作表存在,则读取 Excel 文件并生成表格
        if has_risk and category in sheet_names:
            df = pd.read_excel(xlsx_file, sheet_name=category)

            # 计算每列的最大内容长度
            column_widths = df.astype(str).apply(lambda x: x.str.len()).max()

            # 将 DataFrame 转换为 HTML 表格
            html_table = df.to_html(index=False, classes='table', justify='left', border=0)

            # 生成列宽样式
            column_styles = []
            for column, width in column_widths.items():
                column_styles.append(f".table th:nth-child({df.columns.get_loc(column) + 1}), .table td:nth-child({df.columns.get_loc(column) + 1}) {{ width: {width + 2}ch; }}")

            # 包装 HTML 内容并美化表格样式
            html_content = f"""
<style>
table {{
  border-collapse: collapse;
  margin: 0;
  font-size: 14px;
  table-layout: auto;
}}
th, td {{
  border: 1px solid black;
  padding: 8px;
  text-align: left;
  white-space: nowrap;
}}
th {{
  background-color: #f2f2f2;
}}
{' '.join(column_styles)}
</style>
{html_table}
"""
        else:
            # 如果该类别没有风险发生,或者相应的工作表不存在,则生成提示信息
            html_content = "<p style='color: green;'>该类别未发生风险</p>"

        html_contents.append(html_content)
        html_contents.append("<hr>")

    # 将所有 HTML 内容连接成一个字符串
    html_output = "\n".join(html_contents)

    # 获取当前时间
    current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    # 添加 HTML 文档结构
    html_output = f"""
<!DOCTYPE html>
<html>
<head>
    <title>资源水位巡检</title>
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <style>
        body {{
            font-family: Arial, sans-serif;
            margin: 20px;
        }}
        .left {{
            text-align: left;
        }}
    </style>
</head>
<body>
    <p class="left">巡检时间: {current_time}</p>
    {html_output}
</body>
</html>
"""

    # 将 HTML 内容写入文件
    html_file = f'./output/{timestamp}.html'
    with open(html_file, 'w', encoding='utf-8') as f:
        f.write(html_output)

    print(f"HTML 文件已生成: {html_file}")
    return html_file


# 读取配置文件
with open('config.json', 'r') as f:
    rules = json.load(f)

# 获取当前时间戳
now = datetime.now()
timestamp = now.strftime("%Y-%m-%d_%H-%M")

# 生成 Excel 文件
xlsx_file = generate_xlsx(rules, timestamp)

# 生成 HTML 文件
html_file = generate_html(rules, timestamp, xlsx_file)

# 发送 Webhook
send_webhook(rules)


format-exceltojson.py

由于巡检规则较多,且规则会一直迭代更新,直接在配置文件中维护不方便。

于是将规则存储在表格文件中,通过代码将其自动转换为json。

  • 表格截取部分,如下图:

Python 实现资源水位巡检

  • config.json 截取部分:
[
    {
        "rules_name": "内存使用率 24小时max > 90%",
        "type": "服务器",
        "promql_details_mail": {
            "query": "max by (region,host_ip,team_id,apps_runon,groups,az,resource) (max_over_time(node_memory:usage_percentage{stype='node',os_platform='Linux',apps_runon!~'kvm|openstack|k8s',resource!~'vm|cloud_machine'}[1d])) /100 / on(team_id) group_left(team_name)  enterprise_projects > 0.9",
            "format": "percent"
        },
        "promql_details_webhook": {
            "query": "max by (region,host_ip,team_id,apps_runon,groups,az,resource) (max_over_time(node_memory:usage_percentage{stype='node',os_platform='Linux',apps_runon!~'kvm|openstack|k8s',resource!~'vm|cloud_machine'}[1d])) /100 / on(team_id) group_left(team_name)  enterprise_projects > 0.9",
            "format": "percent"
        },
        "webhook_promql": "count",
        "sheet_name": "服务器内存"
    },
    {
        "rules_name": "CPU使用率 >90% 24小时内持续30分钟次数 >1",
        "type": "服务器",
        "promql_details_mail": {
            "query": "sum (count_over_time((min_over_time(node_cpu:usage_percentage{stype='node',os_platform='Linux',apps_runon!~'kvm|openstack',resource!~'vm|cloud_machine'}[30m]) > 90)[24h:30m])) by (cloud_provider,region,host_ip,team_id,apps_runon,groups,az,resource) > 1",
            "format": "raw"
        },
        "promql_details_webhook": {
            "query": "sum (count_over_time((min_over_time(node_cpu:usage_percentage{stype='node',os_platform='Linux',apps_runon!~'kvm|openstack',resource!~'vm|cloud_machine'}[30m]) > 90)[24h:30m])) by (cloud_provider,region,host_ip,team_id,apps_runon,groups,az,resource) > 1",
            "format": "raw"
        },
        "webhook_promql": "count",
        "sheet_name": "服务器CPU"
    }
]

  • format-exceltojson.py
import json
from openpyxl import load_workbook

# 加载 Excel 文件
workbook = load_workbook(filename='基础资源水位巡检规则.xlsx')
sheet = workbook.active

# 初始化规则列表
rules = []

# 遍历 Excel 表格中的每一行
for row in sheet.iter_rows(min_row=2, values_only=True):
    rule = {
        'rules_name': row[0],
        'type': row[1],
        'promql_details_mail': {
            'query': row[2],
            'format': row[3]
        },
        'promql_details_webhook': {
            'query': row[4],
            'format': row[5]
        },
        'webhook_promql': row[6],
        'sheet_name': row[7]
    }
    rules.append(rule)

# 将规则列表转换为 JSON 格式
json_data = json.dumps(rules, ensure_ascii=False, indent=4)

# 将 JSON 数据写入文件
with open('config.json', 'w', encoding='utf-8') as f:
    f.write(json_data)

print("配置文件生成完成")

效果预览

在 企业IM 中,仅通知概览:
Python 实现资源水位巡检

同时会生成 在线html 或 xlsx工作表,便于查询详情和归档。

Python 实现资源水位巡检

本文属于专题:Prometheus 监控

正文完
 
pengyinwei
版权声明:本站原创文章,由 pengyinwei 2024-10-17发表,共计11241字。
转载说明:除特殊说明外本站文章皆由CC-4.0协议发布,转载请注明出处:https://www.opshub.cn
评论(没有评论)