475 lines
17 KiB
Python
475 lines
17 KiB
Python
#!/usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
|
|
import datetime
|
|
import html
|
|
import json
|
|
import logging
|
|
import os
|
|
import time
|
|
from logging.handlers import RotatingFileHandler
|
|
from typing import Dict, List, Optional, Tuple
|
|
from zoneinfo import ZoneInfo
|
|
|
|
import requests
|
|
from dateutil import parser
|
|
|
|
# =============================================================================
|
|
# student_alert.py
|
|
#
|
|
# Scopo:
|
|
# Notificare lo studente (Bologna) se nelle prossime 24 ore sono previsti:
|
|
# - Neve persistente (>= 2 ore consecutive con snowfall > 0)
|
|
# - Pioggia molto forte persistente (>= 2 ore consecutive con 3h-rolling >= soglia)
|
|
# sia a Bologna (trigger) sia lungo il tragitto (caselli A14) fino a San Marino.
|
|
# =============================================================================
|
|
|
|
DEBUG = os.environ.get("DEBUG", "0").strip() == "1"
|
|
|
|
# ----------------- TELEGRAM -----------------
|
|
TELEGRAM_CHAT_IDS = ["64463169", "24827341", "132455422", "5405962012"]
|
|
TOKEN_FILE_HOME = os.path.expanduser("~/.telegram_dpc_bot_token")
|
|
TOKEN_FILE_ETC = "/etc/telegram_dpc_bot_token"
|
|
|
|
# ----------------- SOGLIE E LOGICA -----------------
|
|
SOGLIA_PIOGGIA_3H_MM = 30.0 # mm in 3 ore (rolling)
|
|
PERSIST_HOURS = 2 # persistenza minima (ore)
|
|
HOURS_AHEAD = 24
|
|
SNOW_HOURLY_EPS_CM = 0.2 # Soglia minima neve cm/h
|
|
|
|
# File di stato
|
|
STATE_FILE = "/home/daniely/docker/telegram-bot/student_state.json"
|
|
|
|
# ----------------- PUNTI DEL PERCORSO (Caselli A14) -----------------
|
|
POINTS = [
|
|
{"name": "🎓 Bologna (V. Regnoli)", "lat": 44.4930, "lon": 11.3690, "type": "trigger"},
|
|
{"name": "🛣️ Casello Imola", "lat": 44.3798, "lon": 11.7397, "type": "route"},
|
|
{"name": "🛣️ Casello Faenza", "lat": 44.3223, "lon": 11.9040, "type": "route"},
|
|
{"name": "🛣️ Casello Forlì", "lat": 44.2502, "lon": 12.0910, "type": "route"},
|
|
{"name": "🛣️ Casello Cesena", "lat": 44.1675, "lon": 12.2835, "type": "route"},
|
|
{"name": "🛣️ Casello Rimini", "lat": 44.0362, "lon": 12.5659, "type": "route"},
|
|
{"name": "🏠 San Marino", "lat": 43.9356, "lon": 12.4296, "type": "end"},
|
|
]
|
|
|
|
# ----------------- OPEN-METEO -----------------
|
|
OPEN_METEO_URL = "https://api.open-meteo.com/v1/forecast"
|
|
TZ = "Europe/Rome"
|
|
TZINFO = ZoneInfo(TZ)
|
|
HTTP_HEADERS = {"User-Agent": "rpi-student-alert/1.0"}
|
|
MODEL = "meteofrance_arome_france_hd"
|
|
|
|
# ----------------- LOG -----------------
|
|
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
|
|
LOG_FILE = os.path.join(BASE_DIR, "student_alert.log")
|
|
|
|
|
|
def setup_logger() -> logging.Logger:
|
|
logger = logging.getLogger("student_alert")
|
|
logger.setLevel(logging.DEBUG if DEBUG else logging.INFO)
|
|
logger.handlers.clear()
|
|
|
|
fh = RotatingFileHandler(LOG_FILE, maxBytes=1_000_000, backupCount=5, encoding="utf-8")
|
|
fh.setLevel(logging.DEBUG)
|
|
fmt = logging.Formatter("%(asctime)s %(levelname)s %(message)s")
|
|
fh.setFormatter(fmt)
|
|
logger.addHandler(fh)
|
|
|
|
if DEBUG:
|
|
sh = logging.StreamHandler()
|
|
sh.setLevel(logging.DEBUG)
|
|
sh.setFormatter(fmt)
|
|
logger.addHandler(sh)
|
|
|
|
return logger
|
|
|
|
|
|
LOGGER = setup_logger()
|
|
|
|
|
|
# =============================================================================
|
|
# Utility
|
|
# =============================================================================
|
|
def now_local() -> datetime.datetime:
|
|
return datetime.datetime.now(TZINFO)
|
|
|
|
|
|
def read_text_file(path: str) -> str:
|
|
try:
|
|
with open(path, "r", encoding="utf-8") as f:
|
|
return f.read().strip()
|
|
except FileNotFoundError:
|
|
return ""
|
|
except PermissionError:
|
|
LOGGER.debug("Permission denied reading %s", path)
|
|
return ""
|
|
except Exception as e:
|
|
LOGGER.exception("Error reading %s: %s", path, e)
|
|
return ""
|
|
|
|
|
|
def load_bot_token() -> str:
|
|
tok = os.environ.get("TELEGRAM_BOT_TOKEN", "").strip()
|
|
if tok:
|
|
return tok
|
|
tok = read_text_file(TOKEN_FILE_HOME)
|
|
if tok:
|
|
return tok
|
|
tok = read_text_file(TOKEN_FILE_ETC)
|
|
return tok.strip() if tok else ""
|
|
|
|
|
|
def parse_time_to_local(t: str) -> datetime.datetime:
|
|
dt = parser.isoparse(t)
|
|
if dt.tzinfo is None:
|
|
return dt.replace(tzinfo=TZINFO)
|
|
return dt.astimezone(TZINFO)
|
|
|
|
|
|
def hhmm(dt: datetime.datetime) -> str:
|
|
return dt.strftime("%H:%M")
|
|
|
|
|
|
# =============================================================================
|
|
# Telegram
|
|
# =============================================================================
|
|
def telegram_send_html(message_html: str) -> bool:
|
|
token = load_bot_token()
|
|
if not token:
|
|
LOGGER.warning("Telegram token missing: message not sent.")
|
|
return False
|
|
|
|
url = f"https://api.telegram.org/bot{token}/sendMessage"
|
|
base_payload = {
|
|
"text": message_html,
|
|
"parse_mode": "HTML",
|
|
"disable_web_page_preview": True,
|
|
}
|
|
|
|
sent_ok = False
|
|
with requests.Session() as s:
|
|
for chat_id in TELEGRAM_CHAT_IDS:
|
|
payload = dict(base_payload)
|
|
payload["chat_id"] = chat_id
|
|
try:
|
|
resp = s.post(url, json=payload, timeout=15)
|
|
if resp.status_code == 200:
|
|
sent_ok = True
|
|
else:
|
|
LOGGER.error("Telegram error chat_id=%s status=%s body=%s",
|
|
chat_id, resp.status_code, resp.text[:500])
|
|
time.sleep(0.25)
|
|
except Exception as e:
|
|
LOGGER.exception("Telegram exception chat_id=%s err=%s", chat_id, e)
|
|
|
|
return sent_ok
|
|
|
|
|
|
# =============================================================================
|
|
# State & Open-Meteo
|
|
# =============================================================================
|
|
def load_state() -> Dict:
|
|
default = {"alert_active": False, "signature": "", "updated": ""}
|
|
if os.path.exists(STATE_FILE):
|
|
try:
|
|
with open(STATE_FILE, "r", encoding="utf-8") as f:
|
|
data = json.load(f) or {}
|
|
default.update(data)
|
|
except Exception as e:
|
|
LOGGER.exception("State read error: %s", e)
|
|
return default
|
|
|
|
|
|
def save_state(alert_active: bool, signature: str) -> None:
|
|
try:
|
|
os.makedirs(os.path.dirname(STATE_FILE), exist_ok=True)
|
|
with open(STATE_FILE, "w", encoding="utf-8") as f:
|
|
json.dump(
|
|
{"alert_active": alert_active, "signature": signature, "updated": now_local().isoformat()},
|
|
f,
|
|
ensure_ascii=False,
|
|
indent=2,
|
|
)
|
|
except Exception as e:
|
|
LOGGER.exception("State write error: %s", e)
|
|
|
|
|
|
def get_forecast(session: requests.Session, lat: float, lon: float) -> Optional[Dict]:
|
|
params = {
|
|
"latitude": lat,
|
|
"longitude": lon,
|
|
"hourly": "precipitation,snowfall",
|
|
"timezone": TZ,
|
|
"forecast_days": 2,
|
|
"precipitation_unit": "mm",
|
|
"models": MODEL,
|
|
}
|
|
try:
|
|
r = session.get(OPEN_METEO_URL, params=params, headers=HTTP_HEADERS, timeout=25)
|
|
if r.status_code == 400:
|
|
return None
|
|
r.raise_for_status()
|
|
return r.json()
|
|
except Exception as e:
|
|
LOGGER.exception("Open-Meteo request error: %s", e)
|
|
return None
|
|
|
|
|
|
# =============================================================================
|
|
# Analytics
|
|
# =============================================================================
|
|
def rolling_sum_3h(values: List[float]) -> List[float]:
|
|
out = []
|
|
for i in range(0, max(0, len(values) - 2)):
|
|
try:
|
|
s = float(values[i]) + float(values[i + 1]) + float(values[i + 2])
|
|
except Exception:
|
|
s = 0.0
|
|
out.append(s)
|
|
return out
|
|
|
|
|
|
def first_persistent_run(values: List[float], threshold: float, persist: int) -> Tuple[bool, int, int, float]:
|
|
consec = 0
|
|
run_start = -1
|
|
run_max = 0.0
|
|
for i, v in enumerate(values):
|
|
vv = float(v) if v is not None else 0.0
|
|
if vv >= threshold:
|
|
if consec == 0:
|
|
run_start = i
|
|
run_max = vv
|
|
else:
|
|
run_max = max(run_max, vv)
|
|
consec += 1
|
|
if consec >= persist:
|
|
return True, run_start, consec, run_max
|
|
else:
|
|
consec = 0
|
|
return False, -1, 0, 0.0
|
|
|
|
|
|
def max_consecutive_gt(values: List[float], eps: float) -> Tuple[int, int]:
|
|
best_len = 0
|
|
best_start = -1
|
|
consec = 0
|
|
start = -1
|
|
for i, v in enumerate(values):
|
|
vv = float(v) if v is not None else 0.0
|
|
if vv > eps:
|
|
if consec == 0:
|
|
start = i
|
|
consec += 1
|
|
if consec > best_len:
|
|
best_len = consec
|
|
best_start = start
|
|
else:
|
|
consec = 0
|
|
return best_len, best_start
|
|
|
|
|
|
def compute_stats(data: Dict) -> Optional[Dict]:
|
|
hourly = data.get("hourly", {}) or {}
|
|
times = hourly.get("time", []) or []
|
|
precip = hourly.get("precipitation", []) or []
|
|
snow = hourly.get("snowfall", []) or []
|
|
|
|
n = min(len(times), len(precip), len(snow))
|
|
if n == 0: return None
|
|
|
|
now = now_local()
|
|
start_idx = -1
|
|
for i, t in enumerate(times[:n]):
|
|
if parse_time_to_local(t) >= now:
|
|
start_idx = i
|
|
break
|
|
if start_idx == -1: return None
|
|
|
|
end_idx = min(start_idx + HOURS_AHEAD, n)
|
|
if end_idx <= start_idx: return None
|
|
|
|
times_w = times[start_idx:end_idx]
|
|
precip_w = precip[start_idx:end_idx]
|
|
snow_w = snow[start_idx:end_idx]
|
|
dt_w = [parse_time_to_local(t) for t in times_w]
|
|
|
|
rain3 = rolling_sum_3h(precip_w)
|
|
rain3_max = max(rain3) if rain3 else 0.0
|
|
rain3_max_idx = rain3.index(rain3_max) if rain3 else -1
|
|
rain3_max_time = hhmm(dt_w[rain3_max_idx]) if (rain3_max_idx >= 0 and rain3_max_idx < len(dt_w)) else ""
|
|
|
|
rain_persist_ok, rain_run_start, rain_run_len, rain_run_max = first_persistent_run(
|
|
rain3, SOGLIA_PIOGGIA_3H_MM, PERSIST_HOURS
|
|
)
|
|
rain_persist_time = hhmm(dt_w[rain_run_start]) if (rain_persist_ok and rain_run_start < len(dt_w)) else ""
|
|
|
|
snow_run_len, snow_run_start = max_consecutive_gt(snow_w, eps=SNOW_HOURLY_EPS_CM)
|
|
snow_run_time = hhmm(dt_w[snow_run_start]) if (snow_run_start >= 0 and snow_run_start < len(dt_w)) else ""
|
|
|
|
snow_12h = sum(float(x) for x in snow_w[:min(12, len(snow_w))] if x is not None)
|
|
snow_24h = sum(float(x) for x in snow_w[:min(24, len(snow_w))] if x is not None)
|
|
|
|
return {
|
|
"rain3_max": float(rain3_max),
|
|
"rain3_max_time": rain3_max_time,
|
|
"rain_persist_ok": bool(rain_persist_ok),
|
|
"rain_persist_time": rain_persist_time,
|
|
"rain_persist_run_max": float(rain_run_max),
|
|
"rain_persist_run_len": int(rain_run_len),
|
|
"snow_run_len": int(snow_run_len),
|
|
"snow_run_time": snow_run_time,
|
|
"snow_12h": float(snow_12h),
|
|
"snow_24h": float(snow_24h),
|
|
}
|
|
|
|
|
|
def point_alerts(point_name: str, stats: Dict) -> Dict:
|
|
snow_alert = (stats["snow_run_len"] >= PERSIST_HOURS) and (stats["snow_24h"] > 0.0)
|
|
rain_alert = bool(stats["rain_persist_ok"])
|
|
return {
|
|
"name": point_name,
|
|
"snow_alert": snow_alert,
|
|
"rain_alert": rain_alert,
|
|
"snow_12h": stats["snow_12h"],
|
|
"snow_24h": stats["snow_24h"],
|
|
"snow_run_len": stats["snow_run_len"],
|
|
"snow_run_time": stats["snow_run_time"],
|
|
"rain3_max": stats["rain3_max"],
|
|
"rain3_max_time": stats["rain3_max_time"],
|
|
"rain_persist_time": stats["rain_persist_time"],
|
|
"rain_persist_run_max": stats["rain_persist_run_max"],
|
|
"rain_persist_run_len": stats["rain_persist_run_len"],
|
|
}
|
|
|
|
|
|
def build_signature(bologna: Dict, route: List[Dict]) -> str:
|
|
parts = [
|
|
f"BO:snow={int(bologna['snow_alert'])},rain={int(bologna['rain_alert'])},"
|
|
f"s24={bologna['snow_24h']:.1f},r3max={bologna['rain3_max']:.1f}"
|
|
]
|
|
for r in route:
|
|
parts.append(
|
|
f"{r['name']}:s{int(r['snow_alert'])}r{int(r['rain_alert'])}"
|
|
f":s24={r['snow_24h']:.1f}:r3max={r['rain3_max']:.1f}"
|
|
)
|
|
return "|".join(parts)
|
|
|
|
|
|
def main() -> None:
|
|
LOGGER.info("--- Student alert Bologna (Neve/Pioggia intensa) ---")
|
|
|
|
state = load_state()
|
|
was_active = bool(state.get("alert_active", False))
|
|
last_sig = state.get("signature", "")
|
|
|
|
with requests.Session() as session:
|
|
# Trigger: Bologna
|
|
bo = POINTS[0]
|
|
bo_data = get_forecast(session, bo["lat"], bo["lon"])
|
|
if not bo_data: return
|
|
bo_stats = compute_stats(bo_data)
|
|
if not bo_stats:
|
|
LOGGER.error("Impossibile calcolare statistiche Bologna.")
|
|
return
|
|
bo_alerts = point_alerts(bo["name"], bo_stats)
|
|
|
|
# Route points
|
|
route_alerts: List[Dict] = []
|
|
for p in POINTS[1:]:
|
|
d = get_forecast(session, p["lat"], p["lon"])
|
|
if not d: continue
|
|
st = compute_stats(d)
|
|
if not st: continue
|
|
route_alerts.append(point_alerts(p["name"], st))
|
|
|
|
any_route_alert = any(x["snow_alert"] or x["rain_alert"] for x in route_alerts)
|
|
any_alert = (bo_alerts["snow_alert"] or bo_alerts["rain_alert"] or any_route_alert)
|
|
|
|
sig = build_signature(bo_alerts, route_alerts)
|
|
|
|
# --- Scenario A: Allerta ---
|
|
if any_alert:
|
|
if (not was_active) or (sig != last_sig):
|
|
now_str = now_local().strftime("%H:%M")
|
|
header_icon = "❄️" if (bo_alerts["snow_alert"] or any(x["snow_alert"] for x in route_alerts)) \
|
|
else "🌧️" if (bo_alerts["rain_alert"] or any(x["rain_alert"] for x in route_alerts)) \
|
|
else "⚠️"
|
|
|
|
msg: List[str] = []
|
|
msg.append(f"{header_icon} <b>ALLERTA METEO (Bologna / Rientro)</b>")
|
|
msg.append(f"🕒 <i>Aggiornamento ore {html.escape(now_str)}</i>")
|
|
msg.append(f"🛰️ <code>Modello: {html.escape(MODEL)}</code>")
|
|
msg.append(f"⏱️ <code>Finestra: {HOURS_AHEAD} ore | Persistenza: {PERSIST_HOURS} ore</code>")
|
|
msg.append("")
|
|
|
|
# Bologna
|
|
msg.append("🎓 <b>A BOLOGNA</b>")
|
|
if bo_alerts["snow_alert"]:
|
|
msg.append(f"❄️ Neve (≥{PERSIST_HOURS}h) da ~<b>{html.escape(bo_alerts['snow_run_time'] or '—')}</b> (run ~{bo_alerts['snow_run_len']}h).")
|
|
msg.append(f"• Accumulo: 12h <b>{bo_alerts['snow_12h']:.1f} cm</b> | 24h <b>{bo_alerts['snow_24h']:.1f} cm</b>")
|
|
else:
|
|
msg.append(f"❄️ Neve: nessuna persistenza ≥ {PERSIST_HOURS}h (24h {bo_alerts['snow_24h']:.1f} cm).")
|
|
|
|
if bo_alerts["rain_alert"]:
|
|
msg.append(f"🌧️ Pioggia molto forte (3h ≥ {SOGLIA_PIOGGIA_3H_MM:.0f} mm, ≥{PERSIST_HOURS}h) da ~<b>{html.escape(bo_alerts['rain_persist_time'] or '—')}</b>.")
|
|
else:
|
|
msg.append(f"🌧️ Pioggia: max 3h <b>{bo_alerts['rain3_max']:.1f} mm</b> (picco ~{html.escape(bo_alerts['rain3_max_time'] or '—')}).")
|
|
|
|
msg.append("")
|
|
msg.append("🚗 <b>CASELLI (A14) / TRATTO</b>")
|
|
|
|
issues = [x for x in route_alerts if x["snow_alert"] or x["rain_alert"]]
|
|
if not issues:
|
|
msg.append("✅ Nessuna criticità persistente rilevata lungo il percorso.")
|
|
else:
|
|
for x in issues:
|
|
line = f"• <b>{html.escape(x['name'])}</b>: "
|
|
parts: List[str] = []
|
|
if x["snow_alert"]:
|
|
parts.append(f"❄️ neve (≥{PERSIST_HOURS}h) da ~{html.escape(x['snow_run_time'] or '—')} (24h {x['snow_24h']:.1f} cm)")
|
|
if x["rain_alert"]:
|
|
parts.append(f"🌧️ pioggia forte da ~{html.escape(x['rain_persist_time'] or '—')}")
|
|
line += " | ".join(parts)
|
|
msg.append(line)
|
|
|
|
msg.append("")
|
|
msg.append("<i>Fonte dati: Open-Meteo</i>")
|
|
|
|
# FIX: usare \n invece di <br/>
|
|
ok = telegram_send_html("\n".join(msg))
|
|
if ok:
|
|
LOGGER.info("Notifica inviata.")
|
|
else:
|
|
LOGGER.warning("Notifica NON inviata.")
|
|
|
|
save_state(True, sig)
|
|
else:
|
|
LOGGER.info("Allerta già notificata e invariata.")
|
|
return
|
|
|
|
# --- Scenario B: Rientro ---
|
|
if was_active and not any_alert:
|
|
now_str = now_local().strftime("%H:%M")
|
|
# FIX: usare \n invece di <br/>
|
|
msg = (
|
|
"🟢 <b>ALLERTA RIENTRATA (Bologna / Rientro)</b>\n"
|
|
f"🕒 <i>Aggiornamento ore {html.escape(now_str)}</i>\n\n"
|
|
f"Nelle prossime {HOURS_AHEAD} ore non risultano più condizioni persistenti\n"
|
|
f"di neve (≥{PERSIST_HOURS}h) o pioggia 3h sopra soglia (≥{PERSIST_HOURS}h).\n"
|
|
"<i>Fonte dati: Open-Meteo</i>"
|
|
)
|
|
ok = telegram_send_html(msg)
|
|
if ok:
|
|
LOGGER.info("Rientro notificato.")
|
|
else:
|
|
LOGGER.warning("Rientro NON inviato.")
|
|
save_state(False, "")
|
|
return
|
|
|
|
# --- Scenario C: Tranquillo ---
|
|
save_state(False, "")
|
|
LOGGER.info("Nessuna allerta.")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|