#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import datetime
import html
import json
import logging
import os
import time
from logging.handlers import RotatingFileHandler
from typing import Dict, List, Optional
from zoneinfo import ZoneInfo
import requests
from dateutil import parser
# =============================================================================
# arome_snow_alert.py
#
# Scopo:
# Monitorare neve prevista nelle prossime 24 ore su più punti (Casa/Titano/Dogana/Carpegna)
# e notificare su Telegram se:
# - esiste almeno 1 ora nelle prossime 24h con snowfall > 0.2 cm
# (nessuna persistenza richiesta)
#
# Modello meteo:
# SOLO AROME HD 1.5 km (Meteo-France): meteofrance_arome_france_hd
#
# Token Telegram:
# Nessun token in chiaro. Lettura in ordine:
# 1) env TELEGRAM_BOT_TOKEN
# 2) ~/.telegram_dpc_bot_token
# 3) /etc/telegram_dpc_bot_token
#
# Debug:
# DEBUG=1 python3 arome_snow_alert.py
#
# Log:
# ./arome_snow_alert.log (stessa cartella dello script)
# =============================================================================
# Debug mode is on only when the DEBUG environment variable is exactly "1"
# (after stripping whitespace); it raises the log level and adds console output.
DEBUG = os.environ.get("DEBUG", "0").strip() == "1"
# ----------------- TELEGRAM -----------------
# Chat IDs that receive every alert message (one sendMessage call per ID).
TELEGRAM_CHAT_IDS = ["64463169", "24827341", "132455422", "5405962012"]
# Fallback locations for the bot token, tried in this order when the
# TELEGRAM_BOT_TOKEN environment variable is empty (see load_bot_token).
TOKEN_FILE_HOME = os.path.expanduser("~/.telegram_dpc_bot_token")
TOKEN_FILE_ETC = "/etc/telegram_dpc_bot_token"
# ----------------- PUNTI DI MONITORAGGIO -----------------
POINTS = [
{"name": "π Casa", "lat": 43.9356, "lon": 12.4296},
{"name": "β°οΈ Titano", "lat": 43.9360, "lon": 12.4460},
{"name": "π’ Dogana", "lat": 43.9800, "lon": 12.4900},
{"name": "ποΈ Carpegna", "lat": 43.7819, "lon": 12.3346},
]
# ----------------- ALERT LOGIC -----------------
# Time zone name sent to Open-Meteo and used for all local datetimes.
TZ = "Europe/Rome"
TZINFO = ZoneInfo(TZ)
# Number of hourly forecast entries inspected from the first future hour.
HOURS_AHEAD = 24
# Hourly snowfall threshold (cm/h): NOTIFY if any hour exceeds it
SNOW_HOURLY_THRESHOLD_CM = 0.2
# Winter season: 1 Nov -> 15 Apr
WINTER_START_MONTH = 11
WINTER_END_MONTH = 4
WINTER_END_DAY = 15
# State file (persists alert flag and last signature between runs)
STATE_FILE = "/home/daniely/docker/telegram-bot/snow_state.json"
# ----------------- OPEN-METEO -----------------
OPEN_METEO_URL = "https://api.open-meteo.com/v1/forecast"
HTTP_HEADERS = {"User-Agent": "rpi-arome-snow-alert/2.1"}
# Value passed as the "models" query parameter of the forecast request.
MODEL = "meteofrance_arome_france_hd"
# ----------------- LOG FILE -----------------
# The log file lives next to this script.
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
LOG_FILE = os.path.join(BASE_DIR, "arome_snow_alert.log")
def setup_logger() -> logging.Logger:
    """Configure and return the "arome_snow_alert" logger.

    Any handlers already attached are dropped, then a rotating file handler
    (1 MB per file, 5 backups, UTF-8) writing to LOG_FILE is installed.
    When DEBUG is enabled the logger level is DEBUG and a console handler
    is added as well; otherwise the logger level is INFO.
    """
    log = logging.getLogger("arome_snow_alert")
    log.setLevel(logging.DEBUG if DEBUG else logging.INFO)
    log.handlers.clear()

    # File handler is always present; the stream handler only in debug mode.
    targets = [
        RotatingFileHandler(LOG_FILE, maxBytes=1_000_000, backupCount=5, encoding="utf-8")
    ]
    if DEBUG:
        targets.append(logging.StreamHandler())

    formatter = logging.Formatter("%(asctime)s %(levelname)s %(message)s")
    for handler in targets:
        handler.setLevel(logging.DEBUG)
        handler.setFormatter(formatter)
        log.addHandler(handler)
    return log
# Module-wide logger, configured once at import time and used by every function below.
LOGGER = setup_logger()
# =============================================================================
# Utility
# =============================================================================
def now_local() -> datetime.datetime:
    """Return the current time as a timezone-aware datetime in TZINFO."""
    return datetime.datetime.now(tz=TZINFO)
def is_winter_season(now: Optional[datetime.datetime] = None) -> bool:
    """Return True when the given moment falls inside the winter season.

    The season starts on the first day of WINTER_START_MONTH and ends on
    WINTER_END_DAY of WINTER_END_MONTH (inclusive). It is assumed to wrap
    across the new year, i.e. WINTER_START_MONTH > WINTER_END_MONTH.

    Args:
        now: Moment to evaluate. Defaults to the current local time
            (``now_local()``) when omitted.

    Returns:
        True if the month/day of ``now`` is within the season, else False.
    """
    current = now_local() if now is None else now
    month, day = current.month, current.day

    # Late part of the season: from the start month to the end of the year.
    if month >= WINTER_START_MONTH:
        return True
    # Early part: every full month before the closing month. Derived from
    # the constant so that changing WINTER_END_MONTH keeps the check correct.
    if month < WINTER_END_MONTH:
        return True
    # Closing month: only up to (and including) the configured last day.
    return month == WINTER_END_MONTH and day <= WINTER_END_DAY
def read_text_file(path: str) -> str:
    """Return the stripped UTF-8 content of ``path``, or "" on any failure.

    A missing file is silent, a permission problem is logged at DEBUG level,
    and any other error is logged with its traceback. The function never
    raises for these cases.
    """
    content = ""
    try:
        with open(path, "r", encoding="utf-8") as handle:
            content = handle.read().strip()
    except FileNotFoundError:
        # Absent file is an expected situation: fall through with "".
        pass
    except PermissionError:
        LOGGER.debug("Permission denied reading %s", path)
    except Exception as exc:
        LOGGER.exception("Error reading %s: %s", path, exc)
    return content
def load_bot_token() -> str:
    """Return the Telegram bot token, or "" when none is configured.

    Lookup order: the TELEGRAM_BOT_TOKEN environment variable, then
    TOKEN_FILE_HOME, then TOKEN_FILE_ETC. The first non-empty value wins;
    later sources are not read once a token has been found.
    """
    # `or` short-circuits, so the home file is only read when the env var is empty.
    token = os.environ.get("TELEGRAM_BOT_TOKEN", "").strip() or read_text_file(TOKEN_FILE_HOME)
    if not token:
        token = read_text_file(TOKEN_FILE_ETC).strip()
    return token
def parse_time_to_local(t: str) -> datetime.datetime:
    """Parse an ISO-8601 string into a timezone-aware datetime in TZINFO.

    A naive timestamp is taken to already be local wall-clock time and is
    simply tagged with TZINFO; an aware timestamp is converted to TZINFO.
    """
    parsed = parser.isoparse(t)
    is_naive = parsed.tzinfo is None
    return parsed.replace(tzinfo=TZINFO) if is_naive else parsed.astimezone(TZINFO)
def hhmm(dt: datetime.datetime) -> str:
    """Format ``dt`` as a zero-padded 24-hour "HH:MM" string."""
    return f"{dt:%H:%M}"
# =============================================================================
# Telegram
# =============================================================================
def telegram_send_html(message_html: str) -> bool:
    """Send an HTML-formatted message to every chat in TELEGRAM_CHAT_IDS.

    Intended for alert messages only (never for error reporting).

    Args:
        message_html: Message body, sent with ``parse_mode="HTML"``.

    Returns:
        True if at least one chat answered with HTTP 200, False otherwise
        (including when no bot token is configured).
    """
    token = load_bot_token()
    if not token:
        LOGGER.warning("Telegram token missing: message not sent.")
        return False

    url = f"https://api.telegram.org/bot{token}/sendMessage"
    base_payload = {
        "text": message_html,
        "parse_mode": "HTML",
        "disable_web_page_preview": True,
    }
    sent_ok = False
    with requests.Session() as s:
        for chat_id in TELEGRAM_CHAT_IDS:
            payload = dict(base_payload, chat_id=chat_id)
            try:
                resp = s.post(url, json=payload, timeout=15)
            except Exception as e:
                # The request URL contains the bot token and HTTP client
                # errors may embed that URL in their message. Log a redacted
                # message and no traceback so the secret never reaches the log.
                LOGGER.error(
                    "Telegram exception chat_id=%s err=%s",
                    chat_id,
                    str(e).replace(token, "***"),
                )
                continue
            if resp.status_code == 200:
                sent_ok = True
            else:
                LOGGER.error("Telegram error chat_id=%s status=%s body=%s",
                             chat_id, resp.status_code, resp.text[:500])
            # Small pause between successive sends.
            time.sleep(0.25)
    return sent_ok
# =============================================================================
# State
# =============================================================================
def load_state() -> Dict:
    """Load the persisted alert state from STATE_FILE.

    Returns a dict that always contains the keys "alert_active",
    "signature" and "updated"; values found in the file override the
    defaults. A missing file yields the defaults, and a read or parse
    error is logged and also yields the defaults.
    """
    state = {"alert_active": False, "signature": "", "updated": ""}
    if not os.path.exists(STATE_FILE):
        return state
    try:
        with open(STATE_FILE, "r", encoding="utf-8") as fh:
            stored = json.load(fh)
        # A falsy payload (e.g. null or {}) leaves the defaults untouched.
        state.update(stored or {})
    except Exception as exc:
        LOGGER.exception("State read error: %s", exc)
    return state
def save_state(alert_active: bool, signature: str) -> None:
    """Persist the alert flag and signature to STATE_FILE as JSON.

    The parent directory is created if needed and the file is stamped with
    the current local time under "updated". Any failure is logged and
    swallowed so that a state-write problem never aborts the run.
    """
    try:
        target_dir = os.path.dirname(STATE_FILE)
        os.makedirs(target_dir, exist_ok=True)
        with open(STATE_FILE, "w", encoding="utf-8") as fh:
            snapshot = {
                "alert_active": alert_active,
                "signature": signature,
                "updated": now_local().isoformat(),
            }
            json.dump(snapshot, fh, ensure_ascii=False, indent=2)
    except Exception as exc:
        LOGGER.exception("State write error: %s", exc)
# =============================================================================
# Open-Meteo
# =============================================================================
def get_forecast(session: requests.Session, lat: float, lon: float) -> Optional[Dict]:
    """Fetch the hourly snowfall forecast for one coordinate from Open-Meteo.

    Args:
        session: HTTP session used for the GET request.
        lat: Latitude of the point.
        lon: Longitude of the point.

    Returns:
        The decoded JSON response, or None when the request fails. An
        HTTP 400 is logged with the API's "reason" field when available;
        every other failure is logged with its traceback.
    """
    query = {
        "latitude": lat,
        "longitude": lon,
        "hourly": "snowfall",
        "timezone": TZ,
        "forecast_days": 2,
        "models": MODEL,
    }
    try:
        response = session.get(OPEN_METEO_URL, params=query, headers=HTTP_HEADERS, timeout=25)
        if response.status_code != 400:
            response.raise_for_status()
            return response.json()

        # HTTP 400: prefer the structured reason, fall back to the raw body.
        try:
            body = response.json()
            detail = body.get("reason", body)
        except Exception:
            detail = response.text[:500]
        LOGGER.error("Open-Meteo 400 (model=%s lat=%.4f lon=%.4f): %s", MODEL, lat, lon, detail)
        return None
    except Exception as exc:
        LOGGER.exception("Open-Meteo request error (model=%s lat=%.4f lon=%.4f): %s", MODEL, lat, lon, exc)
        return None
# =============================================================================
# Analytics
# =============================================================================
def compute_snow_stats(data: Dict) -> Optional[Dict]:
    """Summarise hourly snowfall over the next HOURS_AHEAD forecast entries.

    Args:
        data: Forecast response containing ``data["hourly"]["time"]`` and
            ``data["hourly"]["snowfall"]`` as parallel lists.

    Returns:
        None when there is no usable data (empty series, no timestamp at or
        after the current local time, or an empty window). Otherwise a dict
        with:
          - "snow_3h", "snow_6h", "snow_12h", "snow_24h": sums over the
            first 3/6/12/24 window entries;
          - "peak_hourly" / "peak_time": largest hourly value and the
            "HH:MM" of its first occurrence ("" when the peak is not > 0);
          - "first_thr_time" / "first_thr_val": first entry above
            SNOW_HOURLY_THRESHOLD_CM ("" and 0.0 when there is none);
          - "triggered": True when such an entry exists.
    """
    hourly = data.get("hourly", {}) or {}
    times = hourly.get("time", []) or []
    snow = hourly.get("snowfall", []) or []
    n = min(len(times), len(snow))
    if n == 0:
        return None

    now = now_local()

    # Parse each timestamp exactly once; an unparseable one becomes None so
    # a single malformed entry cannot abort the whole analysis.
    parsed: List[Optional[datetime.datetime]] = []
    for raw in times[:n]:
        try:
            parsed.append(parse_time_to_local(raw))
        except Exception:
            parsed.append(None)

    # First entry that is not in the past marks the start of the window.
    start_idx = next(
        (i for i, dt in enumerate(parsed) if dt is not None and dt >= now),
        -1,
    )
    if start_idx == -1:
        return None
    end_idx = min(start_idx + HOURS_AHEAD, n)
    if end_idx <= start_idx:
        return None

    dt_w: List[datetime.datetime] = []
    snow_w: List[float] = []
    for dt, value in zip(parsed[start_idx:end_idx], snow[start_idx:end_idx]):
        if dt is None:
            # No usable timestamp for this entry: leave it out of the window.
            continue
        dt_w.append(dt)
        # Missing values are treated as no snowfall.
        snow_w.append(float(value) if value is not None else 0.0)

    # Informational accumulations over the first h entries of the window.
    def sum_h(h: int) -> float:
        return float(sum(snow_w[:h]))

    # Hourly peak; its time is reported only when there is actual snowfall.
    peak = max(snow_w) if snow_w else 0.0
    peak_time = hhmm(dt_w[snow_w.index(peak)]) if peak > 0 else ""

    # First entry strictly above the alert threshold, if any.
    first_hit = next(
        ((dt, v) for dt, v in zip(dt_w, snow_w) if v > SNOW_HOURLY_THRESHOLD_CM),
        None,
    )
    first_thr_time = hhmm(first_hit[0]) if first_hit else ""
    first_thr_val = first_hit[1] if first_hit else 0.0

    return {
        "snow_3h": sum_h(3),
        "snow_6h": sum_h(6),
        "snow_12h": sum_h(12),
        "snow_24h": sum_h(24),
        "peak_hourly": float(peak),
        "peak_time": peak_time,
        "first_thr_time": first_thr_time,
        "first_thr_val": float(first_thr_val),
        "triggered": first_hit is not None,
    }
def point_summary(name: str, st: Dict) -> Dict:
    """Build the per-point summary dict from a point name and its stats.

    The result holds "name", "triggered" (coerced to bool) and the snowfall
    sums, peak and first-threshold fields copied from ``st``. Any other key
    in ``st`` is ignored; a missing required key raises KeyError.
    """
    copied_keys = (
        "snow_3h",
        "snow_6h",
        "snow_12h",
        "snow_24h",
        "peak_hourly",
        "peak_time",
        "first_thr_time",
        "first_thr_val",
    )
    summary = {"name": name, "triggered": bool(st["triggered"])}
    for key in copied_keys:
        summary[key] = st[key]
    return summary
def build_signature(summaries: List[Dict]) -> str:
    """Return a compact fingerprint of the summaries, used to avoid spam.

    Each summary contributes "name:tN:24=X:pk=Y:ft=HH:MM" with the numeric
    values rounded to 0.1 cm and "-" standing in for an empty first
    threshold time; the fragments are joined with "|".
    """
    return "|".join(
        f"{item['name']}:t{int(item['triggered'])}"
        f":24={item['snow_24h']:.1f}"
        f":pk={item['peak_hourly']:.1f}"
        f":ft={item['first_thr_time'] or '-'}"
        for item in summaries
    )
# =============================================================================
# Main
# =============================================================================
def analyze_snow() -> None:
if not is_winter_season():
LOGGER.info("Fuori stagione invernale: nessuna verifica/notify.")
save_state(False, "")
return
now_str = now_local().strftime("%H:%M")
LOGGER.info("--- Check Neve AROME HD %s ---", now_str)
state = load_state()
was_active = bool(state.get("alert_active", False))
last_sig = state.get("signature", "")
summaries: List[Dict] = []
with requests.Session() as session:
for p in POINTS:
data = get_forecast(session, p["lat"], p["lon"])
if not data:
LOGGER.warning("Forecast non disponibile per %s (skip).", p["name"])
continue
st = compute_snow_stats(data)
if not st:
LOGGER.warning("Statistiche non calcolabili per %s (skip).", p["name"])
continue
summaries.append(point_summary(p["name"], st))
time.sleep(0.2)
if not summaries:
LOGGER.error("Nessun punto ha restituito statistiche valide.")
return
any_trigger = any(s["triggered"] for s in summaries)
sig = build_signature(summaries)
# --- Scenario A: soglia superata ---
if any_trigger:
if (not was_active) or (sig != last_sig):
msg: List[str] = []
msg.append("βοΈ ALLERTA NEVE (AROME HD)")
msg.append(f"π Aggiornamento ore {html.escape(now_str)}")
msg.append(f"π°οΈ Modello: {html.escape(MODEL)}")
msg.append(f"β±οΈ Finestra: prossime {HOURS_AHEAD} ore | Trigger: snowfall > {SNOW_HOURLY_THRESHOLD_CM:.1f} cm/h")
msg.append("")
casa = next((s for s in summaries if s["name"] == "π Casa"), None)
if casa:
msg.append("π CASA")
msg.append(f"β’ 03h: {casa['snow_3h']:.1f} cm | 06h: {casa['snow_6h']:.1f} cm")
msg.append(f"β’ 12h: {casa['snow_12h']:.1f} cm | 24h: {casa['snow_24h']:.1f} cm")
if casa["triggered"]:
msg.append(
f"β’ Primo superamento soglia: {html.escape(casa['first_thr_time'] or 'β')} "
f"({casa['first_thr_val']:.1f} cm/h)"
)
if casa["peak_hourly"] > 0:
msg.append(f"β’ Picco orario: {casa['peak_hourly']:.1f} cm/h (~{html.escape(casa['peak_time'] or 'β')})")
msg.append("")
msg.append("π NEL CIRCONDARIO")
lines = []
for s in summaries:
if not s["triggered"]:
continue
lines.append(
f"{s['name']}: primo > soglia alle {s['first_thr_time'] or 'β'} "
f"({s['first_thr_val']:.1f} cm/h), picco {s['peak_hourly']:.1f} cm/h, 24h {s['snow_24h']:.1f} cm"
)
if lines:
msg.append("
" + html.escape("\n".join(lines)) + "")
else:
msg.append("Nessun punto ha superato la soglia (anomalia).")
msg.append("Fonte dati: Open-Meteo")
ok = telegram_send_html("