class NotificationManager:
"""统一的消息推送管理器"""
def __init__(self):
self.server_chan_key = "YOUR_SENDKEY"
self.bark_url = "https://api.day.app/YOUR_KEY"
self.telegram_bot = TelegramBot("TOKEN", "CHAT_ID")
def send(self, title, content, channels=None):
"""发送到多个渠道"""
if channels is None:
channels = ['all']
results = {}
if 'all' in channels or 'wechat' in channels:
results['wechat'] = self._send_wechat(title, content)
if 'all' in channels or 'bark' in channels:
results['bark'] = self._send_bark(title, content)
if 'all' in channels or 'telegram' in channels:
results['telegram'] = self._send_telegram(title, content)
return results
def _send_wechat(self, title, content):
url = f"https://sctapi.ftqq.com/{self.server_chan_key}.send"
return requests.post(url, data={"title": title, "desp": content})
def _send_bark(self, title, content):
return requests.post(self.bark_url, json={"title": title, "body": content})
def _send_telegram(self, title, content):
return self.telegram_bot.send_message(f"*{title}*\n\n{content}")
# 使用
notifier = NotificationManager()
notifier.send("系统告警", "CPU 使用率超过 90%", channels=['wechat', 'telegram'])