Files
loogle-scripts/services/telegram-bot/check_ghiaccio.py

2281 lines
100 KiB
Python

import argparse
import requests
from open_meteo_client import open_meteo_get
import datetime
import os
import sys
import json
import time
from typing import Dict, List, Tuple, Optional
# Import opzionale di pandas e numpy per analisi avanzata
try:
import pandas as pd
import numpy as np
PANDAS_AVAILABLE = True
except ImportError:
PANDAS_AVAILABLE = False
# Placeholder per evitare errori di riferimento
pd = None
np = None
# --- TELEGRAM CONFIG ---
ADMIN_CHAT_ID = "64463169"
TELEGRAM_CHAT_IDS = ["64463169", "24827341", "132455422", "5405962012"]
# FILES
TOKEN_FILE_HOME = os.path.expanduser("~/.telegram_dpc_bot_token")
TOKEN_FILE_ETC = "/etc/telegram_dpc_bot_token"
# File di stato salvato nella cartella del progetto
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
STATE_FILE = os.path.join(SCRIPT_DIR, ".ghiaccio_multimodel_state.json")
# --- CONFIGURAZIONE GRIGLIA ---
# Griglia più fitta per maggiore precisione di rilevazione
GRID_POINTS = [
{"id": "G01", "name": "Dogana", "lat": 43.9850, "lon": 12.4950},
{"id": "G02", "name": "Serravalle", "lat": 43.9680, "lon": 12.4780},
{"id": "G03", "name": "Galazzano", "lat": 43.9650, "lon": 12.4650},
{"id": "G04", "name": "Acquaviva", "lat": 43.9480, "lon": 12.4180},
{"id": "G05", "name": "Chiesanuova", "lat": 43.9150, "lon": 12.4220},
{"id": "G06", "name": "Domagnano", "lat": 43.9480, "lon": 12.4650},
{"id": "G07", "name": "Centro Storico", "lat": 43.9350, "lon": 12.4450},
{"id": "G08", "name": "Fonte dell'Ovo", "lat": 43.9300, "lon": 12.4480},
{"id": "G09", "name": "Cailungo", "lat": 43.9550, "lon": 12.4500},
{"id": "G10", "name": "Faetano", "lat": 43.9280, "lon": 12.4980},
{"id": "G11", "name": "Fiorentino", "lat": 43.9080, "lon": 12.4580},
{"id": "G12", "name": "Cerbaiola", "lat": 43.8977, "lon": 12.4704},
{"id": "G13", "name": "Confine Chiesanuova", "lat": 43.9050, "lon": 12.4100},
{"id": "G14", "name": "Torraccia", "lat": 43.9544, "lon": 12.5080},
{"id": "G15", "name": "Piandavello", "lat": 43.9501, "lon": 12.4836},
{"id": "G16", "name": "Ponte Mellini", "lat": 43.9668, "lon": 12.5006},
{"id": "G17", "name": "Murata", "lat": 43.9184, "lon": 12.4521},
{"id": "G18", "name": "Borgo Maggiore", "lat": 43.9379, "lon": 12.4488},
{"id": "G19", "name": "Santa Mustiola", "lat": 43.9344, "lon": 12.4357},
{"id": "G20", "name": "Montegiardino", "lat": 43.9097, "lon": 12.4855}
]
# Modelli da consultare (Nome Visualizzato : Slug API)
# 'icon_eu': Ottimo generale | 'meteofrance_seamless': AROME Seamless (alta risoluzione, supporta minutely_15)
MODELS_TO_CHECK = {
"ICON": "icon_eu",
"AROME": "meteofrance_seamless" # Aggiornato da arome_medium per avere dati più recenti e minutely_15
}
# Modello preferito per analisi avanzata ghiaccio (ICON Italia fornisce soil_temperature_0cm)
ICON_ITALIA_MODEL = "italia_meteo_arpae_icon_2i"
def get_bot_token():
paths = [TOKEN_FILE_HOME, TOKEN_FILE_ETC]
for path in paths:
if os.path.exists(path):
try:
with open(path, 'r') as f:
return f.read().strip()
except IOError:
pass
print("ERRORE: Token non trovato.")
sys.exit(1)
def save_current_state(state, report_meta=None):
try:
# Aggiungi timestamp corrente per tracciare quando è stato salvato lo stato
if report_meta is None:
report_meta = {}
state_with_meta = {
"points": state,
"last_update": datetime.datetime.now().isoformat(),
"report_meta": report_meta,
}
with open(STATE_FILE, 'w') as f:
json.dump(state_with_meta, f)
except Exception as e:
print(f"Errore salvataggio stato: {e}")
def load_state_with_meta():
if not os.path.exists(STATE_FILE):
return {}, {}
try:
with open(STATE_FILE, 'r') as f:
data = json.load(f)
# Supporta sia il formato vecchio (solo dict di punti) che nuovo (con metadata)
if isinstance(data, dict) and "points" in data:
return data.get("points", {}), data.get("report_meta", {})
else:
return data, {}
except Exception:
return {}, {}
def normalize_report_meta(report_meta: dict) -> dict:
today = datetime.date.today().isoformat()
date_str = report_meta.get("date")
count = report_meta.get("count", 0)
if date_str != today:
return {"date": today, "count": 0}
try:
count = int(count)
except Exception:
count = 0
return {"date": today, "count": max(0, count)}
def is_important_update(new_level: int, old_level: int, message: str) -> bool:
# Importante se rischio alto (gelicidio) o neve su strada (livello 4).
if max(new_level, old_level) >= 3:
return True
lowered = (message or "").lower()
return "gelicidio" in lowered or "neve" in lowered
def append_report(target_list: list, message: str, important: bool, report_meta: dict, debug_mode: bool) -> None:
DAILY_LIMIT = 3
if important:
target_list.append(message)
return
if report_meta.get("count", 0) >= DAILY_LIMIT:
if debug_mode:
print(" ⏸️ Report non importante saltato: limite giornaliero raggiunto")
return
target_list.append(message)
report_meta["count"] = report_meta.get("count", 0) + 1
def is_improvement_report_allowed() -> bool:
"""
Verifica se è consentito inviare un report di miglioramento.
I report di miglioramento possono essere inviati solo alle ore 7:00, 15:00, 23:00.
Returns:
True se l'ora corrente è 7, 15 o 23, False altrimenti
"""
current_hour = datetime.datetime.now().hour
allowed_hours = [7, 15, 23]
return current_hour in allowed_hours
def get_weather_data(lat, lon, model_slug, include_past_days=1):
"""
Recupera dati meteo. Per AROME Seamless, include anche minutely_15 e parametri isobarici
per l'algoritmo FMI_CLIM di rilevamento gelicidio.
Args:
lat, lon: Coordinate geografiche
model_slug: Slug del modello meteo
include_past_days: Numero di giorni passati da includere (default: 1 per 24h precedenti)
"""
url = "https://api.open-meteo.com/v1/forecast"
# Parametri base per tutti i modelli (inclusi cloud_cover e windspeed_10m per analisi avanzata)
# Aggiunto snowfall e snow_depth per verificare presenza neve (importante per logica "allerta rientrata")
# Aggiunto rain e weathercode per analisi precipitazioni dettagliata
hourly_params = "temperature_2m,dew_point_2m,precipitation,rain,showers,soil_temperature_0cm,relative_humidity_2m,surface_pressure,cloud_cover,windspeed_10m,snowfall,snow_depth,weathercode"
# Per AROME Seamless, aggiungi parametri isobarici per algoritmo FMI_CLIM
if model_slug == "meteofrance_seamless":
hourly_params += ",temperature_925hPa,relative_humidity_925hPa,temperature_850hPa,relative_humidity_850hPa,temperature_700hPa,relative_humidity_700hPa"
params = {
"latitude": lat,
"longitude": lon,
"hourly": hourly_params,
"models": model_slug,
"timezone": "Europe/San_Marino",
"past_days": include_past_days, # Include 24h precedenti per analisi storica
"forecast_days": 2 # Aumentato a 2 giorni per analisi finestra temporale
}
# Aggiungi minutely_15 per AROME Seamless (risoluzione 15 minuti)
if model_slug == "meteofrance_seamless":
params["minutely_15"] = "temperature_2m,precipitation,rain,snowfall"
try:
response = open_meteo_get(url, params=params, timeout=(5, 15))
response.raise_for_status()
return response.json()
except Exception as e:
print(f"Errore richiesta API per {model_slug}: {e}")
return None
# =============================================================================
# Parametri di Calibrazione FMI_CLIM (Kämäräinen et al. 2017, Tabella 1)
# Modificati per ridurre falsi positivi mantenendo alta sensibilità
# =============================================================================
H_COLD_THR = 69.0 # hPa (Profondità minima strato freddo)
T_COLD_THR = 0.09 # °C (Temp max al suolo considerata 'fredda') - mantenuta bassa per evitare falsi negativi
T_MELT_THR = 0.0 # °C (Temp min per considerare uno strato 'in fusione') - aumentata da -0.64°C a 0.0°C per ridurre falsi positivi mantenendo sensibilità
RH_MELT_THR = 89.0 # % (Umidità relativa minima nello strato di fusione)
PR_THR_6H = 0.39 # mm/6h
PR_THR_1H = 0.1 # mm/h - aumentata da 0.065 per richiedere precipitazione più significativa
# Differenza minima temperatura tra strato di fusione e suolo (per ridurre falsi positivi)
T_MELT_SURFACE_DIFF = 1.0 # °C - lo strato di fusione deve essere almeno 1°C più caldo del suolo (bilanciato tra riduzione falsi positivi e mantenimento sensibilità)
# Livelli di pressione analizzati dall'algoritmo
PRESSURE_LEVELS = [925, 850, 700]
def detect_freezing_rain_fmi(hourly_data, idx, icon_hourly_data=None):
"""
Implementa l'algoritmo FMI_CLIM per rilevare gelicidio (Freezing Rain - FZRA).
Basato su Kämäräinen et al. (2017).
Args:
hourly_data: Dati hourly del modello principale (AROME Seamless)
idx: Indice dell'ora corrente
icon_hourly_data: Dati hourly di ICON (opzionale, per ottenere soil_temperature_0cm se AROME non lo fornisce)
Returns: (is_fzra: bool, details: str)
"""
try:
# Pre-condizioni: estrazione dati superficie
t_2m = hourly_data.get("temperature_2m", [None])[idx] if idx < len(hourly_data.get("temperature_2m", [])) else None
t_soil = hourly_data.get("soil_temperature_0cm", [None])[idx] if idx < len(hourly_data.get("soil_temperature_0cm", [])) else None
# Se AROME non fornisce soil_temperature_0cm (None o NaN), prova a ottenerlo da ICON
# AROME Seamless spesso non fornisce questo parametro, quindi usiamo ICON come fallback
t_soil_from_icon = False
if t_soil is None and icon_hourly_data:
icon_times = icon_hourly_data.get("time", [])
if icon_times and len(icon_times) > idx:
icon_t_soil = icon_hourly_data.get("soil_temperature_0cm", [None])[idx] if idx < len(icon_hourly_data.get("soil_temperature_0cm", [])) else None
if icon_t_soil is not None:
t_soil = icon_t_soil
t_soil_from_icon = True
precip = hourly_data.get("precipitation", [None])[idx] if idx < len(hourly_data.get("precipitation", [])) else None
p_surf = hourly_data.get("surface_pressure", [None])[idx] if idx < len(hourly_data.get("surface_pressure", [])) else None
# Verifica disponibilità parametri isobarici
has_isobaric = all(
f"temperature_{pl}hPa" in hourly_data and
f"relative_humidity_{pl}hPa" in hourly_data
for pl in PRESSURE_LEVELS
)
if not has_isobaric or precip is None or p_surf is None:
return False, ""
# --- Pre-condizioni (Fig. 2 Pseudo-code) ---
# A. C'è precipitazione sufficiente?
if precip <= PR_THR_1H:
return False, ""
# B. La temperatura al suolo è sufficientemente bassa?
# PRIORITÀ: usa soil_temperature_0cm se disponibile (più accurato per gelicidio)
# Fallback a t_2m se soil_temperature non disponibile
t_surface = t_soil if t_soil is not None else t_2m
if t_surface is None:
return False, ""
if t_surface > T_COLD_THR:
return False, ""
# C. C'è almeno un livello in quota più caldo della soglia di fusione?
t_925 = hourly_data.get("temperature_925hPa", [None])[idx] if idx < len(hourly_data.get("temperature_925hPa", [])) else None
t_850 = hourly_data.get("temperature_850hPa", [None])[idx] if idx < len(hourly_data.get("temperature_850hPa", [])) else None
t_700 = hourly_data.get("temperature_700hPa", [None])[idx] if idx < len(hourly_data.get("temperature_700hPa", [])) else None
if any(t is None for t in [t_925, t_850, t_700]):
return False, ""
t_max_aloft = max(t_925, t_850, t_700)
if t_max_aloft <= T_MELT_THR:
return False, ""
# --- Logica Strato Freddo e Strato Caldo ---
# Calcolo la pressione limite sopra la quale cercare lo strato di fusione
p_threshold = p_surf - H_COLD_THR
# Trova il livello di pressione standard più vicino che si trova SOPRA la soglia
p_cold = None
for pl in sorted(PRESSURE_LEVELS, reverse=True): # 925, 850, 700
if pl <= p_threshold:
p_cold = pl
break
if p_cold is None:
# La superficie è troppo alta, non c'è spazio per i 69 hPa di strato freddo
return False, ""
# Verifica esistenza "Moist Melt Layer" (Strato umido di fusione)
moist_melt_layer_exists = False
for pl in PRESSURE_LEVELS:
if pl <= p_cold: # Verifica solo i livelli sopra lo strato freddo
t_level = hourly_data.get(f"temperature_{pl}hPa", [None])[idx]
rh_level = hourly_data.get(f"relative_humidity_{pl}hPa", [None])[idx]
if t_level is None or rh_level is None:
continue
# Condizione di fusione: T alta e RH alta
# Aggiunta condizione: lo strato di fusione deve essere significativamente più caldo del suolo
# (per ridurre falsi positivi quando la differenza è minima)
if t_level > T_MELT_THR and rh_level >= RH_MELT_THR:
# Verifica che ci sia una differenza significativa tra strato di fusione e superficie
if t_level >= t_surface + T_MELT_SURFACE_DIFF:
moist_melt_layer_exists = True
break
if moist_melt_layer_exists:
# Mostra temperatura suolo se disponibile, altrimenti temperatura 2m
# Se t_soil proviene da ICON (non da AROME), lo indichiamo nel messaggio
temp_label = "T_suolo"
temp_value = t_soil if t_soil is not None else t_2m
if t_soil is None:
temp_label = "T2m"
elif t_soil_from_icon:
# t_soil proviene da ICON, non da AROME
temp_label = "T_suolo(ICON)"
details = f"{temp_label} {temp_value:.1f}°C, Precip {precip:.2f}mm/h, P_surf {p_surf:.0f}hPa"
return True, details
return False, ""
except (KeyError, TypeError, IndexError) as e:
return False, ""
def analyze_past_24h_conditions(weather_data: Dict) -> Dict:
"""
Analizza le condizioni delle 24 ore precedenti per valutare persistenza ghiaccio.
Returns:
Dict con:
- has_precipitation: bool (se c'è stata pioggia/neve/shower nelle 24h)
- precipitation_types: List[str] (tipi di precipitazione: rain, snowfall, showers)
- total_rain_mm: float
- total_snowfall_cm: float
- total_showers_mm: float
- min_temp_2m: float (temperatura minima)
- min_soil_temp: float (temperatura suolo minima)
- hours_below_zero: int (ore con T<0°C)
- hours_below_zero_soil: int (ore con T_suolo<0°C)
- precipitation_with_freeze: bool (precipitazioni con T<0°C)
- ice_formation_likely: bool (probabile formazione ghiaccio)
- ice_melting_likely: bool (probabile scioglimento ghiaccio)
- history: List[Dict] (storico orario per analisi dinamica)
"""
if not weather_data or "hourly" not in weather_data:
return {}
hourly = weather_data["hourly"]
times = hourly.get("time", [])
if not times:
return {}
# Converti times in datetime
try:
if PANDAS_AVAILABLE:
timestamps = pd.to_datetime(times)
else:
timestamps = []
for t in times:
try:
if isinstance(t, str):
timestamps.append(datetime.datetime.fromisoformat(t.replace('Z', '+00:00')))
else:
timestamps.append(t)
except:
continue
except Exception as e:
print(f"Errore conversione timestamp: {e}")
return {}
now = datetime.datetime.now()
# Filtra solo le 24h precedenti (ora corrente - 24h)
past_24h_start = now - datetime.timedelta(hours=24)
# Estrai dati
temp_2m = hourly.get("temperature_2m", [])
soil_temp = hourly.get("soil_temperature_0cm", [])
precipitation = hourly.get("precipitation", [])
rain = hourly.get("rain", [])
snowfall = hourly.get("snowfall", [])
showers = hourly.get("showers", [])
weathercode = hourly.get("weathercode", [])
# Analizza solo le 24h precedenti
history = []
total_rain = 0.0
total_snowfall = 0.0
total_showers = 0.0
min_temp_2m = None
min_soil_temp = None
hours_below_zero = 0
hours_below_zero_soil = 0
precipitation_types = set()
precipitation_with_freeze = False
for i, ts in enumerate(timestamps):
# Converti timestamp se necessario
if isinstance(ts, str):
try:
ts_dt = datetime.datetime.fromisoformat(ts)
except:
continue
else:
ts_dt = ts
# Solo 24h precedenti
if ts_dt < past_24h_start or ts_dt >= now:
continue
# Estrai valori per questa ora
t_2m = temp_2m[i] if i < len(temp_2m) and temp_2m[i] is not None else None
t_soil = soil_temp[i] if i < len(soil_temp) and soil_temp[i] is not None else None
prec = precipitation[i] if i < len(precipitation) and precipitation[i] is not None else 0.0
r = rain[i] if i < len(rain) and rain[i] is not None else 0.0
snow = snowfall[i] if i < len(snowfall) and snowfall[i] is not None else 0.0
show = showers[i] if i < len(showers) and showers[i] is not None else 0.0
wcode = weathercode[i] if i < len(weathercode) and weathercode[i] is not None else None
# Aggiorna totali
if r > 0:
total_rain += r
precipitation_types.add("rain")
if snow > 0:
total_snowfall += snow
precipitation_types.add("snowfall")
if show > 0:
total_showers += show
precipitation_types.add("showers")
# Aggiorna temperature minime
if t_2m is not None:
if min_temp_2m is None or t_2m < min_temp_2m:
min_temp_2m = t_2m
if t_2m < 0.0:
hours_below_zero += 1
if t_soil is not None:
if min_soil_temp is None or t_soil < min_soil_temp:
min_soil_temp = t_soil
if t_soil < 0.0:
hours_below_zero_soil += 1
# Verifica precipitazioni con temperature sotto zero
if (prec > 0.1 or r > 0.1 or snow > 0.1 or show > 0.1) and (t_2m is not None and t_2m < 0.0) or (t_soil is not None and t_soil < 0.0):
precipitation_with_freeze = True
# Aggiungi a storico
history.append({
"timestamp": ts_dt,
"temp_2m": t_2m,
"soil_temp": t_soil,
"precipitation": prec,
"rain": r,
"snowfall": snow,
"showers": show,
"weathercode": wcode
})
# Analizza l'andamento temporale: trova inizio/fine precipitazioni e andamento temperatura
precipitation_events = []
current_event = None
for h in sorted(history, key=lambda x: x["timestamp"]):
has_prec = (h.get("precipitation", 0) > 0.1 or h.get("rain", 0) > 0.1 or
h.get("snowfall", 0) > 0.1 or h.get("showers", 0) > 0.1)
if has_prec and current_event is None:
# Inizio nuovo evento
current_event = {
"start": h["timestamp"],
"end": h["timestamp"],
"types": set(),
"total_rain": h.get("rain", 0),
"total_snowfall": h.get("snowfall", 0),
"total_showers": h.get("showers", 0),
"temp_at_start": h.get("temp_2m"),
"soil_temp_at_start": h.get("soil_temp"),
}
if h.get("rain", 0) > 0.1:
current_event["types"].add("rain")
if h.get("snowfall", 0) > 0.1:
current_event["types"].add("snowfall")
if h.get("showers", 0) > 0.1:
current_event["types"].add("showers")
elif has_prec and current_event is not None:
# Continua evento
current_event["end"] = h["timestamp"]
current_event["total_rain"] += h.get("rain", 0)
current_event["total_snowfall"] += h.get("snowfall", 0)
current_event["total_showers"] += h.get("showers", 0)
if h.get("rain", 0) > 0.1:
current_event["types"].add("rain")
if h.get("snowfall", 0) > 0.1:
current_event["types"].add("snowfall")
if h.get("showers", 0) > 0.1:
current_event["types"].add("showers")
elif not has_prec and current_event is not None:
# Fine evento
current_event["types"] = list(current_event["types"])
precipitation_events.append(current_event)
current_event = None
# Se c'è un evento ancora in corso, aggiungilo
if current_event is not None:
current_event["types"] = list(current_event["types"])
precipitation_events.append(current_event)
# Analizza andamento temperatura dopo ogni evento precipitativo
for event in precipitation_events:
event_end = event["end"]
# Trova temperature nelle 6 ore successive alla fine dell'evento
temps_after = []
for h in sorted(history, key=lambda x: x["timestamp"]):
if h["timestamp"] > event_end:
hours_after = (h["timestamp"] - event_end).total_seconds() / 3600.0
if hours_after <= 6.0:
temps_after.append({
"hours_after": hours_after,
"temp_2m": h.get("temp_2m"),
"soil_temp": h.get("soil_temp"),
})
event["temps_after"] = temps_after
# Valuta se temperature sono compatibili con persistenza ghiaccio
# Ghiaccio persiste se T < +2°C (aria) o T < +4°C (suolo) dopo le precipitazioni
event["ice_persistence_likely"] = False
if temps_after:
for t_after in temps_after:
t_check = t_after.get("soil_temp") if t_after.get("soil_temp") is not None else t_after.get("temp_2m")
if t_check is not None:
threshold = 4.0 if t_after.get("soil_temp") is not None else 2.0
if t_check < threshold:
event["ice_persistence_likely"] = True
break
# Verifica se c'è ancora un fenomeno precipitativo in atto (nelle prossime ore)
ongoing_precipitation = False
ongoing_precipitation_type = None
now = datetime.datetime.now()
hourly = weather_data.get("hourly", {})
times_future = hourly.get("time", [])
# Cerca nelle prossime 12 ore
if times_future:
try:
current_hour_str = now.strftime("%Y-%m-%dT%H:00")
if current_hour_str in times_future:
idx_now = times_future.index(current_hour_str)
end_idx = min(idx_now + 12, len(times_future))
snowfall_future = hourly.get("snowfall", [])
rain_future = hourly.get("rain", [])
precipitation_future = hourly.get("precipitation", [])
temp_future = hourly.get("temperature_2m", [])
soil_temp_future = hourly.get("soil_temperature_0cm", [])
for i in range(idx_now, end_idx):
snow = snowfall_future[i] if i < len(snowfall_future) and snowfall_future[i] is not None else 0.0
r = rain_future[i] if i < len(rain_future) and rain_future[i] is not None else 0.0
prec = precipitation_future[i] if i < len(precipitation_future) and precipitation_future[i] is not None else 0.0
t_2m = temp_future[i] if i < len(temp_future) and temp_future[i] is not None else None
t_soil = soil_temp_future[i] if i < len(soil_temp_future) and soil_temp_future[i] is not None else None
# Verifica se c'è precipitazione con T<0°C (potenziale black ice o gelicidio)
if (snow > 0.1 or r > 0.1 or prec > 0.1):
ongoing_precipitation = True
if snow > 0.1:
ongoing_precipitation_type = "neve"
elif r > 0.1:
ongoing_precipitation_type = "pioggia"
else:
ongoing_precipitation_type = "precipitazione"
# Verifica se può formare black ice o gelicidio
t_check = t_soil if t_soil is not None else t_2m
if t_check is not None and t_check < 0.0:
ongoing_precipitation_type += " con T<0°C (rischio black ice/gelicidio)"
break
except (ValueError, IndexError):
pass
# Valuta probabile formazione ghiaccio
# Condizione: precipitazioni con T<0°C nelle 24h precedenti
ice_formation_likely = precipitation_with_freeze
# Valuta probabile scioglimento ghiaccio
# Condizione: T è salita sopra soglia di scongelamento (+2°C o +4°C)
# Verifica se nelle ultime ore la temperatura è salita sopra la soglia
ice_melting_likely = False
if history:
# Controlla le ultime 6 ore
recent_history = sorted(history, key=lambda x: x["timestamp"], reverse=True)[:6]
if recent_history:
# Se la temperatura è salita sopra +2°C nelle ultime ore, ghiaccio probabilmente sciolto
# Soglia conservativa: +2°C per aria, +4°C per suolo (più lento a scaldarsi)
for h in recent_history:
t_check = h.get("soil_temp") if h.get("soil_temp") is not None else h.get("temp_2m")
if t_check is not None:
# Soglia: +2°C per aria, +4°C per suolo
threshold = 4.0 if h.get("soil_temp") is not None else 2.0
if t_check > threshold:
ice_melting_likely = True
break
return {
"has_precipitation": total_rain > 0.1 or total_snowfall > 0.1 or total_showers > 0.1,
"precipitation_types": list(precipitation_types),
"total_rain_mm": total_rain,
"total_snowfall_cm": total_snowfall,
"total_showers_mm": total_showers,
"min_temp_2m": min_temp_2m,
"min_soil_temp": min_soil_temp,
"hours_below_zero": hours_below_zero,
"hours_below_zero_soil": hours_below_zero_soil,
"precipitation_with_freeze": precipitation_with_freeze,
"ice_formation_likely": ice_formation_likely,
"ice_melting_likely": ice_melting_likely,
"precipitation_events": precipitation_events, # Lista di eventi precipitativi
"ongoing_precipitation": ongoing_precipitation, # Se c'è ancora precipitazione in atto
"ongoing_precipitation_type": ongoing_precipitation_type, # Tipo di precipitazione in atto
"history": history
}
def calculate_ice_risk_dataframe(weather_data: Dict, model_slug: str = "italia_meteo_arpae_icon_2i",
hours_ahead: int = 24):
"""
Calcola l'Indice di Rischio Ghiaccio Stradale per le prossime 24 ore.
REQUIRES: pandas e numpy installati.
Se non disponibili, solleva ImportError con istruzioni.
"""
if not PANDAS_AVAILABLE:
raise ImportError(
"pandas e numpy sono richiesti per l'analisi avanzata del ghiaccio.\n"
"Installa con: pip install --break-system-packages pandas numpy\n"
"Oppure nel container Docker esegui:\n"
" docker exec -it <container_name> pip install --break-system-packages pandas numpy"
)
"""
Calcola l'Indice di Rischio Ghiaccio Stradale per le prossime 24 ore.
Analisi avanzata basata su logiche fisiche di meteorologia stradale:
1. Rischio Brina (Hoar Frost)
2. Rischio Ghiaccio Nero (Black Ice da bagnatura)
3. Rischio Freezing Rain (già implementato con FMI_CLIM)
4. Effetto Cielo Sereno (raffreddamento radiativo)
Args:
weather_data: Dati meteo da Open-Meteo API
model_slug: Slug del modello (default: "icon_eu")
hours_ahead: Numero di ore da analizzare (default: 24)
Returns:
DataFrame Pandas con colonne:
- timestamp: datetime
- temp_2m: Temperatura a 2m
- dewpoint_2m: Punto di rugiada a 2m
- precipitation: Precipitazione
- soil_temp_0cm: Temperatura suolo (0cm)
- cloud_cover: Copertura nuvolosa totale
- wind_speed: Velocità vento
- Ice_Warning_Level: None, Low, Medium, High
- Ice_Phenomenon: Descrizione del fenomeno
- Risk_Score: Punteggio numerico 0-3
"""
if not weather_data or "hourly" not in weather_data:
return pd.DataFrame()
hourly = weather_data["hourly"]
times = hourly.get("time", [])
if not times:
return pd.DataFrame()
# Converti times in datetime
try:
timestamps = pd.to_datetime(times)
except:
return pd.DataFrame()
# Estrai dati
temp_2m = hourly.get("temperature_2m", [None] * len(times))
dewpoint_2m = hourly.get("dew_point_2m", [None] * len(times))
precipitation = hourly.get("precipitation", [None] * len(times))
rain = hourly.get("rain", [None] * len(times))
snowfall = hourly.get("snowfall", [None] * len(times))
soil_temp_0cm = hourly.get("soil_temperature_0cm", [None] * len(times))
cloud_cover = hourly.get("cloud_cover", [None] * len(times))
wind_speed = hourly.get("windspeed_10m", [None] * len(times))
# Crea DataFrame base
now = datetime.datetime.now()
df = pd.DataFrame({
'timestamp': timestamps,
'temp_2m': temp_2m,
'dewpoint_2m': dewpoint_2m,
'precipitation': precipitation,
'rain': rain,
'snowfall': snowfall,
'soil_temp_0cm': soil_temp_0cm,
'cloud_cover': cloud_cover,
'wind_speed': wind_speed
})
# Filtra solo prossime ore
df = df[df['timestamp'] >= now].head(hours_ahead)
if df.empty:
return df
# Inizializza colonne di output
df['Ice_Warning_Level'] = None
df['Ice_Phenomenon'] = ""
df['Risk_Score'] = 0
# Converti None in NaN per calcoli
numeric_cols = ['temp_2m', 'dewpoint_2m', 'precipitation', 'rain', 'snowfall',
'soil_temp_0cm', 'cloud_cover', 'wind_speed']
for col in numeric_cols:
df[col] = pd.to_numeric(df[col], errors='coerce')
# Calcola ora del giorno per determinare notte/giorno
df['hour'] = df['timestamp'].dt.hour
df['is_night'] = (df['hour'] >= 18) | (df['hour'] <= 6)
# Calcola precipitazione cumulativa delle 3 ore precedenti
df['precip_3h_sum'] = df['precipitation'].rolling(window=3, min_periods=1, closed='left').sum()
# --- GESTIONE FALLBACK per soil_temperature_0cm ---
# Se soil_temp_0cm non è disponibile (None o tutti NaN), usa approssimazione da temp_2m
# L'approssimazione sottrae 1-2°C dalla temperatura a 2m (suolo è generalmente più freddo)
if df['soil_temp_0cm'].isna().all():
# Nessun dato soil_temp_0cm disponibile: usa approssimazione da temp_2m
# Durante la notte o con cielo sereno, il suolo può essere 1-2°C più freddo dell'aria a 2m
df['soil_temp_0cm'] = df['temp_2m'] - 1.5 # Approssimazione conservativa
df['soil_temp_source'] = 'estimated' # Flag per tracciare che è stimato
else:
df['soil_temp_source'] = 'measured'
# --- LOGICA 1: Rischio Brina (Hoar Frost) ---
# Condizione: Soil Temp <= 0°C E Dewpoint > Soil Temp (ma < 0°C)
brina_mask = (
df['soil_temp_0cm'].notna() &
(df['soil_temp_0cm'] <= 0.0) &
df['dewpoint_2m'].notna() &
(df['dewpoint_2m'] > df['soil_temp_0cm']) &
(df['dewpoint_2m'] < 0.0)
)
# --- LOGICA 2: Rischio Ghiaccio Nero (Black Ice da bagnatura) o Neve Ghiacciata ---
# Condizione: Precipitazione > 0.1mm nelle 3h precedenti E Soil Temp < 0°C
# (anche se aria > 0°C)
# Distingue tra neve e pioggia per il messaggio appropriato
black_ice_mask = (
(df['precip_3h_sum'] > 0.1) &
df['soil_temp_0cm'].notna() &
(df['soil_temp_0cm'] < 0.0)
)
# Calcola se c'è neve nelle 3h precedenti (per distinguere neve da pioggia)
df['snowfall_3h_sum'] = df['snowfall'].rolling(window=3, min_periods=1, closed='left').sum()
df['rain_3h_sum'] = df['rain'].rolling(window=3, min_periods=1, closed='left').sum()
# --- LOGICA 3: Effetto Cielo Sereno (raffreddamento radiativo) ---
# Se Cloud Cover < 20% durante la notte, aumenta probabilità di raffreddamento
# Questo penalizza la Soil Temp prevista (sottrae 0.5-1.5°C teorici)
clear_sky_mask = (
df['is_night'] &
df['cloud_cover'].notna() &
(df['cloud_cover'] < 20.0)
)
# Aggiusta Soil Temp per effetto cielo sereno (raffreddamento radiativo)
df['soil_temp_adjusted'] = df['soil_temp_0cm'].copy()
if clear_sky_mask.any():
# Raffreddamento radiativo: sottrai 0.5-1.5°C a seconda del vento
# Vento debole (< 5 km/h) = più raffreddamento
wind_factor = df['wind_speed'].fillna(10.0)
cooling = np.where(wind_factor < 5.0, 1.5,
np.where(wind_factor < 10.0, 1.0, 0.5))
df.loc[clear_sky_mask, 'soil_temp_adjusted'] = (
df.loc[clear_sky_mask, 'soil_temp_0cm'] - cooling[clear_sky_mask]
)
# Rivaluta condizioni con soil_temp_adjusted
brina_mask_adjusted = (
df['soil_temp_adjusted'].notna() &
(df['soil_temp_adjusted'] <= 0.0) &
df['dewpoint_2m'].notna() &
(df['dewpoint_2m'] > df['soil_temp_adjusted']) &
(df['dewpoint_2m'] < 0.0)
)
black_ice_mask_adjusted = (
(df['precip_3h_sum'] > 0.1) &
df['soil_temp_adjusted'].notna() &
(df['soil_temp_adjusted'] < 0.0)
)
# --- ASSEGNAZIONE LIVELLI DI RISCHIO ---
# Priorità: Black Ice/Neve Ghiacciata > Brina > Nessun rischio
# Ghiaccio Nero o Neve Ghiacciata = High Risk (3)
# Distingue tra neve e pioggia per messaggio appropriato
if black_ice_mask_adjusted.any():
# Verifica se c'è neve nelle 3h precedenti O nell'ora corrente/futura
# (considera sia neve passata che neve in arrivo)
# Usa np.logical_or per combinare le condizioni
has_snow_past = df['snowfall_3h_sum'] > 0.1
has_snow_current = df['snowfall'] > 0.1
has_snow_combined = has_snow_past | has_snow_current
has_snow_mask = black_ice_mask_adjusted & has_snow_combined
# Se c'è solo pioggia (no neve)
has_rain_past = df['rain_3h_sum'] > 0.1
has_rain_current = df['rain'] > 0.1
has_rain_combined = has_rain_past | has_rain_current
has_rain_only_mask = black_ice_mask_adjusted & ~has_snow_mask & has_rain_combined
# Se c'è neve (anche con pioggia), il rischio è neve/neve ghiacciata (livello 4)
# La neve prevale sempre sulla pioggia
if has_snow_mask.any():
df.loc[has_snow_mask, 'Risk_Score'] = 4
df.loc[has_snow_mask, 'Ice_Warning_Level'] = 'Very High'
df.loc[has_snow_mask, 'Ice_Phenomenon'] = 'Neve/Neve ghiacciata (suolo gelato)'
# Se c'è solo pioggia (no neve), è Black Ice (livello 3)
if has_rain_only_mask.any():
df.loc[has_rain_only_mask, 'Risk_Score'] = 3
df.loc[has_rain_only_mask, 'Ice_Warning_Level'] = 'High'
df.loc[has_rain_only_mask, 'Ice_Phenomenon'] = 'Possibile Black Ice (strada bagnata + suolo gelato)'
# Fallback per altri casi (precipitazione generica senza neve/pioggia specifica)
other_precip_mask = black_ice_mask_adjusted & ~has_snow_mask & ~has_rain_only_mask
if other_precip_mask.any():
df.loc[other_precip_mask, 'Risk_Score'] = 3
df.loc[other_precip_mask, 'Ice_Warning_Level'] = 'High'
df.loc[other_precip_mask, 'Ice_Phenomenon'] = 'Possibile Black Ice (strada bagnata + suolo gelato)'
# --- LOGICA 4: Rilevazione Neve (presenza/persistenza) ---
# Verifica presenza di neve nelle 24h precedenti o future (anche senza suolo gelato)
# Questo è un layer separato per indicare presenza/persistenza di neve
# Calcola neve cumulativa nelle 24h precedenti (rolling window backward)
df['snowfall_24h_past'] = df['snowfall'].rolling(window=24, min_periods=1, closed='left').sum()
# Calcola neve cumulativa nelle 24h future (rolling window forward)
df['snowfall_24h_future'] = df['snowfall'].rolling(window=24, min_periods=1, closed='right').sum()
# Se c'è neve significativa (>= 0.5 cm) nelle 24h precedenti o future, segna come livello 4
# (anche se il suolo non è gelato, la neve presente è un rischio)
# Ma solo se non è già stato segnato come neve ghiacciata (livello 4 da black_ice_mask)
snow_presence_mask = (
(df['snowfall_24h_past'] >= 0.5) | (df['snowfall_24h_future'] >= 0.5) |
(df['snowfall'] >= 0.1) # Neve nell'ora corrente
) & (df['Risk_Score'] < 4) # Solo se non è già stato segnato come neve ghiacciata
if snow_presence_mask.any():
df.loc[snow_presence_mask, 'Risk_Score'] = 4
df.loc[snow_presence_mask, 'Ice_Warning_Level'] = 'Very High'
df.loc[snow_presence_mask, 'Ice_Phenomenon'] = 'Neve presente/persistente'
# Brina = Medium Risk (1-2)
brina_high = brina_mask_adjusted & ~black_ice_mask_adjusted & (df['soil_temp_adjusted'] < -1.0)
brina_medium = brina_mask_adjusted & ~black_ice_mask_adjusted & ~brina_high
if brina_high.any():
df.loc[brina_high, 'Risk_Score'] = 2
df.loc[brina_high, 'Ice_Warning_Level'] = 'Medium'
df.loc[brina_high, 'Ice_Phenomenon'] = 'Brina (condizioni favorevoli)'
if brina_medium.any():
df.loc[brina_medium, 'Risk_Score'] = 1
df.loc[brina_medium, 'Ice_Warning_Level'] = 'Low'
df.loc[brina_medium, 'Ice_Phenomenon'] = 'Brina (possibile)'
# Assicura che se Risk_Score > 0, Ice_Warning_Level e Ice_Phenomenon siano sempre definiti
# (fallback per casi edge)
missing_level = (df['Risk_Score'] > 0) & (df['Ice_Warning_Level'].isna())
if missing_level.any():
df.loc[missing_level & (df['Risk_Score'] >= 3), 'Ice_Warning_Level'] = 'High'
df.loc[missing_level & (df['Risk_Score'] == 2), 'Ice_Warning_Level'] = 'Medium'
df.loc[missing_level & (df['Risk_Score'] == 1), 'Ice_Warning_Level'] = 'Low'
missing_phenomenon = (df['Risk_Score'] > 0) & (df['Ice_Phenomenon'] == '')
if missing_phenomenon.any():
df.loc[missing_phenomenon & (df['Risk_Score'] >= 3), 'Ice_Phenomenon'] = 'Ghiaccio Nero (Black Ice)'
df.loc[missing_phenomenon & (df['Risk_Score'] == 2), 'Ice_Phenomenon'] = 'Brina'
df.loc[missing_phenomenon & (df['Risk_Score'] == 1), 'Ice_Phenomenon'] = 'Brina (possibile)'
# Rimuovi colonne temporanee (mantieni soil_temp_source se presente)
cols_to_drop = ['hour', 'is_night', 'precip_3h_sum', 'snowfall_3h_sum', 'rain_3h_sum',
'soil_temp_adjusted', 'snowfall_24h_past', 'snowfall_24h_future']
if 'soil_temp_source' not in df.columns:
cols_to_drop.append('soil_temp_source')
df = df.drop(columns=[c for c in cols_to_drop if c in df.columns])
return df
def analyze_risk(weather_data, model_slug, icon_weather_data=None):
"""
Analizza i dati di un singolo modello e ritorna rischio e dettagli.
Rischio 3 = GELICIDIO (FZRA), Rischio 2 = GHIACCIO VIVO, Rischio 1 = BRINA
Args:
weather_data: Dati del modello principale
model_slug: Slug del modello (es. "meteofrance_seamless", "icon_eu")
icon_weather_data: Dati ICON opzionali (per fornire soil_temperature_0cm se AROME non lo ha)
"""
if not weather_data:
return 0, ""
hourly = weather_data.get("hourly", {})
times = hourly.get("time", [])
if not times:
return 0, ""
now = datetime.datetime.now()
current_hour_str = now.strftime("%Y-%m-%dT%H:00")
try:
idx = times.index(current_hour_str)
except ValueError:
return 0, ""
# PRIORITÀ 1: Rilevamento GELICIDIO (FZRA) usando algoritmo FMI_CLIM
# Solo per AROME Seamless (ha parametri isobarici)
# Passa anche i dati ICON per ottenere soil_temperature_0cm se AROME non lo fornisce
if model_slug == "meteofrance_seamless":
icon_hourly = icon_weather_data.get("hourly", {}) if icon_weather_data else None
is_fzra, fzra_details = detect_freezing_rain_fmi(hourly, idx, icon_hourly_data=icon_hourly)
if is_fzra:
return 3, f"🔴🔴 <b>GELICIDIO (FZRA)</b> ({fzra_details})"
# PRIORITÀ 2: Analisi ghiaccio/brina tradizionale (fallback per tutti i modelli)
try:
t_soil = hourly.get("soil_temperature_0cm", [None])[idx] if idx < len(hourly.get("soil_temperature_0cm", [])) else None
t_dew = hourly.get("dew_point_2m", [None])[idx] if idx < len(hourly.get("dew_point_2m", [])) else None
t_2m = hourly.get("temperature_2m", [None])[idx] if idx < len(hourly.get("temperature_2m", [])) else None
hum = hourly.get("relative_humidity_2m", [None])[idx] if idx < len(hourly.get("relative_humidity_2m", [])) else None
start_idx = max(0, idx - 6)
precip_history = hourly.get("precipitation", [])[start_idx : idx+1]
precip_sum = sum(p for p in precip_history if p is not None)
except (KeyError, TypeError, IndexError):
return 0, ""
# Se mancano dati essenziali, non possiamo analizzare
if t_2m is None:
return 0, ""
# Se manca t_soil o t_dew, usa solo t_2m e hum per analisi brina
# SOGLIE MOLTO RESTRITTIVE per ridurre falsi positivi (solo quando dati suolo non disponibili)
if t_soil is None or t_dew is None:
# Analisi brina semplificata (solo temperatura aria + umidità)
# Soglia molto restrittiva: t_2m <= 1.0°C e hum > 90%
# Solo quando non abbiamo dati del suolo (fallback)
if t_2m is not None and hum is not None:
if t_2m <= 1.0 and hum > 90:
# Richiede anche punto di rugiada molto vicino se disponibile
if t_dew is not None:
if abs(t_2m - t_dew) < 0.3:
details = f"Aria {t_2m:.1f}°C, Umid {hum:.0f}%"
return 1, f"🟡 <b>Rischio BRINA</b> ({details})"
else:
# Senza t_dew, solo condizioni estreme
if t_2m <= 0.0 and hum > 95:
details = f"Aria {t_2m:.1f}°C, Umid {hum:.0f}%"
return 1, f"🟡 <b>Rischio BRINA</b> ({details})"
return 0, ""
details = f"Suolo {t_soil:.1f}°C"
if t_2m is not None:
details += f", Aria {t_2m:.1f}°C"
if hum is not None:
details += f", Umid {hum:.0f}%"
# GHIACCIO VIVO: precipitazioni su suolo gelato
if t_soil is not None and precip_sum > 0.2 and t_soil <= 0:
return 2, f"🔴 <b>GHIACCIO VIVO</b> ({details})"
# Rischio BRINA: suolo gelato e punto di rugiada raggiunto
# Questa è la condizione più affidabile per la brina
if t_soil is not None and t_dew is not None:
if t_soil <= 0 and t_soil <= t_dew:
return 1, f"🟡 <b>Rischio BRINA</b> ({details})"
# Brina anche con temperatura aria bassa e alta umidità (solo se suolo non disponibile o suolo > 0°C)
# SOGLIE MOLTO RESTRITTIVE per ridurre falsi positivi:
# - Temperatura molto bassa (≤ 1.0°C invece di ≤ 1.5°C)
# - Umidità molto alta (> 90% invece di > 85%)
# - Richiede punto di rugiada molto vicino alla temperatura (≤ 0.3°C)
# - Solo se suolo non disponibile O suolo > 2°C (per evitare falsi positivi quando suolo è caldo)
if t_2m is not None and hum is not None:
# Applica solo se: suolo non disponibile O suolo > 2°C (per evitare falsi positivi)
soil_ok_for_air_frost = (t_soil is None) or (t_soil is not None and t_soil > 2.0)
if soil_ok_for_air_frost:
# Condizione molto restrittiva: t_2m <= 1.0°C, hum > 90%, punto di rugiada molto vicino
if t_2m <= 1.0 and hum > 90:
if t_dew is not None and abs(t_2m - t_dew) < 0.3: # Punto di rugiada molto vicino (0.3°C)
return 1, f"🟡 <b>Rischio BRINA</b> ({details})"
# Condizione estrema: temperatura molto bassa (≤ 0.0°C) e umidità altissima (> 95%)
elif t_2m <= 0.0 and hum > 95:
return 1, f"🟡 <b>Rischio BRINA</b> ({details})"
return 0, details
def check_ice_persistence_conditions(weather_data, model_slug, hours_check=12):
"""
Verifica se ci sono condizioni che mantengono il ghiaccio già formato,
anche se non ci sono più condizioni favorevoli alla formazione di nuovo ghiaccio.
Usa l'analisi delle 24h precedenti per valutare persistenza e scongelamento.
Condizioni che mantengono il ghiaccio:
1. Neve presente (snowfall recente o snow_depth > 0)
2. Temperature vicine allo zero (tra -2°C e +2°C) che impediscono lo scioglimento
3. Precipitazioni con T<0°C nelle 24h precedenti (possibile ghiaccio residuo)
Args:
weather_data: Dati meteo dal modello
model_slug: Slug del modello
hours_check: Numero di ore da controllare (default: 12)
Returns:
(has_snow: bool, has_cold_temps: bool, details: str, past_24h_info: Dict)
"""
if not weather_data or "hourly" not in weather_data:
return False, False, "", {}
hourly = weather_data.get("hourly", {})
times = hourly.get("time", [])
if not times:
return False, False, "", {}
now = datetime.datetime.now()
current_hour_str = now.strftime("%Y-%m-%dT%H:00")
try:
idx = times.index(current_hour_str)
except ValueError:
return False, False, "", {}
# Analizza 24h precedenti per valutare persistenza
past_24h_analysis = analyze_past_24h_conditions(weather_data)
past_24h_info = past_24h_analysis if past_24h_analysis else {}
# Controlla le prossime hours_check ore
end_idx = min(idx + hours_check, len(times))
# 1. Verifica presenza neve
has_snow = False
snowfall_data = hourly.get("snowfall", [])
snow_depth_data = hourly.get("snow_depth", [])
# Controlla snowfall nelle prossime ore (neve in arrivo o recente)
if snowfall_data:
for i in range(idx, end_idx):
if i < len(snowfall_data) and snowfall_data[i] is not None and snowfall_data[i] > 0.1:
has_snow = True
break
# Controlla snow_depth (neve già presente al suolo)
if not has_snow and snow_depth_data:
for i in range(idx, end_idx):
if i < len(snow_depth_data) and snow_depth_data[i] is not None and snow_depth_data[i] > 0.5:
has_snow = True
break
# Verifica anche neve nelle 24h precedenti
if not has_snow and past_24h_info.get("total_snowfall_cm", 0) > 0.1:
has_snow = True
# 2. Verifica temperature che mantengono il ghiaccio già formato
has_cold_temps = False
temp_2m_data = hourly.get("temperature_2m", [])
soil_temp_data = hourly.get("soil_temperature_0cm", [])
# Il ghiaccio persiste se la temperatura è < +2°C (non si scioglie)
# Questo include:
# - Temperature < -2°C: molto freddo, ghiaccio sicuramente presente
# - Temperature tra -2°C e 0°C: ghiaccio non si scioglie
# - Temperature tra 0°C e +2°C: ghiaccio può ancora persistere (scioglimento lento)
# Solo temperature > +2°C permettono lo scioglimento completo del ghiaccio
min_temp_found = None
for i in range(idx, end_idx):
t_2m = temp_2m_data[i] if i < len(temp_2m_data) and temp_2m_data[i] is not None else None
t_soil = soil_temp_data[i] if i < len(soil_temp_data) and soil_temp_data[i] is not None else None
# Usa temperatura suolo se disponibile (più accurata), altrimenti temperatura aria
t_check = t_soil if t_soil is not None else t_2m
if t_check is not None and t_check < 2.0:
has_cold_temps = True
if min_temp_found is None or t_check < min_temp_found:
min_temp_found = t_check
break
# Verifica anche temperature minime nelle 24h precedenti
if not has_cold_temps:
min_temp_2m_past = past_24h_info.get("min_temp_2m")
min_soil_temp_past = past_24h_info.get("min_soil_temp")
if min_temp_2m_past is not None and min_temp_2m_past < 2.0:
has_cold_temps = True
if min_temp_found is None or min_temp_2m_past < min_temp_found:
min_temp_found = min_temp_2m_past
elif min_soil_temp_past is not None and min_soil_temp_past < 4.0: # Suolo più lento a scaldarsi
has_cold_temps = True
if min_temp_found is None or min_soil_temp_past < min_temp_found:
min_temp_found = min_soil_temp_past
# Costruisci dettagli con informazioni 24h precedenti
details_parts = []
if has_snow:
details_parts.append("neve presente")
if has_cold_temps:
# Distingui tra temperature molto basse e vicine allo zero per messaggio più chiaro
if min_temp_found is not None and min_temp_found < -2.0:
details_parts.append("temperature molto basse (< -2°C)")
else:
details_parts.append("temperature che mantengono il ghiaccio (< +2°C)")
# Aggiungi informazioni sulle 24h precedenti se rilevanti
if past_24h_info.get("precipitation_with_freeze", False):
details_parts.append("precipitazioni con T<0°C nelle 24h precedenti")
if past_24h_info.get("ice_formation_likely", False):
details_parts.append("formazione ghiaccio probabile nelle 24h precedenti")
if past_24h_info.get("ice_melting_likely", False):
details_parts.append("scioglimento probabile (T salita sopra soglia)")
details = ", ".join(details_parts) if details_parts else ""
return has_snow, has_cold_temps, details, past_24h_info
def get_coordinates_from_city(city_name: str) -> Optional[Tuple[float, float, str]]:
"""Ottiene coordinate lat/lon da nome città usando Open-Meteo Geocoding API."""
# Caso speciale: "Casa" -> Strada Cà Toro,12, San Marino
if city_name.lower().strip() in ["casa", "home"]:
return 43.9356, 12.4296, "🏠 Casa (Strada Cà Toro, San Marino)"
url = "https://geocoding-api.open-meteo.com/v1/search"
try:
resp = open_meteo_get(url, params={"name": city_name, "count": 1, "language": "it", "format": "json"}, timeout=(5, 10))
res = resp.json().get("results", [])
if res:
res = res[0]
name = f"{res.get('name')} ({res.get('country_code', 'IT').upper()})"
return res['latitude'], res['longitude'], name
except Exception as e:
print(f"Errore geocoding per {city_name}: {e}")
return None
def get_location_name_from_coords(lat: float, lon: float) -> Optional[str]:
"""
Ottiene il nome della località da coordinate usando Nominatim (OpenStreetMap).
Reverse geocoding gratuito, no API key richiesta.
"""
url = "https://nominatim.openstreetmap.org/reverse"
try:
params = {
"lat": lat,
"lon": lon,
"format": "json",
"accept-language": "it",
"zoom": 10, # Livello di dettaglio: 10 = città/paese
"addressdetails": 1
}
headers = {
"User-Agent": "Telegram-Bot-Ice-Road/1.0" # Nominatim richiede User-Agent
}
resp = requests.get(url, params=params, headers=headers, timeout=5)
if resp.status_code == 200:
data = resp.json()
address = data.get("address", {})
# Priorità: città > paese > comune > frazione
location_name = (
address.get("city") or
address.get("town") or
address.get("village") or
address.get("municipality") or
address.get("county") or
address.get("state")
)
if location_name:
# Aggiungi provincia/regione se disponibile
state = address.get("state")
if state and state != location_name:
return f"{location_name} ({state})"
return location_name
# Fallback: usa display_name se disponibile
display_name = data.get("display_name", "")
if display_name:
# Prendi solo la prima parte (prima della virgola)
return display_name.split(",")[0].strip()
except Exception as e:
print(f"Errore reverse geocoding per ({lat}, {lon}): {e}")
return None
def decode_polyline(polyline_str: str) -> List[Tuple[float, float]]:
"""
Decodifica un polyline codificato di Google Maps in una lista di coordinate (lat, lon).
Args:
polyline_str: Stringa polyline codificata
Returns:
Lista di tuple (lat, lon)
"""
def _decode_value(value_str: str) -> int:
"""Decodifica un valore dal polyline."""
result = 0
shift = 0
for char in value_str:
b = ord(char) - 63
result |= (b & 0x1f) << shift
shift += 5
if b < 0x20:
break
return ~result if (result & 1) else result >> 1
points = []
index = 0
lat = 0
lon = 0
while index < len(polyline_str):
# Decodifica latitudine
value_str = ""
while index < len(polyline_str):
char = polyline_str[index]
value_str += char
index += 1
if ord(char) < 0x20:
break
lat_delta = _decode_value(value_str)
lat += lat_delta
# Decodifica longitudine
if index >= len(polyline_str):
break
value_str = ""
while index < len(polyline_str):
char = polyline_str[index]
value_str += char
index += 1
if ord(char) < 0x20:
break
lon_delta = _decode_value(value_str)
lon += lon_delta
points.append((lat / 1e5, lon / 1e5))
return points
def get_google_maps_api_key() -> Optional[str]:
"""
Ottiene la chiave API di Google Maps da variabile d'ambiente.
Returns:
Chiave API o None se non disponibile
"""
# Prova variabili d'ambiente comuni
api_key = os.environ.get('GOOGLE_MAPS_API_KEY', '').strip()
if api_key:
return api_key
api_key = os.environ.get('GOOGLE_API_KEY', '').strip()
if api_key:
return api_key
return None
def calculate_route_points(lat1: float, lon1: float, lat2: float, lon2: float,
num_points: int = 5) -> List[Tuple[float, float]]:
"""
Calcola punti intermedi lungo un percorso stradale reale tra due coordinate.
Usa Google Maps Directions API se disponibile, altrimenti fallback a linea d'aria.
Args:
lat1, lon1: Coordinate punto di partenza
lat2, lon2: Coordinate punto di arrivo
num_points: Numero minimo di punti intermedi desiderati (default: 5)
(ignorato se si usa Google Maps, che restituisce tutti i punti del percorso)
Returns:
Lista di tuple (lat, lon) lungo il percorso
"""
# Prova prima con Google Maps Directions API
api_key = get_google_maps_api_key()
if api_key:
try:
url = "https://maps.googleapis.com/maps/api/directions/json"
params = {
'origin': f"{lat1},{lon1}",
'destination': f"{lat2},{lon2}",
'key': api_key,
'mode': 'driving', # Modalità guida
'alternatives': False # Solo il percorso principale
}
response = requests.get(url, params=params, timeout=10)
if response.status_code == 200:
data = response.json()
if data.get('status') == 'OK' and data.get('routes'):
route = data['routes'][0]
# Estrai polyline dal percorso
overview_polyline = route.get('overview_polyline', {})
encoded_polyline = overview_polyline.get('points', '')
if encoded_polyline:
# Decodifica polyline per ottenere tutti i punti del percorso
route_points = decode_polyline(encoded_polyline)
if route_points:
# Se il percorso ha troppi punti, campiona per avere un numero ragionevole
# ma mantieni sempre partenza e arrivo
if len(route_points) > 20:
# Campiona i punti mantenendo partenza e arrivo
sampled_points = [route_points[0]] # Partenza
step = len(route_points) // (num_points + 1)
for i in range(1, len(route_points) - 1, max(1, step)):
sampled_points.append(route_points[i])
sampled_points.append(route_points[-1]) # Arrivo
return sampled_points
else:
return route_points
except Exception as e:
# In caso di errore, fallback a linea d'aria
print(f"Errore Google Maps Directions API: {e}. Uso fallback linea d'aria.")
# Fallback: calcola punti lungo linea d'aria
points = []
for i in range(num_points + 1):
ratio = i / num_points if num_points > 0 else 0
lat = lat1 + (lat2 - lat1) * ratio
lon = lon1 + (lon2 - lon1) * ratio
points.append((lat, lon))
return points
def get_best_model_for_location(lat: float, lon: float) -> str:
"""
Determina il miglior modello disponibile per una località.
Priorità: ICON Italia (se in Italia) > ICON EU (Europa) > AROME Seamless (Francia/limitrofi)
"""
# ICON Italia copre approssimativamente: 36-48°N, 6-19°E (Italia e zone limitrofe)
if 36.0 <= lat <= 48.0 and 6.0 <= lon <= 19.0:
# Prova prima ICON Italia
test_data = get_weather_data(lat, lon, "italia_meteo_arpae_icon_2i")
if test_data and test_data.get("hourly", {}).get("soil_temperature_0cm"):
return "italia_meteo_arpae_icon_2i"
# ICON EU copre Europa (35-72°N, -12-35°E)
if 35.0 <= lat <= 72.0 and -12.0 <= lon <= 35.0:
test_data = get_weather_data(lat, lon, "icon_eu")
if test_data:
# Verifica se ICON EU fornisce soil_temperature_0cm per questa zona
if test_data.get("hourly", {}).get("soil_temperature_0cm"):
return "icon_eu"
# Anche senza soil_temp, ICON EU può essere usato con approssimazione
return "icon_eu"
# AROME Seamless copre Francia e zone limitrofe
if 41.0 <= lat <= 52.0 and -5.0 <= lon <= 10.0:
test_data = get_weather_data(lat, lon, "meteofrance_seamless")
if test_data:
return "meteofrance_seamless"
# Fallback: ICON EU (copertura più ampia)
return "icon_eu"
def analyze_route_ice_risk(city1: str, city2: str, model_slug: Optional[str] = None) -> Optional[pd.DataFrame]:
"""
Analizza il rischio di ghiaccio lungo un percorso stradale tra due località.
Args:
city1: Nome città di partenza
city2: Nome città di arrivo
model_slug: Modello meteo da usare (None = auto-detect basato su località)
Returns:
DataFrame con analisi del rischio per ogni punto del percorso, o None se errore
"""
# Ottieni coordinate
coord1 = get_coordinates_from_city(city1)
coord2 = get_coordinates_from_city(city2)
if not coord1 or not coord2:
return None
lat1, lon1, name1 = coord1
lat2, lon2, name2 = coord2
# Se modello non specificato, determina automaticamente
if model_slug is None:
# Usa il punto medio del percorso per determinare il miglior modello
mid_lat = (lat1 + lat2) / 2
mid_lon = (lon1 + lon2) / 2
model_slug = get_best_model_for_location(mid_lat, mid_lon)
# Calcola punti lungo il percorso (8 punti intermedi per copertura adeguata)
route_points = calculate_route_points(lat1, lon1, lat2, lon2, num_points=8)
# Analizza ogni punto con fallback automatico se il modello principale non funziona
all_results = []
models_used = set()
for i, (lat, lon) in enumerate(route_points):
# Prova prima il modello principale
weather_data = get_weather_data(lat, lon, model_slug)
point_model = model_slug
# Se fallisce o non ha soil_temp_0cm, prova modelli alternativi
if not weather_data:
# Fallback: prova altri modelli
for fallback_model in ["icon_eu", "italia_meteo_arpae_icon_2i", "meteofrance_seamless"]:
if fallback_model != model_slug:
test_data = get_weather_data(lat, lon, fallback_model)
if test_data:
weather_data = test_data
point_model = fallback_model
break
if not weather_data:
continue
models_used.add(point_model)
# Analizza condizioni 24h precedenti per persistenza ghiaccio
past_24h_analysis = analyze_past_24h_conditions(weather_data)
if not past_24h_analysis:
# Se l'analisi fallisce, usa valori di default
past_24h_analysis = {}
# Calcola rischio per 24h
df = calculate_ice_risk_dataframe(weather_data, point_model, hours_ahead=24)
if df.empty:
continue
# Aggiungi info punto
df['point_index'] = i
df['point_lat'] = lat
df['point_lon'] = lon
df['model_used'] = point_model
# Aggiungi analisi 24h precedenti come colonne aggiuntive
# (saranno duplicate per ogni riga del DataFrame, ma utili per il report)
df['past_24h_has_precip'] = past_24h_analysis.get("has_precipitation", False)
df['past_24h_precip_types'] = str(past_24h_analysis.get("precipitation_types", []))
df['past_24h_total_rain_mm'] = past_24h_analysis.get("total_rain_mm", 0.0)
df['past_24h_total_snowfall_cm'] = past_24h_analysis.get("total_snowfall_cm", 0.0)
df['past_24h_total_showers_mm'] = past_24h_analysis.get("total_showers_mm", 0.0)
df['past_24h_min_temp_2m'] = past_24h_analysis.get("min_temp_2m")
df['past_24h_min_soil_temp'] = past_24h_analysis.get("min_soil_temp")
df['past_24h_hours_below_zero'] = past_24h_analysis.get("hours_below_zero", 0)
df['past_24h_hours_below_zero_soil'] = past_24h_analysis.get("hours_below_zero_soil", 0)
df['past_24h_precip_with_freeze'] = past_24h_analysis.get("precipitation_with_freeze", False)
df['past_24h_ice_formation_likely'] = past_24h_analysis.get("ice_formation_likely", False)
df['past_24h_ice_melting_likely'] = past_24h_analysis.get("ice_melting_likely", False)
df['past_24h_ongoing_precipitation'] = past_24h_analysis.get("ongoing_precipitation", False)
df['past_24h_ongoing_precipitation_type'] = past_24h_analysis.get("ongoing_precipitation_type", "")
# Salva storico come JSON string (per evitare problemi con DataFrame)
import json
df['past_24h_history'] = json.dumps(past_24h_analysis.get("history", []), default=str)
df['past_24h_precipitation_events'] = json.dumps(past_24h_analysis.get("precipitation_events", []), default=str)
# Crea etichetta punto con nome località
if i == 0:
df['point_label'] = f"Partenza: {name1}"
df['point_name'] = name1
elif i == len(route_points) - 1:
df['point_label'] = f"Arrivo: {name2}"
df['point_name'] = name2
else:
# Per punti intermedi, usa reverse geocoding per ottenere nome località
# Delay per rispettare rate limiting di Nominatim (1 req/sec)
if i > 1: # Non delay per primo punto (già fatto per partenza)
time.sleep(1.1) # 1.1 secondi per sicurezza
location_name = get_location_name_from_coords(lat, lon)
if location_name:
df['point_name'] = location_name
df['point_label'] = f"{location_name}"
else:
# Fallback se reverse geocoding fallisce
df['point_name'] = f"Punto {i+1}"
df['point_label'] = f"Punto {i+1}/{len(route_points)}"
all_results.append(df)
if not all_results:
return None
# Combina tutti i DataFrame
result_df = pd.concat(all_results, ignore_index=True)
# Aggiungi nota se sono stati usati modelli diversi o approssimazioni
if len(models_used) > 1:
result_df['note'] = f"Usati modelli: {', '.join(models_used)}"
elif result_df['soil_temp_source'].iloc[0] == 'estimated' if 'soil_temp_source' in result_df.columns else False:
result_df['note'] = "Temperatura suolo stimata (non disponibile nel modello)"
return result_df
def generate_maps_link(lat, lon):
return f"<a href='https://www.google.com/maps/search/?api=1&query={lat},{lon}'>[Mappa]</a>"
def format_route_ice_report(df: pd.DataFrame, city1: str, city2: str) -> str:
"""
Formatta un DataFrame di analisi rischio ghiaccio lungo percorso in messaggio Telegram compatto.
Versione semplificata ora che c'è anche la mappa visiva.
"""
if df.empty:
return "❌ Nessun dato disponibile per il percorso."
# Raggruppa per punto e trova rischio massimo per ogni punto
max_risk_per_point = df.groupby('point_index').agg({
'Risk_Score': 'max',
'point_label': 'first',
'point_name': 'first',
}).sort_values('point_index')
# Trova ore con rischio per ogni punto
risk_hours = df[df['Risk_Score'] > 0].groupby('point_index').agg({
'timestamp': lambda x: f"{x.min().strftime('%d/%m %H:%M')} - {x.max().strftime('%d/%m %H:%M')}",
'Ice_Phenomenon': lambda x: x.iloc[0] if len(x) > 0 and pd.notna(x.iloc[0]) and x.iloc[0] != '' else 'Rischio ghiaccio',
'Risk_Score': 'max',
'Ice_Warning_Level': lambda x: x.iloc[0] if len(x) > 0 and pd.notna(x.iloc[0]) else 'Unknown'
})
# Costruisci messaggio compatto
msg = f"🛣️ **Rischio Ghiaccio Stradale**\n"
msg += f"📍 {city1}{city2}\n\n"
points_with_risk = []
for idx, row in max_risk_per_point.iterrows():
risk_score = row['Risk_Score']
if risk_score > 0:
point_name = row.get('point_name', row.get('point_label', f'Punto {idx}'))
# Ottieni dati dal gruppo risk_hours se disponibile
if idx in risk_hours.index:
risk_level = risk_hours.loc[idx, 'Ice_Warning_Level']
phenomenon = risk_hours.loc[idx, 'Ice_Phenomenon']
time_range = risk_hours.loc[idx, 'timestamp']
else:
# Fallback: cerca nel DataFrame originale
point_df = df[df['point_index'] == idx]
if len(point_df) > 0:
risk_level = point_df['Ice_Warning_Level'].iloc[0] if pd.notna(point_df['Ice_Warning_Level'].iloc[0]) else 'Unknown'
phenomenon = point_df['Ice_Phenomenon'].iloc[0] if pd.notna(point_df['Ice_Phenomenon'].iloc[0]) and point_df['Ice_Phenomenon'].iloc[0] != '' else 'Rischio ghiaccio'
time_range = f"{point_df['timestamp'].min().strftime('%d/%m %H:%M')} - {point_df['timestamp'].max().strftime('%d/%m %H:%M')}"
else:
risk_level = 'Unknown'
phenomenon = 'Rischio ghiaccio'
time_range = ''
risk_emoji = "🔴" if risk_score >= 3 else "🟠" if risk_score >= 2 else "🟡"
# Messaggio compatto per punto
point_msg = f"{risk_emoji} {point_name}: {risk_level} ({phenomenon})\n"
point_msg += f"{time_range}\n"
points_with_risk.append(point_msg)
if points_with_risk:
msg += "⚠️ **Punti a rischio:**\n"
msg += "\n".join(points_with_risk)
else:
msg += "✅ Nessun rischio rilevato per le prossime 24h"
# Riepilogo compatto
risk_df = df[df['Risk_Score'] > 0]
if not risk_df.empty:
min_time = risk_df['timestamp'].min()
max_time = risk_df['timestamp'].max()
time_span_hours = (max_time - min_time).total_seconds() / 3600
points_with_any_risk = risk_df['point_index'].nunique()
total_points = len(max_risk_per_point)
# Conta per livello di rischio
high_risk_count = len(risk_df[risk_df['Risk_Score'] >= 3]['point_index'].unique())
medium_risk_count = len(risk_df[(risk_df['Risk_Score'] == 2)]['point_index'].unique())
low_risk_count = len(risk_df[(risk_df['Risk_Score'] == 1)]['point_index'].unique())
msg += f"\n\n📊 **Riepilogo:**\n"
msg += f"• Punti: {points_with_any_risk}/{total_points} a rischio\n"
if high_risk_count > 0:
msg += f"• 🔴 Alto: {high_risk_count} | 🟠 Medio: {medium_risk_count} | 🟡 Basso: {low_risk_count}\n"
msg += f"• ⏰ {min_time.strftime('%d/%m %H:%M')} - {max_time.strftime('%d/%m %H:%M')} ({time_span_hours:.1f}h)\n"
return msg
def send_telegram_broadcast(token, message, debug_mode=False):
base_url = f"https://api.telegram.org/bot{token}/sendMessage"
recipients = [ADMIN_CHAT_ID] if debug_mode else TELEGRAM_CHAT_IDS
if debug_mode:
message = f"🛠 <b>[DEBUG - MULTI MODEL]</b> 🛠\n{message}"
for chat_id in recipients:
try:
requests.post(base_url, data={"chat_id": chat_id, "text": message, "parse_mode": "HTML", "disable_web_page_preview": True}, timeout=5)
except Exception:
pass
def generate_route_ice_map(df: pd.DataFrame, city1: str, city2: str, output_path: str) -> bool:
"""
Genera una mappa grafica del percorso con punti colorati in base al livello di rischio ghiaccio.
Args:
df: DataFrame con analisi del rischio per ogni punto del percorso
city1: Nome città di partenza
city2: Nome città di arrivo
output_path: Percorso file output PNG
Returns:
True se generata con successo, False altrimenti
"""
try:
import matplotlib
matplotlib.use('Agg') # Backend senza GUI
import matplotlib.pyplot as plt
import matplotlib.patches as mpatches
except ImportError as e:
print(f"matplotlib non disponibile: {e}. Mappa non generata.")
return False
# Prova a importare contextily per mappa di sfondo
try:
import contextily as ctx
CONTEXTILY_AVAILABLE = True
except ImportError:
CONTEXTILY_AVAILABLE = False
print("contextily non disponibile. Mappa generata senza sfondo geografico.")
if df.empty:
return False
# Raggruppa per punto e trova rischio massimo per ogni punto
max_risk_per_point = df.groupby('point_index').agg({
'Risk_Score': 'max',
'point_label': 'first',
'point_name': 'first',
'point_lat': 'first',
'point_lon': 'first'
}).sort_values('point_index')
# Estrai coordinate e livelli di rischio
lats = max_risk_per_point['point_lat'].tolist()
lons = max_risk_per_point['point_lon'].tolist()
names = max_risk_per_point['point_name'].fillna(max_risk_per_point['point_label']).tolist()
risk_levels = max_risk_per_point['Risk_Score'].astype(int).tolist()
# Calcola limiti mappa con margine
lat_min, lat_max = min(lats), max(lats)
lon_min, lon_max = min(lons), max(lons)
# Aggiungi margine del 10%
lat_range = lat_max - lat_min
lon_range = lon_max - lon_min
lat_min -= lat_range * 0.1
lat_max += lat_range * 0.1
lon_min -= lon_range * 0.1
lon_max += lon_range * 0.1
# Crea figura
fig, ax = plt.subplots(figsize=(14, 10))
fig.patch.set_facecolor('white')
# Configura assi PRIMA di aggiungere lo sfondo
ax.set_xlim(lon_min, lon_max)
ax.set_ylim(lat_min, lat_max)
ax.set_aspect('equal', adjustable='box')
# Aggiungi mappa di sfondo OpenStreetMap se disponibile
if CONTEXTILY_AVAILABLE:
try:
ctx.add_basemap(ax, crs='EPSG:4326', source=ctx.providers.OpenStreetMap.Mapnik,
alpha=0.6, attribution_size=6)
except Exception as e:
print(f"Errore aggiunta mappa sfondo: {e}")
CONTEXTILY_AVAILABLE = False
# Disegna linea del percorso
ax.plot(lons, lats, 'k--', linewidth=2, alpha=0.5, zorder=3, label='Percorso')
# Colori per livelli di rischio: verde (0), giallo (1), arancione (2), rosso scuro (3), azzurro (4=neve)
colors_map = {0: '#32CD32', 1: '#FFD700', 2: '#FF8C00', 3: '#8B0000', 4: '#00CED1'}
colors = [colors_map.get(level, '#808080') for level in risk_levels]
# Disegna punti
scatter = ax.scatter(lons, lats, c=colors, s=400,
edgecolors='black', linewidths=2.5, alpha=0.85, zorder=5)
# Evidenzia partenza e arrivo con marker diversi
if len(lats) >= 2:
# Partenza (primo punto)
ax.scatter([lons[0]], [lats[0]], c='blue', s=600, marker='s',
edgecolors='white', linewidths=3, alpha=0.9, zorder=6, label='Partenza')
# Arrivo (ultimo punto)
ax.scatter([lons[-1]], [lats[-1]], c='red', s=600, marker='s',
edgecolors='white', linewidths=3, alpha=0.9, zorder=6, label='Arrivo')
# Aggiungi etichette per i punti
for lon, lat, name, risk_level in zip(lons, lats, names, risk_levels):
# Offset intelligente per evitare sovrapposizioni
offset_x = 10 if risk_level > 0 else 8
offset_y = 10 if risk_level > 0 else 8
# Nome abbreviato se troppo lungo
display_name = name
if len(display_name) > 20:
display_name = display_name[:17] + "..."
ax.annotate(display_name, (lon, lat), xytext=(offset_x, offset_y), textcoords='offset points',
fontsize=8, fontweight='bold',
bbox=dict(boxstyle='round,pad=0.4', facecolor='white', alpha=0.95,
edgecolor='black', linewidth=1.2),
zorder=7)
# Legenda personalizzata per livelli di rischio
legend_elements = [
mpatches.Patch(facecolor='#32CD32', label='Nessun rischio'),
mpatches.Patch(facecolor='#FFD700', label='Brina (Livello 1)'),
mpatches.Patch(facecolor='#FF8C00', label='Ghiaccio vivo (Livello 2)'),
mpatches.Patch(facecolor='#8B0000', label='Gelicidio (Livello 3)'),
mpatches.Patch(facecolor='#00CED1', label='Neve (Livello 4)'),
]
ax.legend(handles=legend_elements, loc='lower left', fontsize=9,
framealpha=0.95, edgecolor='black', fancybox=True, shadow=True)
# Configura assi
ax.set_xlabel('Longitudine (°E)', fontsize=11, fontweight='bold')
ax.set_ylabel('Latitudine (°N)', fontsize=11, fontweight='bold')
ax.set_title(f'RISCHIO GHIACCIO STRADALE\n{city1}{city2}',
fontsize=14, fontweight='bold', pad=20)
# Griglia solo se non c'è mappa di sfondo
if not CONTEXTILY_AVAILABLE:
ax.grid(True, alpha=0.3, linestyle='--', zorder=1)
# Info timestamp in alto a sinistra
now = datetime.datetime.now()
points_with_risk = sum(1 for r in risk_levels if r > 0)
info_text = f"Aggiornamento: {now.strftime('%d/%m/%Y %H:%M')}\nPunti monitorati: {len(risk_levels)}\nPunti a rischio: {points_with_risk}"
ax.text(0.02, 0.98, info_text, transform=ax.transAxes,
fontsize=9, verticalalignment='top', horizontalalignment='left',
bbox=dict(boxstyle='round,pad=0.5', facecolor='white', alpha=0.9,
edgecolor='gray', linewidth=1.5),
zorder=10)
plt.tight_layout()
# Salva
try:
plt.savefig(output_path, dpi=150, bbox_inches='tight', facecolor='white')
plt.close(fig)
return True
except Exception as e:
print(f"Errore salvataggio mappa: {e}")
plt.close(fig)
return False
def send_telegram_photo(token, photo_path, caption, debug_mode=False):
"""Invia foto via Telegram API."""
if not os.path.exists(photo_path):
return False
url = f"https://api.telegram.org/bot{token}/sendPhoto"
recipients = [ADMIN_CHAT_ID] if debug_mode else TELEGRAM_CHAT_IDS
# Limite Telegram per caption: 1024 caratteri
if len(caption) > 1024:
caption = caption[:1021] + "..."
sent_ok = False
for chat_id in recipients:
try:
with open(photo_path, 'rb') as photo_file:
files = {'photo': photo_file}
data = {
'chat_id': chat_id,
'caption': caption,
'parse_mode': 'HTML'
}
resp = requests.post(url, files=files, data=data, timeout=30)
if resp.status_code == 200:
sent_ok = True
time.sleep(0.5)
except Exception as e:
if debug_mode:
print(f"Errore invio foto: {e}")
return sent_ok
def generate_ice_risk_map(points_data: List[Dict], output_path: str) -> bool:
"""
Genera una mappa grafica con punti colorati in base al livello di rischio ghiaccio.
Args:
points_data: Lista di dict con 'name', 'lat', 'lon', 'risk_level' (0-3)
output_path: Percorso file output PNG
Returns:
True se generata con successo, False altrimenti
"""
try:
import matplotlib
matplotlib.use('Agg') # Backend senza GUI
import matplotlib.pyplot as plt
import matplotlib.patches as mpatches
from matplotlib.colors import ListedColormap
except ImportError as e:
print(f"matplotlib non disponibile: {e}. Mappa non generata.")
return False
# Prova a importare contextily per mappa di sfondo
try:
import contextily as ctx
CONTEXTILY_AVAILABLE = True
except ImportError:
CONTEXTILY_AVAILABLE = False
print("contextily non disponibile. Mappa generata senza sfondo geografico.")
if not points_data:
return False
# Estrai coordinate e livelli di rischio
lats = [p["lat"] for p in points_data]
lons = [p["lon"] for p in points_data]
names = [p["name"] for p in points_data]
risk_levels = [p.get("risk_level", 0) for p in points_data]
# Crea figura
fig, ax = plt.subplots(figsize=(12, 10))
fig.patch.set_facecolor('white')
# Limiti mappa per San Marino (più zoomata)
lat_min, lat_max = 43.88, 43.99
lon_min, lon_max = 12.40, 12.52
# Configura assi PRIMA di aggiungere lo sfondo
ax.set_xlim(lon_min, lon_max)
ax.set_ylim(lat_min, lat_max)
ax.set_aspect('equal', adjustable='box')
# Aggiungi mappa di sfondo OpenStreetMap se disponibile
if CONTEXTILY_AVAILABLE:
try:
ctx.add_basemap(ax, crs='EPSG:4326', source=ctx.providers.OpenStreetMap.Mapnik,
alpha=0.6, attribution_size=6)
except Exception as e:
print(f"Errore aggiunta mappa sfondo: {e}")
CONTEXTILY_AVAILABLE = False
# Colori per livelli di rischio: verde (0), giallo (1), arancione (2), rosso scuro (3), azzurro (4=neve)
colors_map = {0: '#32CD32', 1: '#FFD700', 2: '#FF8C00', 3: '#8B0000', 4: '#00CED1'}
colors = [colors_map.get(level, '#808080') for level in risk_levels]
# Disegna punti
scatter = ax.scatter(lons, lats, c=colors, s=300,
edgecolors='black', linewidths=2, alpha=0.85, zorder=5)
# Aggiungi etichette per tutti i punti con posizionamento personalizzato
label_positions = {
"Galazzano": (-15, 15), # Alto a sx
"Centro Storico": (-15, -15), # Basso a sx
"Santa Mustiola": (-15, 15), # Alto a sx
}
for lon, lat, name, risk_level in zip(lons, lats, names, risk_levels):
# Usa posizionamento personalizzato se disponibile, altrimenti default
if name in label_positions:
offset_x, offset_y = label_positions[name]
else:
# Offset intelligente per evitare sovrapposizioni
offset_x = 8 if risk_level > 0 else 5
offset_y = 8 if risk_level > 0 else 5
ax.annotate(name, (lon, lat), xytext=(offset_x, offset_y), textcoords='offset points',
fontsize=7, fontweight='bold',
bbox=dict(boxstyle='round,pad=0.3', facecolor='white', alpha=0.9,
edgecolor='black', linewidth=1),
zorder=6)
# Colorbar personalizzata per livelli di rischio
legend_elements = [
mpatches.Patch(facecolor='#32CD32', label='Nessun rischio'),
mpatches.Patch(facecolor='#FFD700', label='Brina (Livello 1)'),
mpatches.Patch(facecolor='#FF8C00', label='Ghiaccio vivo (Livello 2)'),
mpatches.Patch(facecolor='#8B0000', label='Gelicidio (Livello 3)'),
mpatches.Patch(facecolor='#00CED1', label='Neve (Livello 4)'),
]
ax.legend(handles=legend_elements, loc='lower left', fontsize=9,
framealpha=0.95, edgecolor='black', fancybox=True, shadow=True)
# Configura assi
ax.set_xlabel('Longitudine (°E)', fontsize=11, fontweight='bold')
ax.set_ylabel('Latitudine (°N)', fontsize=11, fontweight='bold')
ax.set_title('RISCHIO GHIACCIO STRADALE - San Marino',
fontsize=14, fontweight='bold', pad=20)
# Griglia solo se non c'è mappa di sfondo
if not CONTEXTILY_AVAILABLE:
ax.grid(True, alpha=0.3, linestyle='--', zorder=1)
# Info timestamp in alto a sinistra
now = datetime.datetime.now()
info_text = f"Aggiornamento: {now.strftime('%d/%m/%Y %H:%M')}\nPunti monitorati: {len(points_data)}"
ax.text(0.02, 0.98, info_text, transform=ax.transAxes,
fontsize=9, verticalalignment='top', horizontalalignment='left',
bbox=dict(boxstyle='round,pad=0.5', facecolor='white', alpha=0.9,
edgecolor='gray', linewidth=1.5),
zorder=10)
plt.tight_layout()
# Salva
try:
plt.savefig(output_path, dpi=150, bbox_inches='tight', facecolor='white')
plt.close(fig)
return True
except Exception as e:
print(f"Errore salvataggio mappa: {e}")
plt.close(fig)
return False
def main():
parser = argparse.ArgumentParser(description="Check ghiaccio stradale")
parser.add_argument("--debug", action="store_true", help="Invia messaggi solo all'admin (chat ID: %s)" % ADMIN_CHAT_ID)
args = parser.parse_args()
DEBUG_MODE = args.debug
token = get_bot_token()
previous_state, report_meta = load_state_with_meta()
report_meta = normalize_report_meta(report_meta)
current_state = {}
new_alerts = []
solved_alerts = []
# Raccogli dati per la mappa
map_points_data = []
print(f"--- Check Multi-Modello {datetime.datetime.now()} ---")
for point in GRID_POINTS:
pid = point["id"]
# Variabili per aggregare i risultati dei modelli
max_risk_level = 0
triggered_models = []
alert_messages = []
# Ottieni i dati di entrambi i modelli PRIMA del ciclo
# (necessario per passare i dati ICON ad AROME quando analizza gelicidio)
all_models_data = {}
for model_name, model_slug in MODELS_TO_CHECK.items():
data = get_weather_data(point["lat"], point["lon"], model_slug)
if data is None:
if DEBUG_MODE:
print(f" ⚠️ {model_name}: Dati non disponibili")
else:
all_models_data[model_slug] = data
# CICLO SUI MODELLI (ICON, AROME) - Usa analisi temporale avanzata
for model_name, model_slug in MODELS_TO_CHECK.items():
data = all_models_data.get(model_slug)
if data is None:
continue
# Usa calculate_ice_risk_dataframe per analisi temporale pregressa e futura
if not PANDAS_AVAILABLE:
# Fallback alla vecchia logica se pandas non disponibile
icon_data = all_models_data.get("icon_eu") if model_slug == "meteofrance_seamless" else None
risk, msg = analyze_risk(data, model_slug, icon_weather_data=icon_data)
if DEBUG_MODE:
print(f" {model_name}: Rischio={risk}, Msg={msg[:50] if msg else 'Nessun rischio'}")
if risk > 0:
triggered_models.append(model_name)
alert_messages.append(msg)
if risk > max_risk_level:
max_risk_level = risk
else:
# Analisi temporale avanzata con pandas
try:
df = calculate_ice_risk_dataframe(data, model_slug, hours_ahead=24)
if df.empty:
continue
# Estrai rischio massimo dal DataFrame
max_risk_in_df = int(df['Risk_Score'].max()) if 'Risk_Score' in df.columns else 0
if DEBUG_MODE:
risk_counts = df['Risk_Score'].value_counts().to_dict() if 'Risk_Score' in df.columns else {}
print(f" {model_name}: Rischio max={max_risk_in_df}, Distribuzione={risk_counts}")
if max_risk_in_df > 0:
triggered_models.append(model_name)
# Crea messaggio descrittivo basato sul DataFrame
high_risk_rows = df[df['Risk_Score'] == max_risk_in_df]
if not high_risk_rows.empty:
first_high_risk = high_risk_rows.iloc[0]
phenomenon = first_high_risk.get('Ice_Phenomenon', 'Rischio ghiaccio')
level = first_high_risk.get('Ice_Warning_Level', 'Unknown')
msg = f"{level}: {phenomenon}"
alert_messages.append(msg)
if max_risk_in_df > max_risk_level:
max_risk_level = max_risk_in_df
except Exception as e:
if DEBUG_MODE:
print(f" ⚠️ {model_name}: Errore analisi avanzata: {e}")
# Fallback alla vecchia logica
icon_data = all_models_data.get("icon_eu") if model_slug == "meteofrance_seamless" else None
risk, msg = analyze_risk(data, model_slug, icon_weather_data=icon_data)
if risk > 0:
triggered_models.append(model_name)
alert_messages.append(msg)
if risk > max_risk_level:
max_risk_level = risk
# Salvataggio stato (prendiamo il rischio massimo rilevato tra i modelli)
current_state[pid] = max_risk_level
old_level = previous_state.get(pid, 0)
maps_link = generate_maps_link(point["lat"], point["lon"])
# Aggiungi punto ai dati per la mappa
map_points_data.append({
"name": point["name"],
"lat": point["lat"],
"lon": point["lon"],
"risk_level": max_risk_level
})
# --- LOGICA NOTIFICHE ---
if DEBUG_MODE:
print(f" Stato: old_level={old_level}, max_risk_level={max_risk_level}, triggered_models={triggered_models}")
# 1. Nessun cambiamento di LIVELLO - non inviare (anti-spam)
if max_risk_level == old_level:
if DEBUG_MODE:
print(f" ⏭️ Skip: rischio invariato ({max_risk_level})")
continue
# 2. Nuovo Rischio o Aggravamento (rischio aumenta)
if max_risk_level > old_level:
if DEBUG_MODE:
print(f" 📈 Nuovo/aggravamento: {old_level}{max_risk_level}")
# Creiamo una stringa che dice chi ha rilevato cosa
sources = " + ".join(triggered_models)
# Prendiamo il messaggio del rischio più alto (o il primo)
main_msg = alert_messages[0] if alert_messages else "Dati incerti"
# Aggiungi informazioni sulle 24h precedenti se disponibili
past_24h_details = []
for model_name, model_slug in MODELS_TO_CHECK.items():
data = all_models_data.get(model_slug)
if data:
_, _, _, past_24h_info = check_ice_persistence_conditions(data, model_slug, hours_check=12)
if past_24h_info:
# Informazioni rilevanti sulle 24h precedenti
if past_24h_info.get("precipitation_with_freeze", False):
precip_str = []
if past_24h_info.get("total_rain_mm", 0) > 0.1:
precip_str.append(f"Pioggia: {past_24h_info['total_rain_mm']:.1f}mm")
if past_24h_info.get("total_snowfall_cm", 0) > 0.1:
precip_str.append(f"Neve: {past_24h_info['total_snowfall_cm']:.1f}cm")
if precip_str:
past_24h_details.append(f"⬅️ {model_name}: {', '.join(precip_str)} con T<0°C")
if past_24h_info.get("ice_formation_likely", False):
past_24h_details.append(f"🧊 {model_name}: Formazione ghiaccio probabile nelle 24h precedenti")
final_msg = (f"📍 <b>{point['name']}</b> {maps_link}\n"
f"{main_msg}\n"
f"📡 <i>Rilevato da: {sources}</i>")
# Aggiungi informazioni 24h precedenti se disponibili
if past_24h_details:
final_msg += f"\n\n📊 <b>Analisi 24h precedenti:</b>\n"
for detail in past_24h_details:
final_msg += f"{detail}\n"
important = is_important_update(max_risk_level, old_level, final_msg)
append_report(new_alerts, final_msg, important, report_meta, DEBUG_MODE)
# 3. Rischio Cessato (Tutti i modelli danno verde)
# IMPORTANTE: Non inviare "allerta rientrata" se ci sono ancora condizioni che mantengono il ghiaccio
# (neve presente o temperature vicine allo zero)
elif max_risk_level == 0 and old_level > 0:
# Verifica se ci sono condizioni che mantengono il ghiaccio già formato
# Controlla tutti i modelli disponibili per avere una visione completa
ice_persists = False
persistence_details = []
all_past_24h_info = []
for model_name, model_slug in MODELS_TO_CHECK.items():
data = all_models_data.get(model_slug)
if data:
has_snow, has_cold, details, past_24h_info = check_ice_persistence_conditions(data, model_slug, hours_check=12)
if has_snow or has_cold:
ice_persists = True
if details:
persistence_details.append(f"{model_name}: {details}")
# Raccogli informazioni 24h precedenti
if past_24h_info:
all_past_24h_info.append((model_name, past_24h_info))
if ice_persists:
# Non inviare "allerta rientrata" perché il ghiaccio potrebbe ancora essere presente
# Ma aggiungi informazioni dettagliate sulla persistenza
if DEBUG_MODE:
print(f" ⏸️ Rischio cessato ma condizioni persistenti: {', '.join(persistence_details)}")
# Costruisci messaggio dettagliato con informazioni 24h precedenti
persist_msg = f"⏸️ <b>{point['name']}</b> {maps_link}: Rischio cessato ma persistenza ghiaccio possibile\n"
persist_msg += f"📊 <i>Condizioni persistenti: {', '.join(persistence_details) if persistence_details else 'ghiaccio residuo possibile'}</i>\n"
# Aggiungi dettagli 24h precedenti se disponibili
for model_name, past_info in all_past_24h_info:
if past_info.get("has_precipitation", False):
precip_details = []
if past_info.get("total_rain_mm", 0) > 0.1:
precip_details.append(f"Pioggia: {past_info['total_rain_mm']:.1f}mm")
if past_info.get("total_snowfall_cm", 0) > 0.1:
precip_details.append(f"Neve: {past_info['total_snowfall_cm']:.1f}cm")
if past_info.get("total_showers_mm", 0) > 0.1:
precip_details.append(f"Rovesci: {past_info['total_showers_mm']:.1f}mm")
if precip_details:
persist_msg += f"⬅️ {model_name} ultime 24h: {', '.join(precip_details)}\n"
if past_info.get("precipitation_with_freeze", False):
persist_msg += f"🧊 {model_name}: Precipitazioni con T<0°C nelle 24h precedenti\n"
if past_info.get("ice_melting_likely", False):
persist_msg += f"☀️ {model_name}: Scioglimento probabile (T salita sopra soglia)\n"
# Non aggiungere a solved_alerts - il ghiaccio potrebbe ancora essere presente
# Ma potremmo inviare un messaggio informativo se in debug mode
if DEBUG_MODE:
important = is_important_update(max_risk_level, old_level, persist_msg)
append_report(new_alerts, persist_msg, important, report_meta, DEBUG_MODE)
else:
# Condizioni completamente risolte: neve sciolta e temperature sopra lo zero
if DEBUG_MODE:
print(f" ✅ Rischio cessato: {old_level} → 0 (condizioni completamente risolte)")
# Limita i report di miglioramento a 3 al giorno (ore 7:00, 15:00, 23:00)
if not is_improvement_report_allowed():
if DEBUG_MODE:
current_hour = datetime.datetime.now().hour
print(f" ⏸️ Report miglioramento saltato: ora {current_hour} non è tra 7, 15, 23")
continue
# Verifica se c'è stato scioglimento nelle 24h precedenti
melting_info = []
for model_name, past_info in all_past_24h_info:
if past_info.get("ice_melting_likely", False):
melting_info.append(model_name)
solved_msg = f"✅ <b>{point['name']}</b> {maps_link}: Rischio rientrato"
if melting_info:
solved_msg += f" (Scioglimento confermato: {', '.join(melting_info)})"
else:
solved_msg += " (Tutti i modelli)"
important = is_important_update(max_risk_level, old_level, solved_msg)
append_report(solved_alerts, solved_msg, important, report_meta, DEBUG_MODE)
# 4. Rischio Diminuito (es. Da Ghiaccio a Brina, o da Brina a nessun rischio ma non ancora 0)
elif max_risk_level < old_level and max_risk_level > 0:
if DEBUG_MODE:
print(f" 📉 Rischio diminuito: {old_level}{max_risk_level}")
# Limita i report di miglioramento a 3 al giorno (ore 7:00, 15:00, 23:00)
if not is_improvement_report_allowed():
if DEBUG_MODE:
current_hour = datetime.datetime.now().hour
print(f" ⏸️ Report miglioramento saltato: ora {current_hour} non è tra 7, 15, 23")
continue
sources = " + ".join(triggered_models)
main_msg = alert_messages[0] if alert_messages else "Dati incerti"
# Aggiungi informazioni sulle 24h precedenti se disponibili
past_24h_details = []
for model_name, model_slug in MODELS_TO_CHECK.items():
data = all_models_data.get(model_slug)
if data:
_, _, _, past_24h_info = check_ice_persistence_conditions(data, model_slug, hours_check=12)
if past_24h_info:
# Informazioni rilevanti sulle 24h precedenti
if past_24h_info.get("ice_melting_likely", False):
past_24h_details.append(f"☀️ {model_name}: Scioglimento in corso (T salita sopra soglia)")
elif past_24h_info.get("precipitation_with_freeze", False):
past_24h_details.append(f"⚠️ {model_name}: Possibile ghiaccio residuo (precipitazioni con T<0°C nelle 24h precedenti)")
improvement_msg = f"📍 <b>{point['name']}</b> {maps_link} [MIGLIORAMENTO]\n{main_msg}\n📡 <i>Fonte: {sources}</i>"
# Aggiungi informazioni 24h precedenti se disponibili
if past_24h_details:
improvement_msg += f"\n\n📊 <b>Analisi 24h precedenti:</b>\n"
for detail in past_24h_details:
improvement_msg += f"{detail}\n"
important = is_important_update(max_risk_level, old_level, improvement_msg)
append_report(new_alerts, improvement_msg, important, report_meta, DEBUG_MODE)
# Genera e invia mappa solo quando ci sono aggiornamenti
if new_alerts or solved_alerts:
if DEBUG_MODE:
print(f"Generazione mappa per {len(map_points_data)} punti...")
map_path = os.path.join(SCRIPT_DIR, "ice_risk_map.png")
map_generated = generate_ice_risk_map(map_points_data, map_path)
if map_generated:
if DEBUG_MODE:
print(f"Mappa generata con successo: {map_path}")
now = datetime.datetime.now()
caption = (
f"🧊 <b>RISCHIO GHIACCIO STRADALE - San Marino</b>\n"
f"🕒 {now.strftime('%d/%m/%Y %H:%M')}\n"
f"📊 Punti monitorati: {len(map_points_data)}"
)
photo_sent = send_telegram_photo(token, map_path, caption, debug_mode=DEBUG_MODE)
if DEBUG_MODE:
print(f"Mappa inviata via Telegram: {photo_sent}")
# Pulisci file temporaneo solo se non in debug mode (per permettere verifica)
if not DEBUG_MODE:
try:
if os.path.exists(map_path):
os.remove(map_path)
except Exception:
pass
elif DEBUG_MODE:
print(f"File mappa mantenuto per debug: {map_path}")
print("Mappa inviata.")
else:
if DEBUG_MODE:
print("Errore nella generazione della mappa.")
else:
if DEBUG_MODE:
print("Nessuna variazione - mappa non inviata.")
else:
print("Nessuna variazione.")
if not DEBUG_MODE:
save_current_state(current_state, report_meta=report_meta)
if __name__ == "__main__":
main()