diff --git a/services/telegram-bot/civil_protection.py b/services/telegram-bot/civil_protection.py
index 9b200cc..95ebf70 100644
--- a/services/telegram-bot/civil_protection.py
+++ b/services/telegram-bot/civil_protection.py
@@ -1,151 +1,371 @@
-import requests
-import json
-import os
-import time
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+
import datetime
+import html as html_lib
+import json
+import logging
+import os
+import re
+import time
+from html.parser import HTMLParser
+from logging.handlers import RotatingFileHandler
from zoneinfo import ZoneInfo
-# --- CONFIGURAZIONE UTENTE ---
-# 👇👇 INSERISCI QUI I TUOI DATI 👇👇
-TELEGRAM_BOT_TOKEN = "8155587974:AAF9OekvBpixtk8ZH6KoIc0L8edbhdXt7A4"
+import requests
+
+# =============================================================================
+# Modalità debug:
+# DEBUG=1 python3 civil_protection.py
+# Token Telegram (senza modifiche al codice):
+# 1) export TELEGRAM_BOT_TOKEN="...token..."
+# 2) oppure file nel tuo home: ~/.telegram_dpc_bot_token
+# =============================================================================
+
+DEBUG = os.environ.get("DEBUG", "0").strip() == "1"
+
+BULLETIN_URL = "https://mappe.protezionecivile.gov.it/it/mappe-rischi/bollettino-di-criticita/"
+
+# Chat IDs
TELEGRAM_CHAT_IDS = ["64463169", "24827341", "132455422", "5405962012"]
-# --- ZONE DA MONITORARE ---
-# EMR-B2 = Costa Romagnola (Rimini/Dogana)
-# EMR-A2 = Alta Collina Romagnola (San Marino/Titano)
-# EMR-D1 = Pianura Bolognese (Bologna Città)
-TARGET_ZONES = ["EMR-B2", "EMR-A2", "EMR-D1"]
+# Directory dello script (log + state qui, così non serve sudo)
+BASE_DIR = os.path.dirname(os.path.abspath(__file__))
+STATE_FILE = os.path.join(BASE_DIR, "dpc_state.json")
+LOG_FILE = os.path.join(BASE_DIR, "dpc_telegram.log")
-# URL Ufficiale DPC
-DPC_URL = "https://raw.githubusercontent.com/pcm-dpc/DPC-Bollettini-Criticita-Idrogeologica-Idraulica/master/files/geojson/today.json"
+# Dove cercare il token (in quest’ordine)
+TOKEN_FILE_HOME = os.path.expanduser("~/.telegram_dpc_bot_token")
+TOKEN_FILE_ETC = "/etc/telegram_dpc_bot_token"
-# File di stato
-STATE_FILE = "/home/daniely/docker/telegram-bot/dpc_state.json"
-
-# Mappe
-RISK_MAP = {
- 1: "🟡 GIALLA",
- 2: "🟠 ARANCIONE",
- 3: "🔴 ROSSA"
+# Zone target (come compaiono tipicamente nel testo del bollettino)
+TARGET_ZONES = {
+ "EMR-B2": "Costa romagnola",
+ "EMR-A2": "Alta collina romagnola",
+ "EMR-D1": "Pianura bolognese",
}
-RISK_TYPES = {
- "idro": "💧 Idraulico",
- "idrogeo": "⛰️ Idrogeologico",
- "temporali": "⚡ Temporali",
- "vento": "💨 Vento",
- "neve": "❄️ Neve",
- "ghiaccio": "🧊 Ghiaccio",
- "mare": "🌊 Mareggiate"
+RISK_ICON = {
+ "IDRAULICO": "💧",
+ "IDROGEOLOGICO": "⛰️",
+ "TEMPORALI": "⚡",
}
-def get_zone_label(zone_id):
- if zone_id == "EMR-B2": return "Rimini / Bassa RSM"
- if zone_id == "EMR-A2": return "Alta RSM / Carpegna"
- if zone_id == "EMR-D1": return "Bologna Città"
- return zone_id
+COLOR_ICON = {
+ "GIALLA": "🟡",
+ "ARANCIONE": "🟠",
+ "ROSSA": "🔴",
+}
-def send_telegram_message(message):
- if not TELEGRAM_BOT_TOKEN or "INSERISCI" in TELEGRAM_BOT_TOKEN:
- print(f"[TEST OUT] {message}")
- return
+HTTP_HEADERS = {"User-Agent": "rpi-dpc-bollettino/1.2"}
- url = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendMessage"
- for chat_id in TELEGRAM_CHAT_IDS:
- try:
- requests.post(url, json={"chat_id": chat_id, "text": message, "parse_mode": "Markdown"}, timeout=10)
- time.sleep(0.2)
- except: pass
+# =============================================================================
+# Logging
+# =============================================================================
-def load_last_alert():
- if os.path.exists(STATE_FILE):
- try:
- with open(STATE_FILE, 'r') as f: return json.load(f)
- except: pass
+def setup_logger() -> logging.Logger:
+ logger = logging.getLogger("dpc_telegram")
+ logger.setLevel(logging.DEBUG if DEBUG else logging.INFO)
+ logger.handlers.clear()
+
+ fh = RotatingFileHandler(LOG_FILE, maxBytes=1_000_000, backupCount=5, encoding="utf-8")
+ fh.setLevel(logging.DEBUG)
+ fmt = logging.Formatter("%(asctime)s %(levelname)s %(message)s")
+ fh.setFormatter(fmt)
+ logger.addHandler(fh)
+
+ if DEBUG:
+ sh = logging.StreamHandler()
+ sh.setLevel(logging.DEBUG)
+ sh.setFormatter(fmt)
+ logger.addHandler(sh)
+
+ return logger
+
+LOGGER = setup_logger()
+
+# =============================================================================
+# Utility
+# =============================================================================
+
+def now_rome():
+ return datetime.datetime.now(ZoneInfo("Europe/Rome"))
+
+def today_str_italy():
+ return now_rome().strftime("%d/%m/%Y")
+
+def load_text_file(path: str) -> str:
+ try:
+ with open(path, "r", encoding="utf-8") as f:
+ return f.read().strip()
+ except FileNotFoundError:
+ return ""
+ except PermissionError:
+ # Non consideriamo “errore fatale”: semplicemente quel path non è leggibile.
+ LOGGER.debug("Permesso negato leggendo %s", path)
+ return ""
+ except Exception as e:
+ LOGGER.exception("Errore leggendo %s: %s", path, e)
+ return ""
+
+def load_bot_token() -> str:
+ # 1) env
+ tok = os.environ.get("TELEGRAM_BOT_TOKEN", "").strip()
+ if tok:
+ return tok
+
+ # 2) home file (consigliato)
+ tok = load_text_file(TOKEN_FILE_HOME)
+ if tok:
+ return tok
+
+ # 3) /etc (solo se leggibile)
+ tok = load_text_file(TOKEN_FILE_ETC)
+ if tok:
+ return tok
+
+ return ""
+
+def telegram_send_html(message_html: str) -> bool:
+ """
+ Prova a inviare il messaggio. Non solleva eccezioni.
+ Ritorna True se almeno un invio ha avuto status 200.
+ Importante: lo script chiama questa funzione SOLO in caso di allerte.
+ """
+ token = load_bot_token()
+ if not token:
+ LOGGER.warning("Token Telegram assente. Nessun invio effettuato.")
+ return False
+
+ url = f"https://api.telegram.org/bot{token}/sendMessage"
+ base_payload = {
+ "text": message_html,
+ "parse_mode": "HTML",
+ "disable_web_page_preview": True,
+ }
+
+ sent_ok = False
+ with requests.Session() as s:
+ for chat_id in TELEGRAM_CHAT_IDS:
+ payload = dict(base_payload)
+ payload["chat_id"] = chat_id
+ try:
+ resp = s.post(url, json=payload, timeout=15)
+ if resp.status_code == 200:
+ sent_ok = True
+ else:
+ LOGGER.error("Telegram error chat_id=%s status=%s body=%s",
+ chat_id, resp.status_code, resp.text[:500])
+ time.sleep(0.25)
+ except Exception as e:
+ LOGGER.exception("Telegram exception chat_id=%s err=%s", chat_id, e)
+
+ return sent_ok
+
+def load_state() -> dict:
+ try:
+ if os.path.exists(STATE_FILE):
+ with open(STATE_FILE, "r", encoding="utf-8") as f:
+ return json.load(f) or {}
+ except Exception as e:
+ LOGGER.exception("Errore lettura stato: %s", e)
return {}
-def save_current_alert(data):
+def save_state(state: dict) -> None:
try:
- with open(STATE_FILE, 'w') as f: json.dump(data, f)
- except: pass
-
-def analyze_dpc():
- print("--- Controllo Protezione Civile ---")
-
- try:
- r = requests.get(DPC_URL, timeout=10)
- r.raise_for_status()
- data = r.json()
+ with open(STATE_FILE, "w", encoding="utf-8") as f:
+ json.dump(state, f, ensure_ascii=False, indent=2)
except Exception as e:
- print(f"Errore DPC: {e}")
+ LOGGER.exception("Errore scrittura stato: %s", e)
+
+def norm(s: str) -> str:
+ return re.sub(r"\s+", " ", (s or "").strip().lower())
+
+class TextExtractor(HTMLParser):
+ def __init__(self):
+ super().__init__()
+ self.parts = []
+
+ def handle_data(self, data):
+ if data and data.strip():
+ self.parts.append(data)
+
+def html_to_lines(html_text: str) -> list[str]:
+ p = TextExtractor()
+ p.feed(html_text)
+ text = "\n".join(p.parts)
+ text = html_lib.unescape(text)
+
+ lines = []
+ for raw in text.splitlines():
+ line = re.sub(r"\s+", " ", raw).strip()
+ if line:
+ lines.append(line)
+ return lines
+
+# =============================================================================
+# Parsing bollettino
+# =============================================================================
+
+RISK_HEADER_RE = re.compile(
+ r"^(ORDINARIA|MODERATA|ELEVATA)\s+CRITICITA'\s+PER\s+RISCHIO\s+([A-ZÀ-Ü]+)\s*/\s*ALLERTA\s+(GIALLA|ARANCIONE|ROSSA)\s*:\s*$"
+)
+DAY_START_RE = re.compile(r"^Per la giornata di (oggi|domani),\s*(.+?):\s*$", re.IGNORECASE)
+TITLE_RE = re.compile(r"^Bollettino di Criticità del (.+?) ore (\d{1,2}:\d{2})", re.IGNORECASE)
+
+def fetch_bulletin_text_lines() -> list[str]:
+ r = requests.get(BULLETIN_URL, headers=HTTP_HEADERS, timeout=25)
+ r.raise_for_status()
+ # Forza encoding per evitare “Mercoledì”
+ r.encoding = "utf-8"
+ return html_to_lines(r.text)
+
+def parse_bulletin(lines: list[str]) -> dict:
+ out = {"title": "", "issued_date": "", "issued_time": "", "days": {}}
+
+ for ln in lines[:150]:
+ m = TITLE_RE.match(ln)
+ if m:
+ out["title"] = ln
+ out["issued_date"] = m.group(1).strip()
+ out["issued_time"] = m.group(2).strip()
+ break
+
+ current_day_key = None
+ current_day_label = None
+ current_risk = None
+ current_color = None
+
+ def add_alert(zone_name: str):
+ if not current_day_key or not current_risk or not current_color:
+ return
+ day = out["days"].setdefault(current_day_key, {"date_label": current_day_label or "", "alerts": {}})
+ icon = COLOR_ICON.get(current_color, "⚪")
+ ricon = RISK_ICON.get(current_risk, "⚠️")
+ entry = f"{icon} {ricon} {current_risk}"
+ day["alerts"].setdefault(zone_name, [])
+ if entry not in day["alerts"][zone_name]:
+ day["alerts"][zone_name].append(entry)
+
+ for ln in lines:
+ md = DAY_START_RE.match(ln)
+ if md:
+ current_day_key = md.group(1).lower()
+ current_day_label = md.group(2).strip()
+ out["days"].setdefault(current_day_key, {"date_label": current_day_label, "alerts": {}})
+ current_risk = None
+ current_color = None
+ continue
+
+ mh = RISK_HEADER_RE.match(ln)
+ if mh:
+ current_risk = mh.group(2).strip()
+ current_color = mh.group(3).strip()
+ continue
+
+ if current_day_key and current_risk and current_color and ":" in ln:
+ region, rest = ln.split(":", 1)
+ if norm(region) in (norm("Emilia Romagna"), norm("Emilia-Romagna")):
+ zones = [z.strip() for z in rest.split(",") if z.strip()]
+ for _, target_zone_name in TARGET_ZONES.items():
+ for z in zones:
+ if norm(z) == norm(target_zone_name):
+ add_alert(target_zone_name)
+
+ return out
+
+def has_any_alert(parsed: dict) -> bool:
+ for day_key in ("oggi", "domani"):
+ day = parsed.get("days", {}).get(day_key, {})
+ if day.get("alerts"):
+ return True
+ return False
+
+def build_signature(parsed: dict) -> str:
+ parts = [parsed.get("issued_date", ""), parsed.get("issued_time", ""), parsed.get("title", "")]
+ for day_key in ("oggi", "domani"):
+ day = parsed.get("days", {}).get(day_key, {})
+ parts.append(day_key + ":" + day.get("date_label", ""))
+ alerts = day.get("alerts", {})
+ for zone in sorted(alerts.keys()):
+ parts.append(zone + "=" + ",".join(sorted(alerts[zone])))
+ return "|".join(parts)
+
+def format_message(parsed: dict) -> str:
+ issued = parsed.get("title") or "Bollettino di Criticità"
+ issued_safe = html_lib.escape(issued)
+
+ lines = []
+ lines.append("📢 PROTEZIONE CIVILE (Allerta)")
+ lines.append(f"🕒 {issued_safe}")
+ lines.append("")
+
+ for day_key in ("oggi", "domani"):
+ day = parsed.get("days", {}).get(day_key)
+ if not day:
+ continue
+ alerts = day.get("alerts", {})
+ if not alerts:
+ continue
+
+ lines.append(f"📅 {html_lib.escape(day.get('date_label',''))}")
+ for zone in sorted(alerts.keys()):
+ lines.append(f"📍 {html_lib.escape(zone)}")
+ for entry in alerts[zone]:
+ lines.append(html_lib.escape(entry))
+ lines.append("")
+
+ lines.append("Fonte: mappe.protezionecivile.gov.it")
+ return "\n".join(lines)
+
+# =============================================================================
+# Main
+# =============================================================================
+
+def main():
+ LOGGER.info("--- Controllo Protezione Civile (Bollettino ufficiale) ---")
+
+ try:
+ lines = fetch_bulletin_text_lines()
+ parsed = parse_bulletin(lines)
+ except Exception as e:
+ # NESSUN Telegram in caso di errori: solo log.
+ LOGGER.exception("Errore durante fetch/parse bollettino: %s", e)
return
- today_str = datetime.datetime.now(ZoneInfo("Europe/Rome")).strftime('%d/%m/%Y')
-
- active_alerts = {}
- max_global_level = 0
-
- # Stringa univoca per identificare se l'allerta è cambiata nel contenuto
- current_alert_signature = ""
+ if DEBUG:
+ LOGGER.debug("title=%s", parsed.get("title", ""))
+ for k in ("oggi", "domani"):
+ d = parsed.get("days", {}).get(k, {})
+ LOGGER.debug("%s label=%s", k, d.get("date_label", ""))
+ LOGGER.debug("%s alerts=%s", k, d.get("alerts", {}))
- for feature in data['features']:
- props = feature['properties']
- zone_id = props.get('zone_id')
-
- if zone_id in TARGET_ZONES:
- zone_risks = []
-
- for key, label in RISK_TYPES.items():
- level = props.get(f"crit_{key}")
-
- if level and level > 0:
- if level > max_global_level: max_global_level = level
-
- color_icon = "🟡" if level == 1 else "🟠" if level == 2 else "🔴"
- risk_str = f"{color_icon} {label}"
- zone_risks.append(risk_str)
-
- if zone_risks:
- label = get_zone_label(zone_id)
- active_alerts[label] = zone_risks
- # Aggiungiamo alla firma per capire se qualcosa è cambiato
- current_alert_signature += f"{zone_id}:{','.join(zone_risks)}|"
-
- # --- LOGICA DI INVIO ---
-
- last_state = load_last_alert()
- last_date = last_state.get("date")
- last_sig = last_state.get("signature", "")
-
- # Se tutto verde (livello 0)
- if max_global_level == 0:
- print("Nessuna allerta.")
- if last_date == today_str and last_sig != "":
- # Opzionale: Potremmo mandare "Allerta Rientrata", ma DPC resetta a mezzanotte.
- # Per ora resettiamo solo lo stato.
- save_current_alert({"date": today_str, "level": 0, "signature": ""})
+ # Regola: invia Telegram SOLO se esistono allerte
+ if not has_any_alert(parsed):
+ LOGGER.info("Nessuna allerta nelle zone monitorate. Nessuna notifica inviata.")
return
- # Invia SE:
- # 1. È un giorno nuovo
- # 2. OPPURE la situazione è cambiata (es. aggiunto Bologna, o passato da Giallo a Rosso)
- if last_date != today_str or current_alert_signature != last_sig:
-
- msg = f"📢 **PROTEZIONE CIVILE (Allerta)**\n"
- msg += f"📅 {today_str}\n\n"
-
- for zone_name, risks in active_alerts.items():
- msg += f"📍 **{zone_name}**\n"
- msg += "\n".join(risks) + "\n\n"
-
- msg += "_Fonte: Dipartimento Protezione Civile_"
-
- send_telegram_message(msg)
- print("Allerta inviata.")
-
- save_current_alert({"date": today_str, "level": max_global_level, "signature": current_alert_signature})
+ sig = build_signature(parsed)
+ state = load_state()
+ last_sig = state.get("last_alert_signature", "")
+
+ if sig == last_sig:
+ LOGGER.info("Allerta già notificata e invariata. Nessuna nuova notifica.")
+ return
+
+ # A questo punto: ci sono allerte e sono nuove -> prova invio
+ msg = format_message(parsed)
+ sent_ok = telegram_send_html(msg)
+
+ if sent_ok:
+ LOGGER.info("Notifica allerta inviata con successo.")
+ save_state({
+ "date": today_str_italy(),
+ "last_alert_signature": sig,
+ })
else:
- print("Allerta già notificata e invariata.")
+ # Non aggiorniamo lo stato: quando risolvi token/rete, reinvierà.
+ LOGGER.warning("Invio non riuscito (token mancante o errore Telegram). Stato NON aggiornato.")
if __name__ == "__main__":
- analyze_dpc()
+ main()
diff --git a/services/telegram-bot/severe_weather.py b/services/telegram-bot/severe_weather.py
index 791fef2..a8bbac5 100644
--- a/services/telegram-bot/severe_weather.py
+++ b/services/telegram-bot/severe_weather.py
@@ -1,190 +1,650 @@
-import requests
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+
import datetime
-import os
+import html
import json
+import logging
+import os
import time
-from dateutil import parser
+from logging.handlers import RotatingFileHandler
+from typing import Dict, List, Optional, Tuple
from zoneinfo import ZoneInfo
-# --- CONFIGURAZIONE UTENTE ---
-TELEGRAM_BOT_TOKEN = "8155587974:AAF9OekvBpixtk8ZH6KoIc0L8edbhdXt7A4"
-TELEGRAM_CHAT_IDS = ["64463169", "24827341", "132455422", "5405962012"]
+import requests
+from dateutil import parser
-# Coordinate (San Marino / Casa)
+# =============================================================================
+# SEVERE WEATHER ALERT (next 24h) - Casa (LAT/LON)
+# - Freezing rain/drizzle (WMO codes 56,57,66,67) -> priorità alta, basta 1 occorrenza
+# - Wind gusts persistence: >= soglia per almeno 2 ore consecutive
+# - Rain persistence: soglia (mm/3h) superata per almeno 2 ore (2 finestre 3h consecutive)
+#
+# Telegram token: NOT in clear.
+# Read order:
+# 1) env TELEGRAM_BOT_TOKEN
+# 2) ~/.telegram_dpc_bot_token
+# 3) /etc/telegram_dpc_bot_token
+#
+# Debug:
+# DEBUG=1 python3 severe_weather.py
+#
+# Log:
+# ./weather_alert.log (same folder as this script)
+# =============================================================================
+
+DEBUG = os.environ.get("DEBUG", "0").strip() == "1"
+
+# ----------------- TELEGRAM -----------------
+TELEGRAM_CHAT_IDS = ["64463169", "24827341", "132455422", "5405962012"]
+TOKEN_FILE_HOME = os.path.expanduser("~/.telegram_dpc_bot_token")
+TOKEN_FILE_ETC = "/etc/telegram_dpc_bot_token"
+
+# ----------------- LOCATION -----------------
LAT = 43.9356
LON = 12.4296
-# --- SOGLIE DI ALLARME ---
-# Vento (Protezione Civile Emilia-Romagna)
-WIND_YELLOW = 62.0 # km/h
-WIND_ORANGE = 75.0 # km/h
-WIND_RED = 88.0 # km/h
+# ----------------- THRESHOLDS -----------------
+# Vento (km/h) - soglie come da tuo set
+WIND_YELLOW = 62.0
+WIND_ORANGE = 75.0
+WIND_RED = 88.0
-# Pioggia
-RAIN_3H_LIMIT = 25.0 # mm in 3 ore
+# Pioggia: mm in 3 ore
+RAIN_3H_LIMIT = 25.0
-# Codici WMO per Gelicidio (Freezing Rain)
-FREEZING_CODES = [56, 57, 66, 67]
+# Persistenza minima richiesta (ore)
+PERSIST_HOURS = 2 # richiesta utente: >=2 ore
-# File di stato
+# Freezing rain/drizzle codes
+FREEZING_CODES = {56, 57, 66, 67}
+
+# ----------------- HORIZON -----------------
+HOURS_AHEAD = 24
+
+# ----------------- FILES -----------------
STATE_FILE = "/home/daniely/docker/telegram-bot/weather_state.json"
+BASE_DIR = os.path.dirname(os.path.abspath(__file__))
+LOG_FILE = os.path.join(BASE_DIR, "weather_alert.log")
-def send_telegram_message(message):
- if not TELEGRAM_BOT_TOKEN or "INSERISCI" in TELEGRAM_BOT_TOKEN:
- print(f"[TEST OUT] {message}")
- return
+# ----------------- OPEN-METEO -----------------
+OPEN_METEO_URL = "https://api.open-meteo.com/v1/forecast"
+TZ = "Europe/Rome"
+TZINFO = ZoneInfo(TZ)
+HTTP_HEADERS = {"User-Agent": "rpi-severe-weather/2.0"}
- url = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendMessage"
- for chat_id in TELEGRAM_CHAT_IDS:
- try:
- requests.post(url, json={"chat_id": chat_id, "text": message, "parse_mode": "Markdown"}, timeout=10)
- time.sleep(0.2)
- except: pass
+# Force model: AROME France HD 1.5 km
+MODEL_PRIMARY = "meteofrance_arome_france_hd"
+# Fallback (stessa famiglia Meteo-France) per continuità operativa
+MODEL_FALLBACK = "meteofrance_seamless"
-def get_forecast():
- url = "https://api.open-meteo.com/v1/forecast"
- params = {
- "latitude": LAT, "longitude": LON,
- "hourly": "precipitation,windgusts_10m,weathercode",
- "models": "arome_france_hd",
- "timezone": "Europe/Rome",
- "forecast_days": 2
- }
+# Se True, invia messaggio "rientrata" quando tutto torna sotto soglia (non è un errore)
+SEND_ALL_CLEAR = True
+
+
+# =============================================================================
+# LOGGING
+# =============================================================================
+def setup_logger() -> logging.Logger:
+ logger = logging.getLogger("severe_weather")
+ logger.setLevel(logging.DEBUG if DEBUG else logging.INFO)
+ logger.handlers.clear()
+
+ fh = RotatingFileHandler(LOG_FILE, maxBytes=1_000_000, backupCount=5, encoding="utf-8")
+ fh.setLevel(logging.DEBUG)
+ fmt = logging.Formatter("%(asctime)s %(levelname)s %(message)s")
+ fh.setFormatter(fmt)
+ logger.addHandler(fh)
+
+ if DEBUG:
+ sh = logging.StreamHandler()
+ sh.setLevel(logging.DEBUG)
+ sh.setFormatter(fmt)
+ logger.addHandler(sh)
+
+ return logger
+
+
+LOGGER = setup_logger()
+
+
+# =============================================================================
+# UTILS
+# =============================================================================
+def ensure_parent_dir(path: str) -> None:
+ parent = os.path.dirname(path)
+ if parent and not os.path.exists(parent):
+ os.makedirs(parent, exist_ok=True)
+
+
+def now_local() -> datetime.datetime:
+ return datetime.datetime.now(TZINFO)
+
+
+def read_text_file(path: str) -> str:
try:
- r = requests.get(url, params=params, timeout=10)
- r.raise_for_status()
- return r.json()
- except: return None
+ with open(path, "r", encoding="utf-8") as f:
+ return f.read().strip()
+ except FileNotFoundError:
+ return ""
+ except PermissionError:
+ LOGGER.debug("Permission denied reading %s", path)
+ return ""
+ except Exception as e:
+ LOGGER.exception("Error reading %s: %s", path, e)
+ return ""
-def load_state():
+
+def load_bot_token() -> str:
+ tok = os.environ.get("TELEGRAM_BOT_TOKEN", "").strip()
+ if tok:
+ return tok
+ tok = read_text_file(TOKEN_FILE_HOME)
+ if tok:
+ return tok
+ tok = read_text_file(TOKEN_FILE_ETC)
+ return tok.strip() if tok else ""
+
+
+def parse_time_to_local(t: str) -> datetime.datetime:
+ """
+ Robust timezone handling:
+ - If timestamps are naive (common when timezone=Europe/Rome), interpret as Europe/Rome.
+ - If timestamps include offset, convert to Europe/Rome.
+ """
+ dt = parser.isoparse(t)
+ if dt.tzinfo is None:
+ return dt.replace(tzinfo=TZINFO)
+ return dt.astimezone(TZINFO)
+
+
+def hhmm(dt: datetime.datetime) -> str:
+ return dt.strftime("%H:%M")
+
+
+# =============================================================================
+# TELEGRAM
+# =============================================================================
+def telegram_send_html(message_html: str) -> bool:
+ """
+ Never raises. Returns True if at least one chat_id succeeded.
+ IMPORTANT: called only on REAL ALERTS (not on errors).
+ """
+ token = load_bot_token()
+ if not token:
+ LOGGER.warning("Telegram token missing: message not sent.")
+ return False
+
+ url = f"https://api.telegram.org/bot{token}/sendMessage"
+ base_payload = {
+ "text": message_html,
+ "parse_mode": "HTML",
+ "disable_web_page_preview": True,
+ }
+
+ sent_ok = False
+ with requests.Session() as s:
+ for chat_id in TELEGRAM_CHAT_IDS:
+ payload = dict(base_payload)
+ payload["chat_id"] = chat_id
+ try:
+ resp = s.post(url, json=payload, timeout=15)
+ if resp.status_code == 200:
+ sent_ok = True
+ else:
+ LOGGER.error("Telegram error chat_id=%s status=%s body=%s",
+ chat_id, resp.status_code, resp.text[:500])
+ time.sleep(0.25)
+ except Exception as e:
+ LOGGER.exception("Telegram exception chat_id=%s err=%s", chat_id, e)
+
+ return sent_ok
+
+
+# =============================================================================
+# STATE
+# =============================================================================
+def load_state() -> Dict:
+ default = {
+ "alert_active": False,
+ "wind_level": 0,
+ "last_wind_peak": 0.0,
+ "last_rain_3h": 0.0,
+ "freezing_active": False,
+ }
if os.path.exists(STATE_FILE):
try:
- with open(STATE_FILE, 'r') as f: return json.load(f)
- except: pass
- return {"alert_active": False, "last_wind": 0, "last_rain": 0, "wind_level": 0, "freezing_active": False}
+ with open(STATE_FILE, "r", encoding="utf-8") as f:
+ data = json.load(f) or {}
+ default.update(data)
+ except Exception as e:
+ LOGGER.exception("State read error: %s", e)
+ return default
-def save_state(state):
+
+def save_state(state: Dict) -> None:
try:
- with open(STATE_FILE, 'w') as f: json.dump(state, f)
- except: pass
+ ensure_parent_dir(STATE_FILE)
+ with open(STATE_FILE, "w", encoding="utf-8") as f:
+ json.dump(state, f, ensure_ascii=False, indent=2)
+ except Exception as e:
+ LOGGER.exception("State write error: %s", e)
-def analyze():
- print("--- Controllo Meteo Severo (Wind/Rain/Ice) ---")
- data = get_forecast()
- if not data: return
- hourly = data.get("hourly", {})
- times = hourly.get("time", [])
- wind = hourly.get("windgusts_10m", [])
- rain = hourly.get("precipitation", [])
- wcode = hourly.get("weathercode", [])
+# =============================================================================
+# OPEN-METEO
+# =============================================================================
+def fetch_forecast(models_value: str) -> Optional[Dict]:
+ params = {
+ "latitude": LAT,
+ "longitude": LON,
+ "hourly": "precipitation,wind_gusts_10m,weather_code",
+ "timezone": TZ,
+ "forecast_days": 2,
+ "wind_speed_unit": "kmh",
+ "precipitation_unit": "mm",
+ "models": models_value,
+ }
- now = datetime.datetime.now(ZoneInfo("Europe/Rome"))
+ try:
+ r = requests.get(OPEN_METEO_URL, params=params, headers=HTTP_HEADERS, timeout=25)
+ if r.status_code == 400:
+ # Log reason if present; no Telegram on errors
+ try:
+ j = r.json()
+ LOGGER.error("Open-Meteo 400 (models=%s): %s", models_value, j.get("reason", j))
+ except Exception:
+ LOGGER.error("Open-Meteo 400 (models=%s): %s", models_value, r.text[:500])
+ return None
+ r.raise_for_status()
+ return r.json()
+ except Exception as e:
+ LOGGER.exception("Open-Meteo request error (models=%s): %s", models_value, e)
+ return None
+
+
+def get_forecast() -> Tuple[Optional[Dict], str]:
+ LOGGER.debug("Requesting Open-Meteo with models=%s", MODEL_PRIMARY)
+ data = fetch_forecast(MODEL_PRIMARY)
+ if data is not None:
+ return data, MODEL_PRIMARY
+
+ LOGGER.warning("Primary model failed (%s). Trying fallback=%s", MODEL_PRIMARY, MODEL_FALLBACK)
+ data2 = fetch_forecast(MODEL_FALLBACK)
+ if data2 is not None:
+ return data2, MODEL_FALLBACK
+
+ return None, MODEL_PRIMARY
+
+
+# =============================================================================
+# PERSISTENCE LOGIC
+# =============================================================================
+def best_wind_persistent_level(
+ gusts: List[float],
+ times: List[str],
+ start_idx: int,
+ end_idx: int,
+ persist_hours: int
+) -> Tuple[int, float, str, int]:
+ """
+ Returns:
+ (level, peak_gust_within_level, first_start_hhmm, run_length_hours)
+ Level 0 means not persistent above yellow.
+ Persistence means >= threshold for >= persist_hours consecutive hourly points.
+ """
+ thresholds = [
+ (3, WIND_RED),
+ (2, WIND_ORANGE),
+ (1, WIND_YELLOW),
+ ]
+
+ # Convert times to local datetimes once for speed/readability
+ dt_list = [parse_time_to_local(t) for t in times]
+
+ def find_run(threshold: float) -> Tuple[bool, float, str, int]:
+ consec = 0
+ run_start = None
+ run_peak = 0.0
+ best_start = ""
+ best_peak = 0.0
+ best_len = 0
+
+ for i in range(start_idx, end_idx):
+ try:
+ v = float(gusts[i])
+ except Exception:
+ v = 0.0
+
+ if v >= threshold:
+ if consec == 0:
+ run_start = i
+ run_peak = v
+ else:
+ run_peak = max(run_peak, v)
+ consec += 1
+
+ # record first qualifying run of required length
+ if consec >= persist_hours and run_start is not None:
+ # lock the first time a qualifying run appears, but keep peak within that run length
+ # Extend peak as the run continues
+ # If user prefers "meglio uno in più", we take the first qualifying run.
+ if best_len == 0:
+ best_start = hhmm(dt_list[run_start])
+ best_peak = run_peak
+ best_len = consec
+ else:
+ # If same threshold continues longer, update peak and length
+ if run_start == (run_start if best_len else run_start):
+ best_peak = max(best_peak, run_peak)
+ best_len = max(best_len, consec)
+ else:
+ consec = 0
+ run_start = None
+ run_peak = 0.0
+
+ if best_len >= persist_hours:
+ return True, best_peak, best_start, best_len
+ return False, 0.0, "", 0
+
+ for lvl, thr in thresholds:
+ ok, peak, start_hhmm, run_len = find_run(thr)
+ if ok:
+ return lvl, peak, start_hhmm, run_len
+
+ return 0, 0.0, "", 0
+
+
+def best_rain_persistent_3h(
+ rain: List[float],
+ times: List[str],
+ start_idx: int,
+ end_idx: int,
+ limit_3h: float,
+ persist_hours: int
+) -> Tuple[float, str, int]:
+ """
+ Returns:
+ (max_3h_sum, first_start_hhmm_of_persistent_exceedance, persistence_hours)
+
+ Compute rolling 3h sums (hourly precipitation totals).
+ Persistence >=2h means: at least `persist_hours` consecutive rolling windows exceed the limit.
+ Each shift by 1 hour -> 'persistence_hours' approximates how long intense conditions persist.
+ """
+ if end_idx - start_idx < 3:
+ return 0.0, "", 0
+
+ dt_list = [parse_time_to_local(t) for t in times]
+
+ # rolling sums for each window start i (covers i, i+1, i+2)
+ window_starts = list(range(start_idx, end_idx - 2))
+ sums = []
+ for i in window_starts:
+ try:
+ s = float(rain[i]) + float(rain[i + 1]) + float(rain[i + 2])
+ except Exception:
+ s = 0.0
+ sums.append(s)
+
+ # Find persistent exceedance: sums[j] >= limit for >= persist_hours consecutive j
+ best_max = 0.0
+ best_start = ""
+ best_persist = 0
+
+ consec = 0
+ run_start_j = None
+ run_max = 0.0
+
+ for j, s in enumerate(sums):
+ if s >= limit_3h:
+ if consec == 0:
+ run_start_j = j
+ run_max = s
+ else:
+ run_max = max(run_max, s)
+ consec += 1
+
+ if consec >= persist_hours and run_start_j is not None:
+ # take first persistent run (meglio uno in più), but keep track of max within it
+ if best_persist == 0:
+ start_i = window_starts[run_start_j]
+ best_start = hhmm(dt_list[start_i])
+ best_persist = consec
+ best_max = run_max
+ else:
+ # if same run continues, update
+ best_persist = max(best_persist, consec)
+ best_max = max(best_max, run_max)
+ else:
+ consec = 0
+ run_start_j = None
+ run_max = 0.0
+
+ return best_max, best_start, best_persist
+
+
+# =============================================================================
+# MESSAGE BUILDERS
+# =============================================================================
+def wind_message(level: int, peak: float, start_hhmm: str, run_len: int) -> str:
+ # run_len is in hours (number of consecutive hourly points)
+ if level == 3:
+ icon = "🔴"
+ title = "TEMPESTA (Burrasca fortissima)"
+ thr = WIND_RED
+ elif level == 2:
+ icon = "🟠"
+ title = "VENTO MOLTO FORTE"
+ thr = WIND_ORANGE
+ else:
+ icon = "🟡"
+ title = "VENTO FORTE"
+ thr = WIND_YELLOW
+
+ return (
+ f"{icon} {title}
"
+ f"Persistenza: ≥ {PERSIST_HOURS} ore sopra soglia ({thr:.0f} km/h).
"
+ f"🕒 Inizio stimato: {html.escape(start_hhmm or '—')}
"
+ f"📈 Picco in finestra: {peak:.0f} km/h (run ~{run_len}h)."
+ )
+
+
+def rain_message(max_3h: float, start_hhmm: str, persist_h: int) -> str:
+ return (
+ "🌧️ PIOGGIA INTENSA
"
+ f"Persistenza: ≥ {PERSIST_HOURS} ore sopra soglia ({RAIN_3H_LIMIT:.1f} mm/3h).
"
+ f"🕒 Inizio stimato: {html.escape(start_hhmm or '—')}
"
+ f"📈 Max su 3 ore in finestra: {max_3h:.1f} mm (persistenza ~{persist_h}h)."
+ )
+
+
+# =============================================================================
+# MAIN
+# =============================================================================
+def analyze() -> None:
+ LOGGER.info("--- Controllo Meteo Severo (Wind/Rain/Ice) ---")
+
+ data, model_used = get_forecast()
+ if not data:
+ # No Telegram on errors
+ return
+
+ hourly = (data.get("hourly", {}) or {})
+ times = hourly.get("time", []) or []
+ gusts = hourly.get("wind_gusts_10m", []) or []
+ rain = hourly.get("precipitation", []) or []
+ wcode = hourly.get("weather_code", []) or []
+
+ n = min(len(times), len(gusts), len(rain), len(wcode))
+ if n == 0:
+ LOGGER.error("Empty hourly series (model=%s).", model_used)
+ return
+
+ times = times[:n]
+ gusts = gusts[:n]
+ rain = rain[:n]
+ wcode = wcode[:n]
+
+ now = now_local()
state = load_state()
-
- # Trova indice ora corrente
+ was_alarm = bool(state.get("alert_active", False))
+
+ # Find starting index: first timestep >= now
start_idx = -1
for i, t in enumerate(times):
- if parser.isoparse(t).replace(tzinfo=ZoneInfo("Europe/Rome")) >= now:
+ if parse_time_to_local(t) >= now:
start_idx = i
break
-
- if start_idx == -1: return
+ if start_idx == -1:
+ LOGGER.error("Could not locate current time index in forecast timeline.")
+ return
- # Analisi prossime 12 ore
- end_idx = min(start_idx + 12, len(times))
-
- max_wind = 0.0
- max_wind_time = ""
- sum_rain_3h = 0.0
-
- freezing_rain_detected = False
+ end_idx = min(start_idx + HOURS_AHEAD, n)
+ if end_idx <= start_idx:
+ LOGGER.error("Invalid horizon window (start=%s end=%s).", start_idx, end_idx)
+ return
+
+ if DEBUG:
+ LOGGER.debug("model=%s start_idx=%s end_idx=%s (hours=%s)",
+ model_used, start_idx, end_idx, end_idx - start_idx)
+
+ # --- Freezing detection (no persistence needed) ---
+ freezing_detected = False
freezing_time = ""
-
- # Cerca picchi
for i in range(start_idx, end_idx):
- # Vento
- if wind[i] > max_wind:
- max_wind = wind[i]
- max_wind_time = parser.isoparse(times[i]).strftime('%H:%M')
-
- # Gelicidio
- if wcode[i] in FREEZING_CODES:
- freezing_rain_detected = True
- if not freezing_time: # Prendi il primo orario
- freezing_time = parser.isoparse(times[i]).strftime('%H:%M')
+ try:
+ code = int(wcode[i])
+ except Exception:
+ code = -1
+ if code in FREEZING_CODES:
+ freezing_detected = True
+ if not freezing_time:
+ freezing_time = hhmm(parse_time_to_local(times[i]))
+ break
- # Pioggia (sliding window 3h)
- for i in range(start_idx, end_idx - 3):
- current_sum = sum(rain[i:i+3])
- if current_sum > sum_rain_3h:
- sum_rain_3h = current_sum
+ # --- Wind persistence ---
+ wind_level_curr, wind_peak, wind_start, wind_run_len = best_wind_persistent_level(
+ gusts=gusts,
+ times=times,
+ start_idx=start_idx,
+ end_idx=end_idx,
+ persist_hours=PERSIST_HOURS
+ )
- # --- CLASSIFICAZIONE ---
- alerts = []
+ # --- Rain persistence (3h windows) ---
+ rain_max_3h, rain_start, rain_persist = best_rain_persistent_3h(
+ rain=rain,
+ times=times,
+ start_idx=start_idx,
+ end_idx=end_idx,
+ limit_3h=RAIN_3H_LIMIT,
+ persist_hours=PERSIST_HOURS
+ )
+
+ # --- Decide notifications ---
+ alerts: List[str] = []
should_notify = False
-
- WAS_ALARM = state.get("alert_active", False)
-
- # 1. GELO (Priorità Massima)
- if freezing_rain_detected:
- if not state.get("freezing_active", False):
- alerts.append(f"🧊 **ALLARME GELICIDIO**\nPrevista pioggia che gela (Freezing Rain).\n🕒 Inizio previsto: {freezing_time}\n_Pericolo ghiaccio su strada!_")
+
+ # 1) Freezing rain (priority)
+ if freezing_detected:
+ if not bool(state.get("freezing_active", False)):
+ alerts.append(
+ "🧊 ALLARME GELICIDIO
"
+ "Prevista pioggia che gela (freezing rain/drizzle).
"
+ f"🕒 Inizio stimato: {html.escape(freezing_time or '—')}
"
+ "Pericolo ghiaccio su strada."
+ )
should_notify = True
- state["freezing_active"] = True
+ state["freezing_active"] = True
else:
state["freezing_active"] = False
- # 2. VENTO
- wind_level_curr = 0
- if max_wind > WIND_RED:
- wind_level_curr = 3
- msg = f"🔴 **TEMPESTA (Burrasca Fortissima)**\nRaffiche > {WIND_RED:.0f} km/h (Picco: **{max_wind:.0f}** km/h).\n🕒 Ore {max_wind_time}"
- elif max_wind > WIND_ORANGE:
- wind_level_curr = 2
- msg = f"🟠 **VENTO MOLTO FORTE**\nRaffiche > {WIND_ORANGE:.0f} km/h (Picco: **{max_wind:.0f}** km/h).\n🕒 Ore {max_wind_time}"
- elif max_wind > WIND_YELLOW:
- wind_level_curr = 1
- msg = f"🟡 **VENTO FORTE**\nRaffiche > {WIND_YELLOW:.0f} km/h (Picco: **{max_wind:.0f}** km/h).\n🕒 Ore {max_wind_time}"
-
+ # 2) Wind (persistent)
if wind_level_curr > 0:
- if not WAS_ALARM or wind_level_curr > state.get("wind_level", 0):
- alerts.append(msg)
+ prev_level = int(state.get("wind_level", 0) or 0)
+ if (not was_alarm) or (wind_level_curr > prev_level):
+ alerts.append(wind_message(wind_level_curr, wind_peak, wind_start, wind_run_len))
should_notify = True
- state["wind_level"] = wind_level_curr
- state["last_wind"] = max_wind
+ state["wind_level"] = wind_level_curr
+ state["last_wind_peak"] = float(wind_peak)
+ else:
+ state["wind_level"] = 0
+ state["last_wind_peak"] = 0.0
- # 3. PIOGGIA
- if sum_rain_3h > RAIN_3H_LIMIT:
- if not WAS_ALARM or sum_rain_3h > state.get("last_rain", 0) + 10:
- alerts.append(f"🌧️ **PIOGGIA INTENSA**\nPrevisti **{sum_rain_3h:.1f} mm** in 3 ore.")
- state["last_rain"] = sum_rain_3h
+ # 3) Rain (persistent)
+ if rain_persist >= PERSIST_HOURS and rain_max_3h >= RAIN_3H_LIMIT:
+ prev_rain = float(state.get("last_rain_3h", 0.0) or 0.0)
+ # "Meglio uno in più": notifica anche al primo superamento persistente,
+ # e ri-notifica se peggiora di >= +10mm sul massimo 3h
+ if (not was_alarm) or (rain_max_3h >= prev_rain + 10.0):
+ alerts.append(rain_message(rain_max_3h, rain_start, rain_persist))
should_notify = True
+ state["last_rain_3h"] = float(rain_max_3h)
+ else:
+ state["last_rain_3h"] = 0.0
- # --- INVIO ---
- IS_ALARM_NOW = (wind_level_curr > 0) or (sum_rain_3h > RAIN_3H_LIMIT) or freezing_rain_detected
+ is_alarm_now = (
+ freezing_detected
+ or (wind_level_curr > 0)
+ or (rain_persist >= PERSIST_HOURS and rain_max_3h >= RAIN_3H_LIMIT)
+ )
+ # --- Send only on alerts (never on errors) ---
if should_notify and alerts:
- full_msg = f"⚠️ **AVVISO METEO SEVERO**\n\n" + "\n\n".join(alerts)
- send_telegram_message(full_msg)
+ headline = "⚠️ AVVISO METEO SEVERO"
+ meta = (
+ f"📍 Casa (LAT {LAT:.4f}, LON {LON:.4f})
"
+ f"🕒 Finestra: prossime {HOURS_AHEAD} ore
"
+ f"🛰️ Modello: {html.escape(model_used)}
"
+ f"⏱️ Persistenza minima: {PERSIST_HOURS} ore
"
+ )
+ body = "
".join(alerts)
+ footer = "
Fonte dati: Open-Meteo"
+ msg = f"{headline}
{meta}
{body}{footer}"
+
+ ok = telegram_send_html(msg)
+ if ok:
+ LOGGER.info("Alert sent successfully.")
+ else:
+ LOGGER.warning("Alert NOT sent (token missing or Telegram error).")
+
state["alert_active"] = True
save_state(state)
- print("Allerta inviata.")
+ return
- elif WAS_ALARM and not IS_ALARM_NOW:
- # Allarme rientrato
- msg = f"🟢 **ALLERTA METEO RIENTRATA**\nLe condizioni sono tornate sotto le soglie di guardia."
- send_telegram_message(msg)
- state = {"alert_active": False, "last_wind": 0, "last_rain": 0, "wind_level": 0, "freezing_active": False}
+ # Optional: cleared message (transition only)
+ if SEND_ALL_CLEAR and was_alarm and (not is_alarm_now):
+ msg = (
+ "🟢 ALLERTA METEO RIENTRATA
"
+ "Condizioni rientrate sotto le soglie di guardia.
"
+ f"🕒 Finestra: prossime {HOURS_AHEAD} ore
"
+ f"🛰️ Modello: {html.escape(model_used)}
"
+ f"⏱️ Persistenza minima: {PERSIST_HOURS} ore"
+ )
+ ok = telegram_send_html(msg)
+ if ok:
+ LOGGER.info("All-clear sent successfully.")
+ else:
+ LOGGER.warning("All-clear NOT sent (token missing or Telegram error).")
+
+ state = {
+ "alert_active": False,
+ "wind_level": 0,
+ "last_wind_peak": 0.0,
+ "last_rain_3h": 0.0,
+ "freezing_active": False,
+ }
save_state(state)
- print("Allarme rientrato.")
-
- else:
- # Aggiorna stato parziale se serve (es. vento calato ma ancora presente)
- if not IS_ALARM_NOW:
- state["alert_active"] = False
- save_state(state)
- print(f"Nessun nuovo allarme. W:{max_wind:.0f} R:{sum_rain_3h:.1f} Ice:{freezing_rain_detected}")
+ return
+
+ # No new alert
+ state["alert_active"] = bool(is_alarm_now)
+ save_state(state)
+ LOGGER.info(
+ "No new alert. model=%s wind_level=%s rain3h=%.1fmm(persist=%sh) ice=%s",
+ model_used, wind_level_curr, rain_max_3h, rain_persist, freezing_detected
+ )
+
if __name__ == "__main__":
analyze()
diff --git a/services/telegram-bot/student_alert.py b/services/telegram-bot/student_alert.py
index 2f300ca..e1a6b48 100644
--- a/services/telegram-bot/student_alert.py
+++ b/services/telegram-bot/student_alert.py
@@ -1,191 +1,474 @@
-import requests
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+
import datetime
-import time
+import html
import json
+import logging
import os
-from dateutil import parser
+import time
+from logging.handlers import RotatingFileHandler
+from typing import Dict, List, Optional, Tuple
from zoneinfo import ZoneInfo
-# --- CONFIGURAZIONE UTENTE ---
-# 👇👇 INSERISCI QUI I TUOI DATI 👇👇
-TELEGRAM_BOT_TOKEN = "8155587974:AAF9OekvBpixtk8ZH6KoIc0L8edbhdXt7A4"
+import requests
+from dateutil import parser
+
+# =============================================================================
+# student_alert.py
+#
+# Scopo:
+# Notificare lo studente (Bologna) se nelle prossime 24 ore sono previsti:
+# - Neve persistente (>= 2 ore consecutive con snowfall > 0)
+# - Pioggia molto forte persistente (>= 2 ore consecutive con 3h-rolling >= soglia)
+# sia a Bologna (trigger) sia lungo il tragitto (caselli A14) fino a San Marino.
+# =============================================================================
+
+DEBUG = os.environ.get("DEBUG", "0").strip() == "1"
+
+# ----------------- TELEGRAM -----------------
TELEGRAM_CHAT_IDS = ["64463169", "24827341", "132455422", "5405962012"]
+TOKEN_FILE_HOME = os.path.expanduser("~/.telegram_dpc_bot_token")
+TOKEN_FILE_ETC = "/etc/telegram_dpc_bot_token"
-# --- SOGLIE DI ALLARME (Bologna) ---
-SOGLIA_NEVE = 0.0 # cm (Basta neve per attivare)
-SOGLIA_PIOGGIA_3H = 30.0 # mm in 3 ore (Pioggia molto forte)
+# ----------------- SOGLIE E LOGICA -----------------
+SOGLIA_PIOGGIA_3H_MM = 30.0 # mm in 3 ore (rolling)
+PERSIST_HOURS = 2 # persistenza minima (ore)
+HOURS_AHEAD = 24
+SNOW_HOURLY_EPS_CM = 0.2 # Soglia minima neve cm/h
-# File di stato per la memoria
+# File di stato
STATE_FILE = "/home/daniely/docker/telegram-bot/student_state.json"
-# --- PUNTI DEL PERCORSO (Caselli A14) ---
+# ----------------- PUNTI DEL PERCORSO (Caselli A14) -----------------
POINTS = [
- {"name": "🎓 Bologna (V. Regnoli)", "lat": 44.4930, "lon": 11.3690, "type": "trigger"},
+ {"name": "🎓 Bologna (V. Regnoli)", "lat": 44.4930, "lon": 11.3690, "type": "trigger"},
{"name": "🛣️ Casello Imola", "lat": 44.3798, "lon": 11.7397, "type": "route"},
{"name": "🛣️ Casello Faenza", "lat": 44.3223, "lon": 11.9040, "type": "route"},
{"name": "🛣️ Casello Forlì", "lat": 44.2502, "lon": 12.0910, "type": "route"},
{"name": "🛣️ Casello Cesena", "lat": 44.1675, "lon": 12.2835, "type": "route"},
{"name": "🛣️ Casello Rimini", "lat": 44.0362, "lon": 12.5659, "type": "route"},
- {"name": "🏠 San Marino", "lat": 43.9356, "lon": 12.4296, "type": "end"}
+ {"name": "🏠 San Marino", "lat": 43.9356, "lon": 12.4296, "type": "end"},
]
-# --- FUNZIONI DI UTILITÀ ---
+# ----------------- OPEN-METEO -----------------
+OPEN_METEO_URL = "https://api.open-meteo.com/v1/forecast"
+TZ = "Europe/Rome"
+TZINFO = ZoneInfo(TZ)
+HTTP_HEADERS = {"User-Agent": "rpi-student-alert/1.0"}
+MODEL = "meteofrance_arome_france_hd"
-def load_last_state():
- """Legge se c'era un allerta attiva"""
- if not os.path.exists(STATE_FILE): return False
- try:
- with open(STATE_FILE, 'r') as f:
- data = json.load(f)
- return data.get("alert_active", False)
- except: return False
+# ----------------- LOG -----------------
+BASE_DIR = os.path.dirname(os.path.abspath(__file__))
+LOG_FILE = os.path.join(BASE_DIR, "student_alert.log")
-def save_current_state(is_active):
- """Salva lo stato corrente"""
+
+def setup_logger() -> logging.Logger:
+ logger = logging.getLogger("student_alert")
+ logger.setLevel(logging.DEBUG if DEBUG else logging.INFO)
+ logger.handlers.clear()
+
+ fh = RotatingFileHandler(LOG_FILE, maxBytes=1_000_000, backupCount=5, encoding="utf-8")
+ fh.setLevel(logging.DEBUG)
+ fmt = logging.Formatter("%(asctime)s %(levelname)s %(message)s")
+ fh.setFormatter(fmt)
+ logger.addHandler(fh)
+
+ if DEBUG:
+ sh = logging.StreamHandler()
+ sh.setLevel(logging.DEBUG)
+ sh.setFormatter(fmt)
+ logger.addHandler(sh)
+
+ return logger
+
+
+LOGGER = setup_logger()
+
+
+# =============================================================================
+# Utility
+# =============================================================================
+def now_local() -> datetime.datetime:
+ return datetime.datetime.now(TZINFO)
+
+
+def read_text_file(path: str) -> str:
try:
- with open(STATE_FILE, 'w') as f:
- json.dump({"alert_active": is_active, "updated": str(datetime.datetime.now())}, f)
+ with open(path, "r", encoding="utf-8") as f:
+ return f.read().strip()
+ except FileNotFoundError:
+ return ""
+ except PermissionError:
+ LOGGER.debug("Permission denied reading %s", path)
+ return ""
except Exception as e:
- print(f"Errore salvataggio stato: {e}")
+ LOGGER.exception("Error reading %s: %s", path, e)
+ return ""
-def send_telegram_message(message):
- if not TELEGRAM_BOT_TOKEN or "INSERISCI" in TELEGRAM_BOT_TOKEN:
- print(f"[TEST OUT] {message}")
- return
- url = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendMessage"
- for chat_id in TELEGRAM_CHAT_IDS:
+def load_bot_token() -> str:
+ tok = os.environ.get("TELEGRAM_BOT_TOKEN", "").strip()
+ if tok:
+ return tok
+ tok = read_text_file(TOKEN_FILE_HOME)
+ if tok:
+ return tok
+ tok = read_text_file(TOKEN_FILE_ETC)
+ return tok.strip() if tok else ""
+
+
+def parse_time_to_local(t: str) -> datetime.datetime:
+ dt = parser.isoparse(t)
+ if dt.tzinfo is None:
+ return dt.replace(tzinfo=TZINFO)
+ return dt.astimezone(TZINFO)
+
+
+def hhmm(dt: datetime.datetime) -> str:
+ return dt.strftime("%H:%M")
+
+
+# =============================================================================
+# Telegram
+# =============================================================================
+def telegram_send_html(message_html: str) -> bool:
+ token = load_bot_token()
+ if not token:
+ LOGGER.warning("Telegram token missing: message not sent.")
+ return False
+
+ url = f"https://api.telegram.org/bot{token}/sendMessage"
+ base_payload = {
+ "text": message_html,
+ "parse_mode": "HTML",
+ "disable_web_page_preview": True,
+ }
+
+ sent_ok = False
+ with requests.Session() as s:
+ for chat_id in TELEGRAM_CHAT_IDS:
+ payload = dict(base_payload)
+ payload["chat_id"] = chat_id
+ try:
+ resp = s.post(url, json=payload, timeout=15)
+ if resp.status_code == 200:
+ sent_ok = True
+ else:
+ LOGGER.error("Telegram error chat_id=%s status=%s body=%s",
+ chat_id, resp.status_code, resp.text[:500])
+ time.sleep(0.25)
+ except Exception as e:
+ LOGGER.exception("Telegram exception chat_id=%s err=%s", chat_id, e)
+
+ return sent_ok
+
+
+# =============================================================================
+# State & Open-Meteo
+# =============================================================================
+def load_state() -> Dict:
+ default = {"alert_active": False, "signature": "", "updated": ""}
+ if os.path.exists(STATE_FILE):
try:
- requests.post(url, json={"chat_id": chat_id, "text": message, "parse_mode": "Markdown"}, timeout=10)
- time.sleep(0.2)
- except: pass
+ with open(STATE_FILE, "r", encoding="utf-8") as f:
+ data = json.load(f) or {}
+ default.update(data)
+ except Exception as e:
+ LOGGER.exception("State read error: %s", e)
+ return default
-def get_forecast(lat, lon):
- url = "https://api.open-meteo.com/v1/forecast"
+
+def save_state(alert_active: bool, signature: str) -> None:
+ try:
+ os.makedirs(os.path.dirname(STATE_FILE), exist_ok=True)
+ with open(STATE_FILE, "w", encoding="utf-8") as f:
+ json.dump(
+ {"alert_active": alert_active, "signature": signature, "updated": now_local().isoformat()},
+ f,
+ ensure_ascii=False,
+ indent=2,
+ )
+ except Exception as e:
+ LOGGER.exception("State write error: %s", e)
+
+
+def get_forecast(session: requests.Session, lat: float, lon: float) -> Optional[Dict]:
params = {
- "latitude": lat, "longitude": lon,
+ "latitude": lat,
+ "longitude": lon,
"hourly": "precipitation,snowfall",
- "models": "arome_france_hd",
- "timezone": "Europe/Rome",
- "forecast_days": 2
+ "timezone": TZ,
+ "forecast_days": 2,
+ "precipitation_unit": "mm",
+ "models": MODEL,
}
try:
- res = requests.get(url, params=params, timeout=5)
- res.raise_for_status()
- return res.json()
- except: return None
+ r = session.get(OPEN_METEO_URL, params=params, headers=HTTP_HEADERS, timeout=25)
+ if r.status_code == 400:
+ return None
+ r.raise_for_status()
+ return r.json()
+ except Exception as e:
+ LOGGER.exception("Open-Meteo request error: %s", e)
+ return None
-def get_stats(data):
- if not data: return None
- hourly = data.get("hourly", {})
- times = hourly.get("time", [])
- snow = hourly.get("snowfall", [])
- rain = hourly.get("precipitation", [])
-
- now = datetime.datetime.now(ZoneInfo("Europe/Rome"))
-
+
+# =============================================================================
+# Analytics
+# =============================================================================
+def rolling_sum_3h(values: List[float]) -> List[float]:
+ out = []
+ for i in range(0, max(0, len(values) - 2)):
+ try:
+ s = float(values[i]) + float(values[i + 1]) + float(values[i + 2])
+ except Exception:
+ s = 0.0
+ out.append(s)
+ return out
+
+
+def first_persistent_run(values: List[float], threshold: float, persist: int) -> Tuple[bool, int, int, float]:
+ consec = 0
+ run_start = -1
+ run_max = 0.0
+ for i, v in enumerate(values):
+ vv = float(v) if v is not None else 0.0
+ if vv >= threshold:
+ if consec == 0:
+ run_start = i
+ run_max = vv
+ else:
+ run_max = max(run_max, vv)
+ consec += 1
+ if consec >= persist:
+ return True, run_start, consec, run_max
+ else:
+ consec = 0
+ return False, -1, 0, 0.0
+
+
+def max_consecutive_gt(values: List[float], eps: float) -> Tuple[int, int]:
+ best_len = 0
+ best_start = -1
+ consec = 0
+ start = -1
+ for i, v in enumerate(values):
+ vv = float(v) if v is not None else 0.0
+ if vv > eps:
+ if consec == 0:
+ start = i
+ consec += 1
+ if consec > best_len:
+ best_len = consec
+ best_start = start
+ else:
+ consec = 0
+ return best_len, best_start
+
+
+def compute_stats(data: Dict) -> Optional[Dict]:
+ hourly = data.get("hourly", {}) or {}
+ times = hourly.get("time", []) or []
+ precip = hourly.get("precipitation", []) or []
+ snow = hourly.get("snowfall", []) or []
+
+ n = min(len(times), len(precip), len(snow))
+ if n == 0: return None
+
+ now = now_local()
start_idx = -1
- for i, t in enumerate(times):
- if parser.isoparse(t).replace(tzinfo=ZoneInfo("Europe/Rome")) >= now.replace(minute=0,second=0,microsecond=0):
+ for i, t in enumerate(times[:n]):
+ if parse_time_to_local(t) >= now:
start_idx = i
break
if start_idx == -1: return None
-
- limit = min(start_idx + 24, len(times))
-
- def sum_slice(arr, hours):
- return sum(x for x in arr[start_idx:min(start_idx+hours, limit)] if x)
+
+ end_idx = min(start_idx + HOURS_AHEAD, n)
+ if end_idx <= start_idx: return None
+
+ times_w = times[start_idx:end_idx]
+ precip_w = precip[start_idx:end_idx]
+ snow_w = snow[start_idx:end_idx]
+ dt_w = [parse_time_to_local(t) for t in times_w]
+
+ rain3 = rolling_sum_3h(precip_w)
+ rain3_max = max(rain3) if rain3 else 0.0
+ rain3_max_idx = rain3.index(rain3_max) if rain3 else -1
+ rain3_max_time = hhmm(dt_w[rain3_max_idx]) if (rain3_max_idx >= 0 and rain3_max_idx < len(dt_w)) else ""
+
+ rain_persist_ok, rain_run_start, rain_run_len, rain_run_max = first_persistent_run(
+ rain3, SOGLIA_PIOGGIA_3H_MM, PERSIST_HOURS
+ )
+ rain_persist_time = hhmm(dt_w[rain_run_start]) if (rain_persist_ok and rain_run_start < len(dt_w)) else ""
+
+ snow_run_len, snow_run_start = max_consecutive_gt(snow_w, eps=SNOW_HOURLY_EPS_CM)
+ snow_run_time = hhmm(dt_w[snow_run_start]) if (snow_run_start >= 0 and snow_run_start < len(dt_w)) else ""
+
+ snow_12h = sum(float(x) for x in snow_w[:min(12, len(snow_w))] if x is not None)
+ snow_24h = sum(float(x) for x in snow_w[:min(24, len(snow_w))] if x is not None)
return {
- "snow_3h": sum_slice(snow, 3),
- "snow_6h": sum_slice(snow, 6),
- "snow_12h": sum_slice(snow, 12),
- "snow_24h": sum_slice(snow, 24),
- "rain_3h": sum_slice(rain, 3),
- "rain_max": max(rain[start_idx:limit]) if rain else 0
+ "rain3_max": float(rain3_max),
+ "rain3_max_time": rain3_max_time,
+ "rain_persist_ok": bool(rain_persist_ok),
+ "rain_persist_time": rain_persist_time,
+ "rain_persist_run_max": float(rain_run_max),
+ "rain_persist_run_len": int(rain_run_len),
+ "snow_run_len": int(snow_run_len),
+ "snow_run_time": snow_run_time,
+ "snow_12h": float(snow_12h),
+ "snow_24h": float(snow_24h),
}
-# --- LOGICA PRINCIPALE ---
-def main():
- print("--- Analisi Studente Bologna ---")
-
- now_str = datetime.datetime.now(ZoneInfo("Europe/Rome")).strftime('%H:%M')
-
- # 1. ANALISI BOLOGNA (Il Trigger)
- bo_point = POINTS[0]
- bo_data = get_forecast(bo_point["lat"], bo_point["lon"])
- bo_stats = get_stats(bo_data)
-
- if not bo_stats: return
+def point_alerts(point_name: str, stats: Dict) -> Dict:
+ snow_alert = (stats["snow_run_len"] >= PERSIST_HOURS) and (stats["snow_24h"] > 0.0)
+ rain_alert = bool(stats["rain_persist_ok"])
+ return {
+ "name": point_name,
+ "snow_alert": snow_alert,
+ "rain_alert": rain_alert,
+ "snow_12h": stats["snow_12h"],
+ "snow_24h": stats["snow_24h"],
+ "snow_run_len": stats["snow_run_len"],
+ "snow_run_time": stats["snow_run_time"],
+ "rain3_max": stats["rain3_max"],
+ "rain3_max_time": stats["rain3_max_time"],
+ "rain_persist_time": stats["rain_persist_time"],
+ "rain_persist_run_max": stats["rain_persist_run_max"],
+ "rain_persist_run_len": stats["rain_persist_run_len"],
+ }
- # Controlla se scatta l'allarme
- alarm_snow = bo_stats["snow_24h"] > SOGLIA_NEVE
- alarm_rain = bo_stats["rain_3h"] > SOGLIA_PIOGGIA_3H
-
- # Carica stato precedente
- WAS_ACTIVE = load_last_state()
- # --- SCENARIO A: C'È ALLERTA ---
- if alarm_snow or alarm_rain:
- icon_main = "❄️" if alarm_snow else "🌧️"
-
- msg = f"{icon_main} **ALLERTA METEO BOLOGNA**\n"
- msg += f"📅 _Aggiornamento ore {now_str}_\n\n"
-
- # Dettaglio Bologna
- msg += f"🎓 **A BOLOGNA:**\n"
- if alarm_snow:
- msg += f"• Neve 3h: **{bo_stats['snow_3h']:.1f}** cm\n"
- msg += f"• Neve 6h: **{bo_stats['snow_6h']:.1f}** cm\n"
- msg += f"• Neve 12h: **{bo_stats['snow_12h']:.1f}** cm\n"
- msg += f"• Neve 24h: **{bo_stats['snow_24h']:.1f}** cm\n"
-
- if alarm_rain:
- msg += f"• Pioggia 3h: **{bo_stats['rain_3h']:.1f}** mm (Intensa!)\n"
-
- msg += "\n🚗 **SITUAZIONE AI CASELLI (A14):**\n"
-
- # 2. ANALISI PERCORSO (Solo se c'è allerta)
- route_issues = False
-
- for p in POINTS[1:]:
- stats = get_stats(get_forecast(p["lat"], p["lon"]))
- if not stats: continue
-
- has_snow = stats["snow_24h"] > 0
- has_rain = stats["rain_3h"] > 5.0
-
- if has_snow or has_rain:
- route_issues = True
- line = f"**{p['name']}**: "
- if has_snow: line += f"❄️ {stats['snow_12h']:.1f}cm (12h) "
- if has_rain: line += f"🌧️ {stats['rain_3h']:.1f}mm "
- msg += f"{line}\n"
-
- if not route_issues:
- msg += "✅ I caselli autostradali sembrano puliti."
-
- send_telegram_message(msg)
- save_current_state(True)
- print("Allerta inviata.")
-
- # --- SCENARIO B: ALLARME RIENTRATO ---
- elif not (alarm_snow or alarm_rain) and WAS_ACTIVE:
- msg = (
- f"🟢 **ALLARME RIENTRATO (Bologna)**\n"
- f"📅 _Aggiornamento ore {now_str}_\n\n"
- f"Le previsioni non indicano più neve o piogge critiche per le prossime 24 ore.\n"
- f"Situazione tornata alla normalità."
+def build_signature(bologna: Dict, route: List[Dict]) -> str:
+ parts = [
+ f"BO:snow={int(bologna['snow_alert'])},rain={int(bologna['rain_alert'])},"
+ f"s24={bologna['snow_24h']:.1f},r3max={bologna['rain3_max']:.1f}"
+ ]
+ for r in route:
+ parts.append(
+ f"{r['name']}:s{int(r['snow_alert'])}r{int(r['rain_alert'])}"
+ f":s24={r['snow_24h']:.1f}:r3max={r['rain3_max']:.1f}"
)
- send_telegram_message(msg)
- save_current_state(False)
- print("Allarme rientrato. Notifica inviata.")
+ return "|".join(parts)
+
+
+def main() -> None:
+ LOGGER.info("--- Student alert Bologna (Neve/Pioggia intensa) ---")
+
+ state = load_state()
+ was_active = bool(state.get("alert_active", False))
+ last_sig = state.get("signature", "")
+
+ with requests.Session() as session:
+ # Trigger: Bologna
+ bo = POINTS[0]
+ bo_data = get_forecast(session, bo["lat"], bo["lon"])
+ if not bo_data: return
+ bo_stats = compute_stats(bo_data)
+ if not bo_stats:
+ LOGGER.error("Impossibile calcolare statistiche Bologna.")
+ return
+ bo_alerts = point_alerts(bo["name"], bo_stats)
+
+ # Route points
+ route_alerts: List[Dict] = []
+ for p in POINTS[1:]:
+ d = get_forecast(session, p["lat"], p["lon"])
+ if not d: continue
+ st = compute_stats(d)
+ if not st: continue
+ route_alerts.append(point_alerts(p["name"], st))
+
+ any_route_alert = any(x["snow_alert"] or x["rain_alert"] for x in route_alerts)
+ any_alert = (bo_alerts["snow_alert"] or bo_alerts["rain_alert"] or any_route_alert)
+
+ sig = build_signature(bo_alerts, route_alerts)
+
+ # --- Scenario A: Allerta ---
+ if any_alert:
+ if (not was_active) or (sig != last_sig):
+ now_str = now_local().strftime("%H:%M")
+ header_icon = "❄️" if (bo_alerts["snow_alert"] or any(x["snow_alert"] for x in route_alerts)) \
+ else "🌧️" if (bo_alerts["rain_alert"] or any(x["rain_alert"] for x in route_alerts)) \
+ else "⚠️"
+
+ msg: List[str] = []
+ msg.append(f"{header_icon} ALLERTA METEO (Bologna / Rientro)")
+ msg.append(f"🕒 Aggiornamento ore {html.escape(now_str)}")
+ msg.append(f"🛰️ Modello: {html.escape(MODEL)}")
+ msg.append(f"⏱️ Finestra: {HOURS_AHEAD} ore | Persistenza: {PERSIST_HOURS} ore")
+ msg.append("")
+
+ # Bologna
+ msg.append("🎓 A BOLOGNA")
+ if bo_alerts["snow_alert"]:
+ msg.append(f"❄️ Neve (≥{PERSIST_HOURS}h) da ~{html.escape(bo_alerts['snow_run_time'] or '—')} (run ~{bo_alerts['snow_run_len']}h).")
+ msg.append(f"• Accumulo: 12h {bo_alerts['snow_12h']:.1f} cm | 24h {bo_alerts['snow_24h']:.1f} cm")
+ else:
+ msg.append(f"❄️ Neve: nessuna persistenza ≥ {PERSIST_HOURS}h (24h {bo_alerts['snow_24h']:.1f} cm).")
+
+ if bo_alerts["rain_alert"]:
+ msg.append(f"🌧️ Pioggia molto forte (3h ≥ {SOGLIA_PIOGGIA_3H_MM:.0f} mm, ≥{PERSIST_HOURS}h) da ~{html.escape(bo_alerts['rain_persist_time'] or '—')}.")
+ else:
+ msg.append(f"🌧️ Pioggia: max 3h {bo_alerts['rain3_max']:.1f} mm (picco ~{html.escape(bo_alerts['rain3_max_time'] or '—')}).")
+
+ msg.append("")
+ msg.append("🚗 CASELLI (A14) / TRATTO")
+
+ issues = [x for x in route_alerts if x["snow_alert"] or x["rain_alert"]]
+ if not issues:
+ msg.append("✅ Nessuna criticità persistente rilevata lungo il percorso.")
+ else:
+ for x in issues:
+ line = f"• {html.escape(x['name'])}: "
+ parts: List[str] = []
+ if x["snow_alert"]:
+ parts.append(f"❄️ neve (≥{PERSIST_HOURS}h) da ~{html.escape(x['snow_run_time'] or '—')} (24h {x['snow_24h']:.1f} cm)")
+ if x["rain_alert"]:
+ parts.append(f"🌧️ pioggia forte da ~{html.escape(x['rain_persist_time'] or '—')}")
+ line += " | ".join(parts)
+ msg.append(line)
+
+ msg.append("")
+ msg.append("Fonte dati: Open-Meteo")
+
+ # FIX: usare \n invece di
+ ok = telegram_send_html("\n".join(msg))
+ if ok:
+ LOGGER.info("Notifica inviata.")
+ else:
+ LOGGER.warning("Notifica NON inviata.")
+
+ save_state(True, sig)
+ else:
+ LOGGER.info("Allerta già notificata e invariata.")
+ return
+
+ # --- Scenario B: Rientro ---
+ if was_active and not any_alert:
+ now_str = now_local().strftime("%H:%M")
+ # FIX: usare \n invece di
+ msg = (
+ "🟢 ALLERTA RIENTRATA (Bologna / Rientro)\n"
+ f"🕒 Aggiornamento ore {html.escape(now_str)}\n\n"
+ f"Nelle prossime {HOURS_AHEAD} ore non risultano più condizioni persistenti\n"
+ f"di neve (≥{PERSIST_HOURS}h) o pioggia 3h sopra soglia (≥{PERSIST_HOURS}h).\n"
+ "Fonte dati: Open-Meteo"
+ )
+ ok = telegram_send_html(msg)
+ if ok:
+ LOGGER.info("Rientro notificato.")
+ else:
+ LOGGER.warning("Rientro NON inviato.")
+ save_state(False, "")
+ return
+
+ # --- Scenario C: Tranquillo ---
+ save_state(False, "")
+ LOGGER.info("Nessuna allerta.")
- # --- SCENARIO C: TRANQUILLO ---
- else:
- save_current_state(False)
- print("Nessuna allerta.")
if __name__ == "__main__":
main()