diff --git a/services/telegram-bot/nowcast_120m_alert.py b/services/telegram-bot/nowcast_120m_alert.py new file mode 100644 index 0000000..fe709f6 --- /dev/null +++ b/services/telegram-bot/nowcast_120m_alert.py @@ -0,0 +1,379 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +import datetime +import json +import logging +import os +import time +from logging.handlers import RotatingFileHandler +from typing import Dict, List, Optional, Tuple +from zoneinfo import ZoneInfo + +import requests +from dateutil import parser + +# ========================= +# CONFIG +# ========================= +DEBUG = os.environ.get("DEBUG", "0").strip() == "1" + +BASE_DIR = os.path.dirname(os.path.abspath(__file__)) +LOG_FILE = os.path.join(BASE_DIR, "nowcast_120m_alert.log") +STATE_FILE = os.path.join(BASE_DIR, "nowcast_120m_state.json") + +TZ = "Europe/Rome" +TZINFO = ZoneInfo(TZ) + +# Casa (San Marino) +LAT = 43.9356 +LON = 12.4296 +LOCATION_NAME = "🏠 Casa (Strada Cà Toro)" + +# Telegram (multi-chat) +TELEGRAM_CHAT_IDS = ["64463169", "24827341", "132455422", "5405962012"] +TOKEN_FILE_ETC = "/etc/telegram_dpc_bot_token" +TOKEN_FILE_HOME = os.path.expanduser("~/.telegram_dpc_bot_token") + +# Open-Meteo +OPEN_METEO_URL = "https://api.open-meteo.com/v1/forecast" +MODEL = "meteofrance_arome_france_hd" + +# Finestra di valutazione +WINDOW_MINUTES = 120 + +# Soglie / “conferma” +# Pioggia intensa: coerente con 25mm/3h ≈ 8.3 mm/h -> soglia 8.0 mm/h +RAIN_INTENSE_MM_H = 8.0 +RAIN_CONFIRM_HOURS = 2 # "confermato": almeno 2 ore consecutive + +# Vento forte: raffiche >= 62 km/h (giallo PC ER) +WIND_GUST_STRONG_KMH = 62.0 +WIND_CONFIRM_HOURS = 2 # almeno 2 ore consecutive + +# Neve: accumulo nelle prossime 2 ore >= 2 cm +SNOW_ACCUM_2H_CM = 2.0 + +# Anti-spam: minimo intervallo tra invii uguali (in minuti) +MIN_RESEND_MINUTES = 180 + + +def setup_logger() -> logging.Logger: + logger = logging.getLogger("nowcast_120m_alert") + logger.setLevel(logging.DEBUG if DEBUG else logging.INFO) + logger.handlers.clear() + + fh = RotatingFileHandler(LOG_FILE, maxBytes=1_000_000, backupCount=5, encoding="utf-8") + fh.setLevel(logging.DEBUG) + fmt = logging.Formatter("%(asctime)s %(levelname)s %(message)s") + fh.setFormatter(fmt) + logger.addHandler(fh) + + if DEBUG: + sh = logging.StreamHandler() + sh.setLevel(logging.DEBUG) + sh.setFormatter(fmt) + logger.addHandler(sh) + + return logger + + +LOGGER = setup_logger() + + +def now_local() -> datetime.datetime: + return datetime.datetime.now(TZINFO) + + +def read_text(path: str) -> str: + try: + with open(path, "r", encoding="utf-8") as f: + return f.read().strip() + except Exception: + return "" + + +def load_bot_token() -> str: + tok = (os.environ.get("TELEGRAM_BOT_TOKEN") or "").strip() + if tok: + return tok + tok = (os.environ.get("BOT_TOKEN") or "").strip() + if tok: + return tok + tok = read_text(TOKEN_FILE_HOME) + if tok: + return tok + tok = read_text(TOKEN_FILE_ETC) + return tok.strip() if tok else "" + + +def telegram_send_markdown(message: str) -> bool: + """ + Invia SOLO se message presente. Errori solo su log. + """ + if not message: + return False + + token = load_bot_token() + if not token: + LOGGER.error("Token Telegram mancante. Messaggio NON inviato.") + return False + + url = f"https://api.telegram.org/bot{token}/sendMessage" + payload_base = { + "text": message, + "parse_mode": "Markdown", + "disable_web_page_preview": True, + } + + ok_any = False + with requests.Session() as s: + for chat_id in TELEGRAM_CHAT_IDS: + payload = dict(payload_base) + payload["chat_id"] = chat_id + try: + r = s.post(url, json=payload, timeout=20) + if r.status_code == 200: + ok_any = True + else: + LOGGER.error("Telegram HTTP %s chat_id=%s body=%s", r.status_code, chat_id, r.text[:300]) + time.sleep(0.25) + except Exception as e: + LOGGER.exception("Errore invio Telegram chat_id=%s: %s", chat_id, e) + + return ok_any + + +def parse_time_local(t: str) -> datetime.datetime: + dt = parser.isoparse(t) + if dt.tzinfo is None: + return dt.replace(tzinfo=TZINFO) + return dt.astimezone(TZINFO) + + +def get_forecast() -> Optional[Dict]: + params = { + "latitude": LAT, + "longitude": LON, + "timezone": TZ, + "forecast_days": 2, + "models": MODEL, + "wind_speed_unit": "kmh", + "precipitation_unit": "mm", + "hourly": ",".join([ + "precipitation", + "windspeed_10m", + "windgusts_10m", + "snowfall", + ]), + } + try: + r = requests.get(OPEN_METEO_URL, params=params, timeout=25) + if r.status_code == 400: + try: + j = r.json() + LOGGER.error("Open-Meteo 400: %s", j.get("reason", j)) + except Exception: + LOGGER.error("Open-Meteo 400: %s", r.text[:300]) + return None + r.raise_for_status() + return r.json() + except Exception as e: + LOGGER.exception("Errore chiamata Open-Meteo: %s", e) + return None + + +def load_state() -> Dict: + if os.path.exists(STATE_FILE): + try: + with open(STATE_FILE, "r", encoding="utf-8") as f: + return json.load(f) or {} + except Exception: + return {} + return {} + + +def save_state(state: Dict) -> None: + try: + with open(STATE_FILE, "w", encoding="utf-8") as f: + json.dump(state, f, ensure_ascii=False) + except Exception: + pass + + +def find_confirmed_start( + times: List[str], + cond: List[bool], + confirm_hours: int, + window_start: datetime.datetime, + window_end: datetime.datetime +) -> Optional[int]: + """ + Trova il primo indice i tale che: + - time[i] è dentro (window_start, window_end] + - cond[i..i+confirm_hours-1] tutte True + """ + n = len(times) + for i in range(n): + try: + dt = parse_time_local(times[i]) + except Exception: + continue + if dt < window_start or dt > window_end: + continue + if i + confirm_hours - 1 >= n: + continue + ok = True + for k in range(confirm_hours): + if not cond[i + k]: + ok = False + break + if ok: + return i + return None + + +def main() -> None: + LOGGER.info("--- Nowcast 120m alert ---") + + data = get_forecast() + if not data: + return + + hourly = data.get("hourly", {}) or {} + times = hourly.get("time", []) or [] + precip = hourly.get("precipitation", []) or [] + gust = hourly.get("windgusts_10m", []) or [] + snow = hourly.get("snowfall", []) or [] + + if not times: + LOGGER.error("Open-Meteo: hourly.time mancante/vuoto") + return + + now = now_local() + window_end = now + datetime.timedelta(minutes=WINDOW_MINUTES) + + # Normalizza array a lunghezza times + n = len(times) + def val(arr, i, cast=float) -> float: + try: + v = arr[i] + return cast(v) if v is not None else 0.0 + except Exception: + return 0.0 + + rain_cond = [(val(precip, i) >= RAIN_INTENSE_MM_H) for i in range(n)] + wind_cond = [(val(gust, i) >= WIND_GUST_STRONG_KMH) for i in range(n)] + + # Per neve: accumulo su 2 ore consecutive (i e i+1) >= soglia + snow2_cond = [] + for i in range(n): + if i + 1 < n: + snow2 = val(snow, i) + val(snow, i + 1) + snow2_cond.append(snow2 >= SNOW_ACCUM_2H_CM) + else: + snow2_cond.append(False) + + rain_i = find_confirmed_start(times, rain_cond, RAIN_CONFIRM_HOURS, now, window_end) + wind_i = find_confirmed_start(times, wind_cond, WIND_CONFIRM_HOURS, now, window_end) + snow_i = find_confirmed_start(times, snow2_cond, 1, now, window_end) # già condensa su 2h + + if DEBUG: + LOGGER.debug("window now=%s end=%s model=%s", now.isoformat(timespec="minutes"), window_end.isoformat(timespec="minutes"), MODEL) + LOGGER.debug("rain_start=%s wind_start=%s snow_start=%s", rain_i, wind_i, snow_i) + + alerts: List[str] = [] + sig_parts: List[str] = [] + + # Pioggia intensa + if rain_i is not None: + start_dt = parse_time_local(times[rain_i]) + # picco entro finestra + max_r = 0.0 + for i in range(n): + dt = parse_time_local(times[i]) + if dt < now or dt > window_end: + continue + max_r = max(max_r, val(precip, i)) + alerts.append( + f"🌧️ *PIOGGIA INTENSA*\n" + f"Inizio confermato: `{start_dt.strftime('%H:%M')}` (≥ {RAIN_INTENSE_MM_H:.0f} mm/h per {RAIN_CONFIRM_HOURS}h)\n" + f"Picco stimato (entro {WINDOW_MINUTES}m): `{max_r:.1f} mm/h`" + ) + sig_parts.append(f"RAIN@{start_dt.strftime('%Y%m%d%H%M')}/peak{max_r:.1f}") + + # Vento forte (raffiche) + if wind_i is not None: + start_dt = parse_time_local(times[wind_i]) + max_g = 0.0 + for i in range(n): + dt = parse_time_local(times[i]) + if dt < now or dt > window_end: + continue + max_g = max(max_g, val(gust, i)) + alerts.append( + f"💨 *VENTO FORTE*\n" + f"Inizio confermato: `{start_dt.strftime('%H:%M')}` (raffiche ≥ {WIND_GUST_STRONG_KMH:.0f} km/h per {WIND_CONFIRM_HOURS}h)\n" + f"Raffica max stimata (entro {WINDOW_MINUTES}m): `{max_g:.0f} km/h`" + ) + sig_parts.append(f"WIND@{start_dt.strftime('%Y%m%d%H%M')}/peak{max_g:.0f}") + + # Neve (accumulo 2h) + if snow_i is not None: + start_dt = parse_time_local(times[snow_i]) + snow2 = val(snow, snow_i) + val(snow, snow_i + 1) + alerts.append( + f"❄️ *NEVE*\n" + f"Inizio stimato: `{start_dt.strftime('%H:%M')}`\n" + f"Accumulo 2h stimato: `{snow2:.1f} cm` (soglia ≥ {SNOW_ACCUM_2H_CM:.1f} cm)" + ) + sig_parts.append(f"SNOW@{start_dt.strftime('%Y%m%d%H%M')}/acc{snow2:.1f}") + + if not alerts: + LOGGER.info("Nessuna allerta confermata entro %s minuti.", WINDOW_MINUTES) + return + + signature = "|".join(sig_parts) + + # Anti-spam + state = load_state() + last_sig = str(state.get("signature", "")) + last_sent = state.get("last_sent_utc", "") + last_sent_dt = None + if last_sent: + try: + last_sent_dt = datetime.datetime.fromisoformat(last_sent).astimezone(datetime.timezone.utc) + except Exception: + last_sent_dt = None + + now_utc = datetime.datetime.now(datetime.timezone.utc) + too_soon = False + if last_sent_dt is not None: + delta_min = (now_utc - last_sent_dt).total_seconds() / 60.0 + too_soon = delta_min < MIN_RESEND_MINUTES + + if signature == last_sig and too_soon: + LOGGER.info("Allerta già inviata di recente (signature invariata).") + return + + msg = ( + f"⚠️ *ALLERTA METEO (entro {WINDOW_MINUTES} minuti)*\n" + f"📍 {LOCATION_NAME}\n" + f"🕒 Agg. `{now.strftime('%d/%m %H:%M')}` (modello: `{MODEL}`)\n\n" + + "\n\n".join(alerts) + + "\n\n_Fonte: Open-Meteo (AROME HD 1.5km)_" + ) + + ok = telegram_send_markdown(msg) + if ok: + LOGGER.info("Notifica inviata.") + save_state({ + "signature": signature, + "last_sent_utc": now_utc.isoformat(timespec="seconds"), + }) + else: + LOGGER.error("Notifica NON inviata (token/telegram).") + + +if __name__ == "__main__": + main()