From f2fe1528d4d09da68fe23b3c3d9c6e9b76899632 Mon Sep 17 00:00:00 2001 From: daniele Date: Wed, 24 Dec 2025 19:44:39 +0100 Subject: [PATCH] Aggiornamento script meteo (Vigilia di Natale) --- services/telegram-bot/civil_protection.py | 470 ++++++++++---- services/telegram-bot/severe_weather.py | 732 ++++++++++++++++++---- services/telegram-bot/student_alert.py | 571 ++++++++++++----- 3 files changed, 1368 insertions(+), 405 deletions(-) diff --git a/services/telegram-bot/civil_protection.py b/services/telegram-bot/civil_protection.py index 9b200cc..95ebf70 100644 --- a/services/telegram-bot/civil_protection.py +++ b/services/telegram-bot/civil_protection.py @@ -1,151 +1,371 @@ -import requests -import json -import os -import time +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + import datetime +import html as html_lib +import json +import logging +import os +import re +import time +from html.parser import HTMLParser +from logging.handlers import RotatingFileHandler from zoneinfo import ZoneInfo -# --- CONFIGURAZIONE UTENTE --- -# 👇👇 INSERISCI QUI I TUOI DATI 👇👇 -TELEGRAM_BOT_TOKEN = "8155587974:AAF9OekvBpixtk8ZH6KoIc0L8edbhdXt7A4" +import requests + +# ============================================================================= +# Modalità debug: +# DEBUG=1 python3 civil_protection.py +# Token Telegram (senza modifiche al codice): +# 1) export TELEGRAM_BOT_TOKEN="...token..." +# 2) oppure file nel tuo home: ~/.telegram_dpc_bot_token +# ============================================================================= + +DEBUG = os.environ.get("DEBUG", "0").strip() == "1" + +BULLETIN_URL = "https://mappe.protezionecivile.gov.it/it/mappe-rischi/bollettino-di-criticita/" + +# Chat IDs TELEGRAM_CHAT_IDS = ["64463169", "24827341", "132455422", "5405962012"] -# --- ZONE DA MONITORARE --- -# EMR-B2 = Costa Romagnola (Rimini/Dogana) -# EMR-A2 = Alta Collina Romagnola (San Marino/Titano) -# EMR-D1 = Pianura Bolognese (Bologna Città) -TARGET_ZONES = ["EMR-B2", "EMR-A2", "EMR-D1"] +# Directory dello script (log + state qui, così non serve sudo) +BASE_DIR = os.path.dirname(os.path.abspath(__file__)) +STATE_FILE = os.path.join(BASE_DIR, "dpc_state.json") +LOG_FILE = os.path.join(BASE_DIR, "dpc_telegram.log") -# URL Ufficiale DPC -DPC_URL = "https://raw.githubusercontent.com/pcm-dpc/DPC-Bollettini-Criticita-Idrogeologica-Idraulica/master/files/geojson/today.json" +# Dove cercare il token (in quest’ordine) +TOKEN_FILE_HOME = os.path.expanduser("~/.telegram_dpc_bot_token") +TOKEN_FILE_ETC = "/etc/telegram_dpc_bot_token" -# File di stato -STATE_FILE = "/home/daniely/docker/telegram-bot/dpc_state.json" - -# Mappe -RISK_MAP = { - 1: "🟡 GIALLA", - 2: "🟠 ARANCIONE", - 3: "🔴 ROSSA" +# Zone target (come compaiono tipicamente nel testo del bollettino) +TARGET_ZONES = { + "EMR-B2": "Costa romagnola", + "EMR-A2": "Alta collina romagnola", + "EMR-D1": "Pianura bolognese", } -RISK_TYPES = { - "idro": "💧 Idraulico", - "idrogeo": "⛰️ Idrogeologico", - "temporali": "⚡ Temporali", - "vento": "💨 Vento", - "neve": "❄️ Neve", - "ghiaccio": "🧊 Ghiaccio", - "mare": "🌊 Mareggiate" +RISK_ICON = { + "IDRAULICO": "💧", + "IDROGEOLOGICO": "⛰️", + "TEMPORALI": "⚡", } -def get_zone_label(zone_id): - if zone_id == "EMR-B2": return "Rimini / Bassa RSM" - if zone_id == "EMR-A2": return "Alta RSM / Carpegna" - if zone_id == "EMR-D1": return "Bologna Città" - return zone_id +COLOR_ICON = { + "GIALLA": "🟡", + "ARANCIONE": "🟠", + "ROSSA": "🔴", +} -def send_telegram_message(message): - if not TELEGRAM_BOT_TOKEN or "INSERISCI" in TELEGRAM_BOT_TOKEN: - print(f"[TEST OUT] {message}") - return +HTTP_HEADERS = {"User-Agent": "rpi-dpc-bollettino/1.2"} - url = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendMessage" - for chat_id in TELEGRAM_CHAT_IDS: - try: - requests.post(url, json={"chat_id": chat_id, "text": message, "parse_mode": "Markdown"}, timeout=10) - time.sleep(0.2) - except: pass +# ============================================================================= +# Logging +# ============================================================================= -def load_last_alert(): - if os.path.exists(STATE_FILE): - try: - with open(STATE_FILE, 'r') as f: return json.load(f) - except: pass +def setup_logger() -> logging.Logger: + logger = logging.getLogger("dpc_telegram") + logger.setLevel(logging.DEBUG if DEBUG else logging.INFO) + logger.handlers.clear() + + fh = RotatingFileHandler(LOG_FILE, maxBytes=1_000_000, backupCount=5, encoding="utf-8") + fh.setLevel(logging.DEBUG) + fmt = logging.Formatter("%(asctime)s %(levelname)s %(message)s") + fh.setFormatter(fmt) + logger.addHandler(fh) + + if DEBUG: + sh = logging.StreamHandler() + sh.setLevel(logging.DEBUG) + sh.setFormatter(fmt) + logger.addHandler(sh) + + return logger + +LOGGER = setup_logger() + +# ============================================================================= +# Utility +# ============================================================================= + +def now_rome(): + return datetime.datetime.now(ZoneInfo("Europe/Rome")) + +def today_str_italy(): + return now_rome().strftime("%d/%m/%Y") + +def load_text_file(path: str) -> str: + try: + with open(path, "r", encoding="utf-8") as f: + return f.read().strip() + except FileNotFoundError: + return "" + except PermissionError: + # Non consideriamo “errore fatale”: semplicemente quel path non è leggibile. + LOGGER.debug("Permesso negato leggendo %s", path) + return "" + except Exception as e: + LOGGER.exception("Errore leggendo %s: %s", path, e) + return "" + +def load_bot_token() -> str: + # 1) env + tok = os.environ.get("TELEGRAM_BOT_TOKEN", "").strip() + if tok: + return tok + + # 2) home file (consigliato) + tok = load_text_file(TOKEN_FILE_HOME) + if tok: + return tok + + # 3) /etc (solo se leggibile) + tok = load_text_file(TOKEN_FILE_ETC) + if tok: + return tok + + return "" + +def telegram_send_html(message_html: str) -> bool: + """ + Prova a inviare il messaggio. Non solleva eccezioni. + Ritorna True se almeno un invio ha avuto status 200. + Importante: lo script chiama questa funzione SOLO in caso di allerte. + """ + token = load_bot_token() + if not token: + LOGGER.warning("Token Telegram assente. Nessun invio effettuato.") + return False + + url = f"https://api.telegram.org/bot{token}/sendMessage" + base_payload = { + "text": message_html, + "parse_mode": "HTML", + "disable_web_page_preview": True, + } + + sent_ok = False + with requests.Session() as s: + for chat_id in TELEGRAM_CHAT_IDS: + payload = dict(base_payload) + payload["chat_id"] = chat_id + try: + resp = s.post(url, json=payload, timeout=15) + if resp.status_code == 200: + sent_ok = True + else: + LOGGER.error("Telegram error chat_id=%s status=%s body=%s", + chat_id, resp.status_code, resp.text[:500]) + time.sleep(0.25) + except Exception as e: + LOGGER.exception("Telegram exception chat_id=%s err=%s", chat_id, e) + + return sent_ok + +def load_state() -> dict: + try: + if os.path.exists(STATE_FILE): + with open(STATE_FILE, "r", encoding="utf-8") as f: + return json.load(f) or {} + except Exception as e: + LOGGER.exception("Errore lettura stato: %s", e) return {} -def save_current_alert(data): +def save_state(state: dict) -> None: try: - with open(STATE_FILE, 'w') as f: json.dump(data, f) - except: pass - -def analyze_dpc(): - print("--- Controllo Protezione Civile ---") - - try: - r = requests.get(DPC_URL, timeout=10) - r.raise_for_status() - data = r.json() + with open(STATE_FILE, "w", encoding="utf-8") as f: + json.dump(state, f, ensure_ascii=False, indent=2) except Exception as e: - print(f"Errore DPC: {e}") + LOGGER.exception("Errore scrittura stato: %s", e) + +def norm(s: str) -> str: + return re.sub(r"\s+", " ", (s or "").strip().lower()) + +class TextExtractor(HTMLParser): + def __init__(self): + super().__init__() + self.parts = [] + + def handle_data(self, data): + if data and data.strip(): + self.parts.append(data) + +def html_to_lines(html_text: str) -> list[str]: + p = TextExtractor() + p.feed(html_text) + text = "\n".join(p.parts) + text = html_lib.unescape(text) + + lines = [] + for raw in text.splitlines(): + line = re.sub(r"\s+", " ", raw).strip() + if line: + lines.append(line) + return lines + +# ============================================================================= +# Parsing bollettino +# ============================================================================= + +RISK_HEADER_RE = re.compile( + r"^(ORDINARIA|MODERATA|ELEVATA)\s+CRITICITA'\s+PER\s+RISCHIO\s+([A-ZÀ-Ü]+)\s*/\s*ALLERTA\s+(GIALLA|ARANCIONE|ROSSA)\s*:\s*$" +) +DAY_START_RE = re.compile(r"^Per la giornata di (oggi|domani),\s*(.+?):\s*$", re.IGNORECASE) +TITLE_RE = re.compile(r"^Bollettino di Criticità del (.+?) ore (\d{1,2}:\d{2})", re.IGNORECASE) + +def fetch_bulletin_text_lines() -> list[str]: + r = requests.get(BULLETIN_URL, headers=HTTP_HEADERS, timeout=25) + r.raise_for_status() + # Forza encoding per evitare “Mercoledì” + r.encoding = "utf-8" + return html_to_lines(r.text) + +def parse_bulletin(lines: list[str]) -> dict: + out = {"title": "", "issued_date": "", "issued_time": "", "days": {}} + + for ln in lines[:150]: + m = TITLE_RE.match(ln) + if m: + out["title"] = ln + out["issued_date"] = m.group(1).strip() + out["issued_time"] = m.group(2).strip() + break + + current_day_key = None + current_day_label = None + current_risk = None + current_color = None + + def add_alert(zone_name: str): + if not current_day_key or not current_risk or not current_color: + return + day = out["days"].setdefault(current_day_key, {"date_label": current_day_label or "", "alerts": {}}) + icon = COLOR_ICON.get(current_color, "⚪") + ricon = RISK_ICON.get(current_risk, "⚠️") + entry = f"{icon} {ricon} {current_risk}" + day["alerts"].setdefault(zone_name, []) + if entry not in day["alerts"][zone_name]: + day["alerts"][zone_name].append(entry) + + for ln in lines: + md = DAY_START_RE.match(ln) + if md: + current_day_key = md.group(1).lower() + current_day_label = md.group(2).strip() + out["days"].setdefault(current_day_key, {"date_label": current_day_label, "alerts": {}}) + current_risk = None + current_color = None + continue + + mh = RISK_HEADER_RE.match(ln) + if mh: + current_risk = mh.group(2).strip() + current_color = mh.group(3).strip() + continue + + if current_day_key and current_risk and current_color and ":" in ln: + region, rest = ln.split(":", 1) + if norm(region) in (norm("Emilia Romagna"), norm("Emilia-Romagna")): + zones = [z.strip() for z in rest.split(",") if z.strip()] + for _, target_zone_name in TARGET_ZONES.items(): + for z in zones: + if norm(z) == norm(target_zone_name): + add_alert(target_zone_name) + + return out + +def has_any_alert(parsed: dict) -> bool: + for day_key in ("oggi", "domani"): + day = parsed.get("days", {}).get(day_key, {}) + if day.get("alerts"): + return True + return False + +def build_signature(parsed: dict) -> str: + parts = [parsed.get("issued_date", ""), parsed.get("issued_time", ""), parsed.get("title", "")] + for day_key in ("oggi", "domani"): + day = parsed.get("days", {}).get(day_key, {}) + parts.append(day_key + ":" + day.get("date_label", "")) + alerts = day.get("alerts", {}) + for zone in sorted(alerts.keys()): + parts.append(zone + "=" + ",".join(sorted(alerts[zone]))) + return "|".join(parts) + +def format_message(parsed: dict) -> str: + issued = parsed.get("title") or "Bollettino di Criticità" + issued_safe = html_lib.escape(issued) + + lines = [] + lines.append("📢 PROTEZIONE CIVILE (Allerta)") + lines.append(f"🕒 {issued_safe}") + lines.append("") + + for day_key in ("oggi", "domani"): + day = parsed.get("days", {}).get(day_key) + if not day: + continue + alerts = day.get("alerts", {}) + if not alerts: + continue + + lines.append(f"📅 {html_lib.escape(day.get('date_label',''))}") + for zone in sorted(alerts.keys()): + lines.append(f"📍 {html_lib.escape(zone)}") + for entry in alerts[zone]: + lines.append(html_lib.escape(entry)) + lines.append("") + + lines.append("Fonte: mappe.protezionecivile.gov.it") + return "\n".join(lines) + +# ============================================================================= +# Main +# ============================================================================= + +def main(): + LOGGER.info("--- Controllo Protezione Civile (Bollettino ufficiale) ---") + + try: + lines = fetch_bulletin_text_lines() + parsed = parse_bulletin(lines) + except Exception as e: + # NESSUN Telegram in caso di errori: solo log. + LOGGER.exception("Errore durante fetch/parse bollettino: %s", e) return - today_str = datetime.datetime.now(ZoneInfo("Europe/Rome")).strftime('%d/%m/%Y') - - active_alerts = {} - max_global_level = 0 - - # Stringa univoca per identificare se l'allerta è cambiata nel contenuto - current_alert_signature = "" + if DEBUG: + LOGGER.debug("title=%s", parsed.get("title", "")) + for k in ("oggi", "domani"): + d = parsed.get("days", {}).get(k, {}) + LOGGER.debug("%s label=%s", k, d.get("date_label", "")) + LOGGER.debug("%s alerts=%s", k, d.get("alerts", {})) - for feature in data['features']: - props = feature['properties'] - zone_id = props.get('zone_id') - - if zone_id in TARGET_ZONES: - zone_risks = [] - - for key, label in RISK_TYPES.items(): - level = props.get(f"crit_{key}") - - if level and level > 0: - if level > max_global_level: max_global_level = level - - color_icon = "🟡" if level == 1 else "🟠" if level == 2 else "🔴" - risk_str = f"{color_icon} {label}" - zone_risks.append(risk_str) - - if zone_risks: - label = get_zone_label(zone_id) - active_alerts[label] = zone_risks - # Aggiungiamo alla firma per capire se qualcosa è cambiato - current_alert_signature += f"{zone_id}:{','.join(zone_risks)}|" - - # --- LOGICA DI INVIO --- - - last_state = load_last_alert() - last_date = last_state.get("date") - last_sig = last_state.get("signature", "") - - # Se tutto verde (livello 0) - if max_global_level == 0: - print("Nessuna allerta.") - if last_date == today_str and last_sig != "": - # Opzionale: Potremmo mandare "Allerta Rientrata", ma DPC resetta a mezzanotte. - # Per ora resettiamo solo lo stato. - save_current_alert({"date": today_str, "level": 0, "signature": ""}) + # Regola: invia Telegram SOLO se esistono allerte + if not has_any_alert(parsed): + LOGGER.info("Nessuna allerta nelle zone monitorate. Nessuna notifica inviata.") return - # Invia SE: - # 1. È un giorno nuovo - # 2. OPPURE la situazione è cambiata (es. aggiunto Bologna, o passato da Giallo a Rosso) - if last_date != today_str or current_alert_signature != last_sig: - - msg = f"📢 **PROTEZIONE CIVILE (Allerta)**\n" - msg += f"📅 {today_str}\n\n" - - for zone_name, risks in active_alerts.items(): - msg += f"📍 **{zone_name}**\n" - msg += "\n".join(risks) + "\n\n" - - msg += "_Fonte: Dipartimento Protezione Civile_" - - send_telegram_message(msg) - print("Allerta inviata.") - - save_current_alert({"date": today_str, "level": max_global_level, "signature": current_alert_signature}) + sig = build_signature(parsed) + state = load_state() + last_sig = state.get("last_alert_signature", "") + + if sig == last_sig: + LOGGER.info("Allerta già notificata e invariata. Nessuna nuova notifica.") + return + + # A questo punto: ci sono allerte e sono nuove -> prova invio + msg = format_message(parsed) + sent_ok = telegram_send_html(msg) + + if sent_ok: + LOGGER.info("Notifica allerta inviata con successo.") + save_state({ + "date": today_str_italy(), + "last_alert_signature": sig, + }) else: - print("Allerta già notificata e invariata.") + # Non aggiorniamo lo stato: quando risolvi token/rete, reinvierà. + LOGGER.warning("Invio non riuscito (token mancante o errore Telegram). Stato NON aggiornato.") if __name__ == "__main__": - analyze_dpc() + main() diff --git a/services/telegram-bot/severe_weather.py b/services/telegram-bot/severe_weather.py index 791fef2..a8bbac5 100644 --- a/services/telegram-bot/severe_weather.py +++ b/services/telegram-bot/severe_weather.py @@ -1,190 +1,650 @@ -import requests +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + import datetime -import os +import html import json +import logging +import os import time -from dateutil import parser +from logging.handlers import RotatingFileHandler +from typing import Dict, List, Optional, Tuple from zoneinfo import ZoneInfo -# --- CONFIGURAZIONE UTENTE --- -TELEGRAM_BOT_TOKEN = "8155587974:AAF9OekvBpixtk8ZH6KoIc0L8edbhdXt7A4" -TELEGRAM_CHAT_IDS = ["64463169", "24827341", "132455422", "5405962012"] +import requests +from dateutil import parser -# Coordinate (San Marino / Casa) +# ============================================================================= +# SEVERE WEATHER ALERT (next 24h) - Casa (LAT/LON) +# - Freezing rain/drizzle (WMO codes 56,57,66,67) -> priorità alta, basta 1 occorrenza +# - Wind gusts persistence: >= soglia per almeno 2 ore consecutive +# - Rain persistence: soglia (mm/3h) superata per almeno 2 ore (2 finestre 3h consecutive) +# +# Telegram token: NOT in clear. +# Read order: +# 1) env TELEGRAM_BOT_TOKEN +# 2) ~/.telegram_dpc_bot_token +# 3) /etc/telegram_dpc_bot_token +# +# Debug: +# DEBUG=1 python3 severe_weather.py +# +# Log: +# ./weather_alert.log (same folder as this script) +# ============================================================================= + +DEBUG = os.environ.get("DEBUG", "0").strip() == "1" + +# ----------------- TELEGRAM ----------------- +TELEGRAM_CHAT_IDS = ["64463169", "24827341", "132455422", "5405962012"] +TOKEN_FILE_HOME = os.path.expanduser("~/.telegram_dpc_bot_token") +TOKEN_FILE_ETC = "/etc/telegram_dpc_bot_token" + +# ----------------- LOCATION ----------------- LAT = 43.9356 LON = 12.4296 -# --- SOGLIE DI ALLARME --- -# Vento (Protezione Civile Emilia-Romagna) -WIND_YELLOW = 62.0 # km/h -WIND_ORANGE = 75.0 # km/h -WIND_RED = 88.0 # km/h +# ----------------- THRESHOLDS ----------------- +# Vento (km/h) - soglie come da tuo set +WIND_YELLOW = 62.0 +WIND_ORANGE = 75.0 +WIND_RED = 88.0 -# Pioggia -RAIN_3H_LIMIT = 25.0 # mm in 3 ore +# Pioggia: mm in 3 ore +RAIN_3H_LIMIT = 25.0 -# Codici WMO per Gelicidio (Freezing Rain) -FREEZING_CODES = [56, 57, 66, 67] +# Persistenza minima richiesta (ore) +PERSIST_HOURS = 2 # richiesta utente: >=2 ore -# File di stato +# Freezing rain/drizzle codes +FREEZING_CODES = {56, 57, 66, 67} + +# ----------------- HORIZON ----------------- +HOURS_AHEAD = 24 + +# ----------------- FILES ----------------- STATE_FILE = "/home/daniely/docker/telegram-bot/weather_state.json" +BASE_DIR = os.path.dirname(os.path.abspath(__file__)) +LOG_FILE = os.path.join(BASE_DIR, "weather_alert.log") -def send_telegram_message(message): - if not TELEGRAM_BOT_TOKEN or "INSERISCI" in TELEGRAM_BOT_TOKEN: - print(f"[TEST OUT] {message}") - return +# ----------------- OPEN-METEO ----------------- +OPEN_METEO_URL = "https://api.open-meteo.com/v1/forecast" +TZ = "Europe/Rome" +TZINFO = ZoneInfo(TZ) +HTTP_HEADERS = {"User-Agent": "rpi-severe-weather/2.0"} - url = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendMessage" - for chat_id in TELEGRAM_CHAT_IDS: - try: - requests.post(url, json={"chat_id": chat_id, "text": message, "parse_mode": "Markdown"}, timeout=10) - time.sleep(0.2) - except: pass +# Force model: AROME France HD 1.5 km +MODEL_PRIMARY = "meteofrance_arome_france_hd" +# Fallback (stessa famiglia Meteo-France) per continuità operativa +MODEL_FALLBACK = "meteofrance_seamless" -def get_forecast(): - url = "https://api.open-meteo.com/v1/forecast" - params = { - "latitude": LAT, "longitude": LON, - "hourly": "precipitation,windgusts_10m,weathercode", - "models": "arome_france_hd", - "timezone": "Europe/Rome", - "forecast_days": 2 - } +# Se True, invia messaggio "rientrata" quando tutto torna sotto soglia (non è un errore) +SEND_ALL_CLEAR = True + + +# ============================================================================= +# LOGGING +# ============================================================================= +def setup_logger() -> logging.Logger: + logger = logging.getLogger("severe_weather") + logger.setLevel(logging.DEBUG if DEBUG else logging.INFO) + logger.handlers.clear() + + fh = RotatingFileHandler(LOG_FILE, maxBytes=1_000_000, backupCount=5, encoding="utf-8") + fh.setLevel(logging.DEBUG) + fmt = logging.Formatter("%(asctime)s %(levelname)s %(message)s") + fh.setFormatter(fmt) + logger.addHandler(fh) + + if DEBUG: + sh = logging.StreamHandler() + sh.setLevel(logging.DEBUG) + sh.setFormatter(fmt) + logger.addHandler(sh) + + return logger + + +LOGGER = setup_logger() + + +# ============================================================================= +# UTILS +# ============================================================================= +def ensure_parent_dir(path: str) -> None: + parent = os.path.dirname(path) + if parent and not os.path.exists(parent): + os.makedirs(parent, exist_ok=True) + + +def now_local() -> datetime.datetime: + return datetime.datetime.now(TZINFO) + + +def read_text_file(path: str) -> str: try: - r = requests.get(url, params=params, timeout=10) - r.raise_for_status() - return r.json() - except: return None + with open(path, "r", encoding="utf-8") as f: + return f.read().strip() + except FileNotFoundError: + return "" + except PermissionError: + LOGGER.debug("Permission denied reading %s", path) + return "" + except Exception as e: + LOGGER.exception("Error reading %s: %s", path, e) + return "" -def load_state(): + +def load_bot_token() -> str: + tok = os.environ.get("TELEGRAM_BOT_TOKEN", "").strip() + if tok: + return tok + tok = read_text_file(TOKEN_FILE_HOME) + if tok: + return tok + tok = read_text_file(TOKEN_FILE_ETC) + return tok.strip() if tok else "" + + +def parse_time_to_local(t: str) -> datetime.datetime: + """ + Robust timezone handling: + - If timestamps are naive (common when timezone=Europe/Rome), interpret as Europe/Rome. + - If timestamps include offset, convert to Europe/Rome. + """ + dt = parser.isoparse(t) + if dt.tzinfo is None: + return dt.replace(tzinfo=TZINFO) + return dt.astimezone(TZINFO) + + +def hhmm(dt: datetime.datetime) -> str: + return dt.strftime("%H:%M") + + +# ============================================================================= +# TELEGRAM +# ============================================================================= +def telegram_send_html(message_html: str) -> bool: + """ + Never raises. Returns True if at least one chat_id succeeded. + IMPORTANT: called only on REAL ALERTS (not on errors). + """ + token = load_bot_token() + if not token: + LOGGER.warning("Telegram token missing: message not sent.") + return False + + url = f"https://api.telegram.org/bot{token}/sendMessage" + base_payload = { + "text": message_html, + "parse_mode": "HTML", + "disable_web_page_preview": True, + } + + sent_ok = False + with requests.Session() as s: + for chat_id in TELEGRAM_CHAT_IDS: + payload = dict(base_payload) + payload["chat_id"] = chat_id + try: + resp = s.post(url, json=payload, timeout=15) + if resp.status_code == 200: + sent_ok = True + else: + LOGGER.error("Telegram error chat_id=%s status=%s body=%s", + chat_id, resp.status_code, resp.text[:500]) + time.sleep(0.25) + except Exception as e: + LOGGER.exception("Telegram exception chat_id=%s err=%s", chat_id, e) + + return sent_ok + + +# ============================================================================= +# STATE +# ============================================================================= +def load_state() -> Dict: + default = { + "alert_active": False, + "wind_level": 0, + "last_wind_peak": 0.0, + "last_rain_3h": 0.0, + "freezing_active": False, + } if os.path.exists(STATE_FILE): try: - with open(STATE_FILE, 'r') as f: return json.load(f) - except: pass - return {"alert_active": False, "last_wind": 0, "last_rain": 0, "wind_level": 0, "freezing_active": False} + with open(STATE_FILE, "r", encoding="utf-8") as f: + data = json.load(f) or {} + default.update(data) + except Exception as e: + LOGGER.exception("State read error: %s", e) + return default -def save_state(state): + +def save_state(state: Dict) -> None: try: - with open(STATE_FILE, 'w') as f: json.dump(state, f) - except: pass + ensure_parent_dir(STATE_FILE) + with open(STATE_FILE, "w", encoding="utf-8") as f: + json.dump(state, f, ensure_ascii=False, indent=2) + except Exception as e: + LOGGER.exception("State write error: %s", e) -def analyze(): - print("--- Controllo Meteo Severo (Wind/Rain/Ice) ---") - data = get_forecast() - if not data: return - hourly = data.get("hourly", {}) - times = hourly.get("time", []) - wind = hourly.get("windgusts_10m", []) - rain = hourly.get("precipitation", []) - wcode = hourly.get("weathercode", []) +# ============================================================================= +# OPEN-METEO +# ============================================================================= +def fetch_forecast(models_value: str) -> Optional[Dict]: + params = { + "latitude": LAT, + "longitude": LON, + "hourly": "precipitation,wind_gusts_10m,weather_code", + "timezone": TZ, + "forecast_days": 2, + "wind_speed_unit": "kmh", + "precipitation_unit": "mm", + "models": models_value, + } - now = datetime.datetime.now(ZoneInfo("Europe/Rome")) + try: + r = requests.get(OPEN_METEO_URL, params=params, headers=HTTP_HEADERS, timeout=25) + if r.status_code == 400: + # Log reason if present; no Telegram on errors + try: + j = r.json() + LOGGER.error("Open-Meteo 400 (models=%s): %s", models_value, j.get("reason", j)) + except Exception: + LOGGER.error("Open-Meteo 400 (models=%s): %s", models_value, r.text[:500]) + return None + r.raise_for_status() + return r.json() + except Exception as e: + LOGGER.exception("Open-Meteo request error (models=%s): %s", models_value, e) + return None + + +def get_forecast() -> Tuple[Optional[Dict], str]: + LOGGER.debug("Requesting Open-Meteo with models=%s", MODEL_PRIMARY) + data = fetch_forecast(MODEL_PRIMARY) + if data is not None: + return data, MODEL_PRIMARY + + LOGGER.warning("Primary model failed (%s). Trying fallback=%s", MODEL_PRIMARY, MODEL_FALLBACK) + data2 = fetch_forecast(MODEL_FALLBACK) + if data2 is not None: + return data2, MODEL_FALLBACK + + return None, MODEL_PRIMARY + + +# ============================================================================= +# PERSISTENCE LOGIC +# ============================================================================= +def best_wind_persistent_level( + gusts: List[float], + times: List[str], + start_idx: int, + end_idx: int, + persist_hours: int +) -> Tuple[int, float, str, int]: + """ + Returns: + (level, peak_gust_within_level, first_start_hhmm, run_length_hours) + Level 0 means not persistent above yellow. + Persistence means >= threshold for >= persist_hours consecutive hourly points. + """ + thresholds = [ + (3, WIND_RED), + (2, WIND_ORANGE), + (1, WIND_YELLOW), + ] + + # Convert times to local datetimes once for speed/readability + dt_list = [parse_time_to_local(t) for t in times] + + def find_run(threshold: float) -> Tuple[bool, float, str, int]: + consec = 0 + run_start = None + run_peak = 0.0 + best_start = "" + best_peak = 0.0 + best_len = 0 + + for i in range(start_idx, end_idx): + try: + v = float(gusts[i]) + except Exception: + v = 0.0 + + if v >= threshold: + if consec == 0: + run_start = i + run_peak = v + else: + run_peak = max(run_peak, v) + consec += 1 + + # record first qualifying run of required length + if consec >= persist_hours and run_start is not None: + # lock the first time a qualifying run appears, but keep peak within that run length + # Extend peak as the run continues + # If user prefers "meglio uno in più", we take the first qualifying run. + if best_len == 0: + best_start = hhmm(dt_list[run_start]) + best_peak = run_peak + best_len = consec + else: + # If same threshold continues longer, update peak and length + if run_start == (run_start if best_len else run_start): + best_peak = max(best_peak, run_peak) + best_len = max(best_len, consec) + else: + consec = 0 + run_start = None + run_peak = 0.0 + + if best_len >= persist_hours: + return True, best_peak, best_start, best_len + return False, 0.0, "", 0 + + for lvl, thr in thresholds: + ok, peak, start_hhmm, run_len = find_run(thr) + if ok: + return lvl, peak, start_hhmm, run_len + + return 0, 0.0, "", 0 + + +def best_rain_persistent_3h( + rain: List[float], + times: List[str], + start_idx: int, + end_idx: int, + limit_3h: float, + persist_hours: int +) -> Tuple[float, str, int]: + """ + Returns: + (max_3h_sum, first_start_hhmm_of_persistent_exceedance, persistence_hours) + + Compute rolling 3h sums (hourly precipitation totals). + Persistence >=2h means: at least `persist_hours` consecutive rolling windows exceed the limit. + Each shift by 1 hour -> 'persistence_hours' approximates how long intense conditions persist. + """ + if end_idx - start_idx < 3: + return 0.0, "", 0 + + dt_list = [parse_time_to_local(t) for t in times] + + # rolling sums for each window start i (covers i, i+1, i+2) + window_starts = list(range(start_idx, end_idx - 2)) + sums = [] + for i in window_starts: + try: + s = float(rain[i]) + float(rain[i + 1]) + float(rain[i + 2]) + except Exception: + s = 0.0 + sums.append(s) + + # Find persistent exceedance: sums[j] >= limit for >= persist_hours consecutive j + best_max = 0.0 + best_start = "" + best_persist = 0 + + consec = 0 + run_start_j = None + run_max = 0.0 + + for j, s in enumerate(sums): + if s >= limit_3h: + if consec == 0: + run_start_j = j + run_max = s + else: + run_max = max(run_max, s) + consec += 1 + + if consec >= persist_hours and run_start_j is not None: + # take first persistent run (meglio uno in più), but keep track of max within it + if best_persist == 0: + start_i = window_starts[run_start_j] + best_start = hhmm(dt_list[start_i]) + best_persist = consec + best_max = run_max + else: + # if same run continues, update + best_persist = max(best_persist, consec) + best_max = max(best_max, run_max) + else: + consec = 0 + run_start_j = None + run_max = 0.0 + + return best_max, best_start, best_persist + + +# ============================================================================= +# MESSAGE BUILDERS +# ============================================================================= +def wind_message(level: int, peak: float, start_hhmm: str, run_len: int) -> str: + # run_len is in hours (number of consecutive hourly points) + if level == 3: + icon = "🔴" + title = "TEMPESTA (Burrasca fortissima)" + thr = WIND_RED + elif level == 2: + icon = "🟠" + title = "VENTO MOLTO FORTE" + thr = WIND_ORANGE + else: + icon = "🟡" + title = "VENTO FORTE" + thr = WIND_YELLOW + + return ( + f"{icon} {title}
" + f"Persistenza: ≥ {PERSIST_HOURS} ore sopra soglia ({thr:.0f} km/h).
" + f"🕒 Inizio stimato: {html.escape(start_hhmm or '—')}
" + f"📈 Picco in finestra: {peak:.0f} km/h (run ~{run_len}h)." + ) + + +def rain_message(max_3h: float, start_hhmm: str, persist_h: int) -> str: + return ( + "🌧️ PIOGGIA INTENSA
" + f"Persistenza: ≥ {PERSIST_HOURS} ore sopra soglia ({RAIN_3H_LIMIT:.1f} mm/3h).
" + f"🕒 Inizio stimato: {html.escape(start_hhmm or '—')}
" + f"📈 Max su 3 ore in finestra: {max_3h:.1f} mm (persistenza ~{persist_h}h)." + ) + + +# ============================================================================= +# MAIN +# ============================================================================= +def analyze() -> None: + LOGGER.info("--- Controllo Meteo Severo (Wind/Rain/Ice) ---") + + data, model_used = get_forecast() + if not data: + # No Telegram on errors + return + + hourly = (data.get("hourly", {}) or {}) + times = hourly.get("time", []) or [] + gusts = hourly.get("wind_gusts_10m", []) or [] + rain = hourly.get("precipitation", []) or [] + wcode = hourly.get("weather_code", []) or [] + + n = min(len(times), len(gusts), len(rain), len(wcode)) + if n == 0: + LOGGER.error("Empty hourly series (model=%s).", model_used) + return + + times = times[:n] + gusts = gusts[:n] + rain = rain[:n] + wcode = wcode[:n] + + now = now_local() state = load_state() - - # Trova indice ora corrente + was_alarm = bool(state.get("alert_active", False)) + + # Find starting index: first timestep >= now start_idx = -1 for i, t in enumerate(times): - if parser.isoparse(t).replace(tzinfo=ZoneInfo("Europe/Rome")) >= now: + if parse_time_to_local(t) >= now: start_idx = i break - - if start_idx == -1: return + if start_idx == -1: + LOGGER.error("Could not locate current time index in forecast timeline.") + return - # Analisi prossime 12 ore - end_idx = min(start_idx + 12, len(times)) - - max_wind = 0.0 - max_wind_time = "" - sum_rain_3h = 0.0 - - freezing_rain_detected = False + end_idx = min(start_idx + HOURS_AHEAD, n) + if end_idx <= start_idx: + LOGGER.error("Invalid horizon window (start=%s end=%s).", start_idx, end_idx) + return + + if DEBUG: + LOGGER.debug("model=%s start_idx=%s end_idx=%s (hours=%s)", + model_used, start_idx, end_idx, end_idx - start_idx) + + # --- Freezing detection (no persistence needed) --- + freezing_detected = False freezing_time = "" - - # Cerca picchi for i in range(start_idx, end_idx): - # Vento - if wind[i] > max_wind: - max_wind = wind[i] - max_wind_time = parser.isoparse(times[i]).strftime('%H:%M') - - # Gelicidio - if wcode[i] in FREEZING_CODES: - freezing_rain_detected = True - if not freezing_time: # Prendi il primo orario - freezing_time = parser.isoparse(times[i]).strftime('%H:%M') + try: + code = int(wcode[i]) + except Exception: + code = -1 + if code in FREEZING_CODES: + freezing_detected = True + if not freezing_time: + freezing_time = hhmm(parse_time_to_local(times[i])) + break - # Pioggia (sliding window 3h) - for i in range(start_idx, end_idx - 3): - current_sum = sum(rain[i:i+3]) - if current_sum > sum_rain_3h: - sum_rain_3h = current_sum + # --- Wind persistence --- + wind_level_curr, wind_peak, wind_start, wind_run_len = best_wind_persistent_level( + gusts=gusts, + times=times, + start_idx=start_idx, + end_idx=end_idx, + persist_hours=PERSIST_HOURS + ) - # --- CLASSIFICAZIONE --- - alerts = [] + # --- Rain persistence (3h windows) --- + rain_max_3h, rain_start, rain_persist = best_rain_persistent_3h( + rain=rain, + times=times, + start_idx=start_idx, + end_idx=end_idx, + limit_3h=RAIN_3H_LIMIT, + persist_hours=PERSIST_HOURS + ) + + # --- Decide notifications --- + alerts: List[str] = [] should_notify = False - - WAS_ALARM = state.get("alert_active", False) - - # 1. GELO (Priorità Massima) - if freezing_rain_detected: - if not state.get("freezing_active", False): - alerts.append(f"🧊 **ALLARME GELICIDIO**\nPrevista pioggia che gela (Freezing Rain).\n🕒 Inizio previsto: {freezing_time}\n_Pericolo ghiaccio su strada!_") + + # 1) Freezing rain (priority) + if freezing_detected: + if not bool(state.get("freezing_active", False)): + alerts.append( + "🧊 ALLARME GELICIDIO
" + "Prevista pioggia che gela (freezing rain/drizzle).
" + f"🕒 Inizio stimato: {html.escape(freezing_time or '—')}
" + "Pericolo ghiaccio su strada." + ) should_notify = True - state["freezing_active"] = True + state["freezing_active"] = True else: state["freezing_active"] = False - # 2. VENTO - wind_level_curr = 0 - if max_wind > WIND_RED: - wind_level_curr = 3 - msg = f"🔴 **TEMPESTA (Burrasca Fortissima)**\nRaffiche > {WIND_RED:.0f} km/h (Picco: **{max_wind:.0f}** km/h).\n🕒 Ore {max_wind_time}" - elif max_wind > WIND_ORANGE: - wind_level_curr = 2 - msg = f"🟠 **VENTO MOLTO FORTE**\nRaffiche > {WIND_ORANGE:.0f} km/h (Picco: **{max_wind:.0f}** km/h).\n🕒 Ore {max_wind_time}" - elif max_wind > WIND_YELLOW: - wind_level_curr = 1 - msg = f"🟡 **VENTO FORTE**\nRaffiche > {WIND_YELLOW:.0f} km/h (Picco: **{max_wind:.0f}** km/h).\n🕒 Ore {max_wind_time}" - + # 2) Wind (persistent) if wind_level_curr > 0: - if not WAS_ALARM or wind_level_curr > state.get("wind_level", 0): - alerts.append(msg) + prev_level = int(state.get("wind_level", 0) or 0) + if (not was_alarm) or (wind_level_curr > prev_level): + alerts.append(wind_message(wind_level_curr, wind_peak, wind_start, wind_run_len)) should_notify = True - state["wind_level"] = wind_level_curr - state["last_wind"] = max_wind + state["wind_level"] = wind_level_curr + state["last_wind_peak"] = float(wind_peak) + else: + state["wind_level"] = 0 + state["last_wind_peak"] = 0.0 - # 3. PIOGGIA - if sum_rain_3h > RAIN_3H_LIMIT: - if not WAS_ALARM or sum_rain_3h > state.get("last_rain", 0) + 10: - alerts.append(f"🌧️ **PIOGGIA INTENSA**\nPrevisti **{sum_rain_3h:.1f} mm** in 3 ore.") - state["last_rain"] = sum_rain_3h + # 3) Rain (persistent) + if rain_persist >= PERSIST_HOURS and rain_max_3h >= RAIN_3H_LIMIT: + prev_rain = float(state.get("last_rain_3h", 0.0) or 0.0) + # "Meglio uno in più": notifica anche al primo superamento persistente, + # e ri-notifica se peggiora di >= +10mm sul massimo 3h + if (not was_alarm) or (rain_max_3h >= prev_rain + 10.0): + alerts.append(rain_message(rain_max_3h, rain_start, rain_persist)) should_notify = True + state["last_rain_3h"] = float(rain_max_3h) + else: + state["last_rain_3h"] = 0.0 - # --- INVIO --- - IS_ALARM_NOW = (wind_level_curr > 0) or (sum_rain_3h > RAIN_3H_LIMIT) or freezing_rain_detected + is_alarm_now = ( + freezing_detected + or (wind_level_curr > 0) + or (rain_persist >= PERSIST_HOURS and rain_max_3h >= RAIN_3H_LIMIT) + ) + # --- Send only on alerts (never on errors) --- if should_notify and alerts: - full_msg = f"⚠️ **AVVISO METEO SEVERO**\n\n" + "\n\n".join(alerts) - send_telegram_message(full_msg) + headline = "⚠️ AVVISO METEO SEVERO" + meta = ( + f"📍 Casa (LAT {LAT:.4f}, LON {LON:.4f})
" + f"🕒 Finestra: prossime {HOURS_AHEAD} ore
" + f"🛰️ Modello: {html.escape(model_used)}
" + f"⏱️ Persistenza minima: {PERSIST_HOURS} ore
" + ) + body = "

".join(alerts) + footer = "

Fonte dati: Open-Meteo" + msg = f"{headline}
{meta}
{body}{footer}" + + ok = telegram_send_html(msg) + if ok: + LOGGER.info("Alert sent successfully.") + else: + LOGGER.warning("Alert NOT sent (token missing or Telegram error).") + state["alert_active"] = True save_state(state) - print("Allerta inviata.") + return - elif WAS_ALARM and not IS_ALARM_NOW: - # Allarme rientrato - msg = f"🟢 **ALLERTA METEO RIENTRATA**\nLe condizioni sono tornate sotto le soglie di guardia." - send_telegram_message(msg) - state = {"alert_active": False, "last_wind": 0, "last_rain": 0, "wind_level": 0, "freezing_active": False} + # Optional: cleared message (transition only) + if SEND_ALL_CLEAR and was_alarm and (not is_alarm_now): + msg = ( + "🟢 ALLERTA METEO RIENTRATA
" + "Condizioni rientrate sotto le soglie di guardia.
" + f"🕒 Finestra: prossime {HOURS_AHEAD} ore
" + f"🛰️ Modello: {html.escape(model_used)}
" + f"⏱️ Persistenza minima: {PERSIST_HOURS} ore" + ) + ok = telegram_send_html(msg) + if ok: + LOGGER.info("All-clear sent successfully.") + else: + LOGGER.warning("All-clear NOT sent (token missing or Telegram error).") + + state = { + "alert_active": False, + "wind_level": 0, + "last_wind_peak": 0.0, + "last_rain_3h": 0.0, + "freezing_active": False, + } save_state(state) - print("Allarme rientrato.") - - else: - # Aggiorna stato parziale se serve (es. vento calato ma ancora presente) - if not IS_ALARM_NOW: - state["alert_active"] = False - save_state(state) - print(f"Nessun nuovo allarme. W:{max_wind:.0f} R:{sum_rain_3h:.1f} Ice:{freezing_rain_detected}") + return + + # No new alert + state["alert_active"] = bool(is_alarm_now) + save_state(state) + LOGGER.info( + "No new alert. model=%s wind_level=%s rain3h=%.1fmm(persist=%sh) ice=%s", + model_used, wind_level_curr, rain_max_3h, rain_persist, freezing_detected + ) + if __name__ == "__main__": analyze() diff --git a/services/telegram-bot/student_alert.py b/services/telegram-bot/student_alert.py index 2f300ca..e1a6b48 100644 --- a/services/telegram-bot/student_alert.py +++ b/services/telegram-bot/student_alert.py @@ -1,191 +1,474 @@ -import requests +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + import datetime -import time +import html import json +import logging import os -from dateutil import parser +import time +from logging.handlers import RotatingFileHandler +from typing import Dict, List, Optional, Tuple from zoneinfo import ZoneInfo -# --- CONFIGURAZIONE UTENTE --- -# 👇👇 INSERISCI QUI I TUOI DATI 👇👇 -TELEGRAM_BOT_TOKEN = "8155587974:AAF9OekvBpixtk8ZH6KoIc0L8edbhdXt7A4" +import requests +from dateutil import parser + +# ============================================================================= +# student_alert.py +# +# Scopo: +# Notificare lo studente (Bologna) se nelle prossime 24 ore sono previsti: +# - Neve persistente (>= 2 ore consecutive con snowfall > 0) +# - Pioggia molto forte persistente (>= 2 ore consecutive con 3h-rolling >= soglia) +# sia a Bologna (trigger) sia lungo il tragitto (caselli A14) fino a San Marino. +# ============================================================================= + +DEBUG = os.environ.get("DEBUG", "0").strip() == "1" + +# ----------------- TELEGRAM ----------------- TELEGRAM_CHAT_IDS = ["64463169", "24827341", "132455422", "5405962012"] +TOKEN_FILE_HOME = os.path.expanduser("~/.telegram_dpc_bot_token") +TOKEN_FILE_ETC = "/etc/telegram_dpc_bot_token" -# --- SOGLIE DI ALLARME (Bologna) --- -SOGLIA_NEVE = 0.0 # cm (Basta neve per attivare) -SOGLIA_PIOGGIA_3H = 30.0 # mm in 3 ore (Pioggia molto forte) +# ----------------- SOGLIE E LOGICA ----------------- +SOGLIA_PIOGGIA_3H_MM = 30.0 # mm in 3 ore (rolling) +PERSIST_HOURS = 2 # persistenza minima (ore) +HOURS_AHEAD = 24 +SNOW_HOURLY_EPS_CM = 0.2 # Soglia minima neve cm/h -# File di stato per la memoria +# File di stato STATE_FILE = "/home/daniely/docker/telegram-bot/student_state.json" -# --- PUNTI DEL PERCORSO (Caselli A14) --- +# ----------------- PUNTI DEL PERCORSO (Caselli A14) ----------------- POINTS = [ - {"name": "🎓 Bologna (V. Regnoli)", "lat": 44.4930, "lon": 11.3690, "type": "trigger"}, + {"name": "🎓 Bologna (V. Regnoli)", "lat": 44.4930, "lon": 11.3690, "type": "trigger"}, {"name": "🛣️ Casello Imola", "lat": 44.3798, "lon": 11.7397, "type": "route"}, {"name": "🛣️ Casello Faenza", "lat": 44.3223, "lon": 11.9040, "type": "route"}, {"name": "🛣️ Casello Forlì", "lat": 44.2502, "lon": 12.0910, "type": "route"}, {"name": "🛣️ Casello Cesena", "lat": 44.1675, "lon": 12.2835, "type": "route"}, {"name": "🛣️ Casello Rimini", "lat": 44.0362, "lon": 12.5659, "type": "route"}, - {"name": "🏠 San Marino", "lat": 43.9356, "lon": 12.4296, "type": "end"} + {"name": "🏠 San Marino", "lat": 43.9356, "lon": 12.4296, "type": "end"}, ] -# --- FUNZIONI DI UTILITÀ --- +# ----------------- OPEN-METEO ----------------- +OPEN_METEO_URL = "https://api.open-meteo.com/v1/forecast" +TZ = "Europe/Rome" +TZINFO = ZoneInfo(TZ) +HTTP_HEADERS = {"User-Agent": "rpi-student-alert/1.0"} +MODEL = "meteofrance_arome_france_hd" -def load_last_state(): - """Legge se c'era un allerta attiva""" - if not os.path.exists(STATE_FILE): return False - try: - with open(STATE_FILE, 'r') as f: - data = json.load(f) - return data.get("alert_active", False) - except: return False +# ----------------- LOG ----------------- +BASE_DIR = os.path.dirname(os.path.abspath(__file__)) +LOG_FILE = os.path.join(BASE_DIR, "student_alert.log") -def save_current_state(is_active): - """Salva lo stato corrente""" + +def setup_logger() -> logging.Logger: + logger = logging.getLogger("student_alert") + logger.setLevel(logging.DEBUG if DEBUG else logging.INFO) + logger.handlers.clear() + + fh = RotatingFileHandler(LOG_FILE, maxBytes=1_000_000, backupCount=5, encoding="utf-8") + fh.setLevel(logging.DEBUG) + fmt = logging.Formatter("%(asctime)s %(levelname)s %(message)s") + fh.setFormatter(fmt) + logger.addHandler(fh) + + if DEBUG: + sh = logging.StreamHandler() + sh.setLevel(logging.DEBUG) + sh.setFormatter(fmt) + logger.addHandler(sh) + + return logger + + +LOGGER = setup_logger() + + +# ============================================================================= +# Utility +# ============================================================================= +def now_local() -> datetime.datetime: + return datetime.datetime.now(TZINFO) + + +def read_text_file(path: str) -> str: try: - with open(STATE_FILE, 'w') as f: - json.dump({"alert_active": is_active, "updated": str(datetime.datetime.now())}, f) + with open(path, "r", encoding="utf-8") as f: + return f.read().strip() + except FileNotFoundError: + return "" + except PermissionError: + LOGGER.debug("Permission denied reading %s", path) + return "" except Exception as e: - print(f"Errore salvataggio stato: {e}") + LOGGER.exception("Error reading %s: %s", path, e) + return "" -def send_telegram_message(message): - if not TELEGRAM_BOT_TOKEN or "INSERISCI" in TELEGRAM_BOT_TOKEN: - print(f"[TEST OUT] {message}") - return - url = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendMessage" - for chat_id in TELEGRAM_CHAT_IDS: +def load_bot_token() -> str: + tok = os.environ.get("TELEGRAM_BOT_TOKEN", "").strip() + if tok: + return tok + tok = read_text_file(TOKEN_FILE_HOME) + if tok: + return tok + tok = read_text_file(TOKEN_FILE_ETC) + return tok.strip() if tok else "" + + +def parse_time_to_local(t: str) -> datetime.datetime: + dt = parser.isoparse(t) + if dt.tzinfo is None: + return dt.replace(tzinfo=TZINFO) + return dt.astimezone(TZINFO) + + +def hhmm(dt: datetime.datetime) -> str: + return dt.strftime("%H:%M") + + +# ============================================================================= +# Telegram +# ============================================================================= +def telegram_send_html(message_html: str) -> bool: + token = load_bot_token() + if not token: + LOGGER.warning("Telegram token missing: message not sent.") + return False + + url = f"https://api.telegram.org/bot{token}/sendMessage" + base_payload = { + "text": message_html, + "parse_mode": "HTML", + "disable_web_page_preview": True, + } + + sent_ok = False + with requests.Session() as s: + for chat_id in TELEGRAM_CHAT_IDS: + payload = dict(base_payload) + payload["chat_id"] = chat_id + try: + resp = s.post(url, json=payload, timeout=15) + if resp.status_code == 200: + sent_ok = True + else: + LOGGER.error("Telegram error chat_id=%s status=%s body=%s", + chat_id, resp.status_code, resp.text[:500]) + time.sleep(0.25) + except Exception as e: + LOGGER.exception("Telegram exception chat_id=%s err=%s", chat_id, e) + + return sent_ok + + +# ============================================================================= +# State & Open-Meteo +# ============================================================================= +def load_state() -> Dict: + default = {"alert_active": False, "signature": "", "updated": ""} + if os.path.exists(STATE_FILE): try: - requests.post(url, json={"chat_id": chat_id, "text": message, "parse_mode": "Markdown"}, timeout=10) - time.sleep(0.2) - except: pass + with open(STATE_FILE, "r", encoding="utf-8") as f: + data = json.load(f) or {} + default.update(data) + except Exception as e: + LOGGER.exception("State read error: %s", e) + return default -def get_forecast(lat, lon): - url = "https://api.open-meteo.com/v1/forecast" + +def save_state(alert_active: bool, signature: str) -> None: + try: + os.makedirs(os.path.dirname(STATE_FILE), exist_ok=True) + with open(STATE_FILE, "w", encoding="utf-8") as f: + json.dump( + {"alert_active": alert_active, "signature": signature, "updated": now_local().isoformat()}, + f, + ensure_ascii=False, + indent=2, + ) + except Exception as e: + LOGGER.exception("State write error: %s", e) + + +def get_forecast(session: requests.Session, lat: float, lon: float) -> Optional[Dict]: params = { - "latitude": lat, "longitude": lon, + "latitude": lat, + "longitude": lon, "hourly": "precipitation,snowfall", - "models": "arome_france_hd", - "timezone": "Europe/Rome", - "forecast_days": 2 + "timezone": TZ, + "forecast_days": 2, + "precipitation_unit": "mm", + "models": MODEL, } try: - res = requests.get(url, params=params, timeout=5) - res.raise_for_status() - return res.json() - except: return None + r = session.get(OPEN_METEO_URL, params=params, headers=HTTP_HEADERS, timeout=25) + if r.status_code == 400: + return None + r.raise_for_status() + return r.json() + except Exception as e: + LOGGER.exception("Open-Meteo request error: %s", e) + return None -def get_stats(data): - if not data: return None - hourly = data.get("hourly", {}) - times = hourly.get("time", []) - snow = hourly.get("snowfall", []) - rain = hourly.get("precipitation", []) - - now = datetime.datetime.now(ZoneInfo("Europe/Rome")) - + +# ============================================================================= +# Analytics +# ============================================================================= +def rolling_sum_3h(values: List[float]) -> List[float]: + out = [] + for i in range(0, max(0, len(values) - 2)): + try: + s = float(values[i]) + float(values[i + 1]) + float(values[i + 2]) + except Exception: + s = 0.0 + out.append(s) + return out + + +def first_persistent_run(values: List[float], threshold: float, persist: int) -> Tuple[bool, int, int, float]: + consec = 0 + run_start = -1 + run_max = 0.0 + for i, v in enumerate(values): + vv = float(v) if v is not None else 0.0 + if vv >= threshold: + if consec == 0: + run_start = i + run_max = vv + else: + run_max = max(run_max, vv) + consec += 1 + if consec >= persist: + return True, run_start, consec, run_max + else: + consec = 0 + return False, -1, 0, 0.0 + + +def max_consecutive_gt(values: List[float], eps: float) -> Tuple[int, int]: + best_len = 0 + best_start = -1 + consec = 0 + start = -1 + for i, v in enumerate(values): + vv = float(v) if v is not None else 0.0 + if vv > eps: + if consec == 0: + start = i + consec += 1 + if consec > best_len: + best_len = consec + best_start = start + else: + consec = 0 + return best_len, best_start + + +def compute_stats(data: Dict) -> Optional[Dict]: + hourly = data.get("hourly", {}) or {} + times = hourly.get("time", []) or [] + precip = hourly.get("precipitation", []) or [] + snow = hourly.get("snowfall", []) or [] + + n = min(len(times), len(precip), len(snow)) + if n == 0: return None + + now = now_local() start_idx = -1 - for i, t in enumerate(times): - if parser.isoparse(t).replace(tzinfo=ZoneInfo("Europe/Rome")) >= now.replace(minute=0,second=0,microsecond=0): + for i, t in enumerate(times[:n]): + if parse_time_to_local(t) >= now: start_idx = i break if start_idx == -1: return None - - limit = min(start_idx + 24, len(times)) - - def sum_slice(arr, hours): - return sum(x for x in arr[start_idx:min(start_idx+hours, limit)] if x) + + end_idx = min(start_idx + HOURS_AHEAD, n) + if end_idx <= start_idx: return None + + times_w = times[start_idx:end_idx] + precip_w = precip[start_idx:end_idx] + snow_w = snow[start_idx:end_idx] + dt_w = [parse_time_to_local(t) for t in times_w] + + rain3 = rolling_sum_3h(precip_w) + rain3_max = max(rain3) if rain3 else 0.0 + rain3_max_idx = rain3.index(rain3_max) if rain3 else -1 + rain3_max_time = hhmm(dt_w[rain3_max_idx]) if (rain3_max_idx >= 0 and rain3_max_idx < len(dt_w)) else "" + + rain_persist_ok, rain_run_start, rain_run_len, rain_run_max = first_persistent_run( + rain3, SOGLIA_PIOGGIA_3H_MM, PERSIST_HOURS + ) + rain_persist_time = hhmm(dt_w[rain_run_start]) if (rain_persist_ok and rain_run_start < len(dt_w)) else "" + + snow_run_len, snow_run_start = max_consecutive_gt(snow_w, eps=SNOW_HOURLY_EPS_CM) + snow_run_time = hhmm(dt_w[snow_run_start]) if (snow_run_start >= 0 and snow_run_start < len(dt_w)) else "" + + snow_12h = sum(float(x) for x in snow_w[:min(12, len(snow_w))] if x is not None) + snow_24h = sum(float(x) for x in snow_w[:min(24, len(snow_w))] if x is not None) return { - "snow_3h": sum_slice(snow, 3), - "snow_6h": sum_slice(snow, 6), - "snow_12h": sum_slice(snow, 12), - "snow_24h": sum_slice(snow, 24), - "rain_3h": sum_slice(rain, 3), - "rain_max": max(rain[start_idx:limit]) if rain else 0 + "rain3_max": float(rain3_max), + "rain3_max_time": rain3_max_time, + "rain_persist_ok": bool(rain_persist_ok), + "rain_persist_time": rain_persist_time, + "rain_persist_run_max": float(rain_run_max), + "rain_persist_run_len": int(rain_run_len), + "snow_run_len": int(snow_run_len), + "snow_run_time": snow_run_time, + "snow_12h": float(snow_12h), + "snow_24h": float(snow_24h), } -# --- LOGICA PRINCIPALE --- -def main(): - print("--- Analisi Studente Bologna ---") - - now_str = datetime.datetime.now(ZoneInfo("Europe/Rome")).strftime('%H:%M') - - # 1. ANALISI BOLOGNA (Il Trigger) - bo_point = POINTS[0] - bo_data = get_forecast(bo_point["lat"], bo_point["lon"]) - bo_stats = get_stats(bo_data) - - if not bo_stats: return +def point_alerts(point_name: str, stats: Dict) -> Dict: + snow_alert = (stats["snow_run_len"] >= PERSIST_HOURS) and (stats["snow_24h"] > 0.0) + rain_alert = bool(stats["rain_persist_ok"]) + return { + "name": point_name, + "snow_alert": snow_alert, + "rain_alert": rain_alert, + "snow_12h": stats["snow_12h"], + "snow_24h": stats["snow_24h"], + "snow_run_len": stats["snow_run_len"], + "snow_run_time": stats["snow_run_time"], + "rain3_max": stats["rain3_max"], + "rain3_max_time": stats["rain3_max_time"], + "rain_persist_time": stats["rain_persist_time"], + "rain_persist_run_max": stats["rain_persist_run_max"], + "rain_persist_run_len": stats["rain_persist_run_len"], + } - # Controlla se scatta l'allarme - alarm_snow = bo_stats["snow_24h"] > SOGLIA_NEVE - alarm_rain = bo_stats["rain_3h"] > SOGLIA_PIOGGIA_3H - - # Carica stato precedente - WAS_ACTIVE = load_last_state() - # --- SCENARIO A: C'È ALLERTA --- - if alarm_snow or alarm_rain: - icon_main = "❄️" if alarm_snow else "🌧️" - - msg = f"{icon_main} **ALLERTA METEO BOLOGNA**\n" - msg += f"📅 _Aggiornamento ore {now_str}_\n\n" - - # Dettaglio Bologna - msg += f"🎓 **A BOLOGNA:**\n" - if alarm_snow: - msg += f"• Neve 3h: **{bo_stats['snow_3h']:.1f}** cm\n" - msg += f"• Neve 6h: **{bo_stats['snow_6h']:.1f}** cm\n" - msg += f"• Neve 12h: **{bo_stats['snow_12h']:.1f}** cm\n" - msg += f"• Neve 24h: **{bo_stats['snow_24h']:.1f}** cm\n" - - if alarm_rain: - msg += f"• Pioggia 3h: **{bo_stats['rain_3h']:.1f}** mm (Intensa!)\n" - - msg += "\n🚗 **SITUAZIONE AI CASELLI (A14):**\n" - - # 2. ANALISI PERCORSO (Solo se c'è allerta) - route_issues = False - - for p in POINTS[1:]: - stats = get_stats(get_forecast(p["lat"], p["lon"])) - if not stats: continue - - has_snow = stats["snow_24h"] > 0 - has_rain = stats["rain_3h"] > 5.0 - - if has_snow or has_rain: - route_issues = True - line = f"**{p['name']}**: " - if has_snow: line += f"❄️ {stats['snow_12h']:.1f}cm (12h) " - if has_rain: line += f"🌧️ {stats['rain_3h']:.1f}mm " - msg += f"{line}\n" - - if not route_issues: - msg += "✅ I caselli autostradali sembrano puliti." - - send_telegram_message(msg) - save_current_state(True) - print("Allerta inviata.") - - # --- SCENARIO B: ALLARME RIENTRATO --- - elif not (alarm_snow or alarm_rain) and WAS_ACTIVE: - msg = ( - f"🟢 **ALLARME RIENTRATO (Bologna)**\n" - f"📅 _Aggiornamento ore {now_str}_\n\n" - f"Le previsioni non indicano più neve o piogge critiche per le prossime 24 ore.\n" - f"Situazione tornata alla normalità." +def build_signature(bologna: Dict, route: List[Dict]) -> str: + parts = [ + f"BO:snow={int(bologna['snow_alert'])},rain={int(bologna['rain_alert'])}," + f"s24={bologna['snow_24h']:.1f},r3max={bologna['rain3_max']:.1f}" + ] + for r in route: + parts.append( + f"{r['name']}:s{int(r['snow_alert'])}r{int(r['rain_alert'])}" + f":s24={r['snow_24h']:.1f}:r3max={r['rain3_max']:.1f}" ) - send_telegram_message(msg) - save_current_state(False) - print("Allarme rientrato. Notifica inviata.") + return "|".join(parts) + + +def main() -> None: + LOGGER.info("--- Student alert Bologna (Neve/Pioggia intensa) ---") + + state = load_state() + was_active = bool(state.get("alert_active", False)) + last_sig = state.get("signature", "") + + with requests.Session() as session: + # Trigger: Bologna + bo = POINTS[0] + bo_data = get_forecast(session, bo["lat"], bo["lon"]) + if not bo_data: return + bo_stats = compute_stats(bo_data) + if not bo_stats: + LOGGER.error("Impossibile calcolare statistiche Bologna.") + return + bo_alerts = point_alerts(bo["name"], bo_stats) + + # Route points + route_alerts: List[Dict] = [] + for p in POINTS[1:]: + d = get_forecast(session, p["lat"], p["lon"]) + if not d: continue + st = compute_stats(d) + if not st: continue + route_alerts.append(point_alerts(p["name"], st)) + + any_route_alert = any(x["snow_alert"] or x["rain_alert"] for x in route_alerts) + any_alert = (bo_alerts["snow_alert"] or bo_alerts["rain_alert"] or any_route_alert) + + sig = build_signature(bo_alerts, route_alerts) + + # --- Scenario A: Allerta --- + if any_alert: + if (not was_active) or (sig != last_sig): + now_str = now_local().strftime("%H:%M") + header_icon = "❄️" if (bo_alerts["snow_alert"] or any(x["snow_alert"] for x in route_alerts)) \ + else "🌧️" if (bo_alerts["rain_alert"] or any(x["rain_alert"] for x in route_alerts)) \ + else "⚠️" + + msg: List[str] = [] + msg.append(f"{header_icon} ALLERTA METEO (Bologna / Rientro)") + msg.append(f"🕒 Aggiornamento ore {html.escape(now_str)}") + msg.append(f"🛰️ Modello: {html.escape(MODEL)}") + msg.append(f"⏱️ Finestra: {HOURS_AHEAD} ore | Persistenza: {PERSIST_HOURS} ore") + msg.append("") + + # Bologna + msg.append("🎓 A BOLOGNA") + if bo_alerts["snow_alert"]: + msg.append(f"❄️ Neve (≥{PERSIST_HOURS}h) da ~{html.escape(bo_alerts['snow_run_time'] or '—')} (run ~{bo_alerts['snow_run_len']}h).") + msg.append(f"• Accumulo: 12h {bo_alerts['snow_12h']:.1f} cm | 24h {bo_alerts['snow_24h']:.1f} cm") + else: + msg.append(f"❄️ Neve: nessuna persistenza ≥ {PERSIST_HOURS}h (24h {bo_alerts['snow_24h']:.1f} cm).") + + if bo_alerts["rain_alert"]: + msg.append(f"🌧️ Pioggia molto forte (3h ≥ {SOGLIA_PIOGGIA_3H_MM:.0f} mm, ≥{PERSIST_HOURS}h) da ~{html.escape(bo_alerts['rain_persist_time'] or '—')}.") + else: + msg.append(f"🌧️ Pioggia: max 3h {bo_alerts['rain3_max']:.1f} mm (picco ~{html.escape(bo_alerts['rain3_max_time'] or '—')}).") + + msg.append("") + msg.append("🚗 CASELLI (A14) / TRATTO") + + issues = [x for x in route_alerts if x["snow_alert"] or x["rain_alert"]] + if not issues: + msg.append("✅ Nessuna criticità persistente rilevata lungo il percorso.") + else: + for x in issues: + line = f"• {html.escape(x['name'])}: " + parts: List[str] = [] + if x["snow_alert"]: + parts.append(f"❄️ neve (≥{PERSIST_HOURS}h) da ~{html.escape(x['snow_run_time'] or '—')} (24h {x['snow_24h']:.1f} cm)") + if x["rain_alert"]: + parts.append(f"🌧️ pioggia forte da ~{html.escape(x['rain_persist_time'] or '—')}") + line += " | ".join(parts) + msg.append(line) + + msg.append("") + msg.append("Fonte dati: Open-Meteo") + + # FIX: usare \n invece di
+ ok = telegram_send_html("\n".join(msg)) + if ok: + LOGGER.info("Notifica inviata.") + else: + LOGGER.warning("Notifica NON inviata.") + + save_state(True, sig) + else: + LOGGER.info("Allerta già notificata e invariata.") + return + + # --- Scenario B: Rientro --- + if was_active and not any_alert: + now_str = now_local().strftime("%H:%M") + # FIX: usare \n invece di
+ msg = ( + "🟢 ALLERTA RIENTRATA (Bologna / Rientro)\n" + f"🕒 Aggiornamento ore {html.escape(now_str)}\n\n" + f"Nelle prossime {HOURS_AHEAD} ore non risultano più condizioni persistenti\n" + f"di neve (≥{PERSIST_HOURS}h) o pioggia 3h sopra soglia (≥{PERSIST_HOURS}h).\n" + "Fonte dati: Open-Meteo" + ) + ok = telegram_send_html(msg) + if ok: + LOGGER.info("Rientro notificato.") + else: + LOGGER.warning("Rientro NON inviato.") + save_state(False, "") + return + + # --- Scenario C: Tranquillo --- + save_state(False, "") + LOGGER.info("Nessuna allerta.") - # --- SCENARIO C: TRANQUILLO --- - else: - save_current_state(False) - print("Nessuna allerta.") if __name__ == "__main__": main()