Files
loogle-scripts/services/telegram-bot/student_alert.py

475 lines
17 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import datetime
import html
import json
import logging
import os
import time
from logging.handlers import RotatingFileHandler
from typing import Dict, List, Optional, Tuple
from zoneinfo import ZoneInfo
import requests
from dateutil import parser
# =============================================================================
# student_alert.py
#
# Purpose:
# Notify the student (Bologna) when the next 24 hours are forecast to bring:
# - Persistent snow (>= 2 consecutive hours with snowfall above SNOW_HOURLY_EPS_CM, i.e. 0.2 cm/h)
# - Persistent very heavy rain (>= 2 consecutive hours with the 3h rolling sum >= threshold)
# checked both in Bologna (trigger) and along the route (A14 toll gates) to San Marino.
# =============================================================================
# DEBUG=1 in the environment switches the logger to DEBUG level and adds console output.
DEBUG = os.environ.get("DEBUG", "0").strip() == "1"
# ----------------- TELEGRAM -----------------
# Every notification is sent to each of these chat IDs.
TELEGRAM_CHAT_IDS = ["64463169", "24827341", "132455422", "5405962012"]
# Fallback token files, read by load_bot_token() when TELEGRAM_BOT_TOKEN is not set.
TOKEN_FILE_HOME = os.path.expanduser("~/.telegram_dpc_bot_token")
TOKEN_FILE_ETC = "/etc/telegram_dpc_bot_token"
# ----------------- THRESHOLDS AND LOGIC -----------------
SOGLIA_PIOGGIA_3H_MM = 30.0 # mm in 3 hours (rolling)
PERSIST_HOURS = 2 # minimum persistence (hours)
# Number of hourly forecast steps analysed, starting from the first step not in the past.
HOURS_AHEAD = 24
SNOW_HOURLY_EPS_CM = 0.2 # Minimum hourly snowfall, cm/h
# State file (JSON written by save_state() and read by load_state())
STATE_FILE = "/home/daniely/docker/telegram-bot/student_state.json"
# ----------------- ROUTE POINTS (A14 toll gates) -----------------
# The first entry is the Bologna trigger point (main() reads POINTS[0]); the
# remaining entries are fetched as route points.
# NOTE(review): the "type" key is not read by any code visible in this file.
POINTS = [
{"name": "🎓 Bologna (V. Regnoli)", "lat": 44.4930, "lon": 11.3690, "type": "trigger"},
{"name": "🛣️ Casello Imola", "lat": 44.3798, "lon": 11.7397, "type": "route"},
{"name": "🛣️ Casello Faenza", "lat": 44.3223, "lon": 11.9040, "type": "route"},
{"name": "🛣️ Casello Forlì", "lat": 44.2502, "lon": 12.0910, "type": "route"},
{"name": "🛣️ Casello Cesena", "lat": 44.1675, "lon": 12.2835, "type": "route"},
{"name": "🛣️ Casello Rimini", "lat": 44.0362, "lon": 12.5659, "type": "route"},
{"name": "🏠 San Marino", "lat": 43.9356, "lon": 12.4296, "type": "end"},
]
# ----------------- OPEN-METEO -----------------
OPEN_METEO_URL = "https://api.open-meteo.com/v1/forecast"
# Time zone sent as the "timezone" query parameter and used for all local datetimes.
TZ = "Europe/Rome"
TZINFO = ZoneInfo(TZ)
# Headers sent with every Open-Meteo request.
HTTP_HEADERS = {"User-Agent": "rpi-student-alert/1.0"}
# Model identifier sent as the "models" query parameter.
MODEL = "meteofrance_arome_france_hd"
# ----------------- LOG -----------------
# The log file is created next to this script.
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
LOG_FILE = os.path.join(BASE_DIR, "student_alert.log")
def setup_logger() -> logging.Logger:
    """Configure and return the "student_alert" logger.

    Any handlers already attached are dropped first. A rotating file handler
    writing to LOG_FILE is always installed; when DEBUG is on, a console
    handler is added as well. Both handlers accept DEBUG records and share
    one format, while the logger level itself is DEBUG or INFO depending on
    the DEBUG flag.
    """
    log = logging.getLogger("student_alert")
    log.setLevel(logging.DEBUG if DEBUG else logging.INFO)
    log.handlers.clear()

    line_format = logging.Formatter("%(asctime)s %(levelname)s %(message)s")
    targets = [
        RotatingFileHandler(LOG_FILE, maxBytes=1_000_000, backupCount=5, encoding="utf-8"),
    ]
    if DEBUG:
        targets.append(logging.StreamHandler())

    for handler in targets:
        handler.setLevel(logging.DEBUG)
        handler.setFormatter(line_format)
        log.addHandler(handler)
    return log


LOGGER = setup_logger()
# =============================================================================
# Utility
# =============================================================================
def now_local() -> datetime.datetime:
    """Return the current moment as an aware datetime in the TZINFO zone."""
    return datetime.datetime.now(tz=TZINFO)
def read_text_file(path: str) -> str:
    """Return the stripped UTF-8 content of *path*, or "" if it cannot be read.

    A missing file is silently ignored, a permission problem is logged at
    debug level, and any other failure is logged with its traceback.
    """
    content = ""
    try:
        with open(path, mode="r", encoding="utf-8") as handle:
            content = handle.read().strip()
    except FileNotFoundError:
        # An absent file is an expected situation: callers try several paths.
        pass
    except PermissionError:
        LOGGER.debug("Permission denied reading %s", path)
    except Exception as exc:
        LOGGER.exception("Error reading %s: %s", path, exc)
    return content
def load_bot_token() -> str:
    """Return the Telegram bot token, or "" when none is configured.

    Sources are consulted in order and the first non-empty value wins:
    the TELEGRAM_BOT_TOKEN environment variable, then TOKEN_FILE_HOME,
    then TOKEN_FILE_ETC.
    """
    from_env = os.environ.get("TELEGRAM_BOT_TOKEN", "").strip()
    if from_env:
        return from_env

    from_home = read_text_file(TOKEN_FILE_HOME)
    if from_home:
        return from_home

    from_etc = read_text_file(TOKEN_FILE_ETC)
    if not from_etc:
        return ""
    return from_etc.strip()
def parse_time_to_local(t: str) -> datetime.datetime:
    """Parse an ISO-8601 timestamp and return it as an aware datetime in TZINFO.

    A timestamp without zone information is taken to already be local time
    and is simply tagged with TZINFO; an aware timestamp is converted.
    """
    parsed = parser.isoparse(t)
    is_naive = parsed.tzinfo is None
    return parsed.replace(tzinfo=TZINFO) if is_naive else parsed.astimezone(TZINFO)
def hhmm(dt: datetime.datetime) -> str:
    """Format *dt* as zero-padded 24-hour "HH:MM"."""
    return f"{dt:%H:%M}"
# =============================================================================
# Telegram
# =============================================================================
def telegram_send_html(message_html: str) -> bool:
    """Send *message_html* (Telegram HTML parse mode) to every configured chat.

    Each chat in TELEGRAM_CHAT_IDS gets its own sendMessage call; a failure
    for one chat does not stop delivery to the others.

    Returns:
        True if at least one chat answered with HTTP 200, False otherwise
        (including when no bot token is available).
    """
    token = load_bot_token()
    if not token:
        LOGGER.warning("Telegram token missing: message not sent.")
        return False

    url = f"https://api.telegram.org/bot{token}/sendMessage"
    base_payload = {
        "text": message_html,
        "parse_mode": "HTML",
        "disable_web_page_preview": True,
    }
    sent_ok = False
    with requests.Session() as s:
        for chat_id in TELEGRAM_CHAT_IDS:
            payload = {**base_payload, "chat_id": chat_id}
            try:
                resp = s.post(url, json=payload, timeout=15)
            except Exception as e:
                # The request URL embeds the bot token, and the exception text
                # may quote that URL. Log a redacted message without the
                # traceback so the secret never reaches the log file.
                LOGGER.error(
                    "Telegram exception chat_id=%s err=%s",
                    chat_id,
                    str(e).replace(token, "<redacted>"),
                )
                continue
            if resp.status_code == 200:
                sent_ok = True
            else:
                LOGGER.error("Telegram error chat_id=%s status=%s body=%s",
                             chat_id, resp.status_code, resp.text[:500])
            # Short pause between chats to space out consecutive API calls.
            time.sleep(0.25)
    return sent_ok
# =============================================================================
# State & Open-Meteo
# =============================================================================
def load_state() -> Dict:
    """Load the persisted alert state from STATE_FILE.

    Returns a dict that always has the keys "alert_active", "signature" and
    "updated"; values found in the file override the defaults. A missing
    file yields the defaults, and a read or parse failure is logged and
    also falls back to the defaults.
    """
    state = {"alert_active": False, "signature": "", "updated": ""}
    if not os.path.exists(STATE_FILE):
        return state
    try:
        with open(STATE_FILE, "r", encoding="utf-8") as handle:
            stored = json.load(handle)
            # An empty or null document contributes nothing.
            state.update(stored or {})
    except Exception as exc:
        LOGGER.exception("State read error: %s", exc)
    return state
def save_state(alert_active: bool, signature: str) -> None:
    """Write the alert state to STATE_FILE as pretty-printed JSON.

    The parent directory is created when needed, and the record is stamped
    with the current local time under "updated". Any failure is logged and
    swallowed, so callers never see an exception from here.
    """
    try:
        parent_dir = os.path.dirname(STATE_FILE)
        os.makedirs(parent_dir, exist_ok=True)
        with open(STATE_FILE, "w", encoding="utf-8") as handle:
            snapshot = {
                "alert_active": alert_active,
                "signature": signature,
                "updated": now_local().isoformat(),
            }
            json.dump(snapshot, handle, ensure_ascii=False, indent=2)
    except Exception as exc:
        LOGGER.exception("State write error: %s", exc)
def get_forecast(session: requests.Session, lat: float, lon: float) -> Optional[Dict]:
    """Fetch the hourly precipitation/snowfall forecast for one coordinate.

    Args:
        session: HTTP session used for the request.
        lat: Latitude of the point.
        lon: Longitude of the point.

    Returns:
        The decoded JSON response, or None when the request is rejected
        with HTTP 400 or fails for any other reason. Every failure path
        writes a log entry; no exception propagates to the caller.
    """
    params = {
        "latitude": lat,
        "longitude": lon,
        "hourly": "precipitation,snowfall",
        "timezone": TZ,
        "forecast_days": 2,
        "precipitation_unit": "mm",
        "models": MODEL,
    }
    try:
        r = session.get(OPEN_METEO_URL, params=params, headers=HTTP_HEADERS, timeout=25)
        if r.status_code == 400:
            # A 400 means the request itself was refused (e.g. bad parameter
            # or unsupported model). Log the body, which typically carries
            # the reason, so the skipped point can be diagnosed.
            LOGGER.error(
                "Open-Meteo rejected request (400) lat=%s lon=%s model=%s body=%s",
                lat, lon, MODEL, r.text[:500],
            )
            return None
        r.raise_for_status()
        return r.json()
    except Exception as e:
        LOGGER.exception("Open-Meteo request error: %s", e)
        return None
# =============================================================================
# Analytics
# =============================================================================
def rolling_sum_3h(values: List[float]) -> List[float]:
    """Return the sums of every window of three consecutive values.

    Element ``i`` of the result is ``values[i] + values[i+1] + values[i+2]``,
    so the result has ``len(values) - 2`` entries (none when fewer than
    three values are given).

    Missing (``None``) or non-numeric entries count as 0.0 individually, so
    a single gap no longer wipes out the other hours of its window. This
    matches how the other analytics helpers treat ``None``.
    """
    def _as_float(value) -> float:
        # One unusable sample must not discard its neighbours.
        try:
            return float(value)
        except (TypeError, ValueError):
            return 0.0

    amounts = [_as_float(v) for v in values]
    return [
        amounts[i] + amounts[i + 1] + amounts[i + 2]
        for i in range(len(amounts) - 2)
    ]
def first_persistent_run(values: List[float], threshold: float, persist: int) -> Tuple[bool, int, int, float]:
    """Find the first streak of *persist* consecutive values >= *threshold*.

    ``None`` entries count as 0.0. The scan stops as soon as a streak
    reaches *persist* items.

    Returns:
        ``(True, start_index, streak_length, streak_max)`` for the first
        qualifying streak, where length and max cover only the items seen
        up to the moment the streak qualified; ``(False, -1, 0, 0.0)`` when
        no streak qualifies.
    """
    streak = 0
    streak_start = -1
    peak = 0.0
    for idx, raw in enumerate(values):
        level = 0.0 if raw is None else float(raw)
        if not level >= threshold:
            # Below threshold: whatever streak was building is broken.
            streak = 0
            continue
        if streak == 0:
            streak_start, peak = idx, level
        elif level > peak:
            peak = level
        streak += 1
        if streak >= persist:
            return True, streak_start, streak, peak
    return False, -1, 0, 0.0
def max_consecutive_gt(values: List[float], eps: float) -> Tuple[int, int]:
    """Return the longest streak of consecutive values strictly above *eps*.

    ``None`` entries count as 0.0. When several streaks share the maximum
    length, the earliest one is reported.

    Returns:
        ``(length, start_index)`` of the longest streak, or ``(0, -1)``
        when no value exceeds *eps*.
    """
    longest, longest_start = 0, -1
    current, current_start = 0, -1
    for idx, raw in enumerate(values):
        amount = 0.0 if raw is None else float(raw)
        if not amount > eps:
            current = 0
            continue
        if current == 0:
            current_start = idx
        current += 1
        # Strict comparison keeps the earliest streak on ties.
        if current > longest:
            longest, longest_start = current, current_start
    return longest, longest_start
def compute_stats(data: Dict) -> Optional[Dict]:
    """Derive rain and snow statistics from one forecast response.

    The analysis window starts at the first hourly step that is not in the
    past and spans at most HOURS_AHEAD steps.

    Returns:
        None when the response has no usable hourly data or no step at or
        after the current time. Otherwise a dict with the peak 3h rolling
        rain sum and its time, the first persistent heavy-rain run (flag,
        start time, max, length), the longest snow run (length, start
        time), and the snowfall totals over the first 12 and 24 steps.
    """
    hourly = data.get("hourly", {}) or {}
    times = hourly.get("time", []) or []
    precip = hourly.get("precipitation", []) or []
    snow = hourly.get("snowfall", []) or []

    usable = min(len(times), len(precip), len(snow))
    if usable == 0:
        return None

    # First hourly step that is not already behind us.
    current_time = now_local()
    first = next(
        (i for i, stamp in enumerate(times[:usable]) if parse_time_to_local(stamp) >= current_time),
        -1,
    )
    if first == -1:
        return None
    last = min(first + HOURS_AHEAD, usable)
    if last <= first:
        return None

    window = slice(first, last)
    precip_w = precip[window]
    snow_w = snow[window]
    dt_w = [parse_time_to_local(stamp) for stamp in times[window]]

    def time_at(index: int) -> str:
        # "HH:MM" of the window step, or "" for an out-of-window index.
        return hhmm(dt_w[index]) if 0 <= index < len(dt_w) else ""

    def snow_total(hours: int) -> float:
        return sum(float(x) for x in snow_w[:hours] if x is not None)

    # Rain: peak of the 3h rolling sum and first persistent run above threshold.
    rain3 = rolling_sum_3h(precip_w)
    if rain3:
        rain3_max = max(rain3)
        rain3_max_idx = rain3.index(rain3_max)
    else:
        rain3_max, rain3_max_idx = 0.0, -1
    rain_persist_ok, rain_run_start, rain_run_len, rain_run_max = first_persistent_run(
        rain3, SOGLIA_PIOGGIA_3H_MM, PERSIST_HOURS
    )
    if rain_persist_ok and rain_run_start < len(dt_w):
        rain_persist_time = hhmm(dt_w[rain_run_start])
    else:
        rain_persist_time = ""

    # Snow: longest run above the hourly minimum.
    snow_run_len, snow_run_start = max_consecutive_gt(snow_w, eps=SNOW_HOURLY_EPS_CM)

    return {
        "rain3_max": float(rain3_max),
        "rain3_max_time": time_at(rain3_max_idx),
        "rain_persist_ok": bool(rain_persist_ok),
        "rain_persist_time": rain_persist_time,
        "rain_persist_run_max": float(rain_run_max),
        "rain_persist_run_len": int(rain_run_len),
        "snow_run_len": int(snow_run_len),
        "snow_run_time": time_at(snow_run_start),
        "snow_12h": float(snow_total(12)),
        "snow_24h": float(snow_total(24)),
    }
def point_alerts(point_name: str, stats: Dict) -> Dict:
    """Turn the statistics of one point into alert flags plus display data.

    Snow alerts when the longest snow run lasts at least PERSIST_HOURS and
    the 24h accumulation is positive; rain alerts when a persistent
    heavy-rain run was found. The remaining statistics are copied through
    unchanged for message building.
    """
    long_snow_run = stats["snow_run_len"] >= PERSIST_HOURS
    summary = {
        "name": point_name,
        "snow_alert": long_snow_run and (stats["snow_24h"] > 0.0),
        "rain_alert": bool(stats["rain_persist_ok"]),
    }
    passthrough = (
        "snow_12h",
        "snow_24h",
        "snow_run_len",
        "snow_run_time",
        "rain3_max",
        "rain3_max_time",
        "rain_persist_time",
        "rain_persist_run_max",
        "rain_persist_run_len",
    )
    for key in passthrough:
        summary[key] = stats[key]
    return summary
def build_signature(bologna: Dict, route: List[Dict]) -> str:
    """Build a compact string fingerprint of the current alert situation.

    The fingerprint holds one "|"-separated segment for Bologna followed by
    one per route point, each carrying the snow/rain flags, the 24h snow
    total and the peak 3h rain sum (one decimal).
    """
    bologna_part = (
        f"BO:snow={int(bologna['snow_alert'])},rain={int(bologna['rain_alert'])},"
        f"s24={bologna['snow_24h']:.1f},r3max={bologna['rain3_max']:.1f}"
    )
    route_parts = [
        f"{r['name']}:s{int(r['snow_alert'])}r{int(r['rain_alert'])}"
        f":s24={r['snow_24h']:.1f}:r3max={r['rain3_max']:.1f}"
        for r in route
    ]
    return "|".join([bologna_part, *route_parts])
def _build_alert_message(bo_alerts: Dict, route_alerts: List[Dict]) -> str:
    """Compose the HTML alert text for Bologna and the affected route points."""
    now_str = now_local().strftime("%H:%M")
    snow_anywhere = bo_alerts["snow_alert"] or any(x["snow_alert"] for x in route_alerts)
    rain_anywhere = bo_alerts["rain_alert"] or any(x["rain_alert"] for x in route_alerts)
    # Snow takes precedence over rain for the headline icon.
    if snow_anywhere:
        header_icon = "❄️"
    elif rain_anywhere:
        header_icon = "🌧️"
    else:
        header_icon = "⚠️"

    msg: List[str] = [
        f"{header_icon} <b>ALLERTA METEO (Bologna / Rientro)</b>",
        f"🕒 <i>Aggiornamento ore {html.escape(now_str)}</i>",
        f"🛰️ <code>Modello: {html.escape(MODEL)}</code>",
        f"⏱️ <code>Finestra: {HOURS_AHEAD} ore | Persistenza: {PERSIST_HOURS} ore</code>",
        "",
        "🎓 <b>A BOLOGNA</b>",
    ]

    # Bologna
    if bo_alerts["snow_alert"]:
        msg.append(f"❄️ Neve (≥{PERSIST_HOURS}h) da ~<b>{html.escape(bo_alerts['snow_run_time'] or '')}</b> (run ~{bo_alerts['snow_run_len']}h).")
        msg.append(f"• Accumulo: 12h <b>{bo_alerts['snow_12h']:.1f} cm</b> | 24h <b>{bo_alerts['snow_24h']:.1f} cm</b>")
    else:
        msg.append(f"❄️ Neve: nessuna persistenza ≥ {PERSIST_HOURS}h (24h {bo_alerts['snow_24h']:.1f} cm).")
    if bo_alerts["rain_alert"]:
        msg.append(f"🌧️ Pioggia molto forte (3h ≥ {SOGLIA_PIOGGIA_3H_MM:.0f} mm, ≥{PERSIST_HOURS}h) da ~<b>{html.escape(bo_alerts['rain_persist_time'] or '')}</b>.")
    else:
        msg.append(f"🌧️ Pioggia: max 3h <b>{bo_alerts['rain3_max']:.1f} mm</b> (picco ~{html.escape(bo_alerts['rain3_max_time'] or '')}).")

    # Route points: only those with an active alert are listed.
    msg.append("")
    msg.append("🚗 <b>CASELLI (A14) / TRATTO</b>")
    issues = [x for x in route_alerts if x["snow_alert"] or x["rain_alert"]]
    if not issues:
        msg.append("✅ Nessuna criticità persistente rilevata lungo il percorso.")
    for x in issues:
        parts: List[str] = []
        if x["snow_alert"]:
            parts.append(f"❄️ neve (≥{PERSIST_HOURS}h) da ~{html.escape(x['snow_run_time'] or '')} (24h {x['snow_24h']:.1f} cm)")
        if x["rain_alert"]:
            parts.append(f"🌧️ pioggia forte da ~{html.escape(x['rain_persist_time'] or '')}")
        msg.append(f"• <b>{html.escape(x['name'])}</b>: " + " | ".join(parts))
    msg.append("")
    msg.append("<i>Fonte dati: Open-Meteo</i>")
    # Lines are separated with "\n" rather than <br/>.
    return "\n".join(msg)


def _build_all_clear_message() -> str:
    """Compose the HTML text announcing that the alert is over."""
    now_str = now_local().strftime("%H:%M")
    return (
        "🟢 <b>ALLERTA RIENTRATA (Bologna / Rientro)</b>\n"
        f"🕒 <i>Aggiornamento ore {html.escape(now_str)}</i>\n\n"
        f"Nelle prossime {HOURS_AHEAD} ore non risultano più condizioni persistenti\n"
        f"di neve (≥{PERSIST_HOURS}h) o pioggia 3h sopra soglia (≥{PERSIST_HOURS}h).\n"
        "<i>Fonte dati: Open-Meteo</i>"
    )


def main() -> None:
    """Run one check: fetch forecasts, evaluate alerts, notify, persist state.

    Three outcomes are possible:
      A. An alert is active: notify when it is new or its signature changed.
      B. A previously active alert has cleared: send the all-clear.
      C. Nothing active now or before: just record the quiet state.

    The stored state is updated only after a notification was actually
    delivered, so a failed send is retried on the next run instead of being
    recorded as done.
    """
    LOGGER.info("--- Student alert Bologna (Neve/Pioggia intensa) ---")
    state = load_state()
    was_active = bool(state.get("alert_active", False))
    last_sig = state.get("signature", "")

    with requests.Session() as session:
        # Trigger: Bologna. Without its forecast nothing can be evaluated.
        bo = POINTS[0]
        bo_data = get_forecast(session, bo["lat"], bo["lon"])
        if not bo_data:
            LOGGER.error("Previsione Bologna non disponibile: controllo saltato.")
            return
        bo_stats = compute_stats(bo_data)
        if not bo_stats:
            LOGGER.error("Impossibile calcolare statistiche Bologna.")
            return
        bo_alerts = point_alerts(bo["name"], bo_stats)

        # Route points: a point whose forecast is unavailable is skipped.
        route_alerts: List[Dict] = []
        for p in POINTS[1:]:
            d = get_forecast(session, p["lat"], p["lon"])
            if not d:
                continue
            st = compute_stats(d)
            if not st:
                continue
            route_alerts.append(point_alerts(p["name"], st))

    any_route_alert = any(x["snow_alert"] or x["rain_alert"] for x in route_alerts)
    any_alert = bool(bo_alerts["snow_alert"] or bo_alerts["rain_alert"] or any_route_alert)
    sig = build_signature(bo_alerts, route_alerts)

    # --- Scenario A: alert ---
    if any_alert:
        if was_active and sig == last_sig:
            LOGGER.info("Allerta già notificata e invariata.")
            return
        if telegram_send_html(_build_alert_message(bo_alerts, route_alerts)):
            LOGGER.info("Notifica inviata.")
            save_state(True, sig)
        else:
            # State is left untouched so the next run tries again.
            LOGGER.warning("Notifica NON inviata.")
        return

    # --- Scenario B: alert cleared ---
    if was_active:
        if telegram_send_html(_build_all_clear_message()):
            LOGGER.info("Rientro notificato.")
            save_state(False, "")
        else:
            # Keep the alert marked active so the all-clear is retried.
            LOGGER.warning("Rientro NON inviato.")
        return

    # --- Scenario C: quiet ---
    save_state(False, "")
    LOGGER.info("Nessuna allerta.")


if __name__ == "__main__":
    main()