Aggiunto script daily_report.py

This commit is contained in:
2025-12-26 07:54:26 +01:00
parent 056ac220d5
commit 6f6183291a

View File

@@ -1,127 +1,298 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import sqlite3 import sqlite3
import os import os
import datetime import datetime
import urllib.request import urllib.request
import urllib.parse import urllib.parse
import json import logging
import subprocess
import tempfile
import time
from logging.handlers import RotatingFileHandler
from typing import Optional
# --- CONFIGURAZIONE --- DEBUG = os.environ.get("DEBUG", "0").strip() == "1"
BOT_TOKEN = os.environ.get('BOT_TOKEN')
CHAT_ID = os.environ.get('ALLOWED_USER_ID') SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
DB_PATH = "/data/speedtest.sqlite" LOG_FILE = os.path.join(SCRIPT_DIR, "daily_report.log")
TELEGRAM_CHAT_IDS = ["64463169", "24827341", "132455422", "5405962012"]
TOKEN_FILE_HOME = os.path.expanduser("~/.telegram_dpc_bot_token")
TOKEN_FILE_ETC = "/etc/telegram_dpc_bot_token"
DB_CANDIDATES = [
(os.environ.get("SPEEDTEST_DB") or "").strip(),
"/data/speedtest.sqlite",
os.path.join(SCRIPT_DIR, "speedtest.sqlite"),
os.path.join(SCRIPT_DIR, "database.sqlite"),
os.path.join(SCRIPT_DIR, "data", "speedtest.sqlite"),
os.path.join(SCRIPT_DIR, "data", "database.sqlite"),
]
SPEEDTEST_CONTAINER = (os.environ.get("SPEEDTEST_CONTAINER") or "speedtest-tracker").strip()
CONTAINER_DB_PATH = (os.environ.get("SPEEDTEST_CONTAINER_DB_PATH") or "/config/database.sqlite").strip()
# SOGLIE DI ALLARME (Mbps)
WARN_DOWN = 400 WARN_DOWN = 400
WARN_UP = 100 WARN_UP = 100
def send_telegram_message(message): VALUES_ARE_BITS = os.environ.get("SPEEDTEST_VALUES_ARE_BITS", "0").strip() == "1"
if not message: return
url = f"https://api.telegram.org/bot{BOT_TOKEN}/sendMessage"
payload = { def setup_logger() -> logging.Logger:
"chat_id": CHAT_ID, logger = logging.getLogger("daily_report")
"text": message, logger.setLevel(logging.DEBUG if DEBUG else logging.INFO)
"parse_mode": "Markdown" logger.handlers.clear()
}
fh = RotatingFileHandler(LOG_FILE, maxBytes=1_000_000, backupCount=5, encoding="utf-8")
fh.setLevel(logging.DEBUG)
fmt = logging.Formatter("%(asctime)s %(levelname)s %(message)s")
fh.setFormatter(fmt)
logger.addHandler(fh)
if DEBUG:
sh = logging.StreamHandler()
sh.setLevel(logging.DEBUG)
sh.setFormatter(fmt)
logger.addHandler(sh)
return logger
LOGGER = setup_logger()
def _read_text_file(path: str) -> str:
try: try:
data = urllib.parse.urlencode(payload).encode('utf-8') with open(path, "r", encoding="utf-8") as f:
req = urllib.request.Request(url, data=data) return f.read().strip()
with urllib.request.urlopen(req) as response: except Exception:
if response.status == 200: return ""
print("✅ Report inviato.")
def load_bot_token() -> str:
tok = (os.environ.get("TELEGRAM_BOT_TOKEN") or "").strip()
if tok:
return tok
tok = (os.environ.get("BOT_TOKEN") or "").strip()
if tok:
return tok
tok = _read_text_file(TOKEN_FILE_HOME)
if tok:
return tok
tok = _read_text_file(TOKEN_FILE_ETC)
return tok.strip() if tok else ""
def send_telegram_message(message: str) -> None:
if not message:
return
bot_token = load_bot_token()
if not bot_token:
LOGGER.error("Token Telegram mancante (env/file). Messaggio NON inviato.")
return
url = f"https://api.telegram.org/bot{bot_token}/sendMessage"
for chat_id in TELEGRAM_CHAT_IDS:
payload = {
"chat_id": chat_id,
"text": message,
"parse_mode": "Markdown",
"disable_web_page_preview": True,
}
try:
data = urllib.parse.urlencode(payload).encode("utf-8")
req = urllib.request.Request(url, data=data)
with urllib.request.urlopen(req, timeout=20) as response:
if response.status == 200:
LOGGER.info("Report inviato a chat_id=%s", chat_id)
else:
LOGGER.error("Telegram HTTP %s chat_id=%s", response.status, chat_id)
time.sleep(0.25)
except Exception as e:
LOGGER.exception("Errore invio Telegram chat_id=%s: %s", chat_id, e)
def _to_mbps(val) -> float:
try:
v = float(val)
except Exception:
return 0.0
if VALUES_ARE_BITS:
return v / 1_000_000.0
return (v * 8.0) / 1_000_000.0
def _parse_created_at_utc(created_at) -> datetime.datetime:
if isinstance(created_at, datetime.datetime):
dt = created_at
else:
s = str(created_at).replace("Z", "+00:00")
try:
dt = datetime.datetime.fromisoformat(s)
except Exception:
s2 = s.split(".")[0].replace("T", " ")
dt = datetime.datetime.strptime(s2, "%Y-%m-%d %H:%M:%S")
if dt.tzinfo is None:
dt = dt.replace(tzinfo=datetime.timezone.utc)
else:
dt = dt.astimezone(datetime.timezone.utc)
return dt
def find_local_db_path() -> str:
for p in DB_CANDIDATES:
if not p:
continue
if os.path.exists(p) and os.path.isfile(p):
return p
return ""
def docker_copy_db_to_temp() -> str:
try:
subprocess.run(["docker", "inspect", SPEEDTEST_CONTAINER],
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, check=True)
except Exception as e: except Exception as e:
print(f"❌ Errore invio Telegram: {e}") LOGGER.error("Docker/container non disponibile (%s).", e)
return ""
def generate_report():
if not os.path.exists(DB_PATH):
print(f"❌ Database non trovato: {DB_PATH}")
return None
try: try:
conn = sqlite3.connect(DB_PATH) tmpdir = tempfile.mkdtemp(prefix="speedtest_db_")
dst = os.path.join(tmpdir, "database.sqlite")
except Exception as e:
LOGGER.exception("Impossibile creare directory temporanea: %s", e)
return ""
src = f"{SPEEDTEST_CONTAINER}:{CONTAINER_DB_PATH}"
try:
LOGGER.info("DB non trovato localmente. Copio da Docker: %s -> %s", src, dst)
subprocess.run(["docker", "cp", src, dst], check=True,
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
if os.path.exists(dst) and os.path.getsize(dst) > 0:
return dst
LOGGER.error("Copia Docker riuscita ma file assente/vuoto: %s", dst)
return ""
except Exception as e:
LOGGER.exception("Errore docker cp: %s", e)
return ""
def generate_report(db_path: str) -> Optional[str]:
now_utc = datetime.datetime.now(datetime.timezone.utc)
window_start_utc = now_utc - datetime.timedelta(hours=24)
window_start_str = window_start_utc.isoformat(timespec="seconds")
try:
conn = sqlite3.connect(db_path)
cursor = conn.cursor() cursor = conn.cursor()
# Timezone UTC per la query query = """
yesterday = datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(hours=24) SELECT download, upload, ping, created_at
query = "SELECT download, upload, ping, created_at FROM results WHERE created_at > ? AND status = 'completed' ORDER BY created_at ASC" FROM results
cursor.execute(query, (yesterday,)) WHERE created_at >= ?
AND status = 'completed'
ORDER BY created_at ASC
"""
cursor.execute(query, (window_start_str,))
rows = cursor.fetchall() rows = cursor.fetchall()
except Exception as e: except Exception as e:
print(f"Errore DB: {str(e)}") LOGGER.exception("Errore DB (%s): %s", db_path, e)
return None return None
finally: finally:
if 'conn' in locals(): conn.close() try:
conn.close()
except Exception:
pass
if not rows: if not rows:
print(" Nessun test trovato.") LOGGER.info("Nessun test trovato nelle ultime 24h.")
return None return None
# Variabili header = "ORA | Dn | Up | Pg |!"
total_down = 0 sep = "-----+-----+-----+----+-"
total_up = 0
total_down = 0.0
total_up = 0.0
count = 0 count = 0
issues = 0 issues = 0
# Intestazione
now_local = datetime.datetime.now() now_local = datetime.datetime.now()
msg = f"📊 **REPORT VELOCITÀ 24H**\n📅 {now_local.strftime('%d/%m/%Y')}\n\n" msg = f"📊 **REPORT VELOCITÀ 24H**\n📅 {now_local.strftime('%d/%m/%Y')}\n\n"
# Inizio Tabella Monospazio
# Allarghiamo le colonne per farci stare "Mb"
# ORA (5) | DOWN (6) | UP (5)
msg += "```text\n"
msg += "ORA | DOWN | UP \n"
msg += "------+--------+------\n"
for row in rows: msg += "```text\n"
d_val = (int(row[0]) * 8) / 1000000 msg += header + "\n"
u_val = (int(row[1]) * 8) / 1000000 msg += sep + "\n"
total_down += d_val for d_raw, u_raw, ping_raw, created_at in rows:
total_up += u_val d_mbps = _to_mbps(d_raw)
count += 1 u_mbps = _to_mbps(u_raw)
marker = ""
if d_val < WARN_DOWN or u_val < WARN_UP:
issues += 1
marker = "!"
try: try:
d_str = row[3].split(".")[0].replace("T", " ") ping_ms = float(ping_raw)
dt_utc = datetime.datetime.strptime(d_str, '%Y-%m-%d %H:%M:%S').replace(tzinfo=datetime.timezone.utc) except Exception:
ping_ms = 0.0
total_down += d_mbps
total_up += u_mbps
count += 1
flag = " "
if d_mbps < WARN_DOWN or u_mbps < WARN_UP:
issues += 1
flag = "!"
try:
dt_utc = _parse_created_at_utc(created_at)
dt_local = dt_utc.astimezone() dt_local = dt_utc.astimezone()
time_str = dt_local.strftime('%H:%M') time_str = dt_local.strftime("%H:%M")
except: except Exception:
time_str = "--:--" time_str = "--:--"
# FORMATTAZIONE CON UNITÀ msg += f"{time_str:<5}|{int(round(d_mbps)):>5}|{int(round(u_mbps)):>5}|{int(round(ping_ms)):>4}|{flag}\n"
# Creiamo stringhe tipo "850Mb"
d_text = f"{int(d_val)}Mb"
u_text = f"{int(u_val)}Mb"
# Allineamento:
# :>6 significa "occupa 6 spazi allineato a destra"
row_str = f"{time_str} | {d_text:>6} | {u_text:>5} {marker}"
msg += f"{row_str}\n"
msg += "```\n" msg += "```\n"
if count > 0: if count > 0:
avg_d = total_down / count avg_d = total_down / count
avg_u = total_up / count avg_u = total_up / count
icon_d = "" if avg_d >= WARN_DOWN else "⚠️" icon_d = "" if avg_d >= WARN_DOWN else "⚠️"
icon_u = "" if avg_u >= WARN_UP else "⚠️" icon_u = "" if avg_u >= WARN_UP else "⚠️"
# Media su una riga sola con "Mb" # Ø al posto di "MEDIA:" per accorciare ed evitare wrap
msg += f"📈 **MEDIA:** ⬇️{icon_d}`{avg_d:.0f}Mb` ⬆️{icon_u}`{avg_u:.0f}Mb`" msg += f"Ø ⬇️{icon_d}`{avg_d:.0f} Mbps` ⬆️{icon_u}`{avg_u:.0f} Mbps`"
if issues > 0: if issues > 0:
msg += f"\n\n⚠️ **{issues}** test sotto soglia (!)" msg += f"\n\n⚠️ **{issues}** test sotto soglia (!)"
return msg return msg
if __name__ == "__main__":
report = generate_report() def main() -> None:
db_path = find_local_db_path()
if not db_path:
db_path = docker_copy_db_to_temp()
if not db_path:
LOGGER.error("Database non trovato. Locali provati=%s; Docker=%s:%s",
[p for p in DB_CANDIDATES if p],
SPEEDTEST_CONTAINER,
CONTAINER_DB_PATH)
print("❌ Database non trovato. Vedi daily_report.log.")
return
LOGGER.info("Uso database: %s", db_path)
report = generate_report(db_path)
if report: if report:
send_telegram_message(report) send_telegram_message(report)
else:
LOGGER.info("Nessun report da inviare.")
if __name__ == "__main__":
main()