#!/usr/bin/env python3 """ Assistente Climatico Intelligente - Report Meteo Avanzato Analizza evoluzione meteo, fronti, cambiamenti e fornisce consigli pratici """ import requests import argparse import datetime import os import sys from zoneinfo import ZoneInfo from collections import defaultdict, Counter, Counter from typing import List, Dict, Tuple, Optional from statistics import mean, median from open_meteo_client import open_meteo_get # --- CONFIGURAZIONE DEFAULT --- DEFAULT_LAT = 43.9356 DEFAULT_LON = 12.4296 DEFAULT_NAME = "🏠 Casa (Strada Cà Toro,12 - San Marino)" # --- TIMEZONE --- TZ_STR = "Europe/Berlin" TZINFO = ZoneInfo(TZ_STR) # --- TELEGRAM CONFIG --- ADMIN_CHAT_ID = "64463169" TELEGRAM_CHAT_IDS = ["64463169", "24827341", "132455422", "5405962012"] TOKEN_FILE_HOME = os.path.expanduser("~/.telegram_dpc_bot_token") TOKEN_FILE_ETC = "/etc/telegram_dpc_bot_token" TOKEN_FILE_VOLUME = "/Volumes/Pi2/etc/telegram_dpc_bot_token" # --- SOGLIE --- SOGLIA_VENTO_KMH = 40.0 MIN_MM_PER_EVENTO = 0.1 # --- MODELLI METEO --- # Modelli a breve termine (alta risoluzione, 48-72h) SHORT_TERM_MODELS = ["meteofrance_seamless", "icon_d2"] # Usa seamless invece di arome_france_hd # Modelli a lungo termine (globale, 10 giorni) LONG_TERM_MODELS = ["gfs_global", "ecmwf_ifs04"] # Modelli alternativi MODELS_IT_SM = ["meteofrance_seamless", "icon_d2", "gfs_global"] MODEL_NAMES = { "meteofrance_arome_france_hd": "AROME HD", "meteofrance_seamless": "AROME Seamless", "icon_d2": "ICON-D2", "gfs_global": "GFS", "ecmwf_ifs04": "ECMWF", "jma_msm": "JMA MSM", "metno_nordic": "Yr.no", "ukmo_global": "UK MetOffice", "icon_eu": "ICON-EU", "italia_meteo_arpae_icon_2i": "ICON Italia (ARPAE 2i)" } def choose_models_by_country(cc, is_home=False): """ Seleziona modelli meteo ottimali. - Per Casa: usa AROME Seamless e ICON-D2 (alta risoluzione) - Per Italia: usa italia_meteo_arpae_icon_2i (include snow_depth quando > 0) - Per altre località: usa best match di Open-Meteo (senza specificare models) Ritorna (short_term_models, long_term_models) """ cc = cc.upper() if cc else "UNKNOWN" # Modelli a lungo termine (sempre globali, funzionano ovunque) long_term_default = ["gfs_global", "ecmwf_ifs04"] if is_home: # Per Casa, usa AROME Seamless, ICON-D2 e ICON Italia (alta risoluzione europea) # ICON Italia include snow_depth quando disponibile (> 0) return ["meteofrance_seamless", "icon_d2", "italia_meteo_arpae_icon_2i"], long_term_default elif cc == "IT": # Per Italia, usa ICON Italia (ARPAE 2i) che include snow_depth quando disponibile return ["italia_meteo_arpae_icon_2i"], long_term_default else: # Per query worldwide, usa best match (Open-Meteo sceglie automaticamente) # Ritorna None per indicare best match return None, long_term_default def get_bot_token(): paths = [TOKEN_FILE_HOME, TOKEN_FILE_ETC, TOKEN_FILE_VOLUME] for path in paths: if os.path.exists(path): try: with open(path, 'r') as f: return f.read().strip() except: pass return None def get_coordinates(query): if not query or query.lower() == "casa": return DEFAULT_LAT, DEFAULT_LON, DEFAULT_NAME, "SM" url = "https://geocoding-api.open-meteo.com/v1/search" try: resp = open_meteo_get(url, params={"name": query, "count": 1, "language": "it", "format": "json"}, timeout=(5, 10)) res = resp.json().get("results", []) if res: res = res[0] cc = res.get("country_code", "IT").upper() name = f"{res.get('name')} ({cc})" return res['latitude'], res['longitude'], name, cc except: pass return None, None, None, None def degrees_to_cardinal(d: int) -> str: """Converte gradi in direzione cardinale (N, NE, E, SE, S, SW, W, NW)""" dirs = ["N", "NE", "E", "SE", "S", "SW", "W", "NW"] try: return dirs[round(d / 45) % 8] except: return "N" def get_weather_multi_model(lat, lon, short_term_models, long_term_models, forecast_days=10, timezone=None): """ Recupera dati da modelli a breve e lungo termine per ensemble completo. Se short_term_models è None, usa best match di Open-Meteo (senza specificare models). """ results = {} # Recupera modelli a breve termine (alta risoluzione, fino a ~72h) if short_term_models is None: # Best match: non specificare models, Open-Meteo sceglie automaticamente url = "https://api.open-meteo.com/v1/forecast" params = { "latitude": lat, "longitude": lon, "hourly": "temperature_2m,precipitation_probability,precipitation,snowfall,rain,snow_depth,weathercode,windspeed_10m,windgusts_10m,winddirection_10m,dewpoint_2m,relative_humidity_2m,cloud_cover,soil_temperature_0cm", "daily": "temperature_2m_max,temperature_2m_min,precipitation_sum,precipitation_hours,precipitation_probability_max,snowfall_sum,showers_sum,rain_sum,weathercode,winddirection_10m_dominant,windspeed_10m_max,windgusts_10m_max", "timezone": timezone if timezone else TZ_STR, "forecast_days": min(forecast_days, 3) # Limita a 3 giorni per modelli ad alta risoluzione } try: resp = open_meteo_get(url, params=params, timeout=(5, 20)) if resp.status_code == 200: data = resp.json() # Converti snow_depth da metri a cm per tutti i modelli (Open-Meteo restituisce in metri) hourly_data = data.get("hourly", {}) if hourly_data and "snow_depth" in hourly_data: snow_depth_values = hourly_data.get("snow_depth", []) # Converti da metri a cm (moltiplica per 100) snow_depth_cm = [] for sd in snow_depth_values: if sd is not None: try: val_m = float(sd) val_cm = val_m * 100.0 # Converti da metri a cm snow_depth_cm.append(val_cm) except (ValueError, TypeError): snow_depth_cm.append(None) else: snow_depth_cm.append(None) hourly_data["snow_depth"] = snow_depth_cm data["hourly"] = hourly_data results["best_match"] = data results["best_match"]["model_type"] = "short_term" else: results["best_match"] = None except: results["best_match"] = None else: # Modelli specifici (per Casa: AROME + ICON, per Italia: ICON ARPAE) for model in short_term_models: url = "https://api.open-meteo.com/v1/forecast" # Per italia_meteo_arpae_icon_2i, includi sempre snow_depth (supportato quando > 0) hourly_params = "temperature_2m,precipitation_probability,precipitation,snowfall,rain,snow_depth,weathercode,windspeed_10m,windgusts_10m,winddirection_10m,dewpoint_2m,relative_humidity_2m,cloud_cover,soil_temperature_0cm" params = { "latitude": lat, "longitude": lon, "hourly": hourly_params, "daily": "temperature_2m_max,temperature_2m_min,precipitation_sum,precipitation_hours,precipitation_probability_max,snowfall_sum,showers_sum,rain_sum,weathercode,winddirection_10m_dominant,windspeed_10m_max,windgusts_10m_max", "timezone": timezone if timezone else TZ_STR, "models": model, "forecast_days": min(forecast_days, 3) # Limita a 3 giorni per modelli ad alta risoluzione } try: resp = open_meteo_get(url, params=params, timeout=(5, 20)) if resp.status_code == 200: data = resp.json() # Converti snow_depth da metri a cm per tutti i modelli (Open-Meteo restituisce in metri) hourly_data = data.get("hourly", {}) if hourly_data and "snow_depth" in hourly_data: snow_depth_values = hourly_data.get("snow_depth", []) # Converti da metri a cm (moltiplica per 100) snow_depth_cm = [] for sd in snow_depth_values: if sd is not None: try: val_m = float(sd) val_cm = val_m * 100.0 # Converti da metri a cm snow_depth_cm.append(val_cm) except (ValueError, TypeError): snow_depth_cm.append(None) else: snow_depth_cm.append(None) hourly_data["snow_depth"] = snow_depth_cm data["hourly"] = hourly_data # Per italia_meteo_arpae_icon_2i, verifica se snow_depth è disponibile e > 0 if model == "italia_meteo_arpae_icon_2i": if hourly_data and "snow_depth" in hourly_data: snow_depth_values_cm = hourly_data.get("snow_depth", []) # Verifica se almeno un valore di snow_depth è > 0 (ora già in cm) has_snow_depth = False if snow_depth_values_cm: for sd in snow_depth_values_cm[:24]: # Controlla prime 24h if sd is not None: try: if float(sd) > 0.5: # > 0.5 cm has_snow_depth = True break except (ValueError, TypeError): continue # Se snow_depth > 0, assicurati che sia incluso nei dati if has_snow_depth: data["has_snow_depth_data"] = True results[model] = data results[model]["model_type"] = "short_term" else: results[model] = None except: results[model] = None # Recupera modelli a lungo termine (globale, fino a 10 giorni) for model in long_term_models: url = "https://api.open-meteo.com/v1/forecast" params = { "latitude": lat, "longitude": lon, "hourly": "temperature_2m,precipitation_probability,precipitation,snowfall,rain,snow_depth,weathercode,windspeed_10m,windgusts_10m,winddirection_10m,dewpoint_2m,relative_humidity_2m,cloud_cover,soil_temperature_0cm", "daily": "temperature_2m_max,temperature_2m_min,precipitation_sum,precipitation_hours,precipitation_probability_max,snowfall_sum,showers_sum,rain_sum,weathercode,winddirection_10m_dominant,windspeed_10m_max,windgusts_10m_max", "timezone": timezone if timezone else TZ_STR, "models": model, "forecast_days": forecast_days } try: resp = open_meteo_get(url, params=params, timeout=(5, 25)) if resp.status_code == 200: data = resp.json() # Converti snow_depth da metri a cm per tutti i modelli (Open-Meteo restituisce in metri) hourly_data = data.get("hourly", {}) if hourly_data and "snow_depth" in hourly_data: snow_depth_values = hourly_data.get("snow_depth", []) # Converti da metri a cm (moltiplica per 100) snow_depth_cm = [] for sd in snow_depth_values: if sd is not None: try: val_m = float(sd) val_cm = val_m * 100.0 # Converti da metri a cm snow_depth_cm.append(val_cm) except (ValueError, TypeError): snow_depth_cm.append(None) else: snow_depth_cm.append(None) hourly_data["snow_depth"] = snow_depth_cm data["hourly"] = hourly_data results[model] = data results[model]["model_type"] = "long_term" else: results[model] = None except Exception: results[model] = None return results def merge_multi_model_forecast(models_data, forecast_days=10): """Combina dati da modelli a breve e lungo termine in un forecast unificato""" merged = { "daily": { "time": [], "temperature_2m_max": [], "temperature_2m_min": [], "precipitation_sum": [], "precipitation_hours": [], "precipitation_probability_max": [], "snowfall_sum": [], "showers_sum": [], "rain_sum": [], "weathercode": [], "windspeed_10m_max": [], "windgusts_10m_max": [] }, "hourly": { "time": [], "temperature_2m": [], "precipitation": [], "snowfall": [], "snow_depth": [], "rain": [], "weathercode": [], "windspeed_10m": [], "winddirection_10m": [], "dewpoint_2m": [], "precipitation_probability": [], "cloud_cover": [], "soil_temperature_0cm": [] }, "models_used": [] } # Trova modello a breve termine disponibile (cerca tutti i modelli con type "short_term") # Priorità: ICON Italia per snow_depth, altrimenti primo disponibile short_term_data = None short_term_model = None icon_italia_data = None icon_italia_model = None # Prima cerca ICON Italia (ha snow_depth quando disponibile) # Cerca anche altri modelli che potrebbero avere snow_depth (icon_d2, etc.) for model in models_data.keys(): if models_data[model] and models_data[model].get("model_type") == "short_term": # Priorità a ICON Italia, ma cerca anche altri modelli con snow_depth if model == "italia_meteo_arpae_icon_2i": icon_italia_data = models_data[model] icon_italia_model = model # ICON-D2 può avere anche snow_depth elif model == "icon_d2" and icon_italia_data is None: # Usa ICON-D2 come fallback se ICON Italia non disponibile hourly_data = models_data[model].get("hourly", {}) snow_depth_values = hourly_data.get("snow_depth", []) if hourly_data else [] # Verifica se ha dati di snow_depth validi has_valid_snow_depth = False if snow_depth_values: for sd in snow_depth_values[:24]: if sd is not None: try: if float(sd) > 0: has_valid_snow_depth = True break except (ValueError, TypeError): continue if has_valid_snow_depth: icon_italia_data = models_data[model] icon_italia_model = model # Poi cerca primo modello disponibile (per altri parametri) for model in models_data.keys(): if models_data[model] and models_data[model].get("model_type") == "short_term": short_term_data = models_data[model] short_term_model = model break # Trova modello a lungo termine disponibile (cerca tutti i modelli con type "long_term") long_term_data = None long_term_model = None for model in models_data.keys(): if models_data[model] and models_data[model].get("model_type") == "long_term": long_term_data = models_data[model] long_term_model = model break if not short_term_data and not long_term_data: return None # Usa dati a breve termine per primi 2-3 giorni, poi passa a lungo termine cutoff_day = 2 # Usa modelli ad alta risoluzione per primi 2 giorni if short_term_data: # Gestisci best_match o modelli specifici if short_term_model == "best_match": model_display = "Best Match" else: model_display = MODEL_NAMES.get(short_term_model, short_term_model) # Verifica se ICON Italia ha dati di snow_depth (controllo diretto, non solo il flag) has_icon_snow_depth = False if icon_italia_data: icon_hourly = icon_italia_data.get("hourly", {}) icon_snow_depth = icon_hourly.get("snow_depth", []) if icon_hourly else [] # Verifica se ci sono dati non-null di snow_depth if icon_snow_depth: for sd in icon_snow_depth[:72]: # Controlla prime 72h if sd is not None: try: if float(sd) > 0: # Anche valori piccoli has_icon_snow_depth = True break except (ValueError, TypeError): continue # Se ICON Italia ha dati di snow_depth, aggiungilo ai modelli usati if has_icon_snow_depth or (icon_italia_data and icon_italia_data.get("has_snow_depth_data")): icon_display = MODEL_NAMES.get("italia_meteo_arpae_icon_2i", "ICON Italia") merged["models_used"].append(f"{model_display} + {icon_display} (snow_depth) (0-{cutoff_day+1}d)") else: merged["models_used"].append(f"{model_display} (0-{cutoff_day+1}d)") short_daily = short_term_data.get("daily", {}) short_hourly = short_term_data.get("hourly", {}) # Prendi dati daily dai primi giorni del modello a breve termine short_daily_times = short_daily.get("time", [])[:cutoff_day+1] for i, day_time in enumerate(short_daily_times): merged["daily"]["time"].append(day_time) for key in ["temperature_2m_max", "temperature_2m_min", "precipitation_sum", "precipitation_hours", "precipitation_probability_max", "snowfall_sum", "showers_sum", "rain_sum", "weathercode", "windspeed_10m_max", "windgusts_10m_max"]: val = short_daily.get(key, [])[i] if i < len(short_daily.get(key, [])) else None merged["daily"][key].append(val) # Prendi dati hourly dal modello a breve termine # Priorità: usa snow_depth da ICON Italia se disponibile, altrimenti dal modello principale short_hourly_times = short_hourly.get("time", []) icon_italia_hourly = icon_italia_data.get("hourly", {}) if icon_italia_data else {} icon_italia_hourly_times = icon_italia_hourly.get("time", []) if icon_italia_hourly else [] icon_italia_snow_depth = icon_italia_hourly.get("snow_depth", []) if icon_italia_hourly else [] # Crea mappa timestamp -> snow_depth per ICON Italia (per corrispondenza esatta o approssimata) icon_snow_depth_map = {} if icon_italia_hourly_times and icon_italia_snow_depth: for idx, ts in enumerate(icon_italia_hourly_times): if idx < len(icon_italia_snow_depth) and icon_italia_snow_depth[idx] is not None: try: val_cm = float(icon_italia_snow_depth[idx]) if val_cm >= 0: # Solo valori validi (già in cm) icon_snow_depth_map[ts] = val_cm except (ValueError, TypeError): continue cutoff_hour = (cutoff_day + 1) * 24 for i, hour_time in enumerate(short_hourly_times[:cutoff_hour]): merged["hourly"]["time"].append(hour_time) for key in ["temperature_2m", "precipitation", "snowfall", "rain", "weathercode", "windspeed_10m", "winddirection_10m", "dewpoint_2m", "precipitation_probability", "cloud_cover", "soil_temperature_0cm"]: val = short_hourly.get(key, [])[i] if i < len(short_hourly.get(key, [])) else None merged["hourly"][key].append(val) # Per snow_depth: usa ICON Italia se disponibile (corrispondenza per timestamp), altrimenti modello principale # NOTA: I valori sono già convertiti in cm durante il recupero dall'API val_snow_depth = None # Cerca corrispondenza esatta per timestamp if hour_time in icon_snow_depth_map: # Usa snow_depth da ICON Italia per questo timestamp (già in cm) val_snow_depth = icon_snow_depth_map[hour_time] else: # Fallback 1: cerca corrispondenza per ora approssimata (se i timestamp non corrispondono esattamente) # Estrai solo la parte ora (YYYY-MM-DDTHH) per corrispondenza approssimata hour_time_base = hour_time[:13] if len(hour_time) >= 13 else hour_time # "2025-01-09T12" for icon_ts, icon_val in icon_snow_depth_map.items(): if icon_ts.startswith(hour_time_base): val_snow_depth = icon_val break # Fallback 2: se non trovato, cerca il valore più vicino nello stesso giorno if val_snow_depth is None and hour_time_base: day_date_str = hour_time[:10] if len(hour_time) >= 10 else None # "2025-01-09" if day_date_str: # Cerca tutti i valori di ICON Italia per lo stesso giorno same_day_values = [v for ts, v in icon_snow_depth_map.items() if ts.startswith(day_date_str)] if same_day_values: # Usa il primo valore disponibile per quel giorno (approssimazione) val_snow_depth = same_day_values[0] # Fallback 3: usa snow_depth dal modello principale se ICON Italia non disponibile if val_snow_depth is None and i < len(short_hourly.get("snow_depth", [])): val_snow_depth = short_hourly.get("snow_depth", [])[i] merged["hourly"]["snow_depth"].append(val_snow_depth) if long_term_data: merged["models_used"].append(f"{MODEL_NAMES.get(long_term_model, long_term_model)} ({cutoff_day+1}-{forecast_days}d)") long_daily = long_term_data.get("daily", {}) long_hourly = long_term_data.get("hourly", {}) # Prendi dati daily dal modello a lungo termine per i giorni successivi long_daily_times = long_daily.get("time", []) start_idx = cutoff_day + 1 for i in range(start_idx, min(len(long_daily_times), forecast_days)): merged["daily"]["time"].append(long_daily_times[i]) for key in ["temperature_2m_max", "temperature_2m_min", "precipitation_sum", "precipitation_hours", "precipitation_probability_max", "snowfall_sum", "showers_sum", "rain_sum", "weathercode", "windspeed_10m_max", "windgusts_10m_max"]: val = long_daily.get(key, [])[i] if i < len(long_daily.get(key, [])) else None merged["daily"][key].append(val) # Per i dati hourly, completa con dati a lungo termine se necessario long_hourly_times = long_hourly.get("time", []) current_hourly_count = len(merged["hourly"]["time"]) needed_hours = forecast_days * 24 if current_hourly_count < needed_hours: start_hour_idx = current_hourly_count for i in range(start_hour_idx, min(len(long_hourly_times), needed_hours)): merged["hourly"]["time"].append(long_hourly_times[i]) for key in ["temperature_2m", "precipitation", "snowfall", "snow_depth", "rain", "weathercode", "windspeed_10m", "winddirection_10m", "dewpoint_2m", "precipitation_probability", "cloud_cover", "soil_temperature_0cm"]: val = long_hourly.get(key, [])[i] if i < len(long_hourly.get(key, [])) else None merged["hourly"][key].append(val) return merged def analyze_temperature_trend(daily_temps_max, daily_temps_min, days=10): """Analizza trend temperatura per identificare fronti caldi/freddi con dettaglio completo""" if not daily_temps_max or not daily_temps_min: return None max_days = min(days, len(daily_temps_max), len(daily_temps_min)) if max_days < 3: return None # Filtra valori None e calcola temperature medie giornaliere avg_temps = [] valid_indices = [] for i in range(max_days): t_max = daily_temps_max[i] t_min = daily_temps_min[i] if t_max is not None and t_min is not None: avg_temps.append((float(t_max) + float(t_min)) / 2) valid_indices.append(i) else: avg_temps.append(None) if len([t for t in avg_temps if t is not None]) < 3: return None # Analizza tendenza generale (prime 3 giorni vs ultimi 3 giorni validi) valid_temps = [t for t in avg_temps if t is not None] if len(valid_temps) < 3: return None first_avg = mean(valid_temps[:3]) last_avg = mean(valid_temps[-3:]) diff = last_avg - first_avg trend_type = None trend_intensity = "moderato" if diff > 5: trend_type = "fronte_caldo" trend_intensity = "forte" if diff > 8 else "moderato" elif diff > 2: trend_type = "riscaldamento" trend_intensity = "moderato" elif diff < -5: trend_type = "fronte_freddo" trend_intensity = "forte" if diff < -8 else "moderato" elif diff < -2: trend_type = "raffreddamento" trend_intensity = "moderato" else: trend_type = "stabile" # Identifica giorni di cambio significativo change_days = [] prev_temp = None for i, temp in enumerate(avg_temps): if temp is not None: if prev_temp is not None: day_diff = temp - prev_temp if abs(day_diff) > 3: # Cambio significativo (>3°C) change_days.append({ "day": i, "delta": round(day_diff, 1), "from": round(prev_temp, 1), "to": round(temp, 1) }) prev_temp = temp # Analisi per periodi (primi 3 giorni, medio termine, lungo termine) period_analysis = {} if len(valid_temps) >= 7: period_analysis["short_term"] = { "avg": round(mean(valid_temps[:3]), 1), "range": round(max(valid_temps[:3]) - min(valid_temps[:3]), 1) } mid_start = len(valid_temps) // 3 mid_end = (len(valid_temps) * 2) // 3 period_analysis["mid_term"] = { "avg": round(mean(valid_temps[mid_start:mid_end]), 1), "range": round(max(valid_temps[mid_start:mid_end]) - min(valid_temps[mid_start:mid_end]), 1) } period_analysis["long_term"] = { "avg": round(mean(valid_temps[-3:]), 1), "range": round(max(valid_temps[-3:]) - min(valid_temps[-3:]), 1) } return { "type": trend_type, "intensity": trend_intensity, "delta": round(diff, 1), "change_days": change_days, "first_avg": round(first_avg, 1), "last_avg": round(last_avg, 1), "period_analysis": period_analysis, "daily_avg_temps": avg_temps, "daily_max": daily_temps_max[:max_days], "daily_min": daily_temps_min[:max_days] } def analyze_weather_transitions(daily_weathercodes): """Analizza transizioni meteo significative""" transitions = [] if not daily_weathercodes or len(daily_weathercodes) < 2: return transitions # Categorie meteo def get_category(code): if code is None: return "variabile" code = int(code) if code in (0, 1): return "sereno" if code in (2, 3): return "nuvoloso" if code in (45, 48): return "nebbia" if code in (51, 53, 55, 56, 57, 61, 63, 65, 66, 67, 80, 81, 82): return "pioggia" if code in (71, 73, 75, 77, 85, 86): return "neve" if code in (95, 96, 99): return "temporale" return "variabile" for i in range(1, min(len(daily_weathercodes), 8)): prev_code = daily_weathercodes[i-1] if i-1 < len(daily_weathercodes) else None curr_code = daily_weathercodes[i] if i < len(daily_weathercodes) else None prev_cat = get_category(prev_code) curr_cat = get_category(curr_code) if prev_cat != curr_cat: transitions.append({ "day": i, "from": prev_cat, "to": curr_cat, "significant": prev_cat in ["sereno", "nuvoloso"] and curr_cat in ["pioggia", "neve", "temporale"] }) return transitions def get_precip_type(code): """Definisce il tipo di precipitazione in base al codice WMO.""" if (71 <= code <= 77) or code in [85, 86]: return "❄️ Neve" if code in [96, 99]: return "⚡🌨 Grandine" if code in [66, 67]: return "🧊☔ Pioggia Congelantesi" return "☔ Pioggia" def get_intensity_label(mm_h): if mm_h < 2.5: return "Debole" if mm_h < 7.6: return "Moderata" return "Forte ⚠️" def analyze_daily_events(times, codes, probs, precip, winds, temps, dewpoints, snowfalls=None, rains=None, soil_temps=None, cloud_covers=None, wind_speeds=None): """Scansiona le 24 ore e trova blocchi di eventi continui.""" events = [] # Prepara array per calcoli avanzati (allineato a check_ghiaccio.py) if snowfalls is None: snowfalls = [0.0] * len(times) if rains is None: rains = [0.0] * len(times) if soil_temps is None: soil_temps = [None] * len(times) if cloud_covers is None: cloud_covers = [None] * len(times) if wind_speeds is None: wind_speeds = [None] * len(times) # Calcola precipitazioni cumulative delle 3h precedenti per ogni punto precip_3h_sum = [] rain_3h_sum = [] snow_3h_sum = [] for i in range(len(times)): # Somma delle 3 ore precedenti (i-3, i-2, i-1) start_idx = max(0, i - 3) precip_sum = sum([float(p) if p is not None else 0.0 for p in precip[start_idx:i]]) rain_sum = sum([float(r) if r is not None else 0.0 for r in rains[start_idx:i]]) snow_sum = sum([float(s) if s is not None else 0.0 for s in snowfalls[start_idx:i]]) precip_3h_sum.append(precip_sum) rain_3h_sum.append(rain_sum) snow_3h_sum.append(snow_sum) # 1. PERICOLI (Ghiaccio, Gelo, Brina) - Logica migliorata allineata a check_ghiaccio.py in_ice = False start_ice = 0 ice_type = "" for i in range(len(times)): t = temps[i] if i < len(temps) and temps[i] is not None else 10 d = dewpoints[i] if i < len(dewpoints) and dewpoints[i] is not None else t p = precip[i] if i < len(precip) and precip[i] is not None else 0 c = codes[i] if i < len(codes) and codes[i] is not None else 0 if c is not None: try: c = int(c) except (ValueError, TypeError): c = 0 else: c = 0 # Estrai parametri avanzati t_soil = soil_temps[i] if i < len(soil_temps) and soil_temps[i] is not None else None cloud = cloud_covers[i] if i < len(cloud_covers) and cloud_covers[i] is not None else None wind = wind_speeds[i] if i < len(wind_speeds) and wind_speeds[i] is not None else None snowfall_curr = snowfalls[i] if i < len(snowfalls) and snowfalls[i] is not None else 0.0 rain_curr = rains[i] if i < len(rains) and rains[i] is not None else 0.0 # Determina se è notte (18:00-06:00) per raffreddamento radiativo try: hour = int(times[i].split("T")[1].split(":")[0]) if "T" in times[i] else 12 is_night = (hour >= 18) or (hour <= 6) except: is_night = False # Calcola temperatura suolo: usa valore misurato se disponibile, altrimenti stima (1-2°C più fredda) if t_soil is None: t_soil = t - 1.5 # Approssimazione conservativa # Applica raffreddamento radiativo: cielo sereno + notte + vento debole # Riduce la temperatura del suolo di 0.5-1.5°C (come in check_ghiaccio.py) t_soil_adjusted = t_soil if is_night and cloud is not None and cloud < 20.0: if wind is None or wind < 5.0: cooling = 1.5 # Vento molto debole = più raffreddamento elif wind < 10.0: cooling = 1.0 else: cooling = 0.5 t_soil_adjusted = t_soil - cooling # Precipitazioni nelle 3h precedenti p_3h = precip_3h_sum[i] if i < len(precip_3h_sum) else 0.0 r_3h = rain_3h_sum[i] if i < len(rain_3h_sum) else 0.0 s_3h = snow_3h_sum[i] if i < len(snow_3h_sum) else 0.0 # LOGICA MIGLIORATA (allineata a check_ghiaccio.py): current_ice_condition = None # 1. GELICIDIO (Freezing Rain) - priorità massima is_raining_code = (50 <= c <= 69) or (80 <= c <= 82) if c in [66, 67] or (p > 0 and t <= 0 and is_raining_code): current_ice_condition = "🧊☠️ GELICIDIO" # 2. Black Ice o Neve Ghiacciata - Precipitazione nelle 3h precedenti + suolo gelato elif p_3h > 0.1 and t_soil_adjusted < 0.0: # Distingue tra neve e pioggia has_snow = (s_3h > 0.1) or (snowfall_curr > 0.1) has_rain = (r_3h > 0.1) or (rain_curr > 0.1) if has_snow: current_ice_condition = "⛸️⚠️ Neve ghiacciata (suolo gelato)" elif has_rain: current_ice_condition = "⛸️⚠️ Black Ice (strada bagnata + suolo gelato)" else: current_ice_condition = "⛸️⚠️ Black Ice (strada bagnata + suolo gelato)" # 3. BRINA (Hoar Frost) - Suolo <= 0°C e punto di rugiada > suolo ma < 0°C elif p_3h <= 0.1 and t_soil_adjusted <= 0.0 and d is not None: if d > t_soil_adjusted and d < 0.0: current_ice_condition = "⛸️⚠️ GHIACCIO/BRINA" # 4. GELATA - Temperatura aria < 0°C (senza altre condizioni) elif t < 0: current_ice_condition = "🧊 Gelata" if current_ice_condition and not in_ice: in_ice = True start_ice = i ice_type = current_ice_condition elif (not current_ice_condition and in_ice) or (in_ice and current_ice_condition != ice_type) or (in_ice and i == len(times)-1): end_idx = i if not current_ice_condition else i if end_idx > start_ice: start_time = times[start_ice].split("T")[1][:5] end_time = times[min(end_idx, len(times)-1)].split("T")[1][:5] temp_block = temps[start_ice:min(end_idx+1, len(temps))] temp_block_clean = [t for t in temp_block if t is not None] min_t = min(temp_block_clean) if temp_block_clean else 0 # Per GHIACCIO/BRINA, verifica che la temperatura minima sia effettivamente sotto/sopra soglia critica # Se la temperatura minima è > 1.5°C, non è un rischio reale if ice_type == "⛸️⚠️ GHIACCIO/BRINA" and min_t > 1.5: # Non segnalare se la temperatura minima è troppo alta pass else: events.append(f"{ice_type}: {start_time}-{end_time} (Min: {min_t:.0f}°C)") in_ice = False if current_ice_condition: in_ice = True start_ice = i ice_type = current_ice_condition # 2. PRECIPITAZIONI in_rain = False start_idx = 0 current_rain_type = "" for i in range(len(times)): p_val = precip[i] if i < len(precip) and precip[i] is not None else 0 is_raining = p_val >= MIN_MM_PER_EVENTO if is_raining and not in_rain: in_rain = True start_idx = i code_val = codes[i] if i < len(codes) and codes[i] is not None else 0 try: code_val = int(code_val) if code_val is not None else 0 except (ValueError, TypeError): code_val = 0 current_rain_type = get_precip_type(code_val) elif in_rain and is_raining and i < len(codes): code_val = codes[i] if codes[i] is not None else 0 try: code_val = int(code_val) if code_val is not None else 0 except (ValueError, TypeError): code_val = 0 new_type = get_precip_type(code_val) if new_type != current_rain_type: end_idx = i block_precip = precip[start_idx:end_idx] if end_idx <= len(precip) else precip[start_idx:] block_precip_clean = [p for p in block_precip if p is not None] tot_mm = sum(block_precip_clean) prob_block = probs[start_idx:end_idx] if end_idx <= len(probs) else probs[start_idx:] prob_block_clean = [p for p in prob_block if p is not None] max_prob = max(prob_block_clean) if prob_block_clean else 0 start_time = times[start_idx].split("T")[1][:5] end_time = times[end_idx].split("T")[1][:5] if end_idx < len(times) else times[-1].split("T")[1][:5] avg_intensity = tot_mm / len(block_precip) if block_precip else 0 events.append( f"{current_rain_type} ({get_intensity_label(avg_intensity)}):\n" f" 🕒 {start_time}-{end_time} | 💧 {tot_mm:.1f}mm | Prob: {max_prob}%" ) start_idx = i current_rain_type = new_type elif (not is_raining and in_rain) or (in_rain and i == len(times)-1): in_rain = False end_idx = i if not is_raining else i + 1 block_precip = precip[start_idx:end_idx] if end_idx <= len(precip) else precip[start_idx:] block_precip_clean = [p for p in block_precip if p is not None] tot_mm = sum(block_precip_clean) if tot_mm > 0: prob_block = probs[start_idx:end_idx] if end_idx <= len(probs) else probs[start_idx:] prob_block_clean = [p for p in prob_block if p is not None] max_prob = max(prob_block_clean) if prob_block_clean else 0 start_time = times[start_idx].split("T")[1][:5] end_time = times[min(end_idx-1, len(times)-1)].split("T")[1][:5] avg_intensity = tot_mm / len(block_precip) if block_precip else 0 events.append( f"{current_rain_type} ({get_intensity_label(avg_intensity)}):\n" f" 🕒 {start_time}-{end_time} | 💧 {tot_mm:.1f}mm | Prob: {max_prob}%" ) # 3. VENTO if winds: wind_values = [w for w in winds if w is not None] if wind_values: max_wind = max(wind_values) if max_wind > SOGLIA_VENTO_KMH: try: peak_idx = winds.index(max_wind) except ValueError: peak_idx = 0 peak_time = times[min(peak_idx, len(times)-1)].split("T")[1][:5] events.append(f"💨 Vento Forte: Picco {max_wind:.0f}km/h alle {peak_time}") return events def generate_practical_advice(trend, transitions, events_summary, daily_data): """Genera consigli pratici basati sull'analisi meteo""" advice = [] # Consigli basati su trend temperatura if trend: if trend["type"] == "fronte_freddo" and trend["intensity"] == "forte": advice.append("❄️ Fronte Freddo in Arrivo: Preparati a temperature in calo significativo. Controlla riscaldamento, proteggi piante sensibili.") elif trend["type"] == "fronte_caldo" and trend["intensity"] == "forte": advice.append("🔥 Ondata di Calore: Temperature in aumento. Mantieni case fresche, idratazione importante, attenzione a persone fragili.") elif trend["type"] == "raffreddamento": advice.append("🌡️ Raffreddamento: Temperature in calo graduale. Vestiti a strati, verifica isolamento porte/finestre.") elif trend["type"] == "riscaldamento": advice.append("☀️ Riscaldamento: Temperature in aumento. Buon momento per attività all'aperto, ventilazione naturale.") # Consigli basati su transizioni meteo significant_rain = any(t["to"] in ["pioggia", "neve", "temporale"] and t["significant"] for t in transitions[:3]) if significant_rain: advice.append("🌧️ Precipitazioni in Arrivo: Prepara ombrelli/impermeabili. Evita viaggi non necessari durante picchi, controlla grondaie.") # Consigli basati su eventi pericolosi has_ice = any("GELICIDIO" in e or "GHIACCIO" in e for events in events_summary for e in events if events) if has_ice: advice.append("⚠️ Rischio Ghiaccio: Strade scivolose previste. Evita viaggi non urgenti, guida con estrema cautela, antigelo/sale pronti.") # Consigli basati su vento forte has_wind = any("Vento Forte" in e for events in events_summary for e in events if events) if has_wind: advice.append("💨 Vento Forte: Fissa oggetti in balcone/giardino, attenzione a rami, guidare con prudenza su strade esposte.") # Consigli stagionali generali if daily_data and len(daily_data) > 0: first_day = daily_data[0] if first_day.get("t_min", 15) < 5: advice.append("🏠 Gestione Domestica: Temperature basse previste. Verifica caldaia, risparmio energetico con isolamento, attenzione a tubazioni esterne.") elif first_day.get("precip_sum", 0) > 20: advice.append("💧 Piogge Intense: Accumuli significativi previsti. Controlla drenaggi, pozzetti, evitare zone soggette ad allagamenti.") return advice def format_detailed_trend_explanation(trend, daily_data_list): """Genera spiegazione dettagliata del trend temperatura su 10 giorni""" if not trend: return "" explanation = [] explanation.append(f"📊 EVOLUZIONE TEMPERATURE (10 GIORNI)\n") # Trend principale con spiegazione chiara trend_type = trend["type"] intensity = trend["intensity"] delta = trend['delta'] first_avg = trend['first_avg'] last_avg = trend['last_avg'] if trend_type == "fronte_caldo": trend_desc = "🔥 Fronte Caldo in Arrivo" desc_text = f"Arrivo di aria più calda: temperatura media passerà da {first_avg:.1f}°C a {last_avg:.1f}°C (+{delta:.1f}°C)." elif trend_type == "fronte_freddo": trend_desc = "❄️ Fronte Freddo in Arrivo" desc_text = f"Arrivo di aria più fredda: temperatura media scenderà da {first_avg:.1f}°C a {last_avg:.1f}°C ({delta:+.1f}°C)." elif trend_type == "riscaldamento": trend_desc = "📈 Riscaldamento Progressivo" desc_text = f"Tendenza al rialzo delle temperature: da {first_avg:.1f}°C a {last_avg:.1f}°C (+{delta:.1f}°C)." elif trend_type == "raffreddamento": trend_desc = "📉 Raffreddamento Progressivo" desc_text = f"Tendenza al ribasso delle temperature: da {first_avg:.1f}°C a {last_avg:.1f}°C ({delta:+.1f}°C)." elif trend_type == "stabile": trend_desc = "➡️ Temperature Stabili" desc_text = f"Temperature medie sostanzialmente stabili: da {first_avg:.1f}°C a {last_avg:.1f}°C (variazione {delta:+.1f}°C)." else: trend_desc = "🌡️ Variazione Termica" desc_text = f"Evoluzione temperature: da {first_avg:.1f}°C a {last_avg:.1f}°C ({delta:+.1f}°C)." intensity_text = " (variazione significativa)" if intensity == "forte" else " (variazione moderata)" explanation.append(f"{trend_desc}{intensity_text}") explanation.append(f"{desc_text}") # Aggiungi solo picchi significativi in modo sintetico if trend.get("change_days"): significant_changes = [c for c in trend["change_days"] if abs(c['delta']) > 3.0][:3] if significant_changes: change_texts = [] for change in significant_changes: day_name = f"Giorno {change['day']+1}" direction = "↑" if change['delta'] > 0 else "↓" change_texts.append(f"{direction} {day_name}: {change['from']:.0f}°→{change['to']:.0f}°C") if change_texts: explanation.append(f"Picchi: {', '.join(change_texts)}") explanation.append("") return "\n".join(explanation) def format_weather_context_report(models_data, location_name, country_code): """Genera report contestuale intelligente con ensemble multi-modello""" # Combina modelli a breve e lungo termine merged_data = merge_multi_model_forecast(models_data, forecast_days=10) if not merged_data: return "❌ Errore: Nessun dato meteo disponibile" hourly = merged_data.get('hourly', {}) daily = merged_data.get('daily', {}) models_used = merged_data.get('models_used', []) if not daily or not daily.get('time'): return "❌ Errore: Dati meteo incompleti" msg_parts = [] # HEADER models_text = " + ".join(models_used) if models_used else "Multi-modello" msg_parts.append(f"🌍 METEO FORECAST") msg_parts.append(f"{location_name.upper()}") msg_parts.append(f"📡 Ensemble: {models_text}\n") # ANALISI TREND TEMPERATURA (Fronti) - Completa su 10 giorni daily_temps_max = daily.get('temperature_2m_max', []) daily_temps_min = daily.get('temperature_2m_min', []) trend = analyze_temperature_trend(daily_temps_max, daily_temps_min, days=10) # Spiegazione dettagliata trend (sempre, anche se stabile) if trend: trend_explanation = format_detailed_trend_explanation(trend, daily_data_list=[]) if trend_explanation: msg_parts.append(trend_explanation) # ANALISI TRANSIZIONI METEO - Include anche precipitazioni prossimi giorni daily_time_list = daily.get('time', []) # Definito qui per uso successivo daily_weathercodes = daily.get('weathercode', []) transitions = analyze_weather_transitions(daily_weathercodes) # Cerca anche precipitazioni significative nei primi giorni precip_list = daily.get('precipitation_sum', []) weather_changes = [] # Aggiungi transizioni significative if transitions: significant_trans = [t for t in transitions if t.get("significant", False)] for trans in significant_trans[:5]: day_names = ["oggi", "domani", "dopodomani", "fra 3 giorni", "fra 4 giorni", "fra 5 giorni", "fra 6 giorni"] day_idx = trans["day"] - 1 if day_idx < len(day_names): day_ref = day_names[day_idx] else: day_ref = f"fra {trans['day']} giorni" weather_changes.append({ "day": trans["day"], "day_ref": day_ref, "from": trans["from"], "to": trans["to"], "type": "transition" }) # Aggiungi precipitazioni significative per domani/dopodomani se non già presenti # Usa snowfall (più affidabile) per determinare il tipo (pioggia/neve/grandine) # Prepara mappa hourly per giorni daily_map = defaultdict(list) times = hourly.get('time', []) for i, t in enumerate(times): daily_map[t.split("T")[0]].append(i) for day_idx in range(min(3, len(precip_list))): if day_idx >= len(precip_list) or precip_list[day_idx] is None: continue precip_amount = float(precip_list[day_idx]) day_num = day_idx + 1 # Verifica se già presente come transizione already_present = any(wc["day"] == day_num for wc in weather_changes) if already_present: continue # Ottieni dati hourly per questo giorno per determinare tipo precipitazione day_date = daily_time_list[day_idx].split("T")[0] if day_idx < len(daily_time_list) else None if not day_date: continue indices = daily_map.get(day_date, []) d_snow_day = [hourly.get('snowfall', [])[i] for i in indices if i < len(hourly.get('snowfall', []))] d_codes_day = [hourly.get('weathercode', [])[i] for i in indices if i < len(hourly.get('weathercode', []))] d_temps_day = [hourly.get('temperature_2m', [])[i] for i in indices if i < len(hourly.get('temperature_2m', []))] d_dews_day = [hourly.get('dewpoint_2m', [])[i] for i in indices if i < len(hourly.get('dewpoint_2m', []))] # Calcola accumulo neve per questo giorno snow_sum_day = sum([float(s) for s in d_snow_day if s is not None]) if d_snow_day else 0.0 # Determina tipo precipitazione usando snowfall (priorità) o weathercode (fallback) # NON inventiamo neve basandoci su temperatura - solo se snowfall>0 o weathercode indica neve # PRIORITÀ: Se c'è neve (anche poca), il simbolo è sempre ❄️, anche se la pioggia è maggiore precip_type_symbol = "💧" # Default pioggia threshold_mm = 5.0 # Soglia default per pioggia if precip_amount > 0.1: # Se snowfall è disponibile e positivo, usa quello (più preciso) if snow_sum_day > 0.1: # Se c'è neve (anche poca), il simbolo è sempre ❄️ (priorità alla neve) precip_type_symbol = "❄️" # Neve threshold_mm = 0.5 # Soglia più bassa per neve (anche pochi mm sono significativi) else: # Fallback: verifica weathercode per neve esplicita # Solo se i modelli indicano esplicitamente neve nei codici WMO snow_codes = [71, 73, 75, 77, 85, 86] # Codici WMO per neve hail_codes = [96, 99] # Codici WMO per grandine/temporale snow_count = sum(1 for c in d_codes_day if c is not None and int(c) in snow_codes) hail_count = sum(1 for c in d_codes_day if c is not None and int(c) in hail_codes) if hail_count > 0: precip_type_symbol = "⛈️" # Grandine/Temporale threshold_mm = 5.0 elif snow_count > 0: # Solo se weathercode indica esplicitamente neve precip_type_symbol = "❄️" # Neve threshold_mm = 0.5 # Soglia più bassa per neve # Aggiungi solo se supera la soglia appropriata if precip_amount > threshold_mm: day_names = ["oggi", "domani", "dopodomani"] if day_idx < len(day_names): weather_changes.append({ "day": day_num, "day_ref": day_names[day_idx], "from": "variabile", "to": "precipitazioni", "type": "precip", "amount": precip_amount, "precip_symbol": precip_type_symbol }) if weather_changes: # Ordina per giorno weather_changes.sort(key=lambda x: x["day"]) msg_parts.append("🔄 CAMBIAMENTI METEO SIGNIFICATIVI") for wc in weather_changes[:5]: if wc["type"] == "transition": from_icon = "☀️" if wc["from"] == "sereno" else "☁️" if wc["from"] == "nuvoloso" else "🌧️" to_icon = "🌧️" if "pioggia" in wc["to"] else "❄️" if "neve" in wc["to"] else "⛈️" if "temporale" in wc["to"] else "☁️" msg_parts.append(f" {from_icon}→{to_icon} {wc['day_ref']}: {wc['from']} → {wc['to']}") else: # Precipitazioni - usa simbolo appropriato precip_sym = wc.get('precip_symbol', '💧') msg_parts.append(f" ☁️→{precip_sym} {wc['day_ref']}: precipitazioni ({wc['amount']:.1f}mm)") msg_parts.append("") # DETTAGLIO GIORNALIERO # Usa i dati daily come riferimento principale (sono più affidabili) # daily_time_list già definito sopra temp_min_list = daily.get('temperature_2m_min', []) temp_max_list = daily.get('temperature_2m_max', []) # Limita ai giorni per cui abbiamo dati daily validi max_days = min(len(daily_time_list), len(temp_min_list), len(temp_max_list), 10) # Mappa hourly per eventi dettagliati daily_map = defaultdict(list) times = hourly.get('time', []) for i, t in enumerate(times): daily_map[t.split("T")[0]].append(i) events_summary = [] daily_details = [] for count in range(max_days): day_date = daily_time_list[count].split("T")[0] if count < len(daily_time_list) else None if not day_date: break # Ottieni indici hourly per questo giorno indices = daily_map.get(day_date, []) # Estrai dati hourly per questo giorno (se disponibili) d_times = [hourly['time'][i] for i in indices if i < len(hourly.get('time', []))] d_codes = [hourly.get('weathercode', [])[i] for i in indices if i < len(hourly.get('weathercode', []))] d_probs = [hourly.get('precipitation_probability', [])[i] for i in indices if i < len(hourly.get('precipitation_probability', []))] d_precip = [hourly.get('precipitation', [])[i] for i in indices if i < len(hourly.get('precipitation', []))] d_snow = [hourly.get('snowfall', [])[i] for i in indices if i < len(hourly.get('snowfall', []))] d_winds = [hourly.get('windspeed_10m', [])[i] for i in indices if i < len(hourly.get('windspeed_10m', []))] d_winddir = [hourly.get('winddirection_10m', [])[i] for i in indices if i < len(hourly.get('winddirection_10m', []))] d_temps = [hourly.get('temperature_2m', [])[i] for i in indices if i < len(hourly.get('temperature_2m', []))] d_dews = [hourly.get('dewpoint_2m', [])[i] for i in indices if i < len(hourly.get('dewpoint_2m', []))] d_clouds = [hourly.get('cloud_cover', [])[i] for i in indices if i < len(hourly.get('cloud_cover', []))] d_rains = [hourly.get('rain', [])[i] for i in indices if i < len(hourly.get('rain', []))] d_soil_temps = [hourly.get('soil_temperature_0cm', [])[i] for i in indices if i < len(hourly.get('soil_temperature_0cm', []))] d_snow_depth = [hourly.get('snow_depth', [])[i] for i in indices if i < len(hourly.get('snow_depth', []))] # Usa dati daily come primario (più affidabili) try: t_min_val = temp_min_list[count] if count < len(temp_min_list) else None t_max_val = temp_max_list[count] if count < len(temp_max_list) else None # Se dati daily validi, usali; altrimenti calcola da hourly if t_min_val is not None and t_max_val is not None: t_min = float(t_min_val) t_max = float(t_max_val) elif d_temps and any(t is not None for t in d_temps): temp_clean = [float(t) for t in d_temps if t is not None] t_min = min(temp_clean) t_max = max(temp_clean) else: # Se non ci sono dati, salta questo giorno continue # Usa dati daily per caratterizzazione precipitazioni precip_list = daily.get('precipitation_sum', []) snowfall_list = daily.get('snowfall_sum', []) rain_list = daily.get('rain_sum', []) showers_list = daily.get('showers_sum', []) if count < len(precip_list) and precip_list[count] is not None: precip_sum = float(precip_list[count]) elif d_precip and any(p is not None for p in d_precip): precip_sum = sum([float(p) for p in d_precip if p is not None]) else: precip_sum = 0.0 # Caratterizza precipitazioni usando dati daily snowfall_sum = 0.0 rain_sum = 0.0 showers_sum = 0.0 if count < len(snowfall_list) and snowfall_list[count] is not None: snowfall_sum = float(snowfall_list[count]) elif d_snow and any(s is not None for s in d_snow): snowfall_sum = sum([float(s) for s in d_snow if s is not None]) if count < len(rain_list) and rain_list[count] is not None: rain_sum = float(rain_list[count]) if count < len(showers_list) and showers_list[count] is not None: showers_sum = float(showers_list[count]) wind_list = daily.get('windspeed_10m_max', []) if count < len(wind_list) and wind_list[count] is not None: wind_max = float(wind_list[count]) elif d_winds and any(w is not None for w in d_winds): wind_max = max([float(w) for w in d_winds if w is not None]) else: wind_max = 0.0 # Calcola direzione vento dominante wind_dir_deg = None wind_dir_list = daily.get('winddirection_10m_dominant', []) if count < len(wind_dir_list) and wind_dir_list[count] is not None: wind_dir_deg = float(wind_dir_list[count]) elif d_winddir and any(wd is not None for wd in d_winddir): # Media delle direzioni vento del giorno wind_dir_clean = [float(wd) for wd in d_winddir if wd is not None] if wind_dir_clean: # Calcola media circolare import math sin_sum = sum(math.sin(math.radians(wd)) for wd in wind_dir_clean) cos_sum = sum(math.cos(math.radians(wd)) for wd in wind_dir_clean) wind_dir_deg = math.degrees(math.atan2(sin_sum / len(wind_dir_clean), cos_sum / len(wind_dir_clean))) if wind_dir_deg < 0: wind_dir_deg += 360 wind_dir_cardinal = degrees_to_cardinal(int(wind_dir_deg)) if wind_dir_deg is not None else "N" except (ValueError, TypeError, IndexError) as e: # Se ci sono errori nei dati, salta questo giorno continue events_list = analyze_daily_events(d_times, d_codes, d_probs, d_precip, d_winds, d_temps, d_dews, snowfalls=d_snow, rains=d_rains, soil_temps=d_soil_temps, cloud_covers=d_clouds, wind_speeds=d_winds) events_summary.append(events_list) dt = datetime.datetime.strptime(day_date, "%Y-%m-%d") # Nomi giorni in italiano giorni_ita = ["Lun", "Mar", "Mer", "Gio", "Ven", "Sab", "Dom"] day_str = f"{giorni_ita[dt.weekday()]} {dt.strftime('%d/%m')}" # Icona meteo principale basata sul weathercode del giorno wcode = daily.get('weathercode', [])[count] if count < len(daily.get('weathercode', [])) else None if wcode is None and d_codes: # Se non c'è weathercode daily, usa il più frequente tra gli hourly codes_clean = [int(c) for c in d_codes if c is not None] if codes_clean: wcode = Counter(codes_clean).most_common(1)[0][0] else: wcode = 0 else: wcode = int(wcode) if wcode is not None else 0 # Calcola probabilità e tipo precipitazione per il giorno (PRIMA di determinare icona) precip_prob = 0 precip_type = None if d_probs and any(p is not None for p in d_probs): prob_values = [p for p in d_probs if p is not None] precip_prob = max(prob_values) if prob_values else 0 # Determina tipo precipitazione usando dati daily (più affidabili) # Usa snowfall_sum, rain_sum, showers_sum per caratterizzazione precisa # PRIORITÀ: Se snow_depth > 0 (modello italia_meteo_arpae_icon_2i), usa questi dati per determinare neve precip_type = None # Verifica snow_depth (per modello italia_meteo_arpae_icon_2i quando snow_depth > 0) # NOTA: I valori sono già convertiti in cm durante il recupero dall'API has_snow_depth_data = False max_snow_depth = 0.0 if d_snow_depth and len(d_snow_depth) > 0: snow_depth_valid = [] for sd in d_snow_depth: if sd is not None: try: val_cm = float(sd) # Già in cm if val_cm >= 0: snow_depth_valid.append(val_cm) except (ValueError, TypeError): continue if snow_depth_valid: max_snow_depth = max(snow_depth_valid) if max_snow_depth > 0: # Se snow_depth > 0 cm, considera presenza di neve persistente has_snow_depth_data = True # Verifica snow_depth INDIPENDENTEMENTE dalle precipitazioni # snow_depth rappresenta il manto nevoso persistente, non la neve che cade if has_snow_depth_data and max_snow_depth > 0: # C'è manto nevoso al suolo (indipendente da snowfall) # Questo influenza l'icona meteo, ma non il tipo di precipitazione se non sta nevicando pass # Gestito separatamente per l'icona meteo if precip_sum > 0.1: # Priorità 1: Se sta nevicando (snowfall > 0) e c'è manto nevoso, considera entrambi if has_snow_depth_data and max_snow_depth > 0: # C'è sia neve in caduta che manto nevoso persistente if rain_sum > 0.1 or showers_sum > 0.1: precip_type = "mixed" # Neve + pioggia/temporali elif snowfall_sum > 0.1: precip_type = "snow" # Neve in caduta + manto nevoso else: # Solo manto nevoso persistente, nessuna neve in caduta # Il tipo di precipitazione resta quello basato su snowfall/rain pass # Priorità 2: Usa dati daily se disponibili elif snowfall_sum > 0.1: # C'è neve significativa if snowfall_sum >= precip_sum * 0.5: precip_type = "snow" elif rain_sum > 0.1 or showers_sum > 0.1: # Mista (neve + pioggia/temporali) precip_type = "mixed" else: precip_type = "snow" elif rain_sum > 0.1: # Pioggia if showers_sum > 0.1: # Temporali (showers) precip_type = "thunderstorms" else: precip_type = "rain" elif showers_sum > 0.1: # Solo temporali precip_type = "thunderstorms" else: # Fallback: usa dati hourly se daily non disponibili snow_sum_day = sum([float(s) for s in d_snow if s is not None]) if d_snow else 0.0 if snow_sum_day > 0.1: if snow_sum_day >= precip_sum * 0.5: precip_type = "snow" else: precip_type = "rain" else: # Fallback: verifica weathercode per neve esplicita snow_codes = [71, 73, 75, 77, 85, 86] # Codici WMO per neve rain_codes = [51, 53, 55, 56, 57, 61, 63, 65, 80, 81, 82, 66, 67] # Codici WMO per pioggia hail_codes = [96, 99] # Codici WMO per grandine/temporale snow_count = sum(1 for c in d_codes if c is not None and int(c) in snow_codes) rain_count = sum(1 for c in d_codes if c is not None and int(c) in rain_codes) hail_count = sum(1 for c in d_codes if c is not None and int(c) in hail_codes) if hail_count > 0: precip_type = "hail" elif snow_count > rain_count: precip_type = "snow" else: precip_type = "rain" # Determina icona basandosi su precipitazioni (priorità) e poi nuvolosità/weathercode # Usa precip_type già calcolato # NOTA: snow_depth è INDIPENDENTE da precipitazioni - influenza l'icona anche senza nevicate has_precip = precip_sum > 0.1 if has_precip: # Precipitazioni: usa precip_type per determinare icona if precip_type == "snow": weather_icon = "❄️" # Neve elif precip_type == "thunderstorms" or precip_type == "hail" or wcode in (95, 96, 99): weather_icon = "⛈️" # Temporale/Grandine elif precip_type == "mixed": weather_icon = "🌨️" # Precipitazione mista else: weather_icon = "🌧️" # Pioggia elif has_snow_depth_data and max_snow_depth > 0: # C'è manto nevoso persistente anche senza precipitazioni # Mostra icona neve anche se non sta nevicando weather_icon = "❄️" # Manto nevoso presente elif t_min < 0: # Giorno freddo (t_min < 0): usa icona ghiaccio (indipendentemente da snow_depth) # Se c'è anche snow_depth, viene già gestito sopra weather_icon = "🧊" # Gelo/Ghiaccio (giorno freddo) elif wcode in (45, 48): weather_icon = "🌫️" # Nebbia else: # Nessuna precipitazione: usa nuvolosità se disponibile, altrimenti weathercode avg_cloud = 0 if d_clouds and any(c is not None for c in d_clouds): cloud_clean = [float(c) for c in d_clouds if c is not None] avg_cloud = sum(cloud_clean) / len(cloud_clean) if cloud_clean else 0 if avg_cloud > 0: # Usa nuvolosità media if avg_cloud <= 25: weather_icon = "☀️" # Sereno elif avg_cloud <= 50: weather_icon = "⛅" # Parzialmente nuvoloso elif avg_cloud <= 75: weather_icon = "☁️" # Nuvoloso else: weather_icon = "☁️" # Molto nuvoloso else: # Fallback a weathercode if wcode in (0, 1): weather_icon = "☀️" # Sereno elif wcode in (2, 3): weather_icon = "⛅" # Parzialmente nuvoloso else: weather_icon = "☁️" # Nuvoloso # Recupera probabilità max daily se disponibile prob_max_list = daily.get('precipitation_probability_max', []) precip_prob_max = None if count < len(prob_max_list) and prob_max_list[count] is not None: precip_prob_max = int(prob_max_list[count]) elif precip_prob > 0: precip_prob_max = int(precip_prob) # Calcola spessore manto nevoso (snow_depth) per questo giorno # Nota: snow_depth è disponibile solo per alcuni modelli (es. ICON Italia, ICON-D2) # Se il modello non supporta snow_depth, tutti i valori saranno None o mancanti snow_depth_min = None snow_depth_max = None snow_depth_avg = None snow_depth_end = None # Verifica se abbiamo dati snow_depth validi per questo giorno # NOTA: I valori sono già convertiti in cm durante il recupero dall'API # Estrai anche direttamente dall'array hourly per sicurezza (fallback se d_snow_depth è vuoto) # PRIORITÀ: usa sempre i dati dall'array hourly per quel giorno (più affidabile) all_snow_depth_values = [] hourly_snow_depth = hourly.get('snow_depth', []) hourly_times = hourly.get('time', []) # Cerca direttamente nell'array hourly per questo giorno usando i timestamp (più affidabile) if hourly_snow_depth and hourly_times and day_date: for idx, ts in enumerate(hourly_times): if ts.startswith(day_date) and idx < len(hourly_snow_depth): if hourly_snow_depth[idx] is not None: all_snow_depth_values.append(hourly_snow_depth[idx]) # Se non trovato con timestamp, usa d_snow_depth come fallback if not all_snow_depth_values and d_snow_depth and len(d_snow_depth) > 0: all_snow_depth_values = d_snow_depth if all_snow_depth_values and len(all_snow_depth_values) > 0: # Filtra solo valori validi (>= 0 e non null, già in cm) snow_depth_clean = [] has_valid_data = False for sd in all_snow_depth_values: if sd is not None: has_valid_data = True # Almeno un dato non-null presente try: val_cm = float(sd) # Già in cm if val_cm >= 0: # Solo valori non negativi snow_depth_clean.append(val_cm) except (ValueError, TypeError): continue # Calcola statistiche se abbiamo almeno un valore non-null (il modello supporta snow_depth) # snow_depth è INDIPENDENTE da snowfall: rappresenta il manto nevoso persistente al suolo # Mostriamo sempre quando disponibile e > 0, anche se non nevica if has_valid_data and snow_depth_clean: max_depth = max(snow_depth_clean) min_depth = min(snow_depth_clean) # Calcola sempre le statistiche se ci sono dati validi, anche se il valore è piccolo if max_depth > 0: # Mostra se almeno un valore > 0 cm snow_depth_min = min_depth snow_depth_max = max_depth snow_depth_avg = sum(snow_depth_clean) / len(snow_depth_clean) # Prendi l'ultimo valore non-null del giorno (spessore alla fine del giorno) # Ordina per valore per trovare l'ultimo valore del giorno (non necessariamente l'ultimo della lista) snow_depth_end = snow_depth_clean[-1] if snow_depth_clean else None # Preferisci l'ultimo valore non-null originale per avere il valore alla fine del giorno if all_snow_depth_values: for sd in reversed(all_snow_depth_values): if sd is not None: try: val_cm = float(sd) if val_cm > 0: snow_depth_end = val_cm break except (ValueError, TypeError): continue day_info = { "day_str": day_str, "t_min": t_min, "t_max": t_max, "precip_sum": precip_sum, "precip_prob": precip_prob_max if precip_prob_max is not None else precip_prob, "precip_type": precip_type, "snowfall_sum": snowfall_sum, "rain_sum": rain_sum, "showers_sum": showers_sum, "wind_max": wind_max, "wind_dir": wind_dir_cardinal, "events": events_list, "weather_icon": weather_icon, "snow_depth_min": snow_depth_min, "snow_depth_max": snow_depth_max, "snow_depth_avg": snow_depth_avg, "snow_depth_end": snow_depth_end } daily_details.append(day_info) count += 1 # Formatta dettagli giornalieri (tutti i giorni disponibili) msg_parts.append("📅 PREVISIONI GIORNALIERE") prev_snow_depth_end = None # Traccia lo spessore del giorno precedente per mostrare evoluzione for day_info in daily_details: line = f"{day_info['weather_icon']} {day_info['day_str']} 🌡️ {day_info['t_min']:.0f}°/{day_info['t_max']:.0f}°C" # Aggiungi informazioni precipitazioni con caratterizzazione dettagliata # Nota: mm è accumulo totale giornaliero (somma di tutte le ore) if day_info['precip_sum'] > 0.1: # Caratterizza usando dati daily se disponibili precip_parts = [] # Neve if day_info.get('snowfall_sum', 0) > 0.1: precip_parts.append(f"❄️ {day_info['snowfall_sum']:.1f}cm") # Pioggia if day_info.get('rain_sum', 0) > 0.1: precip_parts.append(f"🌧️ {day_info['rain_sum']:.1f}mm") # Temporali (showers) if day_info.get('showers_sum', 0) > 0.1: precip_parts.append(f"⛈️ {day_info['showers_sum']:.1f}mm") # Se non abbiamo dati daily dettagliati, usa il tipo generale if not precip_parts: precip_symbol = "❄️" if day_info['precip_type'] == "snow" else "⛈️" if day_info['precip_type'] in ("hail", "thunderstorms") else "🌨️" if day_info['precip_type'] == "mixed" else "🌧️" precip_parts.append(f"{precip_symbol} {day_info['precip_sum']:.1f}mm") line += f" | {' + '.join(precip_parts)}" # Aggiungi probabilità se disponibile if day_info['precip_prob'] and day_info['precip_prob'] > 0: line += f" ({int(day_info['precip_prob'])}%)" elif day_info['precip_prob'] > 50: # Probabilità alta ma nessuna precipitazione prevista (può essere un errore del modello) line += f" | 💧 Possibile ({int(day_info['precip_prob'])}%)" # Aggiungi vento (sempre se disponibile, formattato come direzione intensità) if day_info['wind_max'] > 0: wind_str = f"{day_info['wind_dir']} {day_info['wind_max']:.0f}km/h" line += f" | 💨 {wind_str}" msg_parts.append(line) # Mostra spessore manto nevoso se disponibile e > 0 # snow_depth è INDIPENDENTE da snowfall: rappresenta il manto nevoso persistente al suolo # Deve essere sempre mostrato quando disponibile, anche nei giorni senza nevicate # Se snow_depth_end è None ma ci sono dati validi, ricalcola snow_depth_end = day_info.get('snow_depth_end') if snow_depth_end is None: # Prova a ricalcolare da snow_depth_min/max/avg se disponibili snow_depth_max = day_info.get('snow_depth_max') snow_depth_avg = day_info.get('snow_depth_avg') if snow_depth_max is not None and snow_depth_max > 0: snow_depth_end = snow_depth_max # Usa il massimo come fallback elif snow_depth_avg is not None and snow_depth_avg > 0: snow_depth_end = snow_depth_avg # Usa la media come fallback if snow_depth_end is not None and snow_depth_end > 0: snow_depth_str = f"❄️ Manto nevoso: {snow_depth_end:.1f} cm" # Mostra evoluzione rispetto al giorno precedente if prev_snow_depth_end is not None: diff = snow_depth_end - prev_snow_depth_end if abs(diff) > 0.5: # Solo se variazione significativa if diff > 0: snow_depth_str += f" (↑ +{diff:.1f} cm)" else: snow_depth_str += f" (↓ {diff:.1f} cm)" # Mostra range se c'è variazione significativa durante il giorno snow_depth_min = day_info.get('snow_depth_min') snow_depth_max = day_info.get('snow_depth_max') if snow_depth_min is not None and snow_depth_max is not None: if snow_depth_max - snow_depth_min > 1.0: # Variazione > 1cm durante il giorno snow_depth_str += f" [range: {snow_depth_min:.1f}-{snow_depth_max:.1f} cm]" msg_parts.append(f" {snow_depth_str}") if day_info['events']: for ev in day_info['events'][:3]: # Limita a 3 eventi principali msg_parts.append(f" ➤ {ev}") # Aggiorna per il prossimo giorno prev_snow_depth_end = snow_depth_end if snow_depth_end is not None else prev_snow_depth_end msg_parts.append("") return "\n".join(msg_parts) def send_telegram(text, chat_id, token, debug_mode=False): if not token: return False url = f"https://api.telegram.org/bot{token}/sendMessage" payload = { "chat_id": chat_id, "text": text, "parse_mode": "HTML", "disable_web_page_preview": True } try: resp = requests.post(url, json=payload, timeout=15) return resp.status_code == 200 except: return False def main(): parser = argparse.ArgumentParser() parser.add_argument("query", nargs="?", default="casa") parser.add_argument("--chat_id") parser.add_argument("--debug", action="store_true") parser.add_argument("--home", action="store_true") parser.add_argument("--timezone", help="Timezone IANA (es: Europe/Rome, America/New_York)") args = parser.parse_args() token = get_bot_token() debug_mode = args.debug # Determina destinatari if debug_mode: recipients = [ADMIN_CHAT_ID] elif args.chat_id: recipients = [args.chat_id] else: recipients = TELEGRAM_CHAT_IDS # Determina località if args.home or (not args.query or args.query.lower() == "casa"): lat, lon, name, cc = DEFAULT_LAT, DEFAULT_LON, DEFAULT_NAME, "SM" else: coords = get_coordinates(args.query) if not coords[0]: error_msg = f"❌ Località '{args.query}' non trovata." if token: for chat_id in recipients: send_telegram(error_msg, chat_id, token, debug_mode) else: print(error_msg) return lat, lon, name, cc = coords # Recupera dati multi-modello (breve + lungo termine) - selezione intelligente basata su country code # Determina se è Casa is_home = (abs(lat - DEFAULT_LAT) < 0.01 and abs(lon - DEFAULT_LON) < 0.01) # Recupera dati multi-modello (breve + lungo termine) # - Per Casa: usa AROME Seamless e ICON-D2 # - Per altre località: usa best match di Open-Meteo short_models, long_models = choose_models_by_country(cc, is_home=is_home) # Usa timezone personalizzata se fornita timezone = args.timezone if hasattr(args, 'timezone') and args.timezone else None models_data = get_weather_multi_model(lat, lon, short_models, long_models, forecast_days=10, timezone=timezone) if not any(models_data.values()): error_msg = "❌ Errore: Impossibile recuperare dati meteo." if token: for chat_id in recipients: send_telegram(error_msg, chat_id, token, debug_mode) else: print(error_msg) return # Genera report report = format_weather_context_report(models_data, name, cc) if debug_mode: report = f"🛠 [DEBUG MODE] 🛠\n\n{report}" # Invia if token: success = False for chat_id in recipients: if send_telegram(report, chat_id, token, debug_mode): success = True if not success: print("❌ Errore invio Telegram") else: print(report) if __name__ == "__main__": main()