diff --git a/services/telegram-bot/arome_snow_alert.py b/services/telegram-bot/arome_snow_alert.py
index 7d6591b..5a104d8 100644
--- a/services/telegram-bot/arome_snow_alert.py
+++ b/services/telegram-bot/arome_snow_alert.py
@@ -1,209 +1,483 @@
-import requests
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+
import datetime
-import time
+import html
import json
+import logging
import os
-from dateutil import parser
+import time
+from logging.handlers import RotatingFileHandler
+from typing import Dict, List, Optional
from zoneinfo import ZoneInfo
-# --- CONFIGURAZIONE UTENTE ---
-# ๐๐ INSERISCI QUI I TUOI DATI ๐๐
-TELEGRAM_BOT_TOKEN = "8155587974:AAF9OekvBpixtk8ZH6KoIc0L8edbhdXt7A4"
-TELEGRAM_CHAT_IDS = ["64463169", "24827341", "132455422", "5405962012"]
+import requests
+from dateutil import parser
-# --- PUNTI DI MONITORAGGIO ---
-# Sostituito San Leo con Carpegna
+# =============================================================================
+# arome_snow_alert.py
+#
+# Scopo:
+# Monitorare neve prevista nelle prossime 24 ore su piรน punti (Casa/Titano/Dogana/Carpegna)
+# e notificare su Telegram se:
+# - esiste almeno 1 ora nelle prossime 24h con snowfall > 0.2 cm
+# (nessuna persistenza richiesta)
+#
+# Modello meteo:
+# SOLO AROME HD 1.5 km (Meteo-France): meteofrance_arome_france_hd
+#
+# Token Telegram:
+# Nessun token in chiaro. Lettura in ordine:
+# 1) env TELEGRAM_BOT_TOKEN
+# 2) ~/.telegram_dpc_bot_token
+# 3) /etc/telegram_dpc_bot_token
+#
+# Debug:
+# DEBUG=1 python3 arome_snow_alert.py
+#
+# Log:
+# ./arome_snow_alert.log (stessa cartella dello script)
+# =============================================================================
+
+DEBUG = os.environ.get("DEBUG", "0").strip() == "1"
+
+# ----------------- TELEGRAM -----------------
+TELEGRAM_CHAT_IDS = ["64463169", "24827341", "132455422", "5405962012"]
+TOKEN_FILE_HOME = os.path.expanduser("~/.telegram_dpc_bot_token")
+TOKEN_FILE_ETC = "/etc/telegram_dpc_bot_token"
+
+# ----------------- PUNTI DI MONITORAGGIO -----------------
POINTS = [
{"name": "๐ Casa", "lat": 43.9356, "lon": 12.4296},
{"name": "โฐ๏ธ Titano", "lat": 43.9360, "lon": 12.4460},
{"name": "๐ข Dogana", "lat": 43.9800, "lon": 12.4900},
- {"name": "๐๏ธ Carpegna", "lat": 43.7819, "lon": 12.3346}
+ {"name": "๐๏ธ Carpegna", "lat": 43.7819, "lon": 12.3346},
]
-# Soglia notifica (cm)
-SOGLIA_NOTIFICA = 0.0
+# ----------------- LOGICA ALLERTA -----------------
+TZ = "Europe/Rome"
+TZINFO = ZoneInfo(TZ)
-# File di stato per ricordare l'ultima allerta
+HOURS_AHEAD = 24
+
+# Soglia precipitazione neve oraria (cm/h): NOTIFICA se qualsiasi ora > soglia
+SNOW_HOURLY_THRESHOLD_CM = 0.2
+
+# Stagione invernale: 1 Nov -> 15 Apr
+WINTER_START_MONTH = 11
+WINTER_END_MONTH = 4
+WINTER_END_DAY = 15
+
+# File di stato
STATE_FILE = "/home/daniely/docker/telegram-bot/snow_state.json"
-# --- FUNZIONI DI UTILITร ---
+# ----------------- OPEN-METEO -----------------
+OPEN_METEO_URL = "https://api.open-meteo.com/v1/forecast"
+HTTP_HEADERS = {"User-Agent": "rpi-arome-snow-alert/2.1"}
+MODEL = "meteofrance_arome_france_hd"
-def is_winter_season():
- """Ritorna True se oggi รจ tra il 1 Novembre e il 15 Aprile"""
- now = datetime.datetime.now()
- month = now.month
- day = now.day
-
- if month >= 11: return True # Nov, Dic
- if month <= 3: return True # Gen, Feb, Mar
- if month == 4 and day <= 15: return True # Fino al 15 Apr
+# ----------------- LOG FILE -----------------
+BASE_DIR = os.path.dirname(os.path.abspath(__file__))
+LOG_FILE = os.path.join(BASE_DIR, "arome_snow_alert.log")
+
+
+def setup_logger() -> logging.Logger:
+ logger = logging.getLogger("arome_snow_alert")
+ logger.setLevel(logging.DEBUG if DEBUG else logging.INFO)
+ logger.handlers.clear()
+
+ fh = RotatingFileHandler(LOG_FILE, maxBytes=1_000_000, backupCount=5, encoding="utf-8")
+ fh.setLevel(logging.DEBUG)
+ fmt = logging.Formatter("%(asctime)s %(levelname)s %(message)s")
+ fh.setFormatter(fmt)
+ logger.addHandler(fh)
+
+ if DEBUG:
+ sh = logging.StreamHandler()
+ sh.setLevel(logging.DEBUG)
+ sh.setFormatter(fmt)
+ logger.addHandler(sh)
+
+ return logger
+
+
+LOGGER = setup_logger()
+
+
+# =============================================================================
+# Utility
+# =============================================================================
+def now_local() -> datetime.datetime:
+ return datetime.datetime.now(TZINFO)
+
+
+def is_winter_season() -> bool:
+ """True se oggi รจ tra 1 Novembre e 15 Aprile (in TZ locale)."""
+ now = now_local()
+ m = now.month
+ d = now.day
+
+ if m >= WINTER_START_MONTH:
+ return True
+ if m <= 3:
+ return True
+ if m == WINTER_END_MONTH and d <= WINTER_END_DAY:
+ return True
return False
-def load_last_state():
- """Legge se c'era un allerta attiva"""
- if not os.path.exists(STATE_FILE): return False
- try:
- with open(STATE_FILE, 'r') as f:
- data = json.load(f)
- return data.get("alert_active", False)
- except: return False
-def save_current_state(is_active):
- """Salva lo stato corrente"""
+def read_text_file(path: str) -> str:
try:
- with open(STATE_FILE, 'w') as f:
- json.dump({"alert_active": is_active, "updated": str(datetime.datetime.now())}, f)
+ with open(path, "r", encoding="utf-8") as f:
+ return f.read().strip()
+ except FileNotFoundError:
+ return ""
+ except PermissionError:
+ LOGGER.debug("Permission denied reading %s", path)
+ return ""
except Exception as e:
- print(f"Errore salvataggio stato: {e}")
+ LOGGER.exception("Error reading %s: %s", path, e)
+ return ""
-def send_telegram_message(message):
- if not TELEGRAM_BOT_TOKEN or "INSERISCI" in TELEGRAM_BOT_TOKEN:
- print(f"[TEST OUT] {message}")
- return
- url = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendMessage"
- for chat_id in TELEGRAM_CHAT_IDS:
- payload = {"chat_id": chat_id, "text": message, "parse_mode": "Markdown"}
+def load_bot_token() -> str:
+ tok = os.environ.get("TELEGRAM_BOT_TOKEN", "").strip()
+ if tok:
+ return tok
+ tok = read_text_file(TOKEN_FILE_HOME)
+ if tok:
+ return tok
+ tok = read_text_file(TOKEN_FILE_ETC)
+ return tok.strip() if tok else ""
+
+
+def parse_time_to_local(t: str) -> datetime.datetime:
+ dt = parser.isoparse(t)
+ if dt.tzinfo is None:
+ return dt.replace(tzinfo=TZINFO)
+ return dt.astimezone(TZINFO)
+
+
+def hhmm(dt: datetime.datetime) -> str:
+ return dt.strftime("%H:%M")
+
+
+# =============================================================================
+# Telegram
+# =============================================================================
+def telegram_send_html(message_html: str) -> bool:
+ """Invia solo in caso di allerta (mai per errori)."""
+ token = load_bot_token()
+ if not token:
+ LOGGER.warning("Telegram token missing: message not sent.")
+ return False
+
+ url = f"https://api.telegram.org/bot{token}/sendMessage"
+ base_payload = {
+ "text": message_html,
+ "parse_mode": "HTML",
+ "disable_web_page_preview": True,
+ }
+
+ sent_ok = False
+ with requests.Session() as s:
+ for chat_id in TELEGRAM_CHAT_IDS:
+ payload = dict(base_payload)
+ payload["chat_id"] = chat_id
+ try:
+ resp = s.post(url, json=payload, timeout=15)
+ if resp.status_code == 200:
+ sent_ok = True
+ else:
+ LOGGER.error("Telegram error chat_id=%s status=%s body=%s",
+ chat_id, resp.status_code, resp.text[:500])
+ time.sleep(0.25)
+ except Exception as e:
+ LOGGER.exception("Telegram exception chat_id=%s err=%s", chat_id, e)
+
+ return sent_ok
+
+
+# =============================================================================
+# State
+# =============================================================================
+def load_state() -> Dict:
+ default = {"alert_active": False, "signature": "", "updated": ""}
+ if os.path.exists(STATE_FILE):
try:
- requests.post(url, json=payload, timeout=10)
- time.sleep(0.2)
+ with open(STATE_FILE, "r", encoding="utf-8") as f:
+ data = json.load(f) or {}
+ default.update(data)
except Exception as e:
- print(f"Errore invio a {chat_id}: {e}")
+ LOGGER.exception("State read error: %s", e)
+ return default
-def get_forecast(lat, lon):
- url = "https://api.open-meteo.com/v1/forecast"
+
+def save_state(alert_active: bool, signature: str) -> None:
+ try:
+ os.makedirs(os.path.dirname(STATE_FILE), exist_ok=True)
+ with open(STATE_FILE, "w", encoding="utf-8") as f:
+ json.dump(
+ {"alert_active": alert_active, "signature": signature, "updated": now_local().isoformat()},
+ f,
+ ensure_ascii=False,
+ indent=2,
+ )
+ except Exception as e:
+ LOGGER.exception("State write error: %s", e)
+
+
+# =============================================================================
+# Open-Meteo
+# =============================================================================
+def get_forecast(session: requests.Session, lat: float, lon: float) -> Optional[Dict]:
params = {
- "latitude": lat, "longitude": lon,
+ "latitude": lat,
+ "longitude": lon,
"hourly": "snowfall",
- "models": "arome_france_hd",
- "timezone": "Europe/Rome",
- "forecast_days": 2
+ "timezone": TZ,
+ "forecast_days": 2,
+ "models": MODEL,
}
try:
- response = requests.get(url, params=params, timeout=5)
- response.raise_for_status()
- return response.json()
- except:
+ r = session.get(OPEN_METEO_URL, params=params, headers=HTTP_HEADERS, timeout=25)
+ if r.status_code == 400:
+ try:
+ j = r.json()
+ LOGGER.error("Open-Meteo 400 (model=%s lat=%.4f lon=%.4f): %s", MODEL, lat, lon, j.get("reason", j))
+ except Exception:
+ LOGGER.error("Open-Meteo 400 (model=%s lat=%.4f lon=%.4f): %s", MODEL, lat, lon, r.text[:500])
+ return None
+ r.raise_for_status()
+ return r.json()
+ except Exception as e:
+ LOGGER.exception("Open-Meteo request error (model=%s lat=%.4f lon=%.4f): %s", MODEL, lat, lon, e)
return None
-def calculate_sums(data):
- if not data: return None
-
- hourly = data.get("hourly", {})
- times = hourly.get("time", [])
- snow = hourly.get("snowfall", [])
-
- if not times or not snow: return None
- now = datetime.datetime.now(ZoneInfo("Europe/Rome"))
-
+# =============================================================================
+# Analytics
+# =============================================================================
+def compute_snow_stats(data: Dict) -> Optional[Dict]:
+ hourly = data.get("hourly", {}) or {}
+ times = hourly.get("time", []) or []
+ snow = hourly.get("snowfall", []) or []
+
+ n = min(len(times), len(snow))
+ if n == 0:
+ return None
+
+ times = times[:n]
+ snow = snow[:n]
+
+ now = now_local()
start_idx = -1
- for i, t_str in enumerate(times):
+ for i, t in enumerate(times):
try:
- t_obj = parser.isoparse(t_str).replace(tzinfo=ZoneInfo("Europe/Rome"))
- if t_obj >= now.replace(minute=0, second=0, microsecond=0):
+ if parse_time_to_local(t) >= now:
start_idx = i
break
- except: continue
-
- if start_idx == -1: return None
+ except Exception:
+ continue
+ if start_idx == -1:
+ return None
- end = len(snow)
- # Calcola somme sugli orizzonti temporali
- def get_sum(hours):
- return sum(x for x in snow[start_idx:min(start_idx+hours, end)] if x)
+ end_idx = min(start_idx + HOURS_AHEAD, n)
+ if end_idx <= start_idx:
+ return None
+
+ times_w = times[start_idx:end_idx]
+ snow_w = [float(x) if x is not None else 0.0 for x in snow[start_idx:end_idx]]
+ dt_w = [parse_time_to_local(t) for t in times_w]
+
+ # Accumuli informativi
+ def sum_h(h: int) -> float:
+ upto = min(h, len(snow_w))
+ return float(sum(snow_w[:upto]))
+
+ s3 = sum_h(3)
+ s6 = sum_h(6)
+ s12 = sum_h(12)
+ s24 = sum_h(24)
+
+ # Picco orario e prima occorrenza > soglia
+ peak = max(snow_w) if snow_w else 0.0
+ peak_time = ""
+ first_thr_time = ""
+ first_thr_val = 0.0
+
+ for i, v in enumerate(snow_w):
+ if v > peak:
+ peak = v
+ peak_time = hhmm(dt_w[i])
+ if (not first_thr_time) and (v > SNOW_HOURLY_THRESHOLD_CM):
+ first_thr_time = hhmm(dt_w[i])
+ first_thr_val = v
+
+ if not peak_time and peak > 0 and dt_w:
+ try:
+ peak_i = snow_w.index(peak)
+ peak_time = hhmm(dt_w[peak_i])
+ except Exception:
+ peak_time = ""
return {
- "3h": get_sum(3),
- "6h": get_sum(6),
- "12h": get_sum(12),
- "24h": get_sum(24)
+ "snow_3h": s3,
+ "snow_6h": s6,
+ "snow_12h": s12,
+ "snow_24h": s24,
+ "peak_hourly": float(peak),
+ "peak_time": peak_time,
+ "first_thr_time": first_thr_time,
+ "first_thr_val": float(first_thr_val),
+ "triggered": bool(first_thr_time),
}
-# --- LOGICA PRINCIPALE ---
-def analyze_snow():
- # 1. Controllo Stagionale
+def point_summary(name: str, st: Dict) -> Dict:
+ return {
+ "name": name,
+ "triggered": bool(st["triggered"]),
+ "snow_3h": st["snow_3h"],
+ "snow_6h": st["snow_6h"],
+ "snow_12h": st["snow_12h"],
+ "snow_24h": st["snow_24h"],
+ "peak_hourly": st["peak_hourly"],
+ "peak_time": st["peak_time"],
+ "first_thr_time": st["first_thr_time"],
+ "first_thr_val": st["first_thr_val"],
+ }
+
+
+def build_signature(summaries: List[Dict]) -> str:
+ # Firma per evitare spam: arrotondiamo a 0.1 cm
+ parts = []
+ for s in summaries:
+ parts.append(
+ f"{s['name']}:t{int(s['triggered'])}"
+ f":24={s['snow_24h']:.1f}"
+ f":pk={s['peak_hourly']:.1f}"
+ f":ft={s['first_thr_time'] or '-'}"
+ )
+ return "|".join(parts)
+
+
+# =============================================================================
+# Main
+# =============================================================================
+def analyze_snow() -> None:
if not is_winter_season():
- print("Stagione estiva. Script in pausa.")
+ LOGGER.info("Fuori stagione invernale: nessuna verifica/notify.")
+ save_state(False, "")
return
- now_str = datetime.datetime.now(ZoneInfo("Europe/Rome")).strftime('%H:%M')
- print(f"--- Check Meteo {now_str} ---")
+ now_str = now_local().strftime("%H:%M")
+ LOGGER.info("--- Check Neve AROME HD %s ---", now_str)
- home_stats = None
- max_area_snow = 0.0
- area_details = ""
-
- # 2. Raccolta Dati
- for p in POINTS:
- data = get_forecast(p["lat"], p["lon"])
- stats = calculate_sums(data)
-
- if not stats: continue
-
- if p["name"] == "๐ Casa":
- home_stats = stats
-
- # Aggiorna il massimo rilevato in zona
- if stats["24h"] > max_area_snow:
- max_area_snow = stats["24h"]
+ state = load_state()
+ was_active = bool(state.get("alert_active", False))
+ last_sig = state.get("signature", "")
- # Costruisci dettaglio se c'รจ neve
- if stats["24h"] > 0:
- area_details += f"{p['name']}: {stats['24h']:.1f}cm (12h: {stats['12h']:.1f})\n"
-
- time.sleep(1)
+ summaries: List[Dict] = []
- # 3. Decisione Alert
- # C'รจ neve se a casa o nei dintorni l'accumulo รจ > soglia
- home_max = home_stats["24h"] if home_stats else 0.0
- SNOW_DETECTED = (home_max > SOGLIA_NOTIFICA or max_area_snow > SOGLIA_NOTIFICA)
-
- # Leggi stato precedente
- WAS_ACTIVE = load_last_state()
+ with requests.Session() as session:
+ for p in POINTS:
+ data = get_forecast(session, p["lat"], p["lon"])
+ if not data:
+ LOGGER.warning("Forecast non disponibile per %s (skip).", p["name"])
+ continue
- # --- SCENARIO A: C'ร NEVE (Nuova o Continua) ---
- if SNOW_DETECTED:
-
- def f(v): return f"**{v:.1f}**" if v > 0 else f"{v:.1f}"
+ st = compute_snow_stats(data)
+ if not st:
+ LOGGER.warning("Statistiche non calcolabili per %s (skip).", p["name"])
+ continue
- msg = f"โ๏ธ **ALLERTA NEVE (AROME HD)**\n๐
_Aggiornamento ore {now_str}_\n\n"
-
- if home_stats:
- msg += f"๐ **CASA:**\n"
- msg += f"โข 03h: {f(home_stats['3h'])} cm\n"
- msg += f"โข 06h: {f(home_stats['6h'])} cm\n"
- msg += f"โข 12h: {f(home_stats['12h'])} cm\n"
- msg += f"โข 24h: {f(home_stats['24h'])} cm\n\n"
-
- if area_details:
- msg += f"๐ **NEL CIRCONDARIO (24h):**\n"
- msg += f"`{area_details}`"
+ summaries.append(point_summary(p["name"], st))
+ time.sleep(0.2)
+
+ if not summaries:
+ LOGGER.error("Nessun punto ha restituito statistiche valide.")
+ return
+
+ any_trigger = any(s["triggered"] for s in summaries)
+ sig = build_signature(summaries)
+
+ # --- Scenario A: soglia superata ---
+ if any_trigger:
+ if (not was_active) or (sig != last_sig):
+ msg: List[str] = []
+ msg.append("โ๏ธ ALLERTA NEVE (AROME HD)")
+ msg.append(f"๐ Aggiornamento ore {html.escape(now_str)}")
+ msg.append(f"๐ฐ๏ธ Modello: {html.escape(MODEL)}")
+ msg.append(f"โฑ๏ธ Finestra: prossime {HOURS_AHEAD} ore | Trigger: snowfall > {SNOW_HOURLY_THRESHOLD_CM:.1f} cm/h")
+ msg.append("")
+
+ casa = next((s for s in summaries if s["name"] == "๐ Casa"), None)
+ if casa:
+ msg.append("๐ CASA")
+ msg.append(f"โข 03h: {casa['snow_3h']:.1f} cm | 06h: {casa['snow_6h']:.1f} cm")
+ msg.append(f"โข 12h: {casa['snow_12h']:.1f} cm | 24h: {casa['snow_24h']:.1f} cm")
+ if casa["triggered"]:
+ msg.append(
+ f"โข Primo superamento soglia: {html.escape(casa['first_thr_time'] or 'โ')} "
+ f"({casa['first_thr_val']:.1f} cm/h)"
+ )
+ if casa["peak_hourly"] > 0:
+ msg.append(f"โข Picco orario: {casa['peak_hourly']:.1f} cm/h (~{html.escape(casa['peak_time'] or 'โ')})")
+ msg.append("")
+
+ msg.append("๐ NEL CIRCONDARIO")
+ lines = []
+ for s in summaries:
+ if not s["triggered"]:
+ continue
+ lines.append(
+ f"{s['name']}: primo > soglia alle {s['first_thr_time'] or 'โ'} "
+ f"({s['first_thr_val']:.1f} cm/h), picco {s['peak_hourly']:.1f} cm/h, 24h {s['snow_24h']:.1f} cm"
+ )
+
+ if lines:
+ msg.append("
" + html.escape("\n".join(lines)) + "")
+ else:
+ msg.append("Nessun punto ha superato la soglia (anomalia).")
+
+ msg.append("Fonte dati: Open-Meteo")
+
+ ok = telegram_send_html("