#!/usr/bin/env python3 """ Assistente Climatico Intelligente - Report Meteo Avanzato Analizza evoluzione meteo, fronti, cambiamenti e fornisce consigli pratici """ import requests import argparse import datetime import os import sys from zoneinfo import ZoneInfo from collections import defaultdict, Counter, Counter from typing import List, Dict, Tuple, Optional from statistics import mean, median from open_meteo_client import open_meteo_get # --- CONFIGURAZIONE DEFAULT --- DEFAULT_LAT = 43.9356 DEFAULT_LON = 12.4296 DEFAULT_NAME = "🏠 Casa (Strada Cà Toro,12 - San Marino)" # --- TIMEZONE --- TZ_STR = "Europe/Berlin" TZINFO = ZoneInfo(TZ_STR) # --- TELEGRAM CONFIG --- ADMIN_CHAT_ID = "64463169" TELEGRAM_CHAT_IDS = ["64463169", "24827341", "132455422", "5405962012"] TOKEN_FILE_HOME = os.path.expanduser("~/.telegram_dpc_bot_token") TOKEN_FILE_ETC = "/etc/telegram_dpc_bot_token" TOKEN_FILE_VOLUME = "/Volumes/Pi2/etc/telegram_dpc_bot_token" # --- SOGLIE --- SOGLIA_VENTO_KMH = 40.0 MIN_MM_PER_EVENTO = 0.1 # --- MODELLI METEO --- # Modelli a breve termine (alta risoluzione, 48-72h) SHORT_TERM_MODELS = ["meteofrance_seamless", "icon_d2"] # Usa seamless invece di arome_france_hd # Modelli a lungo termine (globale, 10 giorni) LONG_TERM_MODELS = ["gfs_global", "ecmwf_ifs04"] # Modelli alternativi MODELS_IT_SM = ["meteofrance_seamless", "icon_d2", "gfs_global"] MODEL_NAMES = { "meteofrance_arome_france_hd": "AROME HD", "meteofrance_seamless": "AROME Seamless", "icon_d2": "ICON-D2", "gfs_global": "GFS", "ecmwf_ifs04": "ECMWF", "ecmwf_ifs": "ECMWF IFS", "jma_msm": "JMA MSM", "metno_nordic": "Yr.no", "ukmo_global": "UK MetOffice", "icon_eu": "ICON-EU", "italia_meteo_arpae_icon_2i": "ICON Italia (ARPAE 2i)" } # Per Casa/Italia: forecast_days per modello lungo termine (come Agent Irrigazione / OPEN_METEO_MODELS.md) LONG_TERM_FORECAST_DAYS = { "italia_meteo_arpae_icon_2i": 10, "ecmwf_ifs": 10, "meteofrance_seamless": 4, } def choose_models_by_country(cc, is_home=False): """ Seleziona modelli meteo ottimali. - Per Casa e Italia: 0-2d mediana ICON Italia + AROME HD; 3-10d mediana ICON Italia + ECMWF IFS + ARPEGE. - Per altre località: best match Open-Meteo. Ritorna (short_term_models, long_term_models) """ cc = cc.upper() if cc else "UNKNOWN" long_term_default = ["gfs_global", "ecmwf_ifs04"] if is_home or cc == "IT": # 0-2d: due modelli ad alta risoluzione (mediana). 3-10d: tre modelli (mediana, come Irrigazione). return ( ["italia_meteo_arpae_icon_2i", "meteofrance_arome_france_hd"], ["italia_meteo_arpae_icon_2i", "ecmwf_ifs", "meteofrance_seamless"], ) else: return None, long_term_default def get_bot_token(): paths = [TOKEN_FILE_HOME, TOKEN_FILE_ETC, TOKEN_FILE_VOLUME] for path in paths: if os.path.exists(path): try: with open(path, 'r') as f: return f.read().strip() except: pass return None def get_coordinates(query): if not query or query.lower() == "casa": return DEFAULT_LAT, DEFAULT_LON, DEFAULT_NAME, "SM" url = "https://geocoding-api.open-meteo.com/v1/search" try: resp = open_meteo_get(url, params={"name": query, "count": 1, "language": "it", "format": "json"}, timeout=(5, 10)) res = resp.json().get("results", []) if res: res = res[0] cc = res.get("country_code", "IT").upper() name = f"{res.get('name')} ({cc})" return res['latitude'], res['longitude'], name, cc except: pass return None, None, None, None def degrees_to_cardinal(d: int) -> str: """Converte gradi in direzione cardinale (N, NE, E, SE, S, SW, W, NW)""" dirs = ["N", "NE", "E", "SE", "S", "SW", "W", "NW"] try: return dirs[round(d / 45) % 8] except: return "N" def get_weather_multi_model(lat, lon, short_term_models, long_term_models, forecast_days=10, timezone=None): """ Recupera dati da modelli a breve e lungo termine per ensemble completo. Se short_term_models è None, usa best match di Open-Meteo (senza specificare models). """ results = {} # Recupera modelli a breve termine (alta risoluzione, fino a ~72h) if short_term_models is None: # Best match: non specificare models, Open-Meteo sceglie automaticamente url = "https://api.open-meteo.com/v1/forecast" params = { "latitude": lat, "longitude": lon, "hourly": "temperature_2m,precipitation,snowfall,rain,snow_depth,weathercode,windspeed_10m,windgusts_10m,winddirection_10m,dewpoint_2m,relative_humidity_2m,cloud_cover,soil_temperature_0cm", "daily": "temperature_2m_max,temperature_2m_min,precipitation_sum,precipitation_hours,snowfall_sum,showers_sum,rain_sum,weathercode,winddirection_10m_dominant,windspeed_10m_max,windgusts_10m_max", "timezone": timezone if timezone else TZ_STR, "forecast_days": min(forecast_days, 3) } try: resp = open_meteo_get(url, params=params, timeout=(5, 20)) if resp.status_code == 200: data = resp.json() # Converti snow_depth da metri a cm per tutti i modelli (Open-Meteo restituisce in metri) hourly_data = data.get("hourly", {}) if hourly_data and "snow_depth" in hourly_data: snow_depth_values = hourly_data.get("snow_depth", []) # Converti da metri a cm (moltiplica per 100) snow_depth_cm = [] for sd in snow_depth_values: if sd is not None: try: val_m = float(sd) val_cm = val_m * 100.0 # Converti da metri a cm snow_depth_cm.append(val_cm) except (ValueError, TypeError): snow_depth_cm.append(None) else: snow_depth_cm.append(None) hourly_data["snow_depth"] = snow_depth_cm data["hourly"] = hourly_data results["best_match"] = data results["best_match"]["model_type"] = "short_term" else: results["best_match"] = None except: results["best_match"] = None else: # Modelli specifici: 0-2d ICON Italia + AROME HD (mediana); AROME HD solo 2 giorni for model in short_term_models: url = "https://api.open-meteo.com/v1/forecast" if model == "italia_meteo_arpae_icon_2i": hourly_params = "rain,showers,snowfall,snow_depth,precipitation,temperature_2m,weathercode,windspeed_10m,winddirection_10m" daily_params = "temperature_2m_max,temperature_2m_min,showers_sum,rain_sum,snowfall_sum,precipitation_sum,precipitation_hours,weathercode,winddirection_10m_dominant,windspeed_10m_max,windgusts_10m_max" fd_short = min(forecast_days, 7) elif model == "meteofrance_arome_france_hd": # AROME HD: 2 giorni, set variabili ridotto (no snow_depth/showers in output) hourly_params = "temperature_2m,precipitation,snowfall,rain,weathercode,windspeed_10m,winddirection_10m" daily_params = "temperature_2m_max,temperature_2m_min,precipitation_sum,precipitation_hours,snowfall_sum,rain_sum,weathercode,winddirection_10m_dominant,windspeed_10m_max,windgusts_10m_max" fd_short = 2 else: hourly_params = "temperature_2m,precipitation,snowfall,rain,snow_depth,weathercode,windspeed_10m,windgusts_10m,winddirection_10m,dewpoint_2m,relative_humidity_2m,cloud_cover,soil_temperature_0cm" daily_params = "temperature_2m_max,temperature_2m_min,precipitation_sum,precipitation_hours,snowfall_sum,showers_sum,rain_sum,weathercode,winddirection_10m_dominant,windspeed_10m_max,windgusts_10m_max" fd_short = min(forecast_days, 3) params = { "latitude": lat, "longitude": lon, "hourly": hourly_params, "daily": daily_params, "timezone": timezone if timezone else TZ_STR, "models": model, "forecast_days": fd_short } try: resp = open_meteo_get(url, params=params, timeout=(5, 20)) if resp.status_code == 200: data = resp.json() # Converti snow_depth da metri a cm per tutti i modelli (Open-Meteo restituisce in metri) hourly_data = data.get("hourly", {}) if hourly_data and "snow_depth" in hourly_data: snow_depth_values = hourly_data.get("snow_depth", []) # Converti da metri a cm (moltiplica per 100) snow_depth_cm = [] for sd in snow_depth_values: if sd is not None: try: val_m = float(sd) val_cm = val_m * 100.0 # Converti da metri a cm snow_depth_cm.append(val_cm) except (ValueError, TypeError): snow_depth_cm.append(None) else: snow_depth_cm.append(None) hourly_data["snow_depth"] = snow_depth_cm data["hourly"] = hourly_data # Per italia_meteo_arpae_icon_2i, verifica se snow_depth è disponibile e > 0 if model == "italia_meteo_arpae_icon_2i": if hourly_data and "snow_depth" in hourly_data: snow_depth_values_cm = hourly_data.get("snow_depth", []) # Verifica se almeno un valore di snow_depth è > 0 (ora già in cm) has_snow_depth = False if snow_depth_values_cm: for sd in snow_depth_values_cm[:24]: # Controlla prime 24h if sd is not None: try: if float(sd) > 0.5: # > 0.5 cm has_snow_depth = True break except (ValueError, TypeError): continue # Se snow_depth > 0, assicurati che sia incluso nei dati if has_snow_depth: data["has_snow_depth_data"] = True results[model] = data results[model]["model_type"] = "short_term" else: results[model] = None except: results[model] = None # Recupera modelli a lungo termine (3-10d): tre modelli per mediana (come Agent Irrigazione) for model in (long_term_models or []): url = "https://api.open-meteo.com/v1/forecast" fd_long = LONG_TERM_FORECAST_DAYS.get(model, forecast_days) if model == "ecmwf_ifs": hourly_params = "rain,showers,snowfall,precipitation,temperature_2m,weathercode,windspeed_10m,winddirection_10m" daily_params = "temperature_2m_max,temperature_2m_min,showers_sum,rain_sum,snowfall_sum,precipitation_sum,precipitation_hours,weathercode,winddirection_10m_dominant,windspeed_10m_max,windgusts_10m_max" elif model == "meteofrance_seamless": hourly_params = "temperature_2m,precipitation,snowfall,rain,weathercode,windspeed_10m,windgusts_10m,winddirection_10m" daily_params = "temperature_2m_max,temperature_2m_min,precipitation_sum,precipitation_hours,snowfall_sum,rain_sum,weathercode,winddirection_10m_dominant,windspeed_10m_max,windgusts_10m_max" else: # ICON Italia e altri hourly_params = "rain,showers,snowfall,snow_depth,precipitation,temperature_2m,weathercode,windspeed_10m,winddirection_10m" daily_params = "temperature_2m_max,temperature_2m_min,showers_sum,rain_sum,snowfall_sum,precipitation_sum,precipitation_hours,weathercode,winddirection_10m_dominant,windspeed_10m_max,windgusts_10m_max" params = { "latitude": lat, "longitude": lon, "hourly": hourly_params, "daily": daily_params, "timezone": timezone if timezone else TZ_STR, "models": model, "forecast_days": fd_long } try: resp = open_meteo_get(url, params=params, timeout=(5, 25)) if resp.status_code == 200: data = resp.json() # Converti snow_depth da metri a cm per tutti i modelli (Open-Meteo restituisce in metri) hourly_data = data.get("hourly", {}) if hourly_data and "snow_depth" in hourly_data: snow_depth_values = hourly_data.get("snow_depth", []) # Converti da metri a cm (moltiplica per 100) snow_depth_cm = [] for sd in snow_depth_values: if sd is not None: try: val_m = float(sd) val_cm = val_m * 100.0 # Converti da metri a cm snow_depth_cm.append(val_cm) except (ValueError, TypeError): snow_depth_cm.append(None) else: snow_depth_cm.append(None) hourly_data["snow_depth"] = snow_depth_cm data["hourly"] = hourly_data results[model] = data results[model]["model_type"] = "long_term" else: results[model] = None except Exception: results[model] = None return results def _normalize_time_key(t): """Normalizza timestamp per confronto (YYYY-MM-DDTHH:MM).""" if not t or not isinstance(t, str): return str(t) if t else "" return t.strip()[:16] def _median_or_single(values): """Mediana dei valori numerici; ignora None. Con 2 valori restituisce la media dei due.""" nums = [float(v) for v in values if v is not None] if not nums: return None if len(nums) == 1: return nums[0] return median(nums) # Chiavi che esistono solo su ICON Italia (no merge, si tiene il valore da quel modello) HOURLY_KEYS_ICON_ONLY = ["snow_depth", "showers"] DAILY_KEYS_ICON_ONLY = ["showers_sum"] def _merge_hourly_median(hourly_by_model, single_source_keys=None, single_source_model=None): """ Unisce hourly da più modelli: mediana per ogni timestamp. single_source_keys: per queste chiavi si prende il valore solo da single_source_model (es. ICON Italia per snow_depth, showers). """ single_source_keys = single_source_keys or [] time_idx = {} all_times = [] for _model, h in hourly_by_model: times = h.get("time", []) or [] for t in times: k = _normalize_time_key(str(t)) if t else "" if k and k not in time_idx: time_idx[k] = len(all_times) all_times.append(t if isinstance(t, str) else k) if not all_times: return {"time": [], "temperature_2m": [], "precipitation": [], "snowfall": [], "rain": [], "weathercode": [], "windspeed_10m": [], "winddirection_10m": [], "snow_depth": [], "dewpoint_2m": [], "cloud_cover": [], "soil_temperature_0cm": []} # Raccogli tutte le chiavi numeriche dal primo modello che le ha all_keys = [] for _model, h in hourly_by_model: for key in (h.keys() - {"time"}): if key not in all_keys: all_keys.append(key) out = {"time": all_times} for key in all_keys: out[key] = [] for ref_t in all_times: ref_k = _normalize_time_key(str(ref_t)) if key in single_source_keys and single_source_model: val = None for m, h in hourly_by_model: if m != single_source_model: continue times = h.get("time", []) or [] arr = h.get(key, []) or [] for i, t in enumerate(times): if _normalize_time_key(str(t)) == ref_k and i < len(arr) and arr[i] is not None: try: val = float(arr[i]) if key != "weathercode" else (int(arr[i]) if arr[i] is not None else None) except (TypeError, ValueError): pass break break out[key].append(val) else: vals = [] for _m, h in hourly_by_model: times = h.get("time", []) or [] arr = h.get(key, []) or [] for i, t in enumerate(times): if _normalize_time_key(str(t)) == ref_k and i < len(arr) and arr[i] is not None: try: vals.append(float(arr[i])) except (TypeError, ValueError): pass break out[key].append(_median_or_single(vals) if vals else None) n = len(out["time"]) if n > 1: order = sorted(range(n), key=lambda i: str(out["time"][i])) out["time"] = [out["time"][i] for i in order] for key in all_keys: if key == "time": continue if len(out.get(key, [])) == n: out[key] = [out[key][i] for i in order] return out def _merge_daily_median(daily_by_model, single_source_keys=None, single_source_model=None): """Unisce daily da più modelli: mediana per data. single_source_keys: valore solo da single_source_model.""" single_source_keys = single_source_keys or [] time_idx = {} all_times = [] for _model, d in daily_by_model: times = d.get("time", []) or [] for t in times: key = str(t)[:10] if t else "" if key and key not in time_idx: time_idx[key] = len(all_times) all_times.append(key) if not all_times: return {"time": [], "temperature_2m_max": [], "temperature_2m_min": [], "precipitation_sum": [], "precipitation_hours": [], "snowfall_sum": [], "showers_sum": [], "rain_sum": [], "weathercode": [], "winddirection_10m_dominant": [], "windspeed_10m_max": [], "windgusts_10m_max": []} all_keys = [] for _model, d in daily_by_model: for key in (d.keys() - {"time"}): if key not in all_keys: all_keys.append(key) out = {"time": all_times} for key in all_keys: out[key] = [] for date_str in all_times: if key in single_source_keys and single_source_model: val = None for m, d in daily_by_model: if m != single_source_model: continue times = d.get("time", []) or [] arr = d.get(key, []) or [] for i, t in enumerate(times): if str(t)[:10] == date_str and i < len(arr) and arr[i] is not None: try: val = float(arr[i]) if key != "weathercode" else (int(arr[i]) if arr[i] is not None else None) except (TypeError, ValueError): pass break break out[key].append(val) else: vals = [] for _m, d in daily_by_model: times = d.get("time", []) or [] arr = d.get(key, []) or [] for i, t in enumerate(times): if str(t)[:10] == date_str and i < len(arr) and arr[i] is not None: try: vals.append(float(arr[i])) except (TypeError, ValueError): pass break out[key].append(_median_or_single(vals) if vals else None) # Ordina cronologicamente (evita buchi nel report se l'unione non era ordinata) n = len(out["time"]) if n > 1: order = sorted(range(n), key=lambda i: out["time"][i]) out["time"] = [out["time"][i] for i in order] for key in all_keys: if key == "time": continue if len(out.get(key, [])) == n: out[key] = [out[key][i] for i in order] return out def merge_multi_model_forecast(models_data, forecast_days=10): """Combina dati da modelli a breve e lungo termine in un forecast unificato""" merged = { "daily": { "time": [], "temperature_2m_max": [], "temperature_2m_min": [], "precipitation_sum": [], "precipitation_hours": [], "snowfall_sum": [], "showers_sum": [], "rain_sum": [], "weathercode": [], "winddirection_10m_dominant": [], "windspeed_10m_max": [], "windgusts_10m_max": [] }, "hourly": { "time": [], "temperature_2m": [], "precipitation": [], "snowfall": [], "snow_depth": [], "rain": [], "weathercode": [], "windspeed_10m": [], "winddirection_10m": [], "dewpoint_2m": [], "cloud_cover": [], "soil_temperature_0cm": [] }, "models_used": [] } cutoff_day = 2 # 0-2d alta risoluzione, 3-10d mediana tre modelli short_term_list = [(m, models_data[m]) for m in models_data if models_data.get(m) and models_data[m].get("model_type") == "short_term"] long_term_list = [(m, models_data[m]) for m in models_data if models_data.get(m) and models_data[m].get("model_type") == "long_term"] if not short_term_list and not long_term_list: return None daily_keys = list(merged["daily"].keys()) hourly_keys = list(merged["hourly"].keys()) def ensure_merged_keys(merged, daily_times, hourly_times): for k in daily_keys: if k == "time": continue while len(merged["daily"][k]) < len(merged["daily"]["time"]): merged["daily"][k].append(None) for k in hourly_keys: if k == "time": continue while len(merged["hourly"][k]) < len(merged["hourly"]["time"]): merged["hourly"][k].append(None) # ---- 0-2 giorni: uno o due modelli short-term ---- if short_term_list: if len(short_term_list) >= 2: # Mediana ICON Italia + AROME HD; snow_depth e showers solo da ICON Italia short_daily_by_model = [(m, d.get("daily", {}) or {}) for m, d in short_term_list] short_hourly_by_model = [(m, d.get("hourly", {}) or {}) for m, d in short_term_list] merged_short_daily = _merge_daily_median(short_daily_by_model, single_source_keys=DAILY_KEYS_ICON_ONLY, single_source_model="italia_meteo_arpae_icon_2i") merged_short_hourly = _merge_hourly_median(short_hourly_by_model, single_source_keys=HOURLY_KEYS_ICON_ONLY, single_source_model="italia_meteo_arpae_icon_2i") short_daily_times = (merged_short_daily.get("time") or [])[:cutoff_day + 1] if long_term_list else (merged_short_daily.get("time") or []) short_hourly_times = merged_short_hourly.get("time") or [] cutoff_h = (cutoff_day + 1) * 24 if long_term_list else len(short_hourly_times) short_hourly_times = short_hourly_times[:cutoff_h] names_short = " + ".join(MODEL_NAMES.get(m, m) for m, _ in short_term_list[:2]) merged["models_used"].append(f"{names_short} (mediana) (0-{len(short_daily_times)}d)") for i, day_time in enumerate(short_daily_times): merged["daily"]["time"].append(day_time) for key in daily_keys: if key == "time": continue arr = merged_short_daily.get(key, []) merged["daily"][key].append(arr[i] if i < len(arr) else None) for i, hour_time in enumerate(short_hourly_times): merged["hourly"]["time"].append(hour_time) for key in hourly_keys: if key == "time": continue arr = merged_short_hourly.get(key, []) merged["hourly"][key].append(arr[i] if i < len(arr) else None) else: # Un solo modello short-term (es. best_match o fallback) short_term_model, short_term_data = short_term_list[0] short_daily = short_term_data.get("daily", {}) or {} short_hourly = short_term_data.get("hourly", {}) or {} short_daily_times_all = short_daily.get("time", []) or [] short_daily_times = short_daily_times_all[:cutoff_day + 1] if long_term_list else short_daily_times_all model_display = "Best Match" if short_term_model == "best_match" else MODEL_NAMES.get(short_term_model, short_term_model) merged["models_used"].append(f"{model_display} (0-{len(short_daily_times)}d)") for i, day_time in enumerate(short_daily_times): merged["daily"]["time"].append(day_time) for key in daily_keys: if key == "time": continue arr = short_daily.get(key, []) merged["daily"][key].append(arr[i] if i < len(arr) else None) short_hourly_times = short_hourly.get("time", []) or [] cutoff_h = (cutoff_day + 1) * 24 if long_term_list else len(short_hourly_times) for i, hour_time in enumerate(short_hourly_times[:cutoff_h]): merged["hourly"]["time"].append(hour_time) for key in hourly_keys: if key == "time": continue arr = short_hourly.get(key, []) merged["hourly"][key].append(arr[i] if i < len(arr) else None) ensure_merged_keys(merged, merged["daily"]["time"], merged["hourly"]["time"]) # ---- 3-10 giorni: uno o più modelli long-term ---- if long_term_list: if len(long_term_list) >= 2: long_daily_by_model = [(m, d.get("daily", {}) or {}) for m, d in long_term_list] long_hourly_by_model = [(m, d.get("hourly", {}) or {}) for m, d in long_term_list] merged_long_daily = _merge_daily_median(long_daily_by_model, single_source_keys=DAILY_KEYS_ICON_ONLY, single_source_model="italia_meteo_arpae_icon_2i") merged_long_hourly = _merge_hourly_median(long_hourly_by_model, single_source_keys=HOURLY_KEYS_ICON_ONLY, single_source_model="italia_meteo_arpae_icon_2i") long_daily_times = merged_long_daily.get("time") or [] long_hourly_times = merged_long_hourly.get("time") or [] names_long = " + ".join(MODEL_NAMES.get(m, m) for m, _ in long_term_list[:3]) # Allinea al numero effettivo di giorni/orari short (non indice fisso): evita buco del 3° giorno start_idx = len(merged["daily"]["time"]) start_hour_idx = len(merged["hourly"]["time"]) merged["models_used"].append(f"{names_long} (mediana) (giorno {start_idx + 1}-{forecast_days}d)") for i, day_time in enumerate(long_daily_times): if i < start_idx: continue if i >= forecast_days: break merged["daily"]["time"].append(day_time) for key in daily_keys: if key == "time": continue arr = merged_long_daily.get(key, []) merged["daily"][key].append(arr[i] if i < len(arr) else None) needed_hours = forecast_days * 24 for i in range(start_hour_idx, min(len(long_hourly_times), needed_hours)): merged["hourly"]["time"].append(long_hourly_times[i]) for key in hourly_keys: if key == "time": continue arr = merged_long_hourly.get(key, []) merged["hourly"][key].append(arr[i] if i < len(arr) else None) else: long_term_model, long_term_data = long_term_list[0] long_daily = long_term_data.get("daily", {}) or {} long_hourly = long_term_data.get("hourly", {}) or {} start_idx = len(merged["daily"]["time"]) merged["models_used"].append(f"{MODEL_NAMES.get(long_term_model, long_term_model)} (giorno {start_idx + 1}-{forecast_days}d)") long_daily_times = long_daily.get("time", []) or [] for i in range(start_idx, min(len(long_daily_times), forecast_days)): merged["daily"]["time"].append(long_daily_times[i]) for key in daily_keys: if key == "time": continue arr = long_daily.get(key, []) merged["daily"][key].append(arr[i] if i < len(arr) else None) long_hourly_times = long_hourly.get("time", []) or [] current_hourly_count = len(merged["hourly"]["time"]) needed_hours = forecast_days * 24 for i in range(current_hourly_count, min(len(long_hourly_times), needed_hours)): merged["hourly"]["time"].append(long_hourly_times[i]) for key in hourly_keys: if key == "time": continue arr = long_hourly.get(key, []) merged["hourly"][key].append(arr[i] if i < len(arr) else None) ensure_merged_keys(merged, merged["daily"]["time"], merged["hourly"]["time"]) return merged def analyze_temperature_trend(daily_temps_max, daily_temps_min, days=10): """Analizza trend temperatura per identificare fronti caldi/freddi con dettaglio completo""" if not daily_temps_max or not daily_temps_min: return None max_days = min(days, len(daily_temps_max), len(daily_temps_min)) if max_days < 3: return None # Filtra valori None e calcola temperature medie giornaliere avg_temps = [] valid_indices = [] for i in range(max_days): t_max = daily_temps_max[i] t_min = daily_temps_min[i] if t_max is not None and t_min is not None: avg_temps.append((float(t_max) + float(t_min)) / 2) valid_indices.append(i) else: avg_temps.append(None) if len([t for t in avg_temps if t is not None]) < 3: return None # Analizza tendenza generale (prime 3 giorni vs ultimi 3 giorni validi) valid_temps = [t for t in avg_temps if t is not None] if len(valid_temps) < 3: return None first_avg = mean(valid_temps[:3]) last_avg = mean(valid_temps[-3:]) diff = last_avg - first_avg trend_type = None trend_intensity = "moderato" if diff > 5: trend_type = "fronte_caldo" trend_intensity = "forte" if diff > 8 else "moderato" elif diff > 2: trend_type = "riscaldamento" trend_intensity = "moderato" elif diff < -5: trend_type = "fronte_freddo" trend_intensity = "forte" if diff < -8 else "moderato" elif diff < -2: trend_type = "raffreddamento" trend_intensity = "moderato" else: trend_type = "stabile" # Identifica giorni di cambio significativo change_days = [] prev_temp = None for i, temp in enumerate(avg_temps): if temp is not None: if prev_temp is not None: day_diff = temp - prev_temp if abs(day_diff) > 3: # Cambio significativo (>3°C) change_days.append({ "day": i, "delta": round(day_diff, 1), "from": round(prev_temp, 1), "to": round(temp, 1) }) prev_temp = temp # Analisi per periodi (primi 3 giorni, medio termine, lungo termine) period_analysis = {} if len(valid_temps) >= 7: period_analysis["short_term"] = { "avg": round(mean(valid_temps[:3]), 1), "range": round(max(valid_temps[:3]) - min(valid_temps[:3]), 1) } mid_start = len(valid_temps) // 3 mid_end = (len(valid_temps) * 2) // 3 period_analysis["mid_term"] = { "avg": round(mean(valid_temps[mid_start:mid_end]), 1), "range": round(max(valid_temps[mid_start:mid_end]) - min(valid_temps[mid_start:mid_end]), 1) } period_analysis["long_term"] = { "avg": round(mean(valid_temps[-3:]), 1), "range": round(max(valid_temps[-3:]) - min(valid_temps[-3:]), 1) } return { "type": trend_type, "intensity": trend_intensity, "delta": round(diff, 1), "change_days": change_days, "first_avg": round(first_avg, 1), "last_avg": round(last_avg, 1), "period_analysis": period_analysis, "daily_avg_temps": avg_temps, "daily_max": daily_temps_max[:max_days], "daily_min": daily_temps_min[:max_days] } def analyze_weather_transitions(daily_weathercodes): """Analizza transizioni meteo significative""" transitions = [] if not daily_weathercodes or len(daily_weathercodes) < 2: return transitions # Categorie meteo def get_category(code): if code is None: return "variabile" code = int(code) if code in (0, 1): return "sereno" if code in (2, 3): return "nuvoloso" if code in (45, 48): return "nebbia" if code in (51, 53, 55, 56, 57, 61, 63, 65, 66, 67, 80, 81, 82): return "pioggia" if code in (71, 73, 75, 77, 85, 86): return "neve" if code in (95, 96, 99): return "temporale" return "variabile" for i in range(1, min(len(daily_weathercodes), 8)): prev_code = daily_weathercodes[i-1] if i-1 < len(daily_weathercodes) else None curr_code = daily_weathercodes[i] if i < len(daily_weathercodes) else None prev_cat = get_category(prev_code) curr_cat = get_category(curr_code) if prev_cat != curr_cat: transitions.append({ "day": i, "from": prev_cat, "to": curr_cat, "significant": prev_cat in ["sereno", "nuvoloso"] and curr_cat in ["pioggia", "neve", "temporale"] }) return transitions def get_precip_type(code): """Definisce il tipo di precipitazione in base al codice WMO.""" if (71 <= code <= 77) or code in [85, 86]: return "❄️ Neve" if code in [96, 99]: return "⚡🌨 Grandine" if code in [66, 67]: return "🧊☔ Pioggia Congelantesi" return "☔ Pioggia" def get_intensity_label(mm_h): if mm_h < 2.5: return "Debole" if mm_h < 7.6: return "Moderata" return "Forte ⚠️" def analyze_daily_events(times, codes, probs, precip, winds, temps, dewpoints, snowfalls=None, rains=None, soil_temps=None, cloud_covers=None, wind_speeds=None): """Scansiona le 24 ore e trova blocchi di eventi continui.""" events = [] # Prepara array per calcoli avanzati (allineato a check_ghiaccio.py) if snowfalls is None: snowfalls = [0.0] * len(times) if rains is None: rains = [0.0] * len(times) if soil_temps is None: soil_temps = [None] * len(times) if cloud_covers is None: cloud_covers = [None] * len(times) if wind_speeds is None: wind_speeds = [None] * len(times) # Calcola precipitazioni cumulative delle 3h precedenti per ogni punto precip_3h_sum = [] rain_3h_sum = [] snow_3h_sum = [] for i in range(len(times)): # Somma delle 3 ore precedenti (i-3, i-2, i-1) start_idx = max(0, i - 3) precip_sum = sum([float(p) if p is not None else 0.0 for p in precip[start_idx:i]]) rain_sum = sum([float(r) if r is not None else 0.0 for r in rains[start_idx:i]]) snow_sum = sum([float(s) if s is not None else 0.0 for s in snowfalls[start_idx:i]]) precip_3h_sum.append(precip_sum) rain_3h_sum.append(rain_sum) snow_3h_sum.append(snow_sum) # 1. PERICOLI (Ghiaccio, Gelo, Brina) - Logica migliorata allineata a check_ghiaccio.py in_ice = False start_ice = 0 ice_type = "" for i in range(len(times)): t = temps[i] if i < len(temps) and temps[i] is not None else 10 d = dewpoints[i] if i < len(dewpoints) and dewpoints[i] is not None else t p = precip[i] if i < len(precip) and precip[i] is not None else 0 c = codes[i] if i < len(codes) and codes[i] is not None else 0 if c is not None: try: c = int(c) except (ValueError, TypeError): c = 0 else: c = 0 # Estrai parametri avanzati t_soil = soil_temps[i] if i < len(soil_temps) and soil_temps[i] is not None else None cloud = cloud_covers[i] if i < len(cloud_covers) and cloud_covers[i] is not None else None wind = wind_speeds[i] if i < len(wind_speeds) and wind_speeds[i] is not None else None snowfall_curr = snowfalls[i] if i < len(snowfalls) and snowfalls[i] is not None else 0.0 rain_curr = rains[i] if i < len(rains) and rains[i] is not None else 0.0 # Determina se è notte (18:00-06:00) per raffreddamento radiativo try: hour = int(times[i].split("T")[1].split(":")[0]) if "T" in times[i] else 12 is_night = (hour >= 18) or (hour <= 6) except: is_night = False # Calcola temperatura suolo: usa valore misurato se disponibile, altrimenti stima (1-2°C più fredda) if t_soil is None: t_soil = t - 1.5 # Approssimazione conservativa # Applica raffreddamento radiativo: cielo sereno + notte + vento debole # Riduce la temperatura del suolo di 0.5-1.5°C (come in check_ghiaccio.py) t_soil_adjusted = t_soil if is_night and cloud is not None and cloud < 20.0: if wind is None or wind < 5.0: cooling = 1.5 # Vento molto debole = più raffreddamento elif wind < 10.0: cooling = 1.0 else: cooling = 0.5 t_soil_adjusted = t_soil - cooling # Precipitazioni nelle 3h precedenti p_3h = precip_3h_sum[i] if i < len(precip_3h_sum) else 0.0 r_3h = rain_3h_sum[i] if i < len(rain_3h_sum) else 0.0 s_3h = snow_3h_sum[i] if i < len(snow_3h_sum) else 0.0 # LOGICA MIGLIORATA (allineata a check_ghiaccio.py): current_ice_condition = None # 1. GELICIDIO (Freezing Rain) - priorità massima is_raining_code = (50 <= c <= 69) or (80 <= c <= 82) if c in [66, 67] or (p > 0 and t <= 0 and is_raining_code): current_ice_condition = "🧊☠️ GELICIDIO" # 2. Black Ice o Neve Ghiacciata - Precipitazione nelle 3h precedenti + suolo gelato elif p_3h > 0.1 and t_soil_adjusted < 0.0: # Distingue tra neve e pioggia has_snow = (s_3h > 0.1) or (snowfall_curr > 0.1) has_rain = (r_3h > 0.1) or (rain_curr > 0.1) if has_snow: current_ice_condition = "⛸️⚠️ Neve ghiacciata (suolo gelato)" elif has_rain: current_ice_condition = "⛸️⚠️ Black Ice (strada bagnata + suolo gelato)" else: current_ice_condition = "⛸️⚠️ Black Ice (strada bagnata + suolo gelato)" # 3. BRINA (Hoar Frost) - Suolo <= 0°C e punto di rugiada > suolo ma < 0°C elif p_3h <= 0.1 and t_soil_adjusted <= 0.0 and d is not None: if d > t_soil_adjusted and d < 0.0: current_ice_condition = "⛸️⚠️ GHIACCIO/BRINA" # 4. GELATA - Temperatura aria < 0°C (senza altre condizioni) elif t < 0: current_ice_condition = "🧊 Gelata" if current_ice_condition and not in_ice: in_ice = True start_ice = i ice_type = current_ice_condition elif (not current_ice_condition and in_ice) or (in_ice and current_ice_condition != ice_type) or (in_ice and i == len(times)-1): end_idx = i if not current_ice_condition else i if end_idx > start_ice: start_time = times[start_ice].split("T")[1][:5] end_time = times[min(end_idx, len(times)-1)].split("T")[1][:5] temp_block = temps[start_ice:min(end_idx+1, len(temps))] temp_block_clean = [t for t in temp_block if t is not None] min_t = min(temp_block_clean) if temp_block_clean else 0 # Per GHIACCIO/BRINA, verifica che la temperatura minima sia effettivamente sotto/sopra soglia critica # Se la temperatura minima è > 1.5°C, non è un rischio reale if ice_type == "⛸️⚠️ GHIACCIO/BRINA" and min_t > 1.5: # Non segnalare se la temperatura minima è troppo alta pass else: events.append(f"{ice_type}: {start_time}-{end_time} (Min: {min_t:.0f}°C)") in_ice = False if current_ice_condition: in_ice = True start_ice = i ice_type = current_ice_condition # 2. PRECIPITAZIONI in_rain = False start_idx = 0 current_rain_type = "" for i in range(len(times)): p_val = precip[i] if i < len(precip) and precip[i] is not None else 0 is_raining = p_val >= MIN_MM_PER_EVENTO if is_raining and not in_rain: in_rain = True start_idx = i code_val = codes[i] if i < len(codes) and codes[i] is not None else 0 try: code_val = int(code_val) if code_val is not None else 0 except (ValueError, TypeError): code_val = 0 current_rain_type = get_precip_type(code_val) elif in_rain and is_raining and i < len(codes): code_val = codes[i] if codes[i] is not None else 0 try: code_val = int(code_val) if code_val is not None else 0 except (ValueError, TypeError): code_val = 0 new_type = get_precip_type(code_val) if new_type != current_rain_type: end_idx = i block_precip = precip[start_idx:end_idx] if end_idx <= len(precip) else precip[start_idx:] block_precip_clean = [p for p in block_precip if p is not None] tot_mm = sum(block_precip_clean) start_time = times[start_idx].split("T")[1][:5] end_time = times[end_idx].split("T")[1][:5] if end_idx < len(times) else times[-1].split("T")[1][:5] avg_intensity = tot_mm / len(block_precip) if block_precip else 0 events.append( f"{current_rain_type} ({get_intensity_label(avg_intensity)}):\n" f" 🕒 {start_time}-{end_time} | 💧 {tot_mm:.1f}mm" ) start_idx = i current_rain_type = new_type elif (not is_raining and in_rain) or (in_rain and i == len(times)-1): in_rain = False end_idx = i if not is_raining else i + 1 block_precip = precip[start_idx:end_idx] if end_idx <= len(precip) else precip[start_idx:] block_precip_clean = [p for p in block_precip if p is not None] tot_mm = sum(block_precip_clean) if tot_mm > 0: start_time = times[start_idx].split("T")[1][:5] end_time = times[min(end_idx-1, len(times)-1)].split("T")[1][:5] avg_intensity = tot_mm / len(block_precip) if block_precip else 0 events.append( f"{current_rain_type} ({get_intensity_label(avg_intensity)}):\n" f" 🕒 {start_time}-{end_time} | 💧 {tot_mm:.1f}mm" ) # 3. VENTO if winds: wind_values = [w for w in winds if w is not None] if wind_values: max_wind = max(wind_values) if max_wind > SOGLIA_VENTO_KMH: try: peak_idx = winds.index(max_wind) except ValueError: peak_idx = 0 peak_time = times[min(peak_idx, len(times)-1)].split("T")[1][:5] events.append(f"💨 Vento Forte: Picco {max_wind:.0f}km/h alle {peak_time}") return events def generate_practical_advice(trend, transitions, events_summary, daily_data): """Genera consigli pratici basati sull'analisi meteo""" advice = [] # Consigli basati su trend temperatura if trend: if trend["type"] == "fronte_freddo" and trend["intensity"] == "forte": advice.append("❄️ Fronte Freddo in Arrivo: Preparati a temperature in calo significativo. Controlla riscaldamento, proteggi piante sensibili.") elif trend["type"] == "fronte_caldo" and trend["intensity"] == "forte": advice.append("🔥 Ondata di Calore: Temperature in aumento. Mantieni case fresche, idratazione importante, attenzione a persone fragili.") elif trend["type"] == "raffreddamento": advice.append("🌡️ Raffreddamento: Temperature in calo graduale. Vestiti a strati, verifica isolamento porte/finestre.") elif trend["type"] == "riscaldamento": advice.append("☀️ Riscaldamento: Temperature in aumento. Buon momento per attività all'aperto, ventilazione naturale.") # Consigli basati su transizioni meteo significant_rain = any(t["to"] in ["pioggia", "neve", "temporale"] and t["significant"] for t in transitions[:3]) if significant_rain: advice.append("🌧️ Precipitazioni in Arrivo: Prepara ombrelli/impermeabili. Evita viaggi non necessari durante picchi, controlla grondaie.") # Consigli basati su eventi pericolosi has_ice = any("GELICIDIO" in e or "GHIACCIO" in e for events in events_summary for e in events if events) if has_ice: advice.append("⚠️ Rischio Ghiaccio: Strade scivolose previste. Evita viaggi non urgenti, guida con estrema cautela, antigelo/sale pronti.") # Consigli basati su vento forte has_wind = any("Vento Forte" in e for events in events_summary for e in events if events) if has_wind: advice.append("💨 Vento Forte: Fissa oggetti in balcone/giardino, attenzione a rami, guidare con prudenza su strade esposte.") # Consigli stagionali generali if daily_data and len(daily_data) > 0: first_day = daily_data[0] if first_day.get("t_min", 15) < 5: advice.append("🏠 Gestione Domestica: Temperature basse previste. Verifica caldaia, risparmio energetico con isolamento, attenzione a tubazioni esterne.") elif first_day.get("precip_sum", 0) > 20: advice.append("💧 Piogge Intense: Accumuli significativi previsti. Controlla drenaggi, pozzetti, evitare zone soggette ad allagamenti.") return advice def format_detailed_trend_explanation(trend, daily_data_list): """Genera spiegazione dettagliata del trend temperatura su 10 giorni""" if not trend: return "" explanation = [] explanation.append(f"📊 EVOLUZIONE TEMPERATURE (10 GIORNI)\n") # Trend principale con spiegazione chiara trend_type = trend["type"] intensity = trend["intensity"] delta = trend['delta'] first_avg = trend['first_avg'] last_avg = trend['last_avg'] if trend_type == "fronte_caldo": trend_desc = "🔥 Fronte Caldo in Arrivo" desc_text = f"Arrivo di aria più calda: temperatura media passerà da {first_avg:.1f}°C a {last_avg:.1f}°C (+{delta:.1f}°C)." elif trend_type == "fronte_freddo": trend_desc = "❄️ Fronte Freddo in Arrivo" desc_text = f"Arrivo di aria più fredda: temperatura media scenderà da {first_avg:.1f}°C a {last_avg:.1f}°C ({delta:+.1f}°C)." elif trend_type == "riscaldamento": trend_desc = "📈 Riscaldamento Progressivo" desc_text = f"Tendenza al rialzo delle temperature: da {first_avg:.1f}°C a {last_avg:.1f}°C (+{delta:.1f}°C)." elif trend_type == "raffreddamento": trend_desc = "📉 Raffreddamento Progressivo" desc_text = f"Tendenza al ribasso delle temperature: da {first_avg:.1f}°C a {last_avg:.1f}°C ({delta:+.1f}°C)." elif trend_type == "stabile": trend_desc = "➡️ Temperature Stabili" desc_text = f"Temperature medie sostanzialmente stabili: da {first_avg:.1f}°C a {last_avg:.1f}°C (variazione {delta:+.1f}°C)." else: trend_desc = "🌡️ Variazione Termica" desc_text = f"Evoluzione temperature: da {first_avg:.1f}°C a {last_avg:.1f}°C ({delta:+.1f}°C)." intensity_text = " (variazione significativa)" if intensity == "forte" else " (variazione moderata)" explanation.append(f"{trend_desc}{intensity_text}") explanation.append(f"{desc_text}") # Aggiungi solo picchi significativi in modo sintetico if trend.get("change_days"): significant_changes = [c for c in trend["change_days"] if abs(c['delta']) > 3.0][:3] if significant_changes: change_texts = [] for change in significant_changes: day_name = f"Giorno {change['day']+1}" direction = "↑" if change['delta'] > 0 else "↓" change_texts.append(f"{direction} {day_name}: {change['from']:.0f}°→{change['to']:.0f}°C") if change_texts: explanation.append(f"Picchi: {', '.join(change_texts)}") explanation.append("") return "\n".join(explanation) def format_weather_context_report(models_data, location_name, country_code): """Genera report contestuale intelligente con ensemble multi-modello""" # Combina modelli a breve e lungo termine merged_data = merge_multi_model_forecast(models_data, forecast_days=10) if not merged_data: return "❌ Errore: Nessun dato meteo disponibile" hourly = merged_data.get('hourly', {}) daily = merged_data.get('daily', {}) models_used = merged_data.get('models_used', []) if not daily or not daily.get('time'): return "❌ Errore: Dati meteo incompleti" msg_parts = [] # HEADER models_text = " + ".join(models_used) if models_used else "Multi-modello" msg_parts.append(f"🌍 METEO FORECAST") msg_parts.append(f"{location_name.upper()}") msg_parts.append(f"📡 Ensemble: {models_text}\n") # ANALISI TREND TEMPERATURA (Fronti) - Completa su 10 giorni daily_temps_max = daily.get('temperature_2m_max', []) daily_temps_min = daily.get('temperature_2m_min', []) trend = analyze_temperature_trend(daily_temps_max, daily_temps_min, days=10) # Spiegazione dettagliata trend (sempre, anche se stabile) if trend: trend_explanation = format_detailed_trend_explanation(trend, daily_data_list=[]) if trend_explanation: msg_parts.append(trend_explanation) # ANALISI TRANSIZIONI METEO - Include anche precipitazioni prossimi giorni daily_time_list = daily.get('time', []) # Definito qui per uso successivo daily_weathercodes = daily.get('weathercode', []) transitions = analyze_weather_transitions(daily_weathercodes) # Cerca anche precipitazioni significative nei primi giorni precip_list = daily.get('precipitation_sum', []) weather_changes = [] # Aggiungi transizioni significative if transitions: significant_trans = [t for t in transitions if t.get("significant", False)] for trans in significant_trans[:5]: day_names = ["oggi", "domani", "dopodomani", "fra 3 giorni", "fra 4 giorni", "fra 5 giorni", "fra 6 giorni"] day_idx = trans["day"] - 1 if day_idx < len(day_names): day_ref = day_names[day_idx] else: day_ref = f"fra {trans['day']} giorni" weather_changes.append({ "day": trans["day"], "day_ref": day_ref, "from": trans["from"], "to": trans["to"], "type": "transition" }) # Aggiungi precipitazioni significative per domani/dopodomani se non già presenti # Usa snowfall (più affidabile) per determinare il tipo (pioggia/neve/grandine) # Prepara mappa hourly per giorni daily_map = defaultdict(list) times = hourly.get('time', []) for i, t in enumerate(times): daily_map[t.split("T")[0]].append(i) for day_idx in range(min(3, len(precip_list))): if day_idx >= len(precip_list) or precip_list[day_idx] is None: continue precip_amount = float(precip_list[day_idx]) day_num = day_idx + 1 # Verifica se già presente come transizione already_present = any(wc["day"] == day_num for wc in weather_changes) if already_present: continue # Ottieni dati hourly per questo giorno per determinare tipo precipitazione day_date = daily_time_list[day_idx].split("T")[0] if day_idx < len(daily_time_list) else None if not day_date: continue indices = daily_map.get(day_date, []) d_snow_day = [hourly.get('snowfall', [])[i] for i in indices if i < len(hourly.get('snowfall', []))] d_codes_day = [hourly.get('weathercode', [])[i] for i in indices if i < len(hourly.get('weathercode', []))] d_temps_day = [hourly.get('temperature_2m', [])[i] for i in indices if i < len(hourly.get('temperature_2m', []))] d_dews_day = [hourly.get('dewpoint_2m', [])[i] for i in indices if i < len(hourly.get('dewpoint_2m', []))] # Calcola accumulo neve per questo giorno snow_sum_day = sum([float(s) for s in d_snow_day if s is not None]) if d_snow_day else 0.0 # Determina tipo precipitazione usando snowfall (priorità) o weathercode (fallback) # NON inventiamo neve basandoci su temperatura - solo se snowfall>0 o weathercode indica neve # PRIORITÀ: Se c'è neve (anche poca), il simbolo è sempre ❄️, anche se la pioggia è maggiore precip_type_symbol = "💧" # Default pioggia threshold_mm = 5.0 # Soglia default per pioggia if precip_amount > 0.1: # Se snowfall è disponibile e positivo, usa quello (più preciso) if snow_sum_day > 0.1: # Se c'è neve (anche poca), il simbolo è sempre ❄️ (priorità alla neve) precip_type_symbol = "❄️" # Neve threshold_mm = 0.5 # Soglia più bassa per neve (anche pochi mm sono significativi) else: # Fallback: verifica weathercode per neve esplicita # Solo se i modelli indicano esplicitamente neve nei codici WMO snow_codes = [71, 73, 75, 77, 85, 86] # Codici WMO per neve hail_codes = [96, 99] # Codici WMO per grandine/temporale snow_count = sum(1 for c in d_codes_day if c is not None and int(c) in snow_codes) hail_count = sum(1 for c in d_codes_day if c is not None and int(c) in hail_codes) if hail_count > 0: precip_type_symbol = "⛈️" # Grandine/Temporale threshold_mm = 5.0 elif snow_count > 0: # Solo se weathercode indica esplicitamente neve precip_type_symbol = "❄️" # Neve threshold_mm = 0.5 # Soglia più bassa per neve # Aggiungi solo se supera la soglia appropriata if precip_amount > threshold_mm: day_names = ["oggi", "domani", "dopodomani"] if day_idx < len(day_names): weather_changes.append({ "day": day_num, "day_ref": day_names[day_idx], "from": "variabile", "to": "precipitazioni", "type": "precip", "amount": precip_amount, "precip_symbol": precip_type_symbol }) if weather_changes: # Ordina per giorno weather_changes.sort(key=lambda x: x["day"]) msg_parts.append("🔄 CAMBIAMENTI METEO SIGNIFICATIVI") for wc in weather_changes[:5]: if wc["type"] == "transition": from_icon = "☀️" if wc["from"] == "sereno" else "☁️" if wc["from"] == "nuvoloso" else "🌧️" to_icon = "🌧️" if "pioggia" in wc["to"] else "❄️" if "neve" in wc["to"] else "⛈️" if "temporale" in wc["to"] else "☁️" msg_parts.append(f" {from_icon}→{to_icon} {wc['day_ref']}: {wc['from']} → {wc['to']}") else: # Precipitazioni - usa simbolo appropriato precip_sym = wc.get('precip_symbol', '💧') msg_parts.append(f" ☁️→{precip_sym} {wc['day_ref']}: precipitazioni ({wc['amount']:.1f}mm)") msg_parts.append("") # DETTAGLIO GIORNALIERO # Usa i dati daily come riferimento principale (sono più affidabili) # daily_time_list già definito sopra temp_min_list = daily.get('temperature_2m_min', []) temp_max_list = daily.get('temperature_2m_max', []) # Limita ai giorni per cui abbiamo dati daily validi max_days = min(len(daily_time_list), len(temp_min_list), len(temp_max_list), 10) # Mappa hourly per eventi dettagliati daily_map = defaultdict(list) times = hourly.get('time', []) for i, t in enumerate(times): daily_map[t.split("T")[0]].append(i) events_summary = [] daily_details = [] for count in range(max_days): day_date = daily_time_list[count].split("T")[0] if count < len(daily_time_list) else None if not day_date: break # Ottieni indici hourly per questo giorno indices = daily_map.get(day_date, []) # Estrai dati hourly per questo giorno (se disponibili) d_times = [hourly['time'][i] for i in indices if i < len(hourly.get('time', []))] d_codes = [hourly.get('weathercode', [])[i] for i in indices if i < len(hourly.get('weathercode', []))] d_probs = [hourly.get('precipitation_probability', [])[i] for i in indices if i < len(hourly.get('precipitation_probability', []))] d_precip = [hourly.get('precipitation', [])[i] for i in indices if i < len(hourly.get('precipitation', []))] d_snow = [hourly.get('snowfall', [])[i] for i in indices if i < len(hourly.get('snowfall', []))] d_winds = [hourly.get('windspeed_10m', [])[i] for i in indices if i < len(hourly.get('windspeed_10m', []))] d_winddir = [hourly.get('winddirection_10m', [])[i] for i in indices if i < len(hourly.get('winddirection_10m', []))] d_temps = [hourly.get('temperature_2m', [])[i] for i in indices if i < len(hourly.get('temperature_2m', []))] d_dews = [hourly.get('dewpoint_2m', [])[i] for i in indices if i < len(hourly.get('dewpoint_2m', []))] d_clouds = [hourly.get('cloud_cover', [])[i] for i in indices if i < len(hourly.get('cloud_cover', []))] d_rains = [hourly.get('rain', [])[i] for i in indices if i < len(hourly.get('rain', []))] d_soil_temps = [hourly.get('soil_temperature_0cm', [])[i] for i in indices if i < len(hourly.get('soil_temperature_0cm', []))] d_snow_depth = [hourly.get('snow_depth', [])[i] for i in indices if i < len(hourly.get('snow_depth', []))] # Usa dati daily come primario (più affidabili) try: t_min_val = temp_min_list[count] if count < len(temp_min_list) else None t_max_val = temp_max_list[count] if count < len(temp_max_list) else None # Se dati daily validi, usali; altrimenti calcola da hourly if t_min_val is not None and t_max_val is not None: t_min = float(t_min_val) t_max = float(t_max_val) elif d_temps and any(t is not None for t in d_temps): temp_clean = [float(t) for t in d_temps if t is not None] t_min = min(temp_clean) t_max = max(temp_clean) else: # Se non ci sono dati, salta questo giorno continue # Usa dati daily per caratterizzazione precipitazioni precip_list = daily.get('precipitation_sum', []) snowfall_list = daily.get('snowfall_sum', []) rain_list = daily.get('rain_sum', []) showers_list = daily.get('showers_sum', []) if count < len(precip_list) and precip_list[count] is not None: precip_sum = float(precip_list[count]) elif d_precip and any(p is not None for p in d_precip): precip_sum = sum([float(p) for p in d_precip if p is not None]) else: precip_sum = 0.0 # Caratterizza precipitazioni usando dati daily snowfall_sum = 0.0 rain_sum = 0.0 showers_sum = 0.0 if count < len(snowfall_list) and snowfall_list[count] is not None: snowfall_sum = float(snowfall_list[count]) elif d_snow and any(s is not None for s in d_snow): snowfall_sum = sum([float(s) for s in d_snow if s is not None]) if count < len(rain_list) and rain_list[count] is not None: rain_sum = float(rain_list[count]) if count < len(showers_list) and showers_list[count] is not None: showers_sum = float(showers_list[count]) wind_list = daily.get('windspeed_10m_max', []) if count < len(wind_list) and wind_list[count] is not None: wind_max = float(wind_list[count]) elif d_winds and any(w is not None for w in d_winds): wind_max = max([float(w) for w in d_winds if w is not None]) else: wind_max = 0.0 # Calcola direzione vento dominante wind_dir_deg = None wind_dir_list = daily.get('winddirection_10m_dominant', []) if count < len(wind_dir_list) and wind_dir_list[count] is not None: wind_dir_deg = float(wind_dir_list[count]) elif d_winddir and any(wd is not None for wd in d_winddir): # Media delle direzioni vento del giorno wind_dir_clean = [float(wd) for wd in d_winddir if wd is not None] if wind_dir_clean: # Calcola media circolare import math sin_sum = sum(math.sin(math.radians(wd)) for wd in wind_dir_clean) cos_sum = sum(math.cos(math.radians(wd)) for wd in wind_dir_clean) wind_dir_deg = math.degrees(math.atan2(sin_sum / len(wind_dir_clean), cos_sum / len(wind_dir_clean))) if wind_dir_deg < 0: wind_dir_deg += 360 wind_dir_cardinal = degrees_to_cardinal(int(wind_dir_deg)) if wind_dir_deg is not None else "N" except (ValueError, TypeError, IndexError) as e: # Se ci sono errori nei dati, salta questo giorno continue events_list = analyze_daily_events(d_times, d_codes, d_probs, d_precip, d_winds, d_temps, d_dews, snowfalls=d_snow, rains=d_rains, soil_temps=d_soil_temps, cloud_covers=d_clouds, wind_speeds=d_winds) events_summary.append(events_list) dt = datetime.datetime.strptime(day_date, "%Y-%m-%d") # Nomi giorni in italiano giorni_ita = ["Lun", "Mar", "Mer", "Gio", "Ven", "Sab", "Dom"] day_str = f"{giorni_ita[dt.weekday()]} {dt.strftime('%d/%m')}" # Icona meteo principale basata sul weathercode del giorno wcode = daily.get('weathercode', [])[count] if count < len(daily.get('weathercode', [])) else None if wcode is None and d_codes: # Se non c'è weathercode daily, usa il più frequente tra gli hourly codes_clean = [int(c) for c in d_codes if c is not None] if codes_clean: wcode = Counter(codes_clean).most_common(1)[0][0] else: wcode = 0 else: wcode = int(wcode) if wcode is not None else 0 # Calcola probabilità e tipo precipitazione per il giorno (PRIMA di determinare icona) precip_type = None # Determina tipo precipitazione usando dati daily (più affidabili) # Usa snowfall_sum, rain_sum, showers_sum per caratterizzazione precisa # PRIORITÀ: Se snow_depth > 0 (modello italia_meteo_arpae_icon_2i), usa questi dati per determinare neve precip_type = None # Verifica snow_depth (per modello italia_meteo_arpae_icon_2i quando snow_depth > 0) # NOTA: I valori sono già convertiti in cm durante il recupero dall'API has_snow_depth_data = False max_snow_depth = 0.0 if d_snow_depth and len(d_snow_depth) > 0: snow_depth_valid = [] for sd in d_snow_depth: if sd is not None: try: val_cm = float(sd) # Già in cm if val_cm >= 0: snow_depth_valid.append(val_cm) except (ValueError, TypeError): continue if snow_depth_valid: max_snow_depth = max(snow_depth_valid) if max_snow_depth > 0: # Se snow_depth > 0 cm, considera presenza di neve persistente has_snow_depth_data = True # Verifica snow_depth INDIPENDENTEMENTE dalle precipitazioni # snow_depth rappresenta il manto nevoso persistente, non la neve che cade if has_snow_depth_data and max_snow_depth > 0: # C'è manto nevoso al suolo (indipendente da snowfall) # Questo influenza l'icona meteo, ma non il tipo di precipitazione se non sta nevicando pass # Gestito separatamente per l'icona meteo if precip_sum > 0.1: # Priorità 1: Se sta nevicando (snowfall > 0) e c'è manto nevoso, considera entrambi if has_snow_depth_data and max_snow_depth > 0: # C'è sia neve in caduta che manto nevoso persistente if rain_sum > 0.1 or showers_sum > 0.1: precip_type = "mixed" # Neve + pioggia/temporali elif snowfall_sum > 0.1: precip_type = "snow" # Neve in caduta + manto nevoso else: # Solo manto nevoso persistente, nessuna neve in caduta # Il tipo di precipitazione resta quello basato su snowfall/rain pass # Priorità 2: Usa dati daily se disponibili elif snowfall_sum > 0.1: # C'è neve significativa if snowfall_sum >= precip_sum * 0.5: precip_type = "snow" elif rain_sum > 0.1 or showers_sum > 0.1: # Mista (neve + pioggia/temporali) precip_type = "mixed" else: precip_type = "snow" elif rain_sum > 0.1: # Pioggia if showers_sum > 0.1: # Temporali (showers) precip_type = "thunderstorms" else: precip_type = "rain" elif showers_sum > 0.1: # Solo temporali precip_type = "thunderstorms" else: # Fallback: usa dati hourly se daily non disponibili snow_sum_day = sum([float(s) for s in d_snow if s is not None]) if d_snow else 0.0 if snow_sum_day > 0.1: if snow_sum_day >= precip_sum * 0.5: precip_type = "snow" else: precip_type = "rain" else: # Fallback: verifica weathercode per neve esplicita snow_codes = [71, 73, 75, 77, 85, 86] # Codici WMO per neve rain_codes = [51, 53, 55, 56, 57, 61, 63, 65, 80, 81, 82, 66, 67] # Codici WMO per pioggia hail_codes = [96, 99] # Codici WMO per grandine/temporale snow_count = sum(1 for c in d_codes if c is not None and int(c) in snow_codes) rain_count = sum(1 for c in d_codes if c is not None and int(c) in rain_codes) hail_count = sum(1 for c in d_codes if c is not None and int(c) in hail_codes) if hail_count > 0: precip_type = "hail" elif snow_count > rain_count: precip_type = "snow" else: precip_type = "rain" # Determina icona basandosi su precipitazioni (priorità) e poi nuvolosità/weathercode # Usa precip_type già calcolato # NOTA: snow_depth è INDIPENDENTE da precipitazioni - influenza l'icona anche senza nevicate has_precip = precip_sum > 0.1 if has_precip: # Precipitazioni: usa precip_type per determinare icona if precip_type == "snow": weather_icon = "❄️" # Neve elif precip_type == "thunderstorms" or precip_type == "hail" or wcode in (95, 96, 99): weather_icon = "⛈️" # Temporale/Grandine elif precip_type == "mixed": weather_icon = "🌨️" # Precipitazione mista else: weather_icon = "🌧️" # Pioggia elif has_snow_depth_data and max_snow_depth > 0: # C'è manto nevoso persistente anche senza precipitazioni # Mostra icona neve anche se non sta nevicando weather_icon = "❄️" # Manto nevoso presente elif t_min < 0: # Giorno freddo (t_min < 0): usa icona ghiaccio (indipendentemente da snow_depth) # Se c'è anche snow_depth, viene già gestito sopra weather_icon = "🧊" # Gelo/Ghiaccio (giorno freddo) elif wcode in (45, 48): weather_icon = "🌫️" # Nebbia else: # Nessuna precipitazione: usa nuvolosità se disponibile, altrimenti weathercode avg_cloud = 0 if d_clouds and any(c is not None for c in d_clouds): cloud_clean = [float(c) for c in d_clouds if c is not None] avg_cloud = sum(cloud_clean) / len(cloud_clean) if cloud_clean else 0 if avg_cloud > 0: # Usa nuvolosità media if avg_cloud <= 25: weather_icon = "☀️" # Sereno elif avg_cloud <= 50: weather_icon = "⛅" # Parzialmente nuvoloso elif avg_cloud <= 75: weather_icon = "☁️" # Nuvoloso else: weather_icon = "☁️" # Molto nuvoloso else: # Fallback a weathercode if wcode in (0, 1): weather_icon = "☀️" # Sereno elif wcode in (2, 3): weather_icon = "⛅" # Parzialmente nuvoloso else: weather_icon = "☁️" # Nuvoloso # Calcola spessore manto nevoso (snow_depth) per questo giorno # Nota: snow_depth è disponibile solo per alcuni modelli (es. ICON Italia, ICON-D2) # Se il modello non supporta snow_depth, tutti i valori saranno None o mancanti snow_depth_min = None snow_depth_max = None snow_depth_avg = None snow_depth_end = None # Verifica se abbiamo dati snow_depth validi per questo giorno # NOTA: I valori sono già convertiti in cm durante il recupero dall'API # Estrai anche direttamente dall'array hourly per sicurezza (fallback se d_snow_depth è vuoto) # PRIORITÀ: usa sempre i dati dall'array hourly per quel giorno (più affidabile) all_snow_depth_values = [] hourly_snow_depth = hourly.get('snow_depth', []) hourly_times = hourly.get('time', []) # Cerca direttamente nell'array hourly per questo giorno usando i timestamp (più affidabile) if hourly_snow_depth and hourly_times and day_date: for idx, ts in enumerate(hourly_times): if ts.startswith(day_date) and idx < len(hourly_snow_depth): if hourly_snow_depth[idx] is not None: all_snow_depth_values.append(hourly_snow_depth[idx]) # Se non trovato con timestamp, usa d_snow_depth come fallback if not all_snow_depth_values and d_snow_depth and len(d_snow_depth) > 0: all_snow_depth_values = d_snow_depth if all_snow_depth_values and len(all_snow_depth_values) > 0: # Filtra solo valori validi (>= 0 e non null, già in cm) snow_depth_clean = [] has_valid_data = False for sd in all_snow_depth_values: if sd is not None: has_valid_data = True # Almeno un dato non-null presente try: val_cm = float(sd) # Già in cm if val_cm >= 0: # Solo valori non negativi snow_depth_clean.append(val_cm) except (ValueError, TypeError): continue # Calcola statistiche se abbiamo almeno un valore non-null (il modello supporta snow_depth) # snow_depth è INDIPENDENTE da snowfall: rappresenta il manto nevoso persistente al suolo # Mostriamo sempre quando disponibile e > 0, anche se non nevica if has_valid_data and snow_depth_clean: max_depth = max(snow_depth_clean) min_depth = min(snow_depth_clean) # Calcola sempre le statistiche se ci sono dati validi, anche se il valore è piccolo if max_depth > 0: # Mostra se almeno un valore > 0 cm snow_depth_min = min_depth snow_depth_max = max_depth snow_depth_avg = sum(snow_depth_clean) / len(snow_depth_clean) # Prendi l'ultimo valore non-null del giorno (spessore alla fine del giorno) # Ordina per valore per trovare l'ultimo valore del giorno (non necessariamente l'ultimo della lista) snow_depth_end = snow_depth_clean[-1] if snow_depth_clean else None # Preferisci l'ultimo valore non-null originale per avere il valore alla fine del giorno if all_snow_depth_values: for sd in reversed(all_snow_depth_values): if sd is not None: try: val_cm = float(sd) if val_cm > 0: snow_depth_end = val_cm break except (ValueError, TypeError): continue day_info = { "day_str": day_str, "t_min": t_min, "t_max": t_max, "precip_sum": precip_sum, "precip_type": precip_type, "snowfall_sum": snowfall_sum, "rain_sum": rain_sum, "showers_sum": showers_sum, "wind_max": wind_max, "wind_dir": wind_dir_cardinal, "events": events_list, "weather_icon": weather_icon, "snow_depth_min": snow_depth_min, "snow_depth_max": snow_depth_max, "snow_depth_avg": snow_depth_avg, "snow_depth_end": snow_depth_end } daily_details.append(day_info) count += 1 # Formatta dettagli giornalieri (tutti i giorni disponibili) msg_parts.append("📅 PREVISIONI GIORNALIERE") prev_snow_depth_end = None # Traccia lo spessore del giorno precedente per mostrare evoluzione for day_info in daily_details: line = f"{day_info['weather_icon']} {day_info['day_str']} 🌡️ {day_info['t_min']:.0f}°/{day_info['t_max']:.0f}°C" # Aggiungi informazioni precipitazioni con caratterizzazione dettagliata # Nota: mm è accumulo totale giornaliero (somma di tutte le ore) if day_info['precip_sum'] > 0.1: # Caratterizza usando dati daily se disponibili precip_parts = [] # Neve if day_info.get('snowfall_sum', 0) > 0.1: precip_parts.append(f"❄️ {day_info['snowfall_sum']:.1f}cm") # Pioggia if day_info.get('rain_sum', 0) > 0.1: precip_parts.append(f"🌧️ {day_info['rain_sum']:.1f}mm") # Temporali (showers) if day_info.get('showers_sum', 0) > 0.1: precip_parts.append(f"⛈️ {day_info['showers_sum']:.1f}mm") # Se non abbiamo dati daily dettagliati, usa il tipo generale if not precip_parts: precip_symbol = "❄️" if day_info['precip_type'] == "snow" else "⛈️" if day_info['precip_type'] in ("hail", "thunderstorms") else "🌨️" if day_info['precip_type'] == "mixed" else "🌧️" precip_parts.append(f"{precip_symbol} {day_info['precip_sum']:.1f}mm") line += f" | {' + '.join(precip_parts)}" # Aggiungi vento (sempre se disponibile, formattato come direzione intensità) if day_info['wind_max'] > 0: wind_str = f"{day_info['wind_dir']} {day_info['wind_max']:.0f}km/h" line += f" | 💨 {wind_str}" msg_parts.append(line) # Mostra spessore manto nevoso se disponibile e > 0 # snow_depth è INDIPENDENTE da snowfall: rappresenta il manto nevoso persistente al suolo # Deve essere sempre mostrato quando disponibile, anche nei giorni senza nevicate # Se snow_depth_end è None ma ci sono dati validi, ricalcola snow_depth_end = day_info.get('snow_depth_end') if snow_depth_end is None: # Prova a ricalcolare da snow_depth_min/max/avg se disponibili snow_depth_max = day_info.get('snow_depth_max') snow_depth_avg = day_info.get('snow_depth_avg') if snow_depth_max is not None and snow_depth_max > 0: snow_depth_end = snow_depth_max # Usa il massimo come fallback elif snow_depth_avg is not None and snow_depth_avg > 0: snow_depth_end = snow_depth_avg # Usa la media come fallback if snow_depth_end is not None and snow_depth_end > 0: snow_depth_str = f"❄️ Manto nevoso: {snow_depth_end:.1f} cm" # Mostra evoluzione rispetto al giorno precedente if prev_snow_depth_end is not None: diff = snow_depth_end - prev_snow_depth_end if abs(diff) > 0.5: # Solo se variazione significativa if diff > 0: snow_depth_str += f" (↑ +{diff:.1f} cm)" else: snow_depth_str += f" (↓ {diff:.1f} cm)" # Mostra range se c'è variazione significativa durante il giorno snow_depth_min = day_info.get('snow_depth_min') snow_depth_max = day_info.get('snow_depth_max') if snow_depth_min is not None and snow_depth_max is not None: if snow_depth_max - snow_depth_min > 1.0: # Variazione > 1cm durante il giorno snow_depth_str += f" [range: {snow_depth_min:.1f}-{snow_depth_max:.1f} cm]" msg_parts.append(f" {snow_depth_str}") if day_info['events']: for ev in day_info['events'][:3]: # Limita a 3 eventi principali msg_parts.append(f" ➤ {ev}") # Aggiorna per il prossimo giorno prev_snow_depth_end = snow_depth_end if snow_depth_end is not None else prev_snow_depth_end msg_parts.append("") return "\n".join(msg_parts) def send_telegram(text, chat_id, token, debug_mode=False): if not token: return False url = f"https://api.telegram.org/bot{token}/sendMessage" payload = { "chat_id": chat_id, "text": text, "parse_mode": "HTML", "disable_web_page_preview": True } try: resp = requests.post(url, json=payload, timeout=15) return resp.status_code == 200 except: return False def main(): parser = argparse.ArgumentParser() parser.add_argument("query", nargs="?", default="casa") parser.add_argument("--chat_id") parser.add_argument("--debug", action="store_true") parser.add_argument("--home", action="store_true") parser.add_argument("--timezone", help="Timezone IANA (es: Europe/Rome, America/New_York)") args = parser.parse_args() token = get_bot_token() debug_mode = args.debug # Determina destinatari if debug_mode: recipients = [ADMIN_CHAT_ID] elif args.chat_id: recipients = [args.chat_id] else: recipients = TELEGRAM_CHAT_IDS # Determina località if args.home or (not args.query or args.query.lower() == "casa"): lat, lon, name, cc = DEFAULT_LAT, DEFAULT_LON, DEFAULT_NAME, "SM" else: coords = get_coordinates(args.query) if not coords[0]: error_msg = f"❌ Località '{args.query}' non trovata." if token: for chat_id in recipients: send_telegram(error_msg, chat_id, token, debug_mode) else: print(error_msg) return lat, lon, name, cc = coords # Recupera dati multi-modello (breve + lungo termine) - selezione intelligente basata su country code # Determina se è Casa is_home = (abs(lat - DEFAULT_LAT) < 0.01 and abs(lon - DEFAULT_LON) < 0.01) # Recupera dati multi-modello (breve + lungo termine) # - Per Casa: usa AROME Seamless e ICON-D2 # - Per altre località: usa best match di Open-Meteo short_models, long_models = choose_models_by_country(cc, is_home=is_home) # Usa timezone personalizzata se fornita timezone = args.timezone if hasattr(args, 'timezone') and args.timezone else None models_data = get_weather_multi_model(lat, lon, short_models, long_models, forecast_days=10, timezone=timezone) if not any(models_data.values()): error_msg = "❌ Errore: Impossibile recuperare dati meteo." if token: for chat_id in recipients: send_telegram(error_msg, chat_id, token, debug_mode) else: print(error_msg) return # Genera report report = format_weather_context_report(models_data, name, cc) if debug_mode: report = f"🛠 [DEBUG MODE] 🛠\n\n{report}" # Invia if token: success = False for chat_id in recipients: if send_telegram(report, chat_id, token, debug_mode): success = True if not success: print("❌ Errore invio Telegram") else: print(report) if __name__ == "__main__": main()