钉钉机器人zabbix报警
首先在钉钉群聊里添加一个自定义的机器人
并复制webhook的内容 如下面 创建新建群手加入 机器人就可得到
还有一个 安全设置 -- 加签 时得到
创建脚本
在zabbix服务端的alertscripts目录下新建一个python脚本
py3 Oct 2021
#pre config 添加配置文件zabbix_ding.conf,并将文件上传至/etc/zabbix/下(进入容器) 或者放在母机指定的mount的目录中 例如下面的py3 脚本 ding.py [config] # 日志文件 log_path=/var/log/zabbix/zabbix_ding.log #钉钉机器人 webhook 值 webhook=https://oapi.dingtalk.com/robot/send?access_token=b3e6fa0f410e7ced04c81680f036xxxx # 安全设置 -- 加签 时得到 secret=SEC64d20b4e9d2e2677f7aa01d2a7c2f9xxxx cat ding.py #!/usr/bin/env python3 # coding:utf8 #pwd docker目录 zbx_env/usr/lib/zabbix/alertscripts import configparser import os import time import hmac import hashlib import base64 import urllib.parse import requests import json import sys config = configparser.ConfigParser() config.read('/etc/zabbix/zabbix_ding.conf', encoding='utf-8') log_path = config.get('config', 'log_path') api_url = config.get('config', 'webhook') api_secret = config.get('config', 'secret') log_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) # 钉钉机器人文档说明 # https://ding-doc.dingtalk.com/doc#/serverapi2/qf2nxq def get_timestamp_sign(): timestamp = str(round(time.time() * 1000)) secret = api_secret secret_enc = secret.encode('utf-8') string_to_sign = '{}\n{}'.format(timestamp, secret) string_to_sign_enc = string_to_sign.encode('utf-8') hmac_code = hmac.new(secret_enc, string_to_sign_enc, digestmod=hashlib.sha256).digest() sign = urllib.parse.quote_plus(base64.b64encode(hmac_code)) return timestamp, sign # 获取加签后的链接 def get_signed_url(): timestamp, sign = get_timestamp_sign() webhook = api_url + "×tamp=" + timestamp + "&sign=" + sign return webhook # 定义消息模式 def get_webhook(mode): if mode == 0: # only 关键字 webhook = api_url elif mode == 1 or mode == 2: # 关键字和加签 或 # 关键字+加签+ip webhook = get_signed_url() else: webhook = "" print("error! mode: ", mode, " webhook : ", webhook) return webhook def get_message(text, user_info): # 和类型相对应,具体可以看文档 :https://ding-doc.dingtalk.com/doc#/serverapi2/qf2nxq # 可以设置某个人的手机号,指定对象发送 message = { "msgtype": "text", # 有text, "markdown"、link、整体跳转ActionCard 、独立跳转ActionCard、FeedCard类型等 "text": { "content": text # 消息内容 }, "at": { "atMobiles": [ user_info, ], "isAtAll": False # 是否是发送群中全体成员 } } return message # 消息发送日志 def log(info): if os.path.exists(log_path): log_file = open(log_path, "a+") else: log_file = open(log_path, "w+") log_file.write(info) def send_ding_message(text, user_info): # 请求的URL,WebHook地址 # 主要模式有 0 : 关键字 1:# 关键字 +加签 3:关键字+加签+IP webhook = get_webhook(1) # 构建请求头部 header = { "Content-Type": "application/json", "Charset": "UTF-8" } # 构建请求数据 message = get_message(text, user_info) # 对请求的数据进行json封装 message_json = json.dumps(message) # 发送请求 info = requests.post(url=webhook, data=message_json, headers=header).json() code = info["errcode"] errmsg = info["errmsg"] if code == 0: log(log_time + ":消息已发送成功 返回信息:%s %s\n" % (code, errmsg)) else: log(log_time + ":消息发送失败 返回信息:%s %s\n" % (code, errmsg)) print(log_time + ":消息发送失败 返回信息:%s %s\n" % (code, errmsg)) exit(3) if __name__ == "__main__": text = sys.argv[3] user_info = sys.argv[1] send_ding_message(text, user_info)
py2 2018
#!/usr/bin/python #-*- coding: utf-8 -*- # Author evan this one f17 cat /usr/lib/zabbix/alertscripts/ding.py import requests import json import sys import os headers = {'Content-Type': 'application/json;charset=utf-8'} api_url="https://oapi.dingtalk.com/robot/send?access_token=709ea577d2cd4f8e6975be37fc177307b38fc1d32c322dded89723d3ff58e" def msg(text): json_text= { "msgtype": "text", "text": { "content": text }, "at": { "atMobliles": [ "18520124378","18578776813" ], "isAtAll": False } } print requests.post(api_url,json.dumps(json_text),headers=headers).content if __name__ == '__main__': text = sys.argv[1] msg(text) #参考一下以前的爬虫 相关参考请看 自定义机器人
脚本学习详情
see also
【Zabbix】5.0版本钉钉告警Python3脚本及配置 OK
Zabbix5.0使用钉钉机器人报警 WEB界面配置详情good