#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import argparse
import datetime
import html as html_lib
import json
import logging
import os
import re
import time
from html.parser import HTMLParser
from logging.handlers import RotatingFileHandler
from typing import List, Optional
from zoneinfo import ZoneInfo
import requests
# =============================================================================
# Debug logging (the log is also echoed to the console):
#   DEBUG=1 python3 civil_protection.py
# Telegram token lookup (no code changes needed), in this order:
#   1) export TELEGRAM_BOT_TOKEN="...token..."
#   2) a file in your home directory: ~/.telegram_dpc_bot_token
#   3) a system-wide file, if readable: /etc/telegram_dpc_bot_token
# =============================================================================
# Verbose mode: on only when the DEBUG environment variable, stripped, equals "1".
DEBUG = os.environ.get("DEBUG", "0").strip() == "1"
# Page the bulletin text is downloaded from.
BULLETIN_URL = "https://mappe.protezionecivile.gov.it/it/mappe-rischi/bollettino-di-criticita/"
# Telegram chat IDs that receive notifications by default.
TELEGRAM_CHAT_IDS = ["64463169", "24827341", "132455422", "5405962012"]
# Script directory (log and state files live here, so no sudo is needed).
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
STATE_FILE = os.path.join(BASE_DIR, "dpc_state.json")
LOG_FILE = os.path.join(BASE_DIR, "dpc_telegram.log")
# Where to look for the token file (home directory first, then /etc).
TOKEN_FILE_HOME = os.path.expanduser("~/.telegram_dpc_bot_token")
TOKEN_FILE_ETC = "/etc/telegram_dpc_bot_token"
# Target zones (named as they typically appear in the bulletin text).
TARGET_ZONES = {
    "EMR-B2": "Costa romagnola",
    "EMR-A2": "Alta collina romagnola",
    "EMR-D1": "Pianura bolognese",
}
# Emoji shown for each risk name found in a risk header.
RISK_ICON = {
    "IDRAULICO": "💧",
    "IDROGEOLOGICO": "⛰️",
    "TEMPORALI": "⚡",
}
# Emoji shown for each alert colour found in a risk header.
COLOR_ICON = {
    "GIALLA": "🟡",
    "ARANCIONE": "🟠",
    "ROSSA": "🔴",
}
# Headers sent with the bulletin HTTP request.
HTTP_HEADERS = {"User-Agent": "rpi-dpc-bollettino/1.2"}
# =============================================================================
# Logging
# =============================================================================
def setup_logger() -> logging.Logger:
    """Configure and return the "dpc_telegram" logger.

    The logger always writes to a rotating file (LOG_FILE, 1 MB per file,
    5 backups). When DEBUG is on it also gets a stream handler and its level
    is lowered to DEBUG; otherwise the logger level is INFO.

    Returns:
        The configured logger. Handlers installed by a previous call are
        removed first, so calling this twice does not duplicate output.
    """
    log = logging.getLogger("dpc_telegram")
    log.setLevel(logging.DEBUG if DEBUG else logging.INFO)
    log.handlers.clear()

    handlers = [
        RotatingFileHandler(
            LOG_FILE, maxBytes=1_000_000, backupCount=5, encoding="utf-8"
        )
    ]
    if DEBUG:
        handlers.append(logging.StreamHandler())

    # Every handler shares the same level and line format.
    formatter = logging.Formatter("%(asctime)s %(levelname)s %(message)s")
    for handler in handlers:
        handler.setLevel(logging.DEBUG)
        handler.setFormatter(formatter)
        log.addHandler(handler)
    return log
# Module-wide logger, configured once when the module is loaded.
LOGGER = setup_logger()
# =============================================================================
# Utility
# =============================================================================
def now_rome():
    """Return the current timezone-aware datetime in the Europe/Rome zone."""
    rome_tz = ZoneInfo("Europe/Rome")
    return datetime.datetime.now(tz=rome_tz)
def today_str_italy():
    """Return today's date in the Europe/Rome zone formatted as dd/mm/yyyy."""
    current = now_rome()
    return current.strftime("%d/%m/%Y")
def load_text_file(path: str) -> str:
    """Read a UTF-8 text file and return its content without surrounding whitespace.

    Never raises: any failure yields an empty string.

    Args:
        path: File to read.

    Returns:
        The stripped file content, or "" when the file is missing, unreadable
        or cannot be read for any other reason.
    """
    content = ""
    try:
        with open(path, "r", encoding="utf-8") as handle:
            content = handle.read().strip()
    except FileNotFoundError:
        # A missing file is an expected situation, not worth a log line.
        pass
    except PermissionError:
        # Not a fatal error: this path is simply not readable by us.
        LOGGER.debug("Permesso negato leggendo %s", path)
    except Exception as exc:
        LOGGER.exception("Errore leggendo %s: %s", path, exc)
    return content
def load_bot_token() -> str:
    """Return the Telegram bot token, or "" when none is configured.

    Lookup order: the TELEGRAM_BOT_TOKEN environment variable, then the file
    in the user's home (TOKEN_FILE_HOME), then the file under /etc
    (TOKEN_FILE_ETC). The first non-empty value wins.
    """
    env_token = os.environ.get("TELEGRAM_BOT_TOKEN", "").strip()
    if env_token:
        return env_token

    # Fall back to the token files: home first (recommended), then /etc.
    for candidate_path in (TOKEN_FILE_HOME, TOKEN_FILE_ETC):
        file_token = load_text_file(candidate_path)
        if file_token:
            return file_token
    return ""
def telegram_send_html(message_html: str, chat_ids: Optional[List[str]] = None) -> bool:
    """Send an HTML-formatted message to each chat. Never raises.

    The script calls this when alerts exist (and, in --debug runs, also for
    the informational "no alerts" message).

    Args:
        message_html: Message text, sent with parse_mode "HTML".
        chat_ids: Chat IDs to send to (default: TELEGRAM_CHAT_IDS).

    Returns:
        True if at least one send got HTTP status 200, False otherwise
        (including when no bot token is configured).
    """
    token = load_bot_token()
    if not token:
        LOGGER.warning("Token Telegram assente. Nessun invio effettuato.")
        return False
    if chat_ids is None:
        chat_ids = TELEGRAM_CHAT_IDS
    url = f"https://api.telegram.org/bot{token}/sendMessage"
    base_payload = {
        "text": message_html,
        "parse_mode": "HTML",
        "disable_web_page_preview": True,
    }
    sent_ok = False
    with requests.Session() as s:
        for chat_id in chat_ids:
            payload = dict(base_payload)
            payload["chat_id"] = chat_id
            try:
                resp = s.post(url, json=payload, timeout=15)
                if resp.status_code == 200:
                    sent_ok = True
                else:
                    LOGGER.error("Telegram error chat_id=%s status=%s body=%s",
                                 chat_id, resp.status_code, resp.text[:500])
                # Short pause between consecutive sends.
                time.sleep(0.25)
            except Exception as e:
                # The request URL contains the bot token, and connection
                # errors embed that URL in their message. Log a redacted
                # message and no traceback (the traceback would repeat the
                # unredacted text), so the secret never reaches the log file.
                redacted = str(e).replace(token, "***")
                LOGGER.error("Telegram exception chat_id=%s err=%s",
                             chat_id, f"{type(e).__name__}: {redacted}")
    return sent_ok
def load_state() -> dict:
    """Load the persisted state from STATE_FILE. Never raises.

    Returns:
        The stored dict, or {} when the file does not exist, cannot be read
        or parsed, or holds JSON that is not an object.
    """
    try:
        with open(STATE_FILE, "r", encoding="utf-8") as f:
            state = json.load(f)
    except FileNotFoundError:
        # No state saved yet: expected on the first run.
        return {}
    except Exception as e:
        LOGGER.exception("Errore lettura stato: %s", e)
        return {}
    # Callers use dict methods on the result; a list, string or number in the
    # file would otherwise crash them.
    if not isinstance(state, dict):
        LOGGER.warning("Stato non valido in %s (atteso un oggetto JSON). Ignorato.", STATE_FILE)
        return {}
    return state
def save_state(state: dict) -> None:
    """Persist ``state`` as JSON in STATE_FILE. Never raises.

    The data is written to a temporary file next to STATE_FILE and then moved
    over it with os.replace, so an interrupted or failed write cannot leave a
    truncated state file behind.

    Args:
        state: JSON-serializable dict to store.
    """
    tmp_path = STATE_FILE + ".tmp"
    try:
        with open(tmp_path, "w", encoding="utf-8") as f:
            json.dump(state, f, ensure_ascii=False, indent=2)
        os.replace(tmp_path, STATE_FILE)
    except Exception as e:
        LOGGER.exception("Errore scrittura stato: %s", e)
        # Best-effort cleanup of the partial temporary file; it may not exist.
        try:
            os.remove(tmp_path)
        except OSError:
            pass
def norm(s: str) -> str:
    """Normalize text for comparison.

    Trims the ends, lowercases, and collapses every run of whitespace into a
    single space. A falsy input (None or "") yields "".
    """
    text = s if s else ""
    lowered = text.strip().lower()
    return re.sub(r"\s+", " ", lowered)
class TextExtractor(HTMLParser):
    """HTML parser that collects the non-blank text chunks of a document.

    After feeding markup, ``parts`` holds every text chunk that contains at
    least one non-whitespace character, in document order and unmodified.
    """

    def __init__(self):
        super().__init__()
        # Collected text chunks, in the order the parser reports them.
        self.parts: list[str] = []

    def handle_data(self, data: str) -> None:
        """Store ``data`` unless it is empty or whitespace-only."""
        if not data or data.isspace():
            return
        self.parts.append(data)
def html_to_lines(html_text: str) -> list[str]:
    """Convert an HTML document into a list of clean, non-empty text lines.

    Args:
        html_text: HTML markup to extract text from.

    Returns:
        The text chunks found by TextExtractor, split on line breaks, with
        whitespace runs collapsed to a single space and empty lines dropped.
    """
    p = TextExtractor()
    p.feed(html_text)
    # Flush text the parser is still buffering at end of input (for example a
    # trailing chunk ending in an unterminated character reference); without
    # close() that text is silently dropped.
    p.close()
    # The parser already decodes character references in ordinary text; this
    # extra pass also decodes double-encoded references.
    text = html_lib.unescape("\n".join(p.parts))
    cleaned = (re.sub(r"\s+", " ", raw).strip() for raw in text.splitlines())
    return [line for line in cleaned if line]
# =============================================================================
# Parsing bollettino
# =============================================================================
# Risk section header, a whole line shaped like
# "<LEVEL> CRITICITA' PER RISCHIO <RISK> / ALLERTA <COLOUR>:" (case-sensitive).
# Groups: 1 = criticality level, 2 = risk name, 3 = alert colour.
RISK_HEADER_RE = re.compile(
    r"^(ORDINARIA|MODERATA|ELEVATA)\s+CRITICITA'\s+PER\s+RISCHIO\s+([A-ZÀ-Ü]+)\s*/\s*ALLERTA\s+(GIALLA|ARANCIONE|ROSSA)\s*:\s*$"
)
# Start of a day section: "Per la giornata di oggi|domani, <label>:" (case-insensitive).
# Groups: 1 = "oggi" or "domani", 2 = the date label that follows the comma.
DAY_START_RE = re.compile(r"^Per la giornata di (oggi|domani),\s*(.+?):\s*$", re.IGNORECASE)
# Bulletin title line (case-insensitive, matched from the start of the line).
# Groups: 1 = issue date text, 2 = issue time as H:MM or HH:MM.
TITLE_RE = re.compile(r"^Bollettino di Criticità del (.+?) ore (\d{1,2}:\d{2})", re.IGNORECASE)
def fetch_bulletin_text_lines() -> list[str]:
    """Download the bulletin page and return its text as cleaned lines.

    Raises:
        requests.RequestException: On network failure, timeout (25 s) or a
            non-success HTTP status.
    """
    response = requests.get(BULLETIN_URL, headers=HTTP_HEADERS, timeout=25)
    response.raise_for_status()
    # Force UTF-8 decoding so accented characters are not garbled.
    response.encoding = "utf-8"
    page_html = response.text
    return html_to_lines(page_html)
def parse_bulletin(lines: list[str]) -> dict:
    """Extract the title metadata and the monitored-zone alerts from bulletin lines.

    Args:
        lines: Bulletin text, one cleaned line per item.

    Returns:
        A dict with keys "title", "issued_date", "issued_time" (empty strings
        when no title line is found) and "days". "days" maps "oggi"/"domani"
        to {"date_label": str, "alerts": {zone_name: [entry, ...]}}, where
        each entry is "<colour icon> <risk icon> <risk name>".
    """
    result = {"title": "", "issued_date": "", "issued_time": "", "days": {}}

    # The title is searched only within the first 150 lines.
    for candidate in lines[:150]:
        title_match = TITLE_RE.match(candidate)
        if title_match is None:
            continue
        result["title"] = candidate
        result["issued_date"] = title_match.group(1).strip()
        result["issued_time"] = title_match.group(2).strip()
        break

    days = result["days"]
    region_aliases = (norm("Emilia Romagna"), norm("Emilia-Romagna"))

    # Parser context: the day section and the risk header we are currently under.
    day_key = None
    day_label = None
    risk = None
    color = None

    for text in lines:
        day_match = DAY_START_RE.match(text)
        if day_match is not None:
            day_key = day_match.group(1).lower()
            day_label = day_match.group(2).strip()
            days.setdefault(day_key, {"date_label": day_label, "alerts": {}})
            # A new day section invalidates the previous risk header.
            risk = None
            color = None
            continue

        header_match = RISK_HEADER_RE.match(text)
        if header_match is not None:
            risk = header_match.group(2).strip()
            color = header_match.group(3).strip()
            continue

        # Region lines only count inside a day section and under a risk header.
        if not (day_key and risk and color) or ":" not in text:
            continue
        region, _, zone_text = text.partition(":")
        if norm(region) not in region_aliases:
            continue

        listed_zones = {norm(zone) for zone in zone_text.split(",") if zone.strip()}
        color_icon = COLOR_ICON.get(color, "⚪")
        risk_icon = RISK_ICON.get(risk, "⚠️")
        entry = f"{color_icon} {risk_icon} {risk}"

        for zone_name in TARGET_ZONES.values():
            if norm(zone_name) not in listed_zones:
                continue
            day = days.setdefault(day_key, {"date_label": day_label or "", "alerts": {}})
            zone_entries = day["alerts"].setdefault(zone_name, [])
            # The same risk/colour pair is recorded once per zone.
            if entry not in zone_entries:
                zone_entries.append(entry)
    return result
def has_any_alert(parsed: dict) -> bool:
    """Return True when "oggi" or "domani" has at least one recorded alert.

    Args:
        parsed: Dict with a "days" mapping, as produced by the bulletin parser.
    """
    return any(
        bool(parsed.get("days", {}).get(day_key, {}).get("alerts"))
        for day_key in ("oggi", "domani")
    )
def build_signature(parsed: dict) -> str:
    """Build a deterministic string identifying the bulletin and its alerts.

    The signature joins with "|": issue date, issue time, title, then for
    "oggi" and "domani" a "<day>:<date label>" item followed by one
    "<zone>=<entries>" item per zone (zones and entries sorted).

    Args:
        parsed: Dict as produced by the bulletin parser.
    """
    fields = [parsed.get(key, "") for key in ("issued_date", "issued_time", "title")]
    for day_key in ("oggi", "domani"):
        day = parsed.get("days", {}).get(day_key, {})
        fields.append(":".join((day_key, day.get("date_label", ""))))
        alerts = day.get("alerts", {})
        fields.extend(
            "=".join((zone, ",".join(sorted(alerts[zone]))))
            for zone in sorted(alerts)
        )
    return "|".join(fields)
def format_message(parsed: dict) -> str:
    """Render the parsed bulletin as the notification text.

    The message has a header, the HTML-escaped bulletin title (or a generic
    fallback), then for each of "oggi"/"domani" that has alerts the date
    label followed by every zone (sorted) with its entries, and finally the
    source line. Every value taken from the bulletin is HTML-escaped.

    Args:
        parsed: Dict as produced by the bulletin parser.
    """
    heading = html_lib.escape(parsed.get("title") or "Bollettino di Criticità")
    out = ["📢 PROTEZIONE CIVILE (Allerta)", f"🕒 {heading}", ""]

    for day_key in ("oggi", "domani"):
        day = parsed.get("days", {}).get(day_key)
        alerts = day.get("alerts", {}) if day else {}
        if not alerts:
            continue
        out.append("📅 " + html_lib.escape(day.get("date_label", "")))
        for zone in sorted(alerts):
            out.append("📍 " + html_lib.escape(zone))
            out.extend(html_lib.escape(entry) for entry in alerts[zone])
            # A blank line closes each zone block.
            out.append("")

    out.append("Fonte: mappe.protezionecivile.gov.it")
    return "\n".join(out)
# =============================================================================
# Main
# =============================================================================
def main(chat_ids: Optional[List[str]] = None, debug_mode: bool = False):
    """Fetch the bulletin and notify Telegram when monitored zones have alerts.

    Normal runs send only when alerts exist and their signature differs from
    the last one notified; the signature is saved after a successful send.
    Debug runs always send (an informational message when there are no
    alerts) and never update the saved state.

    Args:
        chat_ids: Chat IDs to notify (None lets the sender use its default list).
        debug_mode: Force sending and skip the duplicate-notification check.
    """
    LOGGER.info("--- Controllo Protezione Civile (Bollettino ufficiale) ---")
    try:
        lines = fetch_bulletin_text_lines()
        parsed = parse_bulletin(lines)
    except Exception as e:
        # No Telegram message on errors: log only.
        LOGGER.exception("Errore durante fetch/parse bollettino: %s", e)
        return
    if DEBUG:
        LOGGER.debug("title=%s", parsed.get("title", ""))
        for k in ("oggi", "domani"):
            d = parsed.get("days", {}).get(k, {})
            LOGGER.debug("%s label=%s", k, d.get("date_label", ""))
            LOGGER.debug("%s alerts=%s", k, d.get("alerts", {}))

    # Rule: send to Telegram ONLY when alerts exist (except in debug mode).
    if not has_any_alert(parsed):
        if debug_mode:
            # In debug mode, send an informational message even without alerts.
            LOGGER.info("[DEBUG MODE] Nessuna allerta, ma creo messaggio informativo")
            msg = format_message(parsed)
            # Prefix stating that no alert is active.
            msg = f"ℹ️ DEBUG: Nessuna allerta attiva\n\n{msg}"
            sent_ok = telegram_send_html(msg, chat_ids=chat_ids)
            if sent_ok:
                LOGGER.info("Messaggio debug inviato con successo.")
            else:
                LOGGER.warning("Invio debug non riuscito (token mancante o errore Telegram).")
        else:
            LOGGER.info("Nessuna allerta nelle zone monitorate. Nessuna notifica inviata.")
        return

    sig = build_signature(parsed)
    state = load_state()
    last_sig = state.get("last_alert_signature", "")
    # In debug mode the duplicate-notification check is bypassed.
    if debug_mode:
        LOGGER.info("[DEBUG MODE] Bypass anti-spam: invio forzato")
    elif sig == last_sig:
        LOGGER.info("Allerta già notificata e invariata. Nessuna nuova notifica.")
        return

    # At this point there are alerts and they are new (or forced): try to send.
    msg = format_message(parsed)
    sent_ok = telegram_send_html(msg, chat_ids=chat_ids)
    if not sent_ok:
        # State is left untouched so a later run retries once token/network work.
        LOGGER.warning("Invio non riuscito (token mancante o errore Telegram). Stato NON aggiornato.")
        return

    LOGGER.info("Notifica allerta inviata con successo.")
    if debug_mode:
        # A debug send reaches only the chat IDs passed in. Saving the
        # signature here would make the next normal run treat the alert as
        # already notified and skip the real recipients.
        LOGGER.info("[DEBUG MODE] Stato NON aggiornato: l'invio di test non sostituisce la notifica reale.")
        return
    save_state({
        "date": today_str_italy(),
        "last_alert_signature": sig,
    })
if __name__ == "__main__":
    # The first configured chat ID is treated as the admin chat.
    admin_chat_id = TELEGRAM_CHAT_IDS[0]

    cli = argparse.ArgumentParser(description="Civil protection alert")
    cli.add_argument(
        "--debug",
        action="store_true",
        help="Invia messaggi solo all'admin (chat ID: %s)" % admin_chat_id,
    )
    options = cli.parse_args()

    # With --debug: send only to the admin chat and bypass the anti-spam check.
    if options.debug:
        main(chat_ids=[admin_chat_id], debug_mode=True)
    else:
        main(chat_ids=None, debug_mode=False)