Files

1649 lines
80 KiB
Python
Executable File

#!/usr/bin/env python3
"""
Assistente Climatico Intelligente - Report Meteo Avanzato
Analizza evoluzione meteo, fronti, cambiamenti e fornisce consigli pratici
"""
import requests
import argparse
import datetime
import os
import sys
from zoneinfo import ZoneInfo
from collections import defaultdict, Counter, Counter
from typing import List, Dict, Tuple, Optional
from statistics import mean, median
from open_meteo_client import open_meteo_get
# --- CONFIGURAZIONE DEFAULT ---
DEFAULT_LAT = 43.9356
DEFAULT_LON = 12.4296
DEFAULT_NAME = "🏠 Casa (Strada Cà Toro,12 - San Marino)"
# --- TIMEZONE ---
TZ_STR = "Europe/Berlin"
TZINFO = ZoneInfo(TZ_STR)
# --- TELEGRAM CONFIG ---
ADMIN_CHAT_ID = "64463169"
TELEGRAM_CHAT_IDS = ["64463169", "24827341", "132455422", "5405962012"]
TOKEN_FILE_HOME = os.path.expanduser("~/.telegram_dpc_bot_token")
TOKEN_FILE_ETC = "/etc/telegram_dpc_bot_token"
TOKEN_FILE_VOLUME = "/Volumes/Pi2/etc/telegram_dpc_bot_token"
# --- SOGLIE ---
SOGLIA_VENTO_KMH = 40.0
MIN_MM_PER_EVENTO = 0.1
# --- MODELLI METEO ---
# Modelli a breve termine (alta risoluzione, 48-72h)
SHORT_TERM_MODELS = ["meteofrance_seamless", "icon_d2"] # Usa seamless invece di arome_france_hd
# Modelli a lungo termine (globale, 10 giorni)
LONG_TERM_MODELS = ["gfs_global", "ecmwf_ifs04"]
# Modelli alternativi
MODELS_IT_SM = ["meteofrance_seamless", "icon_d2", "gfs_global"]
MODEL_NAMES = {
"meteofrance_arome_france_hd": "AROME HD",
"meteofrance_seamless": "AROME Seamless",
"icon_d2": "ICON-D2",
"gfs_global": "GFS",
"ecmwf_ifs04": "ECMWF",
"jma_msm": "JMA MSM",
"metno_nordic": "Yr.no",
"ukmo_global": "UK MetOffice",
"icon_eu": "ICON-EU",
"italia_meteo_arpae_icon_2i": "ICON Italia (ARPAE 2i)"
}
def choose_models_by_country(cc, is_home=False):
"""
Seleziona modelli meteo ottimali.
- Per Casa: usa AROME Seamless e ICON-D2 (alta risoluzione)
- Per Italia: usa italia_meteo_arpae_icon_2i (include snow_depth quando > 0)
- Per altre località: usa best match di Open-Meteo (senza specificare models)
Ritorna (short_term_models, long_term_models)
"""
cc = cc.upper() if cc else "UNKNOWN"
# Modelli a lungo termine (sempre globali, funzionano ovunque)
long_term_default = ["gfs_global", "ecmwf_ifs04"]
if is_home:
# Per Casa, usa AROME Seamless, ICON-D2 e ICON Italia (alta risoluzione europea)
# ICON Italia include snow_depth quando disponibile (> 0)
return ["meteofrance_seamless", "icon_d2", "italia_meteo_arpae_icon_2i"], long_term_default
elif cc == "IT":
# Per Italia, usa ICON Italia (ARPAE 2i) che include snow_depth quando disponibile
return ["italia_meteo_arpae_icon_2i"], long_term_default
else:
# Per query worldwide, usa best match (Open-Meteo sceglie automaticamente)
# Ritorna None per indicare best match
return None, long_term_default
def get_bot_token():
paths = [TOKEN_FILE_HOME, TOKEN_FILE_ETC, TOKEN_FILE_VOLUME]
for path in paths:
if os.path.exists(path):
try:
with open(path, 'r') as f:
return f.read().strip()
except:
pass
return None
def get_coordinates(query):
if not query or query.lower() == "casa":
return DEFAULT_LAT, DEFAULT_LON, DEFAULT_NAME, "SM"
url = "https://geocoding-api.open-meteo.com/v1/search"
try:
resp = open_meteo_get(url, params={"name": query, "count": 1, "language": "it", "format": "json"}, timeout=(5, 10))
res = resp.json().get("results", [])
if res:
res = res[0]
cc = res.get("country_code", "IT").upper()
name = f"{res.get('name')} ({cc})"
return res['latitude'], res['longitude'], name, cc
except:
pass
return None, None, None, None
def degrees_to_cardinal(d: int) -> str:
"""Converte gradi in direzione cardinale (N, NE, E, SE, S, SW, W, NW)"""
dirs = ["N", "NE", "E", "SE", "S", "SW", "W", "NW"]
try:
return dirs[round(d / 45) % 8]
except:
return "N"
def get_weather_multi_model(lat, lon, short_term_models, long_term_models, forecast_days=10, timezone=None):
"""
Recupera dati da modelli a breve e lungo termine per ensemble completo.
Se short_term_models è None, usa best match di Open-Meteo (senza specificare models).
"""
results = {}
# Recupera modelli a breve termine (alta risoluzione, fino a ~72h)
if short_term_models is None:
# Best match: non specificare models, Open-Meteo sceglie automaticamente
url = "https://api.open-meteo.com/v1/forecast"
params = {
"latitude": lat, "longitude": lon,
"hourly": "temperature_2m,precipitation_probability,precipitation,snowfall,rain,snow_depth,weathercode,windspeed_10m,windgusts_10m,winddirection_10m,dewpoint_2m,relative_humidity_2m,cloud_cover,soil_temperature_0cm",
"daily": "temperature_2m_max,temperature_2m_min,precipitation_sum,precipitation_hours,precipitation_probability_max,snowfall_sum,showers_sum,rain_sum,weathercode,winddirection_10m_dominant,windspeed_10m_max,windgusts_10m_max",
"timezone": timezone if timezone else TZ_STR, "forecast_days": min(forecast_days, 3) # Limita a 3 giorni per modelli ad alta risoluzione
}
try:
resp = open_meteo_get(url, params=params, timeout=(5, 20))
if resp.status_code == 200:
data = resp.json()
# Converti snow_depth da metri a cm per tutti i modelli (Open-Meteo restituisce in metri)
hourly_data = data.get("hourly", {})
if hourly_data and "snow_depth" in hourly_data:
snow_depth_values = hourly_data.get("snow_depth", [])
# Converti da metri a cm (moltiplica per 100)
snow_depth_cm = []
for sd in snow_depth_values:
if sd is not None:
try:
val_m = float(sd)
val_cm = val_m * 100.0 # Converti da metri a cm
snow_depth_cm.append(val_cm)
except (ValueError, TypeError):
snow_depth_cm.append(None)
else:
snow_depth_cm.append(None)
hourly_data["snow_depth"] = snow_depth_cm
data["hourly"] = hourly_data
results["best_match"] = data
results["best_match"]["model_type"] = "short_term"
else:
results["best_match"] = None
except:
results["best_match"] = None
else:
# Modelli specifici (per Casa: AROME + ICON, per Italia: ICON ARPAE)
for model in short_term_models:
url = "https://api.open-meteo.com/v1/forecast"
# Per italia_meteo_arpae_icon_2i, includi sempre snow_depth (supportato quando > 0)
hourly_params = "temperature_2m,precipitation_probability,precipitation,snowfall,rain,snow_depth,weathercode,windspeed_10m,windgusts_10m,winddirection_10m,dewpoint_2m,relative_humidity_2m,cloud_cover,soil_temperature_0cm"
params = {
"latitude": lat, "longitude": lon,
"hourly": hourly_params,
"daily": "temperature_2m_max,temperature_2m_min,precipitation_sum,precipitation_hours,precipitation_probability_max,snowfall_sum,showers_sum,rain_sum,weathercode,winddirection_10m_dominant,windspeed_10m_max,windgusts_10m_max",
"timezone": timezone if timezone else TZ_STR, "models": model, "forecast_days": min(forecast_days, 3) # Limita a 3 giorni per modelli ad alta risoluzione
}
try:
resp = open_meteo_get(url, params=params, timeout=(5, 20))
if resp.status_code == 200:
data = resp.json()
# Converti snow_depth da metri a cm per tutti i modelli (Open-Meteo restituisce in metri)
hourly_data = data.get("hourly", {})
if hourly_data and "snow_depth" in hourly_data:
snow_depth_values = hourly_data.get("snow_depth", [])
# Converti da metri a cm (moltiplica per 100)
snow_depth_cm = []
for sd in snow_depth_values:
if sd is not None:
try:
val_m = float(sd)
val_cm = val_m * 100.0 # Converti da metri a cm
snow_depth_cm.append(val_cm)
except (ValueError, TypeError):
snow_depth_cm.append(None)
else:
snow_depth_cm.append(None)
hourly_data["snow_depth"] = snow_depth_cm
data["hourly"] = hourly_data
# Per italia_meteo_arpae_icon_2i, verifica se snow_depth è disponibile e > 0
if model == "italia_meteo_arpae_icon_2i":
if hourly_data and "snow_depth" in hourly_data:
snow_depth_values_cm = hourly_data.get("snow_depth", [])
# Verifica se almeno un valore di snow_depth è > 0 (ora già in cm)
has_snow_depth = False
if snow_depth_values_cm:
for sd in snow_depth_values_cm[:24]: # Controlla prime 24h
if sd is not None:
try:
if float(sd) > 0.5: # > 0.5 cm
has_snow_depth = True
break
except (ValueError, TypeError):
continue
# Se snow_depth > 0, assicurati che sia incluso nei dati
if has_snow_depth:
data["has_snow_depth_data"] = True
results[model] = data
results[model]["model_type"] = "short_term"
else:
results[model] = None
except:
results[model] = None
# Recupera modelli a lungo termine (globale, fino a 10 giorni)
for model in long_term_models:
url = "https://api.open-meteo.com/v1/forecast"
params = {
"latitude": lat, "longitude": lon,
"hourly": "temperature_2m,precipitation_probability,precipitation,snowfall,rain,snow_depth,weathercode,windspeed_10m,windgusts_10m,winddirection_10m,dewpoint_2m,relative_humidity_2m,cloud_cover,soil_temperature_0cm",
"daily": "temperature_2m_max,temperature_2m_min,precipitation_sum,precipitation_hours,precipitation_probability_max,snowfall_sum,showers_sum,rain_sum,weathercode,winddirection_10m_dominant,windspeed_10m_max,windgusts_10m_max",
"timezone": timezone if timezone else TZ_STR, "models": model, "forecast_days": forecast_days
}
try:
resp = open_meteo_get(url, params=params, timeout=(5, 25))
if resp.status_code == 200:
data = resp.json()
# Converti snow_depth da metri a cm per tutti i modelli (Open-Meteo restituisce in metri)
hourly_data = data.get("hourly", {})
if hourly_data and "snow_depth" in hourly_data:
snow_depth_values = hourly_data.get("snow_depth", [])
# Converti da metri a cm (moltiplica per 100)
snow_depth_cm = []
for sd in snow_depth_values:
if sd is not None:
try:
val_m = float(sd)
val_cm = val_m * 100.0 # Converti da metri a cm
snow_depth_cm.append(val_cm)
except (ValueError, TypeError):
snow_depth_cm.append(None)
else:
snow_depth_cm.append(None)
hourly_data["snow_depth"] = snow_depth_cm
data["hourly"] = hourly_data
results[model] = data
results[model]["model_type"] = "long_term"
else:
results[model] = None
except Exception:
results[model] = None
return results
def merge_multi_model_forecast(models_data, forecast_days=10):
"""Combina dati da modelli a breve e lungo termine in un forecast unificato"""
merged = {
"daily": {
"time": [],
"temperature_2m_max": [],
"temperature_2m_min": [],
"precipitation_sum": [],
"precipitation_hours": [],
"precipitation_probability_max": [],
"snowfall_sum": [],
"showers_sum": [],
"rain_sum": [],
"weathercode": [],
"windspeed_10m_max": [],
"windgusts_10m_max": []
},
"hourly": {
"time": [],
"temperature_2m": [],
"precipitation": [],
"snowfall": [],
"snow_depth": [],
"rain": [],
"weathercode": [],
"windspeed_10m": [],
"winddirection_10m": [],
"dewpoint_2m": [],
"precipitation_probability": [],
"cloud_cover": [],
"soil_temperature_0cm": []
},
"models_used": []
}
# Trova modello a breve termine disponibile (cerca tutti i modelli con type "short_term")
# Priorità: ICON Italia per snow_depth, altrimenti primo disponibile
short_term_data = None
short_term_model = None
icon_italia_data = None
icon_italia_model = None
# Prima cerca ICON Italia (ha snow_depth quando disponibile)
# Cerca anche altri modelli che potrebbero avere snow_depth (icon_d2, etc.)
for model in models_data.keys():
if models_data[model] and models_data[model].get("model_type") == "short_term":
# Priorità a ICON Italia, ma cerca anche altri modelli con snow_depth
if model == "italia_meteo_arpae_icon_2i":
icon_italia_data = models_data[model]
icon_italia_model = model
# ICON-D2 può avere anche snow_depth
elif model == "icon_d2" and icon_italia_data is None:
# Usa ICON-D2 come fallback se ICON Italia non disponibile
hourly_data = models_data[model].get("hourly", {})
snow_depth_values = hourly_data.get("snow_depth", []) if hourly_data else []
# Verifica se ha dati di snow_depth validi
has_valid_snow_depth = False
if snow_depth_values:
for sd in snow_depth_values[:24]:
if sd is not None:
try:
if float(sd) > 0:
has_valid_snow_depth = True
break
except (ValueError, TypeError):
continue
if has_valid_snow_depth:
icon_italia_data = models_data[model]
icon_italia_model = model
# Poi cerca primo modello disponibile (per altri parametri)
for model in models_data.keys():
if models_data[model] and models_data[model].get("model_type") == "short_term":
short_term_data = models_data[model]
short_term_model = model
break
# Trova modello a lungo termine disponibile (cerca tutti i modelli con type "long_term")
long_term_data = None
long_term_model = None
for model in models_data.keys():
if models_data[model] and models_data[model].get("model_type") == "long_term":
long_term_data = models_data[model]
long_term_model = model
break
if not short_term_data and not long_term_data:
return None
# Usa dati a breve termine per primi 2-3 giorni, poi passa a lungo termine
cutoff_day = 2 # Usa modelli ad alta risoluzione per primi 2 giorni
if short_term_data:
# Gestisci best_match o modelli specifici
if short_term_model == "best_match":
model_display = "Best Match"
else:
model_display = MODEL_NAMES.get(short_term_model, short_term_model)
# Verifica se ICON Italia ha dati di snow_depth (controllo diretto, non solo il flag)
has_icon_snow_depth = False
if icon_italia_data:
icon_hourly = icon_italia_data.get("hourly", {})
icon_snow_depth = icon_hourly.get("snow_depth", []) if icon_hourly else []
# Verifica se ci sono dati non-null di snow_depth
if icon_snow_depth:
for sd in icon_snow_depth[:72]: # Controlla prime 72h
if sd is not None:
try:
if float(sd) > 0: # Anche valori piccoli
has_icon_snow_depth = True
break
except (ValueError, TypeError):
continue
# Se ICON Italia ha dati di snow_depth, aggiungilo ai modelli usati
if has_icon_snow_depth or (icon_italia_data and icon_italia_data.get("has_snow_depth_data")):
icon_display = MODEL_NAMES.get("italia_meteo_arpae_icon_2i", "ICON Italia")
merged["models_used"].append(f"{model_display} + {icon_display} (snow_depth) (0-{cutoff_day+1}d)")
else:
merged["models_used"].append(f"{model_display} (0-{cutoff_day+1}d)")
short_daily = short_term_data.get("daily", {})
short_hourly = short_term_data.get("hourly", {})
# Prendi dati daily dai primi giorni del modello a breve termine
short_daily_times = short_daily.get("time", [])[:cutoff_day+1]
for i, day_time in enumerate(short_daily_times):
merged["daily"]["time"].append(day_time)
for key in ["temperature_2m_max", "temperature_2m_min", "precipitation_sum", "precipitation_hours", "precipitation_probability_max", "snowfall_sum", "showers_sum", "rain_sum", "weathercode", "windspeed_10m_max", "windgusts_10m_max"]:
val = short_daily.get(key, [])[i] if i < len(short_daily.get(key, [])) else None
merged["daily"][key].append(val)
# Prendi dati hourly dal modello a breve termine
# Priorità: usa snow_depth da ICON Italia se disponibile, altrimenti dal modello principale
short_hourly_times = short_hourly.get("time", [])
icon_italia_hourly = icon_italia_data.get("hourly", {}) if icon_italia_data else {}
icon_italia_hourly_times = icon_italia_hourly.get("time", []) if icon_italia_hourly else []
icon_italia_snow_depth = icon_italia_hourly.get("snow_depth", []) if icon_italia_hourly else []
# Crea mappa timestamp -> snow_depth per ICON Italia (per corrispondenza esatta o approssimata)
icon_snow_depth_map = {}
if icon_italia_hourly_times and icon_italia_snow_depth:
for idx, ts in enumerate(icon_italia_hourly_times):
if idx < len(icon_italia_snow_depth) and icon_italia_snow_depth[idx] is not None:
try:
val_cm = float(icon_italia_snow_depth[idx])
if val_cm >= 0: # Solo valori validi (già in cm)
icon_snow_depth_map[ts] = val_cm
except (ValueError, TypeError):
continue
cutoff_hour = (cutoff_day + 1) * 24
for i, hour_time in enumerate(short_hourly_times[:cutoff_hour]):
merged["hourly"]["time"].append(hour_time)
for key in ["temperature_2m", "precipitation", "snowfall", "rain", "weathercode", "windspeed_10m", "winddirection_10m", "dewpoint_2m", "precipitation_probability", "cloud_cover", "soil_temperature_0cm"]:
val = short_hourly.get(key, [])[i] if i < len(short_hourly.get(key, [])) else None
merged["hourly"][key].append(val)
# Per snow_depth: usa ICON Italia se disponibile (corrispondenza per timestamp), altrimenti modello principale
# NOTA: I valori sono già convertiti in cm durante il recupero dall'API
val_snow_depth = None
# Cerca corrispondenza esatta per timestamp
if hour_time in icon_snow_depth_map:
# Usa snow_depth da ICON Italia per questo timestamp (già in cm)
val_snow_depth = icon_snow_depth_map[hour_time]
else:
# Fallback 1: cerca corrispondenza per ora approssimata (se i timestamp non corrispondono esattamente)
# Estrai solo la parte ora (YYYY-MM-DDTHH) per corrispondenza approssimata
hour_time_base = hour_time[:13] if len(hour_time) >= 13 else hour_time # "2025-01-09T12"
for icon_ts, icon_val in icon_snow_depth_map.items():
if icon_ts.startswith(hour_time_base):
val_snow_depth = icon_val
break
# Fallback 2: se non trovato, cerca il valore più vicino nello stesso giorno
if val_snow_depth is None and hour_time_base:
day_date_str = hour_time[:10] if len(hour_time) >= 10 else None # "2025-01-09"
if day_date_str:
# Cerca tutti i valori di ICON Italia per lo stesso giorno
same_day_values = [v for ts, v in icon_snow_depth_map.items() if ts.startswith(day_date_str)]
if same_day_values:
# Usa il primo valore disponibile per quel giorno (approssimazione)
val_snow_depth = same_day_values[0]
# Fallback 3: usa snow_depth dal modello principale se ICON Italia non disponibile
if val_snow_depth is None and i < len(short_hourly.get("snow_depth", [])):
val_snow_depth = short_hourly.get("snow_depth", [])[i]
merged["hourly"]["snow_depth"].append(val_snow_depth)
if long_term_data:
merged["models_used"].append(f"{MODEL_NAMES.get(long_term_model, long_term_model)} ({cutoff_day+1}-{forecast_days}d)")
long_daily = long_term_data.get("daily", {})
long_hourly = long_term_data.get("hourly", {})
# Prendi dati daily dal modello a lungo termine per i giorni successivi
long_daily_times = long_daily.get("time", [])
start_idx = cutoff_day + 1
for i in range(start_idx, min(len(long_daily_times), forecast_days)):
merged["daily"]["time"].append(long_daily_times[i])
for key in ["temperature_2m_max", "temperature_2m_min", "precipitation_sum", "precipitation_hours", "precipitation_probability_max", "snowfall_sum", "showers_sum", "rain_sum", "weathercode", "windspeed_10m_max", "windgusts_10m_max"]:
val = long_daily.get(key, [])[i] if i < len(long_daily.get(key, [])) else None
merged["daily"][key].append(val)
# Per i dati hourly, completa con dati a lungo termine se necessario
long_hourly_times = long_hourly.get("time", [])
current_hourly_count = len(merged["hourly"]["time"])
needed_hours = forecast_days * 24
if current_hourly_count < needed_hours:
start_hour_idx = current_hourly_count
for i in range(start_hour_idx, min(len(long_hourly_times), needed_hours)):
merged["hourly"]["time"].append(long_hourly_times[i])
for key in ["temperature_2m", "precipitation", "snowfall", "snow_depth", "rain", "weathercode", "windspeed_10m", "winddirection_10m", "dewpoint_2m", "precipitation_probability", "cloud_cover", "soil_temperature_0cm"]:
val = long_hourly.get(key, [])[i] if i < len(long_hourly.get(key, [])) else None
merged["hourly"][key].append(val)
return merged
def analyze_temperature_trend(daily_temps_max, daily_temps_min, days=10):
"""Analizza trend temperatura per identificare fronti caldi/freddi con dettaglio completo"""
if not daily_temps_max or not daily_temps_min:
return None
max_days = min(days, len(daily_temps_max), len(daily_temps_min))
if max_days < 3:
return None
# Filtra valori None e calcola temperature medie giornaliere
avg_temps = []
valid_indices = []
for i in range(max_days):
t_max = daily_temps_max[i]
t_min = daily_temps_min[i]
if t_max is not None and t_min is not None:
avg_temps.append((float(t_max) + float(t_min)) / 2)
valid_indices.append(i)
else:
avg_temps.append(None)
if len([t for t in avg_temps if t is not None]) < 3:
return None
# Analizza tendenza generale (prime 3 giorni vs ultimi 3 giorni validi)
valid_temps = [t for t in avg_temps if t is not None]
if len(valid_temps) < 3:
return None
first_avg = mean(valid_temps[:3])
last_avg = mean(valid_temps[-3:])
diff = last_avg - first_avg
trend_type = None
trend_intensity = "moderato"
if diff > 5:
trend_type = "fronte_caldo"
trend_intensity = "forte" if diff > 8 else "moderato"
elif diff > 2:
trend_type = "riscaldamento"
trend_intensity = "moderato"
elif diff < -5:
trend_type = "fronte_freddo"
trend_intensity = "forte" if diff < -8 else "moderato"
elif diff < -2:
trend_type = "raffreddamento"
trend_intensity = "moderato"
else:
trend_type = "stabile"
# Identifica giorni di cambio significativo
change_days = []
prev_temp = None
for i, temp in enumerate(avg_temps):
if temp is not None:
if prev_temp is not None:
day_diff = temp - prev_temp
if abs(day_diff) > 3: # Cambio significativo (>3°C)
change_days.append({
"day": i,
"delta": round(day_diff, 1),
"from": round(prev_temp, 1),
"to": round(temp, 1)
})
prev_temp = temp
# Analisi per periodi (primi 3 giorni, medio termine, lungo termine)
period_analysis = {}
if len(valid_temps) >= 7:
period_analysis["short_term"] = {
"avg": round(mean(valid_temps[:3]), 1),
"range": round(max(valid_temps[:3]) - min(valid_temps[:3]), 1)
}
mid_start = len(valid_temps) // 3
mid_end = (len(valid_temps) * 2) // 3
period_analysis["mid_term"] = {
"avg": round(mean(valid_temps[mid_start:mid_end]), 1),
"range": round(max(valid_temps[mid_start:mid_end]) - min(valid_temps[mid_start:mid_end]), 1)
}
period_analysis["long_term"] = {
"avg": round(mean(valid_temps[-3:]), 1),
"range": round(max(valid_temps[-3:]) - min(valid_temps[-3:]), 1)
}
return {
"type": trend_type,
"intensity": trend_intensity,
"delta": round(diff, 1),
"change_days": change_days,
"first_avg": round(first_avg, 1),
"last_avg": round(last_avg, 1),
"period_analysis": period_analysis,
"daily_avg_temps": avg_temps,
"daily_max": daily_temps_max[:max_days],
"daily_min": daily_temps_min[:max_days]
}
def analyze_weather_transitions(daily_weathercodes):
"""Analizza transizioni meteo significative"""
transitions = []
if not daily_weathercodes or len(daily_weathercodes) < 2:
return transitions
# Categorie meteo
def get_category(code):
if code is None:
return "variabile"
code = int(code)
if code in (0, 1): return "sereno"
if code in (2, 3): return "nuvoloso"
if code in (45, 48): return "nebbia"
if code in (51, 53, 55, 56, 57, 61, 63, 65, 66, 67, 80, 81, 82): return "pioggia"
if code in (71, 73, 75, 77, 85, 86): return "neve"
if code in (95, 96, 99): return "temporale"
return "variabile"
for i in range(1, min(len(daily_weathercodes), 8)):
prev_code = daily_weathercodes[i-1] if i-1 < len(daily_weathercodes) else None
curr_code = daily_weathercodes[i] if i < len(daily_weathercodes) else None
prev_cat = get_category(prev_code)
curr_cat = get_category(curr_code)
if prev_cat != curr_cat:
transitions.append({
"day": i,
"from": prev_cat,
"to": curr_cat,
"significant": prev_cat in ["sereno", "nuvoloso"] and curr_cat in ["pioggia", "neve", "temporale"]
})
return transitions
def get_precip_type(code):
"""Definisce il tipo di precipitazione in base al codice WMO."""
if (71 <= code <= 77) or code in [85, 86]:
return "❄️ Neve"
if code in [96, 99]:
return "⚡🌨 Grandine"
if code in [66, 67]:
return "🧊☔ Pioggia Congelantesi"
return "☔ Pioggia"
def get_intensity_label(mm_h):
if mm_h < 2.5:
return "Debole"
if mm_h < 7.6:
return "Moderata"
return "Forte ⚠️"
def analyze_daily_events(times, codes, probs, precip, winds, temps, dewpoints, snowfalls=None, rains=None, soil_temps=None, cloud_covers=None, wind_speeds=None):
"""Scansiona le 24 ore e trova blocchi di eventi continui."""
events = []
# Prepara array per calcoli avanzati (allineato a check_ghiaccio.py)
if snowfalls is None:
snowfalls = [0.0] * len(times)
if rains is None:
rains = [0.0] * len(times)
if soil_temps is None:
soil_temps = [None] * len(times)
if cloud_covers is None:
cloud_covers = [None] * len(times)
if wind_speeds is None:
wind_speeds = [None] * len(times)
# Calcola precipitazioni cumulative delle 3h precedenti per ogni punto
precip_3h_sum = []
rain_3h_sum = []
snow_3h_sum = []
for i in range(len(times)):
# Somma delle 3 ore precedenti (i-3, i-2, i-1)
start_idx = max(0, i - 3)
precip_sum = sum([float(p) if p is not None else 0.0 for p in precip[start_idx:i]])
rain_sum = sum([float(r) if r is not None else 0.0 for r in rains[start_idx:i]])
snow_sum = sum([float(s) if s is not None else 0.0 for s in snowfalls[start_idx:i]])
precip_3h_sum.append(precip_sum)
rain_3h_sum.append(rain_sum)
snow_3h_sum.append(snow_sum)
# 1. PERICOLI (Ghiaccio, Gelo, Brina) - Logica migliorata allineata a check_ghiaccio.py
in_ice = False
start_ice = 0
ice_type = ""
for i in range(len(times)):
t = temps[i] if i < len(temps) and temps[i] is not None else 10
d = dewpoints[i] if i < len(dewpoints) and dewpoints[i] is not None else t
p = precip[i] if i < len(precip) and precip[i] is not None else 0
c = codes[i] if i < len(codes) and codes[i] is not None else 0
if c is not None:
try:
c = int(c)
except (ValueError, TypeError):
c = 0
else:
c = 0
# Estrai parametri avanzati
t_soil = soil_temps[i] if i < len(soil_temps) and soil_temps[i] is not None else None
cloud = cloud_covers[i] if i < len(cloud_covers) and cloud_covers[i] is not None else None
wind = wind_speeds[i] if i < len(wind_speeds) and wind_speeds[i] is not None else None
snowfall_curr = snowfalls[i] if i < len(snowfalls) and snowfalls[i] is not None else 0.0
rain_curr = rains[i] if i < len(rains) and rains[i] is not None else 0.0
# Determina se è notte (18:00-06:00) per raffreddamento radiativo
try:
hour = int(times[i].split("T")[1].split(":")[0]) if "T" in times[i] else 12
is_night = (hour >= 18) or (hour <= 6)
except:
is_night = False
# Calcola temperatura suolo: usa valore misurato se disponibile, altrimenti stima (1-2°C più fredda)
if t_soil is None:
t_soil = t - 1.5 # Approssimazione conservativa
# Applica raffreddamento radiativo: cielo sereno + notte + vento debole
# Riduce la temperatura del suolo di 0.5-1.5°C (come in check_ghiaccio.py)
t_soil_adjusted = t_soil
if is_night and cloud is not None and cloud < 20.0:
if wind is None or wind < 5.0:
cooling = 1.5 # Vento molto debole = più raffreddamento
elif wind < 10.0:
cooling = 1.0
else:
cooling = 0.5
t_soil_adjusted = t_soil - cooling
# Precipitazioni nelle 3h precedenti
p_3h = precip_3h_sum[i] if i < len(precip_3h_sum) else 0.0
r_3h = rain_3h_sum[i] if i < len(rain_3h_sum) else 0.0
s_3h = snow_3h_sum[i] if i < len(snow_3h_sum) else 0.0
# LOGICA MIGLIORATA (allineata a check_ghiaccio.py):
current_ice_condition = None
# 1. GELICIDIO (Freezing Rain) - priorità massima
is_raining_code = (50 <= c <= 69) or (80 <= c <= 82)
if c in [66, 67] or (p > 0 and t <= 0 and is_raining_code):
current_ice_condition = "🧊☠️ GELICIDIO"
# 2. Black Ice o Neve Ghiacciata - Precipitazione nelle 3h precedenti + suolo gelato
elif p_3h > 0.1 and t_soil_adjusted < 0.0:
# Distingue tra neve e pioggia
has_snow = (s_3h > 0.1) or (snowfall_curr > 0.1)
has_rain = (r_3h > 0.1) or (rain_curr > 0.1)
if has_snow:
current_ice_condition = "⛸️⚠️ Neve ghiacciata (suolo gelato)"
elif has_rain:
current_ice_condition = "⛸️⚠️ Black Ice (strada bagnata + suolo gelato)"
else:
current_ice_condition = "⛸️⚠️ Black Ice (strada bagnata + suolo gelato)"
# 3. BRINA (Hoar Frost) - Suolo <= 0°C e punto di rugiada > suolo ma < 0°C
elif p_3h <= 0.1 and t_soil_adjusted <= 0.0 and d is not None:
if d > t_soil_adjusted and d < 0.0:
current_ice_condition = "⛸️⚠️ GHIACCIO/BRINA"
# 4. GELATA - Temperatura aria < 0°C (senza altre condizioni)
elif t < 0:
current_ice_condition = "🧊 Gelata"
if current_ice_condition and not in_ice:
in_ice = True
start_ice = i
ice_type = current_ice_condition
elif (not current_ice_condition and in_ice) or (in_ice and current_ice_condition != ice_type) or (in_ice and i == len(times)-1):
end_idx = i if not current_ice_condition else i
if end_idx > start_ice:
start_time = times[start_ice].split("T")[1][:5]
end_time = times[min(end_idx, len(times)-1)].split("T")[1][:5]
temp_block = temps[start_ice:min(end_idx+1, len(temps))]
temp_block_clean = [t for t in temp_block if t is not None]
min_t = min(temp_block_clean) if temp_block_clean else 0
# Per GHIACCIO/BRINA, verifica che la temperatura minima sia effettivamente sotto/sopra soglia critica
# Se la temperatura minima è > 1.5°C, non è un rischio reale
if ice_type == "⛸️⚠️ GHIACCIO/BRINA" and min_t > 1.5:
# Non segnalare se la temperatura minima è troppo alta
pass
else:
events.append(f"{ice_type}: {start_time}-{end_time} (Min: {min_t:.0f}°C)")
in_ice = False
if current_ice_condition:
in_ice = True
start_ice = i
ice_type = current_ice_condition
# 2. PRECIPITAZIONI
in_rain = False
start_idx = 0
current_rain_type = ""
for i in range(len(times)):
p_val = precip[i] if i < len(precip) and precip[i] is not None else 0
is_raining = p_val >= MIN_MM_PER_EVENTO
if is_raining and not in_rain:
in_rain = True
start_idx = i
code_val = codes[i] if i < len(codes) and codes[i] is not None else 0
try:
code_val = int(code_val) if code_val is not None else 0
except (ValueError, TypeError):
code_val = 0
current_rain_type = get_precip_type(code_val)
elif in_rain and is_raining and i < len(codes):
code_val = codes[i] if codes[i] is not None else 0
try:
code_val = int(code_val) if code_val is not None else 0
except (ValueError, TypeError):
code_val = 0
new_type = get_precip_type(code_val)
if new_type != current_rain_type:
end_idx = i
block_precip = precip[start_idx:end_idx] if end_idx <= len(precip) else precip[start_idx:]
block_precip_clean = [p for p in block_precip if p is not None]
tot_mm = sum(block_precip_clean)
prob_block = probs[start_idx:end_idx] if end_idx <= len(probs) else probs[start_idx:]
prob_block_clean = [p for p in prob_block if p is not None]
max_prob = max(prob_block_clean) if prob_block_clean else 0
start_time = times[start_idx].split("T")[1][:5]
end_time = times[end_idx].split("T")[1][:5] if end_idx < len(times) else times[-1].split("T")[1][:5]
avg_intensity = tot_mm / len(block_precip) if block_precip else 0
events.append(
f"{current_rain_type} ({get_intensity_label(avg_intensity)}):\n"
f" 🕒 {start_time}-{end_time} | 💧 {tot_mm:.1f}mm | Prob: {max_prob}%"
)
start_idx = i
current_rain_type = new_type
elif (not is_raining and in_rain) or (in_rain and i == len(times)-1):
in_rain = False
end_idx = i if not is_raining else i + 1
block_precip = precip[start_idx:end_idx] if end_idx <= len(precip) else precip[start_idx:]
block_precip_clean = [p for p in block_precip if p is not None]
tot_mm = sum(block_precip_clean)
if tot_mm > 0:
prob_block = probs[start_idx:end_idx] if end_idx <= len(probs) else probs[start_idx:]
prob_block_clean = [p for p in prob_block if p is not None]
max_prob = max(prob_block_clean) if prob_block_clean else 0
start_time = times[start_idx].split("T")[1][:5]
end_time = times[min(end_idx-1, len(times)-1)].split("T")[1][:5]
avg_intensity = tot_mm / len(block_precip) if block_precip else 0
events.append(
f"{current_rain_type} ({get_intensity_label(avg_intensity)}):\n"
f" 🕒 {start_time}-{end_time} | 💧 {tot_mm:.1f}mm | Prob: {max_prob}%"
)
# 3. VENTO
if winds:
wind_values = [w for w in winds if w is not None]
if wind_values:
max_wind = max(wind_values)
if max_wind > SOGLIA_VENTO_KMH:
try:
peak_idx = winds.index(max_wind)
except ValueError:
peak_idx = 0
peak_time = times[min(peak_idx, len(times)-1)].split("T")[1][:5]
events.append(f"💨 Vento Forte: Picco {max_wind:.0f}km/h alle {peak_time}")
return events
def generate_practical_advice(trend, transitions, events_summary, daily_data):
"""Genera consigli pratici basati sull'analisi meteo"""
advice = []
# Consigli basati su trend temperatura
if trend:
if trend["type"] == "fronte_freddo" and trend["intensity"] == "forte":
advice.append("❄️ <b>Fronte Freddo in Arrivo:</b> Preparati a temperature in calo significativo. Controlla riscaldamento, proteggi piante sensibili.")
elif trend["type"] == "fronte_caldo" and trend["intensity"] == "forte":
advice.append("🔥 <b>Ondata di Calore:</b> Temperature in aumento. Mantieni case fresche, idratazione importante, attenzione a persone fragili.")
elif trend["type"] == "raffreddamento":
advice.append("🌡️ <b>Raffreddamento:</b> Temperature in calo graduale. Vestiti a strati, verifica isolamento porte/finestre.")
elif trend["type"] == "riscaldamento":
advice.append("☀️ <b>Riscaldamento:</b> Temperature in aumento. Buon momento per attività all'aperto, ventilazione naturale.")
# Consigli basati su transizioni meteo
significant_rain = any(t["to"] in ["pioggia", "neve", "temporale"] and t["significant"] for t in transitions[:3])
if significant_rain:
advice.append("🌧️ <b>Precipitazioni in Arrivo:</b> Prepara ombrelli/impermeabili. Evita viaggi non necessari durante picchi, controlla grondaie.")
# Consigli basati su eventi pericolosi
has_ice = any("GELICIDIO" in e or "GHIACCIO" in e for events in events_summary for e in events if events)
if has_ice:
advice.append("⚠️ <b>Rischio Ghiaccio:</b> Strade scivolose previste. Evita viaggi non urgenti, guida con estrema cautela, antigelo/sale pronti.")
# Consigli basati su vento forte
has_wind = any("Vento Forte" in e for events in events_summary for e in events if events)
if has_wind:
advice.append("💨 <b>Vento Forte:</b> Fissa oggetti in balcone/giardino, attenzione a rami, guidare con prudenza su strade esposte.")
# Consigli stagionali generali
if daily_data and len(daily_data) > 0:
first_day = daily_data[0]
if first_day.get("t_min", 15) < 5:
advice.append("🏠 <b>Gestione Domestica:</b> Temperature basse previste. Verifica caldaia, risparmio energetico con isolamento, attenzione a tubazioni esterne.")
elif first_day.get("precip_sum", 0) > 20:
advice.append("💧 <b>Piogge Intense:</b> Accumuli significativi previsti. Controlla drenaggi, pozzetti, evitare zone soggette ad allagamenti.")
return advice
def format_detailed_trend_explanation(trend, daily_data_list):
"""Genera spiegazione dettagliata del trend temperatura su 10 giorni"""
if not trend:
return ""
explanation = []
explanation.append(f"📊 <b>EVOLUZIONE TEMPERATURE (10 GIORNI)</b>\n")
# Trend principale con spiegazione chiara
trend_type = trend["type"]
intensity = trend["intensity"]
delta = trend['delta']
first_avg = trend['first_avg']
last_avg = trend['last_avg']
if trend_type == "fronte_caldo":
trend_desc = "🔥 <b>Fronte Caldo in Arrivo</b>"
desc_text = f"Arrivo di aria più calda: temperatura media passerà da {first_avg:.1f}°C a {last_avg:.1f}°C (+{delta:.1f}°C)."
elif trend_type == "fronte_freddo":
trend_desc = "❄️ <b>Fronte Freddo in Arrivo</b>"
desc_text = f"Arrivo di aria più fredda: temperatura media scenderà da {first_avg:.1f}°C a {last_avg:.1f}°C ({delta:+.1f}°C)."
elif trend_type == "riscaldamento":
trend_desc = "📈 <b>Riscaldamento Progressivo</b>"
desc_text = f"Tendenza al rialzo delle temperature: da {first_avg:.1f}°C a {last_avg:.1f}°C (+{delta:.1f}°C)."
elif trend_type == "raffreddamento":
trend_desc = "📉 <b>Raffreddamento Progressivo</b>"
desc_text = f"Tendenza al ribasso delle temperature: da {first_avg:.1f}°C a {last_avg:.1f}°C ({delta:+.1f}°C)."
elif trend_type == "stabile":
trend_desc = "➡️ <b>Temperature Stabili</b>"
desc_text = f"Temperature medie sostanzialmente stabili: da {first_avg:.1f}°C a {last_avg:.1f}°C (variazione {delta:+.1f}°C)."
else:
trend_desc = "🌡️ <b>Variazione Termica</b>"
desc_text = f"Evoluzione temperature: da {first_avg:.1f}°C a {last_avg:.1f}°C ({delta:+.1f}°C)."
intensity_text = " (variazione significativa)" if intensity == "forte" else " (variazione moderata)"
explanation.append(f"{trend_desc}{intensity_text}")
explanation.append(f"{desc_text}")
# Aggiungi solo picchi significativi in modo sintetico
if trend.get("change_days"):
significant_changes = [c for c in trend["change_days"] if abs(c['delta']) > 3.0][:3]
if significant_changes:
change_texts = []
for change in significant_changes:
day_name = f"Giorno {change['day']+1}"
direction = "" if change['delta'] > 0 else ""
change_texts.append(f"{direction} {day_name}: {change['from']:.0f}°→{change['to']:.0f}°C")
if change_texts:
explanation.append(f"Picchi: {', '.join(change_texts)}")
explanation.append("")
return "\n".join(explanation)
def format_weather_context_report(models_data, location_name, country_code):
"""Genera report contestuale intelligente con ensemble multi-modello"""
# Combina modelli a breve e lungo termine
merged_data = merge_multi_model_forecast(models_data, forecast_days=10)
if not merged_data:
return "❌ Errore: Nessun dato meteo disponibile"
hourly = merged_data.get('hourly', {})
daily = merged_data.get('daily', {})
models_used = merged_data.get('models_used', [])
if not daily or not daily.get('time'):
return "❌ Errore: Dati meteo incompleti"
msg_parts = []
# HEADER
models_text = " + ".join(models_used) if models_used else "Multi-modello"
msg_parts.append(f"🌍 <b>METEO FORECAST</b>")
msg_parts.append(f"{location_name.upper()}")
msg_parts.append(f"📡 <i>Ensemble: {models_text}</i>\n")
# ANALISI TREND TEMPERATURA (Fronti) - Completa su 10 giorni
daily_temps_max = daily.get('temperature_2m_max', [])
daily_temps_min = daily.get('temperature_2m_min', [])
trend = analyze_temperature_trend(daily_temps_max, daily_temps_min, days=10)
# Spiegazione dettagliata trend (sempre, anche se stabile)
if trend:
trend_explanation = format_detailed_trend_explanation(trend, daily_data_list=[])
if trend_explanation:
msg_parts.append(trend_explanation)
# ANALISI TRANSIZIONI METEO - Include anche precipitazioni prossimi giorni
daily_time_list = daily.get('time', []) # Definito qui per uso successivo
daily_weathercodes = daily.get('weathercode', [])
transitions = analyze_weather_transitions(daily_weathercodes)
# Cerca anche precipitazioni significative nei primi giorni
precip_list = daily.get('precipitation_sum', [])
weather_changes = []
# Aggiungi transizioni significative
if transitions:
significant_trans = [t for t in transitions if t.get("significant", False)]
for trans in significant_trans[:5]:
day_names = ["oggi", "domani", "dopodomani", "fra 3 giorni", "fra 4 giorni", "fra 5 giorni", "fra 6 giorni"]
day_idx = trans["day"] - 1
if day_idx < len(day_names):
day_ref = day_names[day_idx]
else:
day_ref = f"fra {trans['day']} giorni"
weather_changes.append({
"day": trans["day"],
"day_ref": day_ref,
"from": trans["from"],
"to": trans["to"],
"type": "transition"
})
# Aggiungi precipitazioni significative per domani/dopodomani se non già presenti
# Usa snowfall (più affidabile) per determinare il tipo (pioggia/neve/grandine)
# Prepara mappa hourly per giorni
daily_map = defaultdict(list)
times = hourly.get('time', [])
for i, t in enumerate(times):
daily_map[t.split("T")[0]].append(i)
for day_idx in range(min(3, len(precip_list))):
if day_idx >= len(precip_list) or precip_list[day_idx] is None:
continue
precip_amount = float(precip_list[day_idx])
day_num = day_idx + 1
# Verifica se già presente come transizione
already_present = any(wc["day"] == day_num for wc in weather_changes)
if already_present:
continue
# Ottieni dati hourly per questo giorno per determinare tipo precipitazione
day_date = daily_time_list[day_idx].split("T")[0] if day_idx < len(daily_time_list) else None
if not day_date:
continue
indices = daily_map.get(day_date, [])
d_snow_day = [hourly.get('snowfall', [])[i] for i in indices if i < len(hourly.get('snowfall', []))]
d_codes_day = [hourly.get('weathercode', [])[i] for i in indices if i < len(hourly.get('weathercode', []))]
d_temps_day = [hourly.get('temperature_2m', [])[i] for i in indices if i < len(hourly.get('temperature_2m', []))]
d_dews_day = [hourly.get('dewpoint_2m', [])[i] for i in indices if i < len(hourly.get('dewpoint_2m', []))]
# Calcola accumulo neve per questo giorno
snow_sum_day = sum([float(s) for s in d_snow_day if s is not None]) if d_snow_day else 0.0
# Determina tipo precipitazione usando snowfall (priorità) o weathercode (fallback)
# NON inventiamo neve basandoci su temperatura - solo se snowfall>0 o weathercode indica neve
# PRIORITÀ: Se c'è neve (anche poca), il simbolo è sempre ❄️, anche se la pioggia è maggiore
precip_type_symbol = "💧" # Default pioggia
threshold_mm = 5.0 # Soglia default per pioggia
if precip_amount > 0.1:
# Se snowfall è disponibile e positivo, usa quello (più preciso)
if snow_sum_day > 0.1:
# Se c'è neve (anche poca), il simbolo è sempre ❄️ (priorità alla neve)
precip_type_symbol = "❄️" # Neve
threshold_mm = 0.5 # Soglia più bassa per neve (anche pochi mm sono significativi)
else:
# Fallback: verifica weathercode per neve esplicita
# Solo se i modelli indicano esplicitamente neve nei codici WMO
snow_codes = [71, 73, 75, 77, 85, 86] # Codici WMO per neve
hail_codes = [96, 99] # Codici WMO per grandine/temporale
snow_count = sum(1 for c in d_codes_day if c is not None and int(c) in snow_codes)
hail_count = sum(1 for c in d_codes_day if c is not None and int(c) in hail_codes)
if hail_count > 0:
precip_type_symbol = "⛈️" # Grandine/Temporale
threshold_mm = 5.0
elif snow_count > 0:
# Solo se weathercode indica esplicitamente neve
precip_type_symbol = "❄️" # Neve
threshold_mm = 0.5 # Soglia più bassa per neve
# Aggiungi solo se supera la soglia appropriata
if precip_amount > threshold_mm:
day_names = ["oggi", "domani", "dopodomani"]
if day_idx < len(day_names):
weather_changes.append({
"day": day_num,
"day_ref": day_names[day_idx],
"from": "variabile",
"to": "precipitazioni",
"type": "precip",
"amount": precip_amount,
"precip_symbol": precip_type_symbol
})
if weather_changes:
# Ordina per giorno
weather_changes.sort(key=lambda x: x["day"])
msg_parts.append("🔄 <b>CAMBIAMENTI METEO SIGNIFICATIVI</b>")
for wc in weather_changes[:5]:
if wc["type"] == "transition":
from_icon = "☀️" if wc["from"] == "sereno" else "☁️" if wc["from"] == "nuvoloso" else "🌧️"
to_icon = "🌧️" if "pioggia" in wc["to"] else "❄️" if "neve" in wc["to"] else "⛈️" if "temporale" in wc["to"] else "☁️"
msg_parts.append(f" {from_icon}{to_icon} <b>{wc['day_ref']}:</b> {wc['from']}{wc['to']}")
else:
# Precipitazioni - usa simbolo appropriato
precip_sym = wc.get('precip_symbol', '💧')
msg_parts.append(f" ☁️→{precip_sym} <b>{wc['day_ref']}:</b> precipitazioni ({wc['amount']:.1f}mm)")
msg_parts.append("")
# DETTAGLIO GIORNALIERO
# Usa i dati daily come riferimento principale (sono più affidabili)
# daily_time_list già definito sopra
temp_min_list = daily.get('temperature_2m_min', [])
temp_max_list = daily.get('temperature_2m_max', [])
# Limita ai giorni per cui abbiamo dati daily validi
max_days = min(len(daily_time_list), len(temp_min_list), len(temp_max_list), 10)
# Mappa hourly per eventi dettagliati
daily_map = defaultdict(list)
times = hourly.get('time', [])
for i, t in enumerate(times):
daily_map[t.split("T")[0]].append(i)
events_summary = []
daily_details = []
for count in range(max_days):
day_date = daily_time_list[count].split("T")[0] if count < len(daily_time_list) else None
if not day_date:
break
# Ottieni indici hourly per questo giorno
indices = daily_map.get(day_date, [])
# Estrai dati hourly per questo giorno (se disponibili)
d_times = [hourly['time'][i] for i in indices if i < len(hourly.get('time', []))]
d_codes = [hourly.get('weathercode', [])[i] for i in indices if i < len(hourly.get('weathercode', []))]
d_probs = [hourly.get('precipitation_probability', [])[i] for i in indices if i < len(hourly.get('precipitation_probability', []))]
d_precip = [hourly.get('precipitation', [])[i] for i in indices if i < len(hourly.get('precipitation', []))]
d_snow = [hourly.get('snowfall', [])[i] for i in indices if i < len(hourly.get('snowfall', []))]
d_winds = [hourly.get('windspeed_10m', [])[i] for i in indices if i < len(hourly.get('windspeed_10m', []))]
d_winddir = [hourly.get('winddirection_10m', [])[i] for i in indices if i < len(hourly.get('winddirection_10m', []))]
d_temps = [hourly.get('temperature_2m', [])[i] for i in indices if i < len(hourly.get('temperature_2m', []))]
d_dews = [hourly.get('dewpoint_2m', [])[i] for i in indices if i < len(hourly.get('dewpoint_2m', []))]
d_clouds = [hourly.get('cloud_cover', [])[i] for i in indices if i < len(hourly.get('cloud_cover', []))]
d_rains = [hourly.get('rain', [])[i] for i in indices if i < len(hourly.get('rain', []))]
d_soil_temps = [hourly.get('soil_temperature_0cm', [])[i] for i in indices if i < len(hourly.get('soil_temperature_0cm', []))]
d_snow_depth = [hourly.get('snow_depth', [])[i] for i in indices if i < len(hourly.get('snow_depth', []))]
# Usa dati daily come primario (più affidabili)
try:
t_min_val = temp_min_list[count] if count < len(temp_min_list) else None
t_max_val = temp_max_list[count] if count < len(temp_max_list) else None
# Se dati daily validi, usali; altrimenti calcola da hourly
if t_min_val is not None and t_max_val is not None:
t_min = float(t_min_val)
t_max = float(t_max_val)
elif d_temps and any(t is not None for t in d_temps):
temp_clean = [float(t) for t in d_temps if t is not None]
t_min = min(temp_clean)
t_max = max(temp_clean)
else:
# Se non ci sono dati, salta questo giorno
continue
# Usa dati daily per caratterizzazione precipitazioni
precip_list = daily.get('precipitation_sum', [])
snowfall_list = daily.get('snowfall_sum', [])
rain_list = daily.get('rain_sum', [])
showers_list = daily.get('showers_sum', [])
if count < len(precip_list) and precip_list[count] is not None:
precip_sum = float(precip_list[count])
elif d_precip and any(p is not None for p in d_precip):
precip_sum = sum([float(p) for p in d_precip if p is not None])
else:
precip_sum = 0.0
# Caratterizza precipitazioni usando dati daily
snowfall_sum = 0.0
rain_sum = 0.0
showers_sum = 0.0
if count < len(snowfall_list) and snowfall_list[count] is not None:
snowfall_sum = float(snowfall_list[count])
elif d_snow and any(s is not None for s in d_snow):
snowfall_sum = sum([float(s) for s in d_snow if s is not None])
if count < len(rain_list) and rain_list[count] is not None:
rain_sum = float(rain_list[count])
if count < len(showers_list) and showers_list[count] is not None:
showers_sum = float(showers_list[count])
wind_list = daily.get('windspeed_10m_max', [])
if count < len(wind_list) and wind_list[count] is not None:
wind_max = float(wind_list[count])
elif d_winds and any(w is not None for w in d_winds):
wind_max = max([float(w) for w in d_winds if w is not None])
else:
wind_max = 0.0
# Calcola direzione vento dominante
wind_dir_deg = None
wind_dir_list = daily.get('winddirection_10m_dominant', [])
if count < len(wind_dir_list) and wind_dir_list[count] is not None:
wind_dir_deg = float(wind_dir_list[count])
elif d_winddir and any(wd is not None for wd in d_winddir):
# Media delle direzioni vento del giorno
wind_dir_clean = [float(wd) for wd in d_winddir if wd is not None]
if wind_dir_clean:
# Calcola media circolare
import math
sin_sum = sum(math.sin(math.radians(wd)) for wd in wind_dir_clean)
cos_sum = sum(math.cos(math.radians(wd)) for wd in wind_dir_clean)
wind_dir_deg = math.degrees(math.atan2(sin_sum / len(wind_dir_clean), cos_sum / len(wind_dir_clean)))
if wind_dir_deg < 0:
wind_dir_deg += 360
wind_dir_cardinal = degrees_to_cardinal(int(wind_dir_deg)) if wind_dir_deg is not None else "N"
except (ValueError, TypeError, IndexError) as e:
# Se ci sono errori nei dati, salta questo giorno
continue
events_list = analyze_daily_events(d_times, d_codes, d_probs, d_precip, d_winds, d_temps, d_dews,
snowfalls=d_snow, rains=d_rains, soil_temps=d_soil_temps,
cloud_covers=d_clouds, wind_speeds=d_winds)
events_summary.append(events_list)
dt = datetime.datetime.strptime(day_date, "%Y-%m-%d")
# Nomi giorni in italiano
giorni_ita = ["Lun", "Mar", "Mer", "Gio", "Ven", "Sab", "Dom"]
day_str = f"{giorni_ita[dt.weekday()]} {dt.strftime('%d/%m')}"
# Icona meteo principale basata sul weathercode del giorno
wcode = daily.get('weathercode', [])[count] if count < len(daily.get('weathercode', [])) else None
if wcode is None and d_codes:
# Se non c'è weathercode daily, usa il più frequente tra gli hourly
codes_clean = [int(c) for c in d_codes if c is not None]
if codes_clean:
wcode = Counter(codes_clean).most_common(1)[0][0]
else:
wcode = 0
else:
wcode = int(wcode) if wcode is not None else 0
# Calcola probabilità e tipo precipitazione per il giorno (PRIMA di determinare icona)
precip_prob = 0
precip_type = None
if d_probs and any(p is not None for p in d_probs):
prob_values = [p for p in d_probs if p is not None]
precip_prob = max(prob_values) if prob_values else 0
# Determina tipo precipitazione usando dati daily (più affidabili)
# Usa snowfall_sum, rain_sum, showers_sum per caratterizzazione precisa
# PRIORITÀ: Se snow_depth > 0 (modello italia_meteo_arpae_icon_2i), usa questi dati per determinare neve
precip_type = None
# Verifica snow_depth (per modello italia_meteo_arpae_icon_2i quando snow_depth > 0)
# NOTA: I valori sono già convertiti in cm durante il recupero dall'API
has_snow_depth_data = False
max_snow_depth = 0.0
if d_snow_depth and len(d_snow_depth) > 0:
snow_depth_valid = []
for sd in d_snow_depth:
if sd is not None:
try:
val_cm = float(sd) # Già in cm
if val_cm >= 0:
snow_depth_valid.append(val_cm)
except (ValueError, TypeError):
continue
if snow_depth_valid:
max_snow_depth = max(snow_depth_valid)
if max_snow_depth > 0: # Se snow_depth > 0 cm, considera presenza di neve persistente
has_snow_depth_data = True
# Verifica snow_depth INDIPENDENTEMENTE dalle precipitazioni
# snow_depth rappresenta il manto nevoso persistente, non la neve che cade
if has_snow_depth_data and max_snow_depth > 0:
# C'è manto nevoso al suolo (indipendente da snowfall)
# Questo influenza l'icona meteo, ma non il tipo di precipitazione se non sta nevicando
pass # Gestito separatamente per l'icona meteo
if precip_sum > 0.1:
# Priorità 1: Se sta nevicando (snowfall > 0) e c'è manto nevoso, considera entrambi
if has_snow_depth_data and max_snow_depth > 0:
# C'è sia neve in caduta che manto nevoso persistente
if rain_sum > 0.1 or showers_sum > 0.1:
precip_type = "mixed" # Neve + pioggia/temporali
elif snowfall_sum > 0.1:
precip_type = "snow" # Neve in caduta + manto nevoso
else:
# Solo manto nevoso persistente, nessuna neve in caduta
# Il tipo di precipitazione resta quello basato su snowfall/rain
pass
# Priorità 2: Usa dati daily se disponibili
elif snowfall_sum > 0.1:
# C'è neve significativa
if snowfall_sum >= precip_sum * 0.5:
precip_type = "snow"
elif rain_sum > 0.1 or showers_sum > 0.1:
# Mista (neve + pioggia/temporali)
precip_type = "mixed"
else:
precip_type = "snow"
elif rain_sum > 0.1:
# Pioggia
if showers_sum > 0.1:
# Temporali (showers)
precip_type = "thunderstorms"
else:
precip_type = "rain"
elif showers_sum > 0.1:
# Solo temporali
precip_type = "thunderstorms"
else:
# Fallback: usa dati hourly se daily non disponibili
snow_sum_day = sum([float(s) for s in d_snow if s is not None]) if d_snow else 0.0
if snow_sum_day > 0.1:
if snow_sum_day >= precip_sum * 0.5:
precip_type = "snow"
else:
precip_type = "rain"
else:
# Fallback: verifica weathercode per neve esplicita
snow_codes = [71, 73, 75, 77, 85, 86] # Codici WMO per neve
rain_codes = [51, 53, 55, 56, 57, 61, 63, 65, 80, 81, 82, 66, 67] # Codici WMO per pioggia
hail_codes = [96, 99] # Codici WMO per grandine/temporale
snow_count = sum(1 for c in d_codes if c is not None and int(c) in snow_codes)
rain_count = sum(1 for c in d_codes if c is not None and int(c) in rain_codes)
hail_count = sum(1 for c in d_codes if c is not None and int(c) in hail_codes)
if hail_count > 0:
precip_type = "hail"
elif snow_count > rain_count:
precip_type = "snow"
else:
precip_type = "rain"
# Determina icona basandosi su precipitazioni (priorità) e poi nuvolosità/weathercode
# Usa precip_type già calcolato
# NOTA: snow_depth è INDIPENDENTE da precipitazioni - influenza l'icona anche senza nevicate
has_precip = precip_sum > 0.1
if has_precip:
# Precipitazioni: usa precip_type per determinare icona
if precip_type == "snow":
weather_icon = "❄️" # Neve
elif precip_type == "thunderstorms" or precip_type == "hail" or wcode in (95, 96, 99):
weather_icon = "⛈️" # Temporale/Grandine
elif precip_type == "mixed":
weather_icon = "🌨️" # Precipitazione mista
else:
weather_icon = "🌧️" # Pioggia
elif has_snow_depth_data and max_snow_depth > 0:
# C'è manto nevoso persistente anche senza precipitazioni
# Mostra icona neve anche se non sta nevicando
weather_icon = "❄️" # Manto nevoso presente
elif t_min < 0:
# Giorno freddo (t_min < 0): usa icona ghiaccio (indipendentemente da snow_depth)
# Se c'è anche snow_depth, viene già gestito sopra
weather_icon = "🧊" # Gelo/Ghiaccio (giorno freddo)
elif wcode in (45, 48):
weather_icon = "🌫️" # Nebbia
else:
# Nessuna precipitazione: usa nuvolosità se disponibile, altrimenti weathercode
avg_cloud = 0
if d_clouds and any(c is not None for c in d_clouds):
cloud_clean = [float(c) for c in d_clouds if c is not None]
avg_cloud = sum(cloud_clean) / len(cloud_clean) if cloud_clean else 0
if avg_cloud > 0:
# Usa nuvolosità media
if avg_cloud <= 25:
weather_icon = "☀️" # Sereno
elif avg_cloud <= 50:
weather_icon = "" # Parzialmente nuvoloso
elif avg_cloud <= 75:
weather_icon = "☁️" # Nuvoloso
else:
weather_icon = "☁️" # Molto nuvoloso
else:
# Fallback a weathercode
if wcode in (0, 1):
weather_icon = "☀️" # Sereno
elif wcode in (2, 3):
weather_icon = "" # Parzialmente nuvoloso
else:
weather_icon = "☁️" # Nuvoloso
# Recupera probabilità max daily se disponibile
prob_max_list = daily.get('precipitation_probability_max', [])
precip_prob_max = None
if count < len(prob_max_list) and prob_max_list[count] is not None:
precip_prob_max = int(prob_max_list[count])
elif precip_prob > 0:
precip_prob_max = int(precip_prob)
# Calcola spessore manto nevoso (snow_depth) per questo giorno
# Nota: snow_depth è disponibile solo per alcuni modelli (es. ICON Italia, ICON-D2)
# Se il modello non supporta snow_depth, tutti i valori saranno None o mancanti
snow_depth_min = None
snow_depth_max = None
snow_depth_avg = None
snow_depth_end = None
# Verifica se abbiamo dati snow_depth validi per questo giorno
# NOTA: I valori sono già convertiti in cm durante il recupero dall'API
# Estrai anche direttamente dall'array hourly per sicurezza (fallback se d_snow_depth è vuoto)
# PRIORITÀ: usa sempre i dati dall'array hourly per quel giorno (più affidabile)
all_snow_depth_values = []
hourly_snow_depth = hourly.get('snow_depth', [])
hourly_times = hourly.get('time', [])
# Cerca direttamente nell'array hourly per questo giorno usando i timestamp (più affidabile)
if hourly_snow_depth and hourly_times and day_date:
for idx, ts in enumerate(hourly_times):
if ts.startswith(day_date) and idx < len(hourly_snow_depth):
if hourly_snow_depth[idx] is not None:
all_snow_depth_values.append(hourly_snow_depth[idx])
# Se non trovato con timestamp, usa d_snow_depth come fallback
if not all_snow_depth_values and d_snow_depth and len(d_snow_depth) > 0:
all_snow_depth_values = d_snow_depth
if all_snow_depth_values and len(all_snow_depth_values) > 0:
# Filtra solo valori validi (>= 0 e non null, già in cm)
snow_depth_clean = []
has_valid_data = False
for sd in all_snow_depth_values:
if sd is not None:
has_valid_data = True # Almeno un dato non-null presente
try:
val_cm = float(sd) # Già in cm
if val_cm >= 0: # Solo valori non negativi
snow_depth_clean.append(val_cm)
except (ValueError, TypeError):
continue
# Calcola statistiche se abbiamo almeno un valore non-null (il modello supporta snow_depth)
# snow_depth è INDIPENDENTE da snowfall: rappresenta il manto nevoso persistente al suolo
# Mostriamo sempre quando disponibile e > 0, anche se non nevica
if has_valid_data and snow_depth_clean:
max_depth = max(snow_depth_clean)
min_depth = min(snow_depth_clean)
# Calcola sempre le statistiche se ci sono dati validi, anche se il valore è piccolo
if max_depth > 0: # Mostra se almeno un valore > 0 cm
snow_depth_min = min_depth
snow_depth_max = max_depth
snow_depth_avg = sum(snow_depth_clean) / len(snow_depth_clean)
# Prendi l'ultimo valore non-null del giorno (spessore alla fine del giorno)
# Ordina per valore per trovare l'ultimo valore del giorno (non necessariamente l'ultimo della lista)
snow_depth_end = snow_depth_clean[-1] if snow_depth_clean else None
# Preferisci l'ultimo valore non-null originale per avere il valore alla fine del giorno
if all_snow_depth_values:
for sd in reversed(all_snow_depth_values):
if sd is not None:
try:
val_cm = float(sd)
if val_cm > 0:
snow_depth_end = val_cm
break
except (ValueError, TypeError):
continue
day_info = {
"day_str": day_str,
"t_min": t_min,
"t_max": t_max,
"precip_sum": precip_sum,
"precip_prob": precip_prob_max if precip_prob_max is not None else precip_prob,
"precip_type": precip_type,
"snowfall_sum": snowfall_sum,
"rain_sum": rain_sum,
"showers_sum": showers_sum,
"wind_max": wind_max,
"wind_dir": wind_dir_cardinal,
"events": events_list,
"weather_icon": weather_icon,
"snow_depth_min": snow_depth_min,
"snow_depth_max": snow_depth_max,
"snow_depth_avg": snow_depth_avg,
"snow_depth_end": snow_depth_end
}
daily_details.append(day_info)
count += 1
# Formatta dettagli giornalieri (tutti i giorni disponibili)
msg_parts.append("📅 <b>PREVISIONI GIORNALIERE</b>")
prev_snow_depth_end = None # Traccia lo spessore del giorno precedente per mostrare evoluzione
for day_info in daily_details:
line = f"{day_info['weather_icon']} <b>{day_info['day_str']}</b> 🌡️ {day_info['t_min']:.0f}°/{day_info['t_max']:.0f}°C"
# Aggiungi informazioni precipitazioni con caratterizzazione dettagliata
# Nota: mm è accumulo totale giornaliero (somma di tutte le ore)
if day_info['precip_sum'] > 0.1:
# Caratterizza usando dati daily se disponibili
precip_parts = []
# Neve
if day_info.get('snowfall_sum', 0) > 0.1:
precip_parts.append(f"❄️ {day_info['snowfall_sum']:.1f}cm")
# Pioggia
if day_info.get('rain_sum', 0) > 0.1:
precip_parts.append(f"🌧️ {day_info['rain_sum']:.1f}mm")
# Temporali (showers)
if day_info.get('showers_sum', 0) > 0.1:
precip_parts.append(f"⛈️ {day_info['showers_sum']:.1f}mm")
# Se non abbiamo dati daily dettagliati, usa il tipo generale
if not precip_parts:
precip_symbol = "❄️" if day_info['precip_type'] == "snow" else "⛈️" if day_info['precip_type'] in ("hail", "thunderstorms") else "🌨️" if day_info['precip_type'] == "mixed" else "🌧️"
precip_parts.append(f"{precip_symbol} {day_info['precip_sum']:.1f}mm")
line += f" | {' + '.join(precip_parts)}"
# Aggiungi probabilità se disponibile
if day_info['precip_prob'] and day_info['precip_prob'] > 0:
line += f" ({int(day_info['precip_prob'])}%)"
elif day_info['precip_prob'] > 50:
# Probabilità alta ma nessuna precipitazione prevista (può essere un errore del modello)
line += f" | 💧 Possibile ({int(day_info['precip_prob'])}%)"
# Aggiungi vento (sempre se disponibile, formattato come direzione intensità)
if day_info['wind_max'] > 0:
wind_str = f"{day_info['wind_dir']} {day_info['wind_max']:.0f}km/h"
line += f" | 💨 {wind_str}"
msg_parts.append(line)
# Mostra spessore manto nevoso se disponibile e > 0
# snow_depth è INDIPENDENTE da snowfall: rappresenta il manto nevoso persistente al suolo
# Deve essere sempre mostrato quando disponibile, anche nei giorni senza nevicate
# Se snow_depth_end è None ma ci sono dati validi, ricalcola
snow_depth_end = day_info.get('snow_depth_end')
if snow_depth_end is None:
# Prova a ricalcolare da snow_depth_min/max/avg se disponibili
snow_depth_max = day_info.get('snow_depth_max')
snow_depth_avg = day_info.get('snow_depth_avg')
if snow_depth_max is not None and snow_depth_max > 0:
snow_depth_end = snow_depth_max # Usa il massimo come fallback
elif snow_depth_avg is not None and snow_depth_avg > 0:
snow_depth_end = snow_depth_avg # Usa la media come fallback
if snow_depth_end is not None and snow_depth_end > 0:
snow_depth_str = f"❄️ Manto nevoso: {snow_depth_end:.1f} cm"
# Mostra evoluzione rispetto al giorno precedente
if prev_snow_depth_end is not None:
diff = snow_depth_end - prev_snow_depth_end
if abs(diff) > 0.5: # Solo se variazione significativa
if diff > 0:
snow_depth_str += f" (↑ +{diff:.1f} cm)"
else:
snow_depth_str += f" (↓ {diff:.1f} cm)"
# Mostra range se c'è variazione significativa durante il giorno
snow_depth_min = day_info.get('snow_depth_min')
snow_depth_max = day_info.get('snow_depth_max')
if snow_depth_min is not None and snow_depth_max is not None:
if snow_depth_max - snow_depth_min > 1.0: # Variazione > 1cm durante il giorno
snow_depth_str += f" [range: {snow_depth_min:.1f}-{snow_depth_max:.1f} cm]"
msg_parts.append(f" {snow_depth_str}")
if day_info['events']:
for ev in day_info['events'][:3]: # Limita a 3 eventi principali
msg_parts.append(f"{ev}")
# Aggiorna per il prossimo giorno
prev_snow_depth_end = snow_depth_end if snow_depth_end is not None else prev_snow_depth_end
msg_parts.append("")
return "\n".join(msg_parts)
def send_telegram(text, chat_id, token, debug_mode=False):
if not token:
return False
url = f"https://api.telegram.org/bot{token}/sendMessage"
payload = {
"chat_id": chat_id,
"text": text,
"parse_mode": "HTML",
"disable_web_page_preview": True
}
try:
resp = requests.post(url, json=payload, timeout=15)
return resp.status_code == 200
except:
return False
def main():
parser = argparse.ArgumentParser()
parser.add_argument("query", nargs="?", default="casa")
parser.add_argument("--chat_id")
parser.add_argument("--debug", action="store_true")
parser.add_argument("--home", action="store_true")
parser.add_argument("--timezone", help="Timezone IANA (es: Europe/Rome, America/New_York)")
args = parser.parse_args()
token = get_bot_token()
debug_mode = args.debug
# Determina destinatari
if debug_mode:
recipients = [ADMIN_CHAT_ID]
elif args.chat_id:
recipients = [args.chat_id]
else:
recipients = TELEGRAM_CHAT_IDS
# Determina località
if args.home or (not args.query or args.query.lower() == "casa"):
lat, lon, name, cc = DEFAULT_LAT, DEFAULT_LON, DEFAULT_NAME, "SM"
else:
coords = get_coordinates(args.query)
if not coords[0]:
error_msg = f"❌ Località '{args.query}' non trovata."
if token:
for chat_id in recipients:
send_telegram(error_msg, chat_id, token, debug_mode)
else:
print(error_msg)
return
lat, lon, name, cc = coords
# Recupera dati multi-modello (breve + lungo termine) - selezione intelligente basata su country code
# Determina se è Casa
is_home = (abs(lat - DEFAULT_LAT) < 0.01 and abs(lon - DEFAULT_LON) < 0.01)
# Recupera dati multi-modello (breve + lungo termine)
# - Per Casa: usa AROME Seamless e ICON-D2
# - Per altre località: usa best match di Open-Meteo
short_models, long_models = choose_models_by_country(cc, is_home=is_home)
# Usa timezone personalizzata se fornita
timezone = args.timezone if hasattr(args, 'timezone') and args.timezone else None
models_data = get_weather_multi_model(lat, lon, short_models, long_models, forecast_days=10, timezone=timezone)
if not any(models_data.values()):
error_msg = "❌ Errore: Impossibile recuperare dati meteo."
if token:
for chat_id in recipients:
send_telegram(error_msg, chat_id, token, debug_mode)
else:
print(error_msg)
return
# Genera report
report = format_weather_context_report(models_data, name, cc)
if debug_mode:
report = f"🛠 <b>[DEBUG MODE]</b> 🛠\n\n{report}"
# Invia
if token:
success = False
for chat_id in recipients:
if send_telegram(report, chat_id, token, debug_mode):
success = True
if not success:
print("❌ Errore invio Telegram")
else:
print(report)
if __name__ == "__main__":
main()