372 lines
12 KiB
Python
372 lines
12 KiB
Python
#!/usr/bin/env python3
|
||
# -*- coding: utf-8 -*-
|
||
|
||
import datetime
|
||
import html as html_lib
|
||
import json
|
||
import logging
|
||
import os
|
||
import re
|
||
import time
|
||
from html.parser import HTMLParser
|
||
from logging.handlers import RotatingFileHandler
|
||
from zoneinfo import ZoneInfo
|
||
|
||
import requests
|
||
|
||
# =============================================================================
|
||
# Modalità debug:
|
||
# DEBUG=1 python3 civil_protection.py
|
||
# Token Telegram (senza modifiche al codice):
|
||
# 1) export TELEGRAM_BOT_TOKEN="...token..."
|
||
# 2) oppure file nel tuo home: ~/.telegram_dpc_bot_token
|
||
# =============================================================================
|
||
|
||
DEBUG = os.environ.get("DEBUG", "0").strip() == "1"
|
||
|
||
BULLETIN_URL = "https://mappe.protezionecivile.gov.it/it/mappe-rischi/bollettino-di-criticita/"
|
||
|
||
# Chat IDs
|
||
TELEGRAM_CHAT_IDS = ["64463169", "24827341", "132455422", "5405962012"]
|
||
|
||
# Directory dello script (log + state qui, così non serve sudo)
|
||
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
|
||
STATE_FILE = os.path.join(BASE_DIR, "dpc_state.json")
|
||
LOG_FILE = os.path.join(BASE_DIR, "dpc_telegram.log")
|
||
|
||
# Dove cercare il token (in quest’ordine)
|
||
TOKEN_FILE_HOME = os.path.expanduser("~/.telegram_dpc_bot_token")
|
||
TOKEN_FILE_ETC = "/etc/telegram_dpc_bot_token"
|
||
|
||
# Zone target (come compaiono tipicamente nel testo del bollettino)
|
||
TARGET_ZONES = {
|
||
"EMR-B2": "Costa romagnola",
|
||
"EMR-A2": "Alta collina romagnola",
|
||
"EMR-D1": "Pianura bolognese",
|
||
}
|
||
|
||
RISK_ICON = {
|
||
"IDRAULICO": "💧",
|
||
"IDROGEOLOGICO": "⛰️",
|
||
"TEMPORALI": "⚡",
|
||
}
|
||
|
||
COLOR_ICON = {
|
||
"GIALLA": "🟡",
|
||
"ARANCIONE": "🟠",
|
||
"ROSSA": "🔴",
|
||
}
|
||
|
||
HTTP_HEADERS = {"User-Agent": "rpi-dpc-bollettino/1.2"}
|
||
|
||
# =============================================================================
|
||
# Logging
|
||
# =============================================================================
|
||
|
||
def setup_logger() -> logging.Logger:
|
||
logger = logging.getLogger("dpc_telegram")
|
||
logger.setLevel(logging.DEBUG if DEBUG else logging.INFO)
|
||
logger.handlers.clear()
|
||
|
||
fh = RotatingFileHandler(LOG_FILE, maxBytes=1_000_000, backupCount=5, encoding="utf-8")
|
||
fh.setLevel(logging.DEBUG)
|
||
fmt = logging.Formatter("%(asctime)s %(levelname)s %(message)s")
|
||
fh.setFormatter(fmt)
|
||
logger.addHandler(fh)
|
||
|
||
if DEBUG:
|
||
sh = logging.StreamHandler()
|
||
sh.setLevel(logging.DEBUG)
|
||
sh.setFormatter(fmt)
|
||
logger.addHandler(sh)
|
||
|
||
return logger
|
||
|
||
LOGGER = setup_logger()
|
||
|
||
# =============================================================================
|
||
# Utility
|
||
# =============================================================================
|
||
|
||
def now_rome():
|
||
return datetime.datetime.now(ZoneInfo("Europe/Rome"))
|
||
|
||
def today_str_italy():
|
||
return now_rome().strftime("%d/%m/%Y")
|
||
|
||
def load_text_file(path: str) -> str:
|
||
try:
|
||
with open(path, "r", encoding="utf-8") as f:
|
||
return f.read().strip()
|
||
except FileNotFoundError:
|
||
return ""
|
||
except PermissionError:
|
||
# Non consideriamo “errore fatale”: semplicemente quel path non è leggibile.
|
||
LOGGER.debug("Permesso negato leggendo %s", path)
|
||
return ""
|
||
except Exception as e:
|
||
LOGGER.exception("Errore leggendo %s: %s", path, e)
|
||
return ""
|
||
|
||
def load_bot_token() -> str:
|
||
# 1) env
|
||
tok = os.environ.get("TELEGRAM_BOT_TOKEN", "").strip()
|
||
if tok:
|
||
return tok
|
||
|
||
# 2) home file (consigliato)
|
||
tok = load_text_file(TOKEN_FILE_HOME)
|
||
if tok:
|
||
return tok
|
||
|
||
# 3) /etc (solo se leggibile)
|
||
tok = load_text_file(TOKEN_FILE_ETC)
|
||
if tok:
|
||
return tok
|
||
|
||
return ""
|
||
|
||
def telegram_send_html(message_html: str) -> bool:
|
||
"""
|
||
Prova a inviare il messaggio. Non solleva eccezioni.
|
||
Ritorna True se almeno un invio ha avuto status 200.
|
||
Importante: lo script chiama questa funzione SOLO in caso di allerte.
|
||
"""
|
||
token = load_bot_token()
|
||
if not token:
|
||
LOGGER.warning("Token Telegram assente. Nessun invio effettuato.")
|
||
return False
|
||
|
||
url = f"https://api.telegram.org/bot{token}/sendMessage"
|
||
base_payload = {
|
||
"text": message_html,
|
||
"parse_mode": "HTML",
|
||
"disable_web_page_preview": True,
|
||
}
|
||
|
||
sent_ok = False
|
||
with requests.Session() as s:
|
||
for chat_id in TELEGRAM_CHAT_IDS:
|
||
payload = dict(base_payload)
|
||
payload["chat_id"] = chat_id
|
||
try:
|
||
resp = s.post(url, json=payload, timeout=15)
|
||
if resp.status_code == 200:
|
||
sent_ok = True
|
||
else:
|
||
LOGGER.error("Telegram error chat_id=%s status=%s body=%s",
|
||
chat_id, resp.status_code, resp.text[:500])
|
||
time.sleep(0.25)
|
||
except Exception as e:
|
||
LOGGER.exception("Telegram exception chat_id=%s err=%s", chat_id, e)
|
||
|
||
return sent_ok
|
||
|
||
def load_state() -> dict:
|
||
try:
|
||
if os.path.exists(STATE_FILE):
|
||
with open(STATE_FILE, "r", encoding="utf-8") as f:
|
||
return json.load(f) or {}
|
||
except Exception as e:
|
||
LOGGER.exception("Errore lettura stato: %s", e)
|
||
return {}
|
||
|
||
def save_state(state: dict) -> None:
|
||
try:
|
||
with open(STATE_FILE, "w", encoding="utf-8") as f:
|
||
json.dump(state, f, ensure_ascii=False, indent=2)
|
||
except Exception as e:
|
||
LOGGER.exception("Errore scrittura stato: %s", e)
|
||
|
||
def norm(s: str) -> str:
|
||
return re.sub(r"\s+", " ", (s or "").strip().lower())
|
||
|
||
class TextExtractor(HTMLParser):
|
||
def __init__(self):
|
||
super().__init__()
|
||
self.parts = []
|
||
|
||
def handle_data(self, data):
|
||
if data and data.strip():
|
||
self.parts.append(data)
|
||
|
||
def html_to_lines(html_text: str) -> list[str]:
|
||
p = TextExtractor()
|
||
p.feed(html_text)
|
||
text = "\n".join(p.parts)
|
||
text = html_lib.unescape(text)
|
||
|
||
lines = []
|
||
for raw in text.splitlines():
|
||
line = re.sub(r"\s+", " ", raw).strip()
|
||
if line:
|
||
lines.append(line)
|
||
return lines
|
||
|
||
# =============================================================================
|
||
# Parsing bollettino
|
||
# =============================================================================
|
||
|
||
RISK_HEADER_RE = re.compile(
|
||
r"^(ORDINARIA|MODERATA|ELEVATA)\s+CRITICITA'\s+PER\s+RISCHIO\s+([A-ZÀ-Ü]+)\s*/\s*ALLERTA\s+(GIALLA|ARANCIONE|ROSSA)\s*:\s*$"
|
||
)
|
||
DAY_START_RE = re.compile(r"^Per la giornata di (oggi|domani),\s*(.+?):\s*$", re.IGNORECASE)
|
||
TITLE_RE = re.compile(r"^Bollettino di Criticità del (.+?) ore (\d{1,2}:\d{2})", re.IGNORECASE)
|
||
|
||
def fetch_bulletin_text_lines() -> list[str]:
|
||
r = requests.get(BULLETIN_URL, headers=HTTP_HEADERS, timeout=25)
|
||
r.raise_for_status()
|
||
# Forza encoding per evitare “Mercoledì”
|
||
r.encoding = "utf-8"
|
||
return html_to_lines(r.text)
|
||
|
||
def parse_bulletin(lines: list[str]) -> dict:
|
||
out = {"title": "", "issued_date": "", "issued_time": "", "days": {}}
|
||
|
||
for ln in lines[:150]:
|
||
m = TITLE_RE.match(ln)
|
||
if m:
|
||
out["title"] = ln
|
||
out["issued_date"] = m.group(1).strip()
|
||
out["issued_time"] = m.group(2).strip()
|
||
break
|
||
|
||
current_day_key = None
|
||
current_day_label = None
|
||
current_risk = None
|
||
current_color = None
|
||
|
||
def add_alert(zone_name: str):
|
||
if not current_day_key or not current_risk or not current_color:
|
||
return
|
||
day = out["days"].setdefault(current_day_key, {"date_label": current_day_label or "", "alerts": {}})
|
||
icon = COLOR_ICON.get(current_color, "⚪")
|
||
ricon = RISK_ICON.get(current_risk, "⚠️")
|
||
entry = f"{icon} {ricon} {current_risk}"
|
||
day["alerts"].setdefault(zone_name, [])
|
||
if entry not in day["alerts"][zone_name]:
|
||
day["alerts"][zone_name].append(entry)
|
||
|
||
for ln in lines:
|
||
md = DAY_START_RE.match(ln)
|
||
if md:
|
||
current_day_key = md.group(1).lower()
|
||
current_day_label = md.group(2).strip()
|
||
out["days"].setdefault(current_day_key, {"date_label": current_day_label, "alerts": {}})
|
||
current_risk = None
|
||
current_color = None
|
||
continue
|
||
|
||
mh = RISK_HEADER_RE.match(ln)
|
||
if mh:
|
||
current_risk = mh.group(2).strip()
|
||
current_color = mh.group(3).strip()
|
||
continue
|
||
|
||
if current_day_key and current_risk and current_color and ":" in ln:
|
||
region, rest = ln.split(":", 1)
|
||
if norm(region) in (norm("Emilia Romagna"), norm("Emilia-Romagna")):
|
||
zones = [z.strip() for z in rest.split(",") if z.strip()]
|
||
for _, target_zone_name in TARGET_ZONES.items():
|
||
for z in zones:
|
||
if norm(z) == norm(target_zone_name):
|
||
add_alert(target_zone_name)
|
||
|
||
return out
|
||
|
||
def has_any_alert(parsed: dict) -> bool:
|
||
for day_key in ("oggi", "domani"):
|
||
day = parsed.get("days", {}).get(day_key, {})
|
||
if day.get("alerts"):
|
||
return True
|
||
return False
|
||
|
||
def build_signature(parsed: dict) -> str:
|
||
parts = [parsed.get("issued_date", ""), parsed.get("issued_time", ""), parsed.get("title", "")]
|
||
for day_key in ("oggi", "domani"):
|
||
day = parsed.get("days", {}).get(day_key, {})
|
||
parts.append(day_key + ":" + day.get("date_label", ""))
|
||
alerts = day.get("alerts", {})
|
||
for zone in sorted(alerts.keys()):
|
||
parts.append(zone + "=" + ",".join(sorted(alerts[zone])))
|
||
return "|".join(parts)
|
||
|
||
def format_message(parsed: dict) -> str:
|
||
issued = parsed.get("title") or "Bollettino di Criticità"
|
||
issued_safe = html_lib.escape(issued)
|
||
|
||
lines = []
|
||
lines.append("📢 <b>PROTEZIONE CIVILE (Allerta)</b>")
|
||
lines.append(f"🕒 <code>{issued_safe}</code>")
|
||
lines.append("")
|
||
|
||
for day_key in ("oggi", "domani"):
|
||
day = parsed.get("days", {}).get(day_key)
|
||
if not day:
|
||
continue
|
||
alerts = day.get("alerts", {})
|
||
if not alerts:
|
||
continue
|
||
|
||
lines.append(f"📅 <b>{html_lib.escape(day.get('date_label',''))}</b>")
|
||
for zone in sorted(alerts.keys()):
|
||
lines.append(f"📍 <b>{html_lib.escape(zone)}</b>")
|
||
for entry in alerts[zone]:
|
||
lines.append(html_lib.escape(entry))
|
||
lines.append("")
|
||
|
||
lines.append("<i>Fonte: mappe.protezionecivile.gov.it</i>")
|
||
return "\n".join(lines)
|
||
|
||
# =============================================================================
|
||
# Main
|
||
# =============================================================================
|
||
|
||
def main():
|
||
LOGGER.info("--- Controllo Protezione Civile (Bollettino ufficiale) ---")
|
||
|
||
try:
|
||
lines = fetch_bulletin_text_lines()
|
||
parsed = parse_bulletin(lines)
|
||
except Exception as e:
|
||
# NESSUN Telegram in caso di errori: solo log.
|
||
LOGGER.exception("Errore durante fetch/parse bollettino: %s", e)
|
||
return
|
||
|
||
if DEBUG:
|
||
LOGGER.debug("title=%s", parsed.get("title", ""))
|
||
for k in ("oggi", "domani"):
|
||
d = parsed.get("days", {}).get(k, {})
|
||
LOGGER.debug("%s label=%s", k, d.get("date_label", ""))
|
||
LOGGER.debug("%s alerts=%s", k, d.get("alerts", {}))
|
||
|
||
# Regola: invia Telegram SOLO se esistono allerte
|
||
if not has_any_alert(parsed):
|
||
LOGGER.info("Nessuna allerta nelle zone monitorate. Nessuna notifica inviata.")
|
||
return
|
||
|
||
sig = build_signature(parsed)
|
||
state = load_state()
|
||
last_sig = state.get("last_alert_signature", "")
|
||
|
||
if sig == last_sig:
|
||
LOGGER.info("Allerta già notificata e invariata. Nessuna nuova notifica.")
|
||
return
|
||
|
||
# A questo punto: ci sono allerte e sono nuove -> prova invio
|
||
msg = format_message(parsed)
|
||
sent_ok = telegram_send_html(msg)
|
||
|
||
if sent_ok:
|
||
LOGGER.info("Notifica allerta inviata con successo.")
|
||
save_state({
|
||
"date": today_str_italy(),
|
||
"last_alert_signature": sig,
|
||
})
|
||
else:
|
||
# Non aggiorniamo lo stato: quando risolvi token/rete, reinvierà.
|
||
LOGGER.warning("Invio non riuscito (token mancante o errore Telegram). Stato NON aggiornato.")
|
||
|
||
if __name__ == "__main__":
|
||
main()
|