Compare commits

..

6 Commits

11 changed files with 2740 additions and 912 deletions
+63 -23
View File
@@ -41,6 +41,7 @@ SEVERE_SCRIPT = os.path.join(SCRIPT_DIR, "severe_weather.py")
ICE_CHECK_SCRIPT = os.path.join(SCRIPT_DIR, "check_ghiaccio.py")
IRRIGATION_SCRIPT = os.path.join(SCRIPT_DIR, "smart_irrigation_advisor.py")
SNOW_RADAR_SCRIPT = os.path.join(SCRIPT_DIR, "snow_radar.py")
FOTOVOLTAICO_SCRIPT = os.path.join(SCRIPT_DIR, "fotovoltaico.py")
# FILE STATO VIAGGI
VIAGGI_STATE_FILE = os.path.join(SCRIPT_DIR, "viaggi_attivi.json")
@@ -152,24 +153,25 @@ def get_timezone_from_coords(lat: float, lon: float) -> str:
except Exception as e:
logger.warning(f"Errore timezonefinder: {e}")
# Fallback: stima timezone da longitudine (approssimativo)
# Ogni 15 gradi = 1 ora di differenza da UTC
# Fallback: stima da longitudine (approssimativo). Ordine importante: gli offset
# tipici delle Americhe (-4 NY, -7 Denver, …) rientrano in [-10, 2] e non devono
# essere classificati come Europa prima dei rami per le Americhe.
offset_hours = int(lon / 15)
# Mappatura approssimativa a timezone IANA
if -10 <= offset_hours <= 2: # Europa
return "Europe/Rome"
elif 3 <= offset_hours <= 5: # Medio Oriente
return "Asia/Dubai"
elif 6 <= offset_hours <= 8: # Asia centrale
return "Asia/Kolkata"
elif 9 <= offset_hours <= 11: # Asia orientale
return "Asia/Tokyo"
elif -5 <= offset_hours <= -3: # Americhe orientali
return "America/New_York"
elif -8 <= offset_hours <= -6: # Americhe occidentali
if -8 <= offset_hours <= -6:
return "America/Los_Angeles"
else:
return "UTC"
if -5 <= offset_hours <= -3:
return "America/New_York"
if lon < -30 and offset_hours <= -9:
return "America/Los_Angeles"
if 3 <= offset_hours <= 5:
return "Asia/Dubai"
if 6 <= offset_hours <= 8:
return "Asia/Kolkata"
if 9 <= offset_hours <= 11:
return "Asia/Tokyo"
if -10 <= offset_hours <= 2:
return "Europe/Rome"
return "UTC"
def add_viaggio(chat_id: str, location: str, lat: float, lon: float, name: str, timezone: Optional[str] = None) -> None:
"""Aggiunge o aggiorna un viaggio attivo per un chat_id (sovrascrive se esiste)"""
@@ -237,6 +239,7 @@ def restricted(func):
async def wrapped(update: Update, context: ContextTypes.DEFAULT_TYPE, *args, **kwargs):
user_id = update.effective_user.id
if user_id not in ALLOWED_IDS:
logger.warning("Comando da utente non in ALLOWED_IDS: user_id=%s", user_id)
return
return await func(update, context, *args, **kwargs)
return wrapped
@@ -248,7 +251,7 @@ async def start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
[InlineKeyboardButton("🛡️ Pi-hole", callback_data="menu_pihole"), InlineKeyboardButton("🌐 Rete", callback_data="menu_net")],
[InlineKeyboardButton("🌤️ Meteo Casa", callback_data="req_meteo_home"), InlineKeyboardButton("📜 Logs", callback_data="menu_logs")]
]
text = "🎛 **Loogle Control Center v8.1**\nComandi disponibili:\n🔹 `/meteo <città>`\n🔹 `/meteo7 <città>` (Previsione 7gg)\n🔹 Pulsanti sotto"
text = "🎛 **Loogle Control Center v8.1**\nComandi disponibili:\n🔹 `/meteo <città>`\n🔹 `/meteo7 <città>` (Previsione 7gg)\n🔹 `/fotovoltaico` (Previsione produzione FV)\n🔹 Pulsanti sotto"
if update.message: await update.message.reply_text(text, reply_markup=InlineKeyboardMarkup(keyboard), parse_mode="Markdown")
else: await update.callback_query.edit_message_text(text, reply_markup=InlineKeyboardMarkup(keyboard), parse_mode="Markdown")
@@ -365,6 +368,18 @@ async def snowradar_command(update: Update, context: ContextTypes.DEFAULT_TYPE)
# Avvia in background
subprocess.Popen(cmd, cwd=SCRIPT_DIR)
@restricted
async def fotovoltaico_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""Comando /fotovoltaico: previsione produzione 72h future e curva 48h passate (ICON Italia)."""
chat_id = str(update.effective_chat.id)
cmd = ["python3", FOTOVOLTAICO_SCRIPT, "--telegram", "--chat_id", chat_id]
await update.message.reply_text(
"☀️ **Fotovoltaico**\n\n"
"Generazione curve previsione (72h future + 48h passate)... I grafici verranno inviati a breve.",
parse_mode="Markdown"
)
subprocess.Popen(cmd, cwd=SCRIPT_DIR)
@restricted
async def irrigazione_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""Comando /irrigazione: consulente agronomico per gestione irrigazione"""
@@ -642,10 +657,11 @@ async def meteo_viaggio_command(update: Update, context: ContextTypes.DEFAULT_TY
await update.message.reply_text(f"❌ Località '{location}' non trovata. Verifica il nome e riprova.", parse_mode="Markdown")
return
lat, lon, name, cc = coords
lat, lon, name, cc, geo_tz = coords
# Ottieni timezone per questa localizzazione
timezone = get_timezone_from_coords(lat, lon)
# Fuso: Open-Meteo geocoding espone già timezone IANA; altrimenti timezonefinder / fallback
geo_tz_clean = geo_tz.strip() if isinstance(geo_tz, str) and geo_tz.strip() else ""
timezone = geo_tz_clean or get_timezone_from_coords(lat, lon)
# Conferma riconoscimento località
await update.message.reply_text(
@@ -741,13 +757,36 @@ async def meteo_viaggio_command(update: Update, context: ContextTypes.DEFAULT_TY
await update.message.reply_text(f"❌ Errore durante l'elaborazione: {str(e)}", parse_mode="Markdown")
async def scheduled_morning_report(context: ContextTypes.DEFAULT_TYPE) -> None:
# Esegui una sola chiamata e invia il report a tutti i chat_id
report = call_meteo_script(["--home"])
# Stesso comportamento di `/meteo` senza argomenti: Casa (+ viaggio se attivo) per utente.
report_casa = call_meteo_script(["--home"])
for uid in ALLOWED_IDS:
chat_id = str(uid)
try:
await context.bot.send_message(chat_id=uid, text=report, parse_mode="Markdown")
await context.bot.send_message(
chat_id=uid,
text=f"🏠 **Report Meteo - Casa**\n\n{report_casa}",
parse_mode="Markdown",
)
except Exception:
pass
viaggio_attivo = get_viaggio(chat_id)
if viaggio_attivo:
report_viaggio = call_meteo_script(
[
"--query",
viaggio_attivo["location"],
"--timezone",
viaggio_attivo.get("timezone", "Europe/Rome"),
]
)
try:
await context.bot.send_message(
chat_id=uid,
text=f"✈️ **Report Meteo - {viaggio_attivo['name']}**\n\n{report_viaggio}",
parse_mode="Markdown",
)
except Exception:
pass
@restricted
async def button_handler(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
@@ -849,6 +888,7 @@ def main():
application.add_handler(CommandHandler("road", road_command))
application.add_handler(CommandHandler("irrigazione", irrigazione_command))
application.add_handler(CommandHandler("snowradar", snowradar_command))
application.add_handler(CommandHandler("fotovoltaico", fotovoltaico_command))
application.add_handler(CallbackQueryHandler(button_handler))
job_queue = application.job_queue
+2 -2
View File
@@ -201,11 +201,11 @@ def get_weather_data(lat, lon, model_slug, include_past_days=1):
# Modificati per ridurre falsi positivi mantenendo alta sensibilità
# =============================================================================
H_COLD_THR = 69.0 # hPa (Profondità minima strato freddo)
T_COLD_THR = 0.09 # °C (Temp max al suolo considerata 'fredda') - mantenuta bassa per evitare falsi negativi
T_COLD_THR = -0.5 # °C (Temp max al suolo considerata 'fredda') - abbassata da 0.09 per meno falsi positivi (solo quando è più gelido)
T_MELT_THR = 0.0 # °C (Temp min per considerare uno strato 'in fusione') - aumentata da -0.64°C a 0.0°C per ridurre falsi positivi mantenendo sensibilità
RH_MELT_THR = 89.0 # % (Umidità relativa minima nello strato di fusione)
PR_THR_6H = 0.39 # mm/6h
PR_THR_1H = 0.1 # mm/h - aumentata da 0.065 per richiedere precipitazione più significativa
PR_THR_1H = 0.25 # mm/h - alzata da 0.1 per richiedere precipitazione più significativa (meno allerte gelicidio)
# Differenza minima temperatura tra strato di fusione e suolo (per ridurre falsi positivi)
T_MELT_SURFACE_DIFF = 1.0 # °C - lo strato di fusione deve essere almeno 1°C più caldo del suolo (bilanciato tra riduzione falsi positivi e mantenimento sensibilità)
@@ -43,6 +43,7 @@ TOKEN_FILE_ETC = "/etc/telegram_dpc_bot_token"
# Zone target (come compaiono tipicamente nel testo del bollettino)
TARGET_ZONES = {
"EMR-B2": "Costa romagnola",
"EMR-B1": "Pianura romagnola",
"EMR-A2": "Alta collina romagnola",
"EMR-D1": "Pianura bolognese",
}
+816
View File
@@ -0,0 +1,816 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Fotovoltaico: analisi e previsione produzione con modello ICON Italia (ItaliaMeteo ARPAE).
- Previsione 72h future (Weather Forecast API)
- Previsione teorica 48h passate (Historical Forecast API) per confronto con produzione reale
- Produzione reale (opzionale) da API SolarEdge Monitoring
- Grafici per colpo d'occhio
Configurazione impianto:
- Inverter: 6 kW (SolarEdge)
- 22 pannelli Longi Solar (415 Wp ciascuno, totale 9,13 kWp)
- Orientamenti: vedi PANEL_GROUPS
SolarEdge: imposta SOLAREDGE_API_KEY e SOLAREDGE_SITE_ID (env o file .env / ~/.solaredge_fotovoltaico)
per mostrare la produzione reale sul grafico 48h passate.
"""
from __future__ import annotations
import argparse
import glob
import logging
import os
import sys
from datetime import datetime, timedelta
from io import BytesIO
from typing import Any, Dict, List, Optional, Tuple
import requests
from zoneinfo import ZoneInfo
# Grafici
import matplotlib
matplotlib.use("Agg")
import matplotlib.dates as mdates
import matplotlib.pyplot as plt
import numpy as np
from open_meteo_client import open_meteo_get
# -----------------------------------------------------------------------------
# Configurazione
# -----------------------------------------------------------------------------
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
TZ = "Europe/Rome"
TZINFO = ZoneInfo(TZ)
# Casa (stessa di meteo.py)
HOME_LAT = 43.9356
HOME_LON = 12.4296
# Impianto
INVERTER_KW = 6.0
PANEL_WP = 415.0 # Longi Solar, 415 Wp per modulo
NUM_PANELS = 22
P_PEAK_TOTAL_W = NUM_PANELS * PANEL_WP # 9130 Wp = 9,13 kWp
# Fisica termica (PVLib-style): i pannelli producono di più quando fa freddo (Wp @ 25°C).
TEMP_COEFF_POWER = -0.0035 # Longi Hi-MO ~ -0.35%/°C
NOCT = 45.0 # Nominal Operating Cell Temperature (°C)
SYSTEM_LOSSES = 0.10 # 10% perdite fisse (cavi, inverter, sporco) → PR eff. ~0.90
# Correzione empirica: impostabile via env PRODUCTION_CORRECTION_FACTOR o file.
# Se previsti < reali in modo sistematico, provare 1.21.35 (es. in ~/.solaredge_fotovoltaico).
PRODUCTION_CORRECTION_FACTOR = 1.0
# Gruppi pannelli: (tilt_deg, azimuth_compass_reale_deg, numero_pannelli).
# Azimut = bussola reale osservata (0=N, 90=E, 180=S, 270=W). Non usare i valori di monitoring.solaredge (spesso sbagliati).
# 1.1.1 a 72°; 1.1.2-10 a 158°; 1.2.1-2 a 253°; 1.2.3-10 + 1.2.12 a 72°; 1.2.11 a 190°. Due gruppi a 72° accorpati in uno.
PANEL_GROUPS = [
(34, 72, 10), # 1.1.1 + 1.2.3-10 + 1.2.12 (Est)
(34, 158, 9), # 1.1.2 - 1.1.10 (Sud-Sud-Est)
(34, 253, 2), # 1.2.1, 1.2.2 (Ovest-Sud-Ovest)
(34, 190, 1), # 1.2.11 (Sud-Sud-Ovest)
]
# API Open-Meteo: usiamo solo global_tilted_irradiance (W/m²) con &tilt= e &azimuth=.
# È l'irradianza reale sul piano del pannello (diretta+diffusa inclinate); non servono
# direct_radiation/diffuse_radiation separati. shortwave_radiation=GHI orizzontale non usato.
FORECAST_URL = "https://api.open-meteo.com/v1/forecast"
HISTORICAL_FORECAST_URL = "https://historical-forecast-api.open-meteo.com/v1/forecast"
MODEL_ICON_ITALIA = "italia_meteo_arpae_icon_2i"
MODEL_AROME_HD = "meteofrance_arome_france_hd" # Francia + limitrofi; fuori area può non restituire dati
HTTP_HEADERS = {"User-Agent": "loogle-bot-fotovoltaico/2.0"}
# Telegram
TOKEN_FILE_HOME = os.path.expanduser("~/.telegram_dpc_bot_token")
TOKEN_FILE_ETC = "/etc/telegram_dpc_bot_token"
# SolarEdge Monitoring API (opzionale: produzione reale sul grafico 48h)
# Chiave da https://monitoring.solaredge.com (Account → API Access); Site ID dal portale
SOLAREDGE_BASE = "https://monitoringapi.solaredge.com"
SOLAREDGE_SITE_ENERGY_PATH = "/site/{site_id}/energy"
SOLAREDGE_SITE_POWER_PATH = "/site/{site_id}/power"
DEFAULT_SOLAREDGE_SITE_ID = "3079750" # Usato se SOLAREDGE_SITE_ID non è impostato
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s",
)
logger = logging.getLogger(__name__)
def get_production_correction_factor() -> float:
"""Fattore di correzione produzione (default 1.0). Env: PRODUCTION_CORRECTION_FACTOR."""
s = (os.environ.get("PRODUCTION_CORRECTION_FACTOR") or "").strip()
if s:
try:
return max(0.1, min(3.0, float(s)))
except ValueError:
pass
for path in (
os.path.expanduser("~/.solaredge_fotovoltaico"),
os.path.join(SCRIPT_DIR, ".env"),
):
if not os.path.isfile(path):
continue
try:
with open(path, "r", encoding="utf-8") as f:
for line in f:
line = line.strip()
if line.startswith("PRODUCTION_CORRECTION_FACTOR="):
v = line.split("=", 1)[1].strip().strip("'\"")
if v:
return max(0.1, min(3.0, float(v)))
except Exception:
pass
return PRODUCTION_CORRECTION_FACTOR
def compass_to_open_meteo_azimuth(compass_deg: float) -> float:
"""Converte azimut compass (0=N, 90=E, 180=S, 270=W) in Open-Meteo (0=S, -90=E, 90=W, 180=N). Formula corretta: compass - 180 (non 180 - compass, altrimenti Est/Ovest si invertono)."""
return compass_deg - 180.0
def calculate_pv_power(gti: float, temp_air: float, n_panels: int) -> float:
"""Potenza DC (kW) del gruppo con fisica termica: freddo = più efficienza rispetto a STC (25°C)."""
if gti <= 0:
return 0.0
t_cell = temp_air + (gti / 800.0) * (NOCT - 20.0)
delta_t = t_cell - 25.0
temp_factor = 1.0 + (TEMP_COEFF_POWER * delta_t)
p_peak_group_kw = (n_panels * PANEL_WP) / 1000.0
p_dc = p_peak_group_kw * (gti / 1000.0) * temp_factor * (1.0 - SYSTEM_LOSSES)
return max(0.0, p_dc)
def fetch_gti_forecast(
lat: float,
lon: float,
tilt: float,
azimuth_om: float,
forecast_hours: int = 72,
past_hours: int = 0,
model: str = MODEL_ICON_ITALIA,
) -> Optional[Dict[str, Any]]:
"""Recupera GTI e temperature_2m dalla Weather Forecast API (modello specificabile)."""
params = {
"latitude": lat,
"longitude": lon,
"timezone": "auto",
"models": model,
"hourly": "global_tilted_irradiance,temperature_2m",
"tilt": tilt,
"azimuth": azimuth_om,
"forecast_hours": forecast_hours,
"past_hours": past_hours,
}
try:
r = open_meteo_get(
FORECAST_URL,
params=params,
headers=HTTP_HEADERS,
timeout=(8, 30),
)
if r.status_code != 200:
logger.warning("Forecast API status %s: %s", r.status_code, r.text[:300])
return None
return r.json()
except Exception as e:
logger.exception("Forecast API error: %s", e)
return None
def fetch_gti_historical(
lat: float,
lon: float,
tilt: float,
azimuth_om: float,
start_date: str,
end_date: str,
) -> Optional[Dict[str, Any]]:
"""Recupera GTI e temperature_2m dalla Historical Forecast API (previsione teorica passata)."""
params = {
"latitude": lat,
"longitude": lon,
"start_date": start_date,
"end_date": end_date,
"timezone": "auto",
"models": MODEL_ICON_ITALIA,
"hourly": "global_tilted_irradiance,temperature_2m",
"tilt": tilt,
"azimuth": azimuth_om,
}
try:
r = open_meteo_get(
HISTORICAL_FORECAST_URL,
params=params,
headers=HTTP_HEADERS,
timeout=(8, 45),
)
if r.status_code != 200:
logger.warning("Historical Forecast API status %s: %s", r.status_code, r.text[:300])
return None
return r.json()
except Exception as e:
logger.exception("Historical Forecast API error: %s", e)
return None
def production_from_groups(
hourly_times: List[str],
gti_per_group: List[List[Optional[float]]],
temp_per_hour: List[Optional[float]],
panel_counts: List[int],
) -> Tuple[List[float], List[float]]:
"""
Calcola la produzione per ora: ogni gruppo con GTI + temperatura aria; fisica termica
(freddo = più efficienza). Produzione totale = SOMMA gruppi, cap inverter, poi fattore correzione.
"""
total_panels = sum(panel_counts)
if total_panels != NUM_PANELS:
logger.warning(
"Somma pannelli per gruppo = %s (attesi %s); verifica PANEL_GROUPS",
total_panels, NUM_PANELS,
)
n = len(hourly_times)
gti_equivalent = []
power_kw = []
correction = get_production_correction_factor()
for i in range(n):
temp = temp_per_hour[i] if i < len(temp_per_hour) and temp_per_hour[i] is not None else 15.0
p_dc_total = 0.0
gti_sum_weighted = 0.0
for gti_list, n_panels in zip(gti_per_group, panel_counts):
if gti_list is not None and i < len(gti_list) and gti_list[i] is not None:
gti_w = gti_list[i]
p_dc_total += calculate_pv_power(gti_w, temp, n_panels)
gti_sum_weighted += gti_w * (n_panels / total_panels)
p_ac_kw = min(p_dc_total, INVERTER_KW) * correction
power_kw.append(round(p_ac_kw, 3))
gti_equivalent.append(gti_sum_weighted)
return gti_equivalent, power_kw
def fetch_forecast_72h(lat: float, lon: float) -> Optional[Tuple[List[str], List[float], List[float]]]:
"""
Ottiene previsione 72h come curve intere: oggi, domani, dopodomani (sempre 3 giorni pieni).
Indipendentemente dall'ora di chiamata, si richiedono dati da mezzanotte di oggi per 72 ore.
"""
now = datetime.now(TZINFO)
midnight_today = now.replace(hour=0, minute=0, second=0, microsecond=0)
past_hours = int((now - midnight_today).total_seconds() / 3600)
# Richiedi da mezzanotte (past_hours) + 72h future
total_hours = past_hours + 72
gti_per_orientation = []
panel_weights = []
times_ref = None
temp_per_hour = None
for tilt, azimuth_compass, count in PANEL_GROUPS:
az_om = compass_to_open_meteo_azimuth(azimuth_compass)
data = fetch_gti_forecast(
lat, lon, tilt, az_om,
forecast_hours=72,
past_hours=past_hours,
)
if not data or "hourly" not in data:
logger.warning("Forecast mancante per tilt=%s az=%s", tilt, azimuth_compass)
gti_per_orientation.append([None] * total_hours)
else:
if times_ref is None:
times_ref = data["hourly"].get("time", [])
raw = data["hourly"].get("temperature_2m")
temp_per_hour = raw if raw else [15.0] * len(times_ref)
gti = data["hourly"].get("global_tilted_irradiance")
if not gti:
gti = data["hourly"].get("global_tilted_irradiance_sum")
gti_per_orientation.append(gti or [None] * total_hours)
panel_weights.append(count)
times = times_ref or []
if not times:
return None
if temp_per_hour is None:
temp_per_hour = [15.0] * len(times)
gti_w, power_kw = production_from_groups(times, gti_per_orientation, temp_per_hour, panel_weights)
# Filtra: solo le 72 ore da mezzanotte di oggi (oggi, domani, dopodomani interi)
end_window = midnight_today + timedelta(hours=72)
dt_list = parse_times_to_datetime(times)
filtered_times = []
filtered_gti = []
filtered_power = []
for i, dt in enumerate(dt_list):
if midnight_today <= dt < end_window and len(filtered_times) < 72:
filtered_times.append(times[i])
filtered_gti.append(gti_w[i] if i < len(gti_w) else 0)
filtered_power.append(power_kw[i] if i < len(power_kw) else 0)
if not filtered_times:
return None
return filtered_times, filtered_gti, filtered_power
def _fetch_forecast_72h_for_model(
lat: float, lon: float, model: str
) -> Optional[Tuple[List[str], List[float]]]:
"""
Come fetch_forecast_72h ma per un singolo modello; ritorna (filtered_times, filtered_power)
o None se il modello non restituisce dati (es. AROME HD fuori copertura).
"""
now = datetime.now(TZINFO)
midnight_today = now.replace(hour=0, minute=0, second=0, microsecond=0)
past_hours = int((now - midnight_today).total_seconds() / 3600)
total_hours = past_hours + 72
gti_per_orientation = []
panel_weights = []
times_ref = None
temp_per_hour = None
for tilt, azimuth_compass, count in PANEL_GROUPS:
az_om = compass_to_open_meteo_azimuth(azimuth_compass)
data = fetch_gti_forecast(
lat, lon, tilt, az_om,
forecast_hours=72,
past_hours=past_hours,
model=model,
)
if not data or "hourly" not in data:
logger.warning("Forecast (%s) mancante per tilt=%s az=%s", model, tilt, azimuth_compass)
gti_per_orientation.append([None] * total_hours)
else:
if times_ref is None:
times_ref = data["hourly"].get("time", [])
raw = data["hourly"].get("temperature_2m")
temp_per_hour = raw if raw else [15.0] * len(times_ref)
gti = data["hourly"].get("global_tilted_irradiance")
if not gti:
gti = data["hourly"].get("global_tilted_irradiance_sum")
gti_per_orientation.append(gti or [None] * total_hours)
panel_weights.append(count)
times = times_ref or []
if not times:
return None
if temp_per_hour is None:
temp_per_hour = [15.0] * len(times)
gti_w, power_kw = production_from_groups(times, gti_per_orientation, temp_per_hour, panel_weights)
end_window = midnight_today + timedelta(hours=72)
dt_list = parse_times_to_datetime(times)
filtered_times = []
filtered_power = []
for i, dt in enumerate(dt_list):
if midnight_today <= dt < end_window and len(filtered_times) < 72:
filtered_times.append(times[i])
filtered_power.append(power_kw[i] if i < len(power_kw) else 0)
if not filtered_times:
return None
return filtered_times, filtered_power
def fetch_forecast_72h_multi(
lat: float, lon: float
) -> Optional[Tuple[List[str], List[Tuple[str, List[float]]]]]:
"""
Previsione 72h da più modelli (ICON Italia + AROME HD se disponibile).
Ritorna (times_72, [(label, power_list), ...]). AROME HD copre Francia e limitrofi;
fuori area può non restituire dati e viene omesso.
"""
result_icon = _fetch_forecast_72h_for_model(lat, lon, MODEL_ICON_ITALIA)
if not result_icon:
return None
ref_times, power_icon = result_icon
n = len(ref_times)
series: List[Tuple[str, List[float]]] = [("ICON Italia", power_icon)]
result_arome = _fetch_forecast_72h_for_model(lat, lon, MODEL_AROME_HD)
if result_arome:
times_arome, power_arome = result_arome
# Allinea alla griglia oraria di ref_times (AROME può avere meno ore, es. 48)
power_aligned = []
for t in ref_times:
if t in times_arome:
idx = times_arome.index(t)
power_aligned.append(power_arome[idx] if idx < len(power_arome) else 0.0)
else:
power_aligned.append(0.0)
# Includi AROME solo se ha dati utili (fuori copertura restituisce spesso tutti zero)
if sum(power_aligned) >= 0.1:
series.append(("AROME HD", power_aligned))
logger.info("AROME HD disponibile per confronto previsioni")
else:
logger.info("AROME HD senza dati utili per questa località (copertura Francia/limitrofi)")
else:
logger.info("AROME HD non disponibile per questa località (copertura Francia/limitrofi)")
return ref_times, series
def fetch_historical_48h(lat: float, lon: float) -> Optional[Tuple[List[str], List[float], List[float]]]:
"""
Ottiene previsione teorica per ieri l'altro e ieri (2 giorni pieni, senza oggi).
Usa orientamenti reali (bussola osservata) e temperature_2m per la fisica termica.
Finestra: solo ieri l'altro + ieri, così il grafico "Ultime 48h" non include la giornata odierna.
"""
today = datetime.now(TZINFO).date()
start_d = datetime.combine(today - timedelta(days=2), datetime.min.time()).replace(tzinfo=TZINFO)
end_d = datetime.combine(today - timedelta(days=1), datetime.min.time()).replace(tzinfo=TZINFO)
start_date = start_d.strftime("%Y-%m-%d")
end_date = end_d.strftime("%Y-%m-%d")
gti_per_orientation = []
panel_weights = []
times_ref = None
temp_per_hour = None
for tilt, azimuth_compass, count in PANEL_GROUPS:
az_om = compass_to_open_meteo_azimuth(azimuth_compass)
data = fetch_gti_historical(lat, lon, tilt, az_om, start_date, end_date)
if not data or "hourly" not in data:
logger.warning("Historical mancante per tilt=%s az=%s", tilt, azimuth_compass)
gti_per_orientation.append([])
else:
if times_ref is None:
times_ref = data["hourly"].get("time", [])
raw = data["hourly"].get("temperature_2m")
temp_per_hour = raw if raw else [15.0] * len(times_ref)
gti = data["hourly"].get("global_tilted_irradiance")
if not gti:
gti = data["hourly"].get("global_tilted_irradiance_sum")
gti_per_orientation.append(gti or [])
panel_weights.append(count)
times = times_ref or []
if not times or not gti_per_orientation:
return None
if temp_per_hour is None:
temp_per_hour = [15.0] * len(times)
n = len(times)
for i, gti_list in enumerate(gti_per_orientation):
if len(gti_list) < n:
gti_per_orientation[i] = gti_list + [None] * (n - len(gti_list))
elif len(gti_list) > n:
gti_per_orientation[i] = gti_list[:n]
gti_w, power_kw = production_from_groups(times, gti_per_orientation, temp_per_hour, panel_weights)
return times, gti_w, power_kw
def load_solaredge_config() -> Tuple[str, str]:
"""Ritorna (api_key, site_id). site_id usa DEFAULT_SOLAREDGE_SITE_ID se non impostato."""
api_key = (os.environ.get("SOLAREDGE_API_KEY") or "").strip()
site_id = (os.environ.get("SOLAREDGE_SITE_ID") or "").strip() or DEFAULT_SOLAREDGE_SITE_ID
if api_key and site_id:
return api_key, site_id
for path in (
os.path.expanduser("~/.solaredge_fotovoltaico"),
os.path.join(SCRIPT_DIR, ".env"),
):
if not os.path.isfile(path):
continue
try:
with open(path, "r", encoding="utf-8") as f:
for line in f:
line = line.strip()
if not line or line.startswith("#"):
continue
if "=" in line:
k, v = line.split("=", 1)
k, v = k.strip(), v.strip().strip('"\'')
if k == "SOLAREDGE_API_KEY":
api_key = api_key or v
elif k == "SOLAREDGE_SITE_ID":
site_id = site_id or v
except Exception as e:
logger.debug("Lettura %s: %s", path, e)
if api_key and site_id:
break
return api_key or "", site_id or DEFAULT_SOLAREDGE_SITE_ID
def fetch_solaredge_energy_48h(api_key: str, site_id: str) -> Optional[Tuple[List[datetime], List[float]]]:
"""Recupera produzione reale per ieri l'altro e ieri (stessa finestra del grafico 48h, senza oggi)."""
today = datetime.now(TZINFO).date()
start_date = (today - timedelta(days=2)).strftime("%Y-%m-%d")
end_date = (today - timedelta(days=1)).strftime("%Y-%m-%d")
url = SOLAREDGE_BASE + SOLAREDGE_SITE_ENERGY_PATH.format(site_id=site_id)
params = {
"api_key": api_key,
"startDate": start_date,
"endDate": end_date,
"timeUnit": "HOUR",
}
try:
r = requests.get(url, params=params, headers=HTTP_HEADERS, timeout=(8, 25))
if r.status_code != 200:
logger.warning("SolarEdge API status %s: %s", r.status_code, r.text[:300])
return None
data = r.json()
energy_block = data.get("energy", {}) or data.get("siteEnergy", {})
values = energy_block.get("values") or data.get("values")
if not values:
logger.warning("SolarEdge: nessun 'energy.values' in risposta")
return None
times_out: List[datetime] = []
power_kw_out: List[float] = []
for item in values:
val = item.get("value")
if val is None:
continue
date_str = item.get("date") or item.get("time") or ""
try:
if "T" in date_str or " " in date_str:
dt = datetime.fromisoformat(date_str.replace("Z", "+00:00").strip())
else:
dt = datetime.strptime(date_str[:10], "%Y-%m-%d").replace(tzinfo=TZINFO)
if dt.tzinfo is None:
dt = dt.replace(tzinfo=TZINFO)
except Exception:
continue
power_kw = float(val) / 1000.0
times_out.append(dt)
power_kw_out.append(round(power_kw, 3))
if not times_out:
return None
return times_out, power_kw_out
except Exception as e:
logger.exception("SolarEdge API error: %s", e)
return None
def kwh_per_day_from_series(
times: List[str],
power_kw: List[float],
) -> List[Tuple[str, float]]:
"""Raggruppa power_kw (kWh/ora) per data e ritorna [(data dd/mm, kwh), ...]."""
from collections import defaultdict
dt_list = parse_times_to_datetime(times)
by_date: Dict[str, float] = defaultdict(float)
for i, dt in enumerate(dt_list):
if i < len(power_kw):
key = dt.strftime("%Y-%m-%d")
by_date[key] += power_kw[i]
out: List[Tuple[str, float]] = []
for d_str in sorted(by_date.keys()):
d = datetime.strptime(d_str, "%Y-%m-%d").date()
label = d.strftime("%d/%m")
out.append((label, round(by_date[d_str], 1)))
return out
def parse_times_to_datetime(times: List[str]) -> List[datetime]:
out = []
for t in times:
try:
if "T" in t:
dt = datetime.fromisoformat(t.replace("Z", "+00:00"))
else:
dt = datetime.strptime(t, "%Y-%m-%d").replace(tzinfo=TZINFO)
if dt.tzinfo is None:
dt = dt.replace(tzinfo=TZINFO)
out.append(dt)
except Exception:
out.append(datetime.now(TZINFO))
return out
def plot_past_48h(
times: List[str],
power_kw: List[float],
title_suffix: str = "",
real_times: Optional[List[datetime]] = None,
real_power_kw: Optional[List[float]] = None,
) -> bytes:
"""Genera il grafico in memoria e ritorna i byte PNG (non salva su disco)."""
fig, ax = plt.subplots(figsize=(10, 4.5))
dt_list = parse_times_to_datetime(times)
ax.fill_between(dt_list, 0, power_kw, alpha=0.5, color="steelblue")
ax.plot(dt_list, power_kw, color="navy", linewidth=1.2, label="Previsione (ICON Italia)")
if real_times and real_power_kw and len(real_times) == len(real_power_kw):
ax.plot(real_times, real_power_kw, color="darkorange", linewidth=1.4, label="Produzione reale (SolarEdge)")
ax.axhline(y=INVERTER_KW, color="red", linestyle="--", alpha=0.7, label=f"Limite inverter {INVERTER_KW} kW")
ax.set_ylabel("Potenza AC (kW)")
ax.set_xlabel("Data / Ora")
ax.set_title(f"Produzione Ieri l'altro e Ieri{title_suffix}")
ax.legend(loc="upper right")
all_vals = list(power_kw) + (list(real_power_kw) if real_power_kw else [])
ax.set_ylim(0, max(INVERTER_KW * 1.05, max(all_vals) * 1.1) if all_vals else 7)
ax.xaxis.set_major_formatter(mdates.DateFormatter("%d/%m %H:%M", tz=TZINFO))
ax.xaxis.set_major_locator(mdates.HourLocator(interval=6))
plt.xticks(rotation=25)
plt.tight_layout()
buf = BytesIO()
plt.savefig(buf, format="png", dpi=120)
plt.close()
return buf.getvalue()
def plot_future_72h(
times: List[str],
power_series: List[Tuple[str, List[float]]],
title_suffix: str = "",
) -> bytes:
"""
Genera il grafico in memoria. power_series: [(label, power_kw_list), ...];
la prima serie ha anche fill_between, le altre solo linea (confronto modelli).
"""
if not power_series:
plt.close("all")
return b""
fig, ax = plt.subplots(figsize=(10, 4.5))
dt_list = parse_times_to_datetime(times)
colors = ["green", "blue", "purple"]
all_vals = []
for i, (label, power_kw) in enumerate(power_series):
if len(power_kw) != len(dt_list):
power_kw = (power_kw + [0.0] * len(dt_list))[: len(dt_list)]
all_vals.extend(power_kw)
color = colors[i % len(colors)]
if i == 0:
ax.fill_between(dt_list, 0, power_kw, alpha=0.4, color=color)
ax.plot(dt_list, power_kw, color=color, linewidth=1.2, label=label)
ax.axhline(y=INVERTER_KW, color="red", linestyle="--", alpha=0.7, label=f"Limite inverter {INVERTER_KW} kW")
ax.set_ylabel("Potenza AC (kW)")
ax.set_xlabel("Data / Ora")
ax.set_title(f"Previsione produzione Oggi, Domani, Dopodomani{title_suffix}")
ax.legend(loc="upper right")
ax.set_ylim(0, max(INVERTER_KW * 1.05, max(all_vals) * 1.1) if all_vals else 7)
ax.xaxis.set_major_formatter(mdates.DateFormatter("%d/%m %H:%M", tz=TZINFO))
ax.xaxis.set_major_locator(mdates.HourLocator(interval=6))
plt.xticks(rotation=25)
plt.tight_layout()
buf = BytesIO()
plt.savefig(buf, format="png", dpi=120)
plt.close()
return buf.getvalue()
def load_bot_token() -> str:
tok = os.environ.get("TELEGRAM_BOT_TOKEN", "").strip() or os.environ.get("BOT_TOKEN", "").strip()
if tok:
return tok
for path in (TOKEN_FILE_HOME, TOKEN_FILE_ETC):
try:
with open(path, "r", encoding="utf-8") as f:
t = f.read().strip()
if t:
return t
except FileNotFoundError:
continue
return ""
def telegram_send_photo(photo_bytes: bytes, caption: str, chat_id: str) -> bool:
"""Invia una foto a Telegram da bytes (nessun file su disco)."""
token = load_bot_token()
if not token:
logger.warning("Token Telegram mancante")
return False
url = f"https://api.telegram.org/bot{token}/sendPhoto"
try:
r = requests.post(
url,
data={"chat_id": chat_id, "caption": caption, "parse_mode": "Markdown"},
files={"photo": ("grafico.png", photo_bytes, "image/png")},
timeout=20,
)
if r.status_code != 200:
logger.error("Telegram sendPhoto %s: %s", r.status_code, r.text[:400])
return False
return True
except Exception as e:
logger.exception("Telegram sendPhoto: %s", e)
return False
def telegram_send_message(text: str, chat_id: str) -> bool:
token = load_bot_token()
if not token:
return False
url = f"https://api.telegram.org/bot{token}/sendMessage"
try:
r = requests.post(
url,
json={"chat_id": chat_id, "text": text, "parse_mode": "Markdown"},
timeout=15,
)
return r.status_code == 200
except Exception as e:
logger.exception("Telegram sendMessage: %s", e)
return False
def main() -> int:
parser = argparse.ArgumentParser(description="Previsione e analisi produzione fotovoltaico (ICON Italia)")
parser.add_argument("--lat", type=float, default=HOME_LAT, help="Latitudine")
parser.add_argument("--lon", type=float, default=HOME_LON, help="Longitudine")
parser.add_argument("--telegram", action="store_true", help="Invia grafici e messaggio a Telegram")
parser.add_argument("--chat_id", type=str, default="", help="Chat ID per invio Telegram")
parser.add_argument("--no-past", action="store_true", help="Salta grafico 48h passate")
parser.add_argument("--no-future", action="store_true", help="Salta grafico 72h future")
args = parser.parse_args()
lat, lon = args.lat, args.lon
send_telegram = args.telegram and args.chat_id.strip()
# Rimuovi eventuali grafici fotovoltaico salvati in precedenza (non ne salviamo più)
for old in glob.glob(os.path.join(SCRIPT_DIR, "fotovoltaico_*.png")):
try:
os.remove(old)
logger.info("Rimosso %s", old)
except OSError as e:
logger.warning("Impossibile rimuovere %s: %s", old, e)
hist: Optional[Tuple[List[str], List[float], List[float]]] = None
fore: Optional[Tuple[List[str], List[float], List[float]]] = None
solaredge_api_key, solaredge_site_id = load_solaredge_config()
real_48h: Optional[Tuple[List[datetime], List[float]]] = None
if solaredge_api_key and solaredge_site_id and not args.no_past:
logger.info("Recupero produzione reale SolarEdge (48h)...")
real_48h = fetch_solaredge_energy_48h(solaredge_api_key, solaredge_site_id)
if not real_48h:
logger.warning("SolarEdge: dati 48h non disponibili (controlla API key e Site ID)")
past_ok = False
if not args.no_past:
logger.info(
"Pannelli: %s gruppi (tilt, azimut°, n) = %s",
len(PANEL_GROUPS),
[(t, a, n) for t, a, n in PANEL_GROUPS],
)
logger.info("Recupero dati Historical Forecast (48h passate)...")
hist = fetch_historical_48h(lat, lon)
if hist:
times_past, _, power_past = hist
past_days_debug = kwh_per_day_from_series(hist[0], hist[2])
logger.info("Historical 48h kWh per giorno (previsione): %s", past_days_debug)
real_t = (real_48h[0], real_48h[1]) if real_48h else (None, None)
img_past = plot_past_48h(times_past, power_past, real_times=real_t[0], real_power_kw=real_t[1])
past_ok = True
if send_telegram:
caption = "📊 *Produzione Ieri l'altro e Ieri* (ICON Italia" + (" + SolarEdge reale" if real_48h else "") + ")"
telegram_send_photo(img_past, caption, args.chat_id)
else:
logger.warning("Nessun dato Historical Forecast per le 48h passate")
future_ok = False
fore_multi: Optional[Tuple[List[str], List[Tuple[str, List[float]]]]] = None
if not args.no_future:
logger.info("Recupero dati Forecast (72h future, ICON + AROME HD)...")
fore_multi = fetch_forecast_72h_multi(lat, lon)
if fore_multi:
times_fut, power_series = fore_multi
img_fut = plot_future_72h(times_fut, power_series)
future_ok = True
if send_telegram:
models_label = " / ".join(s[0] for s in power_series)
telegram_send_photo(
img_fut,
f"☀️ *Previsione Oggi, Domani, Dopodomani* ({models_label})",
args.chat_id,
)
else:
logger.warning("Nessun dato Forecast per le 72h future")
if send_telegram:
lines = ["🖥 *Fotovoltaico*", ""]
if past_ok and hist:
past_days = kwh_per_day_from_series(hist[0], hist[2])
real_days = None
if real_48h:
real_days = kwh_per_day_from_series(
[t.isoformat() for t in real_48h[0]],
real_48h[1],
)
real_by_date = {d: k for d, k in (real_days or [])}
lines.append("📊 *Ultimi 2 giorni*")
for label, kwh in past_days:
part = f" {label} · prev. {kwh} kWh"
if label in real_by_date:
part += f" · _reale_ {real_by_date[label]} kWh"
lines.append(part)
lines.append("")
if future_ok and fore_multi:
times_fut, power_series = fore_multi
fut_days_per_model = [
(name, kwh_per_day_from_series(times_fut, power_list))
for name, power_list in power_series
]
lines.append("☀️ *Prossimi 3 giorni*")
if len(fut_days_per_model) == 1:
for label, kwh in fut_days_per_model[0][1]:
lines.append(f" {label} · {kwh} kWh")
else:
dates = sorted({d for _, days_list in fut_days_per_model for d, _ in days_list})
for d in dates:
parts = [f" {d}"]
for name, days_list in fut_days_per_model:
val = next((k for lbl, k in days_list if lbl == d), None)
if val is not None:
short = "ICON" if "ICON" in name else "AROME"
parts.append(f"{short} {val}")
lines.append(" · ".join(parts) + " kWh")
lines.append("")
models_footer = " · ".join(s[0] for s in power_series)
lines.append(f"_Modello: {models_footer}_")
telegram_send_message("\n".join(lines), args.chat_id)
if not past_ok and not future_ok:
logger.error("Nessun dato disponibile")
return 1
return 0
if __name__ == "__main__":
sys.exit(main())
+17 -1
View File
@@ -14,6 +14,15 @@ import requests
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
DEFAULT_PATTERNS = ["*.log", "*_log.txt"]
EXCLUDED_FILES = {
"circondario_log.txt",
"irrigation_cron.log",
"road_weather.log",
"snow_radar.log",
}
# Log irrigazione: aggiornato solo quando lo script viene eseguito (cron --auto o /irrigazione).
# Se non c’è un cron giornaliero, il file può restare “non aggiornato” per giorni (normale in inverno).
STALE_EXCLUDE_BASENAMES = {"irrigation_advisor.log"}
TOKEN_FILE_HOME = os.path.expanduser("~/.telegram_dpc_bot_token")
TOKEN_FILE_ETC = "/etc/telegram_dpc_bot_token"
@@ -21,7 +30,8 @@ TS_RE = re.compile(r"(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})")
CATEGORIES = {
"open_meteo_timeout": re.compile(
r"timeout|timed out|Read timed out|Gateway Time-out|504", re.IGNORECASE
r"timeout|timed out|Read timed out|Gateway Time-out|HTTP\s*504|status\s*504|\b504\b",
re.IGNORECASE,
),
"ssl_handshake": re.compile(r"handshake", re.IGNORECASE),
"permission_error": re.compile(r"PermissionError|permesso negato|Errno 13", re.IGNORECASE),
@@ -119,6 +129,8 @@ def analyze_logs(files: List[str], since: datetime.datetime, max_lines: int) ->
break
# Verifica se il log è "stale" (non aggiornato da più di 24 ore)
if os.path.basename(path) in STALE_EXCLUDE_BASENAMES:
continue # Non segnalare come stale (es. irrigazione: aggiornato solo a ogni run)
if last_ts:
hours_since = (now - last_ts).total_seconds() / 3600.0
if hours_since > 24:
@@ -207,6 +219,9 @@ def main():
parser.add_argument("--log", action="append", help="Aggiungi un file log specifico")
args = parser.parse_args()
run_ts = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
print(f"--- Log Monitor run {run_ts} ---")
if args.log:
files = [p for p in args.log if os.path.exists(p)]
else:
@@ -215,6 +230,7 @@ def main():
for pat in DEFAULT_PATTERNS:
files.extend(sorted([str(p) for p in Path(BASE_DIR).glob(pat)]))
files = sorted(set(files))
files = [p for p in files if os.path.basename(p) not in EXCLUDED_FILES]
since = datetime.datetime.now() - datetime.timedelta(days=args.days)
category_hits, per_file_counts, timeout_minutes, stale_logs = analyze_logs(files, since, args.max_lines)
+71 -84
View File
@@ -18,14 +18,10 @@ logger = logging.getLogger(__name__)
# --- CONFIGURAZIONE METEO ---
HOME_LAT = 43.9356
HOME_LON = 12.4296
HOME_NAME = "🏠 Casa (Wide View ±12km)"
HOME_NAME = "🏠 Casa"
TZ = "Europe/Berlin"
TZINFO = ZoneInfo(TZ)
# Offset ~12-15km per i 5 punti
OFFSET_LAT = 0.12
OFFSET_LON = 0.16
OPEN_METEO_URL = "https://api.open-meteo.com/v1/forecast"
GEOCODING_URL = "https://geocoding-api.open-meteo.com/v1/search"
HTTP_HEADERS = {"User-Agent": "loogle-bot-v10.5"}
@@ -94,14 +90,17 @@ def telegram_send_markdown(message_md: str, chat_ids: Optional[List[str]] = None
def now_local() -> datetime.datetime:
return datetime.datetime.now(TZINFO)
def parse_time(t: str) -> datetime.datetime:
def parse_time(t: str, tz: Optional[ZoneInfo] = None) -> datetime.datetime:
"""Interpreta un timestamp ISO dell'API nel fuso richiesto (default: Casa / Europe/Berlin)."""
target = tz if tz is not None else TZINFO
try:
dt = date_parser.isoparse(t)
if dt.tzinfo is None: return dt.replace(tzinfo=TZINFO)
return dt.astimezone(TZINFO)
if dt.tzinfo is None:
return dt.replace(tzinfo=target)
return dt.astimezone(target)
except Exception as e:
logger.error(f"Time parse error: {e}")
return now_local()
return datetime.datetime.now(target)
def degrees_to_cardinal(d: int) -> str:
dirs = ["N", "NE", "E", "SE", "S", "SW", "W", "NW"]
@@ -168,7 +167,8 @@ def get_coordinates(city_name: str):
res = data["results"][0]
cc = res.get("country_code", "IT").upper()
name = f"{res.get('name')} ({cc})"
return res["latitude"], res["longitude"], name, cc
geo_tz = res.get("timezone")
return res["latitude"], res["longitude"], name, cc, geo_tz
except Exception as e:
logger.error(f"Geocoding error: {e}")
return None
@@ -176,12 +176,12 @@ def get_coordinates(city_name: str):
def choose_best_model(lat, lon, cc, is_home=False):
"""
Sceglie il modello meteo.
- Per Casa: usa AROME Seamless (ha snowfall)
- Per Casa: usa ICON Italia (ARPAE 2i) - migliore risoluzione spaziale per Italia/San Marino.
- Per altre località: usa best match di Open-Meteo (senza specificare models)
"""
if is_home:
# Per Casa, usa AROME Seamless (ha snowfall e dati dettagliati)
return "meteofrance_seamless", "AROME HD"
# Per Casa, usa ICON Italia (risoluzione spaziale migliore per Italia/San Marino)
return "italia_meteo_arpae_icon_2i", "ICON Italia"
else:
# Per query worldwide, usa best match (Open-Meteo sceglie automaticamente)
return None, "Best Match"
@@ -189,7 +189,7 @@ def choose_best_model(lat, lon, cc, is_home=False):
def get_forecast(lat, lon, model=None, is_home=False, timezone=None, retry_after_60s=False):
"""
Recupera forecast. Se model è None, usa best match di Open-Meteo.
Per Casa (is_home=True), usa AROME Seamless.
Per Casa (is_home=True), usa ICON Italia.
Args:
retry_after_60s: Se True, attende 10 secondi prima di riprovare (per retry)
@@ -202,21 +202,15 @@ def get_forecast(lat, lon, model=None, is_home=False, timezone=None, retry_after
logger.info("Attendo 10 secondi prima del retry...")
time.sleep(10)
# Generiamo 5 punti: Centro, N, S, E, W
lats = [lat, lat + OFFSET_LAT, lat - OFFSET_LAT, lat, lat]
lons = [lon, lon, lon, lon + OFFSET_LON, lon - OFFSET_LON]
lat_str = ",".join(map(str, lats))
lon_str = ",".join(map(str, lons))
# Singola coordinata: cielo sopra il punto richiesto (casa o località).
params = {
"latitude": lat_str, "longitude": lon_str, "timezone": tz_to_use,
"latitude": lat, "longitude": lon, "timezone": tz_to_use,
"forecast_days": 3,
"wind_speed_unit": "kmh", "precipitation_unit": "mm",
"hourly": "temperature_2m,apparent_temperature,relative_humidity_2m,cloud_cover,cloud_cover_low,cloud_cover_mid,cloud_cover_high,windspeed_10m,winddirection_10m,windgusts_10m,precipitation,rain,snowfall,weathercode,is_day,cape,visibility,uv_index"
"hourly": "temperature_2m,apparent_temperature,relative_humidity_2m,cloud_cover,cloud_cover_low,cloud_cover_mid,cloud_cover_high,windspeed_10m,winddirection_10m,windgusts_10m,precipitation,rain,showers,snowfall,weathercode,is_day,cape,visibility,uv_index"
}
# Aggiungi models solo se specificato (per Casa usa AROME, per altre località best match)
# Aggiungi models solo se specificato (per Casa usa ICON Italia, per altre località best match)
if model:
params["models"] = model
@@ -241,11 +235,7 @@ def get_forecast(lat, lon, model=None, is_home=False, timezone=None, retry_after
logger.error(f"API Error {error_details}")
return None, error_details # Restituisce anche i dettagli dell'errore
response_data = r.json()
logger.info("get_forecast ok model=%s points=5 elapsed=%.2fs", model or "best_match", time.time() - t0)
# Open-Meteo per multiple locations (lat/lon separati da virgola) restituisce
# direttamente un dict con "hourly", "daily", etc. che contiene liste di valori
# per ogni location. Per semplicità, restituiamo il dict così com'è
# e lo gestiamo nel codice chiamante
logger.info("get_forecast ok model=%s elapsed=%.2fs", model or "best_match", time.time() - t0)
return response_data, None
except requests.exceptions.Timeout as e:
error_details = f"Timeout dopo 20s: {str(e)}"
@@ -315,23 +305,23 @@ def generate_weather_report(lat, lon, location_name, debug_mode=False, cc="IT",
# Determina se è Casa
is_home = (abs(lat - HOME_LAT) < 0.01 and abs(lon - HOME_LON) < 0.01)
# Usa timezone personalizzata se fornita, altrimenti default
tz_to_use = timezone if timezone else TZ
# Fuso per l'API: Casa = TZ; località = timezone esplicito/geocoding, altrimenti "auto" (Open-Meteo risolve da lat/lon)
tz_for_api = timezone if timezone else (TZ if is_home else "auto")
model_id, model_name = choose_best_model(lat, lon, cc, is_home=is_home)
# Tentativo 1: Richiesta iniziale
data_list, error_details = get_forecast(lat, lon, model_id, is_home=is_home, timezone=tz_to_use, retry_after_60s=False)
data_list, error_details = get_forecast(lat, lon, model_id, is_home=is_home, timezone=tz_for_api, retry_after_60s=False)
# Se fallisce e siamo a Casa con AROME, prova retry dopo 10 secondi
if not data_list and is_home and model_id == "meteofrance_seamless":
logger.warning(f"Primo tentativo AROME fallito: {error_details}. Retry dopo 10 secondi...")
data_list, error_details = get_forecast(lat, lon, model_id, is_home=is_home, timezone=tz_to_use, retry_after_60s=True)
# Se fallisce e siamo a Casa con ICON Italia, prova retry dopo 10 secondi
if not data_list and is_home and model_id == "italia_meteo_arpae_icon_2i":
logger.warning(f"Primo tentativo ICON Italia fallito: {error_details}. Retry dopo 10 secondi...")
data_list, error_details = get_forecast(lat, lon, model_id, is_home=is_home, timezone=tz_for_api, retry_after_60s=True)
# Se ancora fallisce e siamo a Casa, fallback a best match
if not data_list and is_home:
logger.warning(f"AROME fallito anche dopo retry: {error_details}. Fallback a best match...")
data_list, error_details = get_forecast(lat, lon, None, is_home=False, timezone=tz_to_use, retry_after_60s=False)
logger.warning(f"ICON Italia fallito anche dopo retry: {error_details}. Fallback a best match...")
data_list, error_details = get_forecast(lat, lon, None, is_home=False, timezone=tz_for_api, retry_after_60s=False)
if data_list:
model_name = "Best Match (fallback)"
logger.info("Fallback a best match riuscito")
@@ -345,20 +335,33 @@ def generate_weather_report(lat, lon, location_name, debug_mode=False, cc="IT",
if not isinstance(data_list, list): data_list = [data_list]
# Punto centrale (Casa) per dati specifici
data_center = data_list[0]
# Ora e giorno LT: fuso della località (non San Marino se non è Casa)
if timezone is not None:
tz_to_use = timezone
elif is_home:
tz_to_use = TZ
else:
tz_to_use = data_center.get("timezone") or TZ
try:
tz_to_use_info = ZoneInfo(tz_to_use)
except Exception:
tz_to_use_info = TZINFO
tz_to_use = TZ
hourly_c = data_center.get("hourly", {})
times = hourly_c.get("time", [])
if not times: return "❌ Dati orari mancanti."
L = len(times)
# --- DATI LOCALI (CASA) ---
# --- DATI LOCALI ---
l_temp = safe_get_list(hourly_c, "temperature_2m", L, 0)
l_app = safe_get_list(hourly_c, "apparent_temperature", L, 0)
l_rh = safe_get_list(hourly_c, "relative_humidity_2m", L, 50)
l_prec = safe_get_list(hourly_c, "precipitation", L, 0)
l_rain = safe_get_list(hourly_c, "rain", L, 0)
l_showers = safe_get_list(hourly_c, "showers", L, 0)
l_snow = safe_get_list(hourly_c, "snowfall", L, 0)
l_wspd = safe_get_list(hourly_c, "windspeed_10m", L, 0)
l_gust = safe_get_list(hourly_c, "windgusts_10m", L, 0)
@@ -369,8 +372,8 @@ def generate_weather_report(lat, lon, location_name, debug_mode=False, cc="IT",
l_vis = safe_get_list(hourly_c, "visibility", L, 10000)
l_uv = safe_get_list(hourly_c, "uv_index", L, 0)
# Se è Casa e AROME non fornisce visibilità (tutti None), recuperala da best match
if is_home and model_id == "meteofrance_seamless":
# Se è Casa e ICON Italia non fornisce visibilità (tutti None), recuperala da best match
if is_home and model_id == "italia_meteo_arpae_icon_2i":
vis_check = [v for v in l_vis if v is not None]
if not vis_check: # Tutti None, recupera da best match
vis_data = get_visibility_forecast(lat, lon)
@@ -383,34 +386,22 @@ def generate_weather_report(lat, lon, location_name, debug_mode=False, cc="IT",
l_cl_mid_loc = safe_get_list(hourly_c, "cloud_cover_mid", L, 0)
l_cl_hig_loc = safe_get_list(hourly_c, "cloud_cover_high", L, 0)
# --- DATI GLOBALI (MEDIA 5 PUNTI) ---
acc_cl_tot = [0.0] * L
points_cl_tot = [ [] for _ in range(L) ]
for d in data_list:
h = d.get("hourly", {})
for i in range(L):
cc = get_val(safe_get_list(h, "cloud_cover", L)[i])
cl = get_val(safe_get_list(h, "cloud_cover_low", L)[i])
cm = get_val(safe_get_list(h, "cloud_cover_mid", L)[i])
ch = get_val(safe_get_list(h, "cloud_cover_high", L)[i])
# Calcolo robusto del totale per singolo punto
real_point_total = max(cc, cl, cm, ch)
acc_cl_tot[i] += real_point_total
points_cl_tot[i].append(real_point_total)
num_points = len(data_list)
avg_cl_tot = [x / num_points for x in acc_cl_tot]
# Nuvolosità (stesso punto della località)
avg_cl_tot = []
for i in range(L):
cc = get_val(l_cl_tot_loc[i], 0)
cl = get_val(l_cl_low_loc[i], 0)
cm = get_val(l_cl_mid_loc[i], 0)
ch = get_val(l_cl_hig_loc[i], 0)
avg_cl_tot.append(max(cc, cl, cm, ch))
# --- DEBUG MODE ---
if debug_mode:
output = f"🔍 **DEBUG METEO (v10.5)**\n"
now_h = now_local().replace(minute=0, second=0, microsecond=0)
now_h = datetime.datetime.now(tz_to_use_info).replace(minute=0, second=0, microsecond=0)
idx = 0
for i, t_str in enumerate(times):
if parse_time(t_str) >= now_h:
if parse_time(t_str, tz_to_use_info) >= now_h:
idx = i
break
@@ -419,9 +410,9 @@ def generate_weather_report(lat, lon, location_name, debug_mode=False, cc="IT",
loc_H = get_val(l_cl_hig_loc[idx])
code_now = int(get_val(l_code[idx]))
output += f"Ora: {parse_time(times[idx]).strftime('%H:%M')}\n"
output += f"📍 **LOCALE (Casa)**: L:{int(loc_L)}% | H:{int(loc_H)}%\n"
output += f"🌍 **MEDIA GLOBALE**: {int(avg_cl_tot[idx])}%\n"
output += f"Ora: {parse_time(times[idx], tz_to_use_info).strftime('%H:%M')} (LT)\n"
output += f"📍 **LOCALE**: L:{int(loc_L)}% | H:{int(loc_H)}%\n"
output += f"☁️ **Nv%**: {int(avg_cl_tot[idx])}%\n"
output += f"❄️ **NEVE**: Codice={code_now}, Accumulo={get_val(l_snow[idx])}cm\n"
decision = "H"
@@ -430,8 +421,6 @@ def generate_weather_report(lat, lon, location_name, debug_mode=False, cc="IT",
return output
# --- GENERAZIONE TABELLA ---
# Usa timezone personalizzata se fornita
tz_to_use_info = ZoneInfo(tz_to_use) if tz_to_use else TZINFO
now_local_tz = datetime.datetime.now(tz_to_use_info)
# Inizia dall'ora corrente (arrotondata all'ora)
@@ -444,12 +433,7 @@ def generate_weather_report(lat, lon, location_name, debug_mode=False, cc="IT",
valid_indices = []
for i, t_str in enumerate(times):
try:
dt = parse_time(t_str)
if dt.tzinfo is None:
dt = dt.replace(tzinfo=tz_to_use_info)
else:
dt = dt.astimezone(tz_to_use_info)
dt = parse_time(t_str, tz_to_use_info)
# Include solo timestamp >= current_hour e < end_hour
if current_hour <= dt < end_hour:
valid_indices.append((i, dt))
@@ -521,17 +505,19 @@ def generate_weather_report(lat, lon, location_name, debug_mode=False, cc="IT",
Sn = get_val(l_snow[idx], 0)
Code = int(get_val(l_code[idx], 0))
Rain = get_val(l_rain[idx], 0)
Showers = get_val(l_showers[idx], 0) if idx < len(l_showers) else 0
# Per modelli che espongono rain+showers (es. ICON Italia), usa il totale se precipitation è assente/zero
Pr_display = max(Pr, Rain + Showers)
# Determina se è neve
is_snowing = Sn > 0 or (Code in [71, 73, 75, 77, 85, 86])
# Formattazione MM
# Formattazione MM: 0 se nulla, altrimenti il valore (i modelli danno 0 o il valore orario)
p_suffix = ""
if Code in [96, 99]: p_suffix = "G"
elif Code in [66, 67]: p_suffix = "Z"
elif is_snowing and Pr >= 0.2: p_suffix = "N"
p_s = "--" if Pr < 0.2 else f"{int(round(Pr))}{p_suffix}"
elif is_snowing and Pr_display > 0: p_suffix = "N"
p_s = "0" if Pr_display <= 0 else f"{int(round(Pr_display))}{p_suffix}"
# --- CLOUD LOGIC ---
Cl = int(get_val(l_cl_tot_loc[idx], 0))
@@ -587,10 +573,10 @@ def generate_weather_report(lat, lon, location_name, debug_mode=False, cc="IT",
w_fmt = f"{w_txt:<5}"
# --- ICONE ---
sky, sgx = get_icon_set(Pr, Sn, Code, IsDay, Cl, Vis, T, Rain, Gust, Cape, dominant_type)
sky, sgx = get_icon_set(Pr_display, Sn, Code, IsDay, Cl, Vis, T, Rain, Gust, Cape, dominant_type)
# Se c'è precipitazione nevosa, mostra ❄️ nella colonna Sk (invece di 🌨️)
if is_snowing and Pr >= 0.2:
if is_snowing and Pr_display > 0:
sky = "❄️"
sky_fmt = f"{sky}{uv_suffix}"
@@ -615,7 +601,7 @@ if __name__ == "__main__":
args_parser = argparse.ArgumentParser()
args_parser.add_argument("--query", help="Nome città")
args_parser.add_argument("--home", action="store_true", help="Usa coordinate casa")
args_parser.add_argument("--debug", action="store_true", help="Mostra i valori dei 5 punti")
args_parser.add_argument("--debug", action="store_true", help="Mostra dettaglio debug (nuvole, neve)")
args_parser.add_argument("--chat_id", help="Chat ID Telegram per invio diretto (opzionale, può essere multiplo separato da virgola)")
args_parser.add_argument("--timezone", help="Timezone IANA (es: Europe/Rome, America/New_York)")
args = args_parser.parse_args()
@@ -632,8 +618,9 @@ if __name__ == "__main__":
elif args.query:
coords = get_coordinates(args.query)
if coords:
lat, lon, name, cc = coords
report = generate_weather_report(lat, lon, name, args.debug, cc)
lat, lon, name, cc, geo_tz = coords
tz = args.timezone or geo_tz
report = generate_weather_report(lat, lon, name, args.debug, cc, timezone=tz)
else:
error_msg = f"❌ Città '{args.query}' non trovata."
if chat_ids:
+18 -6
View File
@@ -11,7 +11,7 @@ from typing import List, Optional
# --- CONFIGURAZIONE ---
BOT_TOKEN="8155587974:AAF9OekvBpixtk8ZH6KoIc0L8edbhdXt7A4"
TELEGRAM_CHAT_IDS = ["64463169", "24827341", "132455422", "5405962012"]
TELEGRAM_CHAT_IDS = ["64463169"]
# BERSAGLIO (Cloudflare è solitamente il più stabile per i ping)
TARGET_HOST = "1.1.1.1"
@@ -86,8 +86,20 @@ def measure_quality(chat_ids: Optional[List[str]] = None):
# Parsing Packet Loss
# Cerca pattern: "X% packet loss"
loss_match = re.search(r'(\d+)% packet loss', output)
loss = int(loss_match.group(1)) if loss_match else 100
loss_match = re.search(r'([0-9]+(?:[\\.,][0-9]+)?)% packet loss', output)
if loss_match:
loss_raw = loss_match.group(1).replace(",", ".")
try:
loss = float(loss_raw)
except Exception:
loss = 100.0
else:
loss = 100.0
# Clamp to avoid parsing artifacts (e.g., "0.96078%" -> 0.96078, not 96078).
if loss < 0:
loss = 0.0
if loss > 100:
loss = 100.0
# Parsing Jitter (mdev)
# Output tipico: rtt min/avg/max/mdev = 10.1/12.5/40.2/5.1 ms
@@ -101,7 +113,7 @@ def measure_quality(chat_ids: Optional[List[str]] = None):
else:
avg_ping = 0.0
result_line = f"Risultati: Loss={loss}% | Jitter={jitter}ms | AvgPing={avg_ping}ms"
result_line = f"Risultati: Loss={loss:.2f}% | Jitter={jitter}ms | AvgPing={avg_ping}ms"
print(result_line)
log_line(f"INFO {result_line}")
@@ -116,7 +128,7 @@ def measure_quality(chat_ids: Optional[List[str]] = None):
# NUOVO ALLARME
msg = f"📉 **DEGRADO QUALITÀ LINEA**\n\n"
if loss >= LIMIT_LOSS:
msg += f"🔴 **Packet Loss:** `{loss}%` (Soglia {LIMIT_LOSS}%)\n"
msg += f"🔴 **Packet Loss:** `{loss:.2f}%` (Soglia {LIMIT_LOSS}%)\n"
if jitter >= LIMIT_JITTER:
msg += f"⚠️ **Jitter (Instabilità):** `{jitter}ms` (Soglia {LIMIT_JITTER}ms)\n"
@@ -133,7 +145,7 @@ def measure_quality(chat_ids: Optional[List[str]] = None):
# RECOVERY
msg = f"✅ **QUALITÀ LINEA RIPRISTINATA**\n\n"
msg += f"I parametri sono rientrati nella norma.\n"
msg += f"Ping: `{avg_ping}ms` | Jitter: `{jitter}ms` | Loss: `{loss}%`"
msg += f"Ping: `{avg_ping}ms` | Jitter: `{jitter}ms` | Loss: `{loss:.2f}%`"
send_telegram(msg, chat_ids=chat_ids)
save_state(False)
print("Recovery inviata.")
+339 -226
View File
@@ -47,6 +47,7 @@ MODEL_NAMES = {
"icon_d2": "ICON-D2",
"gfs_global": "GFS",
"ecmwf_ifs04": "ECMWF",
"ecmwf_ifs": "ECMWF IFS",
"jma_msm": "JMA MSM",
"metno_nordic": "Yr.no",
"ukmo_global": "UK MetOffice",
@@ -54,29 +55,31 @@ MODEL_NAMES = {
"italia_meteo_arpae_icon_2i": "ICON Italia (ARPAE 2i)"
}
# Per Casa/Italia: forecast_days per modello lungo termine (come Agent Irrigazione / OPEN_METEO_MODELS.md)
LONG_TERM_FORECAST_DAYS = {
"italia_meteo_arpae_icon_2i": 10,
"ecmwf_ifs": 10,
"meteofrance_seamless": 4,
}
def choose_models_by_country(cc, is_home=False):
"""
Seleziona modelli meteo ottimali.
- Per Casa: usa AROME Seamless e ICON-D2 (alta risoluzione)
- Per Italia: usa italia_meteo_arpae_icon_2i (include snow_depth quando > 0)
- Per altre località: usa best match di Open-Meteo (senza specificare models)
- Per Casa e Italia: 0-2d mediana ICON Italia + AROME HD; 3-10d mediana ICON Italia + ECMWF IFS + ARPEGE.
- Per altre località: best match Open-Meteo.
Ritorna (short_term_models, long_term_models)
"""
cc = cc.upper() if cc else "UNKNOWN"
# Modelli a lungo termine (sempre globali, funzionano ovunque)
long_term_default = ["gfs_global", "ecmwf_ifs04"]
if is_home:
# Per Casa, usa AROME Seamless, ICON-D2 e ICON Italia (alta risoluzione europea)
# ICON Italia include snow_depth quando disponibile (> 0)
return ["meteofrance_seamless", "icon_d2", "italia_meteo_arpae_icon_2i"], long_term_default
elif cc == "IT":
# Per Italia, usa ICON Italia (ARPAE 2i) che include snow_depth quando disponibile
return ["italia_meteo_arpae_icon_2i"], long_term_default
if is_home or cc == "IT":
# 0-2d: due modelli ad alta risoluzione (mediana). 3-10d: tre modelli (mediana, come Irrigazione).
return (
["italia_meteo_arpae_icon_2i", "meteofrance_arome_france_hd"],
["italia_meteo_arpae_icon_2i", "ecmwf_ifs", "meteofrance_seamless"],
)
else:
# Per query worldwide, usa best match (Open-Meteo sceglie automaticamente)
# Ritorna None per indicare best match
return None, long_term_default
def get_bot_token():
@@ -127,9 +130,9 @@ def get_weather_multi_model(lat, lon, short_term_models, long_term_models, forec
url = "https://api.open-meteo.com/v1/forecast"
params = {
"latitude": lat, "longitude": lon,
"hourly": "temperature_2m,precipitation_probability,precipitation,snowfall,rain,snow_depth,weathercode,windspeed_10m,windgusts_10m,winddirection_10m,dewpoint_2m,relative_humidity_2m,cloud_cover,soil_temperature_0cm",
"daily": "temperature_2m_max,temperature_2m_min,precipitation_sum,precipitation_hours,precipitation_probability_max,snowfall_sum,showers_sum,rain_sum,weathercode,winddirection_10m_dominant,windspeed_10m_max,windgusts_10m_max",
"timezone": timezone if timezone else TZ_STR, "forecast_days": min(forecast_days, 3) # Limita a 3 giorni per modelli ad alta risoluzione
"hourly": "temperature_2m,precipitation,snowfall,rain,snow_depth,weathercode,windspeed_10m,windgusts_10m,winddirection_10m,dewpoint_2m,relative_humidity_2m,cloud_cover,soil_temperature_0cm",
"daily": "temperature_2m_max,temperature_2m_min,precipitation_sum,precipitation_hours,snowfall_sum,showers_sum,rain_sum,weathercode,winddirection_10m_dominant,windspeed_10m_max,windgusts_10m_max",
"timezone": timezone if timezone else TZ_STR, "forecast_days": min(forecast_days, 3)
}
try:
resp = open_meteo_get(url, params=params, timeout=(5, 20))
@@ -160,17 +163,28 @@ def get_weather_multi_model(lat, lon, short_term_models, long_term_models, forec
except:
results["best_match"] = None
else:
# Modelli specifici (per Casa: AROME + ICON, per Italia: ICON ARPAE)
# Modelli specifici: 0-2d ICON Italia + AROME HD (mediana); AROME HD solo 2 giorni
for model in short_term_models:
url = "https://api.open-meteo.com/v1/forecast"
# Per italia_meteo_arpae_icon_2i, includi sempre snow_depth (supportato quando > 0)
hourly_params = "temperature_2m,precipitation_probability,precipitation,snowfall,rain,snow_depth,weathercode,windspeed_10m,windgusts_10m,winddirection_10m,dewpoint_2m,relative_humidity_2m,cloud_cover,soil_temperature_0cm"
if model == "italia_meteo_arpae_icon_2i":
hourly_params = "rain,showers,snowfall,snow_depth,precipitation,temperature_2m,weathercode,windspeed_10m,winddirection_10m"
daily_params = "temperature_2m_max,temperature_2m_min,showers_sum,rain_sum,snowfall_sum,precipitation_sum,precipitation_hours,weathercode,winddirection_10m_dominant,windspeed_10m_max,windgusts_10m_max"
fd_short = min(forecast_days, 7)
elif model == "meteofrance_arome_france_hd":
# AROME HD: 2 giorni, set variabili ridotto (no snow_depth/showers in output)
hourly_params = "temperature_2m,precipitation,snowfall,rain,weathercode,windspeed_10m,winddirection_10m"
daily_params = "temperature_2m_max,temperature_2m_min,precipitation_sum,precipitation_hours,snowfall_sum,rain_sum,weathercode,winddirection_10m_dominant,windspeed_10m_max,windgusts_10m_max"
fd_short = 2
else:
hourly_params = "temperature_2m,precipitation,snowfall,rain,snow_depth,weathercode,windspeed_10m,windgusts_10m,winddirection_10m,dewpoint_2m,relative_humidity_2m,cloud_cover,soil_temperature_0cm"
daily_params = "temperature_2m_max,temperature_2m_min,precipitation_sum,precipitation_hours,snowfall_sum,showers_sum,rain_sum,weathercode,winddirection_10m_dominant,windspeed_10m_max,windgusts_10m_max"
fd_short = min(forecast_days, 3)
params = {
"latitude": lat, "longitude": lon,
"hourly": hourly_params,
"daily": "temperature_2m_max,temperature_2m_min,precipitation_sum,precipitation_hours,precipitation_probability_max,snowfall_sum,showers_sum,rain_sum,weathercode,winddirection_10m_dominant,windspeed_10m_max,windgusts_10m_max",
"timezone": timezone if timezone else TZ_STR, "models": model, "forecast_days": min(forecast_days, 3) # Limita a 3 giorni per modelli ad alta risoluzione
"daily": daily_params,
"timezone": timezone if timezone else TZ_STR, "models": model,
"forecast_days": fd_short
}
try:
resp = open_meteo_get(url, params=params, timeout=(5, 20))
@@ -220,14 +234,25 @@ def get_weather_multi_model(lat, lon, short_term_models, long_term_models, forec
except:
results[model] = None
# Recupera modelli a lungo termine (globale, fino a 10 giorni)
for model in long_term_models:
# Recupera modelli a lungo termine (3-10d): tre modelli per mediana (come Agent Irrigazione)
for model in (long_term_models or []):
url = "https://api.open-meteo.com/v1/forecast"
fd_long = LONG_TERM_FORECAST_DAYS.get(model, forecast_days)
if model == "ecmwf_ifs":
hourly_params = "rain,showers,snowfall,precipitation,temperature_2m,weathercode,windspeed_10m,winddirection_10m"
daily_params = "temperature_2m_max,temperature_2m_min,showers_sum,rain_sum,snowfall_sum,precipitation_sum,precipitation_hours,weathercode,winddirection_10m_dominant,windspeed_10m_max,windgusts_10m_max"
elif model == "meteofrance_seamless":
hourly_params = "temperature_2m,precipitation,snowfall,rain,weathercode,windspeed_10m,windgusts_10m,winddirection_10m"
daily_params = "temperature_2m_max,temperature_2m_min,precipitation_sum,precipitation_hours,snowfall_sum,rain_sum,weathercode,winddirection_10m_dominant,windspeed_10m_max,windgusts_10m_max"
else:
# ICON Italia e altri
hourly_params = "rain,showers,snowfall,snow_depth,precipitation,temperature_2m,weathercode,windspeed_10m,winddirection_10m"
daily_params = "temperature_2m_max,temperature_2m_min,showers_sum,rain_sum,snowfall_sum,precipitation_sum,precipitation_hours,weathercode,winddirection_10m_dominant,windspeed_10m_max,windgusts_10m_max"
params = {
"latitude": lat, "longitude": lon,
"hourly": "temperature_2m,precipitation_probability,precipitation,snowfall,rain,snow_depth,weathercode,windspeed_10m,windgusts_10m,winddirection_10m,dewpoint_2m,relative_humidity_2m,cloud_cover,soil_temperature_0cm",
"daily": "temperature_2m_max,temperature_2m_min,precipitation_sum,precipitation_hours,precipitation_probability_max,snowfall_sum,showers_sum,rain_sum,weathercode,winddirection_10m_dominant,windspeed_10m_max,windgusts_10m_max",
"timezone": timezone if timezone else TZ_STR, "models": model, "forecast_days": forecast_days
"hourly": hourly_params,
"daily": daily_params,
"timezone": timezone if timezone else TZ_STR, "models": model, "forecast_days": fd_long
}
try:
resp = open_meteo_get(url, params=params, timeout=(5, 25))
@@ -260,6 +285,163 @@ def get_weather_multi_model(lat, lon, short_term_models, long_term_models, forec
return results
def _normalize_time_key(t):
"""Normalizza timestamp per confronto (YYYY-MM-DDTHH:MM)."""
if not t or not isinstance(t, str):
return str(t) if t else ""
return t.strip()[:16]
def _median_or_single(values):
"""Mediana dei valori numerici; ignora None. Con 2 valori restituisce la media dei due."""
nums = [float(v) for v in values if v is not None]
if not nums:
return None
if len(nums) == 1:
return nums[0]
return median(nums)
# Chiavi che esistono solo su ICON Italia (no merge, si tiene il valore da quel modello)
HOURLY_KEYS_ICON_ONLY = ["snow_depth", "showers"]
DAILY_KEYS_ICON_ONLY = ["showers_sum"]
def _merge_hourly_median(hourly_by_model, single_source_keys=None, single_source_model=None):
"""
Unisce hourly da più modelli: mediana per ogni timestamp.
single_source_keys: per queste chiavi si prende il valore solo da single_source_model (es. ICON Italia per snow_depth, showers).
"""
single_source_keys = single_source_keys or []
time_idx = {}
all_times = []
for _model, h in hourly_by_model:
times = h.get("time", []) or []
for t in times:
k = _normalize_time_key(str(t)) if t else ""
if k and k not in time_idx:
time_idx[k] = len(all_times)
all_times.append(t if isinstance(t, str) else k)
if not all_times:
return {"time": [], "temperature_2m": [], "precipitation": [], "snowfall": [], "rain": [], "weathercode": [], "windspeed_10m": [], "winddirection_10m": [], "snow_depth": [], "dewpoint_2m": [], "cloud_cover": [], "soil_temperature_0cm": []}
# Raccogli tutte le chiavi numeriche dal primo modello che le ha
all_keys = []
for _model, h in hourly_by_model:
for key in (h.keys() - {"time"}):
if key not in all_keys:
all_keys.append(key)
out = {"time": all_times}
for key in all_keys:
out[key] = []
for ref_t in all_times:
ref_k = _normalize_time_key(str(ref_t))
if key in single_source_keys and single_source_model:
val = None
for m, h in hourly_by_model:
if m != single_source_model:
continue
times = h.get("time", []) or []
arr = h.get(key, []) or []
for i, t in enumerate(times):
if _normalize_time_key(str(t)) == ref_k and i < len(arr) and arr[i] is not None:
try:
val = float(arr[i]) if key != "weathercode" else (int(arr[i]) if arr[i] is not None else None)
except (TypeError, ValueError):
pass
break
break
out[key].append(val)
else:
vals = []
for _m, h in hourly_by_model:
times = h.get("time", []) or []
arr = h.get(key, []) or []
for i, t in enumerate(times):
if _normalize_time_key(str(t)) == ref_k and i < len(arr) and arr[i] is not None:
try:
vals.append(float(arr[i]))
except (TypeError, ValueError):
pass
break
out[key].append(_median_or_single(vals) if vals else None)
n = len(out["time"])
if n > 1:
order = sorted(range(n), key=lambda i: str(out["time"][i]))
out["time"] = [out["time"][i] for i in order]
for key in all_keys:
if key == "time":
continue
if len(out.get(key, [])) == n:
out[key] = [out[key][i] for i in order]
return out
def _merge_daily_median(daily_by_model, single_source_keys=None, single_source_model=None):
"""Unisce daily da più modelli: mediana per data. single_source_keys: valore solo da single_source_model."""
single_source_keys = single_source_keys or []
time_idx = {}
all_times = []
for _model, d in daily_by_model:
times = d.get("time", []) or []
for t in times:
key = str(t)[:10] if t else ""
if key and key not in time_idx:
time_idx[key] = len(all_times)
all_times.append(key)
if not all_times:
return {"time": [], "temperature_2m_max": [], "temperature_2m_min": [], "precipitation_sum": [], "precipitation_hours": [], "snowfall_sum": [], "showers_sum": [], "rain_sum": [], "weathercode": [], "winddirection_10m_dominant": [], "windspeed_10m_max": [], "windgusts_10m_max": []}
all_keys = []
for _model, d in daily_by_model:
for key in (d.keys() - {"time"}):
if key not in all_keys:
all_keys.append(key)
out = {"time": all_times}
for key in all_keys:
out[key] = []
for date_str in all_times:
if key in single_source_keys and single_source_model:
val = None
for m, d in daily_by_model:
if m != single_source_model:
continue
times = d.get("time", []) or []
arr = d.get(key, []) or []
for i, t in enumerate(times):
if str(t)[:10] == date_str and i < len(arr) and arr[i] is not None:
try:
val = float(arr[i]) if key != "weathercode" else (int(arr[i]) if arr[i] is not None else None)
except (TypeError, ValueError):
pass
break
break
out[key].append(val)
else:
vals = []
for _m, d in daily_by_model:
times = d.get("time", []) or []
arr = d.get(key, []) or []
for i, t in enumerate(times):
if str(t)[:10] == date_str and i < len(arr) and arr[i] is not None:
try:
vals.append(float(arr[i]))
except (TypeError, ValueError):
pass
break
out[key].append(_median_or_single(vals) if vals else None)
# Ordina cronologicamente (evita buchi nel report se l'unione non era ordinata)
n = len(out["time"])
if n > 1:
order = sorted(range(n), key=lambda i: out["time"][i])
out["time"] = [out["time"][i] for i in order]
for key in all_keys:
if key == "time":
continue
if len(out.get(key, [])) == n:
out[key] = [out[key][i] for i in order]
return out
def merge_multi_model_forecast(models_data, forecast_days=10):
"""Combina dati da modelli a breve e lungo termine in un forecast unificato"""
merged = {
@@ -269,11 +451,11 @@ def merge_multi_model_forecast(models_data, forecast_days=10):
"temperature_2m_min": [],
"precipitation_sum": [],
"precipitation_hours": [],
"precipitation_probability_max": [],
"snowfall_sum": [],
"showers_sum": [],
"rain_sum": [],
"weathercode": [],
"winddirection_10m_dominant": [],
"windspeed_10m_max": [],
"windgusts_10m_max": []
},
@@ -288,189 +470,147 @@ def merge_multi_model_forecast(models_data, forecast_days=10):
"windspeed_10m": [],
"winddirection_10m": [],
"dewpoint_2m": [],
"precipitation_probability": [],
"cloud_cover": [],
"soil_temperature_0cm": []
},
"models_used": []
}
# Trova modello a breve termine disponibile (cerca tutti i modelli con type "short_term")
# Priorità: ICON Italia per snow_depth, altrimenti primo disponibile
short_term_data = None
short_term_model = None
icon_italia_data = None
icon_italia_model = None
cutoff_day = 2 # 0-2d alta risoluzione, 3-10d mediana tre modelli
short_term_list = [(m, models_data[m]) for m in models_data if models_data.get(m) and models_data[m].get("model_type") == "short_term"]
long_term_list = [(m, models_data[m]) for m in models_data if models_data.get(m) and models_data[m].get("model_type") == "long_term"]
# Prima cerca ICON Italia (ha snow_depth quando disponibile)
# Cerca anche altri modelli che potrebbero avere snow_depth (icon_d2, etc.)
for model in models_data.keys():
if models_data[model] and models_data[model].get("model_type") == "short_term":
# Priorità a ICON Italia, ma cerca anche altri modelli con snow_depth
if model == "italia_meteo_arpae_icon_2i":
icon_italia_data = models_data[model]
icon_italia_model = model
# ICON-D2 può avere anche snow_depth
elif model == "icon_d2" and icon_italia_data is None:
# Usa ICON-D2 come fallback se ICON Italia non disponibile
hourly_data = models_data[model].get("hourly", {})
snow_depth_values = hourly_data.get("snow_depth", []) if hourly_data else []
# Verifica se ha dati di snow_depth validi
has_valid_snow_depth = False
if snow_depth_values:
for sd in snow_depth_values[:24]:
if sd is not None:
try:
if float(sd) > 0:
has_valid_snow_depth = True
break
except (ValueError, TypeError):
continue
if has_valid_snow_depth:
icon_italia_data = models_data[model]
icon_italia_model = model
# Poi cerca primo modello disponibile (per altri parametri)
for model in models_data.keys():
if models_data[model] and models_data[model].get("model_type") == "short_term":
short_term_data = models_data[model]
short_term_model = model
break
# Trova modello a lungo termine disponibile (cerca tutti i modelli con type "long_term")
long_term_data = None
long_term_model = None
for model in models_data.keys():
if models_data[model] and models_data[model].get("model_type") == "long_term":
long_term_data = models_data[model]
long_term_model = model
break
if not short_term_data and not long_term_data:
if not short_term_list and not long_term_list:
return None
# Usa dati a breve termine per primi 2-3 giorni, poi passa a lungo termine
cutoff_day = 2 # Usa modelli ad alta risoluzione per primi 2 giorni
daily_keys = list(merged["daily"].keys())
hourly_keys = list(merged["hourly"].keys())
if short_term_data:
# Gestisci best_match o modelli specifici
if short_term_model == "best_match":
model_display = "Best Match"
else:
model_display = MODEL_NAMES.get(short_term_model, short_term_model)
# Verifica se ICON Italia ha dati di snow_depth (controllo diretto, non solo il flag)
has_icon_snow_depth = False
if icon_italia_data:
icon_hourly = icon_italia_data.get("hourly", {})
icon_snow_depth = icon_hourly.get("snow_depth", []) if icon_hourly else []
# Verifica se ci sono dati non-null di snow_depth
if icon_snow_depth:
for sd in icon_snow_depth[:72]: # Controlla prime 72h
if sd is not None:
try:
if float(sd) > 0: # Anche valori piccoli
has_icon_snow_depth = True
break
except (ValueError, TypeError):
continue
# Se ICON Italia ha dati di snow_depth, aggiungilo ai modelli usati
if has_icon_snow_depth or (icon_italia_data and icon_italia_data.get("has_snow_depth_data")):
icon_display = MODEL_NAMES.get("italia_meteo_arpae_icon_2i", "ICON Italia")
merged["models_used"].append(f"{model_display} + {icon_display} (snow_depth) (0-{cutoff_day+1}d)")
else:
merged["models_used"].append(f"{model_display} (0-{cutoff_day+1}d)")
short_daily = short_term_data.get("daily", {})
short_hourly = short_term_data.get("hourly", {})
# Prendi dati daily dai primi giorni del modello a breve termine
short_daily_times = short_daily.get("time", [])[:cutoff_day+1]
for i, day_time in enumerate(short_daily_times):
merged["daily"]["time"].append(day_time)
for key in ["temperature_2m_max", "temperature_2m_min", "precipitation_sum", "precipitation_hours", "precipitation_probability_max", "snowfall_sum", "showers_sum", "rain_sum", "weathercode", "windspeed_10m_max", "windgusts_10m_max"]:
val = short_daily.get(key, [])[i] if i < len(short_daily.get(key, [])) else None
merged["daily"][key].append(val)
# Prendi dati hourly dal modello a breve termine
# Priorità: usa snow_depth da ICON Italia se disponibile, altrimenti dal modello principale
short_hourly_times = short_hourly.get("time", [])
icon_italia_hourly = icon_italia_data.get("hourly", {}) if icon_italia_data else {}
icon_italia_hourly_times = icon_italia_hourly.get("time", []) if icon_italia_hourly else []
icon_italia_snow_depth = icon_italia_hourly.get("snow_depth", []) if icon_italia_hourly else []
# Crea mappa timestamp -> snow_depth per ICON Italia (per corrispondenza esatta o approssimata)
icon_snow_depth_map = {}
if icon_italia_hourly_times and icon_italia_snow_depth:
for idx, ts in enumerate(icon_italia_hourly_times):
if idx < len(icon_italia_snow_depth) and icon_italia_snow_depth[idx] is not None:
try:
val_cm = float(icon_italia_snow_depth[idx])
if val_cm >= 0: # Solo valori validi (già in cm)
icon_snow_depth_map[ts] = val_cm
except (ValueError, TypeError):
def ensure_merged_keys(merged, daily_times, hourly_times):
for k in daily_keys:
if k == "time":
continue
while len(merged["daily"][k]) < len(merged["daily"]["time"]):
merged["daily"][k].append(None)
for k in hourly_keys:
if k == "time":
continue
while len(merged["hourly"][k]) < len(merged["hourly"]["time"]):
merged["hourly"][k].append(None)
# ---- 0-2 giorni: uno o due modelli short-term ----
if short_term_list:
if len(short_term_list) >= 2:
# Mediana ICON Italia + AROME HD; snow_depth e showers solo da ICON Italia
short_daily_by_model = [(m, d.get("daily", {}) or {}) for m, d in short_term_list]
short_hourly_by_model = [(m, d.get("hourly", {}) or {}) for m, d in short_term_list]
merged_short_daily = _merge_daily_median(short_daily_by_model, single_source_keys=DAILY_KEYS_ICON_ONLY, single_source_model="italia_meteo_arpae_icon_2i")
merged_short_hourly = _merge_hourly_median(short_hourly_by_model, single_source_keys=HOURLY_KEYS_ICON_ONLY, single_source_model="italia_meteo_arpae_icon_2i")
short_daily_times = (merged_short_daily.get("time") or [])[:cutoff_day + 1] if long_term_list else (merged_short_daily.get("time") or [])
short_hourly_times = merged_short_hourly.get("time") or []
cutoff_h = (cutoff_day + 1) * 24 if long_term_list else len(short_hourly_times)
short_hourly_times = short_hourly_times[:cutoff_h]
names_short = " + ".join(MODEL_NAMES.get(m, m) for m, _ in short_term_list[:2])
merged["models_used"].append(f"{names_short} (mediana) (0-{len(short_daily_times)}d)")
for i, day_time in enumerate(short_daily_times):
merged["daily"]["time"].append(day_time)
for key in daily_keys:
if key == "time":
continue
cutoff_hour = (cutoff_day + 1) * 24
for i, hour_time in enumerate(short_hourly_times[:cutoff_hour]):
merged["hourly"]["time"].append(hour_time)
for key in ["temperature_2m", "precipitation", "snowfall", "rain", "weathercode", "windspeed_10m", "winddirection_10m", "dewpoint_2m", "precipitation_probability", "cloud_cover", "soil_temperature_0cm"]:
val = short_hourly.get(key, [])[i] if i < len(short_hourly.get(key, [])) else None
merged["hourly"][key].append(val)
# Per snow_depth: usa ICON Italia se disponibile (corrispondenza per timestamp), altrimenti modello principale
# NOTA: I valori sono già convertiti in cm durante il recupero dall'API
val_snow_depth = None
# Cerca corrispondenza esatta per timestamp
if hour_time in icon_snow_depth_map:
# Usa snow_depth da ICON Italia per questo timestamp (già in cm)
val_snow_depth = icon_snow_depth_map[hour_time]
else:
# Fallback 1: cerca corrispondenza per ora approssimata (se i timestamp non corrispondono esattamente)
# Estrai solo la parte ora (YYYY-MM-DDTHH) per corrispondenza approssimata
hour_time_base = hour_time[:13] if len(hour_time) >= 13 else hour_time # "2025-01-09T12"
for icon_ts, icon_val in icon_snow_depth_map.items():
if icon_ts.startswith(hour_time_base):
val_snow_depth = icon_val
break
# Fallback 2: se non trovato, cerca il valore più vicino nello stesso giorno
if val_snow_depth is None and hour_time_base:
day_date_str = hour_time[:10] if len(hour_time) >= 10 else None # "2025-01-09"
if day_date_str:
# Cerca tutti i valori di ICON Italia per lo stesso giorno
same_day_values = [v for ts, v in icon_snow_depth_map.items() if ts.startswith(day_date_str)]
if same_day_values:
# Usa il primo valore disponibile per quel giorno (approssimazione)
val_snow_depth = same_day_values[0]
# Fallback 3: usa snow_depth dal modello principale se ICON Italia non disponibile
if val_snow_depth is None and i < len(short_hourly.get("snow_depth", [])):
val_snow_depth = short_hourly.get("snow_depth", [])[i]
merged["hourly"]["snow_depth"].append(val_snow_depth)
arr = merged_short_daily.get(key, [])
merged["daily"][key].append(arr[i] if i < len(arr) else None)
for i, hour_time in enumerate(short_hourly_times):
merged["hourly"]["time"].append(hour_time)
for key in hourly_keys:
if key == "time":
continue
arr = merged_short_hourly.get(key, [])
merged["hourly"][key].append(arr[i] if i < len(arr) else None)
else:
# Un solo modello short-term (es. best_match o fallback)
short_term_model, short_term_data = short_term_list[0]
short_daily = short_term_data.get("daily", {}) or {}
short_hourly = short_term_data.get("hourly", {}) or {}
short_daily_times_all = short_daily.get("time", []) or []
short_daily_times = short_daily_times_all[:cutoff_day + 1] if long_term_list else short_daily_times_all
model_display = "Best Match" if short_term_model == "best_match" else MODEL_NAMES.get(short_term_model, short_term_model)
merged["models_used"].append(f"{model_display} (0-{len(short_daily_times)}d)")
for i, day_time in enumerate(short_daily_times):
merged["daily"]["time"].append(day_time)
for key in daily_keys:
if key == "time":
continue
arr = short_daily.get(key, [])
merged["daily"][key].append(arr[i] if i < len(arr) else None)
short_hourly_times = short_hourly.get("time", []) or []
cutoff_h = (cutoff_day + 1) * 24 if long_term_list else len(short_hourly_times)
for i, hour_time in enumerate(short_hourly_times[:cutoff_h]):
merged["hourly"]["time"].append(hour_time)
for key in hourly_keys:
if key == "time":
continue
arr = short_hourly.get(key, [])
merged["hourly"][key].append(arr[i] if i < len(arr) else None)
ensure_merged_keys(merged, merged["daily"]["time"], merged["hourly"]["time"])
if long_term_data:
merged["models_used"].append(f"{MODEL_NAMES.get(long_term_model, long_term_model)} ({cutoff_day+1}-{forecast_days}d)")
long_daily = long_term_data.get("daily", {})
long_hourly = long_term_data.get("hourly", {})
# Prendi dati daily dal modello a lungo termine per i giorni successivi
long_daily_times = long_daily.get("time", [])
start_idx = cutoff_day + 1
for i in range(start_idx, min(len(long_daily_times), forecast_days)):
merged["daily"]["time"].append(long_daily_times[i])
for key in ["temperature_2m_max", "temperature_2m_min", "precipitation_sum", "precipitation_hours", "precipitation_probability_max", "snowfall_sum", "showers_sum", "rain_sum", "weathercode", "windspeed_10m_max", "windgusts_10m_max"]:
val = long_daily.get(key, [])[i] if i < len(long_daily.get(key, [])) else None
merged["daily"][key].append(val)
# Per i dati hourly, completa con dati a lungo termine se necessario
long_hourly_times = long_hourly.get("time", [])
current_hourly_count = len(merged["hourly"]["time"])
needed_hours = forecast_days * 24
if current_hourly_count < needed_hours:
start_hour_idx = current_hourly_count
# ---- 3-10 giorni: uno o più modelli long-term ----
if long_term_list:
if len(long_term_list) >= 2:
long_daily_by_model = [(m, d.get("daily", {}) or {}) for m, d in long_term_list]
long_hourly_by_model = [(m, d.get("hourly", {}) or {}) for m, d in long_term_list]
merged_long_daily = _merge_daily_median(long_daily_by_model, single_source_keys=DAILY_KEYS_ICON_ONLY, single_source_model="italia_meteo_arpae_icon_2i")
merged_long_hourly = _merge_hourly_median(long_hourly_by_model, single_source_keys=HOURLY_KEYS_ICON_ONLY, single_source_model="italia_meteo_arpae_icon_2i")
long_daily_times = merged_long_daily.get("time") or []
long_hourly_times = merged_long_hourly.get("time") or []
names_long = " + ".join(MODEL_NAMES.get(m, m) for m, _ in long_term_list[:3])
# Allinea al numero effettivo di giorni/orari short (non indice fisso): evita buco del 3° giorno
start_idx = len(merged["daily"]["time"])
start_hour_idx = len(merged["hourly"]["time"])
merged["models_used"].append(f"{names_long} (mediana) (giorno {start_idx + 1}-{forecast_days}d)")
for i, day_time in enumerate(long_daily_times):
if i < start_idx:
continue
if i >= forecast_days:
break
merged["daily"]["time"].append(day_time)
for key in daily_keys:
if key == "time":
continue
arr = merged_long_daily.get(key, [])
merged["daily"][key].append(arr[i] if i < len(arr) else None)
needed_hours = forecast_days * 24
for i in range(start_hour_idx, min(len(long_hourly_times), needed_hours)):
merged["hourly"]["time"].append(long_hourly_times[i])
for key in ["temperature_2m", "precipitation", "snowfall", "snow_depth", "rain", "weathercode", "windspeed_10m", "winddirection_10m", "dewpoint_2m", "precipitation_probability", "cloud_cover", "soil_temperature_0cm"]:
val = long_hourly.get(key, [])[i] if i < len(long_hourly.get(key, [])) else None
merged["hourly"][key].append(val)
for key in hourly_keys:
if key == "time":
continue
arr = merged_long_hourly.get(key, [])
merged["hourly"][key].append(arr[i] if i < len(arr) else None)
else:
long_term_model, long_term_data = long_term_list[0]
long_daily = long_term_data.get("daily", {}) or {}
long_hourly = long_term_data.get("hourly", {}) or {}
start_idx = len(merged["daily"]["time"])
merged["models_used"].append(f"{MODEL_NAMES.get(long_term_model, long_term_model)} (giorno {start_idx + 1}-{forecast_days}d)")
long_daily_times = long_daily.get("time", []) or []
for i in range(start_idx, min(len(long_daily_times), forecast_days)):
merged["daily"]["time"].append(long_daily_times[i])
for key in daily_keys:
if key == "time":
continue
arr = long_daily.get(key, [])
merged["daily"][key].append(arr[i] if i < len(arr) else None)
long_hourly_times = long_hourly.get("time", []) or []
current_hourly_count = len(merged["hourly"]["time"])
needed_hours = forecast_days * 24
for i in range(current_hourly_count, min(len(long_hourly_times), needed_hours)):
merged["hourly"]["time"].append(long_hourly_times[i])
for key in hourly_keys:
if key == "time":
continue
arr = long_hourly.get(key, [])
merged["hourly"][key].append(arr[i] if i < len(arr) else None)
ensure_merged_keys(merged, merged["daily"]["time"], merged["hourly"]["time"])
return merged
@@ -792,16 +932,13 @@ def analyze_daily_events(times, codes, probs, precip, winds, temps, dewpoints, s
block_precip = precip[start_idx:end_idx] if end_idx <= len(precip) else precip[start_idx:]
block_precip_clean = [p for p in block_precip if p is not None]
tot_mm = sum(block_precip_clean)
prob_block = probs[start_idx:end_idx] if end_idx <= len(probs) else probs[start_idx:]
prob_block_clean = [p for p in prob_block if p is not None]
max_prob = max(prob_block_clean) if prob_block_clean else 0
start_time = times[start_idx].split("T")[1][:5]
end_time = times[end_idx].split("T")[1][:5] if end_idx < len(times) else times[-1].split("T")[1][:5]
avg_intensity = tot_mm / len(block_precip) if block_precip else 0
events.append(
f"{current_rain_type} ({get_intensity_label(avg_intensity)}):\n"
f" 🕒 {start_time}-{end_time} | 💧 {tot_mm:.1f}mm | Prob: {max_prob}%"
f" 🕒 {start_time}-{end_time} | 💧 {tot_mm:.1f}mm"
)
start_idx = i
current_rain_type = new_type
@@ -813,16 +950,13 @@ def analyze_daily_events(times, codes, probs, precip, winds, temps, dewpoints, s
tot_mm = sum(block_precip_clean)
if tot_mm > 0:
prob_block = probs[start_idx:end_idx] if end_idx <= len(probs) else probs[start_idx:]
prob_block_clean = [p for p in prob_block if p is not None]
max_prob = max(prob_block_clean) if prob_block_clean else 0
start_time = times[start_idx].split("T")[1][:5]
end_time = times[min(end_idx-1, len(times)-1)].split("T")[1][:5]
avg_intensity = tot_mm / len(block_precip) if block_precip else 0
events.append(
f"{current_rain_type} ({get_intensity_label(avg_intensity)}):\n"
f" 🕒 {start_time}-{end_time} | 💧 {tot_mm:.1f}mm | Prob: {max_prob}%"
f" 🕒 {start_time}-{end_time} | 💧 {tot_mm:.1f}mm"
)
# 3. VENTO
@@ -1225,12 +1359,7 @@ def format_weather_context_report(models_data, location_name, country_code):
wcode = int(wcode) if wcode is not None else 0
# Calcola probabilità e tipo precipitazione per il giorno (PRIMA di determinare icona)
precip_prob = 0
precip_type = None
if d_probs and any(p is not None for p in d_probs):
prob_values = [p for p in d_probs if p is not None]
precip_prob = max(prob_values) if prob_values else 0
# Determina tipo precipitazione usando dati daily (più affidabili)
# Usa snowfall_sum, rain_sum, showers_sum per caratterizzazione precisa
# PRIORITÀ: Se snow_depth > 0 (modello italia_meteo_arpae_icon_2i), usa questi dati per determinare neve
@@ -1369,14 +1498,6 @@ def format_weather_context_report(models_data, location_name, country_code):
else:
weather_icon = "☁️" # Nuvoloso
# Recupera probabilità max daily se disponibile
prob_max_list = daily.get('precipitation_probability_max', [])
precip_prob_max = None
if count < len(prob_max_list) and prob_max_list[count] is not None:
precip_prob_max = int(prob_max_list[count])
elif precip_prob > 0:
precip_prob_max = int(precip_prob)
# Calcola spessore manto nevoso (snow_depth) per questo giorno
# Nota: snow_depth è disponibile solo per alcuni modelli (es. ICON Italia, ICON-D2)
# Se il modello non supporta snow_depth, tutti i valori saranno None o mancanti
@@ -1449,7 +1570,6 @@ def format_weather_context_report(models_data, location_name, country_code):
"t_min": t_min,
"t_max": t_max,
"precip_sum": precip_sum,
"precip_prob": precip_prob_max if precip_prob_max is not None else precip_prob,
"precip_type": precip_type,
"snowfall_sum": snowfall_sum,
"rain_sum": rain_sum,
@@ -1496,13 +1616,6 @@ def format_weather_context_report(models_data, location_name, country_code):
precip_parts.append(f"{precip_symbol} {day_info['precip_sum']:.1f}mm")
line += f" | {' + '.join(precip_parts)}"
# Aggiungi probabilità se disponibile
if day_info['precip_prob'] and day_info['precip_prob'] > 0:
line += f" ({int(day_info['precip_prob'])}%)"
elif day_info['precip_prob'] > 50:
# Probabilità alta ma nessuna precipitazione prevista (può essere un errore del modello)
line += f" | 💧 Possibile ({int(day_info['precip_prob'])}%)"
# Aggiungi vento (sempre se disponibile, formattato come direzione intensità)
if day_info['wind_max'] > 0:
File diff suppressed because it is too large Load Diff
+83 -40
View File
@@ -19,15 +19,14 @@ from open_meteo_client import configure_open_meteo_session
# snow_radar.py
#
# Scopo:
# Analizza la nevicata in una griglia di località in un raggio di 40km da San Marino.
# Per ciascuna località mostra:
# - Nome della località
# - Somma dello snowfall orario nelle 12 ore precedenti
# - Somma dello snowfall previsto nelle 12 ore successive
# - Somma dello snowfall previsto nelle 24 ore successive
# Analizza la neve in una griglia di località in un raggio di 40km da San Marino.
# Combina due parametri Open-Meteo:
# - snowfall: precipitazione nevosa (cm/h) - neve che cade
# - snow_depth: spessore manto al suolo (m) - neve accumulata, incluso residuo
# senza precipitazione (es. giorni successivi a nevicata)
#
# Modello meteo:
# meteofrance_seamless (AROME) per dati dettagliati
# italia_meteo_arpae_icon_2i (supporta snowfall e snow_depth)
#
# Token Telegram:
# Nessun token in chiaro. Lettura in ordine:
@@ -88,8 +87,14 @@ LOCATIONS = [
TZ = "Europe/Berlin"
TZINFO = ZoneInfo(TZ)
# Modello meteo
MODEL_AROME = "meteofrance_seamless"
# Modello meteo: italia_meteo_arpae_icon_2i supporta snowfall e snow_depth
# - snowfall: precipitazione nevosa (cm/h)
# - snow_depth: spessore manto al suolo (m), include neve residua anche senza precipitazione
MODEL_SNOW = "italia_meteo_arpae_icon_2i"
# Soglia minima (cm) per considerare "neve presente" - evita falsi positivi da rumore/modello
# Valori < 1 cm sono tracce trascurabili (dew, frost, errori numerici) - non neve reale
SNOW_THRESHOLD_CM = 1.0
# File di log
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
@@ -189,14 +194,16 @@ def calculate_distance_km(lat1: float, lon1: float, lat2: float, lon2: float) ->
def get_forecast(session: requests.Session, lat: float, lon: float) -> Optional[Dict]:
"""
Recupera previsioni meteo per una località.
Inclusi: snowfall (precipitazione nevosa cm/h), snow_depth (manto al suolo m).
"""
params = {
"latitude": lat,
"longitude": lon,
"hourly": "snowfall,weathercode",
"hourly": "snowfall,snow_depth,weathercode",
"timezone": TZ,
"forecast_days": 2,
"models": MODEL_AROME,
"past_days": 7,
"forecast_days": 7,
"models": MODEL_SNOW,
}
try:
@@ -217,22 +224,25 @@ def get_forecast(session: requests.Session, lat: float, lon: float) -> Optional[
def analyze_snowfall_for_location(data: Dict, now: datetime.datetime) -> Optional[Dict]:
"""
Analizza snowfall per una località.
Analizza snowfall e snow_depth per una località.
Nota: Open-Meteo fornisce principalmente dati futuri. Per i dati passati,
includiamo anche le ore appena passate se disponibili nei dati hourly.
Combina:
- snowfall: precipitazione nevosa (cm/h) - neve che cade
- snow_depth: spessore manto al suolo (m) - neve accumulata, incluso residuo senza precipitazione
Returns:
Dict con:
- snow_past_12h: somma snowfall ultime 12 ore (cm) - se disponibile nei dati
- snow_next_12h: somma snowfall prossime 12 ore (cm)
- snow_next_24h: somma snowfall prossime 24 ore (cm)
Dict con valori in cm:
- snow_past_12h: max(somma snowfall ultime 12h, snow_depth attuale)
- snow_next_12h: somma snowfall prossime 12h
- snow_next_24h: max(somma snowfall prossime 24h, snow_depth max previsto)
- snow_depth_now_cm: manto attuale al suolo (cm)
"""
hourly = data.get("hourly", {}) or {}
times = hourly.get("time", []) or []
snowfall = hourly.get("snowfall", []) or []
snow_depth_raw = hourly.get("snow_depth", []) or [] # in metri (m)
if not times or not snowfall:
if not times:
return None
# Converti timestamps
@@ -243,29 +253,54 @@ def analyze_snowfall_for_location(data: Dict, now: datetime.datetime) -> Optiona
next_12h_end = now + datetime.timedelta(hours=12)
next_24h_end = now + datetime.timedelta(hours=24)
snow_past_12h = 0.0
snow_next_12h = 0.0
snow_next_24h = 0.0
snowfall_past_12h = 0.0
snowfall_next_12h = 0.0
snowfall_next_24h = 0.0
snow_depth_now_m = 0.0
snow_depth_max_past_12h_m = 0.0
snow_depth_max_next_24h_m = 0.0
# Rumore numerico: valori < 0.01 cm (snowfall) o < 0.0001 m (snow_depth) → 0
NOISE_FLOOR_SNOWFALL_CM = 0.01
NOISE_FLOOR_SNOW_DEPTH_M = 0.0001
for i, dt in enumerate(dt_list):
snow_val = float(snowfall[i]) if i < len(snowfall) and snowfall[i] is not None else 0.0
depth_val = float(snow_depth_raw[i]) if i < len(snow_depth_raw) and snow_depth_raw[i] is not None else 0.0
if snow_val < NOISE_FLOOR_SNOWFALL_CM:
snow_val = 0.0
if depth_val < NOISE_FLOOR_SNOW_DEPTH_M:
depth_val = 0.0
# Ultime 12 ore (passato) - solo se i dati includono il passato
# Snowfall nelle finestre temporali
if dt < now and dt >= past_12h_start:
snow_past_12h += snow_val
# Prossime 12 ore
snowfall_past_12h += snow_val
snow_depth_max_past_12h_m = max(snow_depth_max_past_12h_m, depth_val)
if now <= dt < next_12h_end:
snow_next_12h += snow_val
# Prossime 24 ore
snowfall_next_12h += snow_val
if now <= dt < next_24h_end:
snow_next_24h += snow_val
snowfall_next_24h += snow_val
snow_depth_max_next_24h_m = max(snow_depth_max_next_24h_m, depth_val)
# snow_depth attuale: usa valore più vicino a "now" (ultima ora passata o prima futura)
if dt <= now:
snow_depth_now_m = depth_val
# snow_depth da m a cm
snow_depth_now_cm = snow_depth_now_m * 100.0
snow_depth_max_past_12h_cm = snow_depth_max_past_12h_m * 100.0
snow_depth_max_next_24h_cm = snow_depth_max_next_24h_m * 100.0
# Combina precipitazione + manto: per passato usa max(somma precipitazione, manto attuale)
# per futuro usa max(somma precipitazione, manto max previsto)
snow_past_12h = max(snowfall_past_12h, snow_depth_now_cm)
snow_next_24h = max(snowfall_next_24h, snow_depth_max_next_24h_cm)
return {
"snow_past_12h": snow_past_12h,
"snow_next_12h": snow_next_12h,
"snow_next_12h": snowfall_next_12h,
"snow_next_24h": snow_next_24h,
"snow_depth_now_cm": snow_depth_now_cm,
}
@@ -314,6 +349,9 @@ def generate_snow_map(results: List[Dict], center_lat: float, center_lon: float,
totals = [r.get(data_field, 0.0) for r in results]
max_total = max(totals) if totals else 1.0
min_total = min(totals) if totals else 0.0
# Evita vmin==vmax (divisione per zero nel colormap) - tutti 0 → scala 0..1
if max_total <= min_total:
max_total = max(min_total + 0.1, 1.0)
# Estrai coordinate
lats = [r["lat"] for r in results]
@@ -457,9 +495,10 @@ def generate_snow_map(results: List[Dict], center_lat: float, center_lon: float,
ax.legend(handles=legend_elements, loc='lower left', fontsize=10,
framealpha=0.95, edgecolor='black', fancybox=True, shadow=True)
# Info timestamp spostata in alto a destra
# Info timestamp spostata in alto a destra (Località con neve = solo quelle con neve sopra soglia)
now = now_local()
info_text = f"Aggiornamento: {now.strftime('%d/%m/%Y %H:%M')}\nLocalità con neve: {len(results)}"
num_with_snow = sum(1 for r in results if r.get("has_snow", False))
info_text = f"Aggiornamento: {now.strftime('%d/%m/%Y %H:%M')}\nLocalità con neve: {num_with_snow}"
ax.text(0.98, 0.98, info_text, transform=ax.transAxes,
fontsize=9, verticalalignment='top', horizontalalignment='right',
bbox=dict(boxstyle='round,pad=0.5', facecolor='white', alpha=0.9,
@@ -660,11 +699,13 @@ def main(chat_ids: Optional[List[str]] = None, debug_mode: bool = False, chat_id
continue
# Aggiungi sempre Casa, anche se non c'è neve
# Per le altre località, aggiungi solo se c'è neve (passata o prevista)
# Per le altre località, aggiungi solo se c'è neve sopra soglia (precipitazione o manto residuo)
is_casa = loc["name"] == "Casa (Strada Cà Toro)"
has_snow = (snow_analysis["snow_past_12h"] > 0.0 or
snow_analysis["snow_next_12h"] > 0.0 or
snow_analysis["snow_next_24h"] > 0.0)
has_snow = (
snow_analysis["snow_past_12h"] >= SNOW_THRESHOLD_CM or
snow_analysis["snow_next_12h"] >= SNOW_THRESHOLD_CM or
snow_analysis["snow_next_24h"] >= SNOW_THRESHOLD_CM
)
if is_casa or has_snow:
results.append({
@@ -672,6 +713,7 @@ def main(chat_ids: Optional[List[str]] = None, debug_mode: bool = False, chat_id
"lat": loc["lat"],
"lon": loc["lon"],
"distance_km": distance_km,
"has_snow": has_snow,
**snow_analysis
})
@@ -687,6 +729,7 @@ def main(chat_ids: Optional[List[str]] = None, debug_mode: bool = False, chat_id
# Genera e invia DUE mappe separate
now_str = now.strftime('%d/%m/%Y %H:%M')
num_with_snow = sum(1 for r in results if r.get("has_snow", False))
# 1. Mappa snowfall passato (12h precedenti)
map_path_past = os.path.join(BASE_DIR, "snow_radar_past.png")
@@ -698,7 +741,7 @@ def main(chat_ids: Optional[List[str]] = None, debug_mode: bool = False, chat_id
f"❄️ *SNOW RADAR - Ultime 12h*\n"
f"📍 Centro: San Marino\n"
f"🕒 {now_str}\n"
f"📊 Località con neve: {len(results)}/{len(LOCATIONS)}"
f"📊 Località con neve: {num_with_snow}/{len(LOCATIONS)}"
)
telegram_send_photo(map_path_past, caption_past, chat_ids=chat_ids)
# Pulisci file temporaneo
@@ -718,7 +761,7 @@ def main(chat_ids: Optional[List[str]] = None, debug_mode: bool = False, chat_id
f"❄️ *SNOW RADAR - Prossime 24h*\n"
f"📍 Centro: San Marino\n"
f"🕒 {now_str}\n"
f"📊 Località con neve: {len(results)}/{len(LOCATIONS)}"
f"📊 Località con neve: {num_with_snow}/{len(LOCATIONS)}"
)
telegram_send_photo(map_path_future, caption_future, chat_ids=chat_ids)
# Pulisci file temporaneo