Backup automatico script del 2026-02-08 07:00

This commit is contained in:
2026-02-08 07:00:03 +01:00
parent d79431ed28
commit 812bcd002c
8 changed files with 1911 additions and 446 deletions
+16 -1
View File
@@ -41,6 +41,7 @@ SEVERE_SCRIPT = os.path.join(SCRIPT_DIR, "severe_weather.py")
ICE_CHECK_SCRIPT = os.path.join(SCRIPT_DIR, "check_ghiaccio.py") ICE_CHECK_SCRIPT = os.path.join(SCRIPT_DIR, "check_ghiaccio.py")
IRRIGATION_SCRIPT = os.path.join(SCRIPT_DIR, "smart_irrigation_advisor.py") IRRIGATION_SCRIPT = os.path.join(SCRIPT_DIR, "smart_irrigation_advisor.py")
SNOW_RADAR_SCRIPT = os.path.join(SCRIPT_DIR, "snow_radar.py") SNOW_RADAR_SCRIPT = os.path.join(SCRIPT_DIR, "snow_radar.py")
FOTOVOLTAICO_SCRIPT = os.path.join(SCRIPT_DIR, "fotovoltaico.py")
# FILE STATO VIAGGI # FILE STATO VIAGGI
VIAGGI_STATE_FILE = os.path.join(SCRIPT_DIR, "viaggi_attivi.json") VIAGGI_STATE_FILE = os.path.join(SCRIPT_DIR, "viaggi_attivi.json")
@@ -237,6 +238,7 @@ def restricted(func):
async def wrapped(update: Update, context: ContextTypes.DEFAULT_TYPE, *args, **kwargs): async def wrapped(update: Update, context: ContextTypes.DEFAULT_TYPE, *args, **kwargs):
user_id = update.effective_user.id user_id = update.effective_user.id
if user_id not in ALLOWED_IDS: if user_id not in ALLOWED_IDS:
logger.warning("Comando da utente non in ALLOWED_IDS: user_id=%s", user_id)
return return
return await func(update, context, *args, **kwargs) return await func(update, context, *args, **kwargs)
return wrapped return wrapped
@@ -248,7 +250,7 @@ async def start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
[InlineKeyboardButton("🛡️ Pi-hole", callback_data="menu_pihole"), InlineKeyboardButton("🌐 Rete", callback_data="menu_net")], [InlineKeyboardButton("🛡️ Pi-hole", callback_data="menu_pihole"), InlineKeyboardButton("🌐 Rete", callback_data="menu_net")],
[InlineKeyboardButton("🌤️ Meteo Casa", callback_data="req_meteo_home"), InlineKeyboardButton("📜 Logs", callback_data="menu_logs")] [InlineKeyboardButton("🌤️ Meteo Casa", callback_data="req_meteo_home"), InlineKeyboardButton("📜 Logs", callback_data="menu_logs")]
] ]
text = "🎛 **Loogle Control Center v8.1**\nComandi disponibili:\n🔹 `/meteo <città>`\n🔹 `/meteo7 <città>` (Previsione 7gg)\n🔹 Pulsanti sotto" text = "🎛 **Loogle Control Center v8.1**\nComandi disponibili:\n🔹 `/meteo <città>`\n🔹 `/meteo7 <città>` (Previsione 7gg)\n🔹 `/fotovoltaico` (Previsione produzione FV)\n🔹 Pulsanti sotto"
if update.message: await update.message.reply_text(text, reply_markup=InlineKeyboardMarkup(keyboard), parse_mode="Markdown") if update.message: await update.message.reply_text(text, reply_markup=InlineKeyboardMarkup(keyboard), parse_mode="Markdown")
else: await update.callback_query.edit_message_text(text, reply_markup=InlineKeyboardMarkup(keyboard), parse_mode="Markdown") else: await update.callback_query.edit_message_text(text, reply_markup=InlineKeyboardMarkup(keyboard), parse_mode="Markdown")
@@ -365,6 +367,18 @@ async def snowradar_command(update: Update, context: ContextTypes.DEFAULT_TYPE)
# Avvia in background # Avvia in background
subprocess.Popen(cmd, cwd=SCRIPT_DIR) subprocess.Popen(cmd, cwd=SCRIPT_DIR)
@restricted
async def fotovoltaico_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""Comando /fotovoltaico: previsione produzione 72h future e curva 48h passate (ICON Italia)."""
chat_id = str(update.effective_chat.id)
cmd = ["python3", FOTOVOLTAICO_SCRIPT, "--telegram", "--chat_id", chat_id]
await update.message.reply_text(
"☀️ **Fotovoltaico**\n\n"
"Generazione curve previsione (72h future + 48h passate)... I grafici verranno inviati a breve.",
parse_mode="Markdown"
)
subprocess.Popen(cmd, cwd=SCRIPT_DIR)
@restricted @restricted
async def irrigazione_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: async def irrigazione_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""Comando /irrigazione: consulente agronomico per gestione irrigazione""" """Comando /irrigazione: consulente agronomico per gestione irrigazione"""
@@ -849,6 +863,7 @@ def main():
application.add_handler(CommandHandler("road", road_command)) application.add_handler(CommandHandler("road", road_command))
application.add_handler(CommandHandler("irrigazione", irrigazione_command)) application.add_handler(CommandHandler("irrigazione", irrigazione_command))
application.add_handler(CommandHandler("snowradar", snowradar_command)) application.add_handler(CommandHandler("snowradar", snowradar_command))
application.add_handler(CommandHandler("fotovoltaico", fotovoltaico_command))
application.add_handler(CallbackQueryHandler(button_handler)) application.add_handler(CallbackQueryHandler(button_handler))
job_queue = application.job_queue job_queue = application.job_queue
+816
View File
@@ -0,0 +1,816 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Fotovoltaico: analisi e previsione produzione con modello ICON Italia (ItaliaMeteo ARPAE).
- Previsione 72h future (Weather Forecast API)
- Previsione teorica 48h passate (Historical Forecast API) per confronto con produzione reale
- Produzione reale (opzionale) da API SolarEdge Monitoring
- Grafici per colpo d'occhio
Configurazione impianto:
- Inverter: 6 kW (SolarEdge)
- 22 pannelli Longi Solar (415 Wp ciascuno, totale 9,13 kWp)
- Orientamenti: vedi PANEL_GROUPS
SolarEdge: imposta SOLAREDGE_API_KEY e SOLAREDGE_SITE_ID (env o file .env / ~/.solaredge_fotovoltaico)
per mostrare la produzione reale sul grafico 48h passate.
"""
from __future__ import annotations
import argparse
import glob
import logging
import os
import sys
from datetime import datetime, timedelta
from io import BytesIO
from typing import Any, Dict, List, Optional, Tuple
import requests
from zoneinfo import ZoneInfo
# Grafici
import matplotlib
matplotlib.use("Agg")
import matplotlib.dates as mdates
import matplotlib.pyplot as plt
import numpy as np
from open_meteo_client import open_meteo_get
# -----------------------------------------------------------------------------
# Configurazione
# -----------------------------------------------------------------------------
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
TZ = "Europe/Rome"
TZINFO = ZoneInfo(TZ)
# Casa (stessa di meteo.py)
HOME_LAT = 43.9356
HOME_LON = 12.4296
# Impianto
INVERTER_KW = 6.0
PANEL_WP = 415.0 # Longi Solar, 415 Wp per modulo
NUM_PANELS = 22
P_PEAK_TOTAL_W = NUM_PANELS * PANEL_WP # 9130 Wp = 9,13 kWp
# Fisica termica (PVLib-style): i pannelli producono di più quando fa freddo (Wp @ 25°C).
TEMP_COEFF_POWER = -0.0035 # Longi Hi-MO ~ -0.35%/°C
NOCT = 45.0 # Nominal Operating Cell Temperature (°C)
SYSTEM_LOSSES = 0.10 # 10% perdite fisse (cavi, inverter, sporco) → PR eff. ~0.90
# Correzione empirica: impostabile via env PRODUCTION_CORRECTION_FACTOR o file.
# Se previsti < reali in modo sistematico, provare 1.21.35 (es. in ~/.solaredge_fotovoltaico).
PRODUCTION_CORRECTION_FACTOR = 1.0
# Gruppi pannelli: (tilt_deg, azimuth_compass_reale_deg, numero_pannelli).
# Azimut = bussola reale osservata (0=N, 90=E, 180=S, 270=W). Non usare i valori di monitoring.solaredge (spesso sbagliati).
# 1.1.1 a 72°; 1.1.2-10 a 158°; 1.2.1-2 a 253°; 1.2.3-10 + 1.2.12 a 72°; 1.2.11 a 190°. Due gruppi a 72° accorpati in uno.
PANEL_GROUPS = [
(34, 72, 10), # 1.1.1 + 1.2.3-10 + 1.2.12 (Est)
(34, 158, 9), # 1.1.2 - 1.1.10 (Sud-Sud-Est)
(34, 253, 2), # 1.2.1, 1.2.2 (Ovest-Sud-Ovest)
(34, 190, 1), # 1.2.11 (Sud-Sud-Ovest)
]
# API Open-Meteo: usiamo solo global_tilted_irradiance (W/m²) con &tilt= e &azimuth=.
# È l'irradianza reale sul piano del pannello (diretta+diffusa inclinate); non servono
# direct_radiation/diffuse_radiation separati. shortwave_radiation=GHI orizzontale non usato.
FORECAST_URL = "https://api.open-meteo.com/v1/forecast"
HISTORICAL_FORECAST_URL = "https://historical-forecast-api.open-meteo.com/v1/forecast"
MODEL_ICON_ITALIA = "italia_meteo_arpae_icon_2i"
MODEL_AROME_HD = "meteofrance_arome_france_hd" # Francia + limitrofi; fuori area può non restituire dati
HTTP_HEADERS = {"User-Agent": "loogle-bot-fotovoltaico/2.0"}
# Telegram
TOKEN_FILE_HOME = os.path.expanduser("~/.telegram_dpc_bot_token")
TOKEN_FILE_ETC = "/etc/telegram_dpc_bot_token"
# SolarEdge Monitoring API (opzionale: produzione reale sul grafico 48h)
# Chiave da https://monitoring.solaredge.com (Account → API Access); Site ID dal portale
SOLAREDGE_BASE = "https://monitoringapi.solaredge.com"
SOLAREDGE_SITE_ENERGY_PATH = "/site/{site_id}/energy"
SOLAREDGE_SITE_POWER_PATH = "/site/{site_id}/power"
DEFAULT_SOLAREDGE_SITE_ID = "3079750" # Usato se SOLAREDGE_SITE_ID non è impostato
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s",
)
logger = logging.getLogger(__name__)
def get_production_correction_factor() -> float:
"""Fattore di correzione produzione (default 1.0). Env: PRODUCTION_CORRECTION_FACTOR."""
s = (os.environ.get("PRODUCTION_CORRECTION_FACTOR") or "").strip()
if s:
try:
return max(0.1, min(3.0, float(s)))
except ValueError:
pass
for path in (
os.path.expanduser("~/.solaredge_fotovoltaico"),
os.path.join(SCRIPT_DIR, ".env"),
):
if not os.path.isfile(path):
continue
try:
with open(path, "r", encoding="utf-8") as f:
for line in f:
line = line.strip()
if line.startswith("PRODUCTION_CORRECTION_FACTOR="):
v = line.split("=", 1)[1].strip().strip("'\"")
if v:
return max(0.1, min(3.0, float(v)))
except Exception:
pass
return PRODUCTION_CORRECTION_FACTOR
def compass_to_open_meteo_azimuth(compass_deg: float) -> float:
"""Converte azimut compass (0=N, 90=E, 180=S, 270=W) in Open-Meteo (0=S, -90=E, 90=W, 180=N). Formula corretta: compass - 180 (non 180 - compass, altrimenti Est/Ovest si invertono)."""
return compass_deg - 180.0
def calculate_pv_power(gti: float, temp_air: float, n_panels: int) -> float:
"""Potenza DC (kW) del gruppo con fisica termica: freddo = più efficienza rispetto a STC (25°C)."""
if gti <= 0:
return 0.0
t_cell = temp_air + (gti / 800.0) * (NOCT - 20.0)
delta_t = t_cell - 25.0
temp_factor = 1.0 + (TEMP_COEFF_POWER * delta_t)
p_peak_group_kw = (n_panels * PANEL_WP) / 1000.0
p_dc = p_peak_group_kw * (gti / 1000.0) * temp_factor * (1.0 - SYSTEM_LOSSES)
return max(0.0, p_dc)
def fetch_gti_forecast(
lat: float,
lon: float,
tilt: float,
azimuth_om: float,
forecast_hours: int = 72,
past_hours: int = 0,
model: str = MODEL_ICON_ITALIA,
) -> Optional[Dict[str, Any]]:
"""Recupera GTI e temperature_2m dalla Weather Forecast API (modello specificabile)."""
params = {
"latitude": lat,
"longitude": lon,
"timezone": "auto",
"models": model,
"hourly": "global_tilted_irradiance,temperature_2m",
"tilt": tilt,
"azimuth": azimuth_om,
"forecast_hours": forecast_hours,
"past_hours": past_hours,
}
try:
r = open_meteo_get(
FORECAST_URL,
params=params,
headers=HTTP_HEADERS,
timeout=(8, 30),
)
if r.status_code != 200:
logger.warning("Forecast API status %s: %s", r.status_code, r.text[:300])
return None
return r.json()
except Exception as e:
logger.exception("Forecast API error: %s", e)
return None
def fetch_gti_historical(
lat: float,
lon: float,
tilt: float,
azimuth_om: float,
start_date: str,
end_date: str,
) -> Optional[Dict[str, Any]]:
"""Recupera GTI e temperature_2m dalla Historical Forecast API (previsione teorica passata)."""
params = {
"latitude": lat,
"longitude": lon,
"start_date": start_date,
"end_date": end_date,
"timezone": "auto",
"models": MODEL_ICON_ITALIA,
"hourly": "global_tilted_irradiance,temperature_2m",
"tilt": tilt,
"azimuth": azimuth_om,
}
try:
r = open_meteo_get(
HISTORICAL_FORECAST_URL,
params=params,
headers=HTTP_HEADERS,
timeout=(8, 45),
)
if r.status_code != 200:
logger.warning("Historical Forecast API status %s: %s", r.status_code, r.text[:300])
return None
return r.json()
except Exception as e:
logger.exception("Historical Forecast API error: %s", e)
return None
def production_from_groups(
hourly_times: List[str],
gti_per_group: List[List[Optional[float]]],
temp_per_hour: List[Optional[float]],
panel_counts: List[int],
) -> Tuple[List[float], List[float]]:
"""
Calcola la produzione per ora: ogni gruppo con GTI + temperatura aria; fisica termica
(freddo = più efficienza). Produzione totale = SOMMA gruppi, cap inverter, poi fattore correzione.
"""
total_panels = sum(panel_counts)
if total_panels != NUM_PANELS:
logger.warning(
"Somma pannelli per gruppo = %s (attesi %s); verifica PANEL_GROUPS",
total_panels, NUM_PANELS,
)
n = len(hourly_times)
gti_equivalent = []
power_kw = []
correction = get_production_correction_factor()
for i in range(n):
temp = temp_per_hour[i] if i < len(temp_per_hour) and temp_per_hour[i] is not None else 15.0
p_dc_total = 0.0
gti_sum_weighted = 0.0
for gti_list, n_panels in zip(gti_per_group, panel_counts):
if gti_list is not None and i < len(gti_list) and gti_list[i] is not None:
gti_w = gti_list[i]
p_dc_total += calculate_pv_power(gti_w, temp, n_panels)
gti_sum_weighted += gti_w * (n_panels / total_panels)
p_ac_kw = min(p_dc_total, INVERTER_KW) * correction
power_kw.append(round(p_ac_kw, 3))
gti_equivalent.append(gti_sum_weighted)
return gti_equivalent, power_kw
def fetch_forecast_72h(lat: float, lon: float) -> Optional[Tuple[List[str], List[float], List[float]]]:
"""
Ottiene previsione 72h come curve intere: oggi, domani, dopodomani (sempre 3 giorni pieni).
Indipendentemente dall'ora di chiamata, si richiedono dati da mezzanotte di oggi per 72 ore.
"""
now = datetime.now(TZINFO)
midnight_today = now.replace(hour=0, minute=0, second=0, microsecond=0)
past_hours = int((now - midnight_today).total_seconds() / 3600)
# Richiedi da mezzanotte (past_hours) + 72h future
total_hours = past_hours + 72
gti_per_orientation = []
panel_weights = []
times_ref = None
temp_per_hour = None
for tilt, azimuth_compass, count in PANEL_GROUPS:
az_om = compass_to_open_meteo_azimuth(azimuth_compass)
data = fetch_gti_forecast(
lat, lon, tilt, az_om,
forecast_hours=72,
past_hours=past_hours,
)
if not data or "hourly" not in data:
logger.warning("Forecast mancante per tilt=%s az=%s", tilt, azimuth_compass)
gti_per_orientation.append([None] * total_hours)
else:
if times_ref is None:
times_ref = data["hourly"].get("time", [])
raw = data["hourly"].get("temperature_2m")
temp_per_hour = raw if raw else [15.0] * len(times_ref)
gti = data["hourly"].get("global_tilted_irradiance")
if not gti:
gti = data["hourly"].get("global_tilted_irradiance_sum")
gti_per_orientation.append(gti or [None] * total_hours)
panel_weights.append(count)
times = times_ref or []
if not times:
return None
if temp_per_hour is None:
temp_per_hour = [15.0] * len(times)
gti_w, power_kw = production_from_groups(times, gti_per_orientation, temp_per_hour, panel_weights)
# Filtra: solo le 72 ore da mezzanotte di oggi (oggi, domani, dopodomani interi)
end_window = midnight_today + timedelta(hours=72)
dt_list = parse_times_to_datetime(times)
filtered_times = []
filtered_gti = []
filtered_power = []
for i, dt in enumerate(dt_list):
if midnight_today <= dt < end_window and len(filtered_times) < 72:
filtered_times.append(times[i])
filtered_gti.append(gti_w[i] if i < len(gti_w) else 0)
filtered_power.append(power_kw[i] if i < len(power_kw) else 0)
if not filtered_times:
return None
return filtered_times, filtered_gti, filtered_power
def _fetch_forecast_72h_for_model(
lat: float, lon: float, model: str
) -> Optional[Tuple[List[str], List[float]]]:
"""
Come fetch_forecast_72h ma per un singolo modello; ritorna (filtered_times, filtered_power)
o None se il modello non restituisce dati (es. AROME HD fuori copertura).
"""
now = datetime.now(TZINFO)
midnight_today = now.replace(hour=0, minute=0, second=0, microsecond=0)
past_hours = int((now - midnight_today).total_seconds() / 3600)
total_hours = past_hours + 72
gti_per_orientation = []
panel_weights = []
times_ref = None
temp_per_hour = None
for tilt, azimuth_compass, count in PANEL_GROUPS:
az_om = compass_to_open_meteo_azimuth(azimuth_compass)
data = fetch_gti_forecast(
lat, lon, tilt, az_om,
forecast_hours=72,
past_hours=past_hours,
model=model,
)
if not data or "hourly" not in data:
logger.warning("Forecast (%s) mancante per tilt=%s az=%s", model, tilt, azimuth_compass)
gti_per_orientation.append([None] * total_hours)
else:
if times_ref is None:
times_ref = data["hourly"].get("time", [])
raw = data["hourly"].get("temperature_2m")
temp_per_hour = raw if raw else [15.0] * len(times_ref)
gti = data["hourly"].get("global_tilted_irradiance")
if not gti:
gti = data["hourly"].get("global_tilted_irradiance_sum")
gti_per_orientation.append(gti or [None] * total_hours)
panel_weights.append(count)
times = times_ref or []
if not times:
return None
if temp_per_hour is None:
temp_per_hour = [15.0] * len(times)
gti_w, power_kw = production_from_groups(times, gti_per_orientation, temp_per_hour, panel_weights)
end_window = midnight_today + timedelta(hours=72)
dt_list = parse_times_to_datetime(times)
filtered_times = []
filtered_power = []
for i, dt in enumerate(dt_list):
if midnight_today <= dt < end_window and len(filtered_times) < 72:
filtered_times.append(times[i])
filtered_power.append(power_kw[i] if i < len(power_kw) else 0)
if not filtered_times:
return None
return filtered_times, filtered_power
def fetch_forecast_72h_multi(
lat: float, lon: float
) -> Optional[Tuple[List[str], List[Tuple[str, List[float]]]]]:
"""
Previsione 72h da più modelli (ICON Italia + AROME HD se disponibile).
Ritorna (times_72, [(label, power_list), ...]). AROME HD copre Francia e limitrofi;
fuori area può non restituire dati e viene omesso.
"""
result_icon = _fetch_forecast_72h_for_model(lat, lon, MODEL_ICON_ITALIA)
if not result_icon:
return None
ref_times, power_icon = result_icon
n = len(ref_times)
series: List[Tuple[str, List[float]]] = [("ICON Italia", power_icon)]
result_arome = _fetch_forecast_72h_for_model(lat, lon, MODEL_AROME_HD)
if result_arome:
times_arome, power_arome = result_arome
# Allinea alla griglia oraria di ref_times (AROME può avere meno ore, es. 48)
power_aligned = []
for t in ref_times:
if t in times_arome:
idx = times_arome.index(t)
power_aligned.append(power_arome[idx] if idx < len(power_arome) else 0.0)
else:
power_aligned.append(0.0)
# Includi AROME solo se ha dati utili (fuori copertura restituisce spesso tutti zero)
if sum(power_aligned) >= 0.1:
series.append(("AROME HD", power_aligned))
logger.info("AROME HD disponibile per confronto previsioni")
else:
logger.info("AROME HD senza dati utili per questa località (copertura Francia/limitrofi)")
else:
logger.info("AROME HD non disponibile per questa località (copertura Francia/limitrofi)")
return ref_times, series
def fetch_historical_48h(lat: float, lon: float) -> Optional[Tuple[List[str], List[float], List[float]]]:
"""
Ottiene previsione teorica per ieri l'altro e ieri (2 giorni pieni, senza oggi).
Usa orientamenti reali (bussola osservata) e temperature_2m per la fisica termica.
Finestra: solo ieri l'altro + ieri, così il grafico "Ultime 48h" non include la giornata odierna.
"""
today = datetime.now(TZINFO).date()
start_d = datetime.combine(today - timedelta(days=2), datetime.min.time()).replace(tzinfo=TZINFO)
end_d = datetime.combine(today - timedelta(days=1), datetime.min.time()).replace(tzinfo=TZINFO)
start_date = start_d.strftime("%Y-%m-%d")
end_date = end_d.strftime("%Y-%m-%d")
gti_per_orientation = []
panel_weights = []
times_ref = None
temp_per_hour = None
for tilt, azimuth_compass, count in PANEL_GROUPS:
az_om = compass_to_open_meteo_azimuth(azimuth_compass)
data = fetch_gti_historical(lat, lon, tilt, az_om, start_date, end_date)
if not data or "hourly" not in data:
logger.warning("Historical mancante per tilt=%s az=%s", tilt, azimuth_compass)
gti_per_orientation.append([])
else:
if times_ref is None:
times_ref = data["hourly"].get("time", [])
raw = data["hourly"].get("temperature_2m")
temp_per_hour = raw if raw else [15.0] * len(times_ref)
gti = data["hourly"].get("global_tilted_irradiance")
if not gti:
gti = data["hourly"].get("global_tilted_irradiance_sum")
gti_per_orientation.append(gti or [])
panel_weights.append(count)
times = times_ref or []
if not times or not gti_per_orientation:
return None
if temp_per_hour is None:
temp_per_hour = [15.0] * len(times)
n = len(times)
for i, gti_list in enumerate(gti_per_orientation):
if len(gti_list) < n:
gti_per_orientation[i] = gti_list + [None] * (n - len(gti_list))
elif len(gti_list) > n:
gti_per_orientation[i] = gti_list[:n]
gti_w, power_kw = production_from_groups(times, gti_per_orientation, temp_per_hour, panel_weights)
return times, gti_w, power_kw
def load_solaredge_config() -> Tuple[str, str]:
"""Ritorna (api_key, site_id). site_id usa DEFAULT_SOLAREDGE_SITE_ID se non impostato."""
api_key = (os.environ.get("SOLAREDGE_API_KEY") or "").strip()
site_id = (os.environ.get("SOLAREDGE_SITE_ID") or "").strip() or DEFAULT_SOLAREDGE_SITE_ID
if api_key and site_id:
return api_key, site_id
for path in (
os.path.expanduser("~/.solaredge_fotovoltaico"),
os.path.join(SCRIPT_DIR, ".env"),
):
if not os.path.isfile(path):
continue
try:
with open(path, "r", encoding="utf-8") as f:
for line in f:
line = line.strip()
if not line or line.startswith("#"):
continue
if "=" in line:
k, v = line.split("=", 1)
k, v = k.strip(), v.strip().strip('"\'')
if k == "SOLAREDGE_API_KEY":
api_key = api_key or v
elif k == "SOLAREDGE_SITE_ID":
site_id = site_id or v
except Exception as e:
logger.debug("Lettura %s: %s", path, e)
if api_key and site_id:
break
return api_key or "", site_id or DEFAULT_SOLAREDGE_SITE_ID
def fetch_solaredge_energy_48h(api_key: str, site_id: str) -> Optional[Tuple[List[datetime], List[float]]]:
"""Recupera produzione reale per ieri l'altro e ieri (stessa finestra del grafico 48h, senza oggi)."""
today = datetime.now(TZINFO).date()
start_date = (today - timedelta(days=2)).strftime("%Y-%m-%d")
end_date = (today - timedelta(days=1)).strftime("%Y-%m-%d")
url = SOLAREDGE_BASE + SOLAREDGE_SITE_ENERGY_PATH.format(site_id=site_id)
params = {
"api_key": api_key,
"startDate": start_date,
"endDate": end_date,
"timeUnit": "HOUR",
}
try:
r = requests.get(url, params=params, headers=HTTP_HEADERS, timeout=(8, 25))
if r.status_code != 200:
logger.warning("SolarEdge API status %s: %s", r.status_code, r.text[:300])
return None
data = r.json()
energy_block = data.get("energy", {}) or data.get("siteEnergy", {})
values = energy_block.get("values") or data.get("values")
if not values:
logger.warning("SolarEdge: nessun 'energy.values' in risposta")
return None
times_out: List[datetime] = []
power_kw_out: List[float] = []
for item in values:
val = item.get("value")
if val is None:
continue
date_str = item.get("date") or item.get("time") or ""
try:
if "T" in date_str or " " in date_str:
dt = datetime.fromisoformat(date_str.replace("Z", "+00:00").strip())
else:
dt = datetime.strptime(date_str[:10], "%Y-%m-%d").replace(tzinfo=TZINFO)
if dt.tzinfo is None:
dt = dt.replace(tzinfo=TZINFO)
except Exception:
continue
power_kw = float(val) / 1000.0
times_out.append(dt)
power_kw_out.append(round(power_kw, 3))
if not times_out:
return None
return times_out, power_kw_out
except Exception as e:
logger.exception("SolarEdge API error: %s", e)
return None
def kwh_per_day_from_series(
times: List[str],
power_kw: List[float],
) -> List[Tuple[str, float]]:
"""Raggruppa power_kw (kWh/ora) per data e ritorna [(data dd/mm, kwh), ...]."""
from collections import defaultdict
dt_list = parse_times_to_datetime(times)
by_date: Dict[str, float] = defaultdict(float)
for i, dt in enumerate(dt_list):
if i < len(power_kw):
key = dt.strftime("%Y-%m-%d")
by_date[key] += power_kw[i]
out: List[Tuple[str, float]] = []
for d_str in sorted(by_date.keys()):
d = datetime.strptime(d_str, "%Y-%m-%d").date()
label = d.strftime("%d/%m")
out.append((label, round(by_date[d_str], 1)))
return out
def parse_times_to_datetime(times: List[str]) -> List[datetime]:
out = []
for t in times:
try:
if "T" in t:
dt = datetime.fromisoformat(t.replace("Z", "+00:00"))
else:
dt = datetime.strptime(t, "%Y-%m-%d").replace(tzinfo=TZINFO)
if dt.tzinfo is None:
dt = dt.replace(tzinfo=TZINFO)
out.append(dt)
except Exception:
out.append(datetime.now(TZINFO))
return out
def plot_past_48h(
times: List[str],
power_kw: List[float],
title_suffix: str = "",
real_times: Optional[List[datetime]] = None,
real_power_kw: Optional[List[float]] = None,
) -> bytes:
"""Genera il grafico in memoria e ritorna i byte PNG (non salva su disco)."""
fig, ax = plt.subplots(figsize=(10, 4.5))
dt_list = parse_times_to_datetime(times)
ax.fill_between(dt_list, 0, power_kw, alpha=0.5, color="steelblue")
ax.plot(dt_list, power_kw, color="navy", linewidth=1.2, label="Previsione (ICON Italia)")
if real_times and real_power_kw and len(real_times) == len(real_power_kw):
ax.plot(real_times, real_power_kw, color="darkorange", linewidth=1.4, label="Produzione reale (SolarEdge)")
ax.axhline(y=INVERTER_KW, color="red", linestyle="--", alpha=0.7, label=f"Limite inverter {INVERTER_KW} kW")
ax.set_ylabel("Potenza AC (kW)")
ax.set_xlabel("Data / Ora")
ax.set_title(f"Produzione Ieri l'altro e Ieri{title_suffix}")
ax.legend(loc="upper right")
all_vals = list(power_kw) + (list(real_power_kw) if real_power_kw else [])
ax.set_ylim(0, max(INVERTER_KW * 1.05, max(all_vals) * 1.1) if all_vals else 7)
ax.xaxis.set_major_formatter(mdates.DateFormatter("%d/%m %H:%M", tz=TZINFO))
ax.xaxis.set_major_locator(mdates.HourLocator(interval=6))
plt.xticks(rotation=25)
plt.tight_layout()
buf = BytesIO()
plt.savefig(buf, format="png", dpi=120)
plt.close()
return buf.getvalue()
def plot_future_72h(
times: List[str],
power_series: List[Tuple[str, List[float]]],
title_suffix: str = "",
) -> bytes:
"""
Genera il grafico in memoria. power_series: [(label, power_kw_list), ...];
la prima serie ha anche fill_between, le altre solo linea (confronto modelli).
"""
if not power_series:
plt.close("all")
return b""
fig, ax = plt.subplots(figsize=(10, 4.5))
dt_list = parse_times_to_datetime(times)
colors = ["green", "blue", "purple"]
all_vals = []
for i, (label, power_kw) in enumerate(power_series):
if len(power_kw) != len(dt_list):
power_kw = (power_kw + [0.0] * len(dt_list))[: len(dt_list)]
all_vals.extend(power_kw)
color = colors[i % len(colors)]
if i == 0:
ax.fill_between(dt_list, 0, power_kw, alpha=0.4, color=color)
ax.plot(dt_list, power_kw, color=color, linewidth=1.2, label=label)
ax.axhline(y=INVERTER_KW, color="red", linestyle="--", alpha=0.7, label=f"Limite inverter {INVERTER_KW} kW")
ax.set_ylabel("Potenza AC (kW)")
ax.set_xlabel("Data / Ora")
ax.set_title(f"Previsione produzione Oggi, Domani, Dopodomani{title_suffix}")
ax.legend(loc="upper right")
ax.set_ylim(0, max(INVERTER_KW * 1.05, max(all_vals) * 1.1) if all_vals else 7)
ax.xaxis.set_major_formatter(mdates.DateFormatter("%d/%m %H:%M", tz=TZINFO))
ax.xaxis.set_major_locator(mdates.HourLocator(interval=6))
plt.xticks(rotation=25)
plt.tight_layout()
buf = BytesIO()
plt.savefig(buf, format="png", dpi=120)
plt.close()
return buf.getvalue()
def load_bot_token() -> str:
tok = os.environ.get("TELEGRAM_BOT_TOKEN", "").strip() or os.environ.get("BOT_TOKEN", "").strip()
if tok:
return tok
for path in (TOKEN_FILE_HOME, TOKEN_FILE_ETC):
try:
with open(path, "r", encoding="utf-8") as f:
t = f.read().strip()
if t:
return t
except FileNotFoundError:
continue
return ""
def telegram_send_photo(photo_bytes: bytes, caption: str, chat_id: str) -> bool:
"""Invia una foto a Telegram da bytes (nessun file su disco)."""
token = load_bot_token()
if not token:
logger.warning("Token Telegram mancante")
return False
url = f"https://api.telegram.org/bot{token}/sendPhoto"
try:
r = requests.post(
url,
data={"chat_id": chat_id, "caption": caption, "parse_mode": "Markdown"},
files={"photo": ("grafico.png", photo_bytes, "image/png")},
timeout=20,
)
if r.status_code != 200:
logger.error("Telegram sendPhoto %s: %s", r.status_code, r.text[:400])
return False
return True
except Exception as e:
logger.exception("Telegram sendPhoto: %s", e)
return False
def telegram_send_message(text: str, chat_id: str) -> bool:
token = load_bot_token()
if not token:
return False
url = f"https://api.telegram.org/bot{token}/sendMessage"
try:
r = requests.post(
url,
json={"chat_id": chat_id, "text": text, "parse_mode": "Markdown"},
timeout=15,
)
return r.status_code == 200
except Exception as e:
logger.exception("Telegram sendMessage: %s", e)
return False
def main() -> int:
parser = argparse.ArgumentParser(description="Previsione e analisi produzione fotovoltaico (ICON Italia)")
parser.add_argument("--lat", type=float, default=HOME_LAT, help="Latitudine")
parser.add_argument("--lon", type=float, default=HOME_LON, help="Longitudine")
parser.add_argument("--telegram", action="store_true", help="Invia grafici e messaggio a Telegram")
parser.add_argument("--chat_id", type=str, default="", help="Chat ID per invio Telegram")
parser.add_argument("--no-past", action="store_true", help="Salta grafico 48h passate")
parser.add_argument("--no-future", action="store_true", help="Salta grafico 72h future")
args = parser.parse_args()
lat, lon = args.lat, args.lon
send_telegram = args.telegram and args.chat_id.strip()
# Rimuovi eventuali grafici fotovoltaico salvati in precedenza (non ne salviamo più)
for old in glob.glob(os.path.join(SCRIPT_DIR, "fotovoltaico_*.png")):
try:
os.remove(old)
logger.info("Rimosso %s", old)
except OSError as e:
logger.warning("Impossibile rimuovere %s: %s", old, e)
hist: Optional[Tuple[List[str], List[float], List[float]]] = None
fore: Optional[Tuple[List[str], List[float], List[float]]] = None
solaredge_api_key, solaredge_site_id = load_solaredge_config()
real_48h: Optional[Tuple[List[datetime], List[float]]] = None
if solaredge_api_key and solaredge_site_id and not args.no_past:
logger.info("Recupero produzione reale SolarEdge (48h)...")
real_48h = fetch_solaredge_energy_48h(solaredge_api_key, solaredge_site_id)
if not real_48h:
logger.warning("SolarEdge: dati 48h non disponibili (controlla API key e Site ID)")
past_ok = False
if not args.no_past:
logger.info(
"Pannelli: %s gruppi (tilt, azimut°, n) = %s",
len(PANEL_GROUPS),
[(t, a, n) for t, a, n in PANEL_GROUPS],
)
logger.info("Recupero dati Historical Forecast (48h passate)...")
hist = fetch_historical_48h(lat, lon)
if hist:
times_past, _, power_past = hist
past_days_debug = kwh_per_day_from_series(hist[0], hist[2])
logger.info("Historical 48h kWh per giorno (previsione): %s", past_days_debug)
real_t = (real_48h[0], real_48h[1]) if real_48h else (None, None)
img_past = plot_past_48h(times_past, power_past, real_times=real_t[0], real_power_kw=real_t[1])
past_ok = True
if send_telegram:
caption = "📊 *Produzione Ieri l'altro e Ieri* (ICON Italia" + (" + SolarEdge reale" if real_48h else "") + ")"
telegram_send_photo(img_past, caption, args.chat_id)
else:
logger.warning("Nessun dato Historical Forecast per le 48h passate")
future_ok = False
fore_multi: Optional[Tuple[List[str], List[Tuple[str, List[float]]]]] = None
if not args.no_future:
logger.info("Recupero dati Forecast (72h future, ICON + AROME HD)...")
fore_multi = fetch_forecast_72h_multi(lat, lon)
if fore_multi:
times_fut, power_series = fore_multi
img_fut = plot_future_72h(times_fut, power_series)
future_ok = True
if send_telegram:
models_label = " / ".join(s[0] for s in power_series)
telegram_send_photo(
img_fut,
f"☀️ *Previsione Oggi, Domani, Dopodomani* ({models_label})",
args.chat_id,
)
else:
logger.warning("Nessun dato Forecast per le 72h future")
if send_telegram:
lines = ["🖥 *Fotovoltaico*", ""]
if past_ok and hist:
past_days = kwh_per_day_from_series(hist[0], hist[2])
real_days = None
if real_48h:
real_days = kwh_per_day_from_series(
[t.isoformat() for t in real_48h[0]],
real_48h[1],
)
real_by_date = {d: k for d, k in (real_days or [])}
lines.append("📊 *Ultimi 2 giorni*")
for label, kwh in past_days:
part = f" {label} · prev. {kwh} kWh"
if label in real_by_date:
part += f" · _reale_ {real_by_date[label]} kWh"
lines.append(part)
lines.append("")
if future_ok and fore_multi:
times_fut, power_series = fore_multi
fut_days_per_model = [
(name, kwh_per_day_from_series(times_fut, power_list))
for name, power_list in power_series
]
lines.append("☀️ *Prossimi 3 giorni*")
if len(fut_days_per_model) == 1:
for label, kwh in fut_days_per_model[0][1]:
lines.append(f" {label} · {kwh} kWh")
else:
dates = sorted({d for _, days_list in fut_days_per_model for d, _ in days_list})
for d in dates:
parts = [f" {d}"]
for name, days_list in fut_days_per_model:
val = next((k for lbl, k in days_list if lbl == d), None)
if val is not None:
short = "ICON" if "ICON" in name else "AROME"
parts.append(f"{short} {val}")
lines.append(" · ".join(parts) + " kWh")
lines.append("")
models_footer = " · ".join(s[0] for s in power_series)
lines.append(f"_Modello: {models_footer}_")
telegram_send_message("\n".join(lines), args.chat_id)
if not past_ok and not future_ok:
logger.error("Nessun dato disponibile")
return 1
return 0
if __name__ == "__main__":
sys.exit(main())
+5
View File
@@ -19,6 +19,9 @@ EXCLUDED_FILES = {
"road_weather.log", "road_weather.log",
"snow_radar.log", "snow_radar.log",
} }
# Log irrigazione: aggiornato solo quando lo script viene eseguito (cron --auto o /irrigazione).
# Se non c’è un cron giornaliero, il file può restare “non aggiornato” per giorni (normale in inverno).
STALE_EXCLUDE_BASENAMES = {"irrigation_advisor.log"}
TOKEN_FILE_HOME = os.path.expanduser("~/.telegram_dpc_bot_token") TOKEN_FILE_HOME = os.path.expanduser("~/.telegram_dpc_bot_token")
TOKEN_FILE_ETC = "/etc/telegram_dpc_bot_token" TOKEN_FILE_ETC = "/etc/telegram_dpc_bot_token"
@@ -125,6 +128,8 @@ def analyze_logs(files: List[str], since: datetime.datetime, max_lines: int) ->
break break
# Verifica se il log è "stale" (non aggiornato da più di 24 ore) # Verifica se il log è "stale" (non aggiornato da più di 24 ore)
if os.path.basename(path) in STALE_EXCLUDE_BASENAMES:
continue # Non segnalare come stale (es. irrigazione: aggiornato solo a ogni run)
if last_ts: if last_ts:
hours_since = (now - last_ts).total_seconds() / 3600.0 hours_since = (now - last_ts).total_seconds() / 3600.0
if hours_since > 24: if hours_since > 24:
+37 -61
View File
@@ -18,14 +18,10 @@ logger = logging.getLogger(__name__)
# --- CONFIGURAZIONE METEO --- # --- CONFIGURAZIONE METEO ---
HOME_LAT = 43.9356 HOME_LAT = 43.9356
HOME_LON = 12.4296 HOME_LON = 12.4296
HOME_NAME = "🏠 Casa (Wide View ±12km)" HOME_NAME = "🏠 Casa"
TZ = "Europe/Berlin" TZ = "Europe/Berlin"
TZINFO = ZoneInfo(TZ) TZINFO = ZoneInfo(TZ)
# Offset ~12-15km per i 5 punti
OFFSET_LAT = 0.12
OFFSET_LON = 0.16
OPEN_METEO_URL = "https://api.open-meteo.com/v1/forecast" OPEN_METEO_URL = "https://api.open-meteo.com/v1/forecast"
GEOCODING_URL = "https://geocoding-api.open-meteo.com/v1/search" GEOCODING_URL = "https://geocoding-api.open-meteo.com/v1/search"
HTTP_HEADERS = {"User-Agent": "loogle-bot-v10.5"} HTTP_HEADERS = {"User-Agent": "loogle-bot-v10.5"}
@@ -176,12 +172,12 @@ def get_coordinates(city_name: str):
def choose_best_model(lat, lon, cc, is_home=False): def choose_best_model(lat, lon, cc, is_home=False):
""" """
Sceglie il modello meteo. Sceglie il modello meteo.
- Per Casa: usa AROME Seamless (ha snowfall) - Per Casa: usa ICON Italia (ARPAE 2i) - migliore risoluzione spaziale per Italia/San Marino.
- Per altre località: usa best match di Open-Meteo (senza specificare models) - Per altre località: usa best match di Open-Meteo (senza specificare models)
""" """
if is_home: if is_home:
# Per Casa, usa AROME Seamless (ha snowfall e dati dettagliati) # Per Casa, usa ICON Italia (risoluzione spaziale migliore per Italia/San Marino)
return "meteofrance_seamless", "AROME HD" return "italia_meteo_arpae_icon_2i", "ICON Italia"
else: else:
# Per query worldwide, usa best match (Open-Meteo sceglie automaticamente) # Per query worldwide, usa best match (Open-Meteo sceglie automaticamente)
return None, "Best Match" return None, "Best Match"
@@ -189,7 +185,7 @@ def choose_best_model(lat, lon, cc, is_home=False):
def get_forecast(lat, lon, model=None, is_home=False, timezone=None, retry_after_60s=False): def get_forecast(lat, lon, model=None, is_home=False, timezone=None, retry_after_60s=False):
""" """
Recupera forecast. Se model è None, usa best match di Open-Meteo. Recupera forecast. Se model è None, usa best match di Open-Meteo.
Per Casa (is_home=True), usa AROME Seamless. Per Casa (is_home=True), usa ICON Italia.
Args: Args:
retry_after_60s: Se True, attende 10 secondi prima di riprovare (per retry) retry_after_60s: Se True, attende 10 secondi prima di riprovare (per retry)
@@ -202,21 +198,15 @@ def get_forecast(lat, lon, model=None, is_home=False, timezone=None, retry_after
logger.info("Attendo 10 secondi prima del retry...") logger.info("Attendo 10 secondi prima del retry...")
time.sleep(10) time.sleep(10)
# Generiamo 5 punti: Centro, N, S, E, W # Singola coordinata: cielo sopra il punto richiesto (casa o località).
lats = [lat, lat + OFFSET_LAT, lat - OFFSET_LAT, lat, lat]
lons = [lon, lon, lon, lon + OFFSET_LON, lon - OFFSET_LON]
lat_str = ",".join(map(str, lats))
lon_str = ",".join(map(str, lons))
params = { params = {
"latitude": lat_str, "longitude": lon_str, "timezone": tz_to_use, "latitude": lat, "longitude": lon, "timezone": tz_to_use,
"forecast_days": 3, "forecast_days": 3,
"wind_speed_unit": "kmh", "precipitation_unit": "mm", "wind_speed_unit": "kmh", "precipitation_unit": "mm",
"hourly": "temperature_2m,apparent_temperature,relative_humidity_2m,cloud_cover,cloud_cover_low,cloud_cover_mid,cloud_cover_high,windspeed_10m,winddirection_10m,windgusts_10m,precipitation,rain,snowfall,weathercode,is_day,cape,visibility,uv_index" "hourly": "temperature_2m,apparent_temperature,relative_humidity_2m,cloud_cover,cloud_cover_low,cloud_cover_mid,cloud_cover_high,windspeed_10m,winddirection_10m,windgusts_10m,precipitation,rain,showers,snowfall,weathercode,is_day,cape,visibility,uv_index"
} }
# Aggiungi models solo se specificato (per Casa usa AROME, per altre località best match) # Aggiungi models solo se specificato (per Casa usa ICON Italia, per altre località best match)
if model: if model:
params["models"] = model params["models"] = model
@@ -241,11 +231,7 @@ def get_forecast(lat, lon, model=None, is_home=False, timezone=None, retry_after
logger.error(f"API Error {error_details}") logger.error(f"API Error {error_details}")
return None, error_details # Restituisce anche i dettagli dell'errore return None, error_details # Restituisce anche i dettagli dell'errore
response_data = r.json() response_data = r.json()
logger.info("get_forecast ok model=%s points=5 elapsed=%.2fs", model or "best_match", time.time() - t0) logger.info("get_forecast ok model=%s elapsed=%.2fs", model or "best_match", time.time() - t0)
# Open-Meteo per multiple locations (lat/lon separati da virgola) restituisce
# direttamente un dict con "hourly", "daily", etc. che contiene liste di valori
# per ogni location. Per semplicità, restituiamo il dict così com'è
# e lo gestiamo nel codice chiamante
return response_data, None return response_data, None
except requests.exceptions.Timeout as e: except requests.exceptions.Timeout as e:
error_details = f"Timeout dopo 20s: {str(e)}" error_details = f"Timeout dopo 20s: {str(e)}"
@@ -323,14 +309,14 @@ def generate_weather_report(lat, lon, location_name, debug_mode=False, cc="IT",
# Tentativo 1: Richiesta iniziale # Tentativo 1: Richiesta iniziale
data_list, error_details = get_forecast(lat, lon, model_id, is_home=is_home, timezone=tz_to_use, retry_after_60s=False) data_list, error_details = get_forecast(lat, lon, model_id, is_home=is_home, timezone=tz_to_use, retry_after_60s=False)
# Se fallisce e siamo a Casa con AROME, prova retry dopo 10 secondi # Se fallisce e siamo a Casa con ICON Italia, prova retry dopo 10 secondi
if not data_list and is_home and model_id == "meteofrance_seamless": if not data_list and is_home and model_id == "italia_meteo_arpae_icon_2i":
logger.warning(f"Primo tentativo AROME fallito: {error_details}. Retry dopo 10 secondi...") logger.warning(f"Primo tentativo ICON Italia fallito: {error_details}. Retry dopo 10 secondi...")
data_list, error_details = get_forecast(lat, lon, model_id, is_home=is_home, timezone=tz_to_use, retry_after_60s=True) data_list, error_details = get_forecast(lat, lon, model_id, is_home=is_home, timezone=tz_to_use, retry_after_60s=True)
# Se ancora fallisce e siamo a Casa, fallback a best match # Se ancora fallisce e siamo a Casa, fallback a best match
if not data_list and is_home: if not data_list and is_home:
logger.warning(f"AROME fallito anche dopo retry: {error_details}. Fallback a best match...") logger.warning(f"ICON Italia fallito anche dopo retry: {error_details}. Fallback a best match...")
data_list, error_details = get_forecast(lat, lon, None, is_home=False, timezone=tz_to_use, retry_after_60s=False) data_list, error_details = get_forecast(lat, lon, None, is_home=False, timezone=tz_to_use, retry_after_60s=False)
if data_list: if data_list:
model_name = "Best Match (fallback)" model_name = "Best Match (fallback)"
@@ -345,7 +331,6 @@ def generate_weather_report(lat, lon, location_name, debug_mode=False, cc="IT",
if not isinstance(data_list, list): data_list = [data_list] if not isinstance(data_list, list): data_list = [data_list]
# Punto centrale (Casa) per dati specifici
data_center = data_list[0] data_center = data_list[0]
hourly_c = data_center.get("hourly", {}) hourly_c = data_center.get("hourly", {})
times = hourly_c.get("time", []) times = hourly_c.get("time", [])
@@ -353,12 +338,13 @@ def generate_weather_report(lat, lon, location_name, debug_mode=False, cc="IT",
L = len(times) L = len(times)
# --- DATI LOCALI (CASA) --- # --- DATI LOCALI ---
l_temp = safe_get_list(hourly_c, "temperature_2m", L, 0) l_temp = safe_get_list(hourly_c, "temperature_2m", L, 0)
l_app = safe_get_list(hourly_c, "apparent_temperature", L, 0) l_app = safe_get_list(hourly_c, "apparent_temperature", L, 0)
l_rh = safe_get_list(hourly_c, "relative_humidity_2m", L, 50) l_rh = safe_get_list(hourly_c, "relative_humidity_2m", L, 50)
l_prec = safe_get_list(hourly_c, "precipitation", L, 0) l_prec = safe_get_list(hourly_c, "precipitation", L, 0)
l_rain = safe_get_list(hourly_c, "rain", L, 0) l_rain = safe_get_list(hourly_c, "rain", L, 0)
l_showers = safe_get_list(hourly_c, "showers", L, 0)
l_snow = safe_get_list(hourly_c, "snowfall", L, 0) l_snow = safe_get_list(hourly_c, "snowfall", L, 0)
l_wspd = safe_get_list(hourly_c, "windspeed_10m", L, 0) l_wspd = safe_get_list(hourly_c, "windspeed_10m", L, 0)
l_gust = safe_get_list(hourly_c, "windgusts_10m", L, 0) l_gust = safe_get_list(hourly_c, "windgusts_10m", L, 0)
@@ -369,8 +355,8 @@ def generate_weather_report(lat, lon, location_name, debug_mode=False, cc="IT",
l_vis = safe_get_list(hourly_c, "visibility", L, 10000) l_vis = safe_get_list(hourly_c, "visibility", L, 10000)
l_uv = safe_get_list(hourly_c, "uv_index", L, 0) l_uv = safe_get_list(hourly_c, "uv_index", L, 0)
# Se è Casa e AROME non fornisce visibilità (tutti None), recuperala da best match # Se è Casa e ICON Italia non fornisce visibilità (tutti None), recuperala da best match
if is_home and model_id == "meteofrance_seamless": if is_home and model_id == "italia_meteo_arpae_icon_2i":
vis_check = [v for v in l_vis if v is not None] vis_check = [v for v in l_vis if v is not None]
if not vis_check: # Tutti None, recupera da best match if not vis_check: # Tutti None, recupera da best match
vis_data = get_visibility_forecast(lat, lon) vis_data = get_visibility_forecast(lat, lon)
@@ -383,26 +369,14 @@ def generate_weather_report(lat, lon, location_name, debug_mode=False, cc="IT",
l_cl_mid_loc = safe_get_list(hourly_c, "cloud_cover_mid", L, 0) l_cl_mid_loc = safe_get_list(hourly_c, "cloud_cover_mid", L, 0)
l_cl_hig_loc = safe_get_list(hourly_c, "cloud_cover_high", L, 0) l_cl_hig_loc = safe_get_list(hourly_c, "cloud_cover_high", L, 0)
# --- DATI GLOBALI (MEDIA 5 PUNTI) --- # Nuvolosità (stesso punto della località)
acc_cl_tot = [0.0] * L avg_cl_tot = []
points_cl_tot = [ [] for _ in range(L) ] for i in range(L):
cc = get_val(l_cl_tot_loc[i], 0)
for d in data_list: cl = get_val(l_cl_low_loc[i], 0)
h = d.get("hourly", {}) cm = get_val(l_cl_mid_loc[i], 0)
for i in range(L): ch = get_val(l_cl_hig_loc[i], 0)
cc = get_val(safe_get_list(h, "cloud_cover", L)[i]) avg_cl_tot.append(max(cc, cl, cm, ch))
cl = get_val(safe_get_list(h, "cloud_cover_low", L)[i])
cm = get_val(safe_get_list(h, "cloud_cover_mid", L)[i])
ch = get_val(safe_get_list(h, "cloud_cover_high", L)[i])
# Calcolo robusto del totale per singolo punto
real_point_total = max(cc, cl, cm, ch)
acc_cl_tot[i] += real_point_total
points_cl_tot[i].append(real_point_total)
num_points = len(data_list)
avg_cl_tot = [x / num_points for x in acc_cl_tot]
# --- DEBUG MODE --- # --- DEBUG MODE ---
if debug_mode: if debug_mode:
@@ -420,8 +394,8 @@ def generate_weather_report(lat, lon, location_name, debug_mode=False, cc="IT",
code_now = int(get_val(l_code[idx])) code_now = int(get_val(l_code[idx]))
output += f"Ora: {parse_time(times[idx]).strftime('%H:%M')}\n" output += f"Ora: {parse_time(times[idx]).strftime('%H:%M')}\n"
output += f"📍 **LOCALE (Casa)**: L:{int(loc_L)}% | H:{int(loc_H)}%\n" output += f"📍 **LOCALE**: L:{int(loc_L)}% | H:{int(loc_H)}%\n"
output += f"🌍 **MEDIA GLOBALE**: {int(avg_cl_tot[idx])}%\n" output += f"☁️ **Nv%**: {int(avg_cl_tot[idx])}%\n"
output += f"❄️ **NEVE**: Codice={code_now}, Accumulo={get_val(l_snow[idx])}cm\n" output += f"❄️ **NEVE**: Codice={code_now}, Accumulo={get_val(l_snow[idx])}cm\n"
decision = "H" decision = "H"
@@ -521,17 +495,19 @@ def generate_weather_report(lat, lon, location_name, debug_mode=False, cc="IT",
Sn = get_val(l_snow[idx], 0) Sn = get_val(l_snow[idx], 0)
Code = int(get_val(l_code[idx], 0)) Code = int(get_val(l_code[idx], 0))
Rain = get_val(l_rain[idx], 0) Rain = get_val(l_rain[idx], 0)
Showers = get_val(l_showers[idx], 0) if idx < len(l_showers) else 0
# Per modelli che espongono rain+showers (es. ICON Italia), usa il totale se precipitation è assente/zero
Pr_display = max(Pr, Rain + Showers)
# Determina se è neve # Determina se è neve
is_snowing = Sn > 0 or (Code in [71, 73, 75, 77, 85, 86]) is_snowing = Sn > 0 or (Code in [71, 73, 75, 77, 85, 86])
# Formattazione MM # Formattazione MM: 0 se nulla, altrimenti il valore (i modelli danno 0 o il valore orario)
p_suffix = "" p_suffix = ""
if Code in [96, 99]: p_suffix = "G" if Code in [96, 99]: p_suffix = "G"
elif Code in [66, 67]: p_suffix = "Z" elif Code in [66, 67]: p_suffix = "Z"
elif is_snowing and Pr >= 0.2: p_suffix = "N" elif is_snowing and Pr_display > 0: p_suffix = "N"
p_s = "0" if Pr_display <= 0 else f"{int(round(Pr_display))}{p_suffix}"
p_s = "--" if Pr < 0.2 else f"{int(round(Pr))}{p_suffix}"
# --- CLOUD LOGIC --- # --- CLOUD LOGIC ---
Cl = int(get_val(l_cl_tot_loc[idx], 0)) Cl = int(get_val(l_cl_tot_loc[idx], 0))
@@ -587,10 +563,10 @@ def generate_weather_report(lat, lon, location_name, debug_mode=False, cc="IT",
w_fmt = f"{w_txt:<5}" w_fmt = f"{w_txt:<5}"
# --- ICONE --- # --- ICONE ---
sky, sgx = get_icon_set(Pr, Sn, Code, IsDay, Cl, Vis, T, Rain, Gust, Cape, dominant_type) sky, sgx = get_icon_set(Pr_display, Sn, Code, IsDay, Cl, Vis, T, Rain, Gust, Cape, dominant_type)
# Se c'è precipitazione nevosa, mostra ❄️ nella colonna Sk (invece di 🌨️) # Se c'è precipitazione nevosa, mostra ❄️ nella colonna Sk (invece di 🌨️)
if is_snowing and Pr >= 0.2: if is_snowing and Pr_display > 0:
sky = "❄️" sky = "❄️"
sky_fmt = f"{sky}{uv_suffix}" sky_fmt = f"{sky}{uv_suffix}"
@@ -615,7 +591,7 @@ if __name__ == "__main__":
args_parser = argparse.ArgumentParser() args_parser = argparse.ArgumentParser()
args_parser.add_argument("--query", help="Nome città") args_parser.add_argument("--query", help="Nome città")
args_parser.add_argument("--home", action="store_true", help="Usa coordinate casa") args_parser.add_argument("--home", action="store_true", help="Usa coordinate casa")
args_parser.add_argument("--debug", action="store_true", help="Mostra i valori dei 5 punti") args_parser.add_argument("--debug", action="store_true", help="Mostra dettaglio debug (nuvole, neve)")
args_parser.add_argument("--chat_id", help="Chat ID Telegram per invio diretto (opzionale, può essere multiplo separato da virgola)") args_parser.add_argument("--chat_id", help="Chat ID Telegram per invio diretto (opzionale, può essere multiplo separato da virgola)")
args_parser.add_argument("--timezone", help="Timezone IANA (es: Europe/Rome, America/New_York)") args_parser.add_argument("--timezone", help="Timezone IANA (es: Europe/Rome, America/New_York)")
args = args_parser.parse_args() args = args_parser.parse_args()
+48 -70
View File
@@ -47,6 +47,7 @@ MODEL_NAMES = {
"icon_d2": "ICON-D2", "icon_d2": "ICON-D2",
"gfs_global": "GFS", "gfs_global": "GFS",
"ecmwf_ifs04": "ECMWF", "ecmwf_ifs04": "ECMWF",
"ecmwf_ifs": "ECMWF IFS",
"jma_msm": "JMA MSM", "jma_msm": "JMA MSM",
"metno_nordic": "Yr.no", "metno_nordic": "Yr.no",
"ukmo_global": "UK MetOffice", "ukmo_global": "UK MetOffice",
@@ -57,26 +58,18 @@ MODEL_NAMES = {
def choose_models_by_country(cc, is_home=False): def choose_models_by_country(cc, is_home=False):
""" """
Seleziona modelli meteo ottimali. Seleziona modelli meteo ottimali.
- Per Casa: usa AROME Seamless e ICON-D2 (alta risoluzione) - Per Casa e Italia: solo ICON Italia (ARPAE 2i); AROME HD non copre San Marino.
- Per Italia: usa italia_meteo_arpae_icon_2i (include snow_depth quando > 0) - Per altre locali: usa best match di Open-Meteo (senza specificare models).
- Per altre località: usa best match di Open-Meteo (senza specificare models)
Ritorna (short_term_models, long_term_models) Ritorna (short_term_models, long_term_models)
""" """
cc = cc.upper() if cc else "UNKNOWN" cc = cc.upper() if cc else "UNKNOWN"
# Modelli a lungo termine (sempre globali, funzionano ovunque)
long_term_default = ["gfs_global", "ecmwf_ifs04"] long_term_default = ["gfs_global", "ecmwf_ifs04"]
if is_home: if is_home or cc == "IT":
# Per Casa, usa AROME Seamless, ICON-D2 e ICON Italia (alta risoluzione europea) # ICON Italia (072h) + ECMWF IFS per i giorni successivi (dove Icon Italia non arriva)
# ICON Italia include snow_depth quando disponibile (> 0) return ["italia_meteo_arpae_icon_2i"], ["ecmwf_ifs"]
return ["meteofrance_seamless", "icon_d2", "italia_meteo_arpae_icon_2i"], long_term_default
elif cc == "IT":
# Per Italia, usa ICON Italia (ARPAE 2i) che include snow_depth quando disponibile
return ["italia_meteo_arpae_icon_2i"], long_term_default
else: else:
# Per query worldwide, usa best match (Open-Meteo sceglie automaticamente)
# Ritorna None per indicare best match
return None, long_term_default return None, long_term_default
def get_bot_token(): def get_bot_token():
@@ -127,9 +120,9 @@ def get_weather_multi_model(lat, lon, short_term_models, long_term_models, forec
url = "https://api.open-meteo.com/v1/forecast" url = "https://api.open-meteo.com/v1/forecast"
params = { params = {
"latitude": lat, "longitude": lon, "latitude": lat, "longitude": lon,
"hourly": "temperature_2m,precipitation_probability,precipitation,snowfall,rain,snow_depth,weathercode,windspeed_10m,windgusts_10m,winddirection_10m,dewpoint_2m,relative_humidity_2m,cloud_cover,soil_temperature_0cm", "hourly": "temperature_2m,precipitation,snowfall,rain,snow_depth,weathercode,windspeed_10m,windgusts_10m,winddirection_10m,dewpoint_2m,relative_humidity_2m,cloud_cover,soil_temperature_0cm",
"daily": "temperature_2m_max,temperature_2m_min,precipitation_sum,precipitation_hours,precipitation_probability_max,snowfall_sum,showers_sum,rain_sum,weathercode,winddirection_10m_dominant,windspeed_10m_max,windgusts_10m_max", "daily": "temperature_2m_max,temperature_2m_min,precipitation_sum,precipitation_hours,snowfall_sum,showers_sum,rain_sum,weathercode,winddirection_10m_dominant,windspeed_10m_max,windgusts_10m_max",
"timezone": timezone if timezone else TZ_STR, "forecast_days": min(forecast_days, 3) # Limita a 3 giorni per modelli ad alta risoluzione "timezone": timezone if timezone else TZ_STR, "forecast_days": min(forecast_days, 3)
} }
try: try:
resp = open_meteo_get(url, params=params, timeout=(5, 20)) resp = open_meteo_get(url, params=params, timeout=(5, 20))
@@ -163,14 +156,19 @@ def get_weather_multi_model(lat, lon, short_term_models, long_term_models, forec
# Modelli specifici (per Casa: AROME + ICON, per Italia: ICON ARPAE) # Modelli specifici (per Casa: AROME + ICON, per Italia: ICON ARPAE)
for model in short_term_models: for model in short_term_models:
url = "https://api.open-meteo.com/v1/forecast" url = "https://api.open-meteo.com/v1/forecast"
# Per italia_meteo_arpae_icon_2i, includi sempre snow_depth (supportato quando > 0) # ICON Italia (ARPAE 2i): parametri come da API, senza precipitation_probability
hourly_params = "temperature_2m,precipitation_probability,precipitation,snowfall,rain,snow_depth,weathercode,windspeed_10m,windgusts_10m,winddirection_10m,dewpoint_2m,relative_humidity_2m,cloud_cover,soil_temperature_0cm" if model == "italia_meteo_arpae_icon_2i":
hourly_params = "rain,showers,snowfall,snow_depth,precipitation,temperature_2m,weathercode,windspeed_10m,winddirection_10m"
daily_params = "temperature_2m_max,temperature_2m_min,showers_sum,rain_sum,snowfall_sum,precipitation_sum,precipitation_hours,weathercode,winddirection_10m_dominant,windspeed_10m_max,windgusts_10m_max"
else:
hourly_params = "temperature_2m,precipitation,snowfall,rain,snow_depth,weathercode,windspeed_10m,windgusts_10m,winddirection_10m,dewpoint_2m,relative_humidity_2m,cloud_cover,soil_temperature_0cm"
daily_params = "temperature_2m_max,temperature_2m_min,precipitation_sum,precipitation_hours,snowfall_sum,showers_sum,rain_sum,weathercode,winddirection_10m_dominant,windspeed_10m_max,windgusts_10m_max"
params = { params = {
"latitude": lat, "longitude": lon, "latitude": lat, "longitude": lon,
"hourly": hourly_params, "hourly": hourly_params,
"daily": "temperature_2m_max,temperature_2m_min,precipitation_sum,precipitation_hours,precipitation_probability_max,snowfall_sum,showers_sum,rain_sum,weathercode,winddirection_10m_dominant,windspeed_10m_max,windgusts_10m_max", "daily": daily_params,
"timezone": timezone if timezone else TZ_STR, "models": model, "forecast_days": min(forecast_days, 3) # Limita a 3 giorni per modelli ad alta risoluzione "timezone": timezone if timezone else TZ_STR, "models": model,
"forecast_days": min(forecast_days, 7) if model == "italia_meteo_arpae_icon_2i" else min(forecast_days, 3)
} }
try: try:
resp = open_meteo_get(url, params=params, timeout=(5, 20)) resp = open_meteo_get(url, params=params, timeout=(5, 20))
@@ -220,13 +218,20 @@ def get_weather_multi_model(lat, lon, short_term_models, long_term_models, forec
except: except:
results[model] = None results[model] = None
# Recupera modelli a lungo termine (globale, fino a 10 giorni) # Recupera modelli a lungo termine (dopo 72h, dove Icon Italia non arriva)
for model in long_term_models: for model in (long_term_models or []):
url = "https://api.open-meteo.com/v1/forecast" url = "https://api.open-meteo.com/v1/forecast"
# ECMWF IFS: parametri come da API (rain, showers, snowfall) + campi necessari per il report
if model == "ecmwf_ifs":
hourly_params = "rain,showers,snowfall,precipitation,temperature_2m,weathercode,windspeed_10m,winddirection_10m"
daily_params = "temperature_2m_max,temperature_2m_min,showers_sum,rain_sum,snowfall_sum,precipitation_sum,precipitation_hours,weathercode,winddirection_10m_dominant,windspeed_10m_max,windgusts_10m_max"
else:
hourly_params = "temperature_2m,precipitation,snowfall,rain,snow_depth,weathercode,windspeed_10m,windgusts_10m,winddirection_10m,dewpoint_2m,relative_humidity_2m,cloud_cover,soil_temperature_0cm"
daily_params = "temperature_2m_max,temperature_2m_min,precipitation_sum,precipitation_hours,snowfall_sum,showers_sum,rain_sum,weathercode,winddirection_10m_dominant,windspeed_10m_max,windgusts_10m_max"
params = { params = {
"latitude": lat, "longitude": lon, "latitude": lat, "longitude": lon,
"hourly": "temperature_2m,precipitation_probability,precipitation,snowfall,rain,snow_depth,weathercode,windspeed_10m,windgusts_10m,winddirection_10m,dewpoint_2m,relative_humidity_2m,cloud_cover,soil_temperature_0cm", "hourly": hourly_params,
"daily": "temperature_2m_max,temperature_2m_min,precipitation_sum,precipitation_hours,precipitation_probability_max,snowfall_sum,showers_sum,rain_sum,weathercode,winddirection_10m_dominant,windspeed_10m_max,windgusts_10m_max", "daily": daily_params,
"timezone": timezone if timezone else TZ_STR, "models": model, "forecast_days": forecast_days "timezone": timezone if timezone else TZ_STR, "models": model, "forecast_days": forecast_days
} }
try: try:
@@ -269,11 +274,11 @@ def merge_multi_model_forecast(models_data, forecast_days=10):
"temperature_2m_min": [], "temperature_2m_min": [],
"precipitation_sum": [], "precipitation_sum": [],
"precipitation_hours": [], "precipitation_hours": [],
"precipitation_probability_max": [],
"snowfall_sum": [], "snowfall_sum": [],
"showers_sum": [], "showers_sum": [],
"rain_sum": [], "rain_sum": [],
"weathercode": [], "weathercode": [],
"winddirection_10m_dominant": [],
"windspeed_10m_max": [], "windspeed_10m_max": [],
"windgusts_10m_max": [] "windgusts_10m_max": []
}, },
@@ -288,7 +293,6 @@ def merge_multi_model_forecast(models_data, forecast_days=10):
"windspeed_10m": [], "windspeed_10m": [],
"winddirection_10m": [], "winddirection_10m": [],
"dewpoint_2m": [], "dewpoint_2m": [],
"precipitation_probability": [],
"cloud_cover": [], "cloud_cover": [],
"soil_temperature_0cm": [] "soil_temperature_0cm": []
}, },
@@ -358,36 +362,37 @@ def merge_multi_model_forecast(models_data, forecast_days=10):
model_display = "Best Match" model_display = "Best Match"
else: else:
model_display = MODEL_NAMES.get(short_term_model, short_term_model) model_display = MODEL_NAMES.get(short_term_model, short_term_model)
short_daily = short_term_data.get("daily", {})
short_hourly = short_term_data.get("hourly", {})
# Prendi dati daily: tutti i giorni se è l'unico modello, altrimenti primi cutoff_day+1
short_daily_times_all = short_daily.get("time", [])
short_daily_times = short_daily_times_all[:cutoff_day+1] if long_term_data else short_daily_times_all
# Verifica se ICON Italia ha dati di snow_depth (controllo diretto, non solo il flag) # Verifica se ICON Italia ha dati di snow_depth (controllo diretto, non solo il flag)
has_icon_snow_depth = False has_icon_snow_depth = False
if icon_italia_data: if icon_italia_data:
icon_hourly = icon_italia_data.get("hourly", {}) icon_hourly = icon_italia_data.get("hourly", {})
icon_snow_depth = icon_hourly.get("snow_depth", []) if icon_hourly else [] icon_snow_depth = icon_hourly.get("snow_depth", []) if icon_hourly else []
# Verifica se ci sono dati non-null di snow_depth
if icon_snow_depth: if icon_snow_depth:
for sd in icon_snow_depth[:72]: # Controlla prime 72h for sd in icon_snow_depth[:72]: # Controlla prime 72h
if sd is not None: if sd is not None:
try: try:
if float(sd) > 0: # Anche valori piccoli if float(sd) > 0:
has_icon_snow_depth = True has_icon_snow_depth = True
break break
except (ValueError, TypeError): except (ValueError, TypeError):
continue continue
# Se ICON Italia ha dati di snow_depth, aggiungilo ai modelli usati num_days = len(short_daily_times)
if has_icon_snow_depth or (icon_italia_data and icon_italia_data.get("has_snow_depth_data")): if has_icon_snow_depth or (icon_italia_data and icon_italia_data.get("has_snow_depth_data")):
icon_display = MODEL_NAMES.get("italia_meteo_arpae_icon_2i", "ICON Italia") icon_display = MODEL_NAMES.get("italia_meteo_arpae_icon_2i", "ICON Italia")
merged["models_used"].append(f"{model_display} + {icon_display} (snow_depth) (0-{cutoff_day+1}d)") merged["models_used"].append(f"{model_display} + {icon_display} (snow_depth) (0-{num_days}d)")
else: else:
merged["models_used"].append(f"{model_display} (0-{cutoff_day+1}d)") merged["models_used"].append(f"{model_display} (0-{num_days}d)")
short_daily = short_term_data.get("daily", {})
short_hourly = short_term_data.get("hourly", {})
# Prendi dati daily dai primi giorni del modello a breve termine
short_daily_times = short_daily.get("time", [])[:cutoff_day+1]
for i, day_time in enumerate(short_daily_times): for i, day_time in enumerate(short_daily_times):
merged["daily"]["time"].append(day_time) merged["daily"]["time"].append(day_time)
for key in ["temperature_2m_max", "temperature_2m_min", "precipitation_sum", "precipitation_hours", "precipitation_probability_max", "snowfall_sum", "showers_sum", "rain_sum", "weathercode", "windspeed_10m_max", "windgusts_10m_max"]: for key in ["temperature_2m_max", "temperature_2m_min", "precipitation_sum", "precipitation_hours", "snowfall_sum", "showers_sum", "rain_sum", "weathercode", "winddirection_10m_dominant", "windspeed_10m_max", "windgusts_10m_max"]:
val = short_daily.get(key, [])[i] if i < len(short_daily.get(key, [])) else None val = short_daily.get(key, [])[i] if i < len(short_daily.get(key, [])) else None
merged["daily"][key].append(val) merged["daily"][key].append(val)
@@ -409,10 +414,10 @@ def merge_multi_model_forecast(models_data, forecast_days=10):
except (ValueError, TypeError): except (ValueError, TypeError):
continue continue
cutoff_hour = (cutoff_day + 1) * 24 cutoff_hour = (cutoff_day + 1) * 24 if long_term_data else len(short_hourly_times)
for i, hour_time in enumerate(short_hourly_times[:cutoff_hour]): for i, hour_time in enumerate(short_hourly_times[:cutoff_hour]):
merged["hourly"]["time"].append(hour_time) merged["hourly"]["time"].append(hour_time)
for key in ["temperature_2m", "precipitation", "snowfall", "rain", "weathercode", "windspeed_10m", "winddirection_10m", "dewpoint_2m", "precipitation_probability", "cloud_cover", "soil_temperature_0cm"]: for key in ["temperature_2m", "precipitation", "snowfall", "rain", "weathercode", "windspeed_10m", "winddirection_10m", "dewpoint_2m", "cloud_cover", "soil_temperature_0cm"]:
val = short_hourly.get(key, [])[i] if i < len(short_hourly.get(key, [])) else None val = short_hourly.get(key, [])[i] if i < len(short_hourly.get(key, [])) else None
merged["hourly"][key].append(val) merged["hourly"][key].append(val)
# Per snow_depth: usa ICON Italia se disponibile (corrispondenza per timestamp), altrimenti modello principale # Per snow_depth: usa ICON Italia se disponibile (corrispondenza per timestamp), altrimenti modello principale
@@ -455,7 +460,7 @@ def merge_multi_model_forecast(models_data, forecast_days=10):
for i in range(start_idx, min(len(long_daily_times), forecast_days)): for i in range(start_idx, min(len(long_daily_times), forecast_days)):
merged["daily"]["time"].append(long_daily_times[i]) merged["daily"]["time"].append(long_daily_times[i])
for key in ["temperature_2m_max", "temperature_2m_min", "precipitation_sum", "precipitation_hours", "precipitation_probability_max", "snowfall_sum", "showers_sum", "rain_sum", "weathercode", "windspeed_10m_max", "windgusts_10m_max"]: for key in ["temperature_2m_max", "temperature_2m_min", "precipitation_sum", "precipitation_hours", "snowfall_sum", "showers_sum", "rain_sum", "weathercode", "winddirection_10m_dominant", "windspeed_10m_max", "windgusts_10m_max"]:
val = long_daily.get(key, [])[i] if i < len(long_daily.get(key, [])) else None val = long_daily.get(key, [])[i] if i < len(long_daily.get(key, [])) else None
merged["daily"][key].append(val) merged["daily"][key].append(val)
@@ -468,7 +473,7 @@ def merge_multi_model_forecast(models_data, forecast_days=10):
start_hour_idx = current_hourly_count start_hour_idx = current_hourly_count
for i in range(start_hour_idx, min(len(long_hourly_times), needed_hours)): for i in range(start_hour_idx, min(len(long_hourly_times), needed_hours)):
merged["hourly"]["time"].append(long_hourly_times[i]) merged["hourly"]["time"].append(long_hourly_times[i])
for key in ["temperature_2m", "precipitation", "snowfall", "snow_depth", "rain", "weathercode", "windspeed_10m", "winddirection_10m", "dewpoint_2m", "precipitation_probability", "cloud_cover", "soil_temperature_0cm"]: for key in ["temperature_2m", "precipitation", "snowfall", "snow_depth", "rain", "weathercode", "windspeed_10m", "winddirection_10m", "dewpoint_2m", "cloud_cover", "soil_temperature_0cm"]:
val = long_hourly.get(key, [])[i] if i < len(long_hourly.get(key, [])) else None val = long_hourly.get(key, [])[i] if i < len(long_hourly.get(key, [])) else None
merged["hourly"][key].append(val) merged["hourly"][key].append(val)
@@ -792,16 +797,13 @@ def analyze_daily_events(times, codes, probs, precip, winds, temps, dewpoints, s
block_precip = precip[start_idx:end_idx] if end_idx <= len(precip) else precip[start_idx:] block_precip = precip[start_idx:end_idx] if end_idx <= len(precip) else precip[start_idx:]
block_precip_clean = [p for p in block_precip if p is not None] block_precip_clean = [p for p in block_precip if p is not None]
tot_mm = sum(block_precip_clean) tot_mm = sum(block_precip_clean)
prob_block = probs[start_idx:end_idx] if end_idx <= len(probs) else probs[start_idx:]
prob_block_clean = [p for p in prob_block if p is not None]
max_prob = max(prob_block_clean) if prob_block_clean else 0
start_time = times[start_idx].split("T")[1][:5] start_time = times[start_idx].split("T")[1][:5]
end_time = times[end_idx].split("T")[1][:5] if end_idx < len(times) else times[-1].split("T")[1][:5] end_time = times[end_idx].split("T")[1][:5] if end_idx < len(times) else times[-1].split("T")[1][:5]
avg_intensity = tot_mm / len(block_precip) if block_precip else 0 avg_intensity = tot_mm / len(block_precip) if block_precip else 0
events.append( events.append(
f"{current_rain_type} ({get_intensity_label(avg_intensity)}):\n" f"{current_rain_type} ({get_intensity_label(avg_intensity)}):\n"
f" 🕒 {start_time}-{end_time} | 💧 {tot_mm:.1f}mm | Prob: {max_prob}%" f" 🕒 {start_time}-{end_time} | 💧 {tot_mm:.1f}mm"
) )
start_idx = i start_idx = i
current_rain_type = new_type current_rain_type = new_type
@@ -813,16 +815,13 @@ def analyze_daily_events(times, codes, probs, precip, winds, temps, dewpoints, s
tot_mm = sum(block_precip_clean) tot_mm = sum(block_precip_clean)
if tot_mm > 0: if tot_mm > 0:
prob_block = probs[start_idx:end_idx] if end_idx <= len(probs) else probs[start_idx:]
prob_block_clean = [p for p in prob_block if p is not None]
max_prob = max(prob_block_clean) if prob_block_clean else 0
start_time = times[start_idx].split("T")[1][:5] start_time = times[start_idx].split("T")[1][:5]
end_time = times[min(end_idx-1, len(times)-1)].split("T")[1][:5] end_time = times[min(end_idx-1, len(times)-1)].split("T")[1][:5]
avg_intensity = tot_mm / len(block_precip) if block_precip else 0 avg_intensity = tot_mm / len(block_precip) if block_precip else 0
events.append( events.append(
f"{current_rain_type} ({get_intensity_label(avg_intensity)}):\n" f"{current_rain_type} ({get_intensity_label(avg_intensity)}):\n"
f" 🕒 {start_time}-{end_time} | 💧 {tot_mm:.1f}mm | Prob: {max_prob}%" f" 🕒 {start_time}-{end_time} | 💧 {tot_mm:.1f}mm"
) )
# 3. VENTO # 3. VENTO
@@ -1225,12 +1224,7 @@ def format_weather_context_report(models_data, location_name, country_code):
wcode = int(wcode) if wcode is not None else 0 wcode = int(wcode) if wcode is not None else 0
# Calcola probabilità e tipo precipitazione per il giorno (PRIMA di determinare icona) # Calcola probabilità e tipo precipitazione per il giorno (PRIMA di determinare icona)
precip_prob = 0
precip_type = None precip_type = None
if d_probs and any(p is not None for p in d_probs):
prob_values = [p for p in d_probs if p is not None]
precip_prob = max(prob_values) if prob_values else 0
# Determina tipo precipitazione usando dati daily (più affidabili) # Determina tipo precipitazione usando dati daily (più affidabili)
# Usa snowfall_sum, rain_sum, showers_sum per caratterizzazione precisa # Usa snowfall_sum, rain_sum, showers_sum per caratterizzazione precisa
# PRIORITÀ: Se snow_depth > 0 (modello italia_meteo_arpae_icon_2i), usa questi dati per determinare neve # PRIORITÀ: Se snow_depth > 0 (modello italia_meteo_arpae_icon_2i), usa questi dati per determinare neve
@@ -1369,14 +1363,6 @@ def format_weather_context_report(models_data, location_name, country_code):
else: else:
weather_icon = "☁️" # Nuvoloso weather_icon = "☁️" # Nuvoloso
# Recupera probabilità max daily se disponibile
prob_max_list = daily.get('precipitation_probability_max', [])
precip_prob_max = None
if count < len(prob_max_list) and prob_max_list[count] is not None:
precip_prob_max = int(prob_max_list[count])
elif precip_prob > 0:
precip_prob_max = int(precip_prob)
# Calcola spessore manto nevoso (snow_depth) per questo giorno # Calcola spessore manto nevoso (snow_depth) per questo giorno
# Nota: snow_depth è disponibile solo per alcuni modelli (es. ICON Italia, ICON-D2) # Nota: snow_depth è disponibile solo per alcuni modelli (es. ICON Italia, ICON-D2)
# Se il modello non supporta snow_depth, tutti i valori saranno None o mancanti # Se il modello non supporta snow_depth, tutti i valori saranno None o mancanti
@@ -1449,7 +1435,6 @@ def format_weather_context_report(models_data, location_name, country_code):
"t_min": t_min, "t_min": t_min,
"t_max": t_max, "t_max": t_max,
"precip_sum": precip_sum, "precip_sum": precip_sum,
"precip_prob": precip_prob_max if precip_prob_max is not None else precip_prob,
"precip_type": precip_type, "precip_type": precip_type,
"snowfall_sum": snowfall_sum, "snowfall_sum": snowfall_sum,
"rain_sum": rain_sum, "rain_sum": rain_sum,
@@ -1497,13 +1482,6 @@ def format_weather_context_report(models_data, location_name, country_code):
line += f" | {' + '.join(precip_parts)}" line += f" | {' + '.join(precip_parts)}"
# Aggiungi probabilità se disponibile
if day_info['precip_prob'] and day_info['precip_prob'] > 0:
line += f" ({int(day_info['precip_prob'])}%)"
elif day_info['precip_prob'] > 50:
# Probabilità alta ma nessuna precipitazione prevista (può essere un errore del modello)
line += f" | 💧 Possibile ({int(day_info['precip_prob'])}%)"
# Aggiungi vento (sempre se disponibile, formattato come direzione intensità) # Aggiungi vento (sempre se disponibile, formattato come direzione intensità)
if day_info['wind_max'] > 0: if day_info['wind_max'] > 0:
wind_str = f"{day_info['wind_dir']} {day_info['wind_max']:.0f}km/h" wind_str = f"{day_info['wind_dir']} {day_info['wind_max']:.0f}km/h"
File diff suppressed because it is too large Load Diff
+83 -40
View File
@@ -19,15 +19,14 @@ from open_meteo_client import configure_open_meteo_session
# snow_radar.py # snow_radar.py
# #
# Scopo: # Scopo:
# Analizza la nevicata in una griglia di località in un raggio di 40km da San Marino. # Analizza la neve in una griglia di località in un raggio di 40km da San Marino.
# Per ciascuna località mostra: # Combina due parametri Open-Meteo:
# - Nome della località # - snowfall: precipitazione nevosa (cm/h) - neve che cade
# - Somma dello snowfall orario nelle 12 ore precedenti # - snow_depth: spessore manto al suolo (m) - neve accumulata, incluso residuo
# - Somma dello snowfall previsto nelle 12 ore successive # senza precipitazione (es. giorni successivi a nevicata)
# - Somma dello snowfall previsto nelle 24 ore successive
# #
# Modello meteo: # Modello meteo:
# meteofrance_seamless (AROME) per dati dettagliati # italia_meteo_arpae_icon_2i (supporta snowfall e snow_depth)
# #
# Token Telegram: # Token Telegram:
# Nessun token in chiaro. Lettura in ordine: # Nessun token in chiaro. Lettura in ordine:
@@ -88,8 +87,14 @@ LOCATIONS = [
TZ = "Europe/Berlin" TZ = "Europe/Berlin"
TZINFO = ZoneInfo(TZ) TZINFO = ZoneInfo(TZ)
# Modello meteo # Modello meteo: italia_meteo_arpae_icon_2i supporta snowfall e snow_depth
MODEL_AROME = "meteofrance_seamless" # - snowfall: precipitazione nevosa (cm/h)
# - snow_depth: spessore manto al suolo (m), include neve residua anche senza precipitazione
MODEL_SNOW = "italia_meteo_arpae_icon_2i"
# Soglia minima (cm) per considerare "neve presente" - evita falsi positivi da rumore/modello
# Valori < 1 cm sono tracce trascurabili (dew, frost, errori numerici) - non neve reale
SNOW_THRESHOLD_CM = 1.0
# File di log # File di log
BASE_DIR = os.path.dirname(os.path.abspath(__file__)) BASE_DIR = os.path.dirname(os.path.abspath(__file__))
@@ -189,14 +194,16 @@ def calculate_distance_km(lat1: float, lon1: float, lat2: float, lon2: float) ->
def get_forecast(session: requests.Session, lat: float, lon: float) -> Optional[Dict]: def get_forecast(session: requests.Session, lat: float, lon: float) -> Optional[Dict]:
""" """
Recupera previsioni meteo per una località. Recupera previsioni meteo per una località.
Inclusi: snowfall (precipitazione nevosa cm/h), snow_depth (manto al suolo m).
""" """
params = { params = {
"latitude": lat, "latitude": lat,
"longitude": lon, "longitude": lon,
"hourly": "snowfall,weathercode", "hourly": "snowfall,snow_depth,weathercode",
"timezone": TZ, "timezone": TZ,
"forecast_days": 2, "past_days": 7,
"models": MODEL_AROME, "forecast_days": 7,
"models": MODEL_SNOW,
} }
try: try:
@@ -217,22 +224,25 @@ def get_forecast(session: requests.Session, lat: float, lon: float) -> Optional[
def analyze_snowfall_for_location(data: Dict, now: datetime.datetime) -> Optional[Dict]: def analyze_snowfall_for_location(data: Dict, now: datetime.datetime) -> Optional[Dict]:
""" """
Analizza snowfall per una località. Analizza snowfall e snow_depth per una località.
Nota: Open-Meteo fornisce principalmente dati futuri. Per i dati passati, Combina:
includiamo anche le ore appena passate se disponibili nei dati hourly. - snowfall: precipitazione nevosa (cm/h) - neve che cade
- snow_depth: spessore manto al suolo (m) - neve accumulata, incluso residuo senza precipitazione
Returns: Returns:
Dict con: Dict con valori in cm:
- snow_past_12h: somma snowfall ultime 12 ore (cm) - se disponibile nei dati - snow_past_12h: max(somma snowfall ultime 12h, snow_depth attuale)
- snow_next_12h: somma snowfall prossime 12 ore (cm) - snow_next_12h: somma snowfall prossime 12h
- snow_next_24h: somma snowfall prossime 24 ore (cm) - snow_next_24h: max(somma snowfall prossime 24h, snow_depth max previsto)
- snow_depth_now_cm: manto attuale al suolo (cm)
""" """
hourly = data.get("hourly", {}) or {} hourly = data.get("hourly", {}) or {}
times = hourly.get("time", []) or [] times = hourly.get("time", []) or []
snowfall = hourly.get("snowfall", []) or [] snowfall = hourly.get("snowfall", []) or []
snow_depth_raw = hourly.get("snow_depth", []) or [] # in metri (m)
if not times or not snowfall: if not times:
return None return None
# Converti timestamps # Converti timestamps
@@ -243,29 +253,54 @@ def analyze_snowfall_for_location(data: Dict, now: datetime.datetime) -> Optiona
next_12h_end = now + datetime.timedelta(hours=12) next_12h_end = now + datetime.timedelta(hours=12)
next_24h_end = now + datetime.timedelta(hours=24) next_24h_end = now + datetime.timedelta(hours=24)
snow_past_12h = 0.0 snowfall_past_12h = 0.0
snow_next_12h = 0.0 snowfall_next_12h = 0.0
snow_next_24h = 0.0 snowfall_next_24h = 0.0
snow_depth_now_m = 0.0
snow_depth_max_past_12h_m = 0.0
snow_depth_max_next_24h_m = 0.0
# Rumore numerico: valori < 0.01 cm (snowfall) o < 0.0001 m (snow_depth) → 0
NOISE_FLOOR_SNOWFALL_CM = 0.01
NOISE_FLOOR_SNOW_DEPTH_M = 0.0001
for i, dt in enumerate(dt_list): for i, dt in enumerate(dt_list):
snow_val = float(snowfall[i]) if i < len(snowfall) and snowfall[i] is not None else 0.0 snow_val = float(snowfall[i]) if i < len(snowfall) and snowfall[i] is not None else 0.0
depth_val = float(snow_depth_raw[i]) if i < len(snow_depth_raw) and snow_depth_raw[i] is not None else 0.0
if snow_val < NOISE_FLOOR_SNOWFALL_CM:
snow_val = 0.0
if depth_val < NOISE_FLOOR_SNOW_DEPTH_M:
depth_val = 0.0
# Ultime 12 ore (passato) - solo se i dati includono il passato # Snowfall nelle finestre temporali
if dt < now and dt >= past_12h_start: if dt < now and dt >= past_12h_start:
snow_past_12h += snow_val snowfall_past_12h += snow_val
snow_depth_max_past_12h_m = max(snow_depth_max_past_12h_m, depth_val)
# Prossime 12 ore
if now <= dt < next_12h_end: if now <= dt < next_12h_end:
snow_next_12h += snow_val snowfall_next_12h += snow_val
# Prossime 24 ore
if now <= dt < next_24h_end: if now <= dt < next_24h_end:
snow_next_24h += snow_val snowfall_next_24h += snow_val
snow_depth_max_next_24h_m = max(snow_depth_max_next_24h_m, depth_val)
# snow_depth attuale: usa valore più vicino a "now" (ultima ora passata o prima futura)
if dt <= now:
snow_depth_now_m = depth_val
# snow_depth da m a cm
snow_depth_now_cm = snow_depth_now_m * 100.0
snow_depth_max_past_12h_cm = snow_depth_max_past_12h_m * 100.0
snow_depth_max_next_24h_cm = snow_depth_max_next_24h_m * 100.0
# Combina precipitazione + manto: per passato usa max(somma precipitazione, manto attuale)
# per futuro usa max(somma precipitazione, manto max previsto)
snow_past_12h = max(snowfall_past_12h, snow_depth_now_cm)
snow_next_24h = max(snowfall_next_24h, snow_depth_max_next_24h_cm)
return { return {
"snow_past_12h": snow_past_12h, "snow_past_12h": snow_past_12h,
"snow_next_12h": snow_next_12h, "snow_next_12h": snowfall_next_12h,
"snow_next_24h": snow_next_24h, "snow_next_24h": snow_next_24h,
"snow_depth_now_cm": snow_depth_now_cm,
} }
@@ -314,6 +349,9 @@ def generate_snow_map(results: List[Dict], center_lat: float, center_lon: float,
totals = [r.get(data_field, 0.0) for r in results] totals = [r.get(data_field, 0.0) for r in results]
max_total = max(totals) if totals else 1.0 max_total = max(totals) if totals else 1.0
min_total = min(totals) if totals else 0.0 min_total = min(totals) if totals else 0.0
# Evita vmin==vmax (divisione per zero nel colormap) - tutti 0 → scala 0..1
if max_total <= min_total:
max_total = max(min_total + 0.1, 1.0)
# Estrai coordinate # Estrai coordinate
lats = [r["lat"] for r in results] lats = [r["lat"] for r in results]
@@ -457,9 +495,10 @@ def generate_snow_map(results: List[Dict], center_lat: float, center_lon: float,
ax.legend(handles=legend_elements, loc='lower left', fontsize=10, ax.legend(handles=legend_elements, loc='lower left', fontsize=10,
framealpha=0.95, edgecolor='black', fancybox=True, shadow=True) framealpha=0.95, edgecolor='black', fancybox=True, shadow=True)
# Info timestamp spostata in alto a destra # Info timestamp spostata in alto a destra (Località con neve = solo quelle con neve sopra soglia)
now = now_local() now = now_local()
info_text = f"Aggiornamento: {now.strftime('%d/%m/%Y %H:%M')}\nLocalità con neve: {len(results)}" num_with_snow = sum(1 for r in results if r.get("has_snow", False))
info_text = f"Aggiornamento: {now.strftime('%d/%m/%Y %H:%M')}\nLocalità con neve: {num_with_snow}"
ax.text(0.98, 0.98, info_text, transform=ax.transAxes, ax.text(0.98, 0.98, info_text, transform=ax.transAxes,
fontsize=9, verticalalignment='top', horizontalalignment='right', fontsize=9, verticalalignment='top', horizontalalignment='right',
bbox=dict(boxstyle='round,pad=0.5', facecolor='white', alpha=0.9, bbox=dict(boxstyle='round,pad=0.5', facecolor='white', alpha=0.9,
@@ -660,11 +699,13 @@ def main(chat_ids: Optional[List[str]] = None, debug_mode: bool = False, chat_id
continue continue
# Aggiungi sempre Casa, anche se non c'è neve # Aggiungi sempre Casa, anche se non c'è neve
# Per le altre località, aggiungi solo se c'è neve (passata o prevista) # Per le altre località, aggiungi solo se c'è neve sopra soglia (precipitazione o manto residuo)
is_casa = loc["name"] == "Casa (Strada Cà Toro)" is_casa = loc["name"] == "Casa (Strada Cà Toro)"
has_snow = (snow_analysis["snow_past_12h"] > 0.0 or has_snow = (
snow_analysis["snow_next_12h"] > 0.0 or snow_analysis["snow_past_12h"] >= SNOW_THRESHOLD_CM or
snow_analysis["snow_next_24h"] > 0.0) snow_analysis["snow_next_12h"] >= SNOW_THRESHOLD_CM or
snow_analysis["snow_next_24h"] >= SNOW_THRESHOLD_CM
)
if is_casa or has_snow: if is_casa or has_snow:
results.append({ results.append({
@@ -672,6 +713,7 @@ def main(chat_ids: Optional[List[str]] = None, debug_mode: bool = False, chat_id
"lat": loc["lat"], "lat": loc["lat"],
"lon": loc["lon"], "lon": loc["lon"],
"distance_km": distance_km, "distance_km": distance_km,
"has_snow": has_snow,
**snow_analysis **snow_analysis
}) })
@@ -687,6 +729,7 @@ def main(chat_ids: Optional[List[str]] = None, debug_mode: bool = False, chat_id
# Genera e invia DUE mappe separate # Genera e invia DUE mappe separate
now_str = now.strftime('%d/%m/%Y %H:%M') now_str = now.strftime('%d/%m/%Y %H:%M')
num_with_snow = sum(1 for r in results if r.get("has_snow", False))
# 1. Mappa snowfall passato (12h precedenti) # 1. Mappa snowfall passato (12h precedenti)
map_path_past = os.path.join(BASE_DIR, "snow_radar_past.png") map_path_past = os.path.join(BASE_DIR, "snow_radar_past.png")
@@ -698,7 +741,7 @@ def main(chat_ids: Optional[List[str]] = None, debug_mode: bool = False, chat_id
f"❄️ *SNOW RADAR - Ultime 12h*\n" f"❄️ *SNOW RADAR - Ultime 12h*\n"
f"📍 Centro: San Marino\n" f"📍 Centro: San Marino\n"
f"🕒 {now_str}\n" f"🕒 {now_str}\n"
f"📊 Località con neve: {len(results)}/{len(LOCATIONS)}" f"📊 Località con neve: {num_with_snow}/{len(LOCATIONS)}"
) )
telegram_send_photo(map_path_past, caption_past, chat_ids=chat_ids) telegram_send_photo(map_path_past, caption_past, chat_ids=chat_ids)
# Pulisci file temporaneo # Pulisci file temporaneo
@@ -718,7 +761,7 @@ def main(chat_ids: Optional[List[str]] = None, debug_mode: bool = False, chat_id
f"❄️ *SNOW RADAR - Prossime 24h*\n" f"❄️ *SNOW RADAR - Prossime 24h*\n"
f"📍 Centro: San Marino\n" f"📍 Centro: San Marino\n"
f"🕒 {now_str}\n" f"🕒 {now_str}\n"
f"📊 Località con neve: {len(results)}/{len(LOCATIONS)}" f"📊 Località con neve: {num_with_snow}/{len(LOCATIONS)}"
) )
telegram_send_photo(map_path_future, caption_future, chat_ids=chat_ids) telegram_send_photo(map_path_future, caption_future, chat_ids=chat_ids)
# Pulisci file temporaneo # Pulisci file temporaneo