#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import datetime
import html
import json
import logging
import os
import time
from logging.handlers import RotatingFileHandler
from typing import Dict, List, Optional
from zoneinfo import ZoneInfo

import requests
from dateutil import parser

# =============================================================================
# arome_snow_alert.py
#
# Purpose:
#   Monitor the snow forecast for the next 24 hours on several points
#   (Casa/Titano/Dogana/Carpegna) and notify on Telegram when:
#     - at least one hour in the next 24h has snowfall > 0.2 cm
#       (no persistence required)
#
# Weather model:
#   AROME HD 1.5 km only (Meteo-France): meteofrance_arome_france_hd
#
# Telegram token:
#   Never stored in clear text in this file. Lookup order:
#     1) env TELEGRAM_BOT_TOKEN
#     2) ~/.telegram_dpc_bot_token
#     3) /etc/telegram_dpc_bot_token
#
# Debug:
#   DEBUG=1 python3 arome_snow_alert.py
#
# Log:
#   ./arome_snow_alert.log (same folder as the script)
# =============================================================================

DEBUG = os.environ.get("DEBUG", "0").strip() == "1"

# ----------------- TELEGRAM -----------------
TELEGRAM_CHAT_IDS = ["64463169", "24827341", "132455422", "5405962012"]
TOKEN_FILE_HOME = os.path.expanduser("~/.telegram_dpc_bot_token")
TOKEN_FILE_ETC = "/etc/telegram_dpc_bot_token"

# ----------------- MONITORED POINTS -----------------
# Name of the point that gets the detailed section in the alert message.
HOME_POINT_NAME = "🏠 Casa"

POINTS = [
    {"name": HOME_POINT_NAME, "lat": 43.9356, "lon": 12.4296},
    {"name": "⛰️ Titano", "lat": 43.9360, "lon": 12.4460},
    {"name": "🏢 Dogana", "lat": 43.9800, "lon": 12.4900},
    {"name": "🏔️ Carpegna", "lat": 43.7819, "lon": 12.3346},
]

# ----------------- ALERT LOGIC -----------------
TZ = "Europe/Rome"
TZINFO = ZoneInfo(TZ)

HOURS_AHEAD = 24

# Hourly snowfall threshold (cm/h): NOTIFY if any hour is above it.
SNOW_HOURLY_THRESHOLD_CM = 0.2

# Winter season: 1 Nov -> 15 Apr
WINTER_START_MONTH = 11
WINTER_END_MONTH = 4
WINTER_END_DAY = 15

# State file
STATE_FILE = "/home/daniely/docker/telegram-bot/snow_state.json"

# ----------------- OPEN-METEO -----------------
OPEN_METEO_URL = "https://api.open-meteo.com/v1/forecast"
HTTP_HEADERS = {"User-Agent": "rpi-arome-snow-alert/2.1"}
MODEL = "meteofrance_arome_france_hd"

# ----------------- LOG FILE -----------------
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
LOG_FILE = os.path.join(BASE_DIR, "arome_snow_alert.log")


def setup_logger() -> logging.Logger:
    """Build the script logger: rotating file always, stderr too when DEBUG=1."""
    logger = logging.getLogger("arome_snow_alert")
    logger.setLevel(logging.DEBUG if DEBUG else logging.INFO)
    # Drop handlers left over from a previous call so records are not duplicated.
    logger.handlers.clear()

    fmt = logging.Formatter("%(asctime)s %(levelname)s %(message)s")

    fh = RotatingFileHandler(LOG_FILE, maxBytes=1_000_000, backupCount=5, encoding="utf-8")
    fh.setLevel(logging.DEBUG)
    fh.setFormatter(fmt)
    logger.addHandler(fh)

    if DEBUG:
        sh = logging.StreamHandler()
        sh.setLevel(logging.DEBUG)
        sh.setFormatter(fmt)
        logger.addHandler(sh)

    return logger


LOGGER = setup_logger()


# =============================================================================
# Utility
# =============================================================================
def now_local() -> datetime.datetime:
    """Return the current timezone-aware time in the configured zone (TZ)."""
    return datetime.datetime.now(TZINFO)


def is_winter_season(today: Optional[datetime.date] = None) -> bool:
    """Return True when *today* falls inside the configured winter season.

    The season runs from the first day of WINTER_START_MONTH to
    WINTER_END_DAY of WINTER_END_MONTH (1 November -> 15 April with the
    current constants), wrapping over the new year.

    Args:
        today: Date (or datetime) to test. Defaults to the current local date.
    """
    if today is None:
        today = now_local().date()
    month, day = today.month, today.day
    # Months strictly before the end month are derived from the constant
    # instead of a hard-coded literal, so changing WINTER_END_MONTH just works.
    if month >= WINTER_START_MONTH or month < WINTER_END_MONTH:
        return True
    return month == WINTER_END_MONTH and day <= WINTER_END_DAY


def read_text_file(path: str) -> str:
    """Return the stripped content of *path*, or "" if it cannot be read."""
    try:
        with open(path, "r", encoding="utf-8") as f:
            return f.read().strip()
    except FileNotFoundError:
        return ""
    except PermissionError:
        LOGGER.debug("Permission denied reading %s", path)
        return ""
    except Exception as e:
        LOGGER.exception("Error reading %s: %s", path, e)
        return ""


def load_bot_token() -> str:
    """Return the Telegram bot token, or "" when none is configured.

    Lookup order: env TELEGRAM_BOT_TOKEN, then TOKEN_FILE_HOME, then
    TOKEN_FILE_ETC. The first non-empty value wins.
    """
    token = os.environ.get("TELEGRAM_BOT_TOKEN", "").strip()
    if token:
        return token
    for path in (TOKEN_FILE_HOME, TOKEN_FILE_ETC):
        token = read_text_file(path)
        if token:
            return token
    return ""


def parse_time_to_local(t: str) -> datetime.datetime:
    """Parse an ISO-8601 timestamp and return it as an aware datetime in TZ.

    Naive timestamps are taken to already be local wall-clock time (the
    forecast is requested with ``timezone=TZ``); aware ones are converted.
    """
    dt = parser.isoparse(t)
    if dt.tzinfo is None:
        return dt.replace(tzinfo=TZINFO)
    return dt.astimezone(TZINFO)


def hhmm(dt: datetime.datetime) -> str:
    """Format *dt* as ``HH:MM``."""
    return dt.strftime("%H:%M")


# =============================================================================
# Telegram
# =============================================================================
def telegram_send_html(message_html: str) -> bool:
    """Send *message_html* (Telegram HTML parse mode) to every configured chat.

    Used only for alert / all-clear messages, never to report errors.

    Returns:
        True if at least one chat accepted the message, False otherwise
        (including when no token is configured).
    """
    token = load_bot_token()
    if not token:
        LOGGER.warning("Telegram token missing: message not sent.")
        return False

    url = f"https://api.telegram.org/bot{token}/sendMessage"
    base_payload = {
        "text": message_html,
        "parse_mode": "HTML",
        "disable_web_page_preview": True,
    }

    sent_ok = False
    with requests.Session() as s:
        for chat_id in TELEGRAM_CHAT_IDS:
            payload = {**base_payload, "chat_id": chat_id}
            try:
                resp = s.post(url, json=payload, timeout=15)
            except Exception as e:
                # Best effort: one failing chat must not stop the others.
                # The exception text (and any traceback) embeds the request
                # URL, which contains the bot token, so log a redacted
                # message and no traceback.
                LOGGER.error(
                    "Telegram exception chat_id=%s err=%s",
                    chat_id,
                    str(e).replace(token, "***"),
                )
            else:
                if resp.status_code == 200:
                    sent_ok = True
                else:
                    LOGGER.error(
                        "Telegram error chat_id=%s status=%s body=%s",
                        chat_id,
                        resp.status_code,
                        resp.text[:500],
                    )
            # Small pause between chats, whatever the outcome.
            time.sleep(0.25)

    return sent_ok


# =============================================================================
# State
# =============================================================================
def load_state() -> Dict:
    """Load the persisted alert state, falling back to defaults.

    Returns:
        A dict with at least ``alert_active``, ``signature`` and ``updated``.
        A missing, unreadable or malformed state file yields the defaults.
    """
    state: Dict = {"alert_active": False, "signature": "", "updated": ""}
    try:
        with open(STATE_FILE, "r", encoding="utf-8") as f:
            data = json.load(f)
    except FileNotFoundError:
        # First run (or state removed): defaults are the expected outcome.
        return state
    except Exception as e:
        LOGGER.exception("State read error: %s", e)
        return state

    if isinstance(data, dict):
        state.update(data)
    elif data:
        LOGGER.error("State read error: %s", "unexpected JSON root type")
    return state


def save_state(alert_active: bool, signature: str) -> None:
    """Persist the alert state to STATE_FILE; failures are logged, not raised."""
    payload = {
        "alert_active": alert_active,
        "signature": signature,
        "updated": now_local().isoformat(),
    }
    tmp_path = f"{STATE_FILE}.tmp"
    try:
        state_dir = os.path.dirname(STATE_FILE)
        if state_dir:
            os.makedirs(state_dir, exist_ok=True)
        # Write to a sibling temp file and rename it over the target so an
        # interrupted run never leaves a truncated state file behind.
        with open(tmp_path, "w", encoding="utf-8") as f:
            json.dump(payload, f, ensure_ascii=False, indent=2)
        os.replace(tmp_path, STATE_FILE)
    except Exception as e:
        LOGGER.exception("State write error: %s", e)


# =============================================================================
# Open-Meteo
# =============================================================================
def get_forecast(session: requests.Session, lat: float, lon: float) -> Optional[Dict]:
    """Fetch the hourly snowfall forecast for one point from Open-Meteo.

    Returns:
        The decoded JSON response, or None on any HTTP/network/decoding
        error (the error is logged).
    """
    params = {
        "latitude": lat,
        "longitude": lon,
        "hourly": "snowfall",
        "timezone": TZ,
        "forecast_days": 2,
        "models": MODEL,
    }
    try:
        r = session.get(OPEN_METEO_URL, params=params, headers=HTTP_HEADERS, timeout=25)
        if r.status_code == 400:
            # A 400 may carry a JSON body with a "reason"; log it when present.
            try:
                j = r.json()
                LOGGER.error(
                    "Open-Meteo 400 (model=%s lat=%.4f lon=%.4f): %s",
                    MODEL, lat, lon, j.get("reason", j),
                )
            except Exception:
                LOGGER.error(
                    "Open-Meteo 400 (model=%s lat=%.4f lon=%.4f): %s",
                    MODEL, lat, lon, r.text[:500],
                )
            return None
        r.raise_for_status()
        return r.json()
    except Exception as e:
        LOGGER.exception(
            "Open-Meteo request error (model=%s lat=%.4f lon=%.4f): %s",
            MODEL, lat, lon, e,
        )
        return None


# =============================================================================
# Analytics
# =============================================================================
def _snow_cm(value) -> float:
    """Convert one raw snowfall sample to float; missing/invalid counts as 0."""
    if value is None:
        return 0.0
    try:
        return float(value)
    except (TypeError, ValueError):
        return 0.0


def compute_snow_stats(data: Dict, now: Optional[datetime.datetime] = None) -> Optional[Dict]:
    """Compute snowfall statistics over the next HOURS_AHEAD forecast hours.

    Args:
        data: Forecast response; reads ``data["hourly"]["time"]`` and
            ``data["hourly"]["snowfall"]`` (parallel lists).
        now: Timezone-aware reference time. Defaults to the current local time.

    Returns:
        None when there is no usable sample at or after *now*; otherwise a
        dict with ``snow_3h``/``snow_6h``/``snow_12h``/``snow_24h`` (sums over
        the first N hours of the window), ``peak_hourly``/``peak_time`` (first
        occurrence of the maximum, time empty when the peak is 0),
        ``first_thr_time``/``first_thr_val`` (first hour above
        SNOW_HOURLY_THRESHOLD_CM) and ``triggered``.
    """
    hourly = data.get("hourly") or {}
    times = hourly.get("time") or []
    snow = hourly.get("snowfall") or []
    if now is None:
        now = now_local()

    # Parse every timestamp exactly once. Unparseable entries are skipped
    # instead of crashing the whole run later on.
    series = []
    for raw_time, raw_value in zip(times, snow):
        try:
            series.append((parse_time_to_local(raw_time), raw_value))
        except Exception:
            LOGGER.debug("Unparseable forecast time skipped: %r", raw_time)

    start = next((i for i, (dt, _) in enumerate(series) if dt >= now), None)
    if start is None:
        return None

    window = series[start:start + HOURS_AHEAD]
    if not window:
        return None

    moments = [dt for dt, _ in window]
    values = [_snow_cm(v) for _, v in window]

    def sum_h(h: int) -> float:
        # Informational accumulation over the first h hours of the window.
        return float(sum(values[:h]))

    peak = max(values)
    peak_time = hhmm(moments[values.index(peak)]) if peak > 0 else ""

    first_hit = next(
        ((dt, v) for dt, v in zip(moments, values) if v > SNOW_HOURLY_THRESHOLD_CM),
        None,
    )
    first_thr_time = hhmm(first_hit[0]) if first_hit else ""
    first_thr_val = first_hit[1] if first_hit else 0.0

    return {
        "snow_3h": sum_h(3),
        "snow_6h": sum_h(6),
        "snow_12h": sum_h(12),
        "snow_24h": sum_h(24),
        "peak_hourly": float(peak),
        "peak_time": peak_time,
        "first_thr_time": first_thr_time,
        "first_thr_val": float(first_thr_val),
        "triggered": first_hit is not None,
    }


def point_summary(name: str, st: Dict) -> Dict:
    """Return the stats of one point tagged with its display *name*."""
    return {
        "name": name,
        "triggered": bool(st["triggered"]),
        "snow_3h": st["snow_3h"],
        "snow_6h": st["snow_6h"],
        "snow_12h": st["snow_12h"],
        "snow_24h": st["snow_24h"],
        "peak_hourly": st["peak_hourly"],
        "peak_time": st["peak_time"],
        "first_thr_time": st["first_thr_time"],
        "first_thr_val": st["first_thr_val"],
    }


def build_signature(summaries: List[Dict]) -> str:
    """Build a compact fingerprint of the forecast, used to avoid spam.

    Values are rounded to 0.1 cm so tiny model fluctuations do not trigger
    a new notification.
    """
    return "|".join(
        f"{s['name']}:t{int(s['triggered'])}"
        f":24={s['snow_24h']:.1f}"
        f":pk={s['peak_hourly']:.1f}"
        f":ft={s['first_thr_time'] or '-'}"
        for s in summaries
    )


# =============================================================================
# Main
# =============================================================================
def _build_alert_message(summaries: List[Dict], now_str: str) -> str:
    """Compose the Telegram HTML text for an active snow alert."""
    # Telegram's HTML parse mode requires literal '<', '>' and '&' to be
    # written as entities, hence '&gt;' in the fixed text below.
    msg: List[str] = [
        "❄️ ALLERTA NEVE (AROME HD)",
        f"🕒 Aggiornamento ore {html.escape(now_str)}",
        f"🛰️ Modello: {html.escape(MODEL)}",
        f"⏱️ Finestra: prossime {HOURS_AHEAD} ore | "
        f"Trigger: snowfall &gt; {SNOW_HOURLY_THRESHOLD_CM:.1f} cm/h",
        "",
    ]

    casa = next((s for s in summaries if s["name"] == HOME_POINT_NAME), None)
    if casa:
        msg.append("🏠 CASA")
        msg.append(f"• 03h: {casa['snow_3h']:.1f} cm | 06h: {casa['snow_6h']:.1f} cm")
        msg.append(f"• 12h: {casa['snow_12h']:.1f} cm | 24h: {casa['snow_24h']:.1f} cm")
        if casa["triggered"]:
            msg.append(
                f"• Primo superamento soglia: {html.escape(casa['first_thr_time'] or '—')} "
                f"({casa['first_thr_val']:.1f} cm/h)"
            )
        if casa["peak_hourly"] > 0:
            msg.append(
                f"• Picco orario: {casa['peak_hourly']:.1f} cm/h "
                f"(~{html.escape(casa['peak_time'] or '—')})"
            )
        msg.append("")

    msg.append("🌍 NEL CIRCONDARIO")
    lines = [
        f"{s['name']}: primo > soglia alle {s['first_thr_time'] or '—'} "
        f"({s['first_thr_val']:.1f} cm/h), picco {s['peak_hourly']:.1f} cm/h, "
        f"24h {s['snow_24h']:.1f} cm"
        for s in summaries
        if s["triggered"]
    ]
    if lines:
        # html.escape takes care of the '>' inside the per-point lines.
        msg.append("\n" + html.escape("\n".join(lines)) + "\n")
    else:
        msg.append("Nessun punto ha superato la soglia (anomalia).")

    msg.append("Fonte dati: Open-Meteo")
    return "\n".join(msg)


def _build_all_clear_message(now_str: str) -> str:
    """Compose the Telegram HTML text announcing that the alert is over."""
    return (
        "🟢 PREVISIONE NEVE ANNULLATA\n"
        f"🕒 Aggiornamento ore {html.escape(now_str)}\n\n"
        f"Nelle prossime {HOURS_AHEAD} ore non risultano ore con snowfall "
        f"&gt; {SNOW_HOURLY_THRESHOLD_CM:.1f} cm/h.\n"
        "Fonte dati: Open-Meteo"
    )


def analyze_snow() -> None:
    """Run one check: fetch forecasts, evaluate the trigger, notify, persist.

    Scenarios:
        A) at least one point is above threshold -> send an alert when it is
           new or its signature changed;
        B) an alert was active and nothing triggers any more -> send the
           all-clear;
        C) nothing to report -> reset the state and log the top accumulations.

    In A and B the state is persisted only after a successful delivery, so a
    transient Telegram failure is retried on the next run instead of being
    silently recorded as "already notified".
    """
    if not is_winter_season():
        LOGGER.info("Fuori stagione invernale: nessuna verifica/notify.")
        save_state(False, "")
        return

    now_str = now_local().strftime("%H:%M")
    LOGGER.info("--- Check Neve AROME HD %s ---", now_str)

    state = load_state()
    was_active = bool(state.get("alert_active", False))
    last_sig = state.get("signature", "")

    summaries: List[Dict] = []
    with requests.Session() as session:
        for p in POINTS:
            data = get_forecast(session, p["lat"], p["lon"])
            if not data:
                LOGGER.warning("Forecast non disponibile per %s (skip).", p["name"])
                continue

            st = compute_snow_stats(data)
            if not st:
                LOGGER.warning("Statistiche non calcolabili per %s (skip).", p["name"])
                continue

            summaries.append(point_summary(p["name"], st))
            # Be gentle with the API between consecutive points.
            time.sleep(0.2)

    if not summaries:
        LOGGER.error("Nessun punto ha restituito statistiche valide.")
        return

    any_trigger = any(s["triggered"] for s in summaries)
    sig = build_signature(summaries)

    # --- Scenario A: threshold exceeded ---
    if any_trigger:
        if was_active and sig == last_sig:
            LOGGER.info("Allerta già notificata e invariata.")
            return

        if telegram_send_html(_build_alert_message(summaries, now_str)):
            LOGGER.info("Notifica neve inviata.")
            save_state(True, sig)
        else:
            LOGGER.warning("Notifica neve NON inviata (token mancante o errore Telegram).")
        return

    # --- Scenario B: alert withdrawn (nothing above threshold any more) ---
    if was_active:
        if telegram_send_html(_build_all_clear_message(now_str)):
            LOGGER.info("Rientro neve notificato.")
            save_state(False, "")
        else:
            LOGGER.warning("Rientro neve NON inviato (token mancante o errore Telegram).")
        return

    # --- Scenario C: quiet ---
    save_state(False, "")
    top = sorted(summaries, key=lambda x: x["snow_24h"], reverse=True)[:3]
    LOGGER.info(
        "Nessuna neve sopra soglia. Top accumuli 24h: %s",
        " | ".join(
            f"{t['name']}={t['snow_24h']:.1f}cm (pk {t['peak_hourly']:.1f}cm/h)" for t in top
        ),
    )


if __name__ == "__main__":
    analyze_snow()