notifier/notifier/main.py

181 lines
5.1 KiB
Python

import asyncio
import json
import logging
import re
from os import getenv
from time import gmtime
from typing import Any
import requests
from dotenv import load_dotenv
from feedparser import parse as parse_feed
from rocketry import Rocketry
from rocketry.conds import every
STORAGE = "store.json"
app = Rocketry(config={"task_execution": "async"})
store_lock = asyncio.Lock()
class BotErrorHandler(logging.Handler):
def emit(self, record: logging.LogRecord):
try:
notify_chat("Exception when running task\\!", self.format(record))
except Exception:
self.handleError(record)
def notify_chat(header: str, body: str = ""):
url = f"https://api.telegram.org/bot{getenv('BOT_TOKEN')}/sendMessage"
message = header
if body:
message += "\n```\n" + body.replace("```", "\\`\\`\\`") + "\n```"
result = requests.post(
url,
data={
"chat_id": getenv("BOT_CHAT"),
"parse_mode": "MarkdownV2",
"text": message,
},
).json()
if not result.get("ok"):
raise Exception(json.dumps(result, indent=2))
async def read_from_storage(key: str) -> Any:
async with store_lock:
with open(STORAGE, "r") as f:
data = json.loads(f.read())
return data.get(key)
async def save_to_storage(key: str, value: Any):
async with store_lock:
with open(STORAGE, "r") as f:
data = json.loads(f.read())
data[key] = value
with open(STORAGE, "w") as f:
f.write(json.dumps(data, indent=2, sort_keys=True))
@app.task(every("2 hours"))
async def kernel_release():
feed_loc = "https://www.kernel.org/feeds/kdist.xml"
feed = parse_feed(feed_loc)
latest_title = ""
latest_published = gmtime(0)
for release in feed.entries:
if "stable" in release.title and release.published_parsed > latest_published:
latest_published = release.published_parsed
latest_title = release.title
if latest_title != await read_from_storage("latest_kernel_title"):
notify_chat("New kernel\\.org release", latest_title)
await save_to_storage("latest_kernel_title", latest_title)
def get_latest_arch_package(pkgname: str, repo: str) -> str | None:
feed_loc = f"https://archlinux.org/feeds/packages/x86_64/{repo}/"
feed = parse_feed(feed_loc)
r = re.compile(f"^{pkgname} .*$")
for package in feed.entries:
if r.match(package.title):
return package.title
# notify_chat("Unable to find Arch package\\!", f"{pkgname} in {repo}")
return None
@app.task(every("2 hours"))
async def arch_kernel_package_release():
latest = get_latest_arch_package("linux", "core")
latest_stored = await read_from_storage("latest_kernel_package")
if latest is not None and ((latest_stored is None) or (latest != latest_stored)):
await save_to_storage("latest_kernel_package", latest)
notify_chat("New Arch kernel package", latest)
@app.task(every("2 hours"))
async def opensnitch_package_release():
latest = get_latest_arch_package("opensnitch", "community")
latest_stored = await read_from_storage("latest_opensnitch_package")
if latest is not None and ((latest_stored is None) or (latest != latest_stored)):
await save_to_storage("latest_opensnitch_package", latest)
notify_chat("New Arch opensnitch package", latest)
@app.task(every("2 hours"))
async def graalvm_gds_update():
gds_loc = "https://oca.opensource.oracle.com/gds/meta-data.json"
gds = requests.get(gds_loc).json()
known = await read_from_storage("known_graalvm")
current = [x for x in gds.get("Releases").keys()]
new = []
for release in current:
if known is not None and release not in known:
new.append(release)
if new:
notify_chat("New Graal releases", "\n".join(new))
current.sort()
await save_to_storage("known_graalvm", current)
@app.task(every("2 hours"))
async def opensnitch_update():
# todo - generic github releases feed abstraction
feed_loc = "https://api.github.com/repos/evilsocket/opensnitch/releases"
feed = requests.get(feed_loc).json()
known = await read_from_storage("latest_opensnitch")
latest = feed[0].get("name")
if latest is None or latest != known:
notify_chat("New OpenSnitch release", latest)
await save_to_storage("latest_opensnitch", latest)
async def main():
s_handler = logging.StreamHandler()
s_handler.setLevel(logging.DEBUG)
s_handler.setFormatter(
logging.Formatter("[%(asctime)s] %(name)s (%(levelname)s) %(message)s")
)
b_handler = BotErrorHandler()
b_handler.setLevel(logging.ERROR)
b_handler.setFormatter(logging.Formatter("[%(name)s] %(message)s"))
task_logger = logging.getLogger("rocketry.task")
task_logger.addHandler(s_handler)
task_logger.addHandler(b_handler)
load_dotenv()
rocketry_task = asyncio.create_task(app.serve())
notify_chat("Started notifier")
await rocketry_task
def main_sync():
loop = asyncio.new_event_loop()
loop.run_until_complete(main())
if __name__ == "__main__":
main_sync()