Aggiornamento script meteo (Vigilia di Natale)

This commit is contained in:
2025-12-24 19:44:39 +01:00
parent 1ccb77c7ae
commit f2fe1528d4
3 changed files with 1368 additions and 405 deletions

View File

@@ -1,151 +1,371 @@
import requests
import json
import os
import time
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import datetime
import html as html_lib
import json
import logging
import os
import re
import time
from html.parser import HTMLParser
from logging.handlers import RotatingFileHandler
from zoneinfo import ZoneInfo
# --- CONFIGURAZIONE UTENTE ---
# 👇👇 INSERISCI QUI I TUOI DATI 👇👇
TELEGRAM_BOT_TOKEN = "8155587974:AAF9OekvBpixtk8ZH6KoIc0L8edbhdXt7A4"
import requests
# =============================================================================
# Modalità debug:
# DEBUG=1 python3 civil_protection.py
# Token Telegram (senza modifiche al codice):
# 1) export TELEGRAM_BOT_TOKEN="...token..."
# 2) oppure file nel tuo home: ~/.telegram_dpc_bot_token
# =============================================================================
DEBUG = os.environ.get("DEBUG", "0").strip() == "1"
BULLETIN_URL = "https://mappe.protezionecivile.gov.it/it/mappe-rischi/bollettino-di-criticita/"
# Chat IDs
TELEGRAM_CHAT_IDS = ["64463169", "24827341", "132455422", "5405962012"]
# --- ZONE DA MONITORARE ---
# EMR-B2 = Costa Romagnola (Rimini/Dogana)
# EMR-A2 = Alta Collina Romagnola (San Marino/Titano)
# EMR-D1 = Pianura Bolognese (Bologna Città)
TARGET_ZONES = ["EMR-B2", "EMR-A2", "EMR-D1"]
# Directory dello script (log + state qui, così non serve sudo)
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
STATE_FILE = os.path.join(BASE_DIR, "dpc_state.json")
LOG_FILE = os.path.join(BASE_DIR, "dpc_telegram.log")
# URL Ufficiale DPC
DPC_URL = "https://raw.githubusercontent.com/pcm-dpc/DPC-Bollettini-Criticita-Idrogeologica-Idraulica/master/files/geojson/today.json"
# Dove cercare il token (in questordine)
TOKEN_FILE_HOME = os.path.expanduser("~/.telegram_dpc_bot_token")
TOKEN_FILE_ETC = "/etc/telegram_dpc_bot_token"
# File di stato
STATE_FILE = "/home/daniely/docker/telegram-bot/dpc_state.json"
# Mappe
RISK_MAP = {
1: "🟡 GIALLA",
2: "🟠 ARANCIONE",
3: "🔴 ROSSA"
# Zone target (come compaiono tipicamente nel testo del bollettino)
TARGET_ZONES = {
"EMR-B2": "Costa romagnola",
"EMR-A2": "Alta collina romagnola",
"EMR-D1": "Pianura bolognese",
}
RISK_TYPES = {
"idro": "💧 Idraulico",
"idrogeo": "⛰️ Idrogeologico",
"temporali": " Temporali",
"vento": "💨 Vento",
"neve": "❄️ Neve",
"ghiaccio": "🧊 Ghiaccio",
"mare": "🌊 Mareggiate"
RISK_ICON = {
"IDRAULICO": "💧",
"IDROGEOLOGICO": "⛰️",
"TEMPORALI": "",
}
def get_zone_label(zone_id):
if zone_id == "EMR-B2": return "Rimini / Bassa RSM"
if zone_id == "EMR-A2": return "Alta RSM / Carpegna"
if zone_id == "EMR-D1": return "Bologna Città"
return zone_id
COLOR_ICON = {
"GIALLA": "🟡",
"ARANCIONE": "🟠",
"ROSSA": "🔴",
}
def send_telegram_message(message):
if not TELEGRAM_BOT_TOKEN or "INSERISCI" in TELEGRAM_BOT_TOKEN:
print(f"[TEST OUT] {message}")
return
HTTP_HEADERS = {"User-Agent": "rpi-dpc-bollettino/1.2"}
url = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendMessage"
for chat_id in TELEGRAM_CHAT_IDS:
try:
requests.post(url, json={"chat_id": chat_id, "text": message, "parse_mode": "Markdown"}, timeout=10)
time.sleep(0.2)
except: pass
# =============================================================================
# Logging
# =============================================================================
def load_last_alert():
if os.path.exists(STATE_FILE):
try:
with open(STATE_FILE, 'r') as f: return json.load(f)
except: pass
def setup_logger() -> logging.Logger:
logger = logging.getLogger("dpc_telegram")
logger.setLevel(logging.DEBUG if DEBUG else logging.INFO)
logger.handlers.clear()
fh = RotatingFileHandler(LOG_FILE, maxBytes=1_000_000, backupCount=5, encoding="utf-8")
fh.setLevel(logging.DEBUG)
fmt = logging.Formatter("%(asctime)s %(levelname)s %(message)s")
fh.setFormatter(fmt)
logger.addHandler(fh)
if DEBUG:
sh = logging.StreamHandler()
sh.setLevel(logging.DEBUG)
sh.setFormatter(fmt)
logger.addHandler(sh)
return logger
LOGGER = setup_logger()
# =============================================================================
# Utility
# =============================================================================
def now_rome():
return datetime.datetime.now(ZoneInfo("Europe/Rome"))
def today_str_italy():
return now_rome().strftime("%d/%m/%Y")
def load_text_file(path: str) -> str:
try:
with open(path, "r", encoding="utf-8") as f:
return f.read().strip()
except FileNotFoundError:
return ""
except PermissionError:
# Non consideriamo “errore fatale”: semplicemente quel path non è leggibile.
LOGGER.debug("Permesso negato leggendo %s", path)
return ""
except Exception as e:
LOGGER.exception("Errore leggendo %s: %s", path, e)
return ""
def load_bot_token() -> str:
# 1) env
tok = os.environ.get("TELEGRAM_BOT_TOKEN", "").strip()
if tok:
return tok
# 2) home file (consigliato)
tok = load_text_file(TOKEN_FILE_HOME)
if tok:
return tok
# 3) /etc (solo se leggibile)
tok = load_text_file(TOKEN_FILE_ETC)
if tok:
return tok
return ""
def telegram_send_html(message_html: str) -> bool:
"""
Prova a inviare il messaggio. Non solleva eccezioni.
Ritorna True se almeno un invio ha avuto status 200.
Importante: lo script chiama questa funzione SOLO in caso di allerte.
"""
token = load_bot_token()
if not token:
LOGGER.warning("Token Telegram assente. Nessun invio effettuato.")
return False
url = f"https://api.telegram.org/bot{token}/sendMessage"
base_payload = {
"text": message_html,
"parse_mode": "HTML",
"disable_web_page_preview": True,
}
sent_ok = False
with requests.Session() as s:
for chat_id in TELEGRAM_CHAT_IDS:
payload = dict(base_payload)
payload["chat_id"] = chat_id
try:
resp = s.post(url, json=payload, timeout=15)
if resp.status_code == 200:
sent_ok = True
else:
LOGGER.error("Telegram error chat_id=%s status=%s body=%s",
chat_id, resp.status_code, resp.text[:500])
time.sleep(0.25)
except Exception as e:
LOGGER.exception("Telegram exception chat_id=%s err=%s", chat_id, e)
return sent_ok
def load_state() -> dict:
try:
if os.path.exists(STATE_FILE):
with open(STATE_FILE, "r", encoding="utf-8") as f:
return json.load(f) or {}
except Exception as e:
LOGGER.exception("Errore lettura stato: %s", e)
return {}
def save_current_alert(data):
def save_state(state: dict) -> None:
try:
with open(STATE_FILE, 'w') as f: json.dump(data, f)
except: pass
def analyze_dpc():
print("--- Controllo Protezione Civile ---")
try:
r = requests.get(DPC_URL, timeout=10)
r.raise_for_status()
data = r.json()
with open(STATE_FILE, "w", encoding="utf-8") as f:
json.dump(state, f, ensure_ascii=False, indent=2)
except Exception as e:
print(f"Errore DPC: {e}")
LOGGER.exception("Errore scrittura stato: %s", e)
def norm(s: str) -> str:
return re.sub(r"\s+", " ", (s or "").strip().lower())
class TextExtractor(HTMLParser):
def __init__(self):
super().__init__()
self.parts = []
def handle_data(self, data):
if data and data.strip():
self.parts.append(data)
def html_to_lines(html_text: str) -> list[str]:
p = TextExtractor()
p.feed(html_text)
text = "\n".join(p.parts)
text = html_lib.unescape(text)
lines = []
for raw in text.splitlines():
line = re.sub(r"\s+", " ", raw).strip()
if line:
lines.append(line)
return lines
# =============================================================================
# Parsing bollettino
# =============================================================================
RISK_HEADER_RE = re.compile(
r"^(ORDINARIA|MODERATA|ELEVATA)\s+CRITICITA'\s+PER\s+RISCHIO\s+([A-ZÀ-Ü]+)\s*/\s*ALLERTA\s+(GIALLA|ARANCIONE|ROSSA)\s*:\s*$"
)
DAY_START_RE = re.compile(r"^Per la giornata di (oggi|domani),\s*(.+?):\s*$", re.IGNORECASE)
TITLE_RE = re.compile(r"^Bollettino di Criticità del (.+?) ore (\d{1,2}:\d{2})", re.IGNORECASE)
def fetch_bulletin_text_lines() -> list[str]:
r = requests.get(BULLETIN_URL, headers=HTTP_HEADERS, timeout=25)
r.raise_for_status()
# Forza encoding per evitare “Mercoledì”
r.encoding = "utf-8"
return html_to_lines(r.text)
def parse_bulletin(lines: list[str]) -> dict:
out = {"title": "", "issued_date": "", "issued_time": "", "days": {}}
for ln in lines[:150]:
m = TITLE_RE.match(ln)
if m:
out["title"] = ln
out["issued_date"] = m.group(1).strip()
out["issued_time"] = m.group(2).strip()
break
current_day_key = None
current_day_label = None
current_risk = None
current_color = None
def add_alert(zone_name: str):
if not current_day_key or not current_risk or not current_color:
return
day = out["days"].setdefault(current_day_key, {"date_label": current_day_label or "", "alerts": {}})
icon = COLOR_ICON.get(current_color, "")
ricon = RISK_ICON.get(current_risk, "⚠️")
entry = f"{icon} {ricon} {current_risk}"
day["alerts"].setdefault(zone_name, [])
if entry not in day["alerts"][zone_name]:
day["alerts"][zone_name].append(entry)
for ln in lines:
md = DAY_START_RE.match(ln)
if md:
current_day_key = md.group(1).lower()
current_day_label = md.group(2).strip()
out["days"].setdefault(current_day_key, {"date_label": current_day_label, "alerts": {}})
current_risk = None
current_color = None
continue
mh = RISK_HEADER_RE.match(ln)
if mh:
current_risk = mh.group(2).strip()
current_color = mh.group(3).strip()
continue
if current_day_key and current_risk and current_color and ":" in ln:
region, rest = ln.split(":", 1)
if norm(region) in (norm("Emilia Romagna"), norm("Emilia-Romagna")):
zones = [z.strip() for z in rest.split(",") if z.strip()]
for _, target_zone_name in TARGET_ZONES.items():
for z in zones:
if norm(z) == norm(target_zone_name):
add_alert(target_zone_name)
return out
def has_any_alert(parsed: dict) -> bool:
for day_key in ("oggi", "domani"):
day = parsed.get("days", {}).get(day_key, {})
if day.get("alerts"):
return True
return False
def build_signature(parsed: dict) -> str:
parts = [parsed.get("issued_date", ""), parsed.get("issued_time", ""), parsed.get("title", "")]
for day_key in ("oggi", "domani"):
day = parsed.get("days", {}).get(day_key, {})
parts.append(day_key + ":" + day.get("date_label", ""))
alerts = day.get("alerts", {})
for zone in sorted(alerts.keys()):
parts.append(zone + "=" + ",".join(sorted(alerts[zone])))
return "|".join(parts)
def format_message(parsed: dict) -> str:
issued = parsed.get("title") or "Bollettino di Criticità"
issued_safe = html_lib.escape(issued)
lines = []
lines.append("📢 <b>PROTEZIONE CIVILE (Allerta)</b>")
lines.append(f"🕒 <code>{issued_safe}</code>")
lines.append("")
for day_key in ("oggi", "domani"):
day = parsed.get("days", {}).get(day_key)
if not day:
continue
alerts = day.get("alerts", {})
if not alerts:
continue
lines.append(f"📅 <b>{html_lib.escape(day.get('date_label',''))}</b>")
for zone in sorted(alerts.keys()):
lines.append(f"📍 <b>{html_lib.escape(zone)}</b>")
for entry in alerts[zone]:
lines.append(html_lib.escape(entry))
lines.append("")
lines.append("<i>Fonte: mappe.protezionecivile.gov.it</i>")
return "\n".join(lines)
# =============================================================================
# Main
# =============================================================================
def main():
LOGGER.info("--- Controllo Protezione Civile (Bollettino ufficiale) ---")
try:
lines = fetch_bulletin_text_lines()
parsed = parse_bulletin(lines)
except Exception as e:
# NESSUN Telegram in caso di errori: solo log.
LOGGER.exception("Errore durante fetch/parse bollettino: %s", e)
return
today_str = datetime.datetime.now(ZoneInfo("Europe/Rome")).strftime('%d/%m/%Y')
active_alerts = {}
max_global_level = 0
# Stringa univoca per identificare se l'allerta è cambiata nel contenuto
current_alert_signature = ""
if DEBUG:
LOGGER.debug("title=%s", parsed.get("title", ""))
for k in ("oggi", "domani"):
d = parsed.get("days", {}).get(k, {})
LOGGER.debug("%s label=%s", k, d.get("date_label", ""))
LOGGER.debug("%s alerts=%s", k, d.get("alerts", {}))
for feature in data['features']:
props = feature['properties']
zone_id = props.get('zone_id')
if zone_id in TARGET_ZONES:
zone_risks = []
for key, label in RISK_TYPES.items():
level = props.get(f"crit_{key}")
if level and level > 0:
if level > max_global_level: max_global_level = level
color_icon = "🟡" if level == 1 else "🟠" if level == 2 else "🔴"
risk_str = f"{color_icon} {label}"
zone_risks.append(risk_str)
if zone_risks:
label = get_zone_label(zone_id)
active_alerts[label] = zone_risks
# Aggiungiamo alla firma per capire se qualcosa è cambiato
current_alert_signature += f"{zone_id}:{','.join(zone_risks)}|"
# --- LOGICA DI INVIO ---
last_state = load_last_alert()
last_date = last_state.get("date")
last_sig = last_state.get("signature", "")
# Se tutto verde (livello 0)
if max_global_level == 0:
print("Nessuna allerta.")
if last_date == today_str and last_sig != "":
# Opzionale: Potremmo mandare "Allerta Rientrata", ma DPC resetta a mezzanotte.
# Per ora resettiamo solo lo stato.
save_current_alert({"date": today_str, "level": 0, "signature": ""})
# Regola: invia Telegram SOLO se esistono allerte
if not has_any_alert(parsed):
LOGGER.info("Nessuna allerta nelle zone monitorate. Nessuna notifica inviata.")
return
# Invia SE:
# 1. È un giorno nuovo
# 2. OPPURE la situazione è cambiata (es. aggiunto Bologna, o passato da Giallo a Rosso)
if last_date != today_str or current_alert_signature != last_sig:
msg = f"📢 **PROTEZIONE CIVILE (Allerta)**\n"
msg += f"📅 {today_str}\n\n"
for zone_name, risks in active_alerts.items():
msg += f"📍 **{zone_name}**\n"
msg += "\n".join(risks) + "\n\n"
msg += "_Fonte: Dipartimento Protezione Civile_"
send_telegram_message(msg)
print("Allerta inviata.")
save_current_alert({"date": today_str, "level": max_global_level, "signature": current_alert_signature})
sig = build_signature(parsed)
state = load_state()
last_sig = state.get("last_alert_signature", "")
if sig == last_sig:
LOGGER.info("Allerta già notificata e invariata. Nessuna nuova notifica.")
return
# A questo punto: ci sono allerte e sono nuove -> prova invio
msg = format_message(parsed)
sent_ok = telegram_send_html(msg)
if sent_ok:
LOGGER.info("Notifica allerta inviata con successo.")
save_state({
"date": today_str_italy(),
"last_alert_signature": sig,
})
else:
print("Allerta già notificata e invariata.")
# Non aggiorniamo lo stato: quando risolvi token/rete, reinvierà.
LOGGER.warning("Invio non riuscito (token mancante o errore Telegram). Stato NON aggiornato.")
if __name__ == "__main__":
analyze_dpc()
main()