#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ Road Weather Analysis - Analisi completa dei rischi meteo lungo un percorso stradale. Analizza: ghiaccio, neve, pioggia, rovesci, pioggia intensa, nebbia, grandine, temporali. """ import argparse import datetime import json import logging import os import requests import time from logging.handlers import RotatingFileHandler from typing import Dict, List, Tuple, Optional from open_meteo_client import open_meteo_get # Setup logging SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__)) LOG_FILE = os.path.join(SCRIPT_DIR, "road_weather.log") def setup_logger() -> logging.Logger: logger = logging.getLogger("road_weather") logger.setLevel(logging.INFO) logger.handlers.clear() fh = RotatingFileHandler(LOG_FILE, maxBytes=1_000_000, backupCount=5, encoding="utf-8") fh.setLevel(logging.DEBUG) fmt = logging.Formatter("%(asctime)s %(levelname)s %(message)s") fh.setFormatter(fmt) logger.addHandler(fh) return logger LOGGER = setup_logger() # Import opzionale di pandas e numpy per analisi avanzata try: import pandas as pd import numpy as np PANDAS_AVAILABLE = True except ImportError: PANDAS_AVAILABLE = False pd = None np = None # ============================================================================= # CONFIGURAZIONE # ============================================================================= # Modelli meteo disponibili MODELS = { "ICON Italia": "italia_meteo_arpae_icon_2i", "ICON EU": "icon_eu", "AROME Seamless": "meteofrance_seamless" } # Soglie di rischio THRESHOLDS = { # Ghiaccio/Neve "ice_temp_air": 2.0, # °C - temperatura aria per rischio ghiaccio "ice_temp_soil": 4.0, # °C - temperatura suolo per rischio ghiaccio "snowfall_cm_h": 0.5, # cm/h - neve significativa # Pioggia "rain_light_mm_h": 2.5, # mm/h - pioggia leggera "rain_moderate_mm_h": 7.5, # mm/h - pioggia moderata "rain_heavy_mm_h": 15.0, # mm/h - pioggia intensa "rain_very_heavy_mm_h": 30.0, # mm/h - pioggia molto intensa # Vento "wind_strong_kmh": 50.0, # km/h - vento forte "wind_very_strong_kmh": 70.0, # km/h - vento molto forte # Nebbia "fog_visibility_m": 200.0, # m - visibilità per nebbia # Temporali "cape_lightning": 800.0, # J/kg - CAPE per rischio fulminazioni "cape_severe": 1500.0, # J/kg - CAPE per temporali severi "wind_gust_downburst": 60.0, # km/h - raffiche per downburst } # Weather codes WMO WEATHER_CODES = { # Pioggia 61: "Pioggia leggera", 63: "Pioggia moderata", 65: "Pioggia forte", 66: "Pioggia gelata leggera", 67: "Pioggia gelata forte", 80: "Rovesci leggeri", 81: "Rovesci moderati", 82: "Rovesci violenti", # Neve 71: "Nevischio leggero", 73: "Nevischio moderato", 75: "Nevischio forte", 77: "Granelli di neve", 85: "Rovesci di neve leggeri", 86: "Rovesci di neve forti", # Grandine 89: "Grandine", 90: "Grandine con temporale", # Temporali 95: "Temporale", 96: "Temporale con grandine", 99: "Temporale violento con grandine", # Nebbia 45: "Nebbia", 48: "Nebbia con brina", } # ============================================================================= # UTILITY FUNCTIONS # ============================================================================= def get_google_maps_api_key() -> Optional[str]: """Ottiene la chiave API di Google Maps da variabile d'ambiente.""" api_key = os.environ.get('GOOGLE_MAPS_API_KEY', '').strip() if api_key: return api_key api_key = os.environ.get('GOOGLE_API_KEY', '').strip() if api_key: return api_key # Debug: verifica tutte le variabili d'ambiente che contengono GOOGLE if os.environ.get('DEBUG_GOOGLE_MAPS', ''): google_vars = {k: v[:10] + '...' if len(v) > 10 else v for k, v in os.environ.items() if 'GOOGLE' in k.upper()} LOGGER.debug(f"Variabili GOOGLE trovate: {google_vars}") return None def decode_polyline(polyline_str: str) -> List[Tuple[float, float]]: """Decodifica un polyline codificato di Google Maps (algoritmo standard).""" if not polyline_str: LOGGER.warning("Polyline string vuota") return [] def _decode_value(index: int) -> Tuple[int, int]: """Decodifica un valore dal polyline e ritorna (valore, nuovo_indice).""" result = 0 shift = 0 b = 0x20 while b >= 0x20 and index < len(polyline_str): b = ord(polyline_str[index]) - 63 result |= (b & 0x1f) << shift shift += 5 index += 1 if result & 1: result = ~result return (result >> 1, index) points = [] index = 0 lat = 0 lon = 0 try: while index < len(polyline_str): # Decodifica latitudine lat_delta, index = _decode_value(index) lat += lat_delta # Decodifica longitudine (se disponibile) if index >= len(polyline_str): # Se abbiamo solo la latitudine, aggiungiamo il punto comunque # (potrebbe essere l'ultimo punto del percorso) LOGGER.debug(f"Fine stringa dopo latitudine, aggiungo punto con lon precedente") points.append((lat / 1e5, lon / 1e5)) break lon_delta, index = _decode_value(index) lon += lon_delta points.append((lat / 1e5, lon / 1e5)) LOGGER.info(f"Polyline decodificato: {len(points)} punti estratti") if len(points) > 0: LOGGER.debug(f"Primo punto: {points[0]}, Ultimo punto: {points[-1]}") else: LOGGER.warning("Nessun punto estratto dal polyline") return points except Exception as e: LOGGER.error(f"Errore durante decodifica polyline: {e}", exc_info=True) return [] def calculate_route_points(lat1: float, lon1: float, lat2: float, lon2: float, num_points: int = 8) -> List[Tuple[float, float]]: """Calcola punti lungo percorso stradale reale usando Google Maps.""" api_key = get_google_maps_api_key() # Debug: verifica se la chiave è stata trovata if not api_key: # Prova a verificare tutte le variabili d'ambiente all_env_vars = {k: '***' for k in os.environ.keys() if 'GOOGLE' in k.upper() or 'MAPS' in k.upper()} if all_env_vars: LOGGER.warning(f"Variabili GOOGLE trovate ma non riconosciute: {list(all_env_vars.keys())}") else: LOGGER.warning("Nessuna variabile GOOGLE_MAPS_API_KEY o GOOGLE_API_KEY trovata") if api_key: LOGGER.info(f"Google Maps API Key trovata (lunghezza: {len(api_key)} caratteri)") try: # Prova prima con Routes API (nuova) - POST request url = f"https://routes.googleapis.com/directions/v2:computeRoutes" headers = { 'Content-Type': 'application/json', 'X-Goog-Api-Key': api_key, 'X-Goog-FieldMask': 'routes.distanceMeters,routes.duration,routes.polyline.encodedPolyline' } payload = { "origin": { "location": { "latLng": { "latitude": lat1, "longitude": lon1 } } }, "destination": { "location": { "latLng": { "latitude": lat2, "longitude": lon2 } } }, "travelMode": "DRIVE", "routingPreference": "TRAFFIC_AWARE", "computeAlternativeRoutes": False, "polylineEncoding": "ENCODED_POLYLINE" } LOGGER.info(f"Chiamata Google Maps Routes API: origin=({lat1},{lon1}), dest=({lat2},{lon2})") try: response = requests.post(url, headers=headers, json=payload, timeout=10) LOGGER.info(f"Google Maps Routes API HTTP status: {response.status_code}") except requests.exceptions.RequestException as e: LOGGER.error(f"Errore richiesta HTTP Routes API: {e}", exc_info=True) raise if response.status_code == 200: try: data = response.json() LOGGER.debug(f"Google Maps Routes API response keys: {list(data.keys())}") except json.JSONDecodeError as e: LOGGER.error(f"Errore parsing JSON risposta Routes API: {e}") LOGGER.error(f"Response text: {response.text[:500]}") raise if 'routes' in data and len(data['routes']) > 0: route = data['routes'][0] # Routes API usa 'polyline' invece di 'overview_polyline' polyline_data = route.get('polyline', {}) encoded_polyline = polyline_data.get('encodedPolyline', '') LOGGER.info(f"Polyline presente: {bool(encoded_polyline)}, lunghezza: {len(encoded_polyline) if encoded_polyline else 0}") if encoded_polyline: route_points = decode_polyline(encoded_polyline) if route_points: LOGGER.info(f"✅ Google Maps Routes API: percorso trovato con {len(route_points)} punti") if len(route_points) > 20: sampled_points = [route_points[0]] step = len(route_points) // (num_points + 1) for i in range(1, len(route_points) - 1, max(1, step)): sampled_points.append(route_points[i]) sampled_points.append(route_points[-1]) LOGGER.info(f"✅ Percorso campionato a {len(sampled_points)} punti per analisi") return sampled_points else: return route_points else: LOGGER.warning("Polyline decodificato ma risultato vuoto") else: LOGGER.warning("Polyline non presente nella risposta Routes API") LOGGER.warning(f"Route keys: {list(route.keys())}") LOGGER.warning(f"Route data: {json.dumps(route, indent=2)[:1000]}") else: LOGGER.warning("Nessuna route nella risposta Routes API") LOGGER.warning(f"Response keys: {list(data.keys())}") LOGGER.warning(f"Response data: {json.dumps(data, indent=2)[:1000]}") else: LOGGER.error(f"Google Maps Routes API HTTP error: {response.status_code}") try: error_data = response.json() LOGGER.error(f"Error details: {json.dumps(error_data, indent=2)[:1000]}") except: LOGGER.error(f"Response text: {response.text[:500]}") # Fallback: prova con Directions API (legacy) se Routes API fallisce LOGGER.info("Tentativo fallback a Directions API (legacy)...") url_legacy = "https://maps.googleapis.com/maps/api/directions/json" params_legacy = { 'origin': f"{lat1},{lon1}", 'destination': f"{lat2},{lon2}", 'key': api_key, 'mode': 'driving', 'alternatives': False } response_legacy = requests.get(url_legacy, params=params_legacy, timeout=10) if response_legacy.status_code == 200: data_legacy = response_legacy.json() status = data_legacy.get('status', 'UNKNOWN') if status == 'OK' and data_legacy.get('routes'): route_legacy = data_legacy['routes'][0] overview_polyline = route_legacy.get('overview_polyline', {}) encoded_polyline = overview_polyline.get('points', '') if encoded_polyline: route_points = decode_polyline(encoded_polyline) if route_points: LOGGER.info(f"✅ Google Maps Directions API (legacy): percorso trovato con {len(route_points)} punti") if len(route_points) > 20: sampled_points = [route_points[0]] step = len(route_points) // (num_points + 1) for i in range(1, len(route_points) - 1, max(1, step)): sampled_points.append(route_points[i]) sampled_points.append(route_points[-1]) return sampled_points else: return route_points else: error_message = data_legacy.get('error_message', 'Nessun messaggio') LOGGER.error(f"Directions API (legacy) errore: {status} - {error_message}") except requests.exceptions.RequestException as e: LOGGER.error(f"Errore richiesta Google Maps Routes API: {e}", exc_info=True) except Exception as e: LOGGER.error(f"Errore Google Maps Routes API: {e}", exc_info=True) else: LOGGER.warning("Google Maps API Key non trovata - uso fallback linea d'aria") # Fallback: linea d'aria LOGGER.info("Uso fallback: percorso in linea d'aria (non segue strade reali)") # Fallback: linea d'aria points = [] for i in range(num_points + 1): ratio = i / num_points if num_points > 0 else 0 lat = lat1 + (lat2 - lat1) * ratio lon = lon1 + (lon2 - lon1) * ratio points.append((lat, lon)) return points def get_coordinates_from_city(city_name: str) -> Optional[Tuple[float, float, str]]: """Ottiene coordinate da nome città usando Open-Meteo Geocoding API.""" # Gestione caso speciale "Casa" if not city_name or city_name.lower() == "casa": # Coordinate fisse per Casa (San Marino) return (43.9356, 12.4296, "Casa") url = "https://geocoding-api.open-meteo.com/v1/search" params = {"name": city_name, "count": 1, "language": "it"} try: resp = open_meteo_get(url, params=params, timeout=(5, 10)) if resp.status_code == 200: data = resp.json() if data.get("results"): result = data["results"][0] return (result["latitude"], result["longitude"], result.get("name", city_name)) except Exception as e: LOGGER.warning(f"Errore geocoding per {city_name}: {e}") return None def get_location_name_from_coords(lat: float, lon: float) -> Optional[str]: """Ottiene nome località da coordinate usando Nominatim.""" url = "https://nominatim.openstreetmap.org/reverse" try: params = { "lat": lat, "lon": lon, "format": "json", "accept-language": "it", "zoom": 10, "addressdetails": 1 } headers = {"User-Agent": "Telegram-Bot-Road-Weather/1.0"} resp = requests.get(url, params=params, headers=headers, timeout=5) if resp.status_code == 200: data = resp.json() address = data.get("address", {}) location_name = ( address.get("city") or address.get("town") or address.get("village") or address.get("municipality") or address.get("county") or address.get("state") ) if location_name: state = address.get("state") if state and state != location_name: return f"{location_name} ({state})" return location_name except Exception as e: LOGGER.warning(f"Errore reverse geocoding: {e}") return None def get_best_model_for_location(lat: float, lon: float) -> str: """Determina il miglior modello disponibile per una località.""" if 36.0 <= lat <= 48.0 and 6.0 <= lon <= 19.0: test_data = get_weather_data(lat, lon, "italia_meteo_arpae_icon_2i") if test_data: return "italia_meteo_arpae_icon_2i" if 35.0 <= lat <= 72.0 and -12.0 <= lon <= 35.0: test_data = get_weather_data(lat, lon, "icon_eu") if test_data: return "icon_eu" if 41.0 <= lat <= 52.0 and -5.0 <= lon <= 10.0: test_data = get_weather_data(lat, lon, "meteofrance_seamless") if test_data: return "meteofrance_seamless" return "icon_eu" def get_weather_data(lat: float, lon: float, model_slug: str) -> Optional[Dict]: """Ottiene dati meteo da Open-Meteo.""" url = f"https://api.open-meteo.com/v1/forecast" # Parametri base (aggiunto soil_temperature_0cm per analisi ghiaccio più accurata) hourly_params = "temperature_2m,relative_humidity_2m,precipitation,rain,showers,snowfall,weathercode,visibility,wind_speed_10m,wind_gusts_10m,soil_temperature_0cm,dew_point_2m" # Aggiungi CAPE se disponibile (AROME Seamless o ICON) if model_slug in ["meteofrance_seamless", "italia_meteo_arpae_icon_2i", "icon_eu"]: hourly_params += ",cape" params = { "latitude": lat, "longitude": lon, "models": model_slug, "hourly": hourly_params, "forecast_days": 2, "past_days": 1, # Include 24h precedenti per analisi trend "timezone": "auto" } try: resp = open_meteo_get(url, params=params, timeout=(5, 10)) if resp.status_code == 200: data = resp.json() # Verifica che snowfall sia presente nei dati if data.get("hourly", {}).get("snowfall") is None: LOGGER.warning(f"Modello {model_slug} non fornisce dati snowfall per ({lat}, {lon})") return data except Exception as e: LOGGER.error(f"Errore fetch dati meteo: {e}") return None # ============================================================================= # ANALISI 24H PRECEDENTI # ============================================================================= def analyze_past_24h_conditions(weather_data: Dict) -> Dict: """ Analizza le condizioni delle 24 ore precedenti per valutare trend e persistenza ghiaccio. Returns: Dict con: - has_precipitation: bool - total_rain_mm: float - total_snowfall_cm: float - min_temp_2m: float - hours_below_zero: int - ice_persistence_likely: bool (ghiaccio persistente se T<2°C e/o neve presente) - snow_present: bool """ if not weather_data or "hourly" not in weather_data: return {} hourly = weather_data["hourly"] times = hourly.get("time", []) if not times: return {} now = datetime.datetime.now(datetime.timezone.utc) past_24h_start = now - datetime.timedelta(hours=24) # Converti times in datetime timestamps = [] for ts_str in times: try: if 'Z' in ts_str: ts = datetime.datetime.fromisoformat(ts_str.replace('Z', '+00:00')) else: ts = datetime.datetime.fromisoformat(ts_str) if ts.tzinfo is None: ts = ts.replace(tzinfo=datetime.timezone.utc) timestamps.append(ts) except: continue temp_2m = hourly.get("temperature_2m", []) soil_temp = hourly.get("soil_temperature_0cm", []) precipitation = hourly.get("precipitation", []) rain = hourly.get("rain", []) snowfall = hourly.get("snowfall", []) weathercode = hourly.get("weathercode", []) total_rain = 0.0 total_snowfall = 0.0 min_temp_2m = None min_soil_temp = None hours_below_zero = 0 hours_below_2c = 0 hours_below_zero_soil = 0 snow_present = False for i, ts in enumerate(timestamps): # Solo 24h precedenti if ts < past_24h_start or ts >= now: continue t_2m = temp_2m[i] if i < len(temp_2m) and temp_2m[i] is not None else None t_soil = soil_temp[i] if i < len(soil_temp) and soil_temp[i] is not None else None r = rain[i] if i < len(rain) and rain[i] is not None else 0.0 snow = snowfall[i] if i < len(snowfall) and snowfall[i] is not None else 0.0 code = weathercode[i] if i < len(weathercode) and weathercode[i] is not None else None if t_2m is not None: if min_temp_2m is None or t_2m < min_temp_2m: min_temp_2m = t_2m if t_2m < 0: hours_below_zero += 1 if t_2m < 2.0: hours_below_2c += 1 if t_soil is not None: if min_soil_temp is None or t_soil < min_soil_temp: min_soil_temp = t_soil if t_soil < 0: hours_below_zero_soil += 1 total_rain += r total_snowfall += snow # Neve presente se snowfall > 0 o weathercode indica neve (71, 73, 75, 77, 85, 86) if snow > 0.1 or (code is not None and code in [71, 73, 75, 77, 85, 86]): snow_present = True # Ghiaccio persistente se: neve presente OPPURE (suolo gelato OPPURE T<2°C per molte ore E precipitazioni recenti) ice_persistence_likely = snow_present or (min_soil_temp is not None and min_soil_temp <= 0) or (hours_below_2c >= 6 and total_rain > 0) # Analizza precipitazioni ultime 12 ore (più rilevanti per condizioni attuali) now_12h = now - datetime.timedelta(hours=12) total_rain_12h = 0.0 total_snowfall_12h = 0.0 max_precip_intensity_12h = 0.0 for i, ts in enumerate(timestamps): if ts < now_12h or ts >= now: continue r = rain[i] if i < len(rain) and rain[i] is not None else 0.0 snow = snowfall[i] if i < len(snowfall) and snowfall[i] is not None else 0.0 prec = precipitation[i] if i < len(precipitation) and precipitation[i] is not None else 0.0 total_rain_12h += r total_snowfall_12h += snow if prec > max_precip_intensity_12h: max_precip_intensity_12h = prec # Calcola intensità media (mm/h) nelle ultime 12h avg_precip_intensity_12h = (total_rain_12h + total_snowfall_12h * 10) / 12.0 if total_rain_12h > 0 or total_snowfall_12h > 0 else 0.0 # Analizza temperature attuali e previste (prossime 6h) current_temp = None next_6h_temps = [] next_6h_snow = [] for i, ts in enumerate(timestamps): if ts < now: continue if ts >= now + datetime.timedelta(hours=6): break t_2m = temp_2m[i] if i < len(temp_2m) and temp_2m[i] is not None else None snow = snowfall[i] if i < len(snowfall) and snowfall[i] is not None else 0.0 if current_temp is None and t_2m is not None: current_temp = t_2m if t_2m is not None: next_6h_temps.append(t_2m) if snow > 0: next_6h_snow.append(snow) # Calcola min/max temperature prossime 6h min_temp_next_6h = min(next_6h_temps) if next_6h_temps else None max_temp_next_6h = max(next_6h_temps) if next_6h_temps else None avg_temp_next_6h = sum(next_6h_temps) / len(next_6h_temps) if next_6h_temps else None return { 'has_precipitation': total_rain > 0 or total_snowfall > 0, 'total_rain_mm': total_rain, 'total_snowfall_cm': total_snowfall, 'total_rain_12h_mm': total_rain_12h, 'total_snowfall_12h_cm': total_snowfall_12h, 'avg_precip_intensity_12h_mmh': avg_precip_intensity_12h, 'max_precip_intensity_12h_mmh': max_precip_intensity_12h, 'min_temp_2m': min_temp_2m, 'min_soil_temp': min_soil_temp, 'current_temp_2m': current_temp, 'min_temp_next_6h': min_temp_next_6h, 'max_temp_next_6h': max_temp_next_6h, 'avg_temp_next_6h': avg_temp_next_6h, 'hours_below_zero': hours_below_zero, 'hours_below_2c': hours_below_2c, 'hours_below_zero_soil': hours_below_zero_soil, 'ice_persistence_likely': ice_persistence_likely, 'snow_present': snow_present, 'snow_next_6h_cm': sum(next_6h_snow) if next_6h_snow else 0.0 } # ============================================================================= # ANALISI RISCHI METEO # ============================================================================= def evaluate_ice_risk_temporal(weather_data: Dict, hour_idx: int, past_24h_info: Dict) -> Tuple[int, str]: """ Valuta il rischio ghiaccio basandosi sull'evoluzione temporale delle temperature e precipitazioni. Algoritmo: - Temperatura scesa almeno a 0°C nelle 24h precedenti - Precipitazioni (pioggia/temporali) presenti con temperature sotto zero - Nessuna risalita significativa sopra 3°C nelle ore precedenti che indicherebbe scioglimento Returns: (risk_level: int, description: str) risk_level: 0=nessuno, 1=brina, 2=ghiaccio, 3=gelicidio """ if not past_24h_info: return 0, "" # Estrai dati 24h precedenti min_temp_24h = past_24h_info.get('min_temp_2m') hours_below_zero = past_24h_info.get('hours_below_zero', 0) hours_below_2c = past_24h_info.get('hours_below_2c', 0) total_rain_24h = past_24h_info.get('total_rain_mm', 0) total_rain_12h = past_24h_info.get('total_rain_12h_mm', 0) avg_temp_next_6h = past_24h_info.get('avg_temp_next_6h') current_temp = past_24h_info.get('current_temp_2m') # Estrai dati ora corrente hourly = weather_data.get("hourly", {}) times = hourly.get("time", []) temps = hourly.get("temperature_2m", []) soil_temps = hourly.get("soil_temperature_0cm", []) rain = hourly.get("rain", []) showers = hourly.get("showers", []) weathercode = hourly.get("weathercode", []) if hour_idx >= len(times) or hour_idx >= len(temps): return 0, "" temp_current = temps[hour_idx] if hour_idx < len(temps) and temps[hour_idx] is not None else None soil_temp_current = soil_temps[hour_idx] if hour_idx < len(soil_temps) and soil_temps[hour_idx] is not None else None rain_current = rain[hour_idx] if hour_idx < len(rain) and rain[hour_idx] is not None else 0.0 showers_current = showers[hour_idx] if hour_idx < len(showers) and showers[hour_idx] is not None else 0.0 code_current = weathercode[hour_idx] if hour_idx < len(weathercode) and weathercode[hour_idx] is not None else None # Usa temperatura suolo se disponibile (più accurata per gelicidio/ghiaccio), altrimenti temperatura aria temp_for_ice = soil_temp_current if soil_temp_current is not None else temp_current # Verifica se c'è precipitazione in atto o prevista has_precipitation = (rain_current > 0.1) or (showers_current > 0.1) is_rain_code = code_current is not None and code_current in [61, 63, 65, 66, 67, 80, 81, 82] # Condizione 1: Temperatura scesa almeno a 0°C nelle 24h precedenti if min_temp_24h is None or min_temp_24h > 0: return 0, "" # Condizione 2: Precipitazioni presenti (nelle 24h precedenti o attuali) con temperature sotto zero has_precip_with_freeze = False if has_precipitation and temp_current is not None and temp_current <= 0: has_precip_with_freeze = True elif total_rain_24h > 0.5 and min_temp_24h <= 0: has_precip_with_freeze = True elif is_rain_code and temp_current is not None and temp_current <= 0: has_precip_with_freeze = True # Condizione 3: Verifica risalite significative (scioglimento) # Se la temperatura media nelle prossime 6h è > 3°C, probabilmente il ghiaccio si scioglie is_melting = False if avg_temp_next_6h is not None and avg_temp_next_6h > 3.0: is_melting = True if current_temp is not None and current_temp > 3.0: is_melting = True # Se sta sciogliendo, riduci il rischio if is_melting: return 0, "" # Valuta livello di rischio basato su condizioni # GELICIDIO (3): Precipitazione (pioggia/temporali) in atto/futura con T<0°C (suolo o aria) # Il gelicidio si forma quando la pioggia cade su una superficie gelata e congela immediatamente # Usa temperatura suolo se disponibile (più accurata), altrimenti temperatura aria temp_threshold = temp_for_ice if temp_for_ice is not None else temp_current if has_precipitation and temp_threshold is not None and temp_threshold <= 0: precip_type = "" precip_amount = 0.0 if is_rain_code: precip_type = "pioggia" precip_amount = rain_current + showers_current elif rain_current > 0.1: precip_type = "pioggia" precip_amount = rain_current elif showers_current > 0.1: precip_type = "rovesci/temporali" precip_amount = showers_current if precip_type: temp_display = temp_for_ice if temp_for_ice is not None else temp_current temp_label = "T_suolo" if temp_for_ice is not None else "T_aria" return 3, f"🔴🔴 Gelicidio previsto ({temp_label}: {temp_display:.1f}°C, {precip_type}: {precip_amount:.1f}mm/h)" # GHIACCIO (2): Temperature sotto zero per molte ore con precipitazioni recenti O persistenza ghiaccio # Black ice o ghiaccio persistente da precipitazioni precedenti if hours_below_zero >= 6 and (total_rain_12h > 0.5 or has_precipitation): return 2, f"🔴 Ghiaccio persistente (Tmin: {min_temp_24h:.1f}°C, {hours_below_zero}h <0°C)" elif hours_below_2c >= 6 and total_rain_24h > 0.5: # C'è stata pioggia con temperature basse, possibile black ice return 2, f"🔴 Ghiaccio possibile (Tmin: {min_temp_24h:.1f}°C, {hours_below_2c}h <2°C, pioggia: {total_rain_24h:.1f}mm)" elif temp_threshold is not None and temp_threshold < 0 and total_rain_24h > 0.5: # Temperatura attuale sotto zero e c'è stata pioggia nelle 24h, possibile black ice temp_display = temp_threshold temp_label = "T_suolo" if temp_for_ice is not None else "T_aria" return 2, f"🔴 Ghiaccio possibile ({temp_label}: {temp_display:.1f}°C, pioggia recente: {total_rain_24h:.1f}mm)" # BRINA (1): Temperature basse ma condizioni meno severe # Suolo gelato o temperature vicine allo zero senza precipitazioni significative if min_temp_24h <= 0 and hours_below_2c >= 3: return 1, f"🟡 Brina possibile (Tmin: {min_temp_24h:.1f}°C, {hours_below_2c}h <2°C)" elif temp_threshold is not None and temp_threshold <= 1.0 and temp_threshold >= -2.0 and total_rain_24h < 0.5: # Temperature vicine allo zero senza precipitazioni significative = brina temp_display = temp_threshold temp_label = "T_suolo" if temp_for_ice is not None else "T_aria" return 1, f"🟡 Brina possibile ({temp_label}: {temp_display:.1f}°C)" return 0, "" def analyze_weather_risks(weather_data: Dict, model_slug: str, hours_ahead: int = 24, past_24h_info: Optional[Dict] = None) -> List[Dict]: """ Analizza tutti i rischi meteo per le prossime ore. Returns: Lista di dict con rischi per ogni ora: { 'timestamp': str, 'risks': List[Dict], # Lista rischi con tipo, livello, descrizione 'max_risk_level': int # 0-4 (0=nessuno, 1=basso, 2=medio, 3=alto, 4=molto alto) } """ if not weather_data or not weather_data.get("hourly"): return [] hourly = weather_data["hourly"] times = hourly.get("time", []) temps = hourly.get("temperature_2m", []) precip = hourly.get("precipitation", []) rain = hourly.get("rain", []) showers = hourly.get("showers", []) snowfall = hourly.get("snowfall", []) weathercode = hourly.get("weathercode", []) visibility = hourly.get("visibility", []) wind_speed = hourly.get("wind_speed_10m", []) wind_gusts = hourly.get("wind_gusts_10m", []) # Prova a ottenere CAPE se disponibile (AROME o ICON) cape = hourly.get("cape", []) results = [] # Usa timezone-aware datetime per il confronto now = datetime.datetime.now(datetime.timezone.utc) # Analizza condizioni 24h precedenti se non fornite if past_24h_info is None: past_24h_info = analyze_past_24h_conditions(weather_data) for i in range(min(hours_ahead, len(times))): if i >= len(times): break try: timestamp_str = times[i] # Assicurati che il timestamp sia timezone-aware try: if 'Z' in timestamp_str: timestamp = datetime.datetime.fromisoformat(timestamp_str.replace('Z', '+00:00')) elif '+' in timestamp_str or timestamp_str.count('-') > 2: # Formato con timezone offset timestamp = datetime.datetime.fromisoformat(timestamp_str) else: # Timezone-naive, aggiungi UTC timestamp = datetime.datetime.fromisoformat(timestamp_str) if timestamp.tzinfo is None: timestamp = timestamp.replace(tzinfo=datetime.timezone.utc) except (ValueError, AttributeError): # Fallback: prova parsing semplice e aggiungi UTC timestamp = datetime.datetime.fromisoformat(timestamp_str) if timestamp.tzinfo is None: timestamp = timestamp.replace(tzinfo=datetime.timezone.utc) # Assicurati che entrambi siano timezone-aware per il confronto if timestamp.tzinfo is None: timestamp = timestamp.replace(tzinfo=datetime.timezone.utc) # Salta ore passate if timestamp < now: continue risks = [] max_risk_level = 0 # 1. NEVE (controlla prima la neve, è più importante) temp = temps[i] if i < len(temps) and temps[i] is not None else None snow = snowfall[i] if i < len(snowfall) and snowfall[i] is not None else 0.0 code = weathercode[i] if i < len(weathercode) and weathercode[i] is not None else None # Codici WMO per neve: 71, 73, 75, 77, 85, 86 is_snow_weathercode = code in [71, 73, 75, 77, 85, 86] if code is not None else False # Debug logging per neve if snow > 0 or is_snow_weathercode: LOGGER.debug(f"Neve rilevata: snowfall={snow:.2f} cm/h, weathercode={code}, is_snow_code={is_snow_weathercode}") if snow > THRESHOLDS["snowfall_cm_h"] or is_snow_weathercode: # C'è neve prevista o in atto - Livello 4 (azzurro/blu) snow_level = 4 snow_desc = f"Neve: {snow:.1f} cm/h" if snow > 0 else f"Neve prevista (codice: {code})" risks.append({ "type": "neve", "level": snow_level, "description": snow_desc, "value": snow }) max_risk_level = max(max_risk_level, snow_level) LOGGER.info(f"Rischio neve aggiunto: {snow_desc}, livello {snow_level}") elif temp is not None and temp < THRESHOLDS["ice_temp_air"]: # Valuta rischio ghiaccio usando analisi temporale evolutiva ice_level, ice_desc = evaluate_ice_risk_temporal(weather_data, i, past_24h_info) if ice_level > 0: # Determina tipo di rischio in base al livello e descrizione risk_type = "ghiaccio" # Default if ice_level == 3 and ("gelicidio" in ice_desc.lower() or "fzra" in ice_desc.lower()): risk_type = "gelicidio" elif ice_level == 1 or "brina" in ice_desc.lower(): risk_type = "brina" elif ice_level == 2: risk_type = "ghiaccio" # Rischio rilevato tramite analisi temporale risks.append({ "type": risk_type, "level": ice_level, "description": ice_desc, "value": temp }) max_risk_level = max(max_risk_level, ice_level) elif temp < 2.0: # Fallback: rischio brina basato solo su temperatura attuale risks.append({ "type": "brina", "level": 1, "description": f"🟡 Brina possibile (T: {temp:.1f}°C)", "value": temp }) max_risk_level = max(max_risk_level, 1) # 2. PIOGGIA rain_val = rain[i] if i < len(rain) and rain[i] is not None else 0.0 precip_val = precip[i] if i < len(precip) and precip[i] is not None else 0.0 code = weathercode[i] if i < len(weathercode) and weathercode[i] is not None else None if rain_val >= THRESHOLDS["rain_very_heavy_mm_h"]: risks.append({ "type": "pioggia_intensa", "level": 4, "description": f"Pioggia molto intensa: {rain_val:.1f} mm/h", "value": rain_val }) max_risk_level = max(max_risk_level, 4) elif rain_val >= THRESHOLDS["rain_heavy_mm_h"]: risks.append({ "type": "pioggia_forte", "level": 3, "description": f"Pioggia forte: {rain_val:.1f} mm/h", "value": rain_val }) max_risk_level = max(max_risk_level, 3) elif rain_val >= THRESHOLDS["rain_moderate_mm_h"]: risks.append({ "type": "pioggia_moderata", "level": 2, "description": f"Pioggia moderata: {rain_val:.1f} mm/h", "value": rain_val }) max_risk_level = max(max_risk_level, 2) elif rain_val >= THRESHOLDS["rain_light_mm_h"]: risks.append({ "type": "pioggia_leggera", "level": 1, "description": f"Pioggia leggera: {rain_val:.1f} mm/h", "value": rain_val }) max_risk_level = max(max_risk_level, 1) # 3. ROVESCI showers_val = showers[i] if i < len(showers) and showers[i] is not None else 0.0 if showers_val > 0: if code in [82, 89, 90, 96, 99]: # Rovesci violenti o con grandine risks.append({ "type": "rovesci_violenti", "level": 4, "description": f"Rovesci violenti: {showers_val:.1f} mm/h", "value": showers_val }) max_risk_level = max(max_risk_level, 4) elif showers_val >= THRESHOLDS["rain_heavy_mm_h"]: risks.append({ "type": "rovesci_forti", "level": 3, "description": f"Rovesci forti: {showers_val:.1f} mm/h", "value": showers_val }) max_risk_level = max(max_risk_level, 3) else: risks.append({ "type": "rovesci", "level": 1, "description": f"Rovesci: {showers_val:.1f} mm/h", "value": showers_val }) max_risk_level = max(max_risk_level, 1) # 4. GRANDINE if code in [89, 90, 96, 99]: risks.append({ "type": "grandine", "level": 4, "description": "Grandine", "value": 1.0 }) max_risk_level = max(max_risk_level, 4) # 5. TEMPORALI if code in [95, 96, 99]: cape_val = cape[i] if i < len(cape) and cape[i] is not None else 0.0 if cape_val >= THRESHOLDS["cape_severe"]: risks.append({ "type": "temporale_severo", "level": 4, "description": f"Temporale severo (CAPE: {cape_val:.0f} J/kg)", "value": cape_val }) max_risk_level = max(max_risk_level, 4) elif cape_val >= THRESHOLDS["cape_lightning"]: risks.append({ "type": "temporale", "level": 3, "description": f"Temporale (CAPE: {cape_val:.0f} J/kg)", "value": cape_val }) max_risk_level = max(max_risk_level, 3) else: risks.append({ "type": "temporale", "level": 2, "description": "Temporale", "value": 1.0 }) max_risk_level = max(max_risk_level, 2) # 6. VENTO FORTE wind_gust = wind_gusts[i] if i < len(wind_gusts) and wind_gusts[i] is not None else 0.0 if wind_gust >= THRESHOLDS["wind_very_strong_kmh"]: risks.append({ "type": "vento_molto_forte", "level": 4, "description": f"Vento molto forte: {wind_gust:.0f} km/h", "value": wind_gust }) max_risk_level = max(max_risk_level, 4) elif wind_gust >= THRESHOLDS["wind_strong_kmh"]: risks.append({ "type": "vento_forte", "level": 2, "description": f"Vento forte: {wind_gust:.0f} km/h", "value": wind_gust }) max_risk_level = max(max_risk_level, 2) # 7. NEBBIA vis = visibility[i] if i < len(visibility) and visibility[i] is not None else None if vis is not None and vis < THRESHOLDS["fog_visibility_m"]: risks.append({ "type": "nebbia", "level": 3 if vis < 50 else 2, "description": f"Nebbia (visibilità: {vis:.0f} m)", "value": vis }) max_risk_level = max(max_risk_level, 3 if vis < 50 else 2) elif code in [45, 48]: risks.append({ "type": "nebbia", "level": 2, "description": "Nebbia", "value": 1.0 }) max_risk_level = max(max_risk_level, 2) results.append({ "timestamp": timestamp_str, "risks": risks, "max_risk_level": max_risk_level }) except Exception as e: LOGGER.error(f"Errore analisi ora {i}: {e}", exc_info=True) continue return results # ============================================================================= # ANALISI PERCORSO # ============================================================================= def analyze_route_weather_risks(city1: str, city2: str, model_slug: Optional[str] = None) -> Optional[pd.DataFrame]: """ Analizza tutti i rischi meteo lungo un percorso stradale. Returns: DataFrame con analisi per ogni punto del percorso """ if not PANDAS_AVAILABLE: return None # Ottieni coordinate coord1 = get_coordinates_from_city(city1) coord2 = get_coordinates_from_city(city2) if not coord1 or not coord2: return None lat1, lon1, name1 = coord1 lat2, lon2, name2 = coord2 # Determina modello if model_slug is None: mid_lat = (lat1 + lat2) / 2 mid_lon = (lon1 + lon2) / 2 model_slug = get_best_model_for_location(mid_lat, mid_lon) # Calcola punti lungo percorso route_points = calculate_route_points(lat1, lon1, lat2, lon2, num_points=8) all_results = [] for i, (lat, lon) in enumerate(route_points): # Determina nome località PRIMA di analizzare if i == 0: point_name = name1 elif i == len(route_points) - 1: point_name = name2 else: if i > 1: time.sleep(1.1) # Rate limiting Nominatim point_name = get_location_name_from_coords(lat, lon) or f"Punto {i+1}" weather_data = get_weather_data(lat, lon, model_slug) if not weather_data: # Aggiungi comunque una riga per indicare che il punto è stato analizzato all_results.append({ 'point_index': i, 'point_lat': lat, 'point_lon': lon, 'timestamp': datetime.datetime.now(datetime.timezone.utc), 'risk_type': 'dati_non_disponibili', 'risk_level': 0, 'risk_description': 'Dati meteo non disponibili', 'risk_value': 0.0, 'max_risk_level': 0, 'point_name': point_name }) continue # Analizza condizioni 24h precedenti past_24h = analyze_past_24h_conditions(weather_data) # Analizza rischi (passa anche past_24h per analisi temporale evolutiva) risk_analysis = analyze_weather_risks(weather_data, model_slug, hours_ahead=24, past_24h_info=past_24h) if not risk_analysis: # Se non ci sono rischi, aggiungi comunque una riga per il punto all_results.append({ 'point_index': i, 'point_lat': lat, 'point_lon': lon, 'timestamp': datetime.datetime.now(datetime.timezone.utc), 'risk_type': 'nessuno', 'risk_level': 0, 'risk_description': 'Nessun rischio', 'risk_value': 0.0, 'max_risk_level': 0, 'point_name': point_name, 'past_24h': past_24h # Aggiungi analisi 24h precedenti anche se nessun rischio }) continue # Converti in DataFrame for hour_data in risk_analysis: timestamp_str = hour_data["timestamp"] # Assicurati che il timestamp sia timezone-aware try: if 'Z' in timestamp_str: timestamp = datetime.datetime.fromisoformat(timestamp_str.replace('Z', '+00:00')) elif '+' in timestamp_str or timestamp_str.count('-') > 2: timestamp = datetime.datetime.fromisoformat(timestamp_str) else: timestamp = datetime.datetime.fromisoformat(timestamp_str) if timestamp.tzinfo is None: timestamp = timestamp.replace(tzinfo=datetime.timezone.utc) except (ValueError, AttributeError): timestamp = datetime.datetime.fromisoformat(timestamp_str) if timestamp.tzinfo is None: timestamp = timestamp.replace(tzinfo=datetime.timezone.utc) # Assicurati che sia timezone-aware if timestamp.tzinfo is None: timestamp = timestamp.replace(tzinfo=datetime.timezone.utc) # Crea riga per ogni rischio o una riga con rischio massimo if hour_data["risks"]: for risk in hour_data["risks"]: all_results.append({ 'point_index': i, 'point_lat': lat, 'point_lon': lon, 'timestamp': timestamp, 'risk_type': risk["type"], 'risk_level': risk["level"], 'risk_description': risk["description"], 'risk_value': risk.get("value", 0.0), 'max_risk_level': hour_data["max_risk_level"], 'point_name': point_name, 'past_24h': past_24h }) else: all_results.append({ 'point_index': i, 'point_lat': lat, 'point_lon': lon, 'timestamp': timestamp, 'risk_type': 'nessuno', 'risk_level': 0, 'risk_description': 'Nessun rischio', 'risk_value': 0.0, 'max_risk_level': 0, 'point_name': point_name, 'past_24h': past_24h }) if not all_results: return None df = pd.DataFrame(all_results) return df # ============================================================================= # FORMATTAZIONE REPORT # ============================================================================= def format_route_weather_report(df: pd.DataFrame, city1: str, city2: str) -> str: """Formatta report compatto dei rischi meteo lungo percorso.""" if df.empty: return "❌ Nessun dato disponibile per il percorso." # Raggruppa per punto e trova rischio massimo + analisi 24h # Usa funzione custom per past_24h per assicurarsi che venga preservato correttamente def first_dict(series): """Prende il primo valore non-nullo, utile per dict.""" for val in series: if val is not None and (isinstance(val, dict) or (isinstance(val, str) and val != '')): return val return {} max_risk_per_point = df.groupby('point_index').agg({ 'max_risk_level': 'max', 'point_name': 'first', 'past_24h': first_dict # Usa funzione custom per preservare dict }).sort_values('point_index') # Rimuovi duplicati per nome (punti con stesso nome ma indici diversi) # Considera anche neve/ghiaccio persistente nella scelta seen_names = {} unique_indices = [] for idx, row in max_risk_per_point.iterrows(): point_name = row['point_name'] # Normalizza nome (rimuovi suffissi tra parentesi) name_key = point_name.split('(')[0].strip() past_24h = row.get('past_24h', {}) if isinstance(row.get('past_24h'), dict) else {} has_snow_ice = past_24h.get('snow_present') or past_24h.get('ice_persistence_likely') if name_key not in seen_names: seen_names[name_key] = idx unique_indices.append(idx) else: # Se duplicato, mantieni quello con rischio maggiore O con neve/ghiaccio existing_idx = seen_names[name_key] existing_row = max_risk_per_point.loc[existing_idx] existing_past_24h = existing_row.get('past_24h', {}) if isinstance(existing_row.get('past_24h'), dict) else {} existing_has_snow_ice = existing_past_24h.get('snow_present') or existing_past_24h.get('ice_persistence_likely') # Priorità: rischio maggiore, oppure neve/ghiaccio se rischio uguale if row['max_risk_level'] > existing_row['max_risk_level']: unique_indices.remove(existing_idx) seen_names[name_key] = idx unique_indices.append(idx) elif row['max_risk_level'] == existing_row['max_risk_level'] and has_snow_ice and not existing_has_snow_ice: # Stesso rischio, ma questo ha neve/ghiaccio unique_indices.remove(existing_idx) seen_names[name_key] = idx unique_indices.append(idx) # Filtra solo punti unici max_risk_per_point = max_risk_per_point.loc[unique_indices] # Calcola effective_risk_level per ogni punto UNICO (considerando persistenza) effective_risk_levels_dict = {} for idx, row in max_risk_per_point.iterrows(): level = int(row['max_risk_level']) past_24h = row.get('past_24h', {}) if isinstance(row.get('past_24h'), dict) else {} # Se livello è 0, verifica persistenza per assegnare livello appropriato if level == 0 and past_24h: if past_24h.get('snow_present'): level = 4 # Neve presente elif past_24h.get('ice_persistence_likely'): # Se ice_persistence_likely è True, significa che c'è ghiaccio persistente # (calcolato in analyze_past_24h_conditions basandosi su suolo gelato, # precipitazioni con temperature basse, o neve presente) # Quindi deve essere classificato come ghiaccio (livello 2), non brina level = 2 # Ghiaccio persistente effective_risk_levels_dict[idx] = level # Aggiungi effective_risk_level al DataFrame max_risk_per_point['effective_risk_level'] = max_risk_per_point.index.map(effective_risk_levels_dict) # Trova rischi unici per ogni punto (raggruppa per tipo, mantieni solo il più grave) risks_per_point = {} # Prima aggiungi rischi futuri (max_risk_level > 0) for idx, row in df[df['max_risk_level'] > 0].iterrows(): point_idx = row['point_index'] if point_idx not in risks_per_point: risks_per_point[point_idx] = {} risk_type = row['risk_type'] risk_level = row['risk_level'] risk_desc = row['risk_description'] # Raggruppa per tipo di rischio, mantieni solo quello con livello più alto if risk_type not in risks_per_point[point_idx] or risks_per_point[point_idx][risk_type]['level'] < risk_level: risks_per_point[point_idx][risk_type] = { 'type': risk_type, 'desc': risk_desc, 'level': risk_level } # Poi aggiungi punti con persistenza ma senza rischi futuri (max_risk_level == 0 ma effective_risk > 0) for idx, row in max_risk_per_point.iterrows(): effective_risk = row.get('effective_risk_level', 0) max_risk = int(row['max_risk_level']) # Se ha persistenza ma non rischi futuri, aggiungi rischio basato su persistenza if effective_risk > 0 and max_risk == 0: if idx not in risks_per_point: risks_per_point[idx] = {} past_24h = row.get('past_24h', {}) if isinstance(row.get('past_24h'), dict) else {} # Determina tipo di rischio basandosi su effective_risk_level if effective_risk >= 4: risk_type = 'neve' risk_desc = "Neve presente" elif effective_risk == 2: risk_type = 'ghiaccio' # Determina descrizione basandosi su condizioni min_temp = past_24h.get('min_temp_2m') hours_below_2c = past_24h.get('hours_below_2c', 0) if min_temp is not None: risk_desc = f"Ghiaccio persistente (Tmin: {min_temp:.1f}°C, {hours_below_2c}h <2°C)" else: risk_desc = "Ghiaccio persistente" elif effective_risk == 1: risk_type = 'brina' min_temp = past_24h.get('min_temp_2m') if min_temp is not None: risk_desc = f"Brina possibile (Tmin: {min_temp:.1f}°C)" else: risk_desc = "Brina possibile" else: continue # Skip se non abbiamo un tipo valido # Aggiungi al dict rischi (usa idx come chiave, non point_idx) risks_per_point[idx][risk_type] = { 'type': risk_type, 'desc': risk_desc, 'level': effective_risk } # Verifica se la chiave Google Maps è disponibile api_key_available = get_google_maps_api_key() is not None # Costruisci messaggio msg = f"🛣️ **Rischi Meteo Stradali**\n" msg += f"📍 {city1} → {city2}\n" if not api_key_available: msg += f"⚠️ Percorso in linea d'aria (configura GOOGLE_MAPS_API_KEY per percorso stradale reale)\n" msg += "\n" points_with_risk = [] LOGGER.debug(f"Analizzando {len(max_risk_per_point)} punti per report") for idx, row in max_risk_per_point.iterrows(): max_risk = row['max_risk_level'] effective_risk = row.get('effective_risk_level', max_risk) # Usa effective_risk_level se disponibile point_name = row['point_name'] past_24h = row.get('past_24h', {}) if isinstance(row.get('past_24h'), dict) else {} LOGGER.debug(f"Punto {point_name}: max_risk={max_risk}, effective_risk={effective_risk}, snow_present={past_24h.get('snow_present')}, ice_persistent={past_24h.get('ice_persistence_likely')}") # Mostra punto se ha rischio futuro (max_risk > 0) OPPURE persistenza (effective_risk > 0) if effective_risk > 0: risks = risks_per_point.get(idx, []) # Emoji basati su effective_risk_level (allineati con check_ghiaccio.py) # Neve: ❄️, Gelicidio: 🔴🔴, Ghiaccio: 🔴, Brina: 🟡 risk_emoji = "⚪" # Default if effective_risk >= 4: risk_emoji = "❄️" # Neve (usiamo ❄️ invece di ⚪ per maggiore chiarezza) elif effective_risk == 3: # Verifica se è gelicidio risk_types_str = ' '.join([r.get('type', '') for r in (list(risks.values()) if isinstance(risks, dict) else risks)]) if 'gelicidio' in risk_types_str.lower() or 'fzra' in risk_types_str.lower(): risk_emoji = "🔴🔴" # Gelicidio else: risk_emoji = "🔴" # Ghiaccio elif effective_risk == 2: # Ghiaccio (livello 2) risk_emoji = "🔴" # Ghiaccio elif effective_risk == 1: risk_emoji = "🟡" # Brina # Converti dict in lista e ordina per livello (più grave prima) risk_list = list(risks.values()) if isinstance(risks, dict) else risks risk_list.sort(key=lambda x: x.get('level', 0), reverse=True) # Raggruppa rischi per tipo e crea descrizioni strutturate risk_by_type = {} for risk in risk_list: risk_type = risk.get('type', '') risk_level = risk.get('level', 0) risk_desc = risk.get('desc', '') # Raggruppa per tipo, mantieni il più grave if risk_type not in risk_by_type or risk_by_type[risk_type]['level'] < risk_level: risk_by_type[risk_type] = { 'desc': risk_desc, 'level': risk_level } # Crea descrizioni ordinate per tipo (neve prima, poi gelicidio, ghiaccio, brina, poi altri) type_order = ['neve', 'gelicidio', 'ghiaccio', 'brina', 'pioggia_intensa', 'pioggia_forte', 'rovesci_violenti', 'grandine', 'temporale_severo', 'temporale', 'vento_molto_forte', 'nebbia'] risk_descriptions = [] # Prima aggiungi rischi ordinati for risk_type in type_order: if risk_type in risk_by_type: risk_info = risk_by_type[risk_type] risk_desc = risk_info['desc'] # Semplifica e formatta descrizioni in base al tipo if risk_type == 'neve': risk_descriptions.append(f"❄️ {risk_desc}") elif risk_type == 'gelicidio': # Estrai temperatura se presente import re temp_match = re.search(r'T: ([\d\.-]+)°C', risk_desc) if temp_match: risk_descriptions.append(f"🔴🔴 Gelicidio (T: {temp_match.group(1)}°C)") else: risk_descriptions.append("🔴🔴 Gelicidio") elif risk_type == 'ghiaccio': import re temp_match = re.search(r'T: ([\d\.-]+)°C|Tmin: ([\d\.-]+)°C', risk_desc) if temp_match: temp_val = temp_match.group(1) or temp_match.group(2) risk_descriptions.append(f"🧊 Ghiaccio (T: {temp_val}°C)") else: risk_descriptions.append("🧊 Ghiaccio") elif risk_type == 'brina': import re temp_match = re.search(r'T: ([\d\.-]+)°C|Tmin: ([\d\.-]+)°C', risk_desc) if temp_match: temp_val = temp_match.group(1) or temp_match.group(2) risk_descriptions.append(f"🟡 Brina (T: {temp_val}°C)") else: risk_descriptions.append("🟡 Brina") else: risk_descriptions.append(risk_desc) # Poi aggiungi altri rischi non in type_order for risk_type, risk_info in risk_by_type.items(): if risk_type not in type_order: risk_descriptions.append(risk_info['desc']) # Costruisci messaggio punto dettagliato per situational awareness point_msg = f"{risk_emoji} **{point_name}**\n" # Sezione 1: Condizioni attuali e ultime 12h current_info = [] if past_24h: # Temperatura attuale if past_24h.get('current_temp_2m') is not None: current_info.append(f"🌡️ T: {past_24h['current_temp_2m']:.1f}°C") # Precipitazioni ultime 12h if past_24h.get('total_snowfall_12h_cm', 0) > 0.5: current_info.append(f"❄️ {past_24h['total_snowfall_12h_cm']:.1f}cm/12h") elif past_24h.get('total_rain_12h_mm', 0) > 1: current_info.append(f"🌧️ {past_24h['total_rain_12h_mm']:.1f}mm/12h") # Temperatura minima 24h if past_24h.get('min_temp_2m') is not None: t_min = past_24h['min_temp_2m'] current_info.append(f"📉 Tmin: {t_min:.1f}°C") if current_info: point_msg += f" • {' | '.join(current_info)}\n" # Sezione 2: Previsioni prossime 6h forecast_info = [] if past_24h: # Temperature previste if past_24h.get('min_temp_next_6h') is not None and past_24h.get('max_temp_next_6h') is not None: t_min_6h = past_24h['min_temp_next_6h'] t_max_6h = past_24h['max_temp_next_6h'] if t_min_6h == t_max_6h: forecast_info.append(f"📊 6h: {t_min_6h:.1f}°C") else: forecast_info.append(f"📊 6h: {t_min_6h:.1f}→{t_max_6h:.1f}°C") # Neve prevista if past_24h.get('snow_next_6h_cm', 0) > 0.1: forecast_info.append(f"❄️ +{past_24h['snow_next_6h_cm']:.1f}cm") # Rischi futuri (prossime 24h) future_risks = [] if risk_descriptions: for desc in risk_descriptions[:4]: # Max 4 rischi if "❄️" in desc: future_risks.append("❄️ Neve") elif "🧊" in desc: import re temp_match = re.search(r'\(T: ([\d\.-]+)°C\)', desc) if temp_match: future_risks.append(f"🧊 Ghiaccio ({temp_match.group(1)}°C)") else: future_risks.append("🧊 Ghiaccio") elif "🌧️" in desc or "Pioggia" in desc: future_risks.append("🌧️ Pioggia") elif "⛈️" in desc or "Temporale" in desc: future_risks.append("⛈️ Temporale") elif "💨" in desc or "Vento" in desc: future_risks.append("💨 Vento") elif "🌫️" in desc or "Nebbia" in desc: future_risks.append("🌫️ Nebbia") if forecast_info or future_risks: point_msg += f" • " if forecast_info: point_msg += f"{' | '.join(forecast_info)}" if future_risks: if forecast_info: point_msg += " | " point_msg += f"Rischi: {', '.join(future_risks[:3])}" point_msg += "\n" # Sezione 3: Stato persistenza persistence_info = [] if past_24h: if past_24h.get('snow_present'): persistence_info.append("❄️ Neve presente") if past_24h.get('ice_persistence_likely') and not past_24h.get('snow_present'): persistence_info.append("🧊 Ghiaccio persistente") if past_24h.get('hours_below_2c', 0) >= 6: persistence_info.append(f"⏱️ {past_24h['hours_below_2c']}h <2°C") if persistence_info: point_msg += f" • {' | '.join(persistence_info)}\n" points_with_risk.append(point_msg) elif effective_risk > 0 and max_risk == 0: # Mostra punti senza rischi futuri ma con persistenza (ghiaccio/brina/neve già formato) # Determina emoji basandosi su effective_risk_level (allineati con check_ghiaccio.py) if effective_risk >= 4: risk_emoji = "❄️" # Neve elif effective_risk == 2: risk_emoji = "🔴" # Ghiaccio elif effective_risk == 1: risk_emoji = "🟡" # Brina else: risk_emoji = "⚪" # Default point_msg = f"{risk_emoji} **{point_name}**\n" # Condizioni attuali current_info = [] if past_24h.get('current_temp_2m') is not None: current_info.append(f"🌡️ T: {past_24h['current_temp_2m']:.1f}°C") if past_24h.get('total_snowfall_12h_cm', 0) > 0.5: current_info.append(f"❄️ {past_24h['total_snowfall_12h_cm']:.1f}cm/12h") elif past_24h.get('total_rain_12h_mm', 0) > 1: current_info.append(f"🌧️ {past_24h['total_rain_12h_mm']:.1f}mm/12h") if past_24h.get('min_temp_2m') is not None: current_info.append(f"📉 Tmin: {past_24h['min_temp_2m']:.1f}°C") if current_info: point_msg += f" • {' | '.join(current_info)}\n" # Previsioni 6h forecast_info = [] if past_24h.get('min_temp_next_6h') is not None and past_24h.get('max_temp_next_6h') is not None: t_min_6h = past_24h['min_temp_next_6h'] t_max_6h = past_24h['max_temp_next_6h'] if t_min_6h == t_max_6h: forecast_info.append(f"📊 6h: {t_min_6h:.1f}°C") else: forecast_info.append(f"📊 6h: {t_min_6h:.1f}→{t_max_6h:.1f}°C") if past_24h.get('snow_next_6h_cm', 0) > 0.1: forecast_info.append(f"❄️ +{past_24h['snow_next_6h_cm']:.1f}cm") if forecast_info: point_msg += f" • {' | '.join(forecast_info)}\n" # Persistenza persistence_info = [] if past_24h.get('snow_present'): persistence_info.append("❄️ Neve presente") if past_24h.get('ice_persistence_likely') and not past_24h.get('snow_present'): persistence_info.append("🧊 Ghiaccio persistente") if past_24h.get('hours_below_2c', 0) >= 6: persistence_info.append(f"⏱️ {past_24h['hours_below_2c']}h <2°C") if persistence_info: point_msg += f" • {' | '.join(persistence_info)}\n" points_with_risk.append(point_msg) LOGGER.debug(f"Aggiunto punto con neve/ghiaccio persistente: {point_name}") LOGGER.info(f"Totale punti con rischio/neve/ghiaccio: {len(points_with_risk)}") if points_with_risk: msg += "⚠️ **Punti a rischio:**\n" msg += "\n".join(points_with_risk) else: msg += "✅ Nessun rischio significativo per le prossime 24h" # Riepilogo (usa effective_risk_level per conteggio corretto) total_points = len(max_risk_per_point) points_with_any_risk = sum(1 for r in effective_risk_levels_dict.values() if r > 0) # Conta per livello usando effective_risk_level neve_count = sum(1 for r in effective_risk_levels_dict.values() if r >= 4) gelicidio_count = sum(1 for r in effective_risk_levels_dict.values() if r == 3) ghiaccio_count = sum(1 for r in effective_risk_levels_dict.values() if r == 2) brina_count = sum(1 for r in effective_risk_levels_dict.values() if r == 1) if points_with_any_risk > 0: msg += f"\n\n📊 **Riepilogo:**\n" msg += f"• Punti: {points_with_any_risk}/{total_points} a rischio\n" risk_parts = [] if neve_count > 0: risk_parts.append(f"⚪ Neve: {neve_count}") if gelicidio_count > 0: risk_parts.append(f"🔴🔴 Gelicidio: {gelicidio_count}") if ghiaccio_count > 0: risk_parts.append(f"🔴 Ghiaccio: {ghiaccio_count}") if brina_count > 0: risk_parts.append(f"🟡 Brina: {brina_count}") if risk_parts: msg += f"• {' | '.join(risk_parts)}\n" return msg # ============================================================================= # GENERAZIONE MAPPA # ============================================================================= def generate_route_weather_map(df: pd.DataFrame, city1: str, city2: str, output_path: str) -> bool: """Genera mappa con rischi meteo lungo percorso.""" try: import matplotlib matplotlib.use('Agg') import matplotlib.pyplot as plt import matplotlib.patches as mpatches from matplotlib.lines import Line2D except ImportError: return False try: import contextily as ctx CONTEXTILY_AVAILABLE = True except ImportError: CONTEXTILY_AVAILABLE = False if df.empty: return False # Raggruppa per punto max_risk_per_point = df.groupby('point_index').agg({ 'max_risk_level': 'max', 'point_name': 'first', 'point_lat': 'first', 'point_lon': 'first', 'past_24h': 'first', 'risk_type': lambda x: ','.join([str(v) for v in x.unique() if pd.notna(v) and str(v) != '']) if len(x.unique()) > 0 else '' }).sort_values('point_index') # Calcola effective_risk_level considerando anche persistenza effective_risk_levels = [] for idx, row in max_risk_per_point.iterrows(): level = int(row['max_risk_level']) risk_type_str = str(row.get('risk_type', '')) past_24h_data = row.get('past_24h', {}) # Se livello è 0, verifica persistenza per assegnare livello appropriato if level == 0 and isinstance(past_24h_data, dict): if past_24h_data.get('snow_present'): level = 4 # Neve presente elif past_24h_data.get('ice_persistence_likely'): # Se ice_persistence_likely è True, significa che c'è ghiaccio persistente # (calcolato in analyze_past_24h_conditions basandosi su suolo gelato, # precipitazioni con temperature basse, o neve presente) # Quindi deve essere classificato come ghiaccio (livello 2), non brina level = 2 # Ghiaccio persistente # Considera anche risk_type se presente risk_type_lower = risk_type_str.lower() if 'neve' in risk_type_lower: level = max(level, 4) elif 'gelicidio' in risk_type_lower or 'fzra' in risk_type_lower: level = max(level, 3) elif 'ghiaccio' in risk_type_lower and 'brina' not in risk_type_lower: level = max(level, 2) elif 'brina' in risk_type_lower: level = max(level, 1) effective_risk_levels.append(level) max_risk_per_point['effective_risk_level'] = effective_risk_levels lats = max_risk_per_point['point_lat'].tolist() lons = max_risk_per_point['point_lon'].tolist() names = max_risk_per_point['point_name'].fillna("Punto").tolist() risk_levels = max_risk_per_point['effective_risk_level'].astype(int).tolist() risk_types = max_risk_per_point['risk_type'].fillna('').tolist() past_24h_list = max_risk_per_point['past_24h'].tolist() # Calcola limiti mappa lat_min, lat_max = min(lats), max(lats) lon_min, lon_max = min(lons), max(lons) lat_range = lat_max - lat_min lon_range = lon_max - lon_min lat_min -= lat_range * 0.1 lat_max += lat_range * 0.1 lon_min -= lon_range * 0.1 lon_max += lon_range * 0.1 fig, ax = plt.subplots(figsize=(14, 10)) fig.patch.set_facecolor('white') ax.set_xlim(lon_min, lon_max) ax.set_ylim(lat_min, lat_max) ax.set_aspect('equal', adjustable='box') if CONTEXTILY_AVAILABLE: try: ctx.add_basemap(ax, crs='EPSG:4326', source=ctx.providers.OpenStreetMap.Mapnik, alpha=0.6, attribution_size=6) except: CONTEXTILY_AVAILABLE = False # Linea percorso ax.plot(lons, lats, 'k--', linewidth=2, alpha=0.5, zorder=3) # Determina colori: allineati con check_ghiaccio.py # Verde (0), Giallo (1=brina), Arancione (2=ghiaccio), Rosso scuro (3=gelicidio), Azzurro (4=neve) colors = [] edge_colors = [] markers = [] for i, (level, risk_type_str, past_24h_data) in enumerate(zip(risk_levels, risk_types, past_24h_list)): # level ora contiene già effective_risk_level (calcolato sopra considerando persistenza) # Determina tipo esatto basandosi su livello e risk_type_str risk_type_lower = risk_type_str.lower() # Determina colore e marker basato su livello (allineato con check_ghiaccio.py): # - Neve: livello 4 (azzurro/blu) # - Gelicidio: livello 3 (rosso scuro #8B0000) # - Ghiaccio: livello 2 (arancione #FF8C00) # - Brina: livello 1 (giallo #FFD700) # - Nessun rischio: livello 0 (verde #32CD32) if level == 4 or 'neve' in risk_type_lower: # Neve: azzurro/blu (livello 4) colors.append('#87CEEB') # Sky blue per neve edge_colors.append('#4682B4') # Steel blue per bordo markers.append('*') # Asterisco per neve (come nella legenda) elif level == 3 or 'gelicidio' in risk_type_lower or 'fzra' in risk_type_lower: # Gelicidio: rosso scuro (livello 3) colors.append('#8B0000') # Dark red edge_colors.append('#FF0000') # Red per bordo markers.append('D') # Diamante per gelicidio elif level == 2 or ('ghiaccio' in risk_type_lower and 'brina' not in risk_type_lower): # Ghiaccio: arancione (livello 2) colors.append('#FF8C00') # Dark orange edge_colors.append('#FF6600') # Orange per bordo markers.append('D') # Diamante per ghiaccio elif level == 1 or 'brina' in risk_type_lower: # Brina: giallo (livello 1) colors.append('#FFD700') # Gold edge_colors.append('#FFA500') # Orange per bordo markers.append('o') # Cerchio per brina else: # Nessun rischio: verde (livello 0) colors.append('#32CD32') # Lime green edge_colors.append('black') markers.append('o') # Cerchio normale # Punti con colori e marker diversi for lon, lat, color, edge_color, marker in zip(lons, lats, colors, edge_colors, markers): ax.scatter([lon], [lat], c=[color], s=400, marker=marker, edgecolors=edge_color, linewidths=2.5, alpha=0.85, zorder=5) # Partenza e arrivo if len(lats) >= 2: ax.scatter([lons[0]], [lats[0]], c='blue', s=600, marker='s', edgecolors='white', linewidths=3, alpha=0.9, zorder=6) ax.scatter([lons[-1]], [lats[-1]], c='red', s=600, marker='s', edgecolors='white', linewidths=3, alpha=0.9, zorder=6) # Etichette for lon, lat, name, risk_level in zip(lons, lats, names, risk_levels): display_name = name[:20] + "..." if len(name) > 20 else name ax.annotate(display_name, (lon, lat), xytext=(10, 10), textcoords='offset points', fontsize=8, fontweight='bold', bbox=dict(boxstyle='round,pad=0.4', facecolor='white', alpha=0.95, edgecolor='black', linewidth=1.2), zorder=7) # Legenda: allineata con check_ghiaccio.py (5 livelli: 0-4) legend_elements = [ mpatches.Patch(facecolor='#32CD32', label='Nessun rischio'), mpatches.Patch(facecolor='#FFD700', label='Brina (1)'), mpatches.Patch(facecolor='#FF8C00', label='Ghiaccio (2)'), mpatches.Patch(facecolor='#8B0000', label='Gelicidio (3)'), Line2D([0], [0], marker='*', color='w', markerfacecolor='#87CEEB', markeredgecolor='#4682B4', markersize=14, markeredgewidth=2, label='* Neve'), ] ax.legend(handles=legend_elements, loc='lower left', fontsize=9, framealpha=0.95, edgecolor='black', fancybox=True, shadow=True) ax.set_xlabel('Longitudine (°E)', fontsize=11, fontweight='bold') ax.set_ylabel('Latitudine (°N)', fontsize=11, fontweight='bold') ax.set_title(f'RISCHI METEO STRADALI\n{city1} → {city2}', fontsize=14, fontweight='bold', pad=20) if not CONTEXTILY_AVAILABLE: ax.grid(True, alpha=0.3, linestyle='--', zorder=1) now = datetime.datetime.now() # Conta punti con rischio usando effective_risk_level points_with_risk = sum(1 for r in risk_levels if r > 0) info_text = f"Aggiornamento: {now.strftime('%d/%m/%Y %H:%M')}\nPunti: {len(risk_levels)}\nA rischio: {points_with_risk}" ax.text(0.02, 0.98, info_text, transform=ax.transAxes, fontsize=9, verticalalignment='top', horizontalalignment='left', bbox=dict(boxstyle='round,pad=0.5', facecolor='white', alpha=0.9, edgecolor='gray', linewidth=1.5), zorder=10) plt.tight_layout() try: plt.savefig(output_path, dpi=150, bbox_inches='tight', facecolor='white') plt.close(fig) return True except Exception as e: LOGGER.error(f"Errore salvataggio mappa: {e}") plt.close(fig) return False