#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Notify Telegram chats about Civil Protection alerts for selected zones.

The script downloads the national criticality bulletin page, extracts the
alerts that concern the zones listed in ``TARGET_ZONES`` and, when alerts
exist and differ from the last notified ones, sends an HTML message to the
configured Telegram chats.
"""
import argparse
import datetime
import html as html_lib
import json
import logging
import os
import re
import time
from html.parser import HTMLParser
from logging.handlers import RotatingFileHandler
from typing import List, Optional
from zoneinfo import ZoneInfo

import requests

# =============================================================================
# Debug mode:
#   DEBUG=1 python3 civil_protection.py
# Telegram token (no code changes needed):
#   1) export TELEGRAM_BOT_TOKEN="...token..."
#   2) or a file in your home directory: ~/.telegram_dpc_bot_token
# =============================================================================
DEBUG = os.environ.get("DEBUG", "0").strip() == "1"

BULLETIN_URL = "https://mappe.protezionecivile.gov.it/it/mappe-rischi/bollettino-di-criticita/"

# Chat IDs
TELEGRAM_CHAT_IDS = ["64463169", "24827341", "132455422", "5405962012"]

# Script directory (log + state live here, so sudo is not required)
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
STATE_FILE = os.path.join(BASE_DIR, "dpc_state.json")
LOG_FILE = os.path.join(BASE_DIR, "dpc_telegram.log")

# Where to look for the token (in this order, after the environment variable)
TOKEN_FILE_HOME = os.path.expanduser("~/.telegram_dpc_bot_token")
TOKEN_FILE_ETC = "/etc/telegram_dpc_bot_token"

# Target zones (as they typically appear in the bulletin text)
TARGET_ZONES = {
    "EMR-B2": "Costa romagnola",
    "EMR-A2": "Alta collina romagnola",
    "EMR-D1": "Pianura bolognese",
}

RISK_ICON = {
    "IDRAULICO": "💧",
    "IDROGEOLOGICO": "⛰️",
    "TEMPORALI": "⚡",
}

COLOR_ICON = {
    "GIALLA": "🟡",
    "ARANCIONE": "🟠",
    "ROSSA": "🔴",
}

HTTP_HEADERS = {"User-Agent": "rpi-dpc-bollettino/1.2"}


# =============================================================================
# Logging
# =============================================================================
def setup_logger() -> logging.Logger:
    """Create the ``dpc_telegram`` logger.

    A rotating file handler on ``LOG_FILE`` is always attached; when ``DEBUG``
    is set a stream handler is attached as well. Existing handlers are
    cleared first so repeated calls do not duplicate output.
    """
    logger = logging.getLogger("dpc_telegram")
    logger.setLevel(logging.DEBUG if DEBUG else logging.INFO)
    logger.handlers.clear()

    fh = RotatingFileHandler(LOG_FILE, maxBytes=1_000_000, backupCount=5, encoding="utf-8")
    fh.setLevel(logging.DEBUG)
    fmt = logging.Formatter("%(asctime)s %(levelname)s %(message)s")
    fh.setFormatter(fmt)
    logger.addHandler(fh)

    if DEBUG:
        sh = logging.StreamHandler()
        sh.setLevel(logging.DEBUG)
        sh.setFormatter(fmt)
        logger.addHandler(sh)

    return logger


LOGGER = setup_logger()


# =============================================================================
# Utility
# =============================================================================
def now_rome() -> datetime.datetime:
    """Return the current timezone-aware datetime in Europe/Rome."""
    return datetime.datetime.now(ZoneInfo("Europe/Rome"))


def today_str_italy() -> str:
    """Return today's date in Europe/Rome formatted as ``dd/mm/YYYY``."""
    return now_rome().strftime("%d/%m/%Y")


def load_text_file(path: str) -> str:
    """Return the stripped content of *path*, or ``""`` if it cannot be read.

    Never raises: a missing file is silent, a permission error is logged at
    debug level, and any other error is logged with its traceback.
    """
    try:
        with open(path, "r", encoding="utf-8") as f:
            return f.read().strip()
    except FileNotFoundError:
        return ""
    except PermissionError:
        # Not treated as fatal: that path is simply not readable.
        LOGGER.debug("Permesso negato leggendo %s", path)
        return ""
    except Exception as e:
        LOGGER.exception("Errore leggendo %s: %s", path, e)
        return ""


def load_bot_token() -> str:
    """Return the Telegram bot token, or ``""`` when none is configured.

    Lookup order: ``TELEGRAM_BOT_TOKEN`` environment variable, then
    ``TOKEN_FILE_HOME``, then ``TOKEN_FILE_ETC``.
    """
    # 1) environment
    tok = os.environ.get("TELEGRAM_BOT_TOKEN", "").strip()
    if tok:
        return tok

    # 2) home file (recommended)
    tok = load_text_file(TOKEN_FILE_HOME)
    if tok:
        return tok

    # 3) /etc (only if readable)
    tok = load_text_file(TOKEN_FILE_ETC)
    if tok:
        return tok

    return ""


def _redact_token(text: str, token: str) -> str:
    """Return *text* with every occurrence of *token* masked."""
    return text.replace(token, "<redacted>") if token else text


def telegram_send_html(message_html: str, chat_ids: Optional[List[str]] = None) -> bool:
    """Try to send *message_html* to each chat. Never raises.

    Args:
        message_html: HTML message to send.
        chat_ids: Chat IDs to notify (default: ``TELEGRAM_CHAT_IDS``).

    Returns:
        True if at least one send returned HTTP status 200, False otherwise
        (including when no token is configured).
    """
    # Sanitize unsupported HTML tags (Telegram does not support <br>)
    if message_html:
        message_html = re.sub(r"<\s*br\s*/?\s*>", "\n", message_html, flags=re.IGNORECASE)

    token = load_bot_token()
    if not token:
        LOGGER.warning("Token Telegram assente. Nessun invio effettuato.")
        return False

    if chat_ids is None:
        chat_ids = TELEGRAM_CHAT_IDS

    url = f"https://api.telegram.org/bot{token}/sendMessage"
    base_payload = {
        "text": message_html,
        "parse_mode": "HTML",
        "disable_web_page_preview": True,
    }

    sent_ok = False
    with requests.Session() as s:
        for chat_id in chat_ids:
            payload = dict(base_payload)
            payload["chat_id"] = chat_id
            try:
                resp = s.post(url, json=payload, timeout=15)
                if resp.status_code == 200:
                    sent_ok = True
                else:
                    LOGGER.error(
                        "Telegram error chat_id=%s status=%s body=%s",
                        chat_id,
                        resp.status_code,
                        resp.text[:500],
                    )
                # Small pause between consecutive sends.
                time.sleep(0.25)
            except Exception as e:
                # The exception text (and therefore a raw traceback) embeds
                # the request URL, which contains the bot token. Log a
                # redacted description instead of the traceback so the
                # secret never reaches the log file.
                LOGGER.error(
                    "Telegram exception chat_id=%s err=%s",
                    chat_id,
                    _redact_token(f"{type(e).__name__}: {e}", token),
                )

    return sent_ok


def load_state() -> dict:
    """Return the persisted state dict, or ``{}`` if missing or unusable.

    A state file whose JSON payload is not an object (e.g. a list) is
    treated as empty so callers can always use ``dict`` methods on the result.
    """
    try:
        with open(STATE_FILE, "r", encoding="utf-8") as f:
            data = json.load(f)
    except FileNotFoundError:
        return {}
    except Exception as e:
        LOGGER.exception("Errore lettura stato: %s", e)
        return {}
    return data if isinstance(data, dict) else {}


def save_state(state: dict) -> None:
    """Persist *state* as JSON to ``STATE_FILE``. Errors are logged, not raised.

    The data is written to a sibling temporary file and then moved over the
    target with ``os.replace`` so an interrupted write cannot leave a
    truncated state file behind.
    """
    tmp_path = STATE_FILE + ".tmp"
    try:
        with open(tmp_path, "w", encoding="utf-8") as f:
            json.dump(state, f, ensure_ascii=False, indent=2)
        os.replace(tmp_path, STATE_FILE)
    except Exception as e:
        LOGGER.exception("Errore scrittura stato: %s", e)


def norm(s: str) -> str:
    """Return *s* lower-cased, stripped, with whitespace runs collapsed."""
    return re.sub(r"\s+", " ", (s or "").strip().lower())


class TextExtractor(HTMLParser):
    """HTML parser that collects every non-blank text node in ``parts``."""

    def __init__(self):
        super().__init__()
        self.parts = []

    def handle_data(self, data):
        if data and data.strip():
            self.parts.append(data)


def html_to_lines(html_text: str) -> list[str]:
    """Convert an HTML document into a list of non-empty text lines.

    Text nodes are joined with newlines, HTML entities are unescaped, and
    each resulting line has its whitespace collapsed and trimmed.
    """
    p = TextExtractor()
    p.feed(html_text)
    text = "\n".join(p.parts)
    text = html_lib.unescape(text)

    lines = []
    for raw in text.splitlines():
        line = re.sub(r"\s+", " ", raw).strip()
        if line:
            lines.append(line)
    return lines


# =============================================================================
# Bulletin parsing
# =============================================================================
RISK_HEADER_RE = re.compile(
    r"^(ORDINARIA|MODERATA|ELEVATA)\s+CRITICITA'\s+PER\s+RISCHIO\s+([A-ZÀ-Ü]+)\s*/\s*ALLERTA\s+(GIALLA|ARANCIONE|ROSSA)\s*:\s*$"
)
DAY_START_RE = re.compile(r"^Per la giornata di (oggi|domani),\s*(.+?):\s*$", re.IGNORECASE)
TITLE_RE = re.compile(r"^Bollettino di Criticità del (.+?) ore (\d{1,2}:\d{2})", re.IGNORECASE)


def fetch_bulletin_text_lines() -> list[str]:
    """Download the bulletin page and return its text as cleaned lines.

    Raises:
        requests.RequestException: on network errors or non-2xx responses.
    """
    r = requests.get(BULLETIN_URL, headers=HTTP_HEADERS, timeout=25)
    r.raise_for_status()
    # Force UTF-8 so accented words (e.g. "Mercoledì") decode correctly.
    r.encoding = "utf-8"
    return html_to_lines(r.text)


def parse_bulletin(lines: list[str]) -> dict:
    """Extract title, issue date/time and per-day alerts for the target zones.

    Returns a dict with keys ``title``, ``issued_date``, ``issued_time`` and
    ``days``; ``days`` maps ``"oggi"``/``"domani"`` to
    ``{"date_label": str, "alerts": {zone_name: [entry, ...]}}``.
    """
    out = {"title": "", "issued_date": "", "issued_time": "", "days": {}}

    # The title is only searched for in the first 150 lines.
    for ln in lines[:150]:
        m = TITLE_RE.match(ln)
        if m:
            out["title"] = ln
            out["issued_date"] = m.group(1).strip()
            out["issued_time"] = m.group(2).strip()
            break

    current_day_key = None
    current_day_label = None
    current_risk = None
    current_color = None

    def add_alert(zone_name: str):
        # Record the current risk/colour for the zone, without duplicates.
        if not current_day_key or not current_risk or not current_color:
            return
        day = out["days"].setdefault(
            current_day_key, {"date_label": current_day_label or "", "alerts": {}}
        )
        icon = COLOR_ICON.get(current_color, "⚪")
        ricon = RISK_ICON.get(current_risk, "⚠️")
        entry = f"{icon} {ricon} {current_risk}"
        zone_entries = day["alerts"].setdefault(zone_name, [])
        if entry not in zone_entries:
            zone_entries.append(entry)

    region_names = (norm("Emilia Romagna"), norm("Emilia-Romagna"))

    for ln in lines:
        md = DAY_START_RE.match(ln)
        if md:
            # A new day section resets the risk context.
            current_day_key = md.group(1).lower()
            current_day_label = md.group(2).strip()
            out["days"].setdefault(current_day_key, {"date_label": current_day_label, "alerts": {}})
            current_risk = None
            current_color = None
            continue

        mh = RISK_HEADER_RE.match(ln)
        if mh:
            current_risk = mh.group(2).strip()
            current_color = mh.group(3).strip()
            continue

        if current_day_key and current_risk and current_color and ":" in ln:
            region, rest = ln.split(":", 1)
            if norm(region) in region_names:
                # Normalise the listed zones once, then test each target.
                listed = {norm(z) for z in rest.split(",") if z.strip()}
                for target_zone_name in TARGET_ZONES.values():
                    if norm(target_zone_name) in listed:
                        add_alert(target_zone_name)

    return out


def has_any_alert(parsed: dict) -> bool:
    """Return True if today or tomorrow has at least one alert."""
    days = parsed.get("days", {})
    return any(days.get(day_key, {}).get("alerts") for day_key in ("oggi", "domani"))


def build_signature(parsed: dict) -> str:
    """Build a deterministic string identifying the bulletin's alert content.

    Used to detect whether the alerts changed since the last notification.
    """
    parts = [parsed.get("issued_date", ""), parsed.get("issued_time", ""), parsed.get("title", "")]
    for day_key in ("oggi", "domani"):
        day = parsed.get("days", {}).get(day_key, {})
        parts.append(day_key + ":" + day.get("date_label", ""))
        alerts = day.get("alerts", {})
        for zone in sorted(alerts.keys()):
            parts.append(zone + "=" + ",".join(sorted(alerts[zone])))
    return "|".join(parts)


def format_message(parsed: dict) -> str:
    """Render the parsed bulletin as the Telegram message text.

    All bulletin-derived text is HTML-escaped. Days without alerts are
    omitted.
    """
    issued = parsed.get("title") or "Bollettino di Criticità"
    issued_safe = html_lib.escape(issued)

    lines = []
    lines.append("📢 PROTEZIONE CIVILE (Allerta)")
    lines.append(f"🕒 {issued_safe}")
    lines.append("")

    for day_key in ("oggi", "domani"):
        day = parsed.get("days", {}).get(day_key)
        if not day:
            continue
        alerts = day.get("alerts", {})
        if not alerts:
            continue

        lines.append(f"📅 {html_lib.escape(day.get('date_label',''))}")
        for zone in sorted(alerts.keys()):
            lines.append(f"📍 {html_lib.escape(zone)}")
            for entry in alerts[zone]:
                lines.append(html_lib.escape(entry))
        lines.append("")

    lines.append("Fonte: mappe.protezionecivile.gov.it")
    return "\n".join(lines)


# =============================================================================
# Main
# =============================================================================
def main(chat_ids: Optional[List[str]] = None, debug_mode: bool = False):
    """Fetch the bulletin and notify Telegram when there are new alerts.

    Args:
        chat_ids: Chat IDs to notify (default: ``TELEGRAM_CHAT_IDS``).
        debug_mode: When True, send an informational message even without
            alerts and bypass the "already notified" check.
    """
    LOGGER.info("--- Controllo Protezione Civile (Bollettino ufficiale) ---")

    try:
        lines = fetch_bulletin_text_lines()
        parsed = parse_bulletin(lines)
    except Exception as e:
        # NO Telegram message on errors: log only.
        LOGGER.exception("Errore durante fetch/parse bollettino: %s", e)
        return

    if DEBUG:
        LOGGER.debug("title=%s", parsed.get("title", ""))
        for k in ("oggi", "domani"):
            d = parsed.get("days", {}).get(k, {})
            LOGGER.debug("%s label=%s", k, d.get("date_label", ""))
            LOGGER.debug("%s alerts=%s", k, d.get("alerts", {}))

    # Rule: send to Telegram ONLY when alerts exist (except in debug mode).
    if not has_any_alert(parsed):
        if debug_mode:
            # In debug mode, build an informational message even with no alerts.
            LOGGER.info("[DEBUG MODE] Nessuna allerta, ma creo messaggio informativo")
            msg = format_message(parsed)
            # Prefix making it explicit that there are no alerts.
            msg = f"ℹ️ DEBUG: Nessuna allerta attiva\n\n{msg}"
            sent_ok = telegram_send_html(msg, chat_ids=chat_ids)
            if sent_ok:
                LOGGER.info("Messaggio debug inviato con successo.")
            else:
                LOGGER.warning("Invio debug non riuscito (token mancante o errore Telegram).")
        else:
            LOGGER.info("Nessuna allerta nelle zone monitorate. Nessuna notifica inviata.")
        return

    sig = build_signature(parsed)
    state = load_state()
    last_sig = state.get("last_alert_signature", "")

    # In debug mode, bypass the anti-spam check.
    if debug_mode:
        LOGGER.info("[DEBUG MODE] Bypass anti-spam: invio forzato")
    elif sig == last_sig:
        LOGGER.info("Allerta già notificata e invariata. Nessuna nuova notifica.")
        return

    # At this point there are alerts and they are new -> try to send.
    msg = format_message(parsed)
    sent_ok = telegram_send_html(msg, chat_ids=chat_ids)

    if sent_ok:
        LOGGER.info("Notifica allerta inviata con successo.")
        save_state({
            "date": today_str_italy(),
            "last_alert_signature": sig,
        })
    else:
        # State is left untouched: once token/network is fixed it will resend.
        LOGGER.warning("Invio non riuscito (token mancante o errore Telegram). Stato NON aggiornato.")


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Civil protection alert")
    parser.add_argument(
        "--debug",
        action="store_true",
        help="Invia messaggi solo all'admin (chat ID: %s)" % TELEGRAM_CHAT_IDS[0],
    )
    args = parser.parse_args()

    # In debug mode, send only to the first chat ID (admin) and bypass anti-spam.
    chat_ids = [TELEGRAM_CHAT_IDS[0]] if args.debug else None

    main(chat_ids=chat_ids, debug_mode=args.debug)